diff --git a/README.md b/README.md index ce07d7d..94fff50 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,9 @@ data_analysis_agent/ │ ├── fallback_openai_client.py # 支持故障转移的OpenAI客户端 │ ├── extract_code.py # 代码提取工具 │ ├── format_execution_result.py # 执行结果格式化 -│ └── create_session_dir.py # 会话目录管理 +│ ├── create_session_dir.py # 会话目录管理 +│ ├── data_loader.py # 数据加载与画像生成 +│ └── script_generator.py # 可复用脚本生成器 ├── 📄 data_analysis_agent.py # 主智能体类 ├── 📄 prompts.py # 系统提示词模板 ├── 📄 main.py # 使用示例 @@ -266,12 +268,15 @@ stateDiagram-v2 ```python @dataclass class LLMConfig: - provider: str = "openai" + provider: str = os.environ.get("LLM_PROVIDER", "openai") api_key: str = os.environ.get("OPENAI_API_KEY", "") base_url: str = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1") model: str = os.environ.get("OPENAI_MODEL", "gpt-4") - max_tokens: int = 4000 - temperature: float = 0.1 + temperature: float = 0.5 + max_tokens: int = 8192 + + # 支持 gemini 等其他 provider 配置 + # ... ``` ### 执行器配置 @@ -281,7 +286,9 @@ class LLMConfig: ALLOWED_IMPORTS = { 'pandas', 'numpy', 'matplotlib', 'duckdb', 'scipy', 'sklearn', 'plotly', 'requests', - 'os', 'json', 'datetime', 're', 'pathlib' + 'os', 'json', 'datetime', 're', 'pathlib', + 'seaborn', 'statsmodels', 'networkx', 'jieba', + 'wordcloud', 'PIL', 'sqlite3', 'yaml' } ``` diff --git a/cleaned_data/.gitkeep b/cleaned_data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/config/llm_config.py b/config/llm_config.py index 2b2205c..c4bc6b4 100644 --- a/config/llm_config.py +++ b/config/llm_config.py @@ -18,11 +18,11 @@ class LLMConfig: """LLM配置""" provider: str = os.environ.get("LLM_PROVIDER", "openai") # openai, gemini, etc. - api_key: str = os.environ.get("OPENAI_API_KEY", "sk-Gce85QLROESeOWf3icd2mQnYHOrmMYojwVPQ0AubMjGQ5ZE2") - base_url: str = os.environ.get("OPENAI_BASE_URL", "https://gemini.jeason.online/v1") - model: str = os.environ.get("OPENAI_MODEL", "gemini-2.5-pro") + api_key: str = os.environ.get("OPENAI_API_KEY", "sk-2187174de21548b0b8b0c92129700199") + base_url: str = os.environ.get("OPENAI_BASE_URL", "http://127.0.0.1:9999/v1") + model: str = os.environ.get("OPENAI_MODEL", "gemini-3-flash") temperature: float = 0.5 - max_tokens: int = 131072 + max_tokens: int = 8192 # 降低默认值,避免某些API不支持过大的值 def __post_init__(self): """配置初始化后的处理""" @@ -34,6 +34,8 @@ class LLMConfig: # Gemini 的 OpenAI 兼容接口地址 self.base_url = os.environ.get("GEMINI_BASE_URL", "https://gemini.jeason.online") self.model = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash") + # Gemini 有更严格的 token 限制 + self.max_tokens = 8192 def to_dict(self) -> Dict[str, Any]: """转换为字典""" diff --git a/data_analysis_agent.py b/data_analysis_agent.py index c83db08..cc48bf3 100644 --- a/data_analysis_agent.py +++ b/data_analysis_agent.py @@ -18,6 +18,7 @@ from utils.extract_code import extract_code_from_response from utils.data_loader import load_and_profile_data from utils.llm_helper import LLMHelper from utils.code_executor import CodeExecutor +from utils.script_generator import generate_reusable_script from config.llm_config import LLMConfig from prompts import data_analysis_system_prompt, final_report_system_prompt, data_analysis_followup_prompt @@ -61,6 +62,8 @@ class DataAnalysisAgent: self.session_output_dir = None self.executor = None self.data_profile = "" # 存储数据画像 + self.data_files = [] # 存储数据文件列表 + self.user_requirement = "" # 存储用户需求 def _process_response(self, response: str) -> Dict[str, Any]: """ @@ -76,7 +79,7 @@ class DataAnalysisAgent: yaml_data = 
self.llm.parse_yaml_response(response) action = yaml_data.get("action", "generate_code") - print(f"🎯 检测到动作: {action}") + print(f"[TARGET] 检测到动作: {action}") if action == "analysis_complete": return self._handle_analysis_complete(response, yaml_data) @@ -85,11 +88,11 @@ elif action == "generate_code": return self._handle_generate_code(response, yaml_data) else: - print(f"⚠️ 未知动作类型: {action},按generate_code处理") + print(f"[WARN] 未知动作类型: {action},按generate_code处理") return self._handle_generate_code(response, yaml_data) except Exception as e: - print(f"⚠️ 解析响应失败: {str(e)},尝试提取代码并按generate_code处理") + print(f"[WARN] 解析响应失败: {str(e)},尝试提取代码并按generate_code处理") # 即使YAML解析失败,也尝试提取代码 extracted_code = extract_code_from_response(response) if extracted_code: @@ -100,7 +103,7 @@ self, response: str, yaml_data: Dict[str, Any] ) -> Dict[str, Any]: """处理分析完成动作""" - print("✅ 分析任务完成") + print("[OK] 分析任务完成") final_report = yaml_data.get("final_report", "分析完成,无最终报告") return { "action": "analysis_complete", @@ -113,7 +116,7 @@ self, response: str, yaml_data: Dict[str, Any] ) -> Dict[str, Any]: """处理图片收集动作""" - print("📊 开始收集图片") + print("[CHART] 开始收集图片") figures_to_collect = yaml_data.get("figures_to_collect", []) collected_figures = [] @@ -130,10 +133,10 @@ description = figure_info.get("description", "") analysis = figure_info.get("analysis", "") - print(f"📈 收集图片 {figure_number}: {filename}") - print(f" 📂 路径: {file_path}") - print(f" 📝 描述: {description}") - print(f" 🔍 分析: {analysis}") + print(f"[GRAPH] 收集图片 {figure_number}: {filename}") + print(f" [DIR] 路径: {file_path}") + print(f" [NOTE] 描述: {description}") + print(f" [SEARCH] 分析: {analysis}") # 使用seen_paths集合来去重,防止重复收集 @@ -145,7 +148,7 @@ # 检查是否已经收集过该路径 abs_path = os.path.abspath(file_path) if abs_path not in seen_paths: - print(f" ✅ 文件存在: {file_path}") + print(f" [OK] 文件存在: {file_path}") # 记录图片信息 collected_figures.append( { @@ -158,12 +161,12 @@ ) seen_paths.add(abs_path) else: - print(f" ⚠️ 跳过重复图片: {file_path}") + print(f" [WARN] 跳过重复图片: {file_path}") else: if file_path: - print(f" ⚠️ 文件不存在: {file_path}") + print(f" [WARN] 文件不存在: {file_path}") else: - print(f" ⚠️ 未提供文件路径") + print(f" [WARN] 未提供文件路径") return { "action": "collect_figures", @@ -195,7 +198,7 @@ code = code.strip() if code: - print(f"🔧 执行代码:\n{code}") + print(f"[TOOL] 执行代码:\n{code}") print("-" * 40) # 执行代码 @@ -203,7 +206,7 @@ # 格式化执行结果 feedback = format_execution_result(result) - print(f"📋 执行反馈:\n{feedback}") + print(f"[LIST] 执行反馈:\n{feedback}") return { "action": "generate_code", @@ -215,7 +218,7 @@ } else: # 如果没有代码,说明LLM响应格式有问题,需要重新生成 - print("⚠️ 未从响应中提取到可执行代码,要求LLM重新生成") + print("[WARN] 未从响应中提取到可执行代码,要求LLM重新生成") return { "action": "invalid_response", "error": "响应中缺少可执行代码", @@ -246,6 +249,8 @@ self.conversation_history = [] self.analysis_results = [] self.current_round = 0 + self.data_files = files or [] # 保存数据文件列表 + self.user_requirement = user_input # 保存用户需求 # 创建本次分析的专用输出目录 if session_output_dir: @@ -264,12 +269,12 @@ # 使用工具生成数据画像 data_profile = "" if files: - print("🔍 正在生成数据画像...") + print("[SEARCH] 正在生成数据画像...") try: data_profile = load_and_profile_data(files) - print("✅ 数据画像生成完毕") + print("[OK] 数据画像生成完毕") except Exception as e: - print(f"⚠️ 数据画像生成失败: {e}") + print(f"[WARN] 数据画像生成失败: {e}") # 保存到实例变量供最终报告使用 
self.data_profile = data_profile @@ -282,11 +287,11 @@ if data_profile: initial_prompt += f"\n\n{data_profile}\n\n请根据上述【数据画像】中的统计信息(如高频值、缺失率、数据范围)来制定分析策略。如果发现明显的高频问题或异常分布,请优先进行深度分析。" - print(f"🚀 开始数据分析任务") - print(f"📝 用户需求: {user_input}") + print(f"[START] 开始数据分析任务") + print(f"[NOTE] 用户需求: {user_input}") if files: - print(f"📁 数据文件: {', '.join(files)}") - print(f"📂 输出目录: {self.session_output_dir}") + print(f"[FOLDER] 数据文件: {', '.join(files)}") + print(f"[DIR] 输出目录: {self.session_output_dir}") # 添加到对话历史 self.conversation_history.append({"role": "user", "content": initial_prompt}) @@ -297,8 +302,8 @@ if max_rounds is None: current_max_rounds = 10 # 追问通常不需要那么长的思考链,10轮足够 - print(f"\n🚀 继续分析任务 (追问模式)") - print(f"📝 后续需求: {user_input}") + print(f"\n[START] 继续分析任务 (追问模式)") + print(f"[NOTE] 后续需求: {user_input}") # 重置当前轮数计数器,以便给新任务足够的轮次 self.current_round = 0 @@ -308,18 +313,21 @@ follow_up_prompt = f"后续需求: {user_input}\n(注意:这是后续追问,请直接针对该问题进行分析,无需从头开始执行完整SOP。)" self.conversation_history.append({"role": "user", "content": follow_up_prompt}) - print(f"🔢 本次最大轮数: {current_max_rounds}") + print(f"[NUM] 本次最大轮数: {current_max_rounds}") if self.force_max_rounds: - print(f"⚡ 强制模式: 将运行满 {current_max_rounds} 轮(忽略AI完成信号)") + print(f"[FAST] 强制模式: 将运行满 {current_max_rounds} 轮(忽略AI完成信号)") print("=" * 60) # 保存原始 max_rounds 以便恢复(虽然 analyze 结束后不需要恢复,但为了逻辑严谨) original_max_rounds = self.max_rounds self.max_rounds = current_max_rounds + # 初始化连续失败计数器 + consecutive_failures = 0 + while self.current_round < self.max_rounds: self.current_round += 1 - print(f"\n🔄 第 {self.current_round} 轮分析") + print(f"\n[LOOP] 第 {self.current_round} 轮分析") # 调用LLM生成响应 try: # 获取当前执行环境的变量信息 notebook_variables = self.executor.get_environment_info() @@ -340,15 +348,15 @@ formatted_system_prompt = base_system_prompt.format( notebook_variables=notebook_variables ) - print(f"🐛 [DEBUG] System Prompt Head:\n{formatted_system_prompt[:500]}...\n[...]") - print(f"🐛 [DEBUG] System Prompt Rules Check: 'stop_words' in prompt? {'stop_words' in formatted_system_prompt}") + print(f"[DEBUG] System Prompt Head:\n{formatted_system_prompt[:500]}...\n[...]") + print(f"[DEBUG] System Prompt Rules Check: 'stop_words' in prompt? 
{'stop_words' in formatted_system_prompt}") response = self.llm.call( prompt=self._build_conversation_prompt(), system_prompt=formatted_system_prompt, ) - print(f"🤖 助手响应:\n{response}") + print(f"[AI] 助手响应:\n{response}") # 使用统一的响应处理方法 process_result = self._process_response(response) @@ -356,9 +364,9 @@ # 根据处理结果决定是否继续(仅在非强制模式下) if process_result.get("action") == "invalid_response": consecutive_failures += 1 - print(f"⚠️ 连续失败次数: {consecutive_failures}/3") + print(f"[WARN] 连续失败次数: {consecutive_failures}/3") if consecutive_failures >= 3: - print(f"❌ 连续3次无法获取有效响应,分析终止。请检查网络或配置。") + print(f"[ERROR] 连续3次无法获取有效响应,分析终止。请检查网络或配置。") break else: consecutive_failures = 0 # 重置计数器 @@ -366,7 +374,7 @@ if not self.force_max_rounds and not process_result.get( "continue", True ): - print(f"\n✅ 分析完成!") + print(f"\n[OK] 分析完成!") break # 添加到对话历史 @@ -398,7 +406,7 @@ feedback = f"已收集 {len(collected_figures)} 个有效图片及其分析。" if missing_figures: - feedback += f"\n⚠️ 以下图片未找到,请检查代码是否成功保存了这些图片: {missing_figures}" + feedback += f"\n[WARN] 以下图片未找到,请检查代码是否成功保存了这些图片: {missing_figures}" self.conversation_history.append( { @@ -421,7 +429,7 @@ except Exception as e: error_msg = f"LLM调用错误: {str(e)}" - print(f"❌ {error_msg}") + print(f"[ERROR] {error_msg}") self.conversation_history.append( { "role": "user", @@ -430,7 +438,7 @@ ) # 生成最终总结 if self.current_round >= self.max_rounds: - print(f"\n⚠️ 已达到最大轮数 ({self.max_rounds}),分析结束") + print(f"\n[WARN] 已达到最大轮数 ({self.max_rounds}),分析结束") return self._generate_final_report() @@ -456,8 +464,8 @@ if result.get("action") == "collect_figures": all_figures.extend(result.get("collected_figures", [])) - print(f"\n📊 开始生成最终分析报告...") - print(f"📂 输出目录: {self.session_output_dir}") + print(f"\n[CHART] 开始生成最终分析报告...") + print(f"[DIR] 输出目录: {self.session_output_dir}") # --- 自动补全/发现图片机制 --- # 扫描目录下所有的png文件 @@ -475,7 +483,7 @@ for png_path in existing_pngs: abs_png_path = os.path.abspath(png_path) if abs_png_path not in collected_paths: - print(f"🔍 [自动发现] 补充未显式收集的图片: {os.path.basename(png_path)}") + print(f"[SEARCH] [自动发现] 补充未显式收集的图片: {os.path.basename(png_path)}") all_figures.append({ "figure_number": "Auto", "filename": os.path.basename(png_path), @@ -484,11 +492,11 @@ "analysis": "(该图表由系统自动捕获,Agent未提供具体分析文本,请结合图表标题理解)" }) except Exception as e: - print(f"⚠️ 自动发现图片失败: {e}") + print(f"[WARN] 自动发现图片失败: {e}") # --------------------------- - print(f"🔢 总轮数: {self.current_round}") - print(f"📈 收集图片: {len(all_figures)} 个") + print(f"[NUM] 总轮数: {self.current_round}") + print(f"[GRAPH] 收集图片: {len(all_figures)} 个") # 构建用于生成最终报告的提示词 final_report_prompt = self._build_final_report_prompt(all_figures) @@ -512,12 +520,11 @@ except: pass # 解析失败则保持原样 - print("✅ 最终报告生成完成") + print("[OK] 最终报告生成完成") - print("✅ 最终报告生成完成") except Exception as e: - print(f"❌ 生成最终报告时出错: {str(e)}") + print(f"[ERROR] 生成最终报告时出错: {str(e)}") final_report_content = f"报告生成失败: {str(e)}" # 保存最终报告到文件 @@ -525,9 +533,21 @@ try: with open(report_file_path, "w", encoding="utf-8") as f: f.write(final_report_content) - print(f"📄 最终报告已保存至: {report_file_path}") + print(f"[DOC] 最终报告已保存至: {report_file_path}") except Exception as e: - print(f"❌ 保存报告文件失败: {str(e)}") + print(f"[ERROR] 保存报告文件失败: {str(e)}") + + # 生成可复用脚本 + script_path = "" + try: + script_path = generate_reusable_script( 
analysis_results=self.analysis_results, + data_files=self.data_files, + session_output_dir=self.session_output_dir, + user_requirement=self.user_requirement + ) + except Exception as e: + print(f"[WARN] 脚本生成失败: {e}") # 返回完整的分析结果 return { @@ -538,6 +558,7 @@ class DataAnalysisAgent: "conversation_history": self.conversation_history, "final_report": final_report_content, "report_file_path": report_file_path, + "reusable_script_path": script_path, } def _build_final_report_prompt(self, all_figures: List[Dict[str, Any]]) -> str: @@ -584,7 +605,7 @@ class DataAnalysisAgent: # 在提示词中明确要求使用相对路径 prompt += """ -📁 **图片路径使用说明**: +[FOLDER] **图片路径使用说明**: 报告和图片都在同一目录下,请在报告中使用相对路径引用图片: - 格式: - 示例: diff --git a/data_preprocessing/README.md b/data_preprocessing/README.md new file mode 100644 index 0000000..6b14908 --- /dev/null +++ b/data_preprocessing/README.md @@ -0,0 +1,89 @@ +# 数据预处理模块 + +独立的数据清洗工具,用于在正式分析前准备数据。 + +## 功能 + +- **数据合并**:将多个 Excel/CSV 文件合并为单一 CSV +- **时间排序**:按时间列对数据进行排序 +- **目录管理**:标准化的原始数据和输出数据目录 + +## 目录结构 + +``` +project/ +├── raw_data/ # 原始数据存放目录 +│ ├── remotecontrol/ # 按数据来源分类 +│ └── ... +├── cleaned_data/ # 清洗后数据输出目录 +│ ├── xxx_merged.csv +│ └── xxx_sorted.csv +└── data_preprocessing/ # 本模块 +``` + +## 使用方法 + +### 命令行 + +```bash +# 初始化目录结构 +python -m data_preprocessing.cli init + +# 合并 Excel 文件 +python -m data_preprocessing.cli merge --source raw_data/remotecontrol + +# 合并并按时间排序 +python -m data_preprocessing.cli merge --source raw_data/remotecontrol --sort-by SendTime + +# 指定输出路径 +python -m data_preprocessing.cli merge -s raw_data/remotecontrol -o cleaned_data/my_output.csv + +# 排序已有 CSV +python -m data_preprocessing.cli sort --input some_file.csv --time-col SendTime + +# 原地排序(覆盖原文件) +python -m data_preprocessing.cli sort --input data.csv --inplace +``` + +### Python API + +```python +from data_preprocessing import merge_files, sort_by_time, Config + +# 合并文件 +output_path = merge_files( + source_dir="raw_data/remotecontrol", + output_file="cleaned_data/merged.csv", + pattern="*.xlsx", + time_column="SendTime" # 可选:合并后排序 +) + +# 排序 CSV +sorted_path = sort_by_time( + input_path="data.csv", + output_path="sorted_data.csv", + time_column="CreateTime" +) + +# 自定义配置 +config = Config() +config.raw_data_dir = "/path/to/raw" +config.cleaned_data_dir = "/path/to/cleaned" +config.ensure_dirs() +``` + +## 配置项 + +| 配置项 | 默认值 | 说明 | +|--------|--------|------| +| `raw_data_dir` | `raw_data/` | 原始数据目录 | +| `cleaned_data_dir` | `cleaned_data/` | 清洗输出目录 | +| `default_time_column` | `SendTime` | 默认时间列名 | +| `csv_encoding` | `utf-8-sig` | CSV 编码格式 | + +## 注意事项 + +1. 本模块与 `DataAnalysisAgent` 完全独立,不会相互调用 +2. 合并时会自动添加 `_source_file` 列标记数据来源(可用 `--no-source-col` 禁用) +3. Excel 文件会自动合并所有 Sheet +4. 
无效时间值在排序时会被放到最后 diff --git a/data_preprocessing/__init__.py b/data_preprocessing/__init__.py new file mode 100644 index 0000000..f83fa68 --- /dev/null +++ b/data_preprocessing/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +""" +数据预处理模块 + +提供独立的数据清洗功能: +- 按时间排序 +- 同类数据合并 +""" + +from .sorter import sort_by_time +from .merger import merge_files +from .config import Config + +__all__ = ["sort_by_time", "merge_files", "Config"] diff --git a/data_preprocessing/cli.py b/data_preprocessing/cli.py new file mode 100644 index 0000000..a56a145 --- /dev/null +++ b/data_preprocessing/cli.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +""" +数据预处理命令行接口 + +使用示例: + # 合并 Excel 文件 + python -m data_preprocessing.cli merge --source raw_data/remotecontrol --output cleaned_data/merged.csv + + # 合并并排序 + python -m data_preprocessing.cli merge --source raw_data/remotecontrol --sort-by SendTime + + # 排序已有 CSV + python -m data_preprocessing.cli sort --input data.csv --output sorted.csv --time-col SendTime + + # 初始化目录结构 + python -m data_preprocessing.cli init +""" + +import argparse +import sys +from .config import default_config +from .sorter import sort_by_time +from .merger import merge_files + + +def main(): + parser = argparse.ArgumentParser( + prog="data_preprocessing", + description="数据预处理工具:排序、合并", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +示例: + %(prog)s merge --source raw_data/remotecontrol --sort-by SendTime + %(prog)s sort --input data.csv --time-col CreateTime + %(prog)s init + """ + ) + + subparsers = parser.add_subparsers(dest="command", help="可用命令") + + # ========== merge 命令 ========== + merge_parser = subparsers.add_parser("merge", help="合并同类文件") + merge_parser.add_argument( + "--source", "-s", + required=True, + help="源数据目录路径" + ) + merge_parser.add_argument( + "--output", "-o", + default=None, + help="输出文件路径 (默认: cleaned_data/<目录名>_merged.csv)" + ) + merge_parser.add_argument( + "--pattern", "-p", + default="*.xlsx", + help="文件匹配模式 (默认: *.xlsx)" + ) + merge_parser.add_argument( + "--sort-by", + default=None, + dest="time_column", + help="合并后按此时间列排序" + ) + merge_parser.add_argument( + "--no-source-col", + action="store_true", + help="不添加来源文件列" + ) + + # ========== sort 命令 ========== + sort_parser = subparsers.add_parser("sort", help="按时间排序 CSV") + sort_parser.add_argument( + "--input", "-i", + required=True, + help="输入 CSV 文件路径" + ) + sort_parser.add_argument( + "--output", "-o", + default=None, + help="输出文件路径 (默认: cleaned_data/<文件名>_sorted.csv)" + ) + sort_parser.add_argument( + "--time-col", "-t", + default=None, + dest="time_column", + help=f"时间列名 (默认: {default_config.default_time_column})" + ) + sort_parser.add_argument( + "--inplace", + action="store_true", + help="原地覆盖输入文件" + ) + + # ========== init 命令 ========== + init_parser = subparsers.add_parser("init", help="初始化目录结构") + + # 解析参数 + args = parser.parse_args() + + if args.command is None: + parser.print_help() + sys.exit(0) + + try: + if args.command == "merge": + result = merge_files( + source_dir=args.source, + output_file=args.output, + pattern=args.pattern, + time_column=args.time_column, + add_source_column=not args.no_source_col + ) + print(f"\n✅ 合并成功: {result}") + + elif args.command == "sort": + result = sort_by_time( + input_path=args.input, + output_path=args.output, + time_column=args.time_column, + inplace=args.inplace + ) + print(f"\n✅ 排序成功: {result}") + + elif args.command == "init": + default_config.ensure_dirs() + print("\n✅ 目录初始化完成") + + except FileNotFoundError as e: + print(f"\n❌ 错误: {e}") 
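+            # 以非零退出码结束进程,便于 shell 等调用方检测失败(下方各异常分支同理)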
+ sys.exit(1) + except KeyError as e: + print(f"\n❌ 错误: {e}") + sys.exit(1) + except Exception as e: + print(f"\n❌ 未知错误: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/data_preprocessing/config.py b/data_preprocessing/config.py new file mode 100644 index 0000000..f41e107 --- /dev/null +++ b/data_preprocessing/config.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +""" +数据预处理模块配置 +""" + +import os +from dataclasses import dataclass + +# 获取项目根目录 +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + + +@dataclass +class Config: + """预处理模块配置""" + + # 原始数据存放目录 + raw_data_dir: str = os.path.join(PROJECT_ROOT, "raw_data") + + # 清洗后数据输出目录 + cleaned_data_dir: str = os.path.join(PROJECT_ROOT, "cleaned_data") + + # 默认时间列名 + default_time_column: str = "SendTime" + + # 支持的文件扩展名 + supported_extensions: tuple = (".csv", ".xlsx", ".xls") + + # CSV 编码 + csv_encoding: str = "utf-8-sig" + + def ensure_dirs(self): + """确保目录存在""" + os.makedirs(self.raw_data_dir, exist_ok=True) + os.makedirs(self.cleaned_data_dir, exist_ok=True) + print(f"[OK] 目录已就绪:") + print(f" 原始数据: {self.raw_data_dir}") + print(f" 清洗输出: {self.cleaned_data_dir}") + + +# 默认配置实例 +default_config = Config() diff --git a/data_preprocessing/merge_excel.py b/data_preprocessing/merge_excel.py new file mode 100644 index 0000000..7233afa --- /dev/null +++ b/data_preprocessing/merge_excel.py @@ -0,0 +1,83 @@ + +import pandas as pd +import glob +import os + +def merge_excel_files(source_dir="remotecontrol", output_file="merged_all_files.csv"): + """ + 将指定目录下的所有 Excel 文件 (.xlsx, .xls) 合并为一个 CSV 文件。 + """ + print(f"[SEARCH] 正在扫描目录: {source_dir} ...") + + # 支持 xlsx 和 xls + files_xlsx = glob.glob(os.path.join(source_dir, "*.xlsx")) + files_xls = glob.glob(os.path.join(source_dir, "*.xls")) + files = files_xlsx + files_xls + + if not files: + print("[WARN] 未找到 Excel 文件。") + return + + # 按文件名中的数字进行排序 (例如: 1.xlsx, 2.xlsx, ..., 10.xlsx) + try: + files.sort(key=lambda x: int(os.path.basename(x).split('.')[0])) + print("[NUM] 已按文件名数字顺序排序") + except ValueError: + # 如果文件名不是纯数字,退回到字母排序 + files.sort() + print("[TEXT] 文件名包含非数字字符,已按字母顺序排序") + + print(f"[DIR] 找到 {len(files)} 个文件: {files}") + + all_dfs = [] + for file in files: + try: + print(f"[READ] 读取: {file}") + # 使用 ExcelFile 读取所有 sheet + xls = pd.ExcelFile(file) + print(f" [PAGES] 包含 Sheets: {xls.sheet_names}") + + file_dfs = [] + for sheet_name in xls.sheet_names: + df = pd.read_excel(xls, sheet_name=sheet_name) + if not df.empty: + print(f" [OK] Sheet '{sheet_name}' 读取成功: {len(df)} 行") + file_dfs.append(df) + else: + print(f" [WARN] Sheet '{sheet_name}' 为空,跳过") + + if file_dfs: + # 合并该文件的所有非空 sheet + file_merged_df = pd.concat(file_dfs, ignore_index=True) + # 可选:添加一列标记来源文件 + file_merged_df['Source_File'] = os.path.basename(file) + all_dfs.append(file_merged_df) + else: + print(f"[WARN] 文件 {file} 所有 Sheet 均为空") + + except Exception as e: + print(f"[ERROR] 读取 {file} 失败: {e}") + + if all_dfs: + print("[LOOP] 正在合并数据...") + merged_df = pd.concat(all_dfs, ignore_index=True) + + # 按 SendTime 排序 + if 'SendTime' in merged_df.columns: + print("[TIMER] 正在按 SendTime 排序...") + merged_df['SendTime'] = pd.to_datetime(merged_df['SendTime'], errors='coerce') + merged_df = merged_df.sort_values(by='SendTime') + else: + print("[WARN] 未找到 SendTime 列,跳过排序") + + print(f"[CACHE] 保存到: {output_file}") + merged_df.to_csv(output_file, index=False, encoding="utf-8-sig") + + print(f"[OK] 合并及排序完成!总行数: {len(merged_df)}") + print(f" 输出文件: {os.path.abspath(output_file)}") + else: + print("[WARN] 
没有成功读取到任何数据。") + +if __name__ == "__main__": + # 如果需要在当前目录运行并合并 remotecontrol 文件夹下的内容 + merge_excel_files(source_dir="remotecontrol", output_file="remotecontrol_merged.csv") diff --git a/data_preprocessing/merger.py b/data_preprocessing/merger.py new file mode 100644 index 0000000..5e3d608 --- /dev/null +++ b/data_preprocessing/merger.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +""" +数据合并模块 + +合并同类 Excel/CSV 文件 +""" + +import os +import glob +import pandas as pd +from typing import Optional, List +from .config import default_config + + +def merge_files( + source_dir: str, + output_file: Optional[str] = None, + pattern: str = "*.xlsx", + time_column: Optional[str] = None, + add_source_column: bool = True +) -> str: + """ + 合并目录下的所有同类文件 + + Args: + source_dir: 源数据目录 + output_file: 输出 CSV 文件路径。如果为 None,则输出到 cleaned_data 目录 + pattern: 文件匹配模式 (e.g., "*.xlsx", "*.csv", "*.xls") + time_column: 可选,合并后按此列排序 + add_source_column: 是否添加来源文件列 + + Returns: + 输出文件的绝对路径 + + Raises: + FileNotFoundError: 目录不存在或未找到匹配文件 + """ + if not os.path.isdir(source_dir): + raise FileNotFoundError(f"目录不存在: {source_dir}") + + print(f"[SCAN] 正在扫描目录: {source_dir}") + print(f" 匹配模式: {pattern}") + + # 查找匹配文件 + files = glob.glob(os.path.join(source_dir, pattern)) + + # 如果是 xlsx,也尝试匹配 xls + if pattern == "*.xlsx": + files.extend(glob.glob(os.path.join(source_dir, "*.xls"))) + + if not files: + raise FileNotFoundError(f"未找到匹配 '{pattern}' 的文件") + + # 排序文件列表 + files = _sort_files(files) + print(f"[FOUND] 找到 {len(files)} 个文件") + + # 确定输出路径 + if output_file is None: + default_config.ensure_dirs() + dir_name = os.path.basename(os.path.normpath(source_dir)) + output_file = os.path.join( + default_config.cleaned_data_dir, + f"{dir_name}_merged.csv" + ) + + # 合并数据 + all_dfs = [] + for file in files: + try: + df = _read_file(file) + if df is not None and not df.empty: + if add_source_column: + df['_source_file'] = os.path.basename(file) + all_dfs.append(df) + except Exception as e: + print(f"[ERROR] 读取失败 {file}: {e}") + + if not all_dfs: + raise ValueError("没有成功读取到任何数据") + + print(f"[MERGE] 正在合并 {len(all_dfs)} 个数据源...") + merged_df = pd.concat(all_dfs, ignore_index=True) + print(f" 合并后总行数: {len(merged_df)}") + + # 可选:按时间排序 + if time_column and time_column in merged_df.columns: + print(f"[SORT] 正在按 '{time_column}' 排序...") + merged_df[time_column] = pd.to_datetime(merged_df[time_column], errors='coerce') + merged_df = merged_df.sort_values(by=time_column, na_position='last') + elif time_column: + print(f"[WARN] 未找到时间列 '{time_column}',跳过排序") + + # 保存结果 + print(f"[SAVE] 正在保存: {output_file}") + merged_df.to_csv(output_file, index=False, encoding=default_config.csv_encoding) + + abs_output = os.path.abspath(output_file) + print(f"[OK] 合并完成!") + print(f" 输出文件: {abs_output}") + print(f" 总行数: {len(merged_df)}") + + return abs_output + + +def _sort_files(files: List[str]) -> List[str]: + """对文件列表进行智能排序""" + try: + # 尝试按文件名中的数字排序 + files.sort(key=lambda x: int(os.path.basename(x).split('.')[0])) + print("[SORT] 已按文件名数字顺序排序") + except ValueError: + # 退回到字母排序 + files.sort() + print("[SORT] 已按文件名字母顺序排序") + return files + + +def _read_file(file_path: str) -> Optional[pd.DataFrame]: + """读取单个文件(支持 CSV 和 Excel)""" + ext = os.path.splitext(file_path)[1].lower() + + print(f"[READ] 读取: {os.path.basename(file_path)}") + + if ext == '.csv': + df = pd.read_csv(file_path, low_memory=False) + print(f" 行数: {len(df)}") + return df + + elif ext in ('.xlsx', '.xls'): + # 读取 Excel 所有 sheet 并合并 + xls = pd.ExcelFile(file_path) + print(f" Sheets: {xls.sheet_names}") + 
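+        # 注:多 Sheet 纵向合并依赖 pandas.concat 的列对齐行为,列名不一致的 Sheet 会以 NaN 补齐缺失列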
+ sheet_dfs = [] + for sheet_name in xls.sheet_names: + df = pd.read_excel(xls, sheet_name=sheet_name) + if not df.empty: + print(f" - Sheet '{sheet_name}': {len(df)} 行") + sheet_dfs.append(df) + + if sheet_dfs: + return pd.concat(sheet_dfs, ignore_index=True) + return None + + else: + print(f"[WARN] 不支持的文件格式: {ext}") + return None diff --git a/data_preprocessing/sort_csv.py b/data_preprocessing/sort_csv.py new file mode 100644 index 0000000..1e07f62 --- /dev/null +++ b/data_preprocessing/sort_csv.py @@ -0,0 +1,45 @@ + +import pandas as pd +import os + +def sort_csv_by_time(file_path="remotecontrol_merged.csv", time_col="SendTime"): + """ + 读取 CSV 文件,按时间列排序,并保存。 + """ + if not os.path.exists(file_path): + print(f"[ERROR] 文件不存在: {file_path}") + return + + print(f"[READ] 正在读取 {file_path} ...") + try: + # 读取 CSV + df = pd.read_csv(file_path, low_memory=False) + print(f" [CHART] 数据行数: {len(df)}") + + if time_col not in df.columns: + print(f"[ERROR] 未找到时间列: {time_col}") + print(f" 可用列: {list(df.columns)}") + return + + print(f"[LOOP] 正在解析时间列 '{time_col}' ...") + # 转换为 datetime 对象,无法解析的设为 NaT + df[time_col] = pd.to_datetime(df[time_col], errors='coerce') + + # 检查无效时间 + nat_count = df[time_col].isna().sum() + if nat_count > 0: + print(f"[WARN] 发现 {nat_count} 行无效时间数据,排序时将排在最后") + + print("[LOOP] 正在按时间排序...") + df_sorted = df.sort_values(by=time_col) + + print(f"[CACHE] 正在保存及覆盖文件: {file_path} ...") + df_sorted.to_csv(file_path, index=False, encoding="utf-8-sig") + + print("[OK] 排序并保存完成!") + + except Exception as e: + print(f"[ERROR]处理失败: {e}") + +if __name__ == "__main__": + sort_csv_by_time() diff --git a/data_preprocessing/sorter.py b/data_preprocessing/sorter.py new file mode 100644 index 0000000..9dffe61 --- /dev/null +++ b/data_preprocessing/sorter.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +""" +数据排序模块 + +按时间列对 CSV 文件进行排序 +""" + +import os +import pandas as pd +from typing import Optional +from .config import default_config + + +def sort_by_time( + input_path: str, + output_path: Optional[str] = None, + time_column: str = None, + inplace: bool = False +) -> str: + """ + 按时间列对 CSV 文件排序 + + Args: + input_path: 输入 CSV 文件路径 + output_path: 输出路径。如果为 None 且 inplace=False,则输出到 cleaned_data 目录 + time_column: 时间列名,默认使用配置中的 default_time_column + inplace: 是否原地覆盖输入文件 + + Returns: + 输出文件的绝对路径 + + Raises: + FileNotFoundError: 输入文件不存在 + KeyError: 时间列不存在 + """ + # 参数处理 + time_column = time_column or default_config.default_time_column + + if not os.path.exists(input_path): + raise FileNotFoundError(f"文件不存在: {input_path}") + + # 确定输出路径 + if inplace: + output_path = input_path + elif output_path is None: + default_config.ensure_dirs() + basename = os.path.basename(input_path) + name, ext = os.path.splitext(basename) + output_path = os.path.join( + default_config.cleaned_data_dir, + f"{name}_sorted{ext}" + ) + + print(f"[READ] 正在读取: {input_path}") + df = pd.read_csv(input_path, low_memory=False) + print(f" 数据行数: {len(df)}") + + # 检查时间列是否存在 + if time_column not in df.columns: + available_cols = list(df.columns) + raise KeyError( + f"未找到时间列 '{time_column}'。可用列: {available_cols}" + ) + + print(f"[PARSE] 正在解析时间列 '{time_column}'...") + df[time_column] = pd.to_datetime(df[time_column], errors='coerce') + + # 统计无效时间 + nat_count = df[time_column].isna().sum() + if nat_count > 0: + print(f"[WARN] 发现 {nat_count} 行无效时间数据,排序时将排在最后") + + print("[SORT] 正在按时间排序...") + df_sorted = df.sort_values(by=time_column, na_position='last') + + print(f"[SAVE] 正在保存: {output_path}") + df_sorted.to_csv(output_path, index=False, 
encoding=default_config.csv_encoding) + + abs_output = os.path.abspath(output_path) + print(f"[OK] 排序完成!输出文件: {abs_output}") + + return abs_output diff --git a/main.py b/main.py index 239605e..b9ac2f2 100644 --- a/main.py +++ b/main.py @@ -17,7 +17,7 @@ class DualLogger: def write(self, message): self.terminal.write(message) # 过滤掉生成的代码块,不写入日志文件 - if "🔧 执行代码:" in message: + if "[TOOL] 执行代码:" in message: return self.log.write(message) self.log.flush() @@ -34,7 +34,7 @@ def setup_logging(log_dir): # 可选:也将错误输出重定向 # sys.stderr = logger print(f"\n{'='*20} Run Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {'='*20}\n") - print(f"📄 日志文件已保存至: {os.path.join(log_dir, 'log.txt')}") + print(f"[DOC] 日志文件已保存至: {os.path.join(log_dir, 'log.txt')}") def main(): @@ -43,7 +43,7 @@ def main(): import os # 自动查找当前目录及remotecontrol目录下的所有数据文件 data_extensions = ['*.csv', '*.xlsx', '*.xls'] - search_dirs = ['jetour'] + search_dirs = ['cleaned_data'] files = [] for search_dir in search_dirs: @@ -52,10 +52,10 @@ def main(): files.extend(glob.glob(pattern)) if not files: - print("⚠️ 未在当前目录找到数据文件 (.csv, .xlsx),尝试使用默认文件") + print("[WARN] 未在当前目录找到数据文件 (.csv, .xlsx),尝试使用默认文件") files = ["./cleaned_data.csv"] else: - print(f"📂 自动识别到以下数据文件: {files}") + print(f"[DIR] 自动识别到以下数据文件: {files}") analysis_requirement = """ 基于所有运维工单,整理一份工单健康度报告,包括但不限于对所有车联网技术支持工单的全面数据分析, @@ -92,16 +92,16 @@ def main(): print("\n" + "="*30 + " 当前阶段分析完成 " + "="*30) # 询问用户是否继续 - print("\n💡 你可以继续对数据提出分析需求,或者输入 'exit'/'quit' 结束程序。") - user_response = input("👉 请输入后续分析需求 (直接回车退出): ").strip() + print("\n[TIP] 你可以继续对数据提出分析需求,或者输入 'exit'/'quit' 结束程序。") + user_response = input("[>] 请输入后续分析需求 (直接回车退出): ").strip() if not user_response or user_response.lower() in ['exit', 'quit', 'n', 'no']: - print("👋 分析结束,再见!") + print("[BYE] 分析结束,再见!") break # 更新需求,进入下一轮循环 analysis_requirement = user_response - print(f"\n🔄 收到新需求,正在继续分析...") + print(f"\n[LOOP] 收到新需求,正在继续分析...") if __name__ == "__main__": diff --git a/merge_excel.py b/merge_excel.py index 1894621..7233afa 100644 --- a/merge_excel.py +++ b/merge_excel.py @@ -7,7 +7,7 @@ def merge_excel_files(source_dir="remotecontrol", output_file="merged_all_files. """ 将指定目录下的所有 Excel 文件 (.xlsx, .xls) 合并为一个 CSV 文件。 """ - print(f"🔍 正在扫描目录: {source_dir} ...") + print(f"[SEARCH] 正在扫描目录: {source_dir} ...") # 支持 xlsx 和 xls files_xlsx = glob.glob(os.path.join(source_dir, "*.xlsx")) @@ -15,36 +15,36 @@ def merge_excel_files(source_dir="remotecontrol", output_file="merged_all_files. 
files = files_xlsx + files_xls if not files: - print("⚠️ 未找到 Excel 文件。") + print("[WARN] 未找到 Excel 文件。") return # 按文件名中的数字进行排序 (例如: 1.xlsx, 2.xlsx, ..., 10.xlsx) try: files.sort(key=lambda x: int(os.path.basename(x).split('.')[0])) - print("🔢 已按文件名数字顺序排序") + print("[NUM] 已按文件名数字顺序排序") except ValueError: # 如果文件名不是纯数字,退回到字母排序 files.sort() - print("🔤 已按文件名包含非数字字符,使用字母顺序排序") + print("[TEXT] 文件名包含非数字字符,已按字母顺序排序") - print(f"📂 找到 {len(files)} 个文件: {files}") + print(f"[DIR] 找到 {len(files)} 个文件: {files}") all_dfs = [] for file in files: try: - print(f"📖 读取: {file}") + print(f"[READ] 读取: {file}") # 使用 ExcelFile 读取所有 sheet xls = pd.ExcelFile(file) - print(f" 📑 包含 Sheets: {xls.sheet_names}") + print(f" [PAGES] 包含 Sheets: {xls.sheet_names}") file_dfs = [] for sheet_name in xls.sheet_names: df = pd.read_excel(xls, sheet_name=sheet_name) if not df.empty: - print(f" ✅ Sheet '{sheet_name}' 读取成功: {len(df)} 行") + print(f" [OK] Sheet '{sheet_name}' 读取成功: {len(df)} 行") file_dfs.append(df) else: - print(f" ⚠️ Sheet '{sheet_name}' 为空,跳过") + print(f" [WARN] Sheet '{sheet_name}' 为空,跳过") if file_dfs: # 合并该文件的所有非空 sheet @@ -53,30 +53,30 @@ file_merged_df['Source_File'] = os.path.basename(file) all_dfs.append(file_merged_df) else: - print(f"⚠️ 文件 {file} 所有 Sheet 均为空") + print(f"[WARN] 文件 {file} 所有 Sheet 均为空") except Exception as e: - print(f"❌ 读取 {file} 失败: {e}") + print(f"[ERROR] 读取 {file} 失败: {e}") if all_dfs: - print("🔄 正在合并数据...") + print("[LOOP] 正在合并数据...") merged_df = pd.concat(all_dfs, ignore_index=True) # 按 SendTime 排序 if 'SendTime' in merged_df.columns: - print("⏳ 正在按 SendTime 排序...") + print("[TIMER] 正在按 SendTime 排序...") merged_df['SendTime'] = pd.to_datetime(merged_df['SendTime'], errors='coerce') merged_df = merged_df.sort_values(by='SendTime') else: - print("⚠️ 未找到 SendTime 列,跳过排序") + print("[WARN] 未找到 SendTime 列,跳过排序") - print(f"💾 保存到: {output_file}") + print(f"[CACHE] 保存到: {output_file}") merged_df.to_csv(output_file, index=False, encoding="utf-8-sig") - print(f"✅ 合并及排序完成!总行数: {len(merged_df)}") + print(f"[OK] 合并及排序完成!总行数: {len(merged_df)}") print(f" 输出文件: {os.path.abspath(output_file)}") else: - print("⚠️ 没有成功读取到任何数据。") + print("[WARN] 没有成功读取到任何数据。") if __name__ == "__main__": # 如果需要在当前目录运行并合并 remotecontrol 文件夹下的内容 diff --git a/prompts.py b/prompts.py index 98f49ed..8d1dffc 100644 --- a/prompts.py +++ b/prompts.py @@ -1,5 +1,4 @@ data_analysis_system_prompt = """你是一个专业的数据分析助手,运行在Jupyter Notebook环境中,能够根据用户需求生成和执行Python数据分析代码。 - **核心使命**: - 接收自然语言需求,分阶段生成高效、安全的数据分析代码。 - 深度挖掘数据,不仅仅是绘图,更要发现数据背后的业务洞察。 @@ -316,11 +315,12 @@ final_report_system_prompt = """你是一位**资深数据分析专家 (Senior D data_analysis_followup_prompt = """你是一个专业的数据分析助手,运行在Jupyter Notebook环境中。 当前处于**追问模式 (Follow-up Mode)**。用户基于之前的分析结果提出了新的需求。 -**核心使命**: -- 直接针对用户的后续需求进行解答,**无需**重新执行完整SOP。 -- 只有当用户明确要求重新进行全流程分析时,才执行SOP。 -**核心能力**: +[TARGET] **核心使命**: +- 直接针对用户的后续需求进行解答,**无需**重新执行完整SOP。 +- 只有当用户明确要求重新进行全流程分析时,才执行SOP。 + +[TOOL] **核心能力**: 1. **代码执行**:自动编写并执行Pandas/Matplotlib代码。 2. **多模态分析**:支持时序预测、文本挖掘(N-gram)、多维交叉分析。 3. 
**智能纠错**:遇到报错自动分析原因并修复代码。 diff --git a/raw_data/.gitkeep b/raw_data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/sort_csv.py b/sort_csv.py index c03a1b3..1e07f62 100644 --- a/sort_csv.py +++ b/sort_csv.py @@ -7,39 +7,39 @@ def sort_csv_by_time(file_path="remotecontrol_merged.csv", time_col="SendTime"): 读取 CSV 文件,按时间列排序,并保存。 """ if not os.path.exists(file_path): - print(f"❌ 文件不存在: {file_path}") + print(f"[ERROR] 文件不存在: {file_path}") return - print(f"📖 正在读取 {file_path} ...") + print(f"[READ] 正在读取 {file_path} ...") try: # 读取 CSV df = pd.read_csv(file_path, low_memory=False) - print(f" 📊 数据行数: {len(df)}") + print(f" [CHART] 数据行数: {len(df)}") if time_col not in df.columns: - print(f"❌ 未找到时间列: {time_col}") + print(f"[ERROR] 未找到时间列: {time_col}") print(f" 可用列: {list(df.columns)}") return - print(f"🔄 正在解析时间列 '{time_col}' ...") + print(f"[LOOP] 正在解析时间列 '{time_col}' ...") # 转换为 datetime 对象,无法解析的设为 NaT df[time_col] = pd.to_datetime(df[time_col], errors='coerce') # 检查无效时间 nat_count = df[time_col].isna().sum() if nat_count > 0: - print(f"⚠️ 发现 {nat_count} 行无效时间数据,排序时将排在最后") + print(f"[WARN] 发现 {nat_count} 行无效时间数据,排序时将排在最后") - print("🔄 正在按时间排序...") + print("[LOOP] 正在按时间排序...") df_sorted = df.sort_values(by=time_col) - print(f"💾 正在保存及覆盖文件: {file_path} ...") + print(f"[CACHE] 正在保存及覆盖文件: {file_path} ...") df_sorted.to_csv(file_path, index=False, encoding="utf-8-sig") - print("✅ 排序并保存完成!") + print("[OK] 排序并保存完成!") except Exception as e: - print(f"❌处理失败: {e}") + print(f"[ERROR]处理失败: {e}") if __name__ == "__main__": sort_csv_by_time() diff --git a/test.py b/test.py index e29c938..802147b 100644 --- a/test.py +++ b/test.py @@ -1,13 +1,13 @@ +from openai import OpenAI -import openai - -client = openai.OpenAI( - api_key="sk-Gce85QLROESeOWf3icd2mQnYHOrmMYojwVPQ0AubMjGQ5ZE2", - base_url="https://gemini.jeason.online/v1" +client = OpenAI( + base_url="http://127.0.0.1:9999/v1", + api_key="sk-2187174de21548b0b8b0c92129700199" ) response = client.chat.completions.create( - model="gemini-2.5-pro", - messages=[{"role": "user", "content": "你好,请自我介绍"}] + model="claude-sonnet-4-5", + messages=[{"role": "user", "content": "Hello"}] ) + print(response.choices[0].message.content) \ No newline at end of file diff --git a/utils/cache_manager.py b/utils/cache_manager.py index 63215c5..9ad9685 100644 --- a/utils/cache_manager.py +++ b/utils/cache_manager.py @@ -42,7 +42,7 @@ class CacheManager: with open(cache_path, 'rb') as f: return pickle.load(f) except Exception as e: - print(f"⚠️ 读取缓存失败: {e}") + print(f"[WARN] 读取缓存失败: {e}") return None return None @@ -56,14 +56,14 @@ class CacheManager: with open(cache_path, 'wb') as f: pickle.dump(value, f) except Exception as e: - print(f"⚠️ 写入缓存失败: {e}") + print(f"[WARN] 写入缓存失败: {e}") def clear(self) -> None: """清空所有缓存""" if self.cache_dir.exists(): for cache_file in self.cache_dir.glob("*.pkl"): cache_file.unlink() - print("✅ 缓存已清空") + print("[OK] 缓存已清空") def cached(self, key_func: Optional[Callable] = None): """缓存装饰器""" @@ -82,7 +82,7 @@ class CacheManager: # 尝试从缓存获取 cached_value = self.get(cache_key) if cached_value is not None: - print(f"💾 使用缓存: {cache_key[:8]}...") + print(f"[CACHE] 使用缓存: {cache_key[:8]}...") return cached_value # 执行函数并缓存结果 diff --git a/utils/code_executor.py b/utils/code_executor.py index 17f5537..c2cab11 100644 --- a/utils/code_executor.py +++ b/utils/code_executor.py @@ -410,17 +410,17 @@ from IPython.display import display try: # 尝试保存 fig.savefig(auto_filepath, bbox_inches='tight') - print(f"💾 [Auto-Save] 检测到未闭合图表,已安全保存至: {auto_filepath}") + 
print(f"[CACHE] [Auto-Save] 检测到未闭合图表,已安全保存至: {auto_filepath}") # 添加到输出中,告知Agent - output += f"\n[Auto-Save] ⚠️ 检测到Figure {fig_num}未关闭,系统已自动保存为: {auto_filename}" + output += f"\n[Auto-Save] [WARN] 检测到Figure {fig_num}未关闭,系统已自动保存为: {auto_filename}" self.image_counter += 1 except Exception as e: - print(f"⚠️ [Auto-Save] 保存失败: {e}") + print(f"[WARN] [Auto-Save] 保存失败: {e}") finally: plt.close(fig_num) except Exception as e: - print(f"⚠️ [Auto-Save Global] 异常: {e}") + print(f"[WARN] [Auto-Save Global] 异常: {e}") # --- 自动保存机制 end --- return { diff --git a/utils/data_loader.py b/utils/data_loader.py index 9717779..5f413fd 100644 --- a/utils/data_loader.py +++ b/utils/data_loader.py @@ -34,7 +34,7 @@ def load_and_profile_data(file_paths: list) -> str: profile_summary += f"## 文件: {file_name}\n\n" if not os.path.exists(file_path): - profile_summary += f"⚠️ 文件不存在: {file_path}\n\n" + profile_summary += f"[WARN] 文件不存在: {file_path}\n\n" continue try: @@ -52,7 +52,7 @@ def load_and_profile_data(file_paths: list) -> str: elif ext in ['.xlsx', '.xls']: df = pd.read_excel(file_path) else: - profile_summary += f"⚠️ 不支持的文件格式: {ext}\n\n" + profile_summary += f"[WARN] 不支持的文件格式: {ext}\n\n" continue # 基础信息 @@ -70,7 +70,7 @@ def load_and_profile_data(file_paths: list) -> str: profile_summary += f"#### {col} ({dtype})\n" if null_count > 0: - profile_summary += f"- ⚠️ 空值: {null_count} ({null_ratio:.1f}%)\n" + profile_summary += f"- [WARN] 空值: {null_count} ({null_ratio:.1f}%)\n" # 数值列分析 if pd.api.types.is_numeric_dtype(dtype): @@ -96,7 +96,7 @@ def load_and_profile_data(file_paths: list) -> str: profile_summary += "\n" except Exception as e: - profile_summary += f"❌ 读取或分析文件失败: {str(e)}\n\n" + profile_summary += f"[ERROR] 读取或分析文件失败: {str(e)}\n\n" return profile_summary @@ -141,7 +141,7 @@ def load_data_chunked(file_path: str, chunksize: Optional[int] = None) -> Iterat except UnicodeDecodeError: continue except Exception as e: - print(f"❌ 读取CSV文件失败: {e}") + print(f"[ERROR] 读取CSV文件失败: {e}") break elif ext in ['.xlsx', '.xls']: # Excel文件不支持chunksize,直接读取 @@ -151,7 +151,7 @@ def load_data_chunked(file_path: str, chunksize: Optional[int] = None) -> Iterat for i in range(0, len(df), chunksize): yield df.iloc[i:i+chunksize] except Exception as e: - print(f"❌ 读取Excel文件失败: {e}") + print(f"[ERROR] 读取Excel文件失败: {e}") def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional[pd.DataFrame]: @@ -166,7 +166,7 @@ def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional DataFrame或None """ if not os.path.exists(file_path): - print(f"⚠️ 文件不存在: {file_path}") + print(f"[WARN] 文件不存在: {file_path}") return None # 检查文件大小 @@ -174,7 +174,7 @@ def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional # 对于大文件,建议使用流式处理 if file_size_mb > app_config.max_file_size_mb: - print(f"⚠️ 文件过大 ({file_size_mb:.1f}MB),建议使用 load_data_chunked() 流式处理") + print(f"[WARN] 文件过大 ({file_size_mb:.1f}MB),建议使用 load_data_chunked() 流式处理") # 生成缓存键 cache_key = get_file_hash(file_path) @@ -183,7 +183,7 @@ def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional if not force_reload and app_config.data_cache_enabled: cached_data = data_cache.get(cache_key) if cached_data is not None: - print(f"💾 从缓存加载数据: {os.path.basename(file_path)}") + print(f"[CACHE] 从缓存加载数据: {os.path.basename(file_path)}") return cached_data # 加载数据 @@ -202,16 +202,16 @@ def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional elif ext in ['.xlsx', '.xls']: df = pd.read_excel(file_path) else: - 
print(f"⚠️ 不支持的文件格式: {ext}") + print(f"[WARN] 不支持的文件格式: {ext}") return None # 缓存数据 if df is not None and app_config.data_cache_enabled: data_cache.set(cache_key, df) - print(f"✅ 数据已缓存: {os.path.basename(file_path)}") + print(f"[OK] 数据已缓存: {os.path.basename(file_path)}") return df except Exception as e: - print(f"❌ 加载数据失败: {e}") + print(f"[ERROR] 加载数据失败: {e}") return None diff --git a/utils/data_quality.py b/utils/data_quality.py index 4458d62..4919b37 100644 --- a/utils/data_quality.py +++ b/utils/data_quality.py @@ -192,27 +192,27 @@ class DataQualityChecker: summary += f"**质量评分**: {self.quality_score:.1f}/100\n\n" if self.quality_score >= 90: - summary += "✅ **评级**: 优秀 - 数据质量很好\n\n" + summary += "[OK] **评级**: 优秀 - 数据质量很好\n\n" elif self.quality_score >= 75: - summary += "⚠️ **评级**: 良好 - 存在一些小问题\n\n" + summary += "[WARN] **评级**: 良好 - 存在一些小问题\n\n" elif self.quality_score >= 60: - summary += "⚠️ **评级**: 一般 - 需要处理多个问题\n\n" + summary += "[WARN] **评级**: 一般 - 需要处理多个问题\n\n" else: - summary += "❌ **评级**: 差 - 数据质量问题严重\n\n" + summary += "[ERROR] **评级**: 差 - 数据质量问题严重\n\n" summary += f"**问题统计**: 共 {len(self.issues)} 个质量问题\n" - summary += f"- 🔴 高严重性: {len([i for i in self.issues if i.severity == 'high'])} 个\n" - summary += f"- 🟡 中严重性: {len([i for i in self.issues if i.severity == 'medium'])} 个\n" - summary += f"- 🟢 低严重性: {len([i for i in self.issues if i.severity == 'low'])} 个\n\n" + summary += f"- [RED] 高严重性: {len([i for i in self.issues if i.severity == 'high'])} 个\n" + summary += f"- [YELLOW] 中严重性: {len([i for i in self.issues if i.severity == 'medium'])} 个\n" + summary += f"- [GREEN] 低严重性: {len([i for i in self.issues if i.severity == 'low'])} 个\n\n" if self.issues: summary += "### 主要问题:\n\n" # 只显示高和中严重性的问题 for issue in self.issues: if issue.severity in ["high", "medium"]: - emoji = "🔴" if issue.severity == "high" else "🟡" + emoji = "[RED]" if issue.severity == "high" else "[YELLOW]" summary += f"{emoji} **{issue.column}** - {issue.description}\n" - summary += f" 💡 {issue.suggestion}\n\n" + summary += f" [TIP] {issue.suggestion}\n\n" return summary diff --git a/utils/fallback_openai_client.py b/utils/fallback_openai_client.py index 0caed5a..005f137 100644 --- a/utils/fallback_openai_client.py +++ b/utils/fallback_openai_client.py @@ -57,7 +57,7 @@ class AsyncFallbackOpenAIClient: self.fallback_client = AsyncOpenAI(api_key=fallback_api_key, base_url=fallback_base_url, **_fallback_args) self.fallback_model_name = fallback_model_name else: - print("⚠️ 警告: 未完全配置备用 API 客户端。如果主 API 失败,将无法进行回退。") + print("[WARN] 警告: 未完全配置备用 API 客户端。如果主 API 失败,将无法进行回退。") self.content_filter_error_code = content_filter_error_code self.content_filter_error_field = content_filter_error_field @@ -90,11 +90,11 @@ class AsyncFallbackOpenAIClient: return completion except (APIConnectionError, APITimeoutError) as e: # 通常可以重试的网络错误 last_exception = e - print(f"⚠️ {api_name} API 调用时发生可重试错误 ({type(e).__name__}): {e}. 尝试次数 {attempt + 1}/{max_retries + 1}") + print(f"[WARN] {api_name} API 调用时发生可重试错误 ({type(e).__name__}): {e}. 
尝试次数 {attempt + 1}/{max_retries + 1}") if attempt < max_retries: await asyncio.sleep(self.retry_delay_seconds * (attempt + 1)) # 增加延迟 else: - print(f"❌ {api_name} API 在达到最大重试次数后仍然失败。") + print(f"[ERROR] {api_name} API 在达到最大重试次数后仍然失败。") except APIStatusError as e: # API 返回的特定状态码错误 is_content_filter_error = False retry_after = None @@ -118,7 +118,7 @@ class AsyncFallbackOpenAIClient: if delay_str.endswith("s"): try: retry_after = float(delay_str[:-1]) - print(f"⏳ 收到服务器 RetryInfo,等待时间: {retry_after}秒") + print(f"[TIMER] 收到服务器 RetryInfo,等待时间: {retry_after}秒") except ValueError: pass except Exception: @@ -128,7 +128,7 @@ class AsyncFallbackOpenAIClient: raise e last_exception = e - print(f"⚠️ {api_name} API 调用时发生 APIStatusError ({e.status_code}): {e}. 尝试次数 {attempt + 1}/{max_retries + 1}") + print(f"[WARN] {api_name} API 调用时发生 APIStatusError ({e.status_code}): {e}. 尝试次数 {attempt + 1}/{max_retries + 1}") if attempt < max_retries: # 如果获取到了明确的 retry_after,则使用它;否则使用默认的指数退避 @@ -137,13 +137,13 @@ class AsyncFallbackOpenAIClient: if e.status_code == 429 and retry_after is None: wait_time = max(wait_time, 5.0 * (attempt + 1)) # 429 默认至少等 5 秒 - print(f"💤 将等待 {wait_time:.2f} 秒后重试...") + print(f"[WAIT] 将等待 {wait_time:.2f} 秒后重试...") await asyncio.sleep(wait_time) else: - print(f"❌ {api_name} API 在达到最大重试次数后仍然失败 (APIStatusError)。") + print(f"[ERROR] {api_name} API 在达到最大重试次数后仍然失败 (APIStatusError)。") except APIError as e: # 其他不可轻易重试的 OpenAI 错误 last_exception = e - print(f"❌ {api_name} API 调用时发生不可重试错误 ({type(e).__name__}): {e}") + print(f"[ERROR] {api_name} API 调用时发生不可重试错误 ({type(e).__name__}): {e}") break # 不再重试此类错误 if last_exception: @@ -196,7 +196,7 @@ class AsyncFallbackOpenAIClient: pass if is_content_filter_error and self.fallback_client and self.fallback_model_name: - print(f"ℹ️ 主 API 内容过滤错误 ({e_primary.status_code})。尝试切换到备用 API ({self.fallback_client.base_url})...") + print(f"[INFO] 主 API 内容过滤错误 ({e_primary.status_code})。尝试切换到备用 API ({self.fallback_client.base_url})...") try: fallback_completion = await self._attempt_api_call( client=self.fallback_client, @@ -206,20 +206,20 @@ class AsyncFallbackOpenAIClient: api_name="备用", **kwargs.copy() ) - print(f"✅ 备用 API 调用成功。") + print(f"[OK] 备用 API 调用成功。") return fallback_completion except APIError as e_fallback: - print(f"❌ 备用 API 调用最终失败: {type(e_fallback).__name__} - {e_fallback}") + print(f"[ERROR] 备用 API 调用最终失败: {type(e_fallback).__name__} - {e_fallback}") raise e_fallback else: if not (self.fallback_client and self.fallback_model_name and is_content_filter_error): # 如果不是内容过滤错误,或者没有可用的备用API,则记录主API的原始错误 - print(f"ℹ️ 主 API 错误 ({type(e_primary).__name__}: {e_primary}), 且不满足备用条件或备用API未配置。") + print(f"[INFO] 主 API 错误 ({type(e_primary).__name__}: {e_primary}), 且不满足备用条件或备用API未配置。") raise e_primary except APIError as e_primary_other: - print(f"❌ 主 API 调用最终失败 (非内容过滤,错误类型: {type(e_primary_other).__name__}): {e_primary_other}") + print(f"[ERROR] 主 API 调用最终失败 (非内容过滤,错误类型: {type(e_primary_other).__name__}): {e_primary_other}") if self.fallback_client and self.fallback_model_name: - print(f"ℹ️ 主 API 失败,尝试切换到备用 API ({self.fallback_client.base_url})...") + print(f"[INFO] 主 API 失败,尝试切换到备用 API ({self.fallback_client.base_url})...") try: fallback_completion = await self._attempt_api_call( client=self.fallback_client, @@ -229,10 +229,10 @@ class AsyncFallbackOpenAIClient: api_name="备用", **kwargs.copy() ) - print(f"✅ 备用 API 调用成功。") + print(f"[OK] 备用 API 调用成功。") return fallback_completion except APIError as e_fallback_after_primary_fail: - print(f"❌ 备用 API 在主 API 失败后也调用失败: 
{type(e_fallback_after_primary_fail).__name__} - {e_fallback_after_primary_fail}") + print(f"[ERROR] 备用 API 在主 API 失败后也调用失败: {type(e_fallback_after_primary_fail).__name__} - {e_fallback_after_primary_fail}") raise e_fallback_after_primary_fail else: raise e_primary_other diff --git a/utils/format_execution_result.py b/utils/format_execution_result.py index 7706d92..d886b10 100644 --- a/utils/format_execution_result.py +++ b/utils/format_execution_result.py @@ -7,17 +7,17 @@ def format_execution_result(result: Dict[str, Any]) -> str: feedback = [] if result['success']: - feedback.append("✅ 代码执行成功") + feedback.append("[OK] 代码执行成功") if result['output']: - feedback.append(f"📊 输出结果:\n{result['output']}") + feedback.append(f"[CHART] 输出结果:\n{result['output']}") if result.get('variables'): - feedback.append("📋 新生成的变量:") + feedback.append("[LIST] 新生成的变量:") for var_name, var_info in result['variables'].items(): feedback.append(f" - {var_name}: {var_info}") else: - feedback.append("❌ 代码执行失败") + feedback.append("[ERROR] 代码执行失败") feedback.append(f"错误信息: {result['error']}") if result['output']: feedback.append(f"部分输出: {result['output']}") diff --git a/utils/llm_helper.py b/utils/llm_helper.py index 34b50e0..70868e2 100644 --- a/utils/llm_helper.py +++ b/utils/llm_helper.py @@ -117,7 +117,7 @@ class LLMHelper: if use_cache and app_config.llm_cache_enabled: cached_response = llm_cache.get(cache_key) if cached_response: - print("💾 使用LLM缓存响应") + print("[CACHE] 使用LLM缓存响应") return cached_response # 调用LLM diff --git a/utils/script_generator.py b/utils/script_generator.py new file mode 100644 index 0000000..f6ebee4 --- /dev/null +++ b/utils/script_generator.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +""" +可复用脚本生成器 + +从分析会话的执行历史中提取成功执行的代码, +合并去重后生成可独立运行的 .py 脚本文件。 +""" + +import os +import re +from datetime import datetime +from typing import List, Dict, Any, Set + + +def extract_imports(code: str) -> Set[str]: + """从代码中提取所有 import 语句""" + imports = set() + lines = code.split('\n') + for line in lines: + stripped = line.strip() + if stripped.startswith('import ') or stripped.startswith('from '): + # 标准化 import 语句 + imports.add(stripped) + return imports + + +def remove_imports(code: str) -> str: + """从代码中移除所有 import 语句""" + lines = code.split('\n') + result_lines = [] + for line in lines: + stripped = line.strip() + if not stripped.startswith('import ') and not stripped.startswith('from '): + result_lines.append(line) + return '\n'.join(result_lines) + + +def clean_code_block(code: str) -> str: + """清理代码块,移除不必要的内容""" + # 移除可能的重复配置代码 + patterns_to_skip = [ + r"plt\.rcParams\['font\.sans-serif'\]", # 字体配置在模板中统一处理 + r"plt\.rcParams\['axes\.unicode_minus'\]", + ] + + lines = code.split('\n') + result_lines = [] + skip_until_empty = False + + for line in lines: + stripped = line.strip() + + # 跳过空行连续的情况 + if not stripped: + if skip_until_empty: + skip_until_empty = False + continue + result_lines.append(line) + continue + + # 检查是否需要跳过的模式 + should_skip = False + for pattern in patterns_to_skip: + if re.search(pattern, stripped): + should_skip = True + break + + if not should_skip: + result_lines.append(line) + + return '\n'.join(result_lines) + + +def generate_reusable_script( + analysis_results: List[Dict[str, Any]], + data_files: List[str], + session_output_dir: str, + user_requirement: str = "" +) -> str: + """ + 从分析结果中生成可复用的 Python 脚本 + + Args: + analysis_results: 分析过程中记录的结果列表,每个元素包含 'code', 'result' 等 + data_files: 原始数据文件路径列表 + session_output_dir: 会话输出目录 + user_requirement: 用户的原始需求描述 + + Returns: + 生成的脚本文件路径 + 
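+    示例(假设性用法示意,仅演示参数结构,路径均为虚构;analysis_results 的元素
+    需带有 'code' 字段与含 'success' 标记的 'result' 字段):
+        >>> generate_reusable_script(
+        ...     analysis_results=[{"round": 1, "code": "print(len(data))",
+        ...                        "result": {"success": True}}],
+        ...     data_files=["cleaned_data/merged.csv"],
+        ...     session_output_dir="outputs/session_demo",
+        ...     user_requirement="演示用需求",
+        ... )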
""" + # 收集所有成功执行的代码 + all_imports = set() + code_blocks = [] + + for result in analysis_results: + # 只处理 generate_code 类型的结果 + if result.get("action") == "collect_figures": + continue + + code = result.get("code", "") + exec_result = result.get("result", {}) + + # 只收集成功执行的代码 + if code and exec_result.get("success", False): + # 提取 imports + imports = extract_imports(code) + all_imports.update(imports) + + # 清理代码块 + cleaned_code = remove_imports(code) + cleaned_code = clean_code_block(cleaned_code) + + # 只添加非空的代码块 + if cleaned_code.strip(): + code_blocks.append({ + "round": result.get("round", 0), + "code": cleaned_code.strip() + }) + + if not code_blocks: + print("[WARN] 没有成功执行的代码块,跳过脚本生成") + return "" + + # 生成脚本内容 + now = datetime.now() + timestamp = now.strftime("%Y%m%d_%H%M%S") + + # 构建脚本头部 + script_header = f'''#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +数据分析脚本 - 自动生成 +===================================== +原始数据文件: {', '.join(data_files)} +生成时间: {now.strftime("%Y-%m-%d %H:%M:%S")} +原始需求: {user_requirement[:200] + '...' if len(user_requirement) > 200 else user_requirement} +===================================== + +使用方法: +1. 修改下方 DATA_FILES 列表中的文件路径 +2. 修改 OUTPUT_DIR 指定输出目录 +3. 运行: python {os.path.basename(session_output_dir)}_分析脚本.py +""" + +import os +''' + + # 添加标准 imports(去重后排序) + standard_imports = sorted([imp for imp in all_imports if imp.startswith('import ')]) + from_imports = sorted([imp for imp in all_imports if imp.startswith('from ')]) + + imports_section = '\n'.join(standard_imports + from_imports) + + # 配置区域 + config_section = f''' +# ========== 配置区域 (可修改) ========== + +# 数据文件路径 - 修改此处以分析不同的数据 +DATA_FILES = {repr(data_files)} + +# 输出目录 - 图片和报告将保存在此目录 +OUTPUT_DIR = "./analysis_output" + +# 创建输出目录 +os.makedirs(OUTPUT_DIR, exist_ok=True) + +# ========== 字体配置 (中文显示) ========== +import platform +import matplotlib.pyplot as plt + +system_name = platform.system() +if system_name == 'Darwin': + plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'PingFang SC', 'sans-serif'] +elif system_name == 'Windows': + plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'sans-serif'] +else: + plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei', 'sans-serif'] +plt.rcParams['axes.unicode_minus'] = False + +# 设置 session_output_dir 变量(兼容原始代码) +session_output_dir = OUTPUT_DIR +''' + + # 合并代码块 + code_section = "\n# ========== 分析代码 ==========\n\n" + + for i, block in enumerate(code_blocks, 1): + code_section += f"# --- 第 {block['round']} 轮分析 ---\n" + code_section += block['code'] + "\n\n" + + # 脚本尾部 + script_footer = ''' +# ========== 完成 ========== +print("\\n" + "=" * 50) +print("[OK] 分析完成!") +print(f"[OUTPUT] 输出目录: {os.path.abspath(OUTPUT_DIR)}") +print("=" * 50) +''' + + # 组装完整脚本 + full_script = script_header + imports_section + config_section + code_section + script_footer + + # 保存脚本文件 + script_filename = f"分析脚本_{timestamp}.py" + script_path = os.path.join(session_output_dir, script_filename) + + try: + with open(script_path, 'w', encoding='utf-8') as f: + f.write(full_script) + print(f"[OK] 可复用脚本已生成: {script_path}") + return script_path + except Exception as e: + print(f"[ERROR] 保存脚本失败: {e}") + return "" diff --git a/web/main.py b/web/main.py index 9abac5b..21f6db1 100644 --- a/web/main.py +++ b/web/main.py @@ -5,6 +5,7 @@ import threading import glob import uuid import json +from datetime import datetime from typing import Optional, Dict, List from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException, Query from fastapi.middleware.cors import 
CORSMiddleware @@ -18,8 +19,6 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from data_analysis_agent import DataAnalysisAgent from config.llm_config import LLMConfig from utils.create_session_dir import create_session_output_dir -from merge_excel import merge_excel_files -from sort_csv import sort_csv_by_time app = FastAPI(title="IOV Data Analysis Agent") @@ -55,6 +56,7 @@ class SessionData: self.last_updated: str = "" self.user_requirement: str = "" self.file_list: List[str] = [] + self.reusable_script: Optional[str] = None # 新增:可复用脚本路径 class SessionManager: @@ -70,7 +72,71 @@ def get_session(self, session_id: str) -> Optional[SessionData]: - return self.sessions.get(session_id) + if session_id in self.sessions: + return self.sessions[session_id] + + # Fallback: Try to reconstruct from disk for history sessions + output_dir = os.path.join("outputs", f"session_{session_id}") + if os.path.exists(output_dir) and os.path.isdir(output_dir): + return self._reconstruct_session(session_id, output_dir) + + return None + + def _reconstruct_session(self, session_id: str, output_dir: str) -> SessionData: + """从磁盘目录重建会话对象""" + session = SessionData(session_id) + session.output_dir = output_dir + session.is_running = False + session.current_round = session.max_rounds + session.progress_percentage = 100.0 + session.status_message = "已完成 (历史记录)" + + # Recover Log + log_path = os.path.join(output_dir, "process.log") + if os.path.exists(log_path): + session.log_file = log_path + + # Recover Report + # 宽容查找:扫描所有 .md 文件,优先取包含 "report" 或 "报告" 的文件 + md_files = glob.glob(os.path.join(output_dir, "*.md")) + if md_files: + # 默认取第一个 + chosen = md_files[0] + # 尝试找更好的匹配 + for md in md_files: + fname = os.path.basename(md).lower() + if "report" in fname or "报告" in fname: + chosen = md + break + session.generated_report = chosen + + # Recover Script (script_generator 将脚本保存为 "分析脚本_<时间戳>.py") + scripts = sorted(glob.glob(os.path.join(output_dir, "分析脚本_*.py"))) + if scripts: + session.reusable_script = scripts[-1] # 取最新生成的脚本 + + # Recover Results (images etc) + results_json = os.path.join(output_dir, "results.json") + if os.path.exists(results_json): + try: + with open(results_json, "r") as f: + session.analysis_results = json.load(f) + except: + pass + + # Recover Metadata + try: + stat = os.stat(output_dir) + dt = datetime.fromtimestamp(stat.st_ctime) + session.created_at = dt.strftime("%Y-%m-%d %H:%M:%S") + except: + pass + + # Cache it + with self.lock: + self.sessions[session_id] = session + + return session def list_sessions(self): return list(self.sessions.keys()) @@ -99,7 +168,8 @@ "max_rounds": session.max_rounds, "created_at": session.created_at, "last_updated": session.last_updated, - "user_requirement": session.user_requirement[:100] + "..." if len(session.user_requirement) > 100 else session.user_requirement + "user_requirement": session.user_requirement[:100] + "..." 
+            "script_path": session.reusable_script  # 新增:返回脚本路径
         }
     return None
@@ -227,6 +299,7 @@ def run_analysis_task(session_id: str, files: list, user_requirement: str, is_fo
         session.generated_report = result.get("report_file_path", None)
         session.analysis_results = result.get("analysis_results", [])
+        session.reusable_script = result.get("reusable_script_path", None)  # 新增:保存脚本路径
 
         # Save results to json for persistence
         with open(os.path.join(session_output_dir, "results.json"), "w") as f:
@@ -311,7 +384,8 @@ async def get_status(session_id: str = Query(..., description="Session ID")):
         "is_running": session.is_running,
         "log": log_content,
         "has_report": session.generated_report is not None,
-        "report_path": session.generated_report
+        "report_path": session.generated_report,
+        "script_path": session.reusable_script  # 新增:返回脚本路径
     }
 
 @app.get("/api/export")
@@ -453,71 +527,24 @@ async def export_report(session_id: str = Query(..., description="Session ID")):
         media_type='application/zip'
     )
 
+@app.get("/api/download_script")
+async def download_script(session_id: str = Query(..., description="Session ID")):
+    """下载生成的Python脚本"""
+    session = session_manager.get_session(session_id)
+    if not session or not session.reusable_script:
+        raise HTTPException(status_code=404, detail="Script not found")
+
+    if not os.path.exists(session.reusable_script):
+        raise HTTPException(status_code=404, detail="Script file missing on server")
+
+    return FileResponse(
+        path=session.reusable_script,
+        filename=os.path.basename(session.reusable_script),
+        media_type='text/x-python'
+    )
+
 # --- Tools API ---
 
-class ToolRequest(BaseModel):
-    source_dir: Optional[str] = "uploads"
-    output_filename: Optional[str] = "merged_output.csv"
-    target_file: Optional[str] = None
-
-@app.post("/api/tools/merge")
-async def tool_merge_excel(req: ToolRequest):
-    """
-    Trigger Excel Merge Tool
-    """
-    try:
-        source = req.source_dir
-        output = req.output_filename
-
-        import asyncio
-        loop = asyncio.get_event_loop()
-
-        await loop.run_in_executor(None, lambda: merge_excel_files(source, output))
-
-        output_abs = os.path.abspath(output)
-        if os.path.exists(output_abs):
-            return {"status": "success", "message": "Merge completed", "output_file": output_abs}
-
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
-
-@app.post("/api/tools/sort")
-async def tool_sort_csv(req: ToolRequest):
-    """
-    Trigger CSV Sort Tool
-    """
-    try:
-        target = req.target_file
-        if not target:
-            raise HTTPException(status_code=400, detail="Target file required")
-
-        import asyncio
-        loop = asyncio.get_event_loop()
-
-        await loop.run_in_executor(None, lambda: sort_csv_by_time(target))
-
-        return {"status": "success", "message": f"Sorted {target} by time"}
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
-
-# --- Help API ---
-
-@app.get("/api/help/troubleshooting")
-async def get_troubleshooting_guide():
-    """
-    Returns the content of troubleshooting_guide.md
-    """
-    guide_path = os.path.expanduser("~/.gemini/antigravity/brain/3ff617fe-5f27-4ab8-b61b-c634f2e75255/troubleshooting_guide.md")
-
-    if not os.path.exists(guide_path):
-        return {"content": "# Troubleshooting Guide Not Found\n\nCould not locate the guide artifact."}
-
-    try:
-        with open(guide_path, "r", encoding="utf-8") as f:
-            content = f.read()
-        return {"content": content}
-    except Exception as e:
-        return {"content": f"# Error Loading Guide\n\n{e}"}
 
 # --- 新增API端点 ---
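# 示例(假设性,非补丁内容):上述新增接口的客户端调用示意。
# 端口 8000 为 uvicorn 默认值,"/api/status" 路由名与 SESSION_ID 取值均为推测,以实际部署为准。
import requests

BASE = "http://127.0.0.1:8000"
SESSION_ID = "20250101_120000"  # 假设的会话ID,对应 outputs/session_<时间戳> 目录名

# 查询状态:补丁为 get_status 的返回值新增了 script_path 字段,非空表示可复用脚本已生成
status = requests.get(f"{BASE}/api/status", params={"session_id": SESSION_ID}).json()

# 下载脚本:对应上文新增的 /api/download_script,脚本不存在或文件缺失时返回 404
if status.get("script_path"):
    resp = requests.get(f"{BASE}/api/download_script", params={"session_id": SESSION_ID})
    resp.raise_for_status()
    with open("downloaded_分析脚本.py", "wb") as f:
        f.write(resp.content)

# 历史会话列表:对应下文新增的 /api/history,返回 {"history": [{"id", "timestamp", "name"}, ...]}
history = requests.get(f"{BASE}/api/history").json()["history"]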
@@ -553,6 +580,60 @@ async def delete_specific_session(session_id: str):
         raise HTTPException(status_code=404, detail="Session not found")
     return {"status": "deleted", "session_id": session_id}
+
+
+# --- History API ---
+
+@app.get("/api/history")
+async def get_history():
+    """
+    Get list of past analysis sessions from outputs directory
+    """
+    history = []
+    output_base = "outputs"
+
+    if not os.path.exists(output_base):
+        return {"history": []}
+
+    try:
+        # Scan for session_* directories
+        for entry in os.scandir(output_base):
+            if entry.is_dir() and entry.name.startswith("session_"):
+                # Extract timestamp from folder name: session_20250101_120000
+                session_id = entry.name.replace("session_", "")
+
+                # Check creation time or extract from name
+                try:
+                    # Try to parse timestamp from ID if it matches format
+                    # Format: YYYYMMDD_HHMMSS
+                    timestamp_str = session_id
+                    dt = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
+                    display_time = dt.strftime("%Y-%m-%d %H:%M:%S")
+                    sort_key = dt.timestamp()
+                except ValueError:
+                    # Fallback to file creation time
+                    sort_key = entry.stat().st_ctime
+                    display_time = datetime.fromtimestamp(sort_key).strftime("%Y-%m-%d %H:%M:%S")
+
+                history.append({
+                    "id": session_id,
+                    "timestamp": display_time,
+                    "sort_key": sort_key,
+                    "name": f"Session {display_time}"
+                })
+
+        # Sort by latest first
+        history.sort(key=lambda x: x["sort_key"], reverse=True)
+
+        # Cleanup internal sort key
+        for item in history:
+            del item["sort_key"]
+
+        return {"history": history}
+
+    except Exception as e:
+        print(f"Error scanning history: {e}")
+        return {"history": []}
 
 if __name__ == "__main__":
     import uvicorn
diff --git a/web/static/clean_style.css b/web/static/clean_style.css
new file mode 100644
index 0000000..a31b0bd
--- /dev/null
+++ b/web/static/clean_style.css
@@ -0,0 +1,535 @@
+/* Clean Style - IOV Data Analysis Agent */
+
+:root {
+    --primary-color: #2563EB;
+    /* Tech Blue */
+    --primary-hover: #1D4ED8;
+    --bg-color: #FFFFFF;
+    --sidebar-bg: #F9FAFB;
+    --text-primary: #111827;
+    --text-secondary: #6B7280;
+    --border-color: #E5E7EB;
+    --card-shadow: 0 1px 3px 0 rgba(0, 0, 0, 0.1), 0 1px 2px 0 rgba(0, 0, 0, 0.06);
+    --font-family: 'Inter', -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif;
+}
+
+* {
+    box-sizing: border-box;
+    margin: 0;
+    padding: 0;
+}
+
+body {
+    font-family: var(--font-family);
+    color: var(--text-primary);
+    background-color: var(--bg-color);
+    line-height: 1.5;
+    height: 100vh;
+    overflow: hidden;
+}
+
+.app-container {
+    display: flex;
+    height: 100vh;
+}
+
+/* Sidebar */
+.sidebar {
+    width: 240px;
+    /* Compact width */
+    background-color: var(--sidebar-bg);
+    border-right: 1px solid var(--border-color);
+    display: flex;
+    flex-direction: column;
+    padding: 1rem;
+    flex-shrink: 0;
+}
+
+.brand {
+    display: flex;
+    align-items: center;
+    gap: 0.75rem;
+    margin-bottom: 1.5rem;
+    font-weight: 600;
+    color: var(--text-primary);
+}
+
+.brand i {
+    color: var(--primary-color);
+    font-size: 1.5rem;
+}
+
+.nav-menu {
+    display: flex;
+    flex-direction: column;
+    gap: 0.5rem;
+    flex: 1;
+    overflow-y: hidden;
+    /* Let history list handle scroll */
+}
+
+.nav-item {
+    display: flex;
+    align-items: center;
+    gap: 0.75rem;
+    padding: 0.75rem 1rem;
+    border-radius: 0.375rem;
+    color: var(--text-secondary);
+    text-decoration: none;
+    cursor: pointer;
+    transition: all 0.2s;
+    font-size: 0.95rem;
+    border: none;
+    background: none;
+    width: 100%;
+    text-align: left;
+}
+
+.nav-item:hover {
+    background-color: #F3F4F6;
+    color: var(--text-primary);
+}
+
+.nav-item.active {
+    background-color: #EFF6FF;
+    color: var(--primary-color);
+    font-weight: 500;
+}
+
+.nav-item i {
+    width: 1.25rem;
+    text-align: center;
+}
+
+.nav-divider {
+    height: 1px;
+    background-color: var(--border-color);
+    margin: 1rem 0 0.5rem 0;
+}
+
+.nav-section-title {
+    font-size: 0.75rem;
+    text-transform: uppercase;
+    color: var(--text-secondary);
+    font-weight: 600;
+    letter-spacing: 0.05em;
+    margin-bottom: 0.5rem;
+    padding-left: 0.5rem;
+}
+
+/* History List */
+.history-list {
+    flex: 1;
+    overflow-y: auto;
+    display: flex;
+    flex-direction: column;
+    gap: 0.25rem;
+    padding-right: 5px;
+}
+
+.history-item {
+    font-size: 0.85rem;
+    color: var(--text-secondary);
+    padding: 0.5rem 0.75rem;
+    border-radius: 0.375rem;
+    cursor: pointer;
+    transition: all 0.2s;
+    white-space: nowrap;
+    overflow: hidden;
+    text-overflow: ellipsis;
+    display: flex;
+    align-items: center;
+    gap: 0.5rem;
+}
+
+.history-item:hover {
+    background-color: #F3F4F6;
+    color: var(--text-primary);
+}
+
+.history-item.active {
+    background-color: #EFF6FF;
+    color: var(--primary-color);
+}
+
+
+.status-bar {
+    margin-top: auto;
+    padding-top: 1rem;
+    border-top: 1px solid var(--border-color);
+    display: flex;
+    align-items: center;
+    gap: 0.5rem;
+    font-size: 0.875rem;
+    color: var(--text-secondary);
+}
+
+.status-dot {
+    width: 8px;
+    height: 8px;
+    border-radius: 50%;
+    background-color: #D1D5DB;
+}
+
+.status-dot.running {
+    background-color: var(--primary-color);
+    box-shadow: 0 0 0 2px rgba(37, 99, 235, 0.2);
+}
+
+/* Main Content */
+.main-content {
+    flex: 1;
+    display: flex;
+    flex-direction: column;
+    height: 100vh;
+    overflow: hidden;
+    background-color: #FFFFFF;
+}
+
+.header {
+    height: 64px;
+    border-bottom: 1px solid var(--border-color);
+    display: flex;
+    align-items: center;
+    padding: 0 2rem;
+    background-color: #FFFFFF;
+}
+
+.header h2 {
+    font-size: 1.25rem;
+    font-weight: 600;
+}
+
+.content-area {
+    flex: 1;
+    overflow-y: auto;
+    padding: 2rem;
+    background-color: #ffffff;
+}
+
+/* Sections & Panel */
+.section {
+    display: none;
+    max-width: 1000px;
+    margin: 0 auto;
+}
+
+.section.active {
+    display: block;
+}
+
+.analysis-grid {
+    display: grid;
+    grid-template-columns: 350px 1fr;
+    gap: 2rem;
+    height: calc(100vh - 64px - 4rem);
+}
+
+.panel {
+    background: #FFFFFF;
+    border: 1px solid var(--border-color);
+    border-radius: 0.5rem;
+    padding: 1.5rem;
+    display: flex;
+    flex-direction: column;
+    gap: 1.5rem;
+}
+
+.panel-title {
+    font-size: 1rem;
+    font-weight: 600;
+    color: var(--text-primary);
+    margin-bottom: 0.5rem;
+    display: flex;
+    align-items: center;
+    justify-content: space-between;
+}
+
+/* Forms */
+.form-group {
+    display: flex;
+    flex-direction: column;
+    gap: 0.5rem;
+}
+
+.form-label {
+    font-size: 0.875rem;
+    font-weight: 500;
+    color: var(--text-secondary);
+}
+
+.form-input,
+.form-textarea {
+    padding: 0.625rem 0.875rem;
+    border: 1px solid var(--border-color);
+    border-radius: 0.375rem;
+    font-family: inherit;
+    font-size: 0.9rem;
+    color: var(--text-primary);
+    outline: none;
+    transition: border-color 0.2s;
+    width: 100%;
+}
+
+.form-input:focus,
+.form-textarea:focus {
+    border-color: var(--primary-color);
+    box-shadow: 0 0 0 2px rgba(37, 99, 235, 0.1);
+}
+
+.form-textarea {
+    resize: vertical;
+    min-height: 100px;
+}
+
+/* Buttons */
+.btn {
+    display: inline-flex;
+    align-items: center;
+    justify-content: center;
+    gap: 0.5rem;
+    padding: 0.625rem 1.25rem;
+    border-radius: 0.375rem;
+    font-weight: 500;
+    font-size: 0.9rem;
+    cursor: pointer;
+    transition: all 0.2s;
+    border: 1px solid transparent;
+}
+
+.btn-primary {
+    background-color: var(--primary-color);
+    color: white;
+}
+
+.btn-primary:hover {
+    background-color: var(--primary-hover);
+}
+
+.btn-secondary {
+    background-color: white;
+    border-color: var(--border-color);
+    color: var(--text-primary);
+}
+
+.btn-secondary:hover {
+    background-color: #F9FAFB;
+    border-color: #D1D5DB;
+}
+
+.btn-sm {
+    padding: 0.375rem 0.75rem;
+    font-size: 0.875rem;
+}
+
+/* Upload Area */
+.upload-area {
+    border: 2px dashed var(--border-color);
+    border-radius: 0.5rem;
+    padding: 2rem;
+    text-align: center;
+    cursor: pointer;
+    transition: all 0.2s;
+    background-color: #F9FAFB;
+}
+
+.upload-area:hover,
+.upload-area.dragover {
+    border-color: var(--primary-color);
+    background-color: #EFF6FF;
+}
+
+.upload-icon {
+    font-size: 1.5rem;
+    color: var(--text-secondary);
+    margin-bottom: 0.75rem;
+}
+
+.file-list {
+    margin-top: 1rem;
+    display: flex;
+    flex-direction: column;
+    gap: 0.5rem;
+}
+
+.file-item {
+    display: flex;
+    align-items: center;
+    gap: 0.5rem;
+    font-size: 0.85rem;
+    color: var(--text-primary);
+    background: #FFFFFF;
+    padding: 0.5rem;
+    border: 1px solid var(--border-color);
+    border-radius: 0.25rem;
+}
+
+/* Tabs */
+.tabs {
+    display: flex;
+    gap: 1rem;
+    margin-left: 1rem;
+}
+
+.tab {
+    padding: 0.25rem 0.5rem;
+    font-size: 0.9rem;
+    color: var(--text-secondary);
+    cursor: pointer;
+    border-bottom: 2px solid transparent;
+    transition: all 0.2s;
+}
+
+.tab:hover {
+    color: var(--text-primary);
+}
+
+.tab.active {
+    color: var(--primary-color);
+    border-bottom-color: var(--primary-color);
+    font-weight: 500;
+}
+
+/* Log & Report Content */
+.output-container {
+    flex: 1;
+    overflow-y: hidden;
+    /* Individual tabs scroll */
+    background: #F9FAFB;
+    border: 1px solid var(--border-color);
+    border-radius: 0.375rem;
+    padding: 1rem;
+    position: relative;
+    display: flex;
+    flex-direction: column;
+}
+
+#logsTab {
+    background-color: #1a1b26;
+    color: #a9b1d6;
+    font-family: 'JetBrains Mono', 'Menlo', 'Monaco', 'Courier New', monospace;
+    padding: 1.5rem;
+}
+
+.log-content {
+    font-family: inherit;
+    font-size: 0.85rem;
+    white-space: pre-wrap;
+    line-height: 1.6;
+    margin: 0;
+}
+
+.report-content {
+    font-size: 0.95rem;
+    line-height: 1.7;
+    color: #1F2937;
+}
+
+.report-content img {
+    max-width: 100%;
+    border-radius: 0.375rem;
+    margin: 1rem 0;
+    box-shadow: var(--card-shadow);
+}
+
+/* Empty State */
+.empty-state {
+    text-align: center;
+    padding: 4rem 2rem;
+    color: var(--text-secondary);
+}
+
+/* Utilities */
+.hidden {
+    display: none !important;
+}
+
+/* Gallery Carousel */
+.carousel-container {
+    position: relative;
+    width: 100%;
+    flex: 1;
+    display: flex;
+    align-items: center;
+    justify-content: center;
+    background: #F3F4F6;
+    border-radius: 0.5rem;
+    overflow: hidden;
+    margin-bottom: 1rem;
+}
+
+.carousel-slide {
+    width: 100%;
+    height: 100%;
+    display: flex;
+    flex-direction: column;
+    align-items: center;
+    justify-content: center;
+    padding: 2rem;
+}
+
+.carousel-slide img {
+    max-width: 100%;
+    max-height: 500px;
+    object-fit: contain;
+    border-radius: 0.25rem;
+    box-shadow: 0 10px 15px -3px rgba(0, 0, 0, 0.1);
+    transition: transform 0.2s;
+    background: white;
+}
+
+.carousel-btn {
+    position: absolute;
+    top: 50%;
+    transform: translateY(-50%);
+    background: rgba(255, 255, 255, 0.9);
+    border: 1px solid var(--border-color);
+    border-radius: 50%;
+    width: 44px;
+    height: 44px;
+    display: flex;
+    align-items: center;
+    justify-content: center;
+    cursor: pointer;
+    z-index: 10;
+    color: var(--text-primary);
+    box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
+    transition: all 0.2s;
+}
+
+.carousel-btn:hover {
+    background: var(--primary-color);
+    color: white;
+    border-color: var(--primary-color);
+    transform: translateY(-50%) scale(1.1);
+}
+
+.carousel-btn.prev {
+    left: 1rem;
+}
+
+.carousel-btn.next {
+    right: 1rem;
+}
+
+.image-info {
+    width: 100%;
+    text-align: center;
+    color: var(--text-primary);
+    background: white;
+    padding: 1rem;
+    border-radius: 0.5rem;
+    border: 1px solid var(--border-color);
+}
+
+.image-title {
+    font-weight: 600;
+    font-size: 1.1rem;
+    margin-bottom: 0.5rem;
+    color: var(--primary-color);
+}
+
+.image-desc {
+    font-size: 0.9rem;
+    color: var(--text-secondary);
+}
\ No newline at end of file
diff --git a/web/static/index.html b/web/static/index.html
index fcfbd6f..c24e979 100644
--- a/web/static/index.html
+++ b/web/static/index.html
@@ -5,18 +5,20 @@