Files
assist/src/web/app.py

1047 lines
36 KiB
Python
Raw Normal View History

2025-09-06 21:06:18 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TSP助手预警管理Web应用
提供预警系统的Web界面和API接口
"""
import sys
import os
import json
import pandas as pd
2025-09-06 21:06:18 +08:00
from datetime import datetime, timedelta
from openpyxl import Workbook
from openpyxl.styles import Font
from flask import Flask, render_template, request, jsonify, redirect, url_for, send_from_directory, send_file
2025-09-06 21:06:18 +08:00
from flask_cors import CORS
from werkzeug.utils import secure_filename
2025-09-06 21:06:18 +08:00
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.main import TSPAssistant
from src.agent_assistant import TSPAgentAssistant
from src.analytics.alert_system import AlertRule, AlertLevel, AlertType
from src.dialogue.realtime_chat import RealtimeChatManager
from src.vehicle.vehicle_data_manager import VehicleDataManager
# Flask application object; CORS(app) wraps it with flask_cors.
app = Flask(__name__)
CORS(app)
# Configure the upload folder
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
2025-09-06 21:06:18 +08:00
# Initialize the TSP assistant, the Agent assistant and the other
# module-level service objects used by the route handlers below.
assistant = TSPAssistant()
agent_assistant = TSPAgentAssistant()
chat_manager = RealtimeChatManager()
vehicle_manager = VehicleDataManager()
@app.route('/')
def index():
    """Home page: render the ``dashboard.html`` template."""
    dashboard_page = render_template('dashboard.html')
    return dashboard_page
@app.route('/alerts')
def alerts():
    """Alert management page: render the ``index.html`` template."""
    alerts_page = render_template('index.html')
    return alerts_page
@app.route('/api/health')
def get_health():
    """Return ``assistant.get_system_health()`` as JSON.

    Any exception is reported as ``{"error": <message>}`` with HTTP 500.
    """
    try:
        return jsonify(assistant.get_system_health())
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/alerts')
def get_alerts():
    """Return ``assistant.get_active_alerts()`` as JSON.

    Any exception is reported as ``{"error": <message>}`` with HTTP 500.
    """
    try:
        return jsonify(assistant.get_active_alerts())
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
2025-09-08 15:27:22 +08:00
@app.route('/api/alerts', methods=['POST'])
def create_alert():
    """Create an alert from the JSON request body.

    Recognised keys (all optional): ``alert_type`` (default ``'manual'``),
    ``title``, ``description`` and ``level`` (default ``'medium'``).

    Returns ``{"success": True, "alert": ...}``; any exception is reported
    as ``{"error": <message>}`` with HTTP 500.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so the defaults below apply rather than producing a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        alert = assistant.create_alert(
            alert_type=data.get('alert_type', 'manual'),
            title=data.get('title', '手动预警'),
            description=data.get('description', ''),
            level=data.get('level', 'medium'),
        )
        return jsonify({"success": True, "alert": alert})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
2025-09-06 21:06:18 +08:00
@app.route('/api/alerts/statistics')
def get_alert_statistics():
    """Return ``assistant.get_alert_statistics()`` as JSON.

    Any exception is reported as ``{"error": <message>}`` with HTTP 500.
    """
    try:
        return jsonify(assistant.get_alert_statistics())
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/alerts/<int:alert_id>/resolve', methods=['POST'])
def resolve_alert(alert_id):
    """Resolve the alert identified by ``alert_id``.

    Responds 200 when ``assistant.resolve_alert`` reports success, 400 when
    it reports failure, and 500 if it raises.
    """
    try:
        if not assistant.resolve_alert(alert_id):
            return jsonify({"success": False, "message": "解决预警失败"}), 400
        return jsonify({"success": True, "message": "预警已解决"})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/rules')
def get_rules():
    """List the alert rules held in ``assistant.alert_system.rules`` as JSON.

    Each rule is flattened into a plain dict; ``alert_type`` and ``level``
    are emitted via their ``.value``. Any exception yields HTTP 500.
    """
    try:
        serialized = [
            {
                "name": rule.name,
                "description": rule.description,
                "alert_type": rule.alert_type.value,
                "level": rule.level.value,
                "threshold": rule.threshold,
                "condition": rule.condition,
                "enabled": rule.enabled,
                "check_interval": rule.check_interval,
                "cooldown": rule.cooldown,
            }
            for _rule_key, rule in assistant.alert_system.rules.items()
        ]
        return jsonify(serialized)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/rules', methods=['POST'])
