Files
assist/src/agent/react_agent.py
Jeason 46b6a10730 feat: 租户级系统提示词 + 模块权限树
1. 系统提示词按租户配置:
   - 去掉所有硬编码的'奇瑞汽车'品牌名
   - realtime_chat 的 _build_chat_prompt 按 tenant_id 动态获取提示词
   - _generate_response 和 _generate_response_stream 都传递 tenant_id
   - 默认提示词为通用客服助手(不含品牌名)
   - 租户编辑弹窗新增系统提示词配置区

2. 模块权限树:
   - Tenant config 新增 modules 字段(14个模块的开关)
   - GET /api/tenants/my-modules 返回当前用户所属租户的模块权限
   - 前端 applyModulePermissions() 初始化时隐藏无权限的侧边栏标签
   - 租户编辑弹窗新增模块权限 checkbox 配置区
   - 默认全部模块开启,取消勾选即隐藏

3. 其他清理:
   - llm_client.py 通用提示词去掉品牌名
   - react_agent.py SYSTEM_PROMPT 去掉'车辆'限定
2026-04-02 23:06:59 +08:00

346 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
ReAct Agent - 基于 ReAct 模式的智能代理
用单次 LLM 调用 + 工具循环替代原有的多步流水线
"""
import logging
import json
import re
from typing import Dict, Any, List, Optional
from datetime import datetime
from src.agent.llm_client import LLMManager
from src.config.unified_config import get_config
logger = logging.getLogger(__name__)
# ── 工具定义(供 LLM 理解可用能力) ──────────────────────────
TOOL_DEFINITIONS = [
{
"name": "search_knowledge",
"description": "搜索知识库,根据关键词查找相关的问题和答案",
"parameters": {
"query": {"type": "string", "description": "搜索关键词", "required": True},
"top_k": {"type": "integer", "description": "返回结果数量默认3", "required": False}
}
},
{
"name": "add_knowledge",
"description": "向知识库添加新的问答条目",
"parameters": {
"question": {"type": "string", "description": "问题", "required": True},
"answer": {"type": "string", "description": "答案", "required": True},
"category": {"type": "string", "description": "分类", "required": False}
}
},
{
"name": "query_vehicle",
"description": "查询车辆信息支持按VIN码或车牌号查询",
"parameters": {
"vin": {"type": "string", "description": "VIN码", "required": False},
"plate_number": {"type": "string", "description": "车牌号", "required": False}
}
},
{
"name": "get_analytics",
"description": "获取系统数据分析报告,如每日统计、分类统计等",
"parameters": {
"report_type": {
"type": "string",
"description": "报告类型: daily_analytics / summary / category_performance",
"required": True
}
}
},
{
"name": "send_feishu_message",
"description": "通过飞书发送消息通知",
"parameters": {
"message": {"type": "string", "description": "消息内容", "required": True},
"chat_id": {"type": "string", "description": "飞书群聊ID可选", "required": False}
}
},
]
def _build_tools_prompt() -> str:
"""构建工具描述文本供 system prompt 使用"""
lines = []
for t in TOOL_DEFINITIONS:
params_desc = []
for pname, pinfo in t["parameters"].items():
req = "必填" if pinfo.get("required") else "可选"
params_desc.append(f" - {pname} ({pinfo['type']}, {req}): {pinfo['description']}")
lines.append(f"- {t['name']}: {t['description']}\n 参数:\n" + "\n".join(params_desc))
return "\n".join(lines)
SYSTEM_PROMPT = f"""你是 TSP 智能客服助手,帮助用户解决售后问题、查询知识库、管理客诉信息。
你可以使用以下工具来完成任务:
{_build_tools_prompt()}
## 回复规则
1. 如果你需要使用工具,请严格按以下 JSON 格式回复(不要包含其他内容):
```json
{{"tool": "工具名", "parameters": {{"参数名": "参数值"}}}}
```
2. 如果你不需要使用工具,可以直接用自然语言回复用户。
3. 每次只调用一个工具。
4. 根据工具返回的结果,综合生成最终回复。
5. 回复要简洁专业,使用中文。
"""
class ReactAgent:
"""基于 ReAct 模式的 Agent"""
MAX_TOOL_ROUNDS = 5 # 最多工具调用轮次,防止死循环
def __init__(self):
config = get_config()
self.llm = LLMManager(config.llm)
self._tool_handlers = self._register_tool_handlers()
self.execution_history: List[Dict[str, Any]] = []
logger.info("ReactAgent 初始化完成")
# ── 工具处理器注册 ──────────────────────────────────────
def _register_tool_handlers(self) -> Dict[str, Any]:
return {
"search_knowledge": self._tool_search_knowledge,
"add_knowledge": self._tool_add_knowledge,
"query_vehicle": self._tool_query_vehicle,
"get_analytics": self._tool_get_analytics,
"send_feishu_message": self._tool_send_feishu_message,
}
# ── 主处理入口 ──────────────────────────────────────────
async def chat(
self,
message: str,
user_id: str = "anonymous",
conversation_history: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, Any]:
"""处理用户消息,返回最终回复"""
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
# 加入历史对话(最近 10 轮)
if conversation_history:
messages.extend(conversation_history[-10:])
messages.append({"role": "user", "content": message})
tool_calls_log = []
for round_idx in range(self.MAX_TOOL_ROUNDS):
# 调用 LLM
try:
response_text = await self.llm.chat(messages, temperature=0.3, max_tokens=2000)
except Exception as e:
logger.error(f"LLM 调用失败: {e}")
return self._error_response(str(e))
# 尝试解析工具调用
tool_call = self._parse_tool_call(response_text)
if tool_call is None:
# 没有工具调用 → 这是最终回复
self._record_execution(message, user_id, tool_calls_log, response_text)
return {
"success": True,
"response": response_text,
"tool_calls": tool_calls_log,
"rounds": round_idx + 1,
}
# 执行工具
tool_name = tool_call["tool"]
tool_params = tool_call.get("parameters", {})
logger.info(f"[Round {round_idx+1}] 调用工具: {tool_name}, 参数: {tool_params}")
tool_result = await self._execute_tool(tool_name, tool_params)
tool_calls_log.append({
"tool": tool_name,
"parameters": tool_params,
"result": tool_result,
"round": round_idx + 1,
})
# 把工具调用和结果加入对话上下文
messages.append({"role": "assistant", "content": response_text})
messages.append({
"role": "user",
"content": f"工具 `{tool_name}` 返回结果:\n```json\n{json.dumps(tool_result, ensure_ascii=False, default=str)}\n```\n请根据以上结果回复用户。"
})
# 超过最大轮次
self._record_execution(message, user_id, tool_calls_log, "[达到最大工具调用轮次]")
return {
"success": True,
"response": "抱歉,处理过程较复杂,请稍后重试或换个方式描述您的问题。",
"tool_calls": tool_calls_log,
"rounds": self.MAX_TOOL_ROUNDS,
}
# ── 工具调用解析 ──────────────────────────────────────
def _parse_tool_call(self, text: str) -> Optional[Dict[str, Any]]:
"""从 LLM 回复中解析工具调用 JSON"""
if not text:
return None
# 尝试从 ```json ... ``` 代码块中提取
code_block = re.search(r'```json\s*(\{.*?\})\s*```', text, re.DOTALL)
if code_block:
try:
data = json.loads(code_block.group(1))
if "tool" in data:
return data
except json.JSONDecodeError:
pass
# 尝试直接解析整段文本为 JSON
try:
data = json.loads(text.strip())
if isinstance(data, dict) and "tool" in data:
return data
except json.JSONDecodeError:
pass
# 尝试从文本中找到第一个 JSON 对象
json_match = re.search(r'\{[^{}]*"tool"\s*:\s*"[^"]+?"[^{}]*\}', text, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
if "tool" in data:
return data
except json.JSONDecodeError:
pass
return None
# ── 工具执行 ──────────────────────────────────────────
async def _execute_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""执行指定工具"""
handler = self._tool_handlers.get(tool_name)
if not handler:
return {"error": f"未知工具: {tool_name}"}
try:
return await handler(**params)
except Exception as e:
logger.error(f"工具 {tool_name} 执行失败: {e}")
return {"error": str(e)}
# ── 具体工具实现 ──────────────────────────────────────
async def _tool_search_knowledge(self, query: str, top_k: int = 3, **kw) -> Dict[str, Any]:
"""搜索知识库"""
try:
from src.knowledge_base.knowledge_manager import KnowledgeManager
km = KnowledgeManager()
results = km.search_knowledge(query, top_k)
return {"results": results, "count": len(results)}
except Exception as e:
return {"error": str(e)}
async def _tool_add_knowledge(self, question: str, answer: str, category: str = "通用", **kw) -> Dict[str, Any]:
"""添加知识库条目"""
try:
from src.knowledge_base.knowledge_manager import KnowledgeManager
km = KnowledgeManager()
success = km.add_knowledge_entry(question=question, answer=answer, category=category)
return {"success": success}
except Exception as e:
return {"error": str(e)}
async def _tool_query_vehicle(self, vin: str = None, plate_number: str = None, **kw) -> Dict[str, Any]:
"""查询车辆信息"""
try:
from src.vehicle.vehicle_data_manager import VehicleDataManager
vm = VehicleDataManager()
if vin:
result = vm.get_latest_vehicle_data_by_vin(vin)
return {"vehicle_data": result} if result else {"error": "未找到该VIN的车辆数据"}
elif plate_number:
return {"error": "暂不支持按车牌号查询请使用VIN码"}
else:
return {"error": "请提供 VIN 码"}
except Exception as e:
return {"error": str(e)}
async def _tool_get_analytics(self, report_type: str = "summary", **kw) -> Dict[str, Any]:
"""获取分析报告"""
try:
from src.analytics.analytics_manager import AnalyticsManager
am = AnalyticsManager()
if report_type == "daily_analytics":
return am.generate_daily_analytics()
elif report_type == "summary":
return am.get_analytics_summary()
elif report_type == "category_performance":
return am.get_category_performance()
else:
return {"error": f"不支持的报告类型: {report_type}"}
except Exception as e:
return {"error": str(e)}
async def _tool_send_feishu_message(self, message: str, chat_id: str = None, **kw) -> Dict[str, Any]:
"""发送飞书消息"""
try:
from src.integrations.feishu_service import FeishuService
fs = FeishuService()
if not chat_id:
return {"error": "请提供飞书群聊 chat_id"}
success = fs.send_message(receive_id=chat_id, content=message, receive_id_type="chat_id")
return {"success": success}
except Exception as e:
return {"error": str(e)}
# ── 辅助方法 ──────────────────────────────────────────
def _record_execution(self, message: str, user_id: str, tool_calls: list, response: str):
"""记录执行历史"""
record = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"message": message,
"tool_calls": tool_calls,
"response": response[:500],
}
self.execution_history.append(record)
if len(self.execution_history) > 500:
self.execution_history = self.execution_history[-500:]
def _error_response(self, error_msg: str) -> Dict[str, Any]:
return {
"success": False,
"response": "抱歉,系统处理出现问题,请稍后重试。",
"error": error_msg,
"tool_calls": [],
"rounds": 0,
}
def get_tool_definitions(self) -> List[Dict[str, Any]]:
"""返回工具定义列表(供 API 展示)"""
return TOOL_DEFINITIONS
def get_execution_history(self, limit: int = 50) -> List[Dict[str, Any]]:
"""获取执行历史"""
return self.execution_history[-limit:]
def get_status(self) -> Dict[str, Any]:
"""获取 Agent 状态"""
return {
"status": "active",
"available_tools": [t["name"] for t in TOOL_DEFINITIONS],
"tool_count": len(TOOL_DEFINITIONS),
"history_count": len(self.execution_history),
"max_tool_rounds": self.MAX_TOOL_ROUNDS,
}