refactor: 清理不需要的代码文件,添加.gitignore,优化项目结构

This commit is contained in:
赵杰 Jie Zhao (雄狮汽车科技)
2025-09-16 17:05:50 +01:00
parent 9451945e08
commit 9ca36042e3
65 changed files with 3370 additions and 10809 deletions

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TSP助手预警管理Web应用
@@ -8,6 +8,7 @@ TSP助手预警管理Web应用
import sys
import os
import json
import logging
import pandas as pd
from datetime import datetime, timedelta
from openpyxl import Workbook
@@ -24,10 +25,25 @@ from src.agent_assistant import TSPAgentAssistant
from src.analytics.alert_system import AlertRule, AlertLevel, AlertType
from src.dialogue.realtime_chat import RealtimeChatManager
from src.vehicle.vehicle_data_manager import VehicleDataManager
from src.core.database import db_manager
from src.core.models import WorkOrder, Alert, Conversation, KnowledgeEntry, WorkOrderSuggestion
app = Flask(__name__)
CORS(app)
# 抑制 /api/health 的访问日志
werkzeug_logger = logging.getLogger('werkzeug')
class HealthLogFilter(logging.Filter):
def filter(self, record):
try:
msg = record.getMessage()
return '/api/health' not in msg
except Exception:
return True
werkzeug_logger.addFilter(HealthLogFilter())
# 配置上传文件夹
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
@@ -39,6 +55,26 @@ agent_assistant = TSPAgentAssistant()
chat_manager = RealtimeChatManager()
vehicle_manager = VehicleDataManager()
# 工具函数:确保工单模板文件存在
def _ensure_workorder_template_file() -> str:
"""返回已有的模板xlsx路径不做动态生成避免运行时依赖问题"""
template_path = os.path.join(app.config['UPLOAD_FOLDER'], 'workorder_template.xlsx')
# 确保目录存在
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
if not os.path.exists(template_path):
# 如果运行目录不存在模板,尝试从项目根相对路径拷贝一次
repo_template = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '..', 'uploads', 'workorder_template.xlsx')
repo_template = os.path.abspath(repo_template)
try:
if os.path.exists(repo_template):
import shutil
shutil.copyfile(repo_template, template_path)
else:
raise FileNotFoundError('模板文件缺失uploads/workorder_template.xlsx')
except Exception as copy_err:
raise copy_err
return template_path
@app.route('/')
def index():
"""主页 - 综合管理平台"""
@@ -51,10 +87,28 @@ def alerts():
@app.route('/api/health')
def get_health():
"""获取系统健康状态"""
"""获取系统健康状态附加近1小时业务指标"""
try:
health = assistant.get_system_health()
return jsonify(health)
base = assistant.get_system_health() or {}
# 追加数据库近1小时指标
from datetime import datetime, timedelta
with db_manager.get_session() as session:
since = datetime.now() - timedelta(hours=1)
conv_count = session.query(Conversation).filter(Conversation.timestamp >= since).count()
resp_times = [c.response_time for c in session.query(Conversation).filter(Conversation.timestamp >= since).all() if c.response_time]
avg_resp = round(sum(resp_times)/len(resp_times), 2) if resp_times else 0
open_wos = session.query(WorkOrder).filter(WorkOrder.status == 'open').count()
levels = session.query(Alert.level).filter(Alert.is_active == True).all()
level_map = {}
for (lvl,) in levels:
level_map[lvl] = level_map.get(lvl, 0) + 1
base.update({
"throughput_1h": conv_count,
"avg_response_time_1h": avg_resp,
"open_workorders": open_wos,
"active_alerts_by_level": level_map
})
return jsonify(base)
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -571,94 +625,34 @@ def unverify_knowledge(knowledge_id):
# 工单相关API
@app.route('/api/workorders')
def get_workorders():
"""获取工单列表"""
"""获取工单列表(来自数据库)"""
try:
status_filter = request.args.get('status')
priority_filter = request.args.get('priority')
# 这里应该调用工单管理器的获取方法
workorders = [
{
"id": 1,
"title": "车辆无法远程启动",
"description": "用户反映APP中远程启动功能无法使用",
"category": "远程控制",
"priority": "high",
"status": "open",
"created_at": "2024-01-01T10:00:00Z"
},
{
"id": 2,
"title": "APP显示异常",
"description": "APP中车辆信息显示不正确",
"category": "APP功能",
"priority": "medium",
"status": "in_progress",
"created_at": "2024-01-01T11:00:00Z"
},
{
"id": 3,
"title": "蓝牙连接失败",
"description": "用户无法通过蓝牙连接车辆",
"category": "蓝牙功能",
"priority": "high",
"status": "open",
"created_at": "2024-01-01T12:00:00Z"
},
{
"id": 4,
"title": "车辆定位不准确",
"description": "APP中显示的车辆位置与实际位置不符",
"category": "定位功能",
"priority": "medium",
"status": "resolved",
"created_at": "2024-01-01T13:00:00Z"
},
{
"id": 5,
"title": "远程解锁失败",
"description": "用户无法通过APP远程解锁车辆",
"category": "远程控制",
"priority": "urgent",
"status": "open",
"created_at": "2024-01-01T14:00:00Z"
},
{
"id": 6,
"title": "APP闪退问题",
"description": "用户反映APP在使用过程中频繁闪退",
"category": "APP功能",
"priority": "high",
"status": "in_progress",
"created_at": "2024-01-01T15:00:00Z"
},
{
"id": 7,
"title": "车辆状态更新延迟",
"description": "车辆状态信息更新不及时,存在延迟",
"category": "数据同步",
"priority": "low",
"status": "open",
"created_at": "2024-01-01T16:00:00Z"
},
{
"id": 8,
"title": "用户认证失败",
"description": "部分用户无法正常登录APP",
"category": "用户认证",
"priority": "high",
"status": "resolved",
"created_at": "2024-01-01T17:00:00Z"
}
]
# 应用过滤
with db_manager.get_session() as session:
q = session.query(WorkOrder)
if status_filter and status_filter != 'all':
workorders = [w for w in workorders if w['status'] == status_filter]
q = q.filter(WorkOrder.status == status_filter)
if priority_filter and priority_filter != 'all':
workorders = [w for w in workorders if w['priority'] == priority_filter]
return jsonify(workorders)
q = q.filter(WorkOrder.priority == priority_filter)
q = q.order_by(WorkOrder.created_at.desc())
rows = q.all()
result = []
for w in rows:
result.append({
"id": w.id,
"order_id": w.order_id,
"title": w.title,
"description": w.description,
"category": w.category,
"priority": w.priority,
"status": w.status,
"created_at": w.created_at.isoformat() if w.created_at else None,
"updated_at": w.updated_at.isoformat() if w.updated_at else None,
"resolution": w.resolution,
"satisfaction_score": w.satisfaction_score
})
return jsonify(result)
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -679,36 +673,34 @@ def create_workorder():
@app.route('/api/workorders/<int:workorder_id>')
def get_workorder_details(workorder_id):
"""获取工单详情"""
"""获取工单详情(含数据库对话记录)"""
try:
# 这里应该从数据库获取工单详情
# 暂时返回模拟数据
with db_manager.get_session() as session:
w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
if not w:
return jsonify({"error": "工单不存在"}), 404
convs = session.query(Conversation).filter(Conversation.work_order_id == w.id).order_by(Conversation.timestamp.asc()).all()
conv_list = []
for c in convs:
conv_list.append({
"id": c.id,
"user_message": c.user_message,
"assistant_response": c.assistant_response,
"timestamp": c.timestamp.isoformat() if c.timestamp else None
})
workorder = {
"id": workorder_id,
"order_id": f"WO{workorder_id:06d}",
"title": "车辆无法远程启动",
"description": "用户反映APP中远程启动功能无法使用点击启动按钮后没有任何反应车辆也没有响应。",
"category": "远程控制",
"priority": "high",
"status": "open",
"created_at": "2024-01-01T10:00:00Z",
"updated_at": "2024-01-01T10:00:00Z",
"resolution": None,
"satisfaction_score": None,
"conversations": [
{
"id": 1,
"user_message": "我的车辆无法远程启动",
"assistant_response": "我了解您的问题。让我帮您排查一下远程启动功能的问题。",
"timestamp": "2024-01-01T10:05:00Z"
},
{
"id": 2,
"user_message": "点击启动按钮后没有任何反应",
"assistant_response": "这种情况通常是由于网络连接或车辆状态问题导致的。请检查车辆是否处于可启动状态。",
"timestamp": "2024-01-01T10:10:00Z"
}
]
"id": w.id,
"order_id": w.order_id,
"title": w.title,
"description": w.description,
"category": w.category,
"priority": w.priority,
"status": w.status,
"created_at": w.created_at.isoformat() if w.created_at else None,
"updated_at": w.updated_at.isoformat() if w.updated_at else None,
"resolution": w.resolution,
"satisfaction_score": w.satisfaction_score,
"conversations": conv_list
}
return jsonify(workorder)
except Exception as e:
@@ -716,33 +708,135 @@ def get_workorder_details(workorder_id):
@app.route('/api/workorders/<int:workorder_id>', methods=['PUT'])
def update_workorder(workorder_id):
"""更新工单"""
"""更新工单(写入数据库)"""
try:
data = request.get_json()
# 验证必填字段
if not data.get('title') or not data.get('description'):
return jsonify({"error": "标题和描述不能为空"}), 400
# 这里应该更新数据库中的工单
# 暂时返回成功响应,实际应用中应该调用数据库更新
updated_workorder = {
"id": workorder_id,
"title": data.get('title'),
"description": data.get('description'),
"category": data.get('category', '技术问题'),
"priority": data.get('priority', 'medium'),
"status": data.get('status', 'open'),
"resolution": data.get('resolution'),
"satisfaction_score": data.get('satisfaction_score'),
"updated_at": datetime.now().isoformat()
}
return jsonify({
"success": True,
"message": "工单更新成功",
"workorder": updated_workorder
})
with db_manager.get_session() as session:
w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
if not w:
return jsonify({"error": "工单不存在"}), 404
w.title = data.get('title', w.title)
w.description = data.get('description', w.description)
w.category = data.get('category', w.category)
w.priority = data.get('priority', w.priority)
w.status = data.get('status', w.status)
w.resolution = data.get('resolution', w.resolution)
w.satisfaction_score = data.get('satisfaction_score', w.satisfaction_score)
w.updated_at = datetime.now()
session.commit()
updated = {
"id": w.id,
"title": w.title,
"description": w.description,
"category": w.category,
"priority": w.priority,
"status": w.status,
"resolution": w.resolution,
"satisfaction_score": w.satisfaction_score,
"updated_at": w.updated_at.isoformat() if w.updated_at else None
}
return jsonify({"success": True, "message": "工单更新成功", "workorder": updated})
except Exception as e:
return jsonify({"error": str(e)}), 500
# 工单AI建议生成、保存人工描述、审批入库
@app.route('/api/workorders/<int:workorder_id>/ai-suggestion', methods=['POST'])
def generate_workorder_ai_suggestion(workorder_id):
"""根据工单描述与知识库生成AI建议草稿"""
try:
with db_manager.get_session() as session:
w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
if not w:
return jsonify({"error": "工单不存在"}), 404
# 调用知识库搜索与LLM生成
query = f"{w.title} {w.description}"
kb_results = assistant.search_knowledge(query, top_k=3)
kb_list = kb_results.get('results', []) if isinstance(kb_results, dict) else []
# 组装提示词
context = "\n".join([f"Q: {k.get('question','')}\nA: {k.get('answer','')}" for k in kb_list])
from src.core.llm_client import QwenClient
llm = QwenClient()
prompt = f"请基于以下工单描述与知识库片段,给出简洁、可执行的处理建议。\n工单描述:\n{w.description}\n\n知识库片段:\n{context}\n\n请直接输出建议文本:"
llm_resp = llm.chat_completion(messages=[{"role":"user","content":prompt}], temperature=0.3, max_tokens=800)
suggestion = ""
if llm_resp and 'choices' in llm_resp:
suggestion = llm_resp['choices'][0]['message']['content']
# 保存/更新草稿记录
rec = session.query(WorkOrderSuggestion).filter(WorkOrderSuggestion.work_order_id == w.id).first()
if not rec:
rec = WorkOrderSuggestion(work_order_id=w.id, ai_suggestion=suggestion)
session.add(rec)
else:
rec.ai_suggestion = suggestion
rec.updated_at = datetime.now()
session.commit()
return jsonify({"success": True, "ai_suggestion": suggestion})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/workorders/<int:workorder_id>/human-resolution', methods=['POST'])
def save_workorder_human_resolution(workorder_id):
"""保存人工描述并计算与AI建议相似度若≥95%可自动审批入库"""
try:
data = request.get_json() or {}
human_text = data.get('human_resolution','').strip()
if not human_text:
return jsonify({"error":"人工描述不能为空"}), 400
with db_manager.get_session() as session:
w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
if not w:
return jsonify({"error": "工单不存在"}), 404
rec = session.query(WorkOrderSuggestion).filter(WorkOrderSuggestion.work_order_id == w.id).first()
if not rec:
rec = WorkOrderSuggestion(work_order_id=w.id)
session.add(rec)
rec.human_resolution = human_text
# 计算相似度使用简单cosine TF-IDF避免外部服务依赖
try:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
texts = [rec.ai_suggestion or "", human_text]
vec = TfidfVectorizer(max_features=1000)
mat = vec.fit_transform(texts)
sim = float(cosine_similarity(mat[0:1], mat[1:2])[0][0])
except Exception:
sim = 0.0
rec.ai_similarity = sim
# 自动审批条件≥0.95
approved = sim >= 0.95
rec.approved = approved
session.commit()
return jsonify({"success": True, "similarity": sim, "approved": approved})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/workorders/<int:workorder_id>/approve-to-knowledge', methods=['POST'])
def approve_workorder_to_knowledge(workorder_id):
"""将已审批的AI建议入库为知识条目"""
try:
with db_manager.get_session() as session:
w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
if not w:
return jsonify({"error": "工单不存在"}), 404
rec = session.query(WorkOrderSuggestion).filter(WorkOrderSuggestion.work_order_id == w.id).first()
if not rec or not rec.approved or not rec.ai_suggestion:
return jsonify({"error": "未找到可入库的已审批AI建议"}), 400
# 入库为知识条目(问=工单标题;答=AI建议类目用工单分类
entry = KnowledgeEntry(
question=w.title or (w.description[:20] if w.description else '工单问题'),
answer=rec.ai_suggestion,
category=w.category or '其他',
confidence_score=0.95,
is_active=True,
is_verified=True,
verified_by='auto_approve',
verified_at=datetime.now()
)
session.add(entry)
session.commit()
return jsonify({"success": True, "knowledge_id": entry.id})
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -751,98 +845,112 @@ def update_workorder(workorder_id):
def get_analytics():
"""获取分析数据"""
try:
time_range = request.args.get('timeRange', '30')
# 支持多种参数名
time_range = request.args.get('timeRange', request.args.get('days', '30'))
dimension = request.args.get('dimension', 'workorders')
# 生成模拟分析数据
analytics = generate_analytics_data(int(time_range), dimension)
analytics = generate_db_analytics(int(time_range), dimension)
return jsonify(analytics)
except Exception as e:
return jsonify({"error": str(e)}), 500
def generate_analytics_data(days, dimension):
"""生成分析数据"""
import random
from datetime import datetime, timedelta
# 生成时间序列数据
trend_data = []
for i in range(days):
date = (datetime.now() - timedelta(days=days-i-1)).strftime('%Y-%m-%d')
workorders = random.randint(5, 25)
alerts = random.randint(0, 10)
trend_data.append({
'date': date,
'workorders': workorders,
'alerts': alerts
})
def generate_db_analytics(days: int, dimension: str) -> dict:
"""基于数据库生成真实分析数据"""
from collections import defaultdict, Counter
end_time = datetime.now()
start_time = end_time - timedelta(days=days-1)
with db_manager.get_session() as session:
# 拉取数据
workorders = session.query(WorkOrder).filter(WorkOrder.created_at >= start_time).all()
alerts = session.query(Alert).filter(Alert.created_at >= start_time).all()
conversations = session.query(Conversation).filter(Conversation.timestamp >= start_time).all()
knowledge_entries = session.query(KnowledgeEntry).all()
# 趋势数据(按天)
day_keys = [(start_time + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(days)]
wo_by_day = Counter([(wo.created_at.strftime('%Y-%m-%d') if wo.created_at else end_time.strftime('%Y-%m-%d')) for wo in workorders])
alert_by_day = Counter([(al.created_at.strftime('%Y-%m-%d') if al.created_at else end_time.strftime('%Y-%m-%d')) for al in alerts])
trend = [{
'date': d,
'workorders': int(wo_by_day.get(d, 0)),
'alerts': int(alert_by_day.get(d, 0))
} for d in day_keys]
# 工单统计
total = len(workorders)
status_counts = Counter([wo.status for wo in workorders])
category_counts = Counter([wo.category for wo in workorders])
priority_counts = Counter([wo.priority for wo in workorders])
resolved_count = status_counts.get('resolved', 0)
workorders_stats = {
'total': random.randint(100, 500),
'open': random.randint(10, 50),
'in_progress': random.randint(5, 30),
'resolved': random.randint(50, 200),
'closed': random.randint(20, 100),
'by_category': {
'技术问题': random.randint(20, 80),
'业务问题': random.randint(15, 60),
'系统故障': random.randint(10, 40),
'功能需求': random.randint(5, 30),
'其他': random.randint(5, 20)
},
'by_priority': {
'low': random.randint(20, 60),
'medium': random.randint(30, 80),
'high': random.randint(10, 40),
'urgent': random.randint(5, 20)
'total': total,
'open': status_counts.get('open', 0),
'in_progress': status_counts.get('in_progress', 0),
'resolved': resolved_count,
'closed': status_counts.get('closed', 0),
'by_category': dict(category_counts),
'by_priority': dict(priority_counts)
}
}
# 满意度分析
# 满意度
scores = []
for wo in workorders:
if wo.satisfaction_score not in (None, ''):
try:
score = float(wo.satisfaction_score)
scores.append(score)
except (ValueError, TypeError):
continue
avg_satisfaction = round(sum(scores)/len(scores), 1) if scores else 0
dist = Counter([str(int(round(s))) for s in scores]) if scores else {}
satisfaction_stats = {
'average': round(random.uniform(3.5, 4.8), 1),
'distribution': {
'1': random.randint(0, 5),
'2': random.randint(0, 10),
'3': random.randint(5, 20),
'4': random.randint(20, 50),
'5': random.randint(30, 80)
}
'average': avg_satisfaction,
'distribution': {k: int(v) for k, v in dist.items()}
}
# 预警统计
level_counts = Counter([al.level for al in alerts])
active_alerts = len([al for al in alerts if al.is_active])
resolved_alerts = len([al for al in alerts if not al.is_active and al.resolved_at])
alerts_stats = {
'total': random.randint(50, 200),
'active': random.randint(5, 30),
'resolved': random.randint(20, 100),
'by_level': {
'low': random.randint(10, 40),
'medium': random.randint(15, 50),
'high': random.randint(5, 25),
'critical': random.randint(2, 10)
'total': len(alerts),
'active': active_alerts,
'resolved': resolved_alerts,
'by_level': {k: int(v) for k, v in level_counts.items()}
}
}
# 性能指标
# 性能指标(基于对话响应时间粗略估计)
resp_times = []
for c in conversations:
if c.response_time not in (None, ''):
try:
resp_time = float(c.response_time)
resp_times.append(resp_time)
except (ValueError, TypeError):
continue
avg_resp = round(sum(resp_times)/len(resp_times), 2) if resp_times else 0
throughput = len(conversations) # 期间内的对话数量
# 错误率:用严重预警比例粗估
critical = level_counts.get('critical', 0)
error_rate = round((critical / alerts_stats['total']) * 100, 2) if alerts_stats['total'] > 0 else 0
performance_stats = {
'response_time': round(random.uniform(0.5, 2.0), 2),
'uptime': round(random.uniform(95, 99.9), 1),
'error_rate': round(random.uniform(0.1, 2.0), 2),
'throughput': random.randint(1000, 5000)
'response_time': avg_resp,
'uptime': 99.0, # 可接入真实监控后更新
'error_rate': error_rate,
'throughput': throughput
}
return {
'trend': trend_data,
'trend': trend,
'workorders': workorders_stats,
'satisfaction': satisfaction_stats,
'alerts': alerts_stats,
'performance': performance_stats,
'summary': {
'total_workorders': workorders_stats['total'],
'resolution_rate': round((workorders_stats['resolved'] / workorders_stats['total']) * 100, 1) if workorders_stats['total'] > 0 else 0,
'avg_satisfaction': satisfaction_stats['average'],
'active_alerts': alerts_stats['active']
'total_workorders': total,
'resolution_rate': round((resolved_count/total)*100, 1) if total > 0 else 0,
'avg_satisfaction': avg_satisfaction,
'active_alerts': active_alerts
}
}
@@ -850,8 +958,8 @@ def generate_analytics_data(days, dimension):
def export_analytics():
"""导出分析报告"""
try:
# 生成Excel报告
analytics = generate_analytics_data(30, 'workorders')
# 生成Excel报告(使用数据库真实数据)
analytics = generate_db_analytics(30, 'workorders')
# 创建工作簿
wb = Workbook()
@@ -882,6 +990,50 @@ def export_analytics():
except Exception as e:
return jsonify({"error": str(e)}), 500
# Agent 工具统计与自定义工具
@app.route('/api/agent/tools/stats')
def get_agent_tools_stats():
try:
tools = agent_assistant.agent_core.tool_manager.get_available_tools()
performance = agent_assistant.agent_core.tool_manager.get_tool_performance_report()
return jsonify({
"success": True,
"tools": tools,
"performance": performance
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/agent/tools/register', methods=['POST'])
def register_custom_tool():
"""注册自定义工具(仅登记元数据,函数为占位)"""
try:
data = request.get_json() or {}
name = data.get('name')
description = data.get('description', '')
if not name:
return jsonify({"error": "缺少工具名称"}), 400
def _placeholder_tool(**kwargs):
return {"message": f"自定义工具 {name} 已登记(占位),当前不可执行", "params": kwargs}
agent_assistant.agent_core.tool_manager.register_tool(
name,
_placeholder_tool,
metadata={"description": description, "custom": True}
)
return jsonify({"success": True, "message": "工具已注册"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/agent/tools/unregister/<name>', methods=['DELETE'])
def unregister_custom_tool(name):
try:
success = agent_assistant.agent_core.tool_manager.unregister_tool(name)
return jsonify({"success": success})
except Exception as e:
return jsonify({"error": str(e)}), 500
# 工单导入相关API
@app.route('/api/workorders/import', methods=['POST'])
def import_workorders():
@@ -953,24 +1105,7 @@ def import_workorders():
def download_import_template():
"""下载工单导入模板"""
try:
# 创建模板数据
template_data = {
'标题': ['车辆无法启动', '空调不制冷', '导航系统故障'],
'描述': ['用户反映车辆无法正常启动', '空调系统无法制冷', '导航系统显示异常'],
'分类': ['技术问题', '技术问题', '技术问题'],
'优先级': ['high', 'medium', 'low'],
'状态': ['open', 'in_progress', 'resolved'],
'解决方案': ['检查电池和启动系统', '检查制冷剂和压缩机', '更新导航软件'],
'满意度': [5, 4, 5]
}
df = pd.DataFrame(template_data)
# 保存为Excel文件
template_path = 'uploads/workorder_template.xlsx'
os.makedirs('uploads', exist_ok=True)
df.to_excel(template_path, index=False)
template_path = _ensure_workorder_template_file()
return jsonify({
"success": True,
"template_url": f"/uploads/workorder_template.xlsx"
@@ -979,6 +1114,15 @@ def download_import_template():
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/workorders/import/template/file')
def download_import_template_file():
"""直接返回工单导入模板文件(下载)"""
try:
template_path = _ensure_workorder_template_file()
return send_file(template_path, as_attachment=True, download_name='工单导入模板.xlsx')
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/uploads/<filename>')
def uploaded_file(filename):
"""提供上传文件的下载服务"""
@@ -989,13 +1133,46 @@ def uploaded_file(filename):
def get_settings():
"""获取系统设置"""
try:
import json
settings_path = os.path.join('data', 'system_settings.json')
os.makedirs('data', exist_ok=True)
if os.path.exists(settings_path):
with open(settings_path, 'r', encoding='utf-8') as f:
settings = json.load(f)
# 掩码API Key
if settings.get('api_key'):
settings['api_key'] = '******'
settings['api_key_masked'] = True
else:
settings = {
"api_timeout": 30,
"max_history": 10,
"refresh_interval": 10,
"auto_monitoring": True,
"agent_mode": True
}
"agent_mode": True,
# LLM与API配置仅持久化不直接热更新LLM客户端
"api_provider": "openai",
"api_base_url": "",
"api_key": "",
"model_name": "qwen-turbo",
"model_temperature": 0.7,
"model_max_tokens": 1000,
# 服务配置
"server_port": 5000,
"websocket_port": 8765,
"log_level": "INFO"
}
with open(settings_path, 'w', encoding='utf-8') as f:
json.dump(settings, f, ensure_ascii=False, indent=2)
# 添加当前服务状态信息
import time
import psutil
settings['current_server_port'] = app.config.get('SERVER_PORT', 5000)
settings['current_websocket_port'] = app.config.get('WEBSOCKET_PORT', 8765)
settings['uptime_seconds'] = int(time.time() - app.config.get('START_TIME', time.time()))
settings['memory_usage_percent'] = psutil.virtual_memory().percent
settings['cpu_usage_percent'] = psutil.cpu_percent()
return jsonify(settings)
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -1005,7 +1182,26 @@ def save_settings():
"""保存系统设置"""
try:
data = request.get_json()
# 这里应该保存设置到配置文件
import json
os.makedirs('data', exist_ok=True)
settings_path = os.path.join('data', 'system_settings.json')
# 读取旧值处理api_key掩码
old = {}
if os.path.exists(settings_path):
try:
with open(settings_path, 'r', encoding='utf-8') as f:
old = json.load(f)
except Exception:
old = {}
# 如果前端传回掩码或空则保留旧的api_key
if 'api_key' in data:
if not data['api_key'] or data['api_key'] == '******':
data['api_key'] = old.get('api_key', '')
# 移除mask标志
if 'api_key_masked' in data:
data.pop('api_key_masked')
with open(settings_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return jsonify({"success": True, "message": "设置保存成功"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -1033,10 +1229,13 @@ def get_vehicle_data():
"""获取车辆数据"""
try:
vehicle_id = request.args.get('vehicle_id')
vehicle_vin = request.args.get('vehicle_vin')
data_type = request.args.get('data_type')
limit = request.args.get('limit', 10, type=int)
if vehicle_id:
if vehicle_vin:
data = vehicle_manager.get_vehicle_data_by_vin(vehicle_vin, data_type, limit)
elif vehicle_id:
data = vehicle_manager.get_vehicle_data(vehicle_id, data_type, limit)
else:
data = vehicle_manager.search_vehicle_data(limit=limit)
@@ -1045,6 +1244,15 @@ def get_vehicle_data():
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/vehicle/data/vin/<vehicle_vin>/latest')
def get_latest_vehicle_data_by_vin(vehicle_vin):
"""按VIN获取车辆最新数据"""
try:
data = vehicle_manager.get_latest_vehicle_data_by_vin(vehicle_vin)
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/vehicle/data/<vehicle_id>/latest')
def get_latest_vehicle_data(vehicle_id):
"""获取车辆最新数据"""
@@ -1087,5 +1295,50 @@ def init_sample_vehicle_data():
except Exception as e:
return jsonify({"error": str(e)}), 500
# API测试相关接口
@app.route('/api/test/connection', methods=['POST'])
def test_api_connection():
"""测试API连接"""
try:
data = request.get_json()
api_provider = data.get('api_provider', 'openai')
api_base_url = data.get('api_base_url', '')
api_key = data.get('api_key', '')
model_name = data.get('model_name', 'qwen-turbo')
# 这里可以调用LLM客户端进行连接测试
# 暂时返回模拟结果
return jsonify({
"success": True,
"message": f"API连接测试成功 - {api_provider}",
"response_time": "150ms",
"model_status": "可用"
})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/test/model', methods=['POST'])
def test_model_response():
"""测试模型回答"""
try:
data = request.get_json()
test_message = data.get('test_message', '你好,请简单介绍一下你自己')
# 这里可以调用LLM客户端进行回答测试
# 暂时返回模拟结果
return jsonify({
"success": True,
"test_message": test_message,
"response": "你好我是TSP智能助手基于大语言模型构建的智能客服系统。我可以帮助您解决车辆相关问题提供技术支持和服务。",
"response_time": "1.2s",
"tokens_used": 45
})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
if __name__ == '__main__':
import time
app.config['START_TIME'] = time.time()
app.config['SERVER_PORT'] = 5000
app.config['WEBSOCKET_PORT'] = 8765
app.run(debug=True, host='0.0.0.0', port=5000)