def create_rule():
    """Create a custom alert rule from the JSON request body.

    Required keys: ``name``, ``description``, ``alert_type``, ``level``,
    ``threshold``, ``condition``. Optional: ``enabled`` (default True),
    ``check_interval`` (default 300), ``cooldown`` (default 3600).

    Responds 400 when the body is not a JSON object, when a required key is
    missing, or when a value cannot be converted (unknown enum value,
    non-numeric number). Responds 400 as well when the alert system rejects
    the rule, and 500 on any other exception.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "缺少必要参数"}), 400

        # Convert client input separately so that bad input is reported as a
        # client error (400) rather than a server error (500).
        try:
            rule_fields = {
                "name": data['name'],
                "description": data['description'],
                "alert_type": AlertType(data['alert_type']),
                "level": AlertLevel(data['level']),
                "threshold": float(data['threshold']),
                "condition": data['condition'],
                "enabled": data.get('enabled', True),
                "check_interval": int(data.get('check_interval', 300)),
                "cooldown": int(data.get('cooldown', 3600)),
            }
        except (KeyError, ValueError, TypeError) as bad_input:
            return jsonify({"error": f"参数无效: {bad_input}"}), 400

        rule = AlertRule(**rule_fields)
        success = assistant.alert_system.add_custom_rule(rule)
        if success:
            return jsonify({"success": True, "message": "规则创建成功"})
        return jsonify({"success": False, "message": "规则创建失败"}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/rules/<rule_name>', methods=['PUT'])
def update_rule(rule_name):
    """Update the alert rule named ``rule_name`` with fields from the JSON body.

    The body's keys are forwarded as keyword arguments to
    ``assistant.alert_system.update_rule``. Responds 400 when the body is not
    a JSON object or the update is reported as failed, 500 on any exception.
    """
    try:
        # A missing/non-object body would make ``**data`` raise a TypeError;
        # report it as a client error instead.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": "规则更新失败"}), 400
        success = assistant.alert_system.update_rule(rule_name, **data)
        if success:
            return jsonify({"success": True, "message": "规则更新成功"})
        return jsonify({"success": False, "message": "规则更新失败"}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/rules/<rule_name>', methods=['DELETE'])
def delete_rule(rule_name):
    """Delete the alert rule named ``rule_name``.

    Responds 200 on success, 400 when the alert system reports failure,
    and 500 if it raises.
    """
    try:
        if not assistant.alert_system.delete_rule(rule_name):
            return jsonify({"success": False, "message": "规则删除失败"}), 400
        return jsonify({"success": True, "message": "规则删除成功"})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/monitor/start', methods=['POST'])
def start_monitoring():
    """Start the monitoring service via ``assistant.start_monitoring()``.

    Responds 200 on success, 400 on reported failure, 500 on exception.
    """
    try:
        if not assistant.start_monitoring():
            return jsonify({"success": False, "message": "启动监控服务失败"}), 400
        return jsonify({"success": True, "message": "监控服务已启动"})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/monitor/stop', methods=['POST'])
def stop_monitoring():
    """Stop the monitoring service via ``assistant.stop_monitoring()``.

    Responds 200 on success, 400 on reported failure, 500 on exception.
    """
    try:
        if not assistant.stop_monitoring():
            return jsonify({"success": False, "message": "停止监控服务失败"}), 400
        return jsonify({"success": True, "message": "监控服务已停止"})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/monitor/status')
def get_monitor_status():
    """Return a three-field summary taken from the system health report.

    Fields absent from ``assistant.get_system_health()`` fall back to
    ``"unknown"`` (monitor_status) or ``0`` (health_score, active_alerts).
    """
    fallbacks = (
        ("monitor_status", "unknown"),
        ("health_score", 0),
        ("active_alerts", 0),
    )
    try:
        report = assistant.get_system_health()
        summary = {field: report.get(field, default) for field, default in fallbacks}
        return jsonify(summary)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/check-alerts', methods=['POST'])
def check_alerts():
    """Run ``assistant.check_alerts()`` on demand and return what it found.

    The response carries the alerts and their count; exceptions yield 500.
    """
    try:
        triggered = assistant.check_alerts()
        payload = {"success": True, "alerts": triggered, "count": len(triggered)}
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
# 实时对话相关路由
@app.route('/chat')
def chat():
    """Real-time chat page (WebSocket version): render ``chat.html``."""
    chat_page = render_template('chat.html')
    return chat_page
@app.route('/chat-http')
def chat_http():
    """Real-time chat page (HTTP version): render ``chat_http.html``."""
    chat_page = render_template('chat_http.html')
    return chat_page
@app.route('/api/chat/session', methods=['POST'])
def create_chat_session():
    """Create a chat session.

    Optional JSON keys: ``user_id`` (default ``'anonymous'``) and
    ``work_order_id``. Returns the new ``session_id``; exceptions yield 500.
    """
    try:
        # Both fields are optional, so tolerate a missing/invalid JSON body
        # instead of failing on ``None.get``.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        user_id = data.get('user_id', 'anonymous')
        work_order_id = data.get('work_order_id')
        session_id = chat_manager.create_session(user_id, work_order_id)
        return jsonify({
            "success": True,
            "session_id": session_id,
            "message": "会话创建成功"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/chat/message', methods=['POST'])
def send_chat_message():
    """Send a chat message to an existing session.

    Requires JSON keys ``session_id`` and ``message``; responds 400 when
    either is missing or empty (including when no JSON object was sent).
    Returns the result of ``chat_manager.process_message``; exceptions
    yield 500.
    """
    try:
        # Treat a missing/invalid body like an empty object so the request
        # is rejected with 400 below rather than crashing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        session_id = data.get('session_id')
        message = data.get('message')
        if not session_id or not message:
            return jsonify({"error": "缺少必要参数"}), 400
        result = chat_manager.process_message(session_id, message)
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/chat/history/<session_id>')
def get_chat_history(session_id):
    """Return the history of ``session_id`` from the chat manager as JSON."""
    try:
        payload = {
            "success": True,
            "history": chat_manager.get_session_history(session_id),
        }
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/chat/work-order', methods=['POST'])
def create_work_order():
    """Create a work order from within a chat session.

    Requires JSON keys ``session_id``, ``title`` and ``description``
    (400 if any is missing or empty, including when no JSON object was
    sent). ``category`` defaults to ``'技术问题'`` and ``priority`` to
    ``'medium'``. Returns the result of ``chat_manager.create_work_order``;
    exceptions yield 500.
    """
    try:
        # Treat a missing/invalid body like an empty object so validation
        # below answers 400 instead of the handler crashing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        session_id = data.get('session_id')
        title = data.get('title')
        description = data.get('description')
        category = data.get('category', '技术问题')
        priority = data.get('priority', 'medium')
        if not session_id or not title or not description:
            return jsonify({"error": "缺少必要参数"}), 400
        result = chat_manager.create_work_order(session_id, title, description, category, priority)
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/chat/work-order/<int:work_order_id>')
def get_work_order_status(work_order_id):
    """Return ``chat_manager.get_work_order_status(work_order_id)`` as JSON."""
    try:
        return jsonify(chat_manager.get_work_order_status(work_order_id))
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/chat/session/<session_id>', methods=['DELETE'])
def end_chat_session(session_id):
    """End the chat session ``session_id``.

    The response mirrors the boolean returned by ``chat_manager.end_session``
    together with a matching message; exceptions yield 500.
    """
    try:
        ended = chat_manager.end_session(session_id)
        if ended:
            note = "会话已结束"
        else:
            note = "结束会话失败"
        return jsonify({"success": ended, "message": note})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/chat/sessions')
def get_active_sessions():
    """Return ``chat_manager.get_active_sessions()`` wrapped in a success envelope."""
    try:
        payload = {
            "success": True,
            "sessions": chat_manager.get_active_sessions(),
        }
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
# Agent相关API
@app.route('/api/agent/status')
def get_agent_status():
    """Return ``agent_assistant.get_agent_status()`` as JSON."""
    try:
        return jsonify(agent_assistant.get_agent_status())
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/agent/toggle', methods=['POST'])
def toggle_agent_mode():
    """Enable or disable Agent mode.

    Optional JSON key ``enabled`` (default True) is passed to
    ``agent_assistant.toggle_agent_mode``. Exceptions yield 500.
    """
    try:
        # ``enabled`` has a default, so a missing/invalid JSON body must not
        # crash the handler on ``None.get``.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        enabled = data.get('enabled', True)
        success = agent_assistant.toggle_agent_mode(enabled)
        return jsonify({
            "success": success,
            "message": f"Agent模式已{'启用' if enabled else '禁用'}"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/agent/monitoring/start', methods=['POST'])
def start_agent_monitoring():
    """Start the Agent's proactive monitoring and report the outcome."""
    try:
        started = agent_assistant.start_proactive_monitoring()
        if started:
            note = "Agent监控已启动"
        else:
            note = "启动失败"
        return jsonify({"success": started, "message": note})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/agent/monitoring/stop', methods=['POST'])
