# -*- coding: utf-8 -*-
"""
Work-order management blueprint.

Defines the Flask routes mounted under ``/api/workorders``: listing,
creating, reading, updating and deleting work orders, generating and
reviewing AI suggestions, promoting approved suggestions to knowledge
entries, and importing work orders from Excel files.
"""

import logging
import os
import shutil
import uuid
from datetime import datetime

import pandas as pd
from flask import Blueprint, request, jsonify, send_file
from werkzeug.utils import secure_filename
from sqlalchemy import text

from src.main import TSPAssistant
from src.core.database import db_manager
from src.core.models import WorkOrder, Conversation, WorkOrderSuggestion, KnowledgeEntry
from src.core.query_optimizer import query_optimizer

logger = logging.getLogger(__name__)

workorders_bp = Blueprint('workorders', __name__, url_prefix='/api/workorders')

# Lazily created TSPAssistant singleton; see get_assistant().
_assistant = None

# Directory (relative to the working directory) for the template and uploads.
_UPLOAD_DIR = 'uploads'
_TEMPLATE_NAME = 'workorder_template.xlsx'

# File extensions accepted by the Excel import endpoint (compared lower-case).
_EXCEL_EXTENSIONS = ('.xlsx', '.xls')

# JSON keys that must be present when creating a work order.
_REQUIRED_CREATE_FIELDS = ('title', 'description', 'category', 'priority')


def get_assistant():
    """Return the shared TSPAssistant instance, creating it on first use."""
    global _assistant
    if _assistant is None:
        _assistant = TSPAssistant()
    return _assistant


def _clear_workorder_cache():
    """Drop every cached entry so later reads see the latest work orders."""
    # Imported at call time, as the original handlers did.
    from src.core.cache_manager import cache_manager
    cache_manager.clear()


