Files
assist/src/agent_assistant.py

1259 lines
50 KiB
Python
Raw Normal View History

2025-09-06 21:06:18 +08:00
# -*- coding: utf-8 -*-
"""
增强版TSP助手 - 集成Agent功能
重构版本模块化设计降低代码复杂度
2025-09-06 21:06:18 +08:00
"""
import logging
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from src.agent.agent_assistant_core import TSPAgentAssistantCore
from src.agent.agent_message_handler import AgentMessageHandler
from src.agent.agent_sample_actions import AgentSampleActions
2025-09-06 21:06:18 +08:00
logger = logging.getLogger(__name__)
class TSPAgentAssistant(TSPAgentAssistantCore):
"""TSP Agent助手 - 重构版本"""
2025-09-06 21:06:18 +08:00
def __init__(self, llm_config=None):
# 初始化核心功能
super().__init__(llm_config)
2025-09-06 21:06:18 +08:00
# 初始化消息处理器
self.message_handler = AgentMessageHandler(self)
2025-09-06 21:06:18 +08:00
# 初始化示例动作处理器
self.sample_actions = AgentSampleActions(self)
logger.info("TSP Agent助手初始化完成重构版本")
2025-09-06 21:06:18 +08:00
async def process_message_agent(
self,
message: str,
user_id: str = None,
work_order_id: int = None,
enable_proactive: bool = True
) -> Dict[str, Any]:
"""Agent模式处理用户消息"""
try:
# 构建请求
request = {
"message": message,
"user_id": user_id,
"work_order_id": work_order_id,
"context": {
"session_id": f"session_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"timestamp": datetime.now().isoformat()
}
}
# 使用Agent核心处理请求
agent_result = await self.agent_core.process_request(request)
# 如果启用主动模式,检查是否需要主动行动
if enable_proactive:
proactive_result = await self.agent_core.proactive_action()
if proactive_result:
agent_result["proactive_action"] = proactive_result
# 记录Agent执行信息
agent_result["agent_mode"] = True
agent_result["agent_status"] = self.agent_core.get_status()
return agent_result
except Exception as e:
logger.error(f"Agent模式处理消息失败: {e}")
# 回退到传统模式
return await self._fallback_to_traditional_mode(message, user_id, work_order_id)
async def _fallback_to_traditional_mode(
self,
message: str,
user_id: str = None,
work_order_id: int = None
) -> Dict[str, Any]:
"""回退到传统模式"""
logger.info("回退到传统TSP助手模式")
# 使用原有的处理方式
result = self.process_message(message, user_id, work_order_id)
# 添加Agent标识
result["agent_mode"] = False
result["fallback_reason"] = "Agent处理失败使用传统模式"
return result
async def create_intelligent_work_order(
self,
user_message: str,
user_id: str = None,
auto_categorize: bool = True,
auto_priority: bool = True
) -> Dict[str, Any]:
"""智能创建工单 - 使用Agent能力"""
try:
# 使用Agent分析用户消息
request = {
"message": user_message,
"user_id": user_id,
"context": {"action": "create_work_order"}
}
agent_result = await self.agent_core.process_request(request)
if "error" in agent_result:
return agent_result
# 从Agent结果中提取工单信息
work_order_info = self._extract_work_order_info(agent_result, user_message)
# 创建工单
work_order = self.create_work_order(
title=work_order_info["title"],
description=work_order_info["description"],
category=work_order_info["category"],
priority=work_order_info["priority"]
)
# 添加Agent分析结果
work_order["agent_analysis"] = agent_result
work_order["intelligent_features"] = {
"auto_categorized": auto_categorize,
"auto_prioritized": auto_priority,
"confidence_score": work_order_info.get("confidence", 0.8)
}
return work_order
except Exception as e:
logger.error(f"智能创建工单失败: {e}")
return {"error": f"智能创建失败: {str(e)}"}
def _extract_work_order_info(self, agent_result: Dict[str, Any], user_message: str) -> Dict[str, Any]:
"""从Agent结果中提取工单信息"""
# 这里可以根据Agent的分析结果智能提取工单信息
# 暂时使用简单的提取逻辑
# 使用LLM提取关键信息
from src.core.llm_client import QwenClient
llm_client = QwenClient()
2025-09-06 21:06:18 +08:00
prompt = f"""
请从以下用户消息中提取工单信息
用户消息: {user_message}
请提取
1. 工单标题简洁明了
2. 问题描述详细描述
3. 问题类别技术问题账户问题服务问题等
4. 优先级highmediumlow
5. 置信度0-1
请以JSON格式返回
"""
messages = [
{"role": "system", "content": "你是一个工单信息提取专家,擅长从用户消息中提取关键信息。"},
{"role": "user", "content": prompt}
]
result = llm_client.chat_completion(messages, temperature=0.3)
if "error" in result:
# 使用默认值
return {
"title": "用户问题",
"description": user_message,
"category": "技术问题",
"priority": "medium",
"confidence": 0.5
}
try:
response_content = result["choices"][0]["message"]["content"]
import re
json_match = re.search(r'\{.*\}', response_content, re.DOTALL)
if json_match:
extracted_info = json.loads(json_match.group())
return {
"title": extracted_info.get("title", "用户问题"),
"description": extracted_info.get("description", user_message),
"category": extracted_info.get("category", "技术问题"),
"priority": extracted_info.get("priority", "medium"),
"confidence": extracted_info.get("confidence", 0.7)
}
else:
return {
"title": "用户问题",
"description": user_message,
"category": "技术问题",
"priority": "medium",
"confidence": 0.5
}
except Exception as e:
logger.error(f"提取工单信息失败: {e}")
return {
"title": "用户问题",
"description": user_message,
"category": "技术问题",
"priority": "medium",
"confidence": 0.5
}
async def intelligent_knowledge_search(
self,
query: str,
context: Dict[str, Any] = None,
use_reasoning: bool = True
) -> Dict[str, Any]:
"""智能知识库搜索 - 使用推理能力"""
try:
# 基础搜索
basic_results = self.search_knowledge(query)
if not use_reasoning:
return basic_results
# 使用推理引擎增强搜索
reasoning_result = await self.agent_core.reasoning_engine.reason_about_problem(
problem=f"搜索知识: {query}",
available_information={
"search_results": basic_results.get("results", []),
"context": context or {}
},
reasoning_type="inductive"
)
# 结合搜索结果和推理结果
enhanced_results = {
"basic_search": basic_results,
"reasoning_analysis": reasoning_result,
"enhanced_results": self._enhance_search_results(
basic_results.get("results", []),
reasoning_result
),
"search_strategy": "intelligent_with_reasoning"
}
return enhanced_results
except Exception as e:
logger.error(f"智能知识搜索失败: {e}")
return self.search_knowledge(query)
def _enhance_search_results(
self,
basic_results: List[Dict[str, Any]],
reasoning_result: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""增强搜索结果"""
enhanced_results = []
for result in basic_results:
enhanced_result = result.copy()
# 添加推理增强信息
if "analysis" in reasoning_result:
enhanced_result["reasoning_insights"] = reasoning_result["analysis"]
# 计算增强置信度
original_confidence = result.get("confidence_score", 0.5)
reasoning_confidence = reasoning_result.get("confidence", 0.5)
enhanced_result["enhanced_confidence"] = (original_confidence + reasoning_confidence) / 2
enhanced_results.append(enhanced_result)
return enhanced_results
async def proactive_monitoring(self) -> Dict[str, Any]:
"""主动监控 - Agent主动检查系统状态"""
try:
proactive_actions = []
# 检查预警
alerts = self.get_alerts()
if alerts.get("count", 0) > 0:
# 创建预警上下文
alert_context = AlertContext(
alert_id=f"alert_{datetime.now().timestamp()}",
alert_type="system_alert",
severity="high",
description=f"发现 {alerts['count']} 个活跃预警",
affected_systems=["main_system"],
metrics={"alert_count": alerts['count']}
)
# 使用智能Agent处理预警
alert_actions = await self.intelligent_agent.process_alert(alert_context)
for action in alert_actions:
# 执行动作
result = await self.action_executor.execute_action(action)
proactive_actions.append({
"type": "alert_response",
"description": action.description,
"priority": action.priority,
"confidence": action.confidence,
"result": result
})
2025-09-06 21:06:18 +08:00
# 检查系统健康
system_status = self.get_system_status()
if system_status.get("health_score", 1.0) < 0.8:
# 创建系统健康预警上下文
health_alert_context = AlertContext(
alert_id=f"health_{datetime.now().timestamp()}",
alert_type="system_health",
severity="medium",
description="系统健康状态不佳",
affected_systems=["main_system"],
metrics={"health_score": system_status.get("health_score", 0.5)}
)
health_actions = await self.intelligent_agent.process_alert(health_alert_context)
for action in health_actions:
result = await self.action_executor.execute_action(action)
proactive_actions.append({
"type": "system_maintenance",
"description": action.description,
"priority": action.priority,
"confidence": action.confidence,
"result": result
})
2025-09-06 21:06:18 +08:00
# 检查知识库质量
knowledge_stats = self.knowledge_manager.get_knowledge_stats()
if knowledge_stats.get("average_confidence", 0.8) < 0.6:
# 处理低置信度知识
low_confidence_items = knowledge_stats.get("low_confidence_items", [])
for item in low_confidence_items:
knowledge_context = KnowledgeContext(
question=item.get("question", ""),
answer=item.get("answer", ""),
confidence=item.get("confidence", 0.3),
source=item.get("source", "unknown"),
category=item.get("category", "general")
)
# 使用智能Agent处理知识库置信度
knowledge_actions = await self.intelligent_agent.process_knowledge_confidence(knowledge_context)
for action in knowledge_actions:
result = await self.action_executor.execute_action(action)
proactive_actions.append({
"type": "knowledge_improvement",
"description": action.description,
"priority": action.priority,
"confidence": action.confidence,
"result": result
})
2025-09-06 21:06:18 +08:00
return {
"proactive_actions": proactive_actions,
"timestamp": datetime.now().isoformat(),
"agent_status": self.agent_core.get_status(),
"llm_usage": self.llm_manager.get_usage_stats()
2025-09-06 21:06:18 +08:00
}
except Exception as e:
logger.error(f"主动监控失败: {e}")
return {"error": str(e)}
async def intelligent_analytics(
self,
analysis_type: str = "comprehensive",
date_range: str = "last_7_days"
) -> Dict[str, Any]:
"""智能分析 - 使用Agent推理能力"""
try:
# 基础分析
basic_analytics = self.generate_analytics(date_range)
if analysis_type == "basic":
return basic_analytics
# 使用Agent进行深度分析
analysis_request = {
"message": f"分析{date_range}的数据",
"context": {
"analysis_type": analysis_type,
"basic_data": basic_analytics
}
}
agent_analysis = await self.agent_core.process_request(analysis_request)
# 结合基础分析和Agent分析
intelligent_analytics = {
"basic_analytics": basic_analytics,
"agent_insights": agent_analysis,
"intelligent_recommendations": self._generate_recommendations(
basic_analytics,
agent_analysis
),
"analysis_confidence": self._calculate_analysis_confidence(agent_analysis)
}
return intelligent_analytics
except Exception as e:
logger.error(f"智能分析失败: {e}")
return self.generate_analytics(date_range)
def _generate_recommendations(
self,
basic_analytics: Dict[str, Any],
agent_analysis: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""生成智能推荐"""
recommendations = []
# 基于基础分析生成推荐
if basic_analytics.get("summary", {}).get("avg_satisfaction", 0) < 0.7:
recommendations.append({
"type": "improvement",
"title": "提升客户满意度",
"description": "客户满意度较低,建议优化服务质量",
"priority": "high",
"action_items": [
"分析低满意度工单",
"改进响应时间",
"提升解决方案质量"
]
})
if basic_analytics.get("summary", {}).get("avg_resolution_time_hours", 0) > 24:
recommendations.append({
"type": "efficiency",
"title": "缩短解决时间",
"description": "平均解决时间过长,建议提升处理效率",
"priority": "medium",
"action_items": [
"优化工作流程",
"增加自动化处理",
"提升知识库质量"
]
})
return recommendations
def _calculate_analysis_confidence(self, agent_analysis: Dict[str, Any]) -> float:
"""计算分析置信度"""
# 基于Agent分析结果计算置信度
if "error" in agent_analysis:
return 0.3
# 这里可以实现更复杂的置信度计算逻辑
return 0.8
def get_agent_status(self) -> Dict[str, Any]:
"""获取Agent状态"""
2025-09-08 15:27:22 +08:00
try:
# 获取自动监控状态
monitor_status = self.auto_monitor.get_monitoring_status()
2025-09-08 15:27:22 +08:00
return {
"success": True,
"agent_mode": self.is_agent_mode,
"monitoring_active": monitor_status["is_running"],
2025-09-08 15:27:22 +08:00
"status": "active" if self.is_agent_mode else "inactive",
"active_goals": 0, # 简化处理
"available_tools": 8, # 增加工具数量
"llm_usage": self.llm_manager.get_usage_stats(),
"action_executor_stats": self.action_executor.get_action_statistics(),
2025-09-08 15:27:22 +08:00
"tools": [
{"name": "search_knowledge", "usage_count": 0, "success_rate": 0.8},
{"name": "create_work_order", "usage_count": 0, "success_rate": 0.8},
{"name": "update_work_order", "usage_count": 0, "success_rate": 0.8},
{"name": "generate_response", "usage_count": 0, "success_rate": 0.8},
{"name": "analyze_data", "usage_count": 0, "success_rate": 0.8},
{"name": "send_notification", "usage_count": 0, "success_rate": 0.8},
{"name": "process_alert", "usage_count": 0, "success_rate": 0.9},
{"name": "enhance_knowledge", "usage_count": 0, "success_rate": 0.7}
2025-09-08 15:27:22 +08:00
],
"execution_history": self.action_executor.get_execution_history(10),
"auto_monitor": monitor_status
2025-09-08 15:27:22 +08:00
}
except Exception as e:
logger.error(f"获取Agent状态失败: {e}")
return {
"success": False,
"error": str(e),
"agent_mode": False,
"monitoring_active": False,
"status": "error"
}
2025-09-06 21:06:18 +08:00
def toggle_agent_mode(self, enabled: bool) -> bool:
"""切换Agent模式"""
try:
if enabled:
# 同步方式切换
self.is_agent_mode = True
logger.info("已切换到Agent模式")
return True
else:
return self.switch_to_traditional_mode()
except Exception as e:
logger.error(f"切换Agent模式失败: {e}")
return False
def start_proactive_monitoring(self) -> bool:
"""启动主动监控"""
try:
2025-09-08 15:27:22 +08:00
# 启动基础监控
self.start_monitoring()
# 启动自动监控服务
success = self.auto_monitor.start_auto_monitoring()
if success:
logger.info("主动监控已启动")
return True
else:
logger.error("启动自动监控服务失败")
return False
2025-09-06 21:06:18 +08:00
except Exception as e:
logger.error(f"启动主动监控失败: {e}")
return False
def stop_proactive_monitoring(self) -> bool:
"""停止主动监控"""
try:
2025-09-08 15:27:22 +08:00
# 停止基础监控
self.stop_monitoring()
# 停止自动监控服务
success = self.auto_monitor.stop_auto_monitoring()
if success:
logger.info("主动监控已停止")
return True
else:
logger.error("停止自动监控服务失败")
return False
2025-09-06 21:06:18 +08:00
except Exception as e:
logger.error(f"停止主动监控失败: {e}")
return False
2025-09-08 15:27:22 +08:00
def _start_monitoring_loop(self):
"""启动监控循环(同步版本)"""
try:
self._monitoring_active = True
logger.info("监控循环已启动")
except Exception as e:
logger.error(f"启动监控循环失败: {e}")
def _stop_monitoring_loop(self):
"""停止监控循环"""
try:
self._monitoring_active = False
logger.info("监控循环已停止")
except Exception as e:
logger.error(f"停止监控循环失败: {e}")
2025-09-06 21:06:18 +08:00
def run_proactive_monitoring(self) -> Dict[str, Any]:
"""运行主动监控"""
try:
# 模拟主动监控结果
proactive_actions = [
{"type": "alert_response", "description": "发现系统性能预警"},
{"type": "knowledge_update", "description": "建议更新知识库"},
{"type": "user_assistance", "description": "检测到用户可能需要帮助"}
]
return {
"success": True,
"proactive_actions": proactive_actions
}
except Exception as e:
logger.error(f"运行主动监控失败: {e}")
return {"success": False, "error": str(e)}
def run_intelligent_analysis(self) -> Dict[str, Any]:
"""运行智能分析"""
try:
from datetime import datetime, timedelta
from src.core.database import db_manager
from src.core.models import WorkOrder, Conversation
# 基于实际数据分析趋势
with db_manager.get_session() as session:
# 获取最近7天的数据
end_date = datetime.now()
start_date = end_date - timedelta(days=7)
# 工单数据
work_orders = session.query(WorkOrder).filter(
WorkOrder.created_at >= start_date,
WorkOrder.created_at <= end_date
).all()
# 对话数据
conversations = session.query(Conversation).filter(
Conversation.timestamp >= start_date,
Conversation.timestamp <= end_date
).all()
# 计算实际趋势数据
dates = []
satisfaction_scores = []
resolution_times = []
for i in range(7):
date = start_date + timedelta(days=i)
dates.append(date.strftime("%Y-%m-%d"))
# 计算当天的满意度
day_orders = [wo for wo in work_orders if wo.created_at.date() == date.date()]
day_satisfaction = [wo.satisfaction_score for wo in day_orders if wo.satisfaction_score]
avg_satisfaction = sum(day_satisfaction) / len(day_satisfaction) if day_satisfaction else 0
satisfaction_scores.append(round(avg_satisfaction, 2))
# 计算当天的解决时间
resolved_orders = [wo for wo in day_orders if wo.status == "resolved" and wo.updated_at]
day_resolution_times = []
for wo in resolved_orders:
resolution_time = (wo.updated_at - wo.created_at).total_seconds() / 3600
day_resolution_times.append(resolution_time)
avg_resolution_time = sum(day_resolution_times) / len(day_resolution_times) if day_resolution_times else 0
resolution_times.append(round(avg_resolution_time, 1))
# 基于实际数据生成建议
recommendations = []
# 满意度建议
avg_satisfaction = sum(satisfaction_scores) / len(satisfaction_scores) if satisfaction_scores else 0
if avg_satisfaction < 0.7:
recommendations.append({
"type": "improvement",
"title": "提升客户满意度",
"description": f"当前平均满意度{avg_satisfaction:.2f},建议优化服务质量"
})
# 解决时间建议
avg_resolution_time = sum(resolution_times) / len(resolution_times) if resolution_times else 0
if avg_resolution_time > 24:
recommendations.append({
"type": "optimization",
"title": "优化解决时间",
"description": f"当前平均解决时间{avg_resolution_time:.1f}小时,建议提升处理效率"
})
# 知识库建议
knowledge_hit_rate = len([c for c in conversations if c.knowledge_used]) / len(conversations) if conversations else 0
if knowledge_hit_rate < 0.5:
recommendations.append({
"type": "optimization",
"title": "知识库优化",
"description": f"知识库命中率{knowledge_hit_rate:.2f},建议增加更多技术问题解答"
})
analysis = {
"trends": {
"dates": dates,
"satisfaction": satisfaction_scores,
"resolution_time": resolution_times
},
"recommendations": recommendations,
"summary": {
"total_orders": len(work_orders),
"total_conversations": len(conversations),
"avg_satisfaction": round(avg_satisfaction, 2),
"avg_resolution_time": round(avg_resolution_time, 1),
"knowledge_hit_rate": round(knowledge_hit_rate, 2)
}
}
return analysis
2025-09-06 21:06:18 +08:00
except Exception as e:
logger.error(f"运行智能分析失败: {e}")
return {"error": str(e)}
def process_file_to_knowledge(self, file_path: str, filename: str) -> Dict[str, Any]:
"""处理文件并生成知识库"""
try:
import os
import mimetypes
# 检查文件类型
mime_type, _ = mimetypes.guess_type(file_path)
file_ext = os.path.splitext(filename)[1].lower()
# 读取文件内容
content = self._read_file_content(file_path, file_ext)
if not content:
return {"success": False, "error": "无法读取文件内容"}
# 使用LLM处理内容
knowledge_entries = self._extract_knowledge_from_content(content, filename)
# 保存到知识库
saved_count = 0
for i, entry in enumerate(knowledge_entries):
try:
logger.info(f"保存知识条目 {i+1}: {entry.get('question', '')[:50]}...")
success = self.knowledge_manager.add_knowledge_entry(
question=entry["question"],
answer=entry["answer"],
category=entry.get("category", "其他"),
confidence_score=entry.get("confidence_score", 0.7),
is_verified=False # 新添加的知识库条目默认为未验证
)
if success:
saved_count += 1
logger.info(f"知识条目 {i+1} 保存成功")
else:
logger.error(f"知识条目 {i+1} 保存失败")
except Exception as save_error:
logger.error(f"保存知识条目 {i+1} 时出错: {save_error}")
logger.error(f"条目内容: {entry}")
return {
"success": True,
"knowledge_count": saved_count,
"total_extracted": len(knowledge_entries),
"filename": filename
}
except Exception as e:
logger.error(f"处理文件失败: {e}")
return {"success": False, "error": str(e)}
def _read_file_content(self, file_path: str, file_ext: str) -> str:
"""读取文件内容"""
try:
if file_ext in ['.txt', '.md']:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
elif file_ext == '.pdf':
# 需要安装 PyPDF2 或 pdfplumber
try:
import PyPDF2
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
return text
except ImportError:
return "PDF文件需要安装PyPDF2库"
elif file_ext in ['.doc', '.docx']:
# 需要安装 python-docx
try:
from docx import Document
doc = Document(file_path)
text = ""
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
return text
except ImportError:
return "Word文件需要安装python-docx库"
else:
return "不支持的文件格式"
except Exception as e:
logger.error(f"读取文件失败: {e}")
return ""
def _extract_knowledge_from_content(self, content: str, filename: str) -> List[Dict[str, Any]]:
"""从内容中提取知识,结合工单数据优化"""
2025-09-06 21:06:18 +08:00
try:
# 获取历史工单数据用于参考
workorder_data = self._get_workorder_insights()
# 构建增强的提示词
2025-09-06 21:06:18 +08:00
prompt = f"""
请从以下文档内容中提取问答对用于构建知识库请结合历史工单数据来优化提取结果
2025-09-06 21:06:18 +08:00
文档名称{filename}
文档内容
{content[:2000]}...
历史工单数据参考
{workorder_data}
2025-09-06 21:06:18 +08:00
请按照以下格式提取问答对
1. 问题具体的问题描述参考工单中的常见问题
2. 答案详细的答案内容结合工单处理经验
2025-09-06 21:06:18 +08:00
3. 分类问题所属类别技术问题APP功能远程控制车辆绑定其他
4. 置信度0-1之间的数值
5. 工单关联是否与历史工单相关
2025-09-06 21:06:18 +08:00
请提取3-5个最有价值的问答对优先提取与历史工单问题相关的问答对
2025-09-06 21:06:18 +08:00
返回格式为JSON数组例如
[
{{
"question": "如何远程启动车辆?",
"answer": "远程启动车辆需要满足以下条件1. 车辆处于P档 2. 手刹拉起 3. 车门已锁 4. 电池电量充足。如果仍然无法启动,请检查车辆是否处于可启动状态。",
2025-09-06 21:06:18 +08:00
"category": "远程控制",
"confidence_score": 0.9,
"workorder_related": true
2025-09-06 21:06:18 +08:00
}}
]
"""
# 调用LLM
response = self.llm_client.chat_completion(
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=2000
)
if response and 'choices' in response:
content_text = response['choices'][0]['message']['content']
logger.info(f"LLM响应内容: {content_text[:500]}...")
# 尝试解析JSON
try:
import json
# 提取JSON部分
start_idx = content_text.find('[')
end_idx = content_text.rfind(']') + 1
if start_idx != -1 and end_idx != 0:
json_str = content_text[start_idx:end_idx]
knowledge_entries = json.loads(json_str)
logger.info(f"成功解析JSON提取到 {len(knowledge_entries)} 条知识")
# 验证每个条目的字段
for i, entry in enumerate(knowledge_entries):
if not isinstance(entry, dict):
logger.error(f"条目 {i} 不是字典格式: {entry}")
continue
if 'question' not in entry:
logger.error(f"条目 {i} 缺少question字段: {entry}")
continue
if 'answer' not in entry:
logger.error(f"条目 {i} 缺少answer字段: {entry}")
continue
logger.info(f"条目 {i} 验证通过: {entry.get('question', '')[:50]}...")
return knowledge_entries
except Exception as json_error:
logger.warning(f"JSON解析失败: {json_error}")
logger.warning(f"原始内容: {content_text}")
# 如果JSON解析失败尝试手动解析
manual_entries = self._parse_knowledge_manually(content_text)
logger.info(f"手动解析提取到 {len(manual_entries)} 条知识")
return manual_entries
else:
logger.error("LLM响应格式错误")
logger.error(f"响应内容: {response}")
return []
except Exception as e:
logger.error(f"提取知识失败: {e}")
return []
def _get_workorder_insights(self) -> str:
"""获取工单数据洞察"""
try:
# 获取工单数据
workorders = self.get_workorders()
if not isinstance(workorders, list):
return "暂无工单数据"
# 分析工单数据
categories = {}
common_issues = []
resolutions = []
for workorder in workorders[:20]: # 取最近20个工单
category = workorder.get("category", "其他")
categories[category] = categories.get(category, 0) + 1
# 提取常见问题
title = workorder.get("title", "")
description = workorder.get("description", "")
if title and len(title) > 5:
common_issues.append(title)
# 提取解决方案(如果有)
resolution = workorder.get("resolution", "")
if resolution and len(resolution) > 10:
resolutions.append(resolution[:100]) # 截取前100字符
# 构建工单洞察文本
insights = f"""
工单统计
- 总工单数{len(workorders)}
- 问题分类分布{dict(list(categories.items())[:5])}
常见问题
{chr(10).join(common_issues[:10])}
解决方案示例
{chr(10).join(resolutions[:5])}
"""
return insights
except Exception as e:
logger.error(f"获取工单洞察失败: {e}")
return "获取工单数据失败"
def get_action_history(self, limit: int = 50) -> List[Dict[str, Any]]:
"""获取动作执行历史"""
try:
return self.action_executor.get_execution_history(limit)
except Exception as e:
logger.error(f"获取动作历史失败: {e}")
return []
def get_llm_usage_stats(self) -> Dict[str, Any]:
"""获取LLM使用统计"""
try:
return self.llm_manager.get_usage_stats()
except Exception as e:
logger.error(f"获取LLM使用统计失败: {e}")
return {}
async def process_alert_with_agent(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
"""使用Agent处理预警"""
try:
# 创建预警上下文
alert_context = AlertContext(
alert_id=alert_data.get("id", f"alert_{datetime.now().timestamp()}"),
alert_type=alert_data.get("type", "unknown"),
severity=alert_data.get("severity", "medium"),
description=alert_data.get("description", ""),
affected_systems=alert_data.get("affected_systems", []),
metrics=alert_data.get("metrics", {})
)
# 使用智能Agent处理预警
actions = await self.intelligent_agent.process_alert(alert_context)
# 执行动作
results = []
for action in actions:
result = await self.action_executor.execute_action(action)
results.append({
"action": action.description,
"priority": action.priority,
"confidence": action.confidence,
"result": result
})
return {
"success": True,
"alert_id": alert_context.alert_id,
"actions_taken": len(actions),
"results": results
}
except Exception as e:
logger.error(f"Agent处理预警失败: {e}")
return {"success": False, "error": str(e)}
async def enhance_knowledge_with_agent(self, knowledge_data: Dict[str, Any]) -> Dict[str, Any]:
"""使用Agent增强知识库"""
try:
# 创建知识上下文
knowledge_context = KnowledgeContext(
question=knowledge_data.get("question", ""),
answer=knowledge_data.get("answer", ""),
confidence=knowledge_data.get("confidence", 0.5),
source=knowledge_data.get("source", "unknown"),
category=knowledge_data.get("category", "general")
)
# 使用智能Agent处理知识库置信度
actions = await self.intelligent_agent.process_knowledge_confidence(knowledge_context)
# 执行动作
results = []
for action in actions:
result = await self.action_executor.execute_action(action)
results.append({
"action": action.description,
"priority": action.priority,
"confidence": action.confidence,
"result": result
})
return {
"success": True,
"question": knowledge_context.question,
"actions_taken": len(actions),
"results": results
}
except Exception as e:
logger.error(f"Agent增强知识库失败: {e}")
return {"success": False, "error": str(e)}
def _add_sample_execution_history(self):
"""添加示例执行历史"""
try:
from src.agent.intelligent_agent import AgentAction, ActionType
# 添加一些示例执行记录
sample_actions = [
AgentAction(
action_type=ActionType.ALERT_RESPONSE,
description="处理CPU使用率过高预警",
priority=5,
confidence=0.9,
parameters={"service": "main_service", "cpu_usage": "95%"},
estimated_time=30
),
AgentAction(
action_type=ActionType.KNOWLEDGE_UPDATE,
description="更新低置信度知识条目",
priority=3,
confidence=0.7,
parameters={"question": "如何重启服务", "enhanced_answer": "使用systemctl restart命令重启服务"},
estimated_time=60
),
AgentAction(
action_type=ActionType.WORKORDER_CREATE,
description="自动创建系统维护工单",
priority=4,
confidence=0.8,
parameters={"title": "系统性能优化", "category": "系统维护"},
estimated_time=120
),
AgentAction(
action_type=ActionType.SYSTEM_OPTIMIZE,
description="执行内存优化",
priority=3,
confidence=0.6,
parameters={"type": "memory", "target": "cache_cleanup"},
estimated_time=300
),
AgentAction(
action_type=ActionType.USER_NOTIFY,
description="通知管理员系统状态",
priority=2,
confidence=0.5,
parameters={"user_id": "admin", "message": "系统运行正常"},
estimated_time=10
)
]
# 模拟执行这些动作并记录历史
for i, action in enumerate(sample_actions):
execution_record = {
"action_id": f"{action.action_type.value}_{i+1}",
"action_type": action.action_type.value,
"description": action.description,
"priority": action.priority,
"confidence": action.confidence,
"start_time": (datetime.now().timestamp() - (len(sample_actions) - i) * 3600), # 模拟过去的时间
"end_time": (datetime.now().timestamp() - (len(sample_actions) - i) * 3600) + action.estimated_time,
"success": True,
"result": {
"success": True,
"message": f"{action.description}执行成功",
"execution_time": action.estimated_time
}
}
self.action_executor.execution_history.append(execution_record)
logger.info(f"已添加 {len(sample_actions)} 条示例执行历史")
except Exception as e:
logger.error(f"添加示例执行历史失败: {e}")
async def trigger_sample_actions(self) -> Dict[str, Any]:
"""触发示例动作用于演示Agent功能"""
try:
from src.agent.intelligent_agent import AgentAction, ActionType
# 创建示例动作
sample_action = AgentAction(
action_type=ActionType.ALERT_RESPONSE,
description="演示:处理系统预警",
priority=4,
confidence=0.8,
parameters={"alert_type": "demo", "severity": "medium"},
estimated_time=15
)
# 执行动作
result = await self.action_executor.execute_action(sample_action)
return {
"success": True,
"message": "示例动作已执行",
"action": sample_action.description,
"result": result,
"execution_history_count": len(self.action_executor.execution_history)
}
except Exception as e:
logger.error(f"触发示例动作失败: {e}")
return {"success": False, "error": str(e)}
def clear_execution_history(self) -> Dict[str, Any]:
"""清空执行历史"""
try:
count = len(self.action_executor.execution_history)
self.action_executor.execution_history.clear()
return {
"success": True,
"message": f"已清空 {count} 条执行历史"
}
except Exception as e:
logger.error(f"清空执行历史失败: {e}")
return {"success": False, "error": str(e)}
2025-09-06 21:06:18 +08:00
def _parse_knowledge_manually(self, content: str) -> List[Dict[str, Any]]:
"""手动解析知识内容"""
try:
entries = []
lines = content.split('\n')
current_entry = {}
for line in lines:
line = line.strip()
if not line:
continue
# 检查问题
if '问题' in line and ('' in line or ':' in line):
if current_entry and 'question' in current_entry:
entries.append(current_entry)
current_entry = {}
# 提取问题内容
if '' in line:
question = line.split('', 1)[1].strip()
else:
question = line.split(':', 1)[1].strip()
current_entry["question"] = question
# 检查答案
elif '答案' in line and ('' in line or ':' in line):
if '' in line:
answer = line.split('', 1)[1].strip()
else:
answer = line.split(':', 1)[1].strip()
current_entry["answer"] = answer
# 检查分类
elif '分类' in line and ('' in line or ':' in line):
if '' in line:
category = line.split('', 1)[1].strip()
else:
category = line.split(':', 1)[1].strip()
current_entry["category"] = category
# 检查置信度
elif '置信度' in line and ('' in line or ':' in line):
try:
if '' in line:
confidence_str = line.split('', 1)[1].strip()
else:
confidence_str = line.split(':', 1)[1].strip()
current_entry["confidence_score"] = float(confidence_str)
except:
current_entry["confidence_score"] = 0.7
# 添加最后一个条目
if current_entry and 'question' in current_entry and 'answer' in current_entry:
entries.append(current_entry)
# 确保每个条目都有必要的字段
for entry in entries:
if 'category' not in entry:
entry['category'] = '其他'
if 'confidence_score' not in entry:
entry['confidence_score'] = 0.7
logger.info(f"手动解析完成,提取到 {len(entries)} 条知识")
return entries
except Exception as e:
logger.error(f"手动解析知识失败: {e}")
return []
async def switch_to_agent_mode(self) -> bool:
"""切换到Agent模式"""
try:
self.is_agent_mode = True
logger.info("已切换到Agent模式")
return True
except Exception as e:
logger.error(f"切换到Agent模式失败: {e}")
return False
def switch_to_traditional_mode(self) -> bool:
"""切换到传统模式"""
try:
self.is_agent_mode = False
logger.info("已切换到传统模式")
return True
except Exception as e:
logger.error(f"切换到传统模式失败: {e}")
return False
async def start_agent_monitoring(self) -> bool:
"""启动Agent监控"""
try:
# 启动基础监控
self.start_monitoring()
# 启动Agent主动监控
asyncio.create_task(self._agent_monitoring_loop())
logger.info("Agent监控已启动")
return True
except Exception as e:
logger.error(f"启动Agent监控失败: {e}")
return False
async def _agent_monitoring_loop(self):
"""Agent监控循环"""
while True:
try:
# 每5分钟执行一次主动监控
await asyncio.sleep(300)
proactive_result = await self.proactive_monitoring()
if proactive_result.get("proactive_actions"):
logger.info(f"发现 {len(proactive_result['proactive_actions'])} 个主动行动机会")
# 这里可以实现自动处理逻辑
# 例如:自动发送通知、自动创建工单等
except Exception as e:
logger.error(f"Agent监控循环错误: {e}")
await asyncio.sleep(60) # 出错后等待1分钟再继续
# 使用示例
async def main():
"""主函数示例"""
# 创建Agent助手
agent_assistant = TSPAgentAssistant()
# 测试Agent功能
print("=== TSP Agent助手测试 ===")
# 测试Agent模式处理消息
response = await agent_assistant.process_message_agent(
message="我的账户无法登录,请帮助我解决这个问题",
user_id="user123"
)
print("Agent模式响应:", response)
# 测试智能工单创建
work_order = await agent_assistant.create_intelligent_work_order(
user_message="系统经常出现错误,影响正常使用",
user_id="user456"
)
print("智能工单创建:", work_order)
# 测试主动监控
monitoring_result = await agent_assistant.proactive_monitoring()
print("主动监控结果:", monitoring_result)
# 获取Agent状态
agent_status = agent_assistant.get_agent_status()
print("Agent状态:", agent_status)
if __name__ == "__main__":
asyncio.run(main())