def stop_agent_monitoring():
    """Stop the Agent's proactive monitoring and report the outcome."""
    try:
        stopped = agent_assistant.stop_proactive_monitoring()
        if stopped:
            note = "Agent监控已停止"
        else:
            note = "停止失败"
        return jsonify({"success": stopped, "message": note})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/agent/proactive-monitoring', methods=['POST'])
def proactive_monitoring():
    """Run one proactive-monitoring pass and return its result as JSON."""
    try:
        return jsonify(agent_assistant.run_proactive_monitoring())
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/agent/intelligent-analysis', methods=['POST'])
def intelligent_analysis():
    """Run ``agent_assistant.run_intelligent_analysis()`` and return its output."""
    try:
        payload = {
            "success": True,
            "analysis": agent_assistant.run_intelligent_analysis(),
        }
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
# 知识库相关API
@app.route('/api/knowledge')
def get_knowledge():
    """Return one page of knowledge-base entries.

    Query parameters: ``page`` (default 1) and ``per_page`` (default 10),
    both parsed as integers and forwarded to the knowledge manager.
    """
    try:
        paging = {
            "page": request.args.get('page', 1, type=int),
            "per_page": request.args.get('per_page', 10, type=int),
        }
        entries = assistant.knowledge_manager.get_knowledge_entries(**paging)
        return jsonify(entries)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/knowledge/search')
