Files
assist/src/web/blueprints/workorders.py

414 lines
18 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
工单管理蓝图
处理工单相关的API路由
"""
import os
import pandas as pd
from datetime import datetime
from flask import Blueprint, request, jsonify, send_file
from werkzeug.utils import secure_filename
from sqlalchemy import text
from src.main import TSPAssistant
from src.core.database import db_manager
from src.core.models import WorkOrder, Conversation, WorkOrderSuggestion, KnowledgeEntry
from src.core.query_optimizer import query_optimizer
# Blueprint that groups every work-order endpoint under the /api/workorders prefix.
workorders_bp = Blueprint('workorders', __name__, url_prefix='/api/workorders')
# Module-level singleton slot, so the assistant is not re-created on every call.
_assistant = None
def get_assistant():
    """Return the shared TSPAssistant, constructing it lazily on first use."""
    global _assistant
    if _assistant is not None:
        return _assistant
    _assistant = TSPAssistant()
    return _assistant
def _ensure_workorder_template_file() -> str:
    """Return the path of the work-order import template, copying it in if needed.

    The template is looked up at ``uploads/workorder_template.xlsx`` relative to
    the current working directory (the ``uploads`` directory is created when
    missing). If the file is absent there, a copy stored under the directory
    three levels above this module is copied into place. The template is never
    generated dynamically, which avoids a runtime dependency on an Excel writer.

    Returns:
        The relative path ``uploads/workorder_template.xlsx``.

    Raises:
        FileNotFoundError: if neither location contains the template.
        OSError: any error raised by ``shutil.copyfile`` propagates unchanged.
    """
    import shutil

    template_path = os.path.join('uploads', 'workorder_template.xlsx')
    os.makedirs('uploads', exist_ok=True)
    if os.path.exists(template_path):
        return template_path

    # NOTE(review): the original comment calls this the project root, but three
    # dirname() steps from src/web/blueprints/<this file> land on ``src`` --
    # confirm where the bundled template actually lives.
    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    repo_template = os.path.abspath(os.path.join(base_dir, 'uploads', 'workorder_template.xlsx'))
    if not os.path.exists(repo_template):
        raise FileNotFoundError('模板文件缺失uploads/workorder_template.xlsx')

    shutil.copyfile(repo_template, template_path)
    return template_path
@workorders_bp.route('')
def get_workorders():
    """List work orders, optionally narrowed by the ``status`` and ``priority`` query args.

    Both filters default to the empty string. Any exception is reported as a
    500 response whose ``error`` field carries the exception text.
    """
    try:
        query_args = request.args
        status_filter = query_args.get('status', '')
        priority_filter = query_args.get('priority', '')
        # Delegate to the optimised query helper.
        payload = query_optimizer.get_workorders_optimized(
            status_filter=status_filter,
            priority_filter=priority_filter,
        )
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@workorders_bp.route('', methods=['POST'])
def create_workorder():
    """Create a work order from the JSON request body.

    The body must be a JSON object containing the keys ``title``,
    ``description``, ``category`` and ``priority``; they are forwarded to
    ``TSPAssistant.create_work_order``. All caches are cleared afterwards.

    Responses:
        200: ``{"success": True, "workorder": <result>}``.
        400: the body is not a JSON object or a required key is missing.
        500: any other failure, with the exception text in ``error``.
    """
    required_keys = ('title', 'description', 'category', 'priority')
    try:
        # silent=True yields None for a missing or malformed body instead of raising,
        # so bad input is answered with 400 rather than falling into the 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "请求体必须是JSON对象"}), 400

        # Only key presence is checked, so every request accepted before is still accepted.
        missing = [key for key in required_keys if key not in data]
        if missing:
            return jsonify({"error": f"缺少必填字段: {', '.join(missing)}"}), 400

        result = get_assistant().create_work_order(
            title=data['title'],
            description=data['description'],
            category=data['category'],
            priority=data['priority']
        )

        # Clear every cache so list endpoints see the new work order.
        from src.core.cache_manager import cache_manager
        cache_manager.clear()

        return jsonify({"success": True, "workorder": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@workorders_bp.route('/<int:workorder_id>')
def get_workorder_details(workorder_id):
    """Return one work order together with its stored conversation records.

    Responds with 404 when no work order has the given id, and with 500
    (exception text in ``error``) when anything raises.
    """
    def _iso(moment):
        # Unset timestamps are serialised as null.
        return moment.isoformat() if moment else None

    try:
        with db_manager.get_session() as session:
            order = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not order:
                return jsonify({"error": "工单不存在"}), 404

            # Conversations linked to this work order, oldest first.
            history_query = (
                session.query(Conversation)
                .filter(Conversation.work_order_id == order.id)
                .order_by(Conversation.timestamp.asc())
            )
            history = [
                {
                    "id": turn.id,
                    "user_message": turn.user_message,
                    "assistant_response": turn.assistant_response,
                    "timestamp": _iso(turn.timestamp),
                }
                for turn in history_query.all()
            ]

            # Build the payload while the session is still open.
            payload = {
                "id": order.id,
                "order_id": order.order_id,
                "title": order.title,
                "description": order.description,
                "category": order.category,
                "priority": order.priority,
                "status": order.status,
                "created_at": _iso(order.created_at),
                "updated_at": _iso(order.updated_at),
                "resolution": order.resolution,
                "satisfaction_score": order.satisfaction_score,
                "conversations": history,
            }
            return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@workorders_bp.route('/<int:workorder_id>', methods=['PUT'])
def update_workorder(workorder_id):
    """Update a work order in the database from the JSON request body.

    ``title`` and ``description`` must be present and non-empty. ``category``,
    ``priority``, ``status``, ``resolution`` and ``satisfaction_score`` are
    optional and keep their stored value when omitted. ``updated_at`` is set to
    the current time and all caches are cleared after the commit.

    Responses:
        200: ``{"success": True, "message": ..., "workorder": {...}}``.
        400: title or description is missing/empty, including when the body is
            absent, malformed, or not a JSON object.
        404: no work order has the given id.
        500: any other failure, with the exception text in ``error``.
    """
    try:
        # silent=True returns None instead of raising on a missing or malformed body;
        # anything that is not a JSON object is treated as an empty payload so the
        # validation below answers 400 instead of crashing on ``.get``.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        if not data.get('title') or not data.get('description'):
            return jsonify({"error": "标题和描述不能为空"}), 400

        with db_manager.get_session() as session:
            w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not w:
                return jsonify({"error": "工单不存在"}), 404

            w.title = data.get('title', w.title)
            w.description = data.get('description', w.description)
            w.category = data.get('category', w.category)
            w.priority = data.get('priority', w.priority)
            w.status = data.get('status', w.status)
            w.resolution = data.get('resolution', w.resolution)
            w.satisfaction_score = data.get('satisfaction_score', w.satisfaction_score)
            w.updated_at = datetime.now()
            session.commit()

            # Clear every cache so list endpoints see the change.
            from src.core.cache_manager import cache_manager
            cache_manager.clear()

            updated = {
                "id": w.id,
                "title": w.title,
                "description": w.description,
                "category": w.category,
                "priority": w.priority,
                "status": w.status,
                "resolution": w.resolution,
                "satisfaction_score": w.satisfaction_score,
                "updated_at": w.updated_at.isoformat() if w.updated_at else None
            }
            return jsonify({"success": True, "message": "工单更新成功", "workorder": updated})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@workorders_bp.route('/<int:workorder_id>', methods=['DELETE'])
def delete_workorder(workorder_id):
    """Delete a work order together with its suggestion and conversation rows.

    Responds with 404 when the work order does not exist and with 500
    (exception text in ``error``) when anything raises.
    """
    try:
        with db_manager.get_session() as session:
            target = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not target:
                return jsonify({"error": "工单不存在"}), 404

            # Child rows are removed first, following the foreign-key dependency order.
            # Step 1: suggestion rows. Best effort: a failure is printed and ignored.
            try:
                session.execute(
                    text("DELETE FROM work_order_suggestions WHERE work_order_id = :id"),
                    {"id": workorder_id},
                )
            except Exception as e:
                print(f"删除工单建议记录失败: {e}")

            # Step 2: conversation rows.
            conversations = session.query(Conversation).filter(
                Conversation.work_order_id == workorder_id
            )
            conversations.delete()

            # Step 3: the work order itself.
            session.delete(target)
            session.commit()

            # Clear every cache so list endpoints no longer show the deleted row.
            from src.core.cache_manager import cache_manager
            cache_manager.clear()

            response_body = {
                "success": True,
                "message": "工单删除成功"
            }
            return jsonify(response_body)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@workorders_bp.route('/<int:workorder_id>/ai-suggestion', methods=['POST'])
def generate_workorder_ai_suggestion(workorder_id):
    """Draft an AI handling suggestion from the work order text and knowledge-base hits.

    The title and description are used as the knowledge search query (top 3
    results); the hits are formatted into the prompt sent to the LLM client. The
    resulting text is stored on the work order's WorkOrderSuggestion row, which
    is created when it does not exist yet.

    Responds with 404 for an unknown work order and 500 on any exception.
    """
    try:
        with db_manager.get_session() as session:
            order = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not order:
                return jsonify({"error": "工单不存在"}), 404

            # Knowledge-base lookup.
            query = f"{order.title} {order.description}"
            kb_results = get_assistant().search_knowledge(query, top_k=3)
            if isinstance(kb_results, dict):
                kb_list = kb_results.get('results', [])
            else:
                kb_list = []

            # Assemble the prompt context, one Q/A pair per knowledge hit.
            snippets = [f"Q: {k.get('question','')}\nA: {k.get('answer','')}" for k in kb_list]
            context = "\n".join(snippets)

            from src.core.llm_client import QwenClient
            from ...core.component_singletons import component_singletons
            llm = component_singletons.get_llm_client()

            prompt = f"请基于以下工单描述与知识库片段,给出简洁、可执行的处理建议。\n工单描述:\n{order.description}\n\n知识库片段:\n{context}\n\n请直接输出建议文本:"
            messages = [{"role":"user","content":prompt}]
            llm_resp = llm.chat_completion(messages=messages, temperature=0.3, max_tokens=800)

            # An empty suggestion is kept when the response carries no choices.
            if llm_resp and 'choices' in llm_resp:
                suggestion = llm_resp['choices'][0]['message']['content']
            else:
                suggestion = ""

            # Create or refresh the draft record.
            draft = session.query(WorkOrderSuggestion).filter(
                WorkOrderSuggestion.work_order_id == order.id
            ).first()
            if draft:
                draft.ai_suggestion = suggestion
                draft.updated_at = datetime.now()
            else:
                draft = WorkOrderSuggestion(work_order_id=order.id, ai_suggestion=suggestion)
                session.add(draft)
            session.commit()

            return jsonify({"success": True, "ai_suggestion": suggestion})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@workorders_bp.route('/<int:workorder_id>/human-resolution', methods=['POST'])
def save_workorder_human_resolution(workorder_id):
    """Store the human-written resolution and score it against the AI suggestion.

    The JSON body must carry a non-empty string ``human_resolution``. It is
    saved on the work order's WorkOrderSuggestion row (created when missing),
    and its TF-IDF cosine similarity to the stored AI suggestion is recorded.
    The row's ``approved`` flag is set when the similarity is at least 0.95;
    this endpoint does not itself write to the knowledge base.

    Responses:
        200: ``{"success": True, "similarity": <float>, "approved": <bool>}``.
        400: ``human_resolution`` is missing, blank, or not a string (including
            an absent/malformed body).
        404: no work order has the given id.
        500: any other failure, with the exception text in ``error``.
    """
    try:
        # silent=True returns None instead of raising on a missing or malformed body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # JSON null or a non-string value used to crash on ``.strip()`` and surface
        # as a 500; treat those as "no text supplied" instead.
        raw_text = data.get('human_resolution', '')
        human_text = raw_text.strip() if isinstance(raw_text, str) else ''
        if not human_text:
            return jsonify({"error":"人工描述不能为空"}), 400

        with db_manager.get_session() as session:
            w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not w:
                return jsonify({"error": "工单不存在"}), 404

            rec = session.query(WorkOrderSuggestion).filter(WorkOrderSuggestion.work_order_id == w.id).first()
            if not rec:
                rec = WorkOrderSuggestion(work_order_id=w.id)
                session.add(rec)
            rec.human_resolution = human_text

            # Similarity is computed locally with TF-IDF + cosine so no external
            # service is required. Best effort: any failure (for example an empty
            # vocabulary or scikit-learn being unavailable) scores 0.0.
            # NOTE(review): TfidfVectorizer's default tokenizer splits on word
            # boundaries, which may under-score unsegmented Chinese text -- confirm.
            try:
                from sklearn.feature_extraction.text import TfidfVectorizer
                from sklearn.metrics.pairwise import cosine_similarity
                texts = [rec.ai_suggestion or "", human_text]
                vec = TfidfVectorizer(max_features=1000)
                mat = vec.fit_transform(texts)
                sim = float(cosine_similarity(mat[0:1], mat[1:2])[0][0])
            except Exception:
                sim = 0.0
            rec.ai_similarity = sim

            # Auto-approval threshold: similarity >= 0.95.
            approved = sim >= 0.95
            rec.approved = approved
            session.commit()

            return jsonify({"success": True, "similarity": sim, "approved": approved})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@workorders_bp.route('/<int:workorder_id>/approve-to-knowledge', methods=['POST'])
def approve_workorder_to_knowledge(workorder_id):
    """Store an approved AI suggestion as a knowledge entry.

    Responds with 404 for an unknown work order, 400 when the work order has no
    approved, non-empty AI suggestion, and 500 on any exception.
    """
    try:
        with db_manager.get_session() as session:
            order = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not order:
                return jsonify({"error": "工单不存在"}), 404

            draft = session.query(WorkOrderSuggestion).filter(
                WorkOrderSuggestion.work_order_id == order.id
            ).first()
            has_approved_suggestion = bool(draft and draft.approved and draft.ai_suggestion)
            if not has_approved_suggestion:
                return jsonify({"error": "未找到可入库的已审批AI建议"}), 400

            # Question: the title, else the first 20 characters of the description,
            # else a fixed placeholder.
            if order.title:
                question = order.title
            elif order.description:
                question = order.description[:20]
            else:
                question = '工单问题'

            # Answer: the AI suggestion. Category: the work order's, else the fallback.
            entry = KnowledgeEntry(
                question=question,
                answer=draft.ai_suggestion,
                category=order.category or '其他',
                confidence_score=0.95,
                is_active=True,
                is_verified=True,
                verified_by='auto_approve',
                verified_at=datetime.now(),
            )
            session.add(entry)
            session.commit()
            return jsonify({"success": True, "knowledge_id": entry.id})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
def _first_import_cell(row, names, default=None):
    """Return the cell of the first column in ``names`` that ``row`` contains.

    A blank cell (reported by pandas as NaN/NaT/None) and the absence of every
    listed column both yield ``default``, so blanks never turn into ``'nan'``.
    """
    for name in names:
        if name in row.index:
            value = row[name]
            return default if pd.isna(value) else value
    return default


def _clear_caches_after_import():
    """Clear all caches, as the create/update/delete endpoints do after a write."""
    from src.core.cache_manager import cache_manager
    cache_manager.clear()


@workorders_bp.route('/import', methods=['POST'])
def import_workorders():
    """Import work orders from an uploaded Excel file (form field ``file``).

    Each row becomes one WorkOrder, committed in its own session. Columns are
    matched by Chinese name first, then English name: 标题/title, 描述/description,
    分类/category, 优先级/priority, 状态/status, 解决方案/resolution,
    满意度/satisfaction_score. A missing title column yields a generated title;
    rows whose title cell is blank are skipped. Blank category/priority/status
    cells fall back to the same defaults as missing columns.

    The upload is written to a uniquely named temporary file under ``uploads``
    and always removed afterwards, so a client-chosen filename can neither
    overwrite nor delete other files in that directory (such as the template).

    Responses:
        200: ``{"success": True, "message": ..., "imported_count": n, "workorders": [...]}``.
        400: no file, empty filename, unsupported extension, or a failure while
            reading/importing the spreadsheet (rows committed before the
            failure stay in the database).
        500: any other failure, with the exception text in ``error``.
    """
    try:
        if 'file' not in request.files:
            return jsonify({"error": "没有上传文件"}), 400

        file = request.files['file']
        if not file.filename:
            return jsonify({"error": "没有选择文件"}), 400

        # Case-insensitive so that e.g. "REPORT.XLSX" is accepted too.
        lowered_name = file.filename.lower()
        if not lowered_name.endswith(('.xlsx', '.xls')):
            return jsonify({"error": "只支持Excel文件(.xlsx, .xls)"}), 400

        # Server-generated name: no client-controlled path component, no collisions.
        import uuid
        extension = '.xlsx' if lowered_name.endswith('.xlsx') else '.xls'
        os.makedirs('uploads', exist_ok=True)
        upload_path = os.path.join('uploads', f"import_{uuid.uuid4().hex}{extension}")

        imported_workorders = []
        try:
            file.save(upload_path)
            try:
                df = pd.read_excel(upload_path)
                has_title_column = '标题' in df.columns or 'title' in df.columns

                for index, row in df.iterrows():
                    # Map spreadsheet columns onto work-order fields.
                    if has_title_column:
                        title = str(_first_import_cell(row, ('标题', 'title'), ''))
                    else:
                        title = f'导入工单 {index + 1}'
                    description = str(_first_import_cell(row, ('描述', 'description'), ''))
                    category = str(_first_import_cell(row, ('分类', 'category'), '技术问题'))
                    priority = str(_first_import_cell(row, ('优先级', 'priority'), 'medium'))
                    status = str(_first_import_cell(row, ('状态', 'status'), 'open'))

                    # Title is required: skip rows where it is blank.
                    if not title.strip():
                        continue

                    with db_manager.get_session() as session:
                        workorder = WorkOrder(
                            title=title,
                            description=description,
                            category=category,
                            priority=priority,
                            status=status,
                            created_at=datetime.now(),
                            updated_at=datetime.now()
                        )

                        # Optional fields are only set when the cell has a value.
                        resolution = _first_import_cell(row, ('解决方案', 'resolution'))
                        if resolution is not None:
                            workorder.resolution = str(resolution)

                        score = _first_import_cell(row, ('满意度', 'satisfaction_score'))
                        if score is not None:
                            try:
                                workorder.satisfaction_score = int(score)
                            except (ValueError, TypeError, OverflowError):
                                workorder.satisfaction_score = None

                        session.add(workorder)
                        session.commit()

                        imported_workorders.append({
                            "id": workorder.id,
                            "order_id": workorder.order_id,
                            "title": workorder.title,
                            "description": workorder.description,
                            "category": workorder.category,
                            "priority": workorder.priority,
                            "status": workorder.status,
                            "created_at": workorder.created_at.isoformat() if workorder.created_at else None,
                            "updated_at": workorder.updated_at.isoformat() if workorder.updated_at else None,
                            "resolution": workorder.resolution,
                            "satisfaction_score": workorder.satisfaction_score
                        })
            except Exception as e:
                # Rows committed before the failure are already visible in the database.
                if imported_workorders:
                    _clear_caches_after_import()
                return jsonify({"error": f"解析Excel文件失败: {str(e)}"}), 400
        finally:
            # Remove the temporary upload on every path.
            if os.path.exists(upload_path):
                os.remove(upload_path)

        if imported_workorders:
            _clear_caches_after_import()

        return jsonify({
            "success": True,
            "message": f"成功导入 {len(imported_workorders)} 个工单",
            "imported_count": len(imported_workorders),
            "workorders": imported_workorders
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@workorders_bp.route('/import/template')
def download_import_template():
    """Return the URL of the work-order import template.

    The template file is first ensured to exist under ``uploads``; if that
    fails, the response is 500 with the exception text in ``error``.
    """
    try:
        # Called only for its side effect (and its error); the URL below is fixed.
        _ensure_workorder_template_file()
        return jsonify({
            "success": True,
            "template_url": "/uploads/workorder_template.xlsx"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@workorders_bp.route('/import/template/file')
def download_import_template_file():
    """Send the work-order import template as a file download.

    Responds with 500 (exception text in ``error``) when the template cannot be
    located or sent.
    """
    try:
        template_path = _ensure_workorder_template_file()
        # The helper checks the file relative to the current working directory, so
        # pin that location with an absolute path; a relative path may be resolved
        # against the application root by send_file instead.
        absolute_path = os.path.abspath(template_path)
        return send_file(absolute_path, as_attachment=True, download_name='工单导入模板.xlsx')
    except Exception as e:
        return jsonify({"error": str(e)}), 500