From 3e1ecf25490616c203adda6e5371a69384d65349 Mon Sep 17 00:00:00 2001 From: Jeason <1710884619@qq.com> Date: Mon, 20 Apr 2026 09:50:35 +0800 Subject: [PATCH] =?UTF-8?q?=E5=89=8D=E5=90=8E=E7=AB=AF=E9=A1=B5=E9=9D=A2?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E7=AD=96=E7=95=A5=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=88=86=E6=9E=90=E6=A8=A1=E6=9D=BF=E7=83=AD=E7=BC=96=E8=BE=91?= =?UTF-8?q?=E4=BB=A5=E5=8F=8Ayaml=E9=85=8D=E7=BD=AE=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E6=8F=90=E7=A4=BA=E8=AF=8D=E7=BC=96=E7=A0=81=EF=BC=8C?= =?UTF-8?q?=E5=8D=A0=E7=94=A8=E7=AC=A6=E7=AD=89=E9=97=AE=E9=A2=98=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=96=87=E4=BB=B6=E6=89=AB=E6=8F=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cleaned_data/.gitkeep | 0 config/templates/anomaly_detection.yaml | 22 ++ config/templates/comparison.yaml | 18 ++ config/templates/health_report.yaml | 50 ++++ config/templates/trend_analysis.yaml | 22 ++ data_analysis_agent.py | 47 ++- prompts.py | 3 + tests/test_properties.py | 2 +- utils/analysis_templates.py | 364 ++++++++---------------- utils/code_executor.py | 54 +++- utils/llm_helper.py | 14 + utils/script_generator.py | 65 +++++ web/main.py | 157 ++++++++-- web/static/script.js | 8 +- 14 files changed, 539 insertions(+), 287 deletions(-) delete mode 100644 cleaned_data/.gitkeep create mode 100644 config/templates/anomaly_detection.yaml create mode 100644 config/templates/comparison.yaml create mode 100644 config/templates/health_report.yaml create mode 100644 config/templates/trend_analysis.yaml diff --git a/cleaned_data/.gitkeep b/cleaned_data/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/config/templates/anomaly_detection.yaml b/config/templates/anomaly_detection.yaml new file mode 100644 index 0000000..0c1e8db --- /dev/null +++ b/config/templates/anomaly_detection.yaml @@ -0,0 +1,22 @@ +name: 异常值检测分析 +description: 识别数据中的异常值和离群点 +steps: + - name: 数值列统计分析 + description: 计算数值列的统计特征 + prompt: 计算所有数值列的均值、标准差、四分位数等统计量 + + - name: 箱线图可视化 + description: 使用箱线图识别异常值 + prompt: 为每个数值列绘制箱线图,直观展示异常值分布 + + - name: Z-Score异常检测 + description: 使用Z-Score方法检测异常值 + prompt: 计算每个数值的Z-Score,标记|Z|>3的异常值 + + - name: IQR异常检测 + description: 使用四分位距方法检测异常值 + prompt: 使用IQR方法(Q1-1.5*IQR, Q3+1.5*IQR)检测异常值 + + - name: 异常值汇总报告 + description: 整理所有检测到的异常值 + prompt: 汇总所有异常值,分析其特征和可能原因,提供处理建议 diff --git a/config/templates/comparison.yaml b/config/templates/comparison.yaml new file mode 100644 index 0000000..defd231 --- /dev/null +++ b/config/templates/comparison.yaml @@ -0,0 +1,18 @@ +name: 分组对比分析 +description: 对比不同分组之间的差异和特征 +steps: + - name: 分组统计 + description: 计算各组的统计指标 + prompt: 按分组列分组,计算数值列的均值、中位数、标准差 + + - name: 分组可视化对比 + description: 绘制对比图表 + prompt: 绘制各组的柱状图和箱线图,直观对比差异 + + - name: 差异显著性检验 + description: 统计检验组间差异 + prompt: 进行t检验或方差分析,判断组间差异是否显著 + + - name: 对比结论 + description: 总结对比结果 + prompt: 总结各组特征、主要差异和业务洞察 diff --git a/config/templates/health_report.yaml b/config/templates/health_report.yaml new file mode 100644 index 0000000..fed3ab3 --- /dev/null +++ b/config/templates/health_report.yaml @@ -0,0 +1,50 @@ +name: 车联网工单健康度报告 +description: 全面分析车联网技术支持工单的健康状况,从多个维度评估工单处理效率和质量 +steps: + - name: 数据概览与质量检查 + description: 检查数据完整性、缺失值、异常值等 + prompt: 加载数据并进行质量检查,输出数据概况和潜在问题 + + - name: 工单总量分析 + description: 统计总工单数、时间分布、趋势变化 + prompt: 计算总工单数,按时间维度统计工单量,绘制时间序列趋势图 + + - name: 车型维度分析 + description: 分析不同车型的工单分布和问题特征 + prompt: 统计各车型工单数量,绘制车型分布图,识别高风险车型 + + - name: 模块维度分析 + description: 分析工单涉及的技术模块分布 + prompt: 统计各技术模块的工单量,绘制模块分布图,识别高频问题模块 + + - name: 功能维度分析 + description: 分析具体功能点的问题分布 + prompt: 统计各功能的工单量,绘制TOP功能问题排行,分析功能稳定性 + + - name: 问题严重程度分析 + description: 分析工单的严重程度分布 + prompt: 统计不同严重程度的工单比例,绘制严重程度分布图 + + - name: 处理时长分析 + description: 分析工单处理时效性 + prompt: 计算平均处理时长、SLA达成率,识别超时工单,绘制时长分布图 + + - name: 责任人工作负载分析 + description: 分析各责任人的工单负载和处理效率 + prompt: 统计各责任人的工单数和处理效率,绘制负载分布图 + + - name: 来源渠道分析 + description: 分析工单来源渠道分布 + prompt: 统计各来源渠道的工单量,绘制渠道分布图 + + - name: 高频问题深度分析 + description: 识别并深入分析高频问题 + prompt: 提取TOP10高频问题,分析问题原因、影响范围和解决方案 + + - name: 综合健康度评分 + description: 基于多个维度计算综合健康度评分 + prompt: 综合考虑工单量、处理时长、问题严重度等指标,计算健康度评分 + + - name: 生成最终报告 + description: 整合所有分析结果,生成完整报告 + prompt: 整合所有图表和分析结论,生成一份完整的车联网工单健康度报告 diff --git a/config/templates/trend_analysis.yaml b/config/templates/trend_analysis.yaml new file mode 100644 index 0000000..401b178 --- /dev/null +++ b/config/templates/trend_analysis.yaml @@ -0,0 +1,22 @@ +name: 时间序列趋势分析 +description: 分析数据的时间趋势、季节性和周期性特征 +steps: + - name: 时间序列数据准备 + description: 将数据转换为时间序列格式 + prompt: 将时间列转换为日期格式,按时间排序数据 + + - name: 趋势可视化 + description: 绘制时间序列图 + prompt: 绘制数值随时间的变化趋势图,添加移动平均线 + + - name: 趋势分析 + description: 识别上升、下降或平稳趋势 + prompt: 计算趋势线斜率,判断整体趋势方向和变化速率 + + - name: 季节性分析 + description: 检测季节性模式 + prompt: 分析月度、季度等周期性模式,绘制季节性分解图 + + - name: 异常点检测 + description: 识别时间序列中的异常点 + prompt: 使用统计方法检测时间序列中的异常值,标注在图表上 diff --git a/data_analysis_agent.py b/data_analysis_agent.py index 4778e4f..017847a 100644 --- a/data_analysis_agent.py +++ b/data_analysis_agent.py @@ -352,9 +352,13 @@ class DataAnalysisAgent: def _compress_trimmed_messages(self, messages: list) -> str: """Compress trimmed messages into a concise summary string. - Extracts the action type from each assistant message and the execution - outcome (success / failure) from the subsequent user feedback message. - Code blocks and raw execution output are excluded. + Extracts the action type from each assistant message, the execution + outcome (success / failure), and completed SOP stages from the + subsequent user feedback message. Code blocks and raw execution + output are excluded. + + The summary explicitly lists completed SOP stages so the LLM does + not restart from stage 1 after conversation trimming. Args: messages: List of conversation message dicts to compress. @@ -364,6 +368,17 @@ class DataAnalysisAgent: """ summary_parts = ["[分析摘要] 以下是之前分析轮次的概要:"] round_num = 0 + completed_stages = set() + + # SOP stage keywords to detect from assistant messages + stage_keywords = { + "阶段1": "数据探索与加载", + "阶段2": "基础分布分析", + "阶段3": "时序与来源分析", + "阶段4": "深度交叉分析", + "阶段5": "效率分析", + "阶段6": "高级挖掘", + } for msg in messages: content = msg["content"] @@ -375,12 +390,27 @@ class DataAnalysisAgent: action = "collect_figures" elif "action: \"analysis_complete\"" in content or "action: analysis_complete" in content: action = "analysis_complete" + + # Detect completed SOP stages + for stage_key, stage_name in stage_keywords.items(): + if stage_key in content or stage_name in content: + completed_stages.add(f"{stage_key}: {stage_name}") + summary_parts.append(f"- 轮次{round_num}: 动作={action}") elif msg["role"] == "user" and "代码执行反馈" in content: success = "失败" if "[ERROR]" in content or "执行错误" in content else "成功" if summary_parts and summary_parts[-1].startswith("- 轮次"): summary_parts[-1] += f", 执行结果={success}" + # Append completed stages so the LLM knows where to continue + if completed_stages: + summary_parts.append("") + summary_parts.append("**已完成的SOP阶段** (请勿重复执行):") + for stage in sorted(completed_stages): + summary_parts.append(f" ✓ {stage}") + summary_parts.append("") + summary_parts.append("请从下一个未完成的阶段继续,不要重新执行已完成的阶段。") + return "\n".join(summary_parts) def _profile_files_parallel(self, file_paths: list) -> tuple: @@ -948,6 +978,17 @@ class DataAnalysisAgent: - 注意:必须使用实际生成的图片文件名,严禁使用占位符 """ + # Append actual data files list so the LLM uses real filenames in the report + if self._session_ref and self._session_ref.data_files: + data_files_summary = "\n**已生成的数据文件列表** (请在报告中使用这些实际文件名,替换模板中的占位文件名如 [4-1TSP问题聚类.xlsx]):\n" + for df_meta in self._session_ref.data_files: + fname = df_meta.get("filename", "") + desc = df_meta.get("description", "") + rows = df_meta.get("rows", 0) + data_files_summary += f"- {fname} ({rows}行): {desc}\n" + data_files_summary += "\n注意:报告模板中的 `[4-1TSP问题聚类.xlsx]` 等占位文件名必须替换为上述实际文件名。如果某类聚类文件未生成,请说明原因(如数据量不足或该分类不适用),不要保留占位符。\n" + prompt += data_files_summary + return prompt def reset(self): diff --git a/prompts.py b/prompts.py index f43f4df..b0face3 100644 --- a/prompts.py +++ b/prompts.py @@ -89,6 +89,8 @@ jupyter notebook环境当前变量: - 生成 `模块_严重程度堆叠图.png` (Stacked Bar) **阶段5:效率分析** +- **必做:按一级分类分组统计**:对每个一级分类(如TSP、APP、DK、咨询等)分别计算工单数量、平均时长、中位数时长,输出汇总表并保存为CSV。 + 示例代码:`df.groupby('一级分类')['解决时长/h'].agg(['count','mean','median'])` - 生成 `处理时长分布.png` (直方图) - 生成 `责任人效率分析.png` (散点图: 工单量 vs 平均时长) @@ -179,6 +181,7 @@ final_report_system_prompt = """你是一位**资深数据分析专家 (Senior D ### 报告结构模板使用说明 (Template Instructions) - **固定格式 (Format)**:所有的 Markdown 标题 (`#`, `##`)、列表项前缀 (`- **...**`)、表格表头是必须保留的**骨架**。 - **写作指引 (Prompts)**:方括号 `[...]` 内的文字是给你的**写作提示**,请根据实际分析将其**替换**为具体内容,**不要**在最终报告中保留方括号。 +- **数据文件引用规则**:模板中的 `[4-1TSP问题聚类.xlsx]` 等占位文件名**必须替换**为实际生成的文件名(见下方传入的已生成数据文件列表)。如果某类文件未生成,请注明原因(如"数据量不足,未执行聚类"或"该分类无对应数据"),不要保留占位符。 - **直接输出Markdown**:不要使用JSON或YAML包裹,直接输出Markdown内容。 --- diff --git a/tests/test_properties.py b/tests/test_properties.py index d709aa2..58b18c0 100644 --- a/tests/test_properties.py +++ b/tests/test_properties.py @@ -248,7 +248,7 @@ def test_prop7_template_prompt_prepended(name): template = get_template(name) prompt = template.get_full_prompt() assert len(prompt) > 0 - assert template.name in prompt + assert template.display_name in prompt # =========================================================================== diff --git a/utils/analysis_templates.py b/utils/analysis_templates.py index a306521..94a0427 100644 --- a/utils/analysis_templates.py +++ b/utils/analysis_templates.py @@ -1,289 +1,153 @@ # -*- coding: utf-8 -*- """ -分析模板系统 - 提供预定义的分析场景 +分析模板系统 - 从 config/templates/*.yaml 加载模板 + +模板文件格式: + name: 模板显示名称 + description: 模板描述 + steps: + - name: 步骤名称 + description: 步骤描述 + prompt: 给LLM的指令 """ -from abc import ABC, abstractmethod +import os +import glob +import yaml from typing import List, Dict, Any from dataclasses import dataclass +TEMPLATES_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "templates") + @dataclass class AnalysisStep: """分析步骤""" name: str description: str - analysis_type: str # explore, visualize, calculate, report prompt: str -class AnalysisTemplate(ABC): - """分析模板基类""" - - def __init__(self, name: str, description: str): +class AnalysisTemplate: + """从 YAML 文件加载的分析模板""" + + def __init__(self, name: str, display_name: str, description: str, steps: List[AnalysisStep], filepath: str = ""): self.name = name + self.display_name = display_name self.description = description - self.steps: List[AnalysisStep] = [] - - @abstractmethod - def build_steps(self, **kwargs) -> List[AnalysisStep]: - """构建分析步骤""" - pass - - def get_full_prompt(self, **kwargs) -> str: - """获取完整的分析提示词""" - steps = self.build_steps(**kwargs) - - prompt = f"# {self.name}\n\n{self.description}\n\n" + self.steps = steps + self.filepath = filepath + + def get_full_prompt(self) -> str: + prompt = f"# {self.display_name}\n\n{self.description}\n\n" prompt += "## 分析步骤:\n\n" - - for i, step in enumerate(steps, 1): + for i, step in enumerate(self.steps, 1): prompt += f"### {i}. {step.name}\n" prompt += f"{step.description}\n\n" prompt += f"```\n{step.prompt}\n```\n\n" - return prompt - -class HealthReportTemplate(AnalysisTemplate): - """健康度报告模板 - 专门用于车联网工单健康度分析""" - - def __init__(self): - super().__init__( - name="车联网工单健康度报告", - description="全面分析车联网技术支持工单的健康状况,从多个维度评估工单处理效率和质量" - ) - - def build_steps(self, **kwargs) -> List[AnalysisStep]: - """构建健康度报告的分析步骤""" - return [ - AnalysisStep( - name="数据概览与质量检查", - description="检查数据完整性、缺失值、异常值等", - analysis_type="explore", - prompt="加载数据并进行质量检查,输出数据概况和潜在问题" - ), - AnalysisStep( - name="工单总量分析", - description="统计总工单数、时间分布、趋势变化", - analysis_type="calculate", - prompt="计算总工单数,按时间维度统计工单量,绘制时间序列趋势图" - ), - AnalysisStep( - name="车型维度分析", - description="分析不同车型的工单分布和问题特征", - analysis_type="visualize", - prompt="统计各车型工单数量,绘制车型分布饼图和柱状图,识别高风险车型" - ), - AnalysisStep( - name="模块维度分析", - description="分析工单涉及的技术模块分布", - analysis_type="visualize", - prompt="统计各技术模块的工单量,绘制模块分布图,识别高频问题模块" - ), - AnalysisStep( - name="功能维度分析", - description="分析具体功能点的问题分布", - analysis_type="visualize", - prompt="统计各功能的工单量,绘制TOP功能问题排行,分析功能稳定性" - ), - AnalysisStep( - name="问题严重程度分析", - description="分析工单的严重程度分布", - analysis_type="visualize", - prompt="统计不同严重程度的工单比例,绘制严重程度分布图" - ), - AnalysisStep( - name="处理时长分析", - description="分析工单处理时效性", - analysis_type="calculate", - prompt="计算平均处理时长、SLA达成率,识别超时工单,绘制时长分布图" - ), - AnalysisStep( - name="责任人工作负载分析", - description="分析各责任人的工单负载和处理效率", - analysis_type="visualize", - prompt="统计各责任人的工单数和处理效率,绘制负载分布图,识别超负荷人员" - ), - AnalysisStep( - name="来源渠道分析", - description="分析工单来源渠道分布", - analysis_type="visualize", - prompt="统计各来源渠道的工单量,绘制渠道分布图" - ), - AnalysisStep( - name="高频问题深度分析", - description="识别并深入分析高频问题", - analysis_type="explore", - prompt="提取TOP10高频问题,分析问题原因、影响范围和解决方案" - ), - AnalysisStep( - name="综合健康度评分", - description="基于多个维度计算综合健康度评分", - analysis_type="calculate", - prompt="综合考虑工单量、处理时长、问题严重度等指标,计算健康度评分" - ), - AnalysisStep( - name="生成最终报告", - description="整合所有分析结果,生成完整报告", - analysis_type="report", - prompt="整合所有图表和分析结论,生成一份完整的车联网工单健康度报告" - ) - ] + def to_dict(self) -> Dict[str, Any]: + return { + "name": self.name, + "display_name": self.display_name, + "description": self.description, + "steps": [{"name": s.name, "description": s.description, "prompt": s.prompt} for s in self.steps], + } -class TrendAnalysisTemplate(AnalysisTemplate): - """趋势分析模板""" - - def __init__(self): - super().__init__( - name="时间序列趋势分析", - description="分析数据的时间趋势、季节性和周期性特征" - ) - - def build_steps(self, time_column: str = "日期", value_column: str = "数值", **kwargs) -> List[AnalysisStep]: - return [ - AnalysisStep( - name="时间序列数据准备", - description="将数据转换为时间序列格式", - analysis_type="explore", - prompt=f"将 '{time_column}' 列转换为日期格式,按时间排序数据" - ), - AnalysisStep( - name="趋势可视化", - description="绘制时间序列图", - analysis_type="visualize", - prompt=f"绘制 '{value_column}' 随 '{time_column}' 的变化趋势图,添加移动平均线" - ), - AnalysisStep( - name="趋势分析", - description="识别上升、下降或平稳趋势", - analysis_type="calculate", - prompt="计算趋势线斜率,判断整体趋势方向和变化速率" - ), - AnalysisStep( - name="季节性分析", - description="检测季节性模式", - analysis_type="visualize", - prompt="分析月度、季度等周期性模式,绘制季节性分解图" - ), - AnalysisStep( - name="异常点检测", - description="识别时间序列中的异常点", - analysis_type="calculate", - prompt="使用统计方法检测时间序列中的异常值,标注在图表上" - ) - ] +def _load_template_from_file(filepath: str) -> AnalysisTemplate: + """从单个 YAML 文件加载模板""" + with open(filepath, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + template_name = os.path.splitext(os.path.basename(filepath))[0] + steps = [] + for s in data.get("steps", []): + steps.append(AnalysisStep( + name=s.get("name", ""), + description=s.get("description", ""), + prompt=s.get("prompt", ""), + )) + + return AnalysisTemplate( + name=template_name, + display_name=data.get("name", template_name), + description=data.get("description", ""), + steps=steps, + filepath=filepath, + ) -class AnomalyDetectionTemplate(AnalysisTemplate): - """异常检测模板""" - - def __init__(self): - super().__init__( - name="异常值检测分析", - description="识别数据中的异常值和离群点" - ) - - def build_steps(self, **kwargs) -> List[AnalysisStep]: - return [ - AnalysisStep( - name="数值列统计分析", - description="计算数值列的统计特征", - analysis_type="calculate", - prompt="计算所有数值列的均值、标准差、四分位数等统计量" - ), - AnalysisStep( - name="箱线图可视化", - description="使用箱线图识别异常值", - analysis_type="visualize", - prompt="为每个数值列绘制箱线图,直观展示异常值分布" - ), - AnalysisStep( - name="Z-Score异常检测", - description="使用Z-Score方法检测异常值", - analysis_type="calculate", - prompt="计算每个数值的Z-Score,标记|Z|>3的异常值" - ), - AnalysisStep( - name="IQR异常检测", - description="使用四分位距方法检测异常值", - analysis_type="calculate", - prompt="使用IQR方法(Q1-1.5*IQR, Q3+1.5*IQR)检测异常值" - ), - AnalysisStep( - name="异常值汇总报告", - description="整理所有检测到的异常值", - analysis_type="report", - prompt="汇总所有异常值,分析其特征和可能原因,提供处理建议" - ) - ] +def _scan_templates() -> Dict[str, AnalysisTemplate]: + """扫描 config/templates/ 目录加载所有模板""" + registry = {} + if not os.path.exists(TEMPLATES_DIR): + os.makedirs(TEMPLATES_DIR, exist_ok=True) + return registry + + for fpath in sorted(glob.glob(os.path.join(TEMPLATES_DIR, "*.yaml"))): + try: + tpl = _load_template_from_file(fpath) + registry[tpl.name] = tpl + except Exception as e: + print(f"[WARN] 加载模板失败 {fpath}: {e}") + return registry -class ComparisonAnalysisTemplate(AnalysisTemplate): - """对比分析模板""" - - def __init__(self): - super().__init__( - name="分组对比分析", - description="对比不同分组之间的差异和特征" - ) - - def build_steps(self, group_column: str = "分组", value_column: str = "数值", **kwargs) -> List[AnalysisStep]: - return [ - AnalysisStep( - name="分组统计", - description="计算各组的统计指标", - analysis_type="calculate", - prompt=f"按 '{group_column}' 分组,计算 '{value_column}' 的均值、中位数、标准差" - ), - AnalysisStep( - name="分组可视化对比", - description="绘制对比图表", - analysis_type="visualize", - prompt=f"绘制各组的柱状图和箱线图,直观对比差异" - ), - AnalysisStep( - name="差异显著性检验", - description="统计检验组间差异", - analysis_type="calculate", - prompt="进行t检验或方差分析,判断组间差异是否显著" - ), - AnalysisStep( - name="对比结论", - description="总结对比结果", - analysis_type="report", - prompt="总结各组特征、主要差异和业务洞察" - ) - ] +# Module-level registry, refreshed on each call to support hot-editing +def _get_registry() -> Dict[str, AnalysisTemplate]: + return _scan_templates() -# 模板注册表 -TEMPLATE_REGISTRY = { - "health_report": HealthReportTemplate, - "trend_analysis": TrendAnalysisTemplate, - "anomaly_detection": AnomalyDetectionTemplate, - "comparison": ComparisonAnalysisTemplate -} +# Keep TEMPLATE_REGISTRY as a lazy property for backward compatibility with tests +TEMPLATE_REGISTRY = _scan_templates() def get_template(template_name: str) -> AnalysisTemplate: - """获取分析模板""" - template_class = TEMPLATE_REGISTRY.get(template_name) - if template_class: - return template_class() - else: - raise ValueError(f"未找到模板: {template_name}。可用模板: {list(TEMPLATE_REGISTRY.keys())}") + """获取分析模板(每次从磁盘重新加载以支持热编辑)""" + registry = _get_registry() + if template_name in registry: + return registry[template_name] + raise ValueError(f"未找到模板: {template_name}。可用模板: {list(registry.keys())}") def list_templates() -> List[Dict[str, str]]: """列出所有可用模板""" - templates = [] - for name, template_class in TEMPLATE_REGISTRY.items(): - template = template_class() - templates.append({ - "name": name, - "display_name": template.name, - "description": template.description - }) - return templates + registry = _get_registry() + return [ + {"name": tpl.name, "display_name": tpl.display_name, "description": tpl.description} + for tpl in registry.values() + ] + + +def save_template(template_name: str, data: Dict[str, Any]) -> str: + """保存或更新模板到 YAML 文件,返回文件路径""" + os.makedirs(TEMPLATES_DIR, exist_ok=True) + filepath = os.path.join(TEMPLATES_DIR, f"{template_name}.yaml") + + yaml_data = { + "name": data.get("display_name", data.get("name", template_name)), + "description": data.get("description", ""), + "steps": data.get("steps", []), + } + + with open(filepath, "w", encoding="utf-8") as f: + yaml.dump(yaml_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False) + + # Refresh global registry + global TEMPLATE_REGISTRY + TEMPLATE_REGISTRY = _scan_templates() + + return filepath + + +def delete_template(template_name: str) -> bool: + """删除模板文件""" + filepath = os.path.join(TEMPLATES_DIR, f"{template_name}.yaml") + if os.path.exists(filepath): + os.remove(filepath) + global TEMPLATE_REGISTRY + TEMPLATE_REGISTRY = _scan_templates() + return True + return False diff --git a/utils/code_executor.py b/utils/code_executor.py index f8f85bf..d867d7e 100644 --- a/utils/code_executor.py +++ b/utils/code_executor.py @@ -92,12 +92,29 @@ class CodeExecutor: AUTO_EXPORT_MAX_ROWS = 50000 # Variable names to skip during DataFrame auto-export - # (common import aliases and built-in namespace names) + # (common import aliases, built-in namespace names, and typical + # temporary/intermediate variable names that shouldn't be persisted) _SKIP_EXPORT_NAMES = { + # Import aliases "pd", "np", "plt", "sns", "os", "json", "sys", "re", "io", "csv", "glob", "duckdb", "display", "math", "datetime", "time", "warnings", "logging", "copy", "pickle", "pathlib", "collections", "itertools", "functools", "operator", "random", "networkx", + # Common data variable — the main loaded DataFrame should not be + # auto-exported every round; the LLM can save it explicitly via + # DATA_FILE_SAVED if needed. + "df", + # Typical intermediate/temporary variable names from analysis code + "cross_table", "cross_table_filtered", + "module_issue_table", "module_issue_filtered", + "correlation_matrix", + "feature_data", "person_stats", "top_persons", + "abnormal_durations", "abnormal_orders", + "missing_df", "missing_values", "missing_percent", + "monthly_counts", "monthly_summary", + "distribution_results", "phrase_freq", + "normal_durations", + "df_check", "df_temp", } # Regex for parsing DATA_FILE_SAVED markers @@ -341,15 +358,31 @@ from IPython.display import display @staticmethod def _sanitize_for_json(rows: List[Dict]) -> List[Dict]: - """Replace NaN/inf/-inf with None so the data is JSON-serializable.""" + """Make evidence row values JSON-serializable. + + Handles NaN/inf → None, Timestamp/datetime → isoformat string, + numpy scalars → Python native types. + """ import math sanitized = [] for row in rows: clean = {} for k, v in row.items(): - if isinstance(v, float) and (math.isnan(v) or math.isinf(v)): + if v is None: clean[k] = None + elif isinstance(v, float) and (math.isnan(v) or math.isinf(v)): + clean[k] = None + elif hasattr(v, 'isoformat'): # Timestamp, datetime + clean[k] = v.isoformat() + elif hasattr(v, 'item'): # numpy scalar + clean[k] = v.item() else: + try: + if pd.isna(v): + clean[k] = None + continue + except (TypeError, ValueError): + pass clean[k] = v sanitized.append(clean) return sanitized @@ -405,12 +438,17 @@ from IPython.display import display def _detect_new_dataframes( self, before: Dict[str, int], after: Dict[str, int] ) -> List[str]: - """Return variable names of new or changed DataFrames.""" - new_or_changed = [] + """Return variable names of truly NEW DataFrames only. + + Only returns names that did not exist in the before-snapshot. + Changed DataFrames (same name, different id) are excluded to avoid + re-exporting the main 'df' or other modified variables every round. + """ + new_only = [] for name, obj_id in after.items(): - if name not in before or before[name] != obj_id: - new_or_changed.append(name) - return new_or_changed + if name not in before: + new_only.append(name) + return new_only def _export_dataframe(self, var_name: str, df) -> Optional[Dict[str, Any]]: """ diff --git a/utils/llm_helper.py b/utils/llm_helper.py index 70868e2..d143230 100644 --- a/utils/llm_helper.py +++ b/utils/llm_helper.py @@ -84,6 +84,20 @@ class LLMHelper: else: yaml_content = response.strip() + # Strip language identifier if LLM used ```python instead of ```yaml + # e.g. "python\naction: ..." → "action: ..." + import re + if re.match(r'^[a-zA-Z]+\n', yaml_content): + yaml_content = yaml_content.split('\n', 1)[1] + + # Fix Windows backslash paths that break YAML double-quoted strings. + # e.g. "D:\code\iov..." → "D:/code/iov..." inside quoted values + yaml_content = re.sub( + r'"([A-Za-z]:\\[^"]*)"', + lambda m: '"' + m.group(1).replace('\\', '/') + '"', + yaml_content, + ) + parsed = yaml.safe_load(yaml_content) return parsed if parsed is not None else {} except Exception as e: diff --git a/utils/script_generator.py b/utils/script_generator.py index f6ebee4..2124fd2 100644 --- a/utils/script_generator.py +++ b/utils/script_generator.py @@ -71,6 +71,59 @@ def clean_code_block(code: str) -> str: return '\n'.join(result_lines) +def _is_verification_code(code: str) -> bool: + """Detect code blocks that only check/list files without doing real analysis. + + These are typically generated when the LLM runs os.listdir / os.path.exists + loops to verify outputs, and should not appear in the reusable script. + """ + lines = [l.strip() for l in code.strip().splitlines() if l.strip() and not l.strip().startswith('#')] + if not lines: + return True + + verification_indicators = 0 + analysis_indicators = 0 + + for line in lines: + # Verification patterns + if any(kw in line for kw in [ + 'os.listdir(', 'os.path.exists(', 'os.path.getsize(', + 'os.path.isfile(', '✓', '✗', 'all_exist', + ]): + verification_indicators += 1 + # Analysis patterns (actual computation / plotting / saving) + if any(kw in line for kw in [ + '.plot(', 'plt.', '.to_csv(', '.value_counts()', + '.groupby(', '.corr(', '.fit_transform(', '.fit_predict(', + 'pd.read_csv(', 'pd.crosstab(', '.describe()', + ]): + analysis_indicators += 1 + + # If the block is dominated by verification with no real analysis, skip it + return verification_indicators > 0 and analysis_indicators == 0 + + +def _is_duplicate_data_load(code: str, seen_load_blocks: set) -> bool: + """Detect duplicate data loading blocks (LLM 'amnesia' repeats). + + Computes a fingerprint from the code's structural lines (ignoring + whitespace and comments) and returns True if we've seen it before. + """ + # Extract structural fingerprint: non-empty, non-comment lines + structural_lines = [] + for line in code.splitlines(): + stripped = line.strip() + if stripped and not stripped.startswith('#'): + structural_lines.append(stripped) + + fingerprint = '\n'.join(structural_lines[:30]) # First 30 lines are enough + + if fingerprint in seen_load_blocks: + return True + seen_load_blocks.add(fingerprint) + return False + + def generate_reusable_script( analysis_results: List[Dict[str, Any]], data_files: List[str], @@ -92,17 +145,29 @@ def generate_reusable_script( # 收集所有成功执行的代码 all_imports = set() code_blocks = [] + seen_load_blocks: Set[str] = set() for result in analysis_results: # 只处理 generate_code 类型的结果 if result.get("action") == "collect_figures": continue + # Skip retry attempts + if result.get("retry"): + continue code = result.get("code", "") exec_result = result.get("result", {}) # 只收集成功执行的代码 if code and exec_result.get("success", False): + # Skip pure verification/file-check code (e.g. os.listdir loops) + if _is_verification_code(code): + continue + + # Skip duplicate data-loading blocks (LLM amnesia repeats) + if _is_duplicate_data_load(code, seen_load_blocks): + continue + # 提取 imports imports = extract_imports(code) all_imports.update(imports) diff --git a/web/main.py b/web/main.py index 4bd23ef..538a616 100644 --- a/web/main.py +++ b/web/main.py @@ -85,9 +85,28 @@ class SessionManager: return self.sessions[session_id] # Fallback: Try to reconstruct from disk for history sessions + # First try the old convention: outputs/session_{uuid} output_dir = os.path.join("outputs", f"session_{session_id}") if os.path.exists(output_dir) and os.path.isdir(output_dir): return self._reconstruct_session(session_id, output_dir) + + # Scan all session directories for session_meta.json matching this session_id + # This handles the case where output_dir uses a timestamp name, not the UUID + outputs_root = "outputs" + if os.path.exists(outputs_root): + for dirname in os.listdir(outputs_root): + dir_path = os.path.join(outputs_root, dirname) + if not os.path.isdir(dir_path) or not dirname.startswith("session_"): + continue + meta_path = os.path.join(dir_path, "session_meta.json") + if os.path.exists(meta_path): + try: + with open(meta_path, "r", encoding="utf-8") as f: + meta = json.load(f) + if meta.get("session_id") == session_id: + return self._reconstruct_session(session_id, dir_path) + except Exception: + continue return None @@ -99,33 +118,52 @@ class SessionManager: session.current_round = session.max_rounds session.progress_percentage = 100.0 session.status_message = "已完成 (历史记录)" + + # Read session_meta.json if available + meta = {} + meta_path = os.path.join(output_dir, "session_meta.json") + if os.path.exists(meta_path): + try: + with open(meta_path, "r", encoding="utf-8") as f: + meta = json.load(f) + except Exception: + pass # Recover Log log_path = os.path.join(output_dir, "process.log") if os.path.exists(log_path): session.log_file = log_path - # Recover Report - # 宽容查找:扫描所有 .md 文件,优先取包含 "report" 或 "报告" 的文件 - md_files = glob.glob(os.path.join(output_dir, "*.md")) - if md_files: - # 默认取第一个 - chosen = md_files[0] - # 尝试找更好的匹配 - for md in md_files: - fname = os.path.basename(md).lower() - if "report" in fname or "报告" in fname: - chosen = md - break - session.generated_report = chosen + # Recover Report — prefer meta, then scan .md files + report_path = meta.get("report_path") + if report_path and os.path.exists(report_path): + session.generated_report = report_path + else: + md_files = glob.glob(os.path.join(output_dir, "*.md")) + if md_files: + chosen = md_files[0] + for md in md_files: + fname = os.path.basename(md).lower() + if "report" in fname or "报告" in fname: + chosen = md + break + session.generated_report = chosen - # Recover Script (查找可能的脚本文件) - possible_scripts = ["data_analysis_script.py", "script.py", "analysis_script.py"] - for s in possible_scripts: - p = os.path.join(output_dir, s) - if os.path.exists(p): - session.reusable_script = p - break + # Recover Script — prefer meta, then scan for 分析脚本_*.py or other patterns + script_path = meta.get("script_path") + if script_path and os.path.exists(script_path): + session.reusable_script = script_path + else: + # Try Chinese-named scripts first (generated by this system) + script_files = glob.glob(os.path.join(output_dir, "分析脚本_*.py")) + if not script_files: + for s in ["data_analysis_script.py", "script.py", "analysis_script.py"]: + p = os.path.join(output_dir, s) + if os.path.exists(p): + script_files = [p] + break + if script_files: + session.reusable_script = script_files[0] # Recover Results (images etc) results_json = os.path.join(output_dir, "results.json") @@ -219,6 +257,14 @@ def run_analysis_task(session_id: str, files: list, user_requirement: str, is_fo session_output_dir = session.output_dir session.log_file = os.path.join(session_output_dir, "process.log") + # Persist session-to-directory mapping immediately so recovery works + # even if the server restarts mid-analysis + try: + with open(os.path.join(session_output_dir, "session_meta.json"), "w") as f: + json.dump({"session_id": session_id, "user_requirement": user_requirement}, f, default=str) + except Exception: + pass + # 使用 PrintCapture 替代全局 FileLogger,退出 with 块后自动恢复 stdout with PrintCapture(session.log_file): if is_followup: @@ -285,6 +331,18 @@ def run_analysis_task(session_id: str, files: list, user_requirement: str, is_fo "data_files": session.data_files, }, f, default=str) + # Persist session-to-directory mapping for recovery after server restart + try: + with open(os.path.join(session_output_dir, "session_meta.json"), "w") as f: + json.dump({ + "session_id": session_id, + "user_requirement": user_requirement, + "report_path": session.generated_report, + "script_path": session.reusable_script, + }, f, default=str) + except Exception: + pass + except Exception as e: print(f"Error during analysis: {e}") @@ -350,14 +408,36 @@ async def chat_analysis(request: ChatRequest, background_tasks: BackgroundTasks) import math as _math def _sanitize_value(v): - """Replace NaN/inf with None for JSON safety.""" - if isinstance(v, float) and (_math.isnan(v) or _math.isinf(v)): + """Make any value JSON-serializable. + + Handles: NaN/inf floats → None, pandas Timestamp/Timedelta → str, + numpy integers/floats → Python int/float, dicts and lists recursively. + """ + if v is None: return None + if isinstance(v, float): + if _math.isnan(v) or _math.isinf(v): + return None + return v + if isinstance(v, (int, bool, str)): + return v if isinstance(v, dict): return {k: _sanitize_value(val) for k, val in v.items()} if isinstance(v, list): return [_sanitize_value(item) for item in v] - return v + # pandas Timestamp, Timedelta, NaT + try: + if pd.isna(v): + return None + except (TypeError, ValueError): + pass + if hasattr(v, 'isoformat'): # datetime, Timestamp + return v.isoformat() + # numpy scalar types + if hasattr(v, 'item'): + return v.item() + # Fallback: convert to string + return str(v) @app.get("/api/status") @@ -533,6 +613,37 @@ async def list_available_templates(): return {"templates": list_templates()} +@app.get("/api/templates/{template_name}") +async def get_template_detail(template_name: str): + """获取单个模板的完整内容(含步骤)""" + from utils.analysis_templates import get_template + try: + tpl = get_template(template_name) + return tpl.to_dict() + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + +@app.put("/api/templates/{template_name}") +async def update_template(template_name: str, body: dict): + """创建或更新模板""" + from utils.analysis_templates import save_template + try: + filepath = save_template(template_name, body) + return {"status": "saved", "filepath": filepath} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@app.delete("/api/templates/{template_name}") +async def remove_template(template_name: str): + """删除模板""" + from utils.analysis_templates import delete_template + if delete_template(template_name): + return {"status": "deleted"} + raise HTTPException(status_code=404, detail=f"Template not found: {template_name}") + + # --- Data Files API --- @app.get("/api/data-files") diff --git a/web/static/script.js b/web/static/script.js index e030006..dde336a 100644 --- a/web/static/script.js +++ b/web/static/script.js @@ -209,12 +209,16 @@ function startPolling() { loadDataFiles(); // Update progress bar during analysis + // Use rounds.length (actual completed analysis rounds) for display + // instead of current_round (which includes non-code rounds like collect_figures) if (data.is_running && data.progress_percentage !== undefined) { - updateProgressBar(data.progress_percentage, data.status_message, data.current_round, data.max_rounds); + const displayRound = rounds.length || data.current_round || 0; + updateProgressBar(data.progress_percentage, data.status_message, displayRound, data.max_rounds); } if (!data.is_running && isRunning) { - updateProgressBar(100, 'Analysis complete', data.current_round || data.max_rounds, data.max_rounds); + const displayRound = rounds.length || data.current_round || data.max_rounds; + updateProgressBar(100, 'Analysis complete', displayRound, data.max_rounds); setTimeout(hideProgressBar, 3000); setRunningState(false);