def _json_body() -> dict:
    """Return the request's JSON object, or an empty dict if absent/invalid."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _workorder_to_dict(w) -> dict:
    """Serialize a WorkOrder row; must be called while its session is open."""
    return {
        "id": w.id,
        "order_id": w.order_id,
        "title": w.title,
        "description": w.description,
        "category": w.category,
        "priority": w.priority,
        "status": w.status,
        "created_at": w.created_at.isoformat() if w.created_at else None,
        "updated_at": w.updated_at.isoformat() if w.updated_at else None,
        "resolution": w.resolution,
        "satisfaction_score": w.satisfaction_score,
    }


def _row_value(row, keys, default=None):
    """Return the first non-missing cell of *row* among *keys*, else *default*.

    Empty Excel cells are read by pandas as NaN; treating them as missing
    prevents them from being stored as the literal string ``'nan'``.
    """
    for key in keys:
        value = row.get(key)
        if value is not None and pd.notna(value):
            return value
    return default


def _ensure_workorder_template_file() -> str:
    """Return the path of the import template, copying it in if necessary.

    The template is never generated at runtime. If it is missing from the
    working directory's ``uploads`` folder, it is copied from the ``uploads``
    folder three directory levels above this file.

    Raises:
        FileNotFoundError: if the template exists in neither location.
    """
    template_path = os.path.join(_UPLOAD_DIR, _TEMPLATE_NAME)
    os.makedirs(_UPLOAD_DIR, exist_ok=True)
    if not os.path.exists(template_path):
        project_root = os.path.dirname(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        )
        repo_template = os.path.abspath(
            os.path.join(project_root, _UPLOAD_DIR, _TEMPLATE_NAME)
        )
        if not os.path.exists(repo_template):
            raise FileNotFoundError('模板文件缺失:uploads/workorder_template.xlsx')
        shutil.copyfile(repo_template, template_path)
    return template_path


@workorders_bp.route('')
def get_workorders():
    """List work orders, optionally filtered by ``status`` and ``priority``."""
    try:
        status_filter = request.args.get('status', '')
        priority_filter = request.args.get('priority', '')
        result = query_optimizer.get_workorders_optimized(
            status_filter=status_filter,
            priority_filter=priority_filter
        )
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@workorders_bp.route('', methods=['POST'])
def create_workorder():
    """Create a work order from the JSON body.

    The body must contain ``title``, ``description``, ``category`` and
    ``priority``; a missing key yields HTTP 400 instead of a server error.
    """
    try:
        data = _json_body()
        missing = [field for field in _REQUIRED_CREATE_FIELDS if field not in data]
        if missing:
            return jsonify({"error": f"缺少必填字段: {', '.join(missing)}"}), 400

        result = get_assistant().create_work_order(
            title=data['title'],
            description=data['description'],
            category=data['category'],
            priority=data['priority']
        )
        _clear_workorder_cache()
        return jsonify({"success": True, "workorder": result})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# NOTE(review): the URL variable was missing from the item routes, so the
# views could never receive ``workorder_id``. ``int`` is assumed to match
# the type of WorkOrder.id - confirm against the model.
@workorders_bp.route('/<int:workorder_id>')
def get_workorder_details(workorder_id):
    """Return one work order together with its stored conversations."""
    try:
        with db_manager.get_session() as session:
            w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not w:
                return jsonify({"error": "工单不存在"}), 404

            convs = (
                session.query(Conversation)
                .filter(Conversation.work_order_id == w.id)
                .order_by(Conversation.timestamp.asc())
                .all()
            )
            conv_list = [
                {
                    "id": c.id,
                    "user_message": c.user_message,
                    "assistant_response": c.assistant_response,
                    "timestamp": c.timestamp.isoformat() if c.timestamp else None,
                }
                for c in convs
            ]

            # Build the payload while the session is still open.
            workorder = {**_workorder_to_dict(w), "conversations": conv_list}
            return jsonify(workorder)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@workorders_bp.route('/<int:workorder_id>', methods=['PUT'])
def update_workorder(workorder_id):
    """Update a work order in the database from the JSON body.

    ``title`` and ``description`` are required and must be non-empty; every
    other field keeps its stored value when omitted.
    """
    try:
        data = _json_body()
        if not data.get('title') or not data.get('description'):
            return jsonify({"error": "标题和描述不能为空"}), 400

        with db_manager.get_session() as session:
            w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not w:
                return jsonify({"error": "工单不存在"}), 404

            for field in ('title', 'description', 'category', 'priority',
                          'status', 'resolution', 'satisfaction_score'):
                setattr(w, field, data.get(field, getattr(w, field)))
            w.updated_at = datetime.now()
            session.commit()

            _clear_workorder_cache()

            updated = {
                "id": w.id,
                "title": w.title,
                "description": w.description,
                "category": w.category,
                "priority": w.priority,
                "status": w.status,
                "resolution": w.resolution,
                "satisfaction_score": w.satisfaction_score,
                "updated_at": w.updated_at.isoformat() if w.updated_at else None
            }
            return jsonify({"success": True, "message": "工单更新成功", "workorder": updated})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@workorders_bp.route('/<int:workorder_id>', methods=['DELETE'])
def delete_workorder(workorder_id):
    """Delete a work order after removing its dependent rows."""
    try:
        with db_manager.get_session() as session:
            workorder = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not workorder:
                return jsonify({"error": "工单不存在"}), 404

            # Child rows go first, following the foreign-key dependencies.
            # 1. Suggestions: best effort, a failure here is logged and
            #    does not abort the deletion.
            try:
                session.execute(
                    text("DELETE FROM work_order_suggestions WHERE work_order_id = :id"),
                    {"id": workorder_id}
                )
            except Exception as e:
                logger.warning("删除工单建议记录失败: %s", e)

            # 2. Conversations.
            session.query(Conversation).filter(
                Conversation.work_order_id == workorder_id
            ).delete()

            # 3. The work order itself.
            session.delete(workorder)
            session.commit()

        _clear_workorder_cache()
        return jsonify({
            "success": True,
            "message": "工单删除成功"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@workorders_bp.route('/<int:workorder_id>/ai-suggestion', methods=['POST'])
def generate_workorder_ai_suggestion(workorder_id):
    """Generate an AI suggestion draft from the work order and knowledge base.

    The draft is stored in (or updates) the work order's
    WorkOrderSuggestion row and is returned to the caller.
    """
    try:
        with db_manager.get_session() as session:
            w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not w:
                return jsonify({"error": "工单不存在"}), 404

            # Retrieve the most relevant knowledge-base entries.
            query = f"{w.title} {w.description}"
            kb_results = get_assistant().search_knowledge(query, top_k=3)
            kb_list = kb_results.get('results', []) if isinstance(kb_results, dict) else []

            # Assemble the prompt from the description and the KB snippets.
            context = "\n".join([f"Q: {k.get('question','')}\nA: {k.get('answer','')}" for k in kb_list])
            from src.core.llm_client import QwenClient
            llm = QwenClient()
            prompt = f"请基于以下工单描述与知识库片段,给出简洁、可执行的处理建议。\n工单描述:\n{w.description}\n\n知识库片段:\n{context}\n\n请直接输出建议文本:"
            llm_resp = llm.chat_completion(
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=800
            )

            suggestion = ""
            # An empty ``choices`` list is treated like a missing response.
            if llm_resp and 'choices' in llm_resp and llm_resp['choices']:
                suggestion = llm_resp['choices'][0]['message']['content']

            # Create or update the draft record.
            rec = session.query(WorkOrderSuggestion).filter(
                WorkOrderSuggestion.work_order_id == w.id
            ).first()
            if not rec:
                rec = WorkOrderSuggestion(work_order_id=w.id, ai_suggestion=suggestion)
                session.add(rec)
            else:
                rec.ai_suggestion = suggestion
                rec.updated_at = datetime.now()
            session.commit()

            return jsonify({"success": True, "ai_suggestion": suggestion})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@workorders_bp.route('/<int:workorder_id>/human-resolution', methods=['POST'])
def save_workorder_human_resolution(workorder_id):
    """Store the human resolution and score it against the AI suggestion.

    Similarity is the TF-IDF cosine similarity between the two texts; a
    score of at least 0.95 marks the suggestion as approved.
    """
    try:
        data = _json_body()
        # ``or ''`` also covers an explicit JSON null.
        human_text = str(data.get('human_resolution') or '').strip()
        if not human_text:
            return jsonify({"error":"人工描述不能为空"}), 400

        with db_manager.get_session() as session:
            w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not w:
                return jsonify({"error": "工单不存在"}), 404

            rec = session.query(WorkOrderSuggestion).filter(
                WorkOrderSuggestion.work_order_id == w.id
            ).first()
            if not rec:
                rec = WorkOrderSuggestion(work_order_id=w.id)
                session.add(rec)
            rec.human_resolution = human_text

            # Local TF-IDF cosine similarity avoids an external service.
            # Any failure (e.g. scikit-learn unavailable, empty vocabulary)
            # deliberately degrades to a similarity of 0.
            try:
                from sklearn.feature_extraction.text import TfidfVectorizer
                from sklearn.metrics.pairwise import cosine_similarity
                texts = [rec.ai_suggestion or "", human_text]
                vec = TfidfVectorizer(max_features=1000)
                mat = vec.fit_transform(texts)
                sim = float(cosine_similarity(mat[0:1], mat[1:2])[0][0])
            except Exception:
                logger.debug("similarity computation failed", exc_info=True)
                sim = 0.0
            rec.ai_similarity = sim

            # Auto-approval threshold.
            approved = sim >= 0.95
            rec.approved = approved
            session.commit()

            return jsonify({"success": True, "similarity": sim, "approved": approved})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@workorders_bp.route('/<int:workorder_id>/approve-to-knowledge', methods=['POST'])
def approve_workorder_to_knowledge(workorder_id):
    """Store an approved AI suggestion as a knowledge entry."""
    try:
        with db_manager.get_session() as session:
            w = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
            if not w:
                return jsonify({"error": "工单不存在"}), 404

            rec = session.query(WorkOrderSuggestion).filter(
                WorkOrderSuggestion.work_order_id == w.id
            ).first()
            if not rec or not rec.approved or not rec.ai_suggestion:
                return jsonify({"error": "未找到可入库的已审批AI建议"}), 400

            # question = work order title, answer = AI suggestion,
            # category = work order category.
            entry = KnowledgeEntry(
                question=w.title or (w.description[:20] if w.description else '工单问题'),
                answer=rec.ai_suggestion,
                category=w.category or '其他',
                confidence_score=0.95,
                is_active=True,
                is_verified=True,
                verified_by='auto_approve',
                verified_at=datetime.now()
            )
            session.add(entry)
            session.commit()

            return jsonify({"success": True, "knowledge_id": entry.id})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


def _import_workorder_rows(df) -> list:
    """Insert one WorkOrder per usable row of *df*; return the serialized rows.

    Columns are looked up by their Chinese header first, then the English
    one. A row whose title cell is blank is skipped. When the sheet has no
    title column at all, a positional default title is used.
    """
    has_title_column = any(col in df.columns for col in ('标题', 'title'))
    imported = []

    for position, (_, row) in enumerate(df.iterrows(), start=1):
        raw_title = _row_value(row, ('标题', 'title'))
        if raw_title is None:
            if has_title_column:
                continue  # required field left blank
            title = f'导入工单 {position}'
        else:
            title = str(raw_title)
            if not title.strip():
                continue

        description = str(_row_value(row, ('描述', 'description'), ''))
        category = str(_row_value(row, ('分类', 'category'), '技术问题'))
        priority = str(_row_value(row, ('优先级', 'priority'), 'medium'))
        status = str(_row_value(row, ('状态', 'status'), 'open'))

        with db_manager.get_session() as session:
            workorder = WorkOrder(
                title=title,
                description=description,
                category=category,
                priority=priority,
                status=status,
                created_at=datetime.now(),
                updated_at=datetime.now()
            )

            # Optional columns.
            resolution = _row_value(row, ('解决方案', 'resolution'))
            if resolution is not None:
                workorder.resolution = str(resolution)

            score = _row_value(row, ('满意度', 'satisfaction_score'))
            if score is not None:
                try:
                    workorder.satisfaction_score = int(score)
                except (ValueError, TypeError):
                    workorder.satisfaction_score = None

            session.add(workorder)
            session.commit()

            # Serialize while the session is open.
            imported.append(_workorder_to_dict(workorder))

    return imported


@workorders_bp.route('/import', methods=['POST'])
def import_workorders():
    """Import work orders from an uploaded Excel file (form field ``file``)."""
    try:
        if 'file' not in request.files:
            return jsonify({"error": "没有上传文件"}), 400

        file = request.files['file']
        if file.filename == '':
            return jsonify({"error": "没有选择文件"}), 400

        lowered_name = file.filename.lower()
        extension = next((e for e in _EXCEL_EXTENSIONS if lowered_name.endswith(e)), None)
        if extension is None:
            return jsonify({"error": "只支持Excel文件(.xlsx, .xls)"}), 400

        # Store the upload under a unique server-generated name: the client
        # name could collide with another upload or with the import template
        # living in the same directory (which would then be deleted below).
        os.makedirs(_UPLOAD_DIR, exist_ok=True)
        upload_path = os.path.join(_UPLOAD_DIR, f"import_{uuid.uuid4().hex}{extension}")

        try:
            file.save(upload_path)
            try:
                imported_workorders = _import_workorder_rows(pd.read_excel(upload_path))
            except Exception as e:
                return jsonify({"error": f"解析Excel文件失败: {str(e)}"}), 400
        finally:
            # Always remove the temporary upload.
            if os.path.exists(upload_path):
                os.remove(upload_path)

        if imported_workorders:
            # Keep cached listings consistent, as create/update/delete do.
            _clear_workorder_cache()

        return jsonify({
            "success": True,
            "message": f"成功导入 {len(imported_workorders)} 个工单",
            "imported_count": len(imported_workorders),
            "workorders": imported_workorders
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@workorders_bp.route('/import/template')
def download_import_template():
    """Return the URL of the import template after ensuring it exists."""
    try:
        _ensure_workorder_template_file()
        return jsonify({
            "success": True,
            "template_url": "/uploads/workorder_template.xlsx"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@workorders_bp.route('/import/template/file')
def download_import_template_file():
    """Send the import template file as a download."""
    try:
        template_path = _ensure_workorder_template_file()
        return send_file(template_path, as_attachment=True, download_name='工单导入模板.xlsx')
    except Exception as e:
        return jsonify({"error": str(e)}), 500