Files
iov_data_analysis_agent/tests/test_dashboard_properties.py

652 lines
22 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
Property-based tests for analysis-dashboard-redesign features.
Uses hypothesis with max_examples=100 as specified in the design document.
Run: python -m pytest tests/test_dashboard_properties.py -v
"""
import os
import sys
import re
import json
import tempfile
# Ensure project root is on path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
import pandas as pd
from hypothesis import given, settings, assume
from hypothesis import strategies as st
from hypothesis.extra.pandas import column, data_frames, range_indexes
# ---------------------------------------------------------------------------
# Helpers / Strategies
# ---------------------------------------------------------------------------
# One captured evidence row: 1-5 cells keyed by short lowercase names, each
# holding an integer, a short string, or a non-NaN float.
_exec_evidence_row_st = st.dictionaries(
    keys=st.text(min_size=1, max_size=10, alphabet="abcdefghijklmnopqrstuvwxyz"),
    values=st.one_of(
        st.integers(),
        st.text(min_size=0, max_size=20),
        st.floats(allow_nan=False),
    ),
    min_size=1,
    max_size=5,
)

# Random execution results covering both the success and the failure path.
# "variables" and the two file lists are always empty here.
execution_result_st = st.fixed_dictionaries({
    "success": st.booleans(),
    "output": st.text(min_size=0, max_size=200),
    "error": st.text(min_size=0, max_size=200),
    "variables": st.just({}),
    "evidence_rows": st.lists(_exec_evidence_row_st, min_size=0, max_size=10),
    "auto_exported_files": st.just([]),
    "prompt_saved_files": st.just([]),
})

# Reasoning text; the empty string stands in for a missing YAML field.
reasoning_st = st.one_of(
    st.just(""),
    st.text(min_size=1, max_size=200),
)

# Non-empty pseudo-code text built from a small, code-like alphabet.
code_st = st.text(
    min_size=1,
    max_size=500,
    alphabet="abcdefghijklmnopqrstuvwxyz0123456789 =()._\n",
)

# Feedback / raw_log text (may be empty).
feedback_st = st.text(min_size=0, max_size=300)
def build_round_data(round_num, reasoning, code, result, feedback):
    """Assemble a Round_Data dict for one analysis round.

    Per the original note, the construction follows what
    DataAnalysisAgent._handle_generate_code does.

    Args:
        round_num: Round number stored under "round".
        reasoning: Reasoning text stored under "reasoning".
        code: Code text stored under "code".
        result: Execution-result dict; "success", "evidence_rows", "output",
            "error", "auto_exported_files" and "prompt_saved_files" are read
            with ``.get`` so any of them may be absent.
        feedback: Text stored under "raw_log".

    Returns:
        A dict with keys round, reasoning, code, result_summary,
        evidence_rows, raw_log, auto_exported_files and prompt_saved_files.
    """

    def summarize_result(outcome):
        # Failure path first: report the error, truncated to 100 characters.
        if not outcome.get("success"):
            message = outcome.get("error", "未知错误")
            if len(message) > 100:
                message = message[:100] + "..."
            return f"执行失败: {message}"

        # Success with captured rows: describe the shape (rows x columns of
        # the first row).
        rows = outcome.get("evidence_rows", [])
        if rows:
            return f"执行成功,输出 DataFrame ({len(rows)}行×{len(rows[0])}列)"

        # Success with printed output: quote the first line, capped at 80 chars.
        printed = outcome.get("output", "")
        if printed:
            head = printed.strip().split("\n")[0][:80]
            return f"执行成功: {head}"

        return "执行成功"

    record = {
        "round": round_num,
        "reasoning": reasoning,
        "code": code,
        "result_summary": summarize_result(result),
        "evidence_rows": result.get("evidence_rows", []),
        "raw_log": feedback,
        "auto_exported_files": result.get("auto_exported_files", []),
        "prompt_saved_files": result.get("prompt_saved_files", []),
    }
    return record
# Pattern for one "[DATA_FILE_SAVED] filename: ..., rows: N, description: ..."
# line (per the original note, the same pattern CodeExecutor uses).
# Groups: 1 = filename (non-greedy), 2 = row count digits, 3 = description.
_DATA_FILE_SAVED_RE = re.compile(
    r"\[DATA_FILE_SAVED\]\s*filename:\s*(.+?),\s*rows:\s*(\d+),\s*description:\s*(.+)"
)


def parse_data_file_saved_markers(stdout_text):
    """Extract every [DATA_FILE_SAVED] marker found in *stdout_text*.

    The text is scanned line by line; lines without a marker are ignored.
    Per the original note, this follows
    CodeExecutor._parse_data_file_saved_markers.

    Returns:
        A list of dicts, in order of appearance, each with "filename"
        (stripped str), "rows" (int) and "description" (stripped str).
    """
    hits = (_DATA_FILE_SAVED_RE.search(line) for line in stdout_text.splitlines())
    return [
        {
            "filename": hit.group(1).strip(),
            "rows": int(hit.group(2)),
            "description": hit.group(3).strip(),
        }
        for hit in hits
        if hit
    ]
# Evidence annotation regex (per the original note, the same one as web/main.py).
# Matches an HTML-comment marker such as "<!-- evidence:round_3 -->", with
# optional whitespace inside the comment delimiters, and captures the round
# number digits as group 1.
_EVIDENCE_PATTERN = re.compile(r"<!--\s*evidence:round_(\d+)\s*-->")
def split_report_to_paragraphs(markdown_content):
    """Split a markdown report into typed paragraph dicts.

    The text is scanned line by line and grouped into blocks:

    * fenced code (``` ... ```), kept as one "code" block including fences;
    * ATX headings (1-6 "#" followed by whitespace), one "heading" per line;
    * image lines ("![alt](url)"), one "image" per line;
    * runs of consecutive lines starting with "|", one "table" per run;
    * everything else, grouped into "text" blocks separated by blank lines.

    Per the original note, this is modelled on _split_report_to_paragraphs in
    web/main.py.

    Fix over the previous version: table state used to live in a separate
    flag that was only cleared by a plain-text or blank line. A table that was
    directly followed by a heading, image or code fence left the flag set, so
    the next table was emitted with type "text". Table state is now derived
    from ``current_type``, which every flush resets.
    NOTE(review): if web/main.py keeps a separate table flag in the same way,
    it likely needs the same fix — confirm against that file.

    Args:
        markdown_content: The markdown text to split.

    Returns:
        A list of dicts with "id" ("p-<n>", numbered from 0 in emission
        order), "type" and "content" (the block text, stripped). Blocks whose
        stripped text is empty are not emitted.
    """
    paragraphs = []
    current_block = []
    current_type = "text"
    para_id = 0

    def flush_block():
        """Emit the pending block (if non-empty) and reset the accumulator."""
        nonlocal para_id, current_block, current_type
        text = "\n".join(current_block).strip()
        if text:
            paragraphs.append({
                "id": f"p-{para_id}",
                "type": current_type,
                "content": text,
            })
            para_id += 1
        current_block = []
        current_type = "text"

    in_code = False
    for line in markdown_content.split("\n"):
        stripped = line.strip()

        # Code fences toggle code mode; the fence lines belong to the block.
        if stripped.startswith("```"):
            if in_code:
                current_block.append(line)
                flush_block()
                in_code = False
            else:
                flush_block()
                current_block.append(line)
                current_type = "code"
                in_code = True
            continue

        # Inside a fence nothing is interpreted, not even blank lines.
        if in_code:
            current_block.append(line)
            continue

        # Headings are always a block of their own.
        if re.match(r"^#{1,6}\s", stripped):
            flush_block()
            current_block.append(line)
            current_type = "heading"
            flush_block()
            continue

        # So are image lines.
        if re.match(r"^!\[.*\]\(.*\)", stripped):
            flush_block()
            current_block.append(line)
            current_type = "image"
            flush_block()
            continue

        # Table rows: start a new table block unless one is already open.
        # current_type is "table" only while table rows are being collected,
        # because flush_block() resets it to "text".
        if stripped.startswith("|"):
            if current_type != "table":
                flush_block()
                current_type = "table"
            current_block.append(line)
            continue

        # Any non-table line closes an open table.
        if current_type == "table":
            flush_block()

        # Blank line: paragraph separator.
        if not stripped:
            flush_block()
            continue

        current_block.append(line)

    flush_block()
    return paragraphs
def extract_evidence_annotations(paragraphs, rounds):
    """Map annotated paragraph ids to the evidence rows of the referenced round.

    Per the original note, this follows _extract_evidence_annotations in
    web/main.py but takes a plain list of rounds instead of a session.

    Args:
        paragraphs: Paragraph dicts; "content" is searched for the first
            evidence marker and "id" is used as the result key.
        rounds: List of round dicts; round N of a marker refers to
            ``rounds[N - 1]``.

    Returns:
        Dict of paragraph id -> evidence rows. Paragraphs without a marker,
        with an out-of-range round number, or whose round has no evidence
        rows are left out.
    """
    supporting_data = {}
    for para in paragraphs:
        found = _EVIDENCE_PATTERN.search(para.get("content", ""))
        if not found:
            continue

        # Markers are 1-based; convert to a list position.
        position = int(found.group(1)) - 1
        if position < 0 or position >= len(rounds):
            continue

        rows = rounds[position].get("evidence_rows", [])
        if rows:
            supporting_data[para["id"]] = rows
    return supporting_data
# ===========================================================================
# Property 1: Round_Data Structural Completeness (Task 16.1)
# Feature: analysis-dashboard-redesign, Property 1: Round_Data structural completeness
# Validates: Requirements 1.1, 1.3, 1.4
# ===========================================================================
# Field name -> type that every Round_Data dict is required to carry.
ROUND_DATA_REQUIRED_FIELDS = dict(
    round=int,
    reasoning=str,
    code=str,
    result_summary=str,
    evidence_rows=list,
    raw_log=str,
)
@settings(max_examples=100)
@given(
    num_rounds=st.integers(min_value=1, max_value=20),
    results=st.lists(execution_result_st, min_size=1, max_size=20),
    reasonings=st.lists(reasoning_st, min_size=1, max_size=20),
    codes=st.lists(code_st, min_size=1, max_size=20),
    feedbacks=st.lists(feedback_st, min_size=1, max_size=20),
)
def test_prop1_round_data_structural_completeness(num_rounds, results, reasonings, codes, feedbacks):
    """Every Round_Data has all required, correctly typed fields, in insertion order.

    **Validates: Requirements 1.1, 1.3, 1.4**
    """
    # zip() stops at the shortest input, so the number of rounds built is
    # min(num_rounds, len(results), len(reasonings), len(codes), len(feedbacks)).
    per_round_inputs = zip(range(1, num_rounds + 1), reasonings, codes, results, feedbacks)
    rounds_list = [
        build_round_data(number, reasoning, code, result, feedback)
        for number, reasoning, code, result, feedback in per_round_inputs
    ]

    # Required fields: present and of the expected type.
    for rd in rounds_list:
        for field, expected_type in ROUND_DATA_REQUIRED_FIELDS.items():
            assert field in rd, f"Missing field: {field}"
            assert isinstance(rd[field], expected_type), (
                f"Field '{field}' expected {expected_type.__name__}, got {type(rd[field]).__name__}"
            )

    # Round numbers must never decrease from one entry to the next.
    for earlier, later in zip(rounds_list, rounds_list[1:]):
        assert earlier["round"] <= later["round"], (
            f"Insertion order violated: round {earlier['round']} > {later['round']}"
        )
# ===========================================================================
# Property 2: Evidence Capture Bounded (Task 16.2)
# Feature: analysis-dashboard-redesign, Property 2: Evidence capture bounded
# Validates: Requirements 4.1, 4.2, 4.3
# ===========================================================================
# Strategy for column names: 1-10 characters drawn from lowercase letters and
# "_", rejecting names whose first character is "_".
# NOTE(review): the Property 2 test below builds its own "col_<i>" names and
# does not reference this strategy in the code visible here — confirm whether
# it is used elsewhere before removing it.
col_name_st = st.text(
    min_size=1, max_size=10,
    alphabet=st.sampled_from("abcdefghijklmnopqrstuvwxyz_"),
).filter(lambda s: s[0] != "_")  # column names shouldn't start with _
@settings(max_examples=100)
@given(
    num_rows=st.integers(min_value=0, max_value=10000),
    num_cols=st.integers(min_value=1, max_value=50),
)
def test_prop2_evidence_capture_bounded(num_rows, num_cols):
    """Evidence capture yields at most 10 rows whose keys are the DataFrame columns.

    **Validates: Requirements 4.1, 4.2, 4.3**
    """
    import numpy as np

    # Build a num_rows x num_cols frame of random ints (or an empty frame that
    # still carries the column names).
    columns = [f"col_{i}" for i in range(num_cols)]
    if num_rows == 0:
        df = pd.DataFrame(columns=columns)
    else:
        df = pd.DataFrame(
            np.random.randint(0, 100, size=(num_rows, num_cols)),
            columns=columns,
        )

    # The capture logic under test: first 10 rows as a list of dicts.
    evidence_rows = df.head(10).to_dict(orient="records")

    # Size bounds.
    assert len(evidence_rows) <= 10
    assert len(evidence_rows) == min(10, len(df))

    # Each record exposes exactly the frame's columns as keys.
    expected_keys = set(df.columns)
    for record in evidence_rows:
        assert set(record.keys()) == expected_keys
# ===========================================================================
# Property 3: Filename Deduplication (Task 16.3)
# Feature: analysis-dashboard-redesign, Property 3: Filename deduplication
# Validates: Requirements 5.3
# ===========================================================================
@settings(max_examples=100)
@given(
    num_exports=st.integers(min_value=1, max_value=20),
    var_name=st.text(min_size=1, max_size=20, alphabet="abcdefghijklmnopqrstuvwxyz_0123456789").filter(
        lambda s: s[0].isalpha()
    ),
)
def test_prop3_filename_deduplication(num_exports, var_name):
    """All generated filenames from same-name exports must be unique.

    Exports the same variable name *num_exports* times into one directory,
    applying the "<name>.csv", "<name>_1.csv", "<name>_2.csv", ... dedup
    scheme, and checks that no filename is produced twice.

    The scratch directory is a ``tempfile.TemporaryDirectory`` so it is
    removed after every example; the previous ``mkdtemp()`` call left one
    directory (with up to 20 files) behind per generated example.

    **Validates: Requirements 5.3**
    """
    generated_filenames = []
    with tempfile.TemporaryDirectory() as output_dir:
        for _ in range(num_exports):
            # Simulate _export_dataframe dedup logic
            base_filename = f"{var_name}.csv"
            filepath = os.path.join(output_dir, base_filename)
            if os.path.exists(filepath):
                # Probe _1, _2, ... until an unused name is found.
                suffix = 1
                while True:
                    dedup_filename = f"{var_name}_{suffix}.csv"
                    filepath = os.path.join(output_dir, dedup_filename)
                    if not os.path.exists(filepath):
                        base_filename = dedup_filename
                        break
                    suffix += 1
            # Create the file so the next iteration sees the name as taken.
            with open(filepath, "w") as f:
                f.write("dummy")
            generated_filenames.append(base_filename)

    # Verify all filenames are unique
    assert len(generated_filenames) == len(set(generated_filenames)), (
        f"Duplicate filenames found: {generated_filenames}"
    )
# ===========================================================================
# Property 4: Auto-Export Metadata Completeness (Task 16.4)
# Feature: analysis-dashboard-redesign, Property 4: Auto-export metadata completeness
# Validates: Requirements 5.4, 5.5
# ===========================================================================
@settings(max_examples=100)
@given(
    var_name=st.text(min_size=1, max_size=20, alphabet="abcdefghijklmnopqrstuvwxyz_0123456789").filter(
        lambda s: s[0].isalpha()
    ),
    num_rows=st.integers(min_value=0, max_value=1000),
    num_cols=st.integers(min_value=1, max_value=50),
)
def test_prop4_auto_export_metadata_completeness(var_name, num_rows, num_cols):
    """Auto-export metadata must contain all required fields with correct values.

    Builds a random DataFrame, writes it to CSV using the simulated
    _export_dataframe naming logic, and checks the metadata dict assembled
    for that export.

    The CSV is written into a ``tempfile.TemporaryDirectory`` so the file and
    directory are removed after every example; the previous ``mkdtemp()``
    call leaked one directory per generated example.

    **Validates: Requirements 5.4, 5.5**
    """
    import numpy as np

    columns = [f"col_{i}" for i in range(num_cols)]
    if num_rows == 0:
        df = pd.DataFrame(columns=columns)
    else:
        data = np.random.randint(0, 100, size=(num_rows, num_cols))
        df = pd.DataFrame(data, columns=columns)

    with tempfile.TemporaryDirectory() as output_dir:
        # Simulate _export_dataframe logic
        base_filename = f"{var_name}.csv"
        filepath = os.path.join(output_dir, base_filename)
        if os.path.exists(filepath):
            suffix = 1
            while True:
                dedup_filename = f"{var_name}_{suffix}.csv"
                filepath = os.path.join(output_dir, dedup_filename)
                if not os.path.exists(filepath):
                    base_filename = dedup_filename
                    break
                suffix += 1
        df.to_csv(filepath, index=False)
        metadata = {
            "variable_name": var_name,
            "filename": base_filename,
            "rows": len(df),
            "cols": len(df.columns),
            "columns": list(df.columns),
        }

    # Verify all required fields present
    for field in ("variable_name", "filename", "rows", "cols", "columns"):
        assert field in metadata, f"Missing field: {field}"

    # Verify values match the source DataFrame
    assert metadata["rows"] == len(df)
    assert metadata["cols"] == len(df.columns)
    assert metadata["columns"] == list(df.columns)
    assert metadata["variable_name"] == var_name
# ===========================================================================
# Property 5: DATA_FILE_SAVED Marker Parsing Round-Trip (Task 16.5)
# Feature: analysis-dashboard-redesign, Property 5: DATA_FILE_SAVED marker parsing round-trip
# Validates: Requirements 6.3
# ===========================================================================
# Characters allowed in a filename stem: ASCII letters, digits, "_", "-" and
# a handful of Chinese characters.
_FILENAME_STEM_CHARS = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789"
    "_-"
    "数据分析结果汇总报告"
)

# Characters allowed in a description: ASCII letters, digits, space and a
# handful of Chinese characters.
_DESCRIPTION_CHARS = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789 "
    "各类型问题聚合统计分析结果"
)


def _is_usable_filename_stem(s):
    """Accept stems that are not blank and contain no comma."""
    return len(s.strip()) > 0 and "," not in s


def _is_non_blank(s):
    """Accept strings that still have content after stripping."""
    return len(s.strip()) > 0


def _join_stem_and_extension(base, ext):
    """Build "<stripped stem><extension>"."""
    return base.strip() + ext


# Filename stem (1-30 chars), extension, and the combined filename strategy.
filename_base_st = st.text(
    min_size=1,
    max_size=30,
    alphabet=st.sampled_from(_FILENAME_STEM_CHARS),
).filter(_is_usable_filename_stem)
filename_ext_st = st.sampled_from([".csv", ".xlsx"])
filename_st = st.builds(_join_stem_and_extension, filename_base_st, filename_ext_st)

# Free-text description (1-100 chars, never blank).
description_st = st.text(
    min_size=1,
    max_size=100,
    alphabet=st.sampled_from(_DESCRIPTION_CHARS),
).filter(_is_non_blank)
@settings(max_examples=100)
@given(
    filename=filename_st,
    rows=st.integers(min_value=1, max_value=1000000),
    description=description_st,
)
def test_prop5_data_file_saved_marker_round_trip(filename, rows, description):
    """A formatted DATA_FILE_SAVED marker parses back to the values it was built from.

    **Validates: Requirements 6.3**
    """
    # Render a single marker line, then run it through the parser.
    marker = f"[DATA_FILE_SAVED] filename: {filename}, rows: {rows}, description: {description}"
    parsed = parse_data_file_saved_markers(marker)

    assert len(parsed) == 1, f"Expected 1 parsed result, got {len(parsed)}"

    # The parser strips surrounding whitespace from both text fields.
    entry = parsed[0]
    assert entry["filename"] == filename.strip()
    assert entry["rows"] == rows
    assert entry["description"] == description.strip()
# ===========================================================================
# Property 6: Data File Preview Bounded Rows (Task 16.6)
# Feature: analysis-dashboard-redesign, Property 6: Data file preview bounded rows
# Validates: Requirements 7.2
# ===========================================================================
@settings(max_examples=100)
@given(
    num_rows=st.integers(min_value=0, max_value=10000),
    num_cols=st.integers(min_value=1, max_value=50),
)
def test_prop6_data_file_preview_bounded_rows(num_rows, num_cols):
    """Preview of a CSV file must return at most 5 rows with correct column names.

    Writes a random DataFrame to a CSV file and reads it back with
    ``pd.read_csv(..., nrows=5)``, the preview logic under test.

    The CSV lives in a ``tempfile.TemporaryDirectory`` so it is deleted after
    every example; the previous ``mkdtemp()`` call leaked one directory and
    CSV per generated example.

    **Validates: Requirements 7.2**
    """
    import numpy as np

    columns = [f"col_{i}" for i in range(num_cols)]
    if num_rows == 0:
        df = pd.DataFrame(columns=columns)
    else:
        data = np.random.randint(0, 100, size=(num_rows, num_cols))
        df = pd.DataFrame(data, columns=columns)

    with tempfile.TemporaryDirectory() as tmp_dir:
        # Write to a temp CSV file
        csv_path = os.path.join(tmp_dir, "test_data.csv")
        df.to_csv(csv_path, index=False)
        # Read back using the same logic as the preview endpoint; the frame is
        # fully loaded here, so it stays valid after the directory is removed.
        preview_df = pd.read_csv(csv_path, nrows=5)

    # Verify at most 5 rows
    assert len(preview_df) <= 5
    assert len(preview_df) == min(5, num_rows)

    # Verify column names match exactly
    assert list(preview_df.columns) == columns
# ===========================================================================
# Property 7: Evidence Annotation Parsing (Task 16.7)
# Feature: analysis-dashboard-redesign, Property 7: Evidence annotation parsing
# Validates: Requirements 11.3, 11.4
# ===========================================================================
# Alphabet shared by both paragraph strategies: lowercase letters, space and
# a little punctuation.
_PARAGRAPH_CHARS = "abcdefghijklmnopqrstuvwxyz .,!"


def _append_evidence_marker(text, round_num):
    """Return *text* followed by an evidence marker for *round_num*."""
    return f"{text} <!-- evidence:round_{round_num} -->"


def _is_plain_paragraph(s):
    """Accept non-blank text that carries no evidence marker prefix."""
    return "evidence:" not in s and len(s.strip()) > 0


# Paragraph text ending in an evidence marker for a round in 1..100.
annotated_paragraph_st = st.builds(
    _append_evidence_marker,
    st.text(min_size=1, max_size=100, alphabet=_PARAGRAPH_CHARS),
    st.integers(min_value=1, max_value=100),
)

# Paragraph text without any evidence marker.
plain_paragraph_st = st.text(
    min_size=1,
    max_size=100,
    alphabet=_PARAGRAPH_CHARS,
).filter(_is_plain_paragraph)
@settings(max_examples=100)
@given(
    annotated=st.lists(annotated_paragraph_st, min_size=0, max_size=10),
    plain=st.lists(plain_paragraph_st, min_size=0, max_size=10),
)
def test_prop7_evidence_annotation_parsing(annotated, plain):
    """Annotated paragraphs are picked up as evidence; unannotated ones are not.

    **Validates: Requirements 11.3, 11.4**
    """
    assume(len(annotated) + len(plain) > 0)

    # Annotated paragraphs first, then plain ones, separated by blank lines.
    markdown = "\n\n".join([*annotated, *plain])
    paragraphs = split_report_to_paragraphs(markdown)

    # 100 fake rounds, each with one evidence row, so every generated round
    # number (1..100) resolves.
    rounds = [{"evidence_rows": [{"key": f"value_{i}"}]} for i in range(100)]

    supporting_data = extract_evidence_annotations(paragraphs, rounds)

    for para in paragraphs:
        match = _EVIDENCE_PATTERN.search(para.get("content", ""))

        if not match:
            # Non-annotated paragraphs must NOT be in supporting_data
            assert para["id"] not in supporting_data, (
                f"Non-annotated paragraph {para['id']} should not be in supporting_data"
            )
            continue

        # Annotated paragraphs pointing at a round that has evidence must be
        # present in the result.
        round_num = int(match.group(1))
        idx = round_num - 1
        if 0 <= idx < len(rounds) and rounds[idx].get("evidence_rows"):
            assert para["id"] in supporting_data, (
                f"Annotated paragraph {para['id']} with round {round_num} not in supporting_data"
            )
# ===========================================================================
# Property 8: SessionData JSON Round-Trip (Task 16.8)
# Feature: analysis-dashboard-redesign, Property 8: SessionData JSON round-trip
# Validates: Requirements 12.4
# ===========================================================================
# One evidence row for the JSON round-trip: 0-5 lowercase-keyed cells holding
# a small integer or a short string (no floats).
_round_evidence_row_st = st.dictionaries(
    keys=st.text(min_size=1, max_size=10, alphabet="abcdefghijklmnopqrstuvwxyz"),
    values=st.one_of(
        st.integers(min_value=-1000, max_value=1000),
        st.text(min_size=0, max_size=20),
    ),
    min_size=0,
    max_size=5,
)

# Round_Data dicts as stored in a session.
round_data_st = st.fixed_dictionaries({
    "round": st.integers(min_value=1, max_value=100),
    "reasoning": st.text(min_size=0, max_size=200),
    "code": st.text(min_size=0, max_size=200),
    "result_summary": st.text(min_size=0, max_size=200),
    "evidence_rows": st.lists(_round_evidence_row_st, min_size=0, max_size=10),
    "raw_log": st.text(min_size=0, max_size=200),
})

# Column names listed in a file-metadata dict.
_metadata_column_names_st = st.lists(
    st.text(min_size=1, max_size=10, alphabet="abcdefghijklmnopqrstuvwxyz"),
    max_size=10,
)

# File metadata dicts for exported data files.
file_metadata_st = st.fixed_dictionaries({
    "filename": st.text(min_size=1, max_size=30, alphabet="abcdefghijklmnopqrstuvwxyz0123456789_."),
    "description": st.text(min_size=0, max_size=100),
    "rows": st.integers(min_value=0, max_value=100000),
    "cols": st.integers(min_value=0, max_value=100),
    "columns": _metadata_column_names_st,
    "size_bytes": st.integers(min_value=0, max_value=10000000),
    "source": st.sampled_from(["auto", "prompt"]),
})
@settings(max_examples=100)
@given(
    rounds=st.lists(round_data_st, min_size=0, max_size=20),
    data_files=st.lists(file_metadata_st, min_size=0, max_size=20),
)
def test_prop8_session_data_json_round_trip(rounds, data_files):
    """Rounds and data_files survive a JSON dump/load cycle unchanged.

    **Validates: Requirements 12.4**
    """
    payload = {"rounds": rounds, "data_files": data_files}

    # Per the original note, default=str matches how the codebase serializes.
    restored = json.loads(json.dumps(payload, default=str))

    assert restored["rounds"] == rounds
    assert restored["data_files"] == data_files