def search_knowledge():
    """Search the knowledge base for the ``q`` query parameter.

    Calls ``assistant.search_knowledge`` with ``top_k=5`` and returns the
    ``results`` list of its answer (empty list when the key is absent).
    """
    try:
        search_text = request.args.get('q', '')
        outcome = assistant.search_knowledge(search_text, top_k=5)
        matches = outcome.get('results', [])
        return jsonify(matches)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/knowledge', methods=['POST'])
def add_knowledge():
    """Add a knowledge-base entry from the JSON request body.

    Required keys: ``question``, ``answer``, ``category``,
    ``confidence_score``. Responds 400 naming the missing keys when any is
    absent (including when no JSON object was sent); exceptions yield 500.
    """
    required = ('question', 'answer', 'category', 'confidence_score')
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # Missing input is a client error, not a server error.
        missing = [key for key in required if key not in data]
        if missing:
            return jsonify({"error": f"缺少必要参数: {', '.join(missing)}"}), 400
        success = assistant.knowledge_manager.add_knowledge_entry(
            question=data['question'],
            answer=data['answer'],
            category=data['category'],
            confidence_score=data['confidence_score']
        )
        return jsonify({"success": success, "message": "知识添加成功" if success else "添加失败"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/knowledge/stats')
def get_knowledge_stats():
    """Return ``assistant.knowledge_manager.get_knowledge_stats()`` as JSON."""
    try:
        return jsonify(assistant.knowledge_manager.get_knowledge_stats())
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/knowledge/upload', methods=['POST'])
def upload_knowledge_file():
    """Upload a file and turn it into knowledge-base entries.

    Expects a multipart field named ``file``. The upload is stored under a
    unique name in the system temp directory, handed to
    ``agent_assistant.process_file_to_knowledge`` together with the original
    filename, and removed afterwards. Responds 400 when no file was sent and
    500 on any processing error.
    """
    import tempfile
    import uuid

    try:
        if 'file' not in request.files:
            return jsonify({"error": "没有上传文件"}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({"error": "没有选择文件"}), 400

        # Only the extension of the client-supplied name is reused; the stem
        # is a fresh UUID so concurrent uploads never collide.
        extension = os.path.splitext(file.filename)[1]
        temp_path = os.path.join(tempfile.gettempdir(), f"upload_{uuid.uuid4()}{extension}")
        try:
            file.save(temp_path)
            result = agent_assistant.process_file_to_knowledge(temp_path, file.filename)
            return jsonify(result)
        finally:
            # Best-effort cleanup: a failure here must not mask the response.
            try:
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
            except Exception as cleanup_error:
                # This module defines no ``logger``; use Flask's app logger.
                app.logger.warning(f"清理临时文件失败: {cleanup_error}")
    except Exception as e:
        app.logger.error(f"文件上传处理失败: {e}")
        return jsonify({"error": str(e)}), 500
@app.route('/api/knowledge/delete/<int:knowledge_id>', methods=['DELETE'])
def delete_knowledge(knowledge_id):
    """Delete the knowledge entry ``knowledge_id`` and report the outcome."""
    try:
        removed = assistant.knowledge_manager.delete_knowledge_entry(knowledge_id)
        if removed:
            note = "删除成功"
        else:
            note = "删除失败"
        return jsonify({"success": removed, "message": note})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/knowledge/verify/<int:knowledge_id>', methods=['POST'])
def verify_knowledge(knowledge_id):
    """Mark the knowledge entry ``knowledge_id`` as verified.

    Optional JSON key ``verified_by`` (default ``'admin'``). The body itself
    is optional; exceptions yield 500.
    """
    try:
        # The body is optional here. Without silent=True, get_json() can raise
        # on a bodyless or non-JSON POST before ``or {}`` ever applies.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        verified_by = data.get('verified_by', 'admin')
        success = assistant.knowledge_manager.verify_knowledge_entry(knowledge_id, verified_by)
        return jsonify({"success": success, "message": "验证成功" if success else "验证失败"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/knowledge/unverify/<int:knowledge_id>', methods=['POST'])
def unverify_knowledge(knowledge_id):
    """Clear the verified flag of knowledge entry ``knowledge_id``."""
    try:
        reverted = assistant.knowledge_manager.unverify_knowledge_entry(knowledge_id)
        if reverted:
            note = "取消验证成功"
        else:
            note = "取消验证失败"
        return jsonify({"success": reverted, "message": note})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
# 工单相关API
@app.route('/api/workorders')
def get_workorders():
    """Return the (currently hard-coded) work-order list, optionally filtered.

    Query parameters ``status`` and ``priority`` narrow the list by exact
    match; an absent value or the literal ``all`` disables that filter.
    """
    try:
        # TODO: fetch from the work-order manager instead of sample rows.
        columns = ("id", "title", "description", "category", "priority", "status", "created_at")
        sample_rows = (
            (1, "车辆无法远程启动", "用户反映APP中远程启动功能无法使用", "远程控制", "high", "open", "2024-01-01T10:00:00Z"),
            (2, "APP显示异常", "APP中车辆信息显示不正确", "APP功能", "medium", "in_progress", "2024-01-01T11:00:00Z"),
            (3, "蓝牙连接失败", "用户无法通过蓝牙连接车辆", "蓝牙功能", "high", "open", "2024-01-01T12:00:00Z"),
            (4, "车辆定位不准确", "APP中显示的车辆位置与实际位置不符", "定位功能", "medium", "resolved", "2024-01-01T13:00:00Z"),
            (5, "远程解锁失败", "用户无法通过APP远程解锁车辆", "远程控制", "urgent", "open", "2024-01-01T14:00:00Z"),
            (6, "APP闪退问题", "用户反映APP在使用过程中频繁闪退", "APP功能", "high", "in_progress", "2024-01-01T15:00:00Z"),
            (7, "车辆状态更新延迟", "车辆状态信息更新不及时,存在延迟", "数据同步", "low", "open", "2024-01-01T16:00:00Z"),
            (8, "用户认证失败", "部分用户无法正常登录APP", "用户认证", "high", "resolved", "2024-01-01T17:00:00Z"),
        )
        selected = [dict(zip(columns, row)) for row in sample_rows]

        # Apply each requested filter in turn (status first, then priority).
        requested = (
            ('status', request.args.get('status')),
            ('priority', request.args.get('priority')),
        )
        for field, wanted in requested:
            if wanted and wanted != 'all':
                selected = [item for item in selected if item[field] == wanted]
        return jsonify(selected)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/workorders', methods=['POST'])
def create_workorder():
    """Create a work order from the JSON request body.

    Required keys: ``title`` and ``description`` (400 when absent, including
    when no JSON object was sent). ``category`` defaults to ``'技术问题'`` and
    ``priority`` to ``'medium'``, matching the other work-order endpoints in
    this module. Exceptions yield 500.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # Missing input is a client error, not a server error.
        missing = [key for key in ('title', 'description') if key not in data]
        if missing:
            return jsonify({"error": f"缺少必要参数: {', '.join(missing)}"}), 400
        result = assistant.create_work_order(
            title=data['title'],
            description=data['description'],
            category=data.get('category', '技术问题'),
            priority=data.get('priority', 'medium')
        )
        return jsonify({"success": True, "workorder": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/workorders/<int:workorder_id>')
def get_workorder_details(workorder_id):
    """Return details for one work order.

    Placeholder implementation: apart from ``id`` and the derived
    ``order_id`` (``WO`` + zero-padded id), the payload is fixed sample data.
    """
    try:
        # TODO: load the work order from the database instead of mock data.
        sample_conversations = [
            {
                "id": 1,
                "user_message": "我的车辆无法远程启动",
                "assistant_response": "我了解您的问题。让我帮您排查一下远程启动功能的问题。",
                "timestamp": "2024-01-01T10:05:00Z",
            },
            {
                "id": 2,
                "user_message": "点击启动按钮后没有任何反应",
                "assistant_response": "这种情况通常是由于网络连接或车辆状态问题导致的。请检查车辆是否处于可启动状态。",
                "timestamp": "2024-01-01T10:10:00Z",
            },
        ]
        details = {
            "id": workorder_id,
            "order_id": f"WO{workorder_id:06d}",
            "title": "车辆无法远程启动",
            "description": "用户反映APP中远程启动功能无法使用点击启动按钮后没有任何反应车辆也没有响应。",
            "category": "远程控制",
            "priority": "high",
            "status": "open",
            "created_at": "2024-01-01T10:00:00Z",
            "updated_at": "2024-01-01T10:00:00Z",
            "resolution": None,
            "satisfaction_score": None,
            "conversations": sample_conversations,
        }
        return jsonify(details)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/workorders/<int:workorder_id>', methods=['PUT'])
def update_workorder(workorder_id):
    """Update a work order (placeholder: echoes the merged fields back).

    Requires non-empty ``title`` and ``description`` in the JSON body and
    responds 400 otherwise (including when no JSON object was sent).
    Nothing is persisted yet; the response contains the submitted fields
    with defaults applied and a fresh ``updated_at`` timestamp.
    """
    try:
        # Treat a missing/invalid body like an empty object so validation
        # below answers 400 instead of the handler crashing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # Validate required fields
        if not data.get('title') or not data.get('description'):
            return jsonify({"error": "标题和描述不能为空"}), 400
        # TODO: update the work order in the database; for now only a
        # success response is returned.
        updated_workorder = {
            "id": workorder_id,
            "title": data.get('title'),
            "description": data.get('description'),
            "category": data.get('category', '技术问题'),
            "priority": data.get('priority', 'medium'),
            "status": data.get('status', 'open'),
            "resolution": data.get('resolution'),
            "satisfaction_score": data.get('satisfaction_score'),
            "updated_at": datetime.now().isoformat()
        }
        return jsonify({
            "success": True,
            "message": "工单更新成功",
            "workorder": updated_workorder
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
2025-09-06 21:06:18 +08:00
# 分析相关API
@app.route('/api/analytics')
def get_analytics():
    """Return analytics data for the requested time range.

    Query parameters: ``timeRange`` in days (default 30; a non-integer value
    falls back to the default) and ``dimension`` (default ``'workorders'``).
    The range is clamped to 0..365 days so that a single request cannot ask
    for an arbitrarily large trend series. Exceptions yield 500.
    """
    max_days = 365
    try:
        # type=int makes Flask fall back to the default on unparsable input
        # instead of raising ValueError (previously a 500).
        days = request.args.get('timeRange', 30, type=int)
        days = max(0, min(days, max_days))
        dimension = request.args.get('dimension', 'workorders')
        # Generate mock analytics data
        analytics = generate_analytics_data(days, dimension)
        return jsonify(analytics)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def generate_analytics_data(days, dimension):
    """Generate random mock analytics data.

    Args:
        days: Number of daily points in the trend series, ending today.
            Zero or a negative number yields an empty series.
        dimension: Accepted for interface compatibility; currently unused.

    Returns:
        A dict with keys ``trend``, ``workorders``, ``satisfaction``,
        ``alerts``, ``performance`` and ``summary``. Totals are derived from
        their status buckets, so ``summary['resolution_rate']`` always lies
        within 0..100.
    """
    import random
    from datetime import datetime, timedelta

    # Read the clock once so every point is relative to the same "today".
    today = datetime.now()
    trend_data = [
        {
            'date': (today - timedelta(days=days - offset - 1)).strftime('%Y-%m-%d'),
            'workorders': random.randint(5, 25),
            'alerts': random.randint(0, 10),
        }
        for offset in range(days)
    ]

    # Work-order statistics. The total is the sum of the status buckets;
    # drawing it independently allowed "resolved" to exceed "total".
    status_counts = {
        'open': random.randint(10, 50),
        'in_progress': random.randint(5, 30),
        'resolved': random.randint(50, 200),
        'closed': random.randint(20, 100),
    }
    workorders_stats = {
        'total': sum(status_counts.values()),
        **status_counts,
        'by_category': {
            '技术问题': random.randint(20, 80),
            '业务问题': random.randint(15, 60),
            '系统故障': random.randint(10, 40),
            '功能需求': random.randint(5, 30),
            '其他': random.randint(5, 20)
        },
        'by_priority': {
            'low': random.randint(20, 60),
            'medium': random.randint(30, 80),
            'high': random.randint(10, 40),
            'urgent': random.randint(5, 20)
        }
    }

    # Satisfaction analysis
    satisfaction_stats = {
        'average': round(random.uniform(3.5, 4.8), 1),
        'distribution': {
            '1': random.randint(0, 5),
            '2': random.randint(0, 10),
            '3': random.randint(5, 20),
            '4': random.randint(20, 50),
            '5': random.randint(30, 80)
        }
    }

    # Alert statistics; the total is likewise derived from its parts.
    active_alerts = random.randint(5, 30)
    resolved_alerts = random.randint(20, 100)
    alerts_stats = {
        'total': active_alerts + resolved_alerts,
        'active': active_alerts,
        'resolved': resolved_alerts,
        'by_level': {
            'low': random.randint(10, 40),
            'medium': random.randint(15, 50),
            'high': random.randint(5, 25),
            'critical': random.randint(2, 10)
        }
    }

    # Performance metrics
    performance_stats = {
        'response_time': round(random.uniform(0.5, 2.0), 2),
        'uptime': round(random.uniform(95, 99.9), 1),
        'error_rate': round(random.uniform(0.1, 2.0), 2),
        'throughput': random.randint(1000, 5000)
    }

    total_workorders = workorders_stats['total']
    if total_workorders > 0:
        resolution_rate = round((workorders_stats['resolved'] / total_workorders) * 100, 1)
    else:
        resolution_rate = 0

    return {
        'trend': trend_data,
        'workorders': workorders_stats,
        'satisfaction': satisfaction_stats,
        'alerts': alerts_stats,
        'performance': performance_stats,
        'summary': {
            'total_workorders': total_workorders,
            'resolution_rate': resolution_rate,
            'avg_satisfaction': satisfaction_stats['average'],
            'active_alerts': alerts_stats['active']
        }
    }
@app.route('/api/analytics/export')
def export_analytics():
    """Export a 30-day analytics report as an Excel download.

    The workbook is built in memory and streamed to the client as
    ``analytics_report.xlsx``. Exceptions yield 500.
    """
    from io import BytesIO

    try:
        analytics = generate_analytics_data(30, 'workorders')

        # Create the workbook
        wb = Workbook()
        ws = wb.active
        ws.title = "分析报告"

        # Title row
        ws['A1'] = 'TSP智能助手分析报告'
        ws['A1'].font = Font(size=16, bold=True)

        # Work-order statistics
        ws['A3'] = '工单统计'
        ws['A3'].font = Font(bold=True)
        ws['A4'] = '总工单数'
        ws['B4'] = analytics['workorders']['total']
        ws['A5'] = '待处理'
        ws['B5'] = analytics['workorders']['open']
        ws['A6'] = '已解决'
        ws['B6'] = analytics['workorders']['resolved']

        # Stream from memory rather than a fixed file on disk. The old code
        # saved relative to the working directory while Flask's send_file
        # resolves relative paths against app.root_path, and the single
        # shared filename raced between concurrent requests.
        buffer = BytesIO()
        wb.save(buffer)
        buffer.seek(0)
        return send_file(
            buffer,
            as_attachment=True,
            download_name='analytics_report.xlsx',
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# 工单导入相关API
def _workorder_cell(row, primary, fallback, default=None):
    """Return the first non-empty cell of *row* among two column names, else *default*."""
    for column in (primary, fallback):
        value = row.get(column)
        if value is not None and pd.notna(value):
            return value
    return default


@app.route('/api/workorders/import', methods=['POST'])
def import_workorders():
    """Import work orders from an uploaded Excel file.

    Expects a multipart field ``file`` whose name ends in ``.xlsx`` or
    ``.xls`` (case-insensitive). Each row becomes a work order; columns may
    use the Chinese header or its English equivalent, and empty cells fall
    back to defaults. New orders get consecutive ids following the current
    length of ``assistant.work_orders`` and are appended only after every
    row has been parsed, so a bad row does not leave a partial import.

    Responds 400 for a missing/unsupported file or a parse failure, 500 on
    any other exception.
    """
    import tempfile

    try:
        # Check that a file was uploaded
        if 'file' not in request.files:
            return jsonify({"error": "没有上传文件"}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({"error": "没有选择文件"}), 400
        lowered_name = file.filename.lower()
        if not lowered_name.endswith(('.xlsx', '.xls')):
            return jsonify({"error": "只支持Excel文件(.xlsx, .xls)"}), 400

        # Store the upload under a unique temp name: the client-chosen name
        # could collide with a concurrent upload or sanitize to an empty string.
        suffix = '.xlsx' if lowered_name.endswith('.xlsx') else '.xls'
        descriptor, upload_path = tempfile.mkstemp(prefix='workorders_', suffix=suffix)
        os.close(descriptor)
        try:
            file.save(upload_path)
            try:
                df = pd.read_excel(upload_path)
                # Fix the base once: adding the row index to a len() that
                # grows with every append made the ids skip numbers.
                base_id = len(assistant.work_orders)
                timestamp = datetime.now().isoformat()
                imported_workorders = []
                for position, (_, row) in enumerate(df.iterrows(), start=1):
                    new_id = base_id + position
                    resolution = _workorder_cell(row, '解决方案', 'resolution')
                    satisfaction = _workorder_cell(row, '满意度', 'satisfaction_score')
                    imported_workorders.append({
                        "id": new_id,
                        "order_id": f"WO{new_id:06d}",
                        "title": str(_workorder_cell(row, '标题', 'title', f'导入工单 {position}')),
                        "description": str(_workorder_cell(row, '描述', 'description', '')),
                        "category": str(_workorder_cell(row, '分类', 'category', '技术问题')),
                        "priority": str(_workorder_cell(row, '优先级', 'priority', 'medium')),
                        "status": str(_workorder_cell(row, '状态', 'status', 'open')),
                        "created_at": timestamp,
                        "updated_at": timestamp,
                        "resolution": str(resolution) if resolution is not None else None,
                        "satisfaction_score": int(satisfaction) if satisfaction is not None else None,
                    })
            except Exception as e:
                return jsonify({"error": f"解析Excel文件失败: {str(e)}"}), 400
        finally:
            # Always remove the temporary upload, on success and on failure.
            if os.path.exists(upload_path):
                os.remove(upload_path)

        # Add to the in-memory list (TODO: persist to the database).
        assistant.work_orders.extend(imported_workorders)
        return jsonify({
            "success": True,
            "message": f"成功导入 {len(imported_workorders)} 个工单",
            "imported_count": len(imported_workorders),
            "workorders": imported_workorders
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/workorders/import/template')
def download_import_template():
    """Generate the work-order import template and return its download URL.

    Writes ``workorder_template.xlsx`` with three sample rows into the
    upload folder and responds with ``template_url`` pointing at the
    ``/uploads/<filename>`` route. Exceptions yield 500.
    """
    try:
        # Sample rows for the template
        template_data = {
            '标题': ['车辆无法启动', '空调不制冷', '导航系统故障'],
            '描述': ['用户反映车辆无法正常启动', '空调系统无法制冷', '导航系统显示异常'],
            '分类': ['技术问题', '技术问题', '技术问题'],
            '优先级': ['high', 'medium', 'low'],
            '状态': ['open', 'in_progress', 'resolved'],
            '解决方案': ['检查电池和启动系统', '检查制冷剂和压缩机', '更新导航软件'],
            '满意度': [5, 4, 5]
        }
        df = pd.DataFrame(template_data)

        # Write where the /uploads/<filename> route actually serves from:
        # send_from_directory resolves a relative UPLOAD_FOLDER against
        # app.root_path, not the process working directory.
        upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'])
        os.makedirs(upload_dir, exist_ok=True)
        df.to_excel(os.path.join(upload_dir, 'workorder_template.xlsx'), index=False)
        return jsonify({
            "success": True,
            "template_url": "/uploads/workorder_template.xlsx"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/uploads/<filename>')
def uploaded_file(filename):
    """Serve ``filename`` from the configured upload folder."""
    upload_dir = app.config['UPLOAD_FOLDER']
    return send_from_directory(upload_dir, filename)
2025-09-06 21:06:18 +08:00
# 系统设置相关API
@app.route('/api/settings')
def get_settings():
    """Return the system settings (currently a fixed set of values)."""
    try:
        current = dict(
            api_timeout=30,
            max_history=10,
            refresh_interval=10,
            auto_monitoring=True,
            agent_mode=True,
        )
        return jsonify(current)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/settings', methods=['POST'])
def save_settings():
    """Acknowledge a settings-save request.

    Placeholder: the JSON body is parsed but not persisted; the response
    reports success unless parsing raises (500).
    """
    try:
        # TODO: write the submitted settings to the configuration file.
        _submitted = request.get_json()
        acknowledgement = {"success": True, "message": "设置保存成功"}
        return jsonify(acknowledgement)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/system/info')
def get_system_info():
    """Return basic system information.

    Only ``python_version`` is read from the running interpreter; the other
    fields are fixed placeholder values.
    """
    try:
        import platform
        import sys

        details = dict(
            version="1.0.0",
            python_version=sys.version,
            database="SQLite",
            uptime="2天3小时",
            memory_usage=128,
        )
        return jsonify(details)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
# 车辆数据相关API
@app.route('/api/vehicle/data')
def get_vehicle_data():
    """Return vehicle data records.

    With a ``vehicle_id`` query parameter the records of that vehicle are
    fetched (optionally narrowed by ``data_type``); without it a general
    search is performed. ``limit`` defaults to 10.
    """
    try:
        args = request.args
        vehicle_id = args.get('vehicle_id')
        data_type = args.get('data_type')
        limit = args.get('limit', 10, type=int)
        records = (
            vehicle_manager.get_vehicle_data(vehicle_id, data_type, limit)
            if vehicle_id
            else vehicle_manager.search_vehicle_data(limit=limit)
        )
        return jsonify(records)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/vehicle/data/<vehicle_id>/latest')
def get_latest_vehicle_data(vehicle_id):
    """Return ``vehicle_manager.get_latest_vehicle_data(vehicle_id)`` as JSON."""
    try:
        return jsonify(vehicle_manager.get_latest_vehicle_data(vehicle_id))
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/vehicle/data/<vehicle_id>/summary')
def get_vehicle_summary(vehicle_id):
    """Return ``vehicle_manager.get_vehicle_summary(vehicle_id)`` as JSON."""
    try:
        return jsonify(vehicle_manager.get_vehicle_summary(vehicle_id))
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/vehicle/data', methods=['POST'])
def add_vehicle_data():
    """Add one vehicle data record from the JSON request body.

    Required keys: ``vehicle_id``, ``data_type``, ``data_value``; optional:
    ``vehicle_vin``. Responds 400 naming the missing keys when any required
    key is absent (including when no JSON object was sent); exceptions
    yield 500.
    """
    required = ('vehicle_id', 'data_type', 'data_value')
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # Missing input is a client error, not a server error.
        missing = [key for key in required if key not in data]
        if missing:
            return jsonify({"error": f"缺少必要参数: {', '.join(missing)}"}), 400
        success = vehicle_manager.add_vehicle_data(
            vehicle_id=data['vehicle_id'],
            data_type=data['data_type'],
            data_value=data['data_value'],
            vehicle_vin=data.get('vehicle_vin')
        )
        return jsonify({"success": success, "message": "数据添加成功" if success else "添加失败"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/vehicle/init-sample-data', methods=['POST'])
def init_sample_vehicle_data():
    """Populate sample vehicle data via the vehicle manager and report the outcome."""
    try:
        seeded = vehicle_manager.add_sample_vehicle_data()
        if seeded:
            note = "示例数据初始化成功"
        else:
            note = "初始化失败"
        return jsonify({"success": seeded, "message": note})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
if __name__ == '__main__':
    # Debug mode enables the Werkzeug interactive debugger, which lets anyone
    # who can reach the port execute arbitrary code. The server binds to all
    # interfaces, so debug is opt-in via FLASK_DEBUG=1 rather than always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)