Files
iov_data_analysis_agent/utils/data_privacy.py

226 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
数据隐私保护层
核心原则:发给外部 LLM 的信息只包含 schema 级别的元数据,
绝不包含真实数据值。所有真实数据仅在本地代码执行环境中使用。
分级策略:
- SAFE安全级: 可发送给 LLM — 列名、数据类型、行列数、空值率、唯一值数量
- LOCAL本地级: 仅本地使用 — 真实数据值、TOP N 高频值、统计数值、样本行
"""
import re
import pandas as pd
from typing import List, Optional
def build_safe_profile(file_paths: list) -> str:
"""
生成可安全发送给外部 LLM 的数据画像。
只包含 schema 信息,不包含任何真实数据值。
Args:
file_paths: 数据文件路径列表
Returns:
安全的 Markdown 格式数据画像
"""
import os
profile = "# 数据结构概览 (Schema Profile)\n\n"
if not file_paths:
return profile + "未提供数据文件。"
for file_path in file_paths:
file_name = os.path.basename(file_path)
profile += f"## 文件: {file_name}\n\n"
if not os.path.exists(file_path):
profile += f"[WARN] 文件不存在: {file_path}\n\n"
continue
try:
df = _load_dataframe(file_path)
if df is None:
continue
rows, cols = df.shape
profile += f"- **维度**: {rows} 行 x {cols}\n"
profile += f"- **列名**: `{', '.join(df.columns)}`\n\n"
profile += "### 列结构:\n\n"
profile += "| 列名 | 数据类型 | 空值率 | 唯一值数 | 特征描述 |\n"
profile += "|------|---------|--------|---------|----------|\n"
for col in df.columns:
dtype = str(df[col].dtype)
null_count = df[col].isnull().sum()
null_pct = f"{(null_count / rows) * 100:.1f}%" if rows > 0 else "0%"
unique_count = df[col].nunique()
# 特征描述:只描述数据特征,不暴露具体值
feature_desc = _describe_column_safe(df[col], unique_count, rows)
profile += f"| {col} | {dtype} | {null_pct} | {unique_count} | {feature_desc} |\n"
profile += "\n"
except Exception as e:
profile += f"[ERROR] 读取文件失败: {str(e)}\n\n"
return profile
def build_local_profile(file_paths: list) -> str:
"""
生成完整的本地数据画像(包含真实数据值)。
仅用于本地代码执行环境,不发送给 LLM。
这是原来 load_and_profile_data 的功能,保留完整信息。
"""
from utils.data_loader import load_and_profile_data
return load_and_profile_data(file_paths)
def sanitize_execution_feedback(feedback: str, max_lines: int = 30) -> str:
"""
对代码执行反馈进行脱敏处理,移除可能包含真实数据的内容。
保留:
- 执行状态(成功/失败)
- 错误信息
- DataFrame 的 shape 信息
- 图片保存路径
- 列名信息
移除/截断:
- 具体的数据行DataFrame 输出)
- 大段的数值输出
Args:
feedback: 原始执行反馈
max_lines: 最大保留行数
Returns:
脱敏后的反馈
"""
if not feedback:
return feedback
lines = feedback.split("\n")
safe_lines = []
in_dataframe_output = False
df_line_count = 0
for line in lines:
stripped = line.strip()
# 始终保留的关键信息
if any(kw in stripped for kw in [
"图片已保存", "保存至", "[OK]", "[WARN]", "[ERROR]",
"[Auto-Save]", "数据表形状", "列名:", ".png",
"shape", "columns", "dtype", "info()", "describe()",
]):
safe_lines.append(line)
in_dataframe_output = False
continue
# 检测 DataFrame 输出的开始(通常有列头行)
if _looks_like_dataframe_row(stripped):
if not in_dataframe_output:
in_dataframe_output = True
df_line_count = 0
safe_lines.append("[数据输出已省略 - 数据仅在本地执行环境中可见]")
df_line_count += 1
continue
# 检测纯数值行
if _is_numeric_heavy_line(stripped):
if not in_dataframe_output:
in_dataframe_output = True
safe_lines.append("[数值输出已省略]")
continue
# 普通文本行
in_dataframe_output = False
safe_lines.append(line)
# 限制总行数
if len(safe_lines) > max_lines:
safe_lines = safe_lines[:max_lines]
safe_lines.append(f"[... 输出已截断,共 {len(lines)} 行]")
return "\n".join(safe_lines)
def _load_dataframe(file_path: str):
"""加载 DataFrame支持多种格式和编码"""
import os
ext = os.path.splitext(file_path)[1].lower()
if ext == ".csv":
for encoding in ["utf-8", "gbk", "gb18030", "latin1"]:
try:
return pd.read_csv(file_path, encoding=encoding)
except (UnicodeDecodeError, Exception):
continue
elif ext in [".xlsx", ".xls"]:
try:
return pd.read_excel(file_path)
except Exception:
pass
return None
def _describe_column_safe(series: pd.Series, unique_count: int, total_rows: int) -> str:
"""安全地描述列特征,不暴露具体值"""
dtype = series.dtype
if pd.api.types.is_numeric_dtype(dtype):
if unique_count <= 5:
return "低基数数值(可能是分类编码)"
elif unique_count < total_rows * 0.05:
return "离散数值"
else:
return "连续数值"
if pd.api.types.is_datetime64_any_dtype(dtype):
return "时间序列"
# 文本/分类列
if unique_count == 1:
return "单一值(常量列)"
elif unique_count <= 10:
return f"低基数分类({unique_count}类)"
elif unique_count <= 50:
return f"中基数分类({unique_count}类)"
elif unique_count > total_rows * 0.8:
return "高基数文本可能是ID或描述"
else:
return f"文本分类({unique_count}类)"
def _looks_like_dataframe_row(line: str) -> bool:
"""判断一行是否看起来像 DataFrame 输出"""
if not line:
return False
# DataFrame 输出通常有多个空格分隔的列
parts = line.split()
if len(parts) >= 3:
# 第一个元素是索引(数字)
try:
int(parts[0])
return True
except ValueError:
pass
return False
def _is_numeric_heavy_line(line: str) -> bool:
"""判断一行是否主要由数值组成"""
if not line or len(line) < 5:
return False
digits_and_dots = sum(1 for c in line if c.isdigit() or c in ".,-+eE ")
return digits_and_dots / len(line) > 0.7