# -*- coding: utf-8 -*-
"""
Property-based tests for analysis-dashboard-redesign features.

Uses hypothesis with max_examples=100 as specified in the design document.

Run: python -m pytest tests/test_dashboard_properties.py -v
"""
import os
import sys
import re
import json
import tempfile

# Ensure project root is on path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import pytest
import pandas as pd
from hypothesis import given, settings, assume
from hypothesis import strategies as st
from hypothesis.extra.pandas import column, data_frames, range_indexes

# ---------------------------------------------------------------------------
# Helpers / Strategies
# ---------------------------------------------------------------------------

# Strategy for generating random execution results (success or failure)
execution_result_st = st.fixed_dictionaries({
    "success": st.booleans(),
    "output": st.text(min_size=0, max_size=200),
    "error": st.text(min_size=0, max_size=200),
    "variables": st.just({}),
    "evidence_rows": st.lists(
        st.dictionaries(
            keys=st.text(min_size=1, max_size=10, alphabet="abcdefghijklmnopqrstuvwxyz"),
            values=st.one_of(st.integers(), st.text(min_size=0, max_size=20), st.floats(allow_nan=False)),
            min_size=1,
            max_size=5,
        ),
        min_size=0,
        max_size=10,
    ),
    "auto_exported_files": st.just([]),
    "prompt_saved_files": st.just([]),
})

# Strategy for reasoning text (may be empty, simulating missing YAML field)
reasoning_st = st.one_of(st.just(""), st.text(min_size=1, max_size=200))

# Strategy for code text
code_st = st.text(min_size=1, max_size=500, alphabet="abcdefghijklmnopqrstuvwxyz0123456789 =()._\n")

# Strategy for feedback/raw_log text
feedback_st = st.text(min_size=0, max_size=300)


def build_round_data(round_num, reasoning, code, result, feedback):
    """Construct a Round_Data dict the same way DataAnalysisAgent._handle_generate_code does."""

    def summarize_result(r):
        # Mirrors the agent's summary logic: prefer evidence-row shape, then
        # the first line of stdout, then a generic success message; failures
        # get a truncated error string.
        if r.get("success"):
            evidence_rows = r.get("evidence_rows", [])
            if evidence_rows:
                num_rows = len(evidence_rows)
                num_cols = len(evidence_rows[0]) if evidence_rows else 0
                return f"执行成功,输出 DataFrame ({num_rows}行×{num_cols}列)"
            output = r.get("output", "")
            if output:
                first_line = output.strip().split("\n")[0][:80]
                return f"执行成功: {first_line}"
            return "执行成功"
        else:
            error = r.get("error", "未知错误")
            if len(error) > 100:
                error = error[:100] + "..."
            return f"执行失败: {error}"

    return {
        "round": round_num,
        "reasoning": reasoning,
        "code": code,
        "result_summary": summarize_result(result),
        "evidence_rows": result.get("evidence_rows", []),
        "raw_log": feedback,
        "auto_exported_files": result.get("auto_exported_files", []),
        "prompt_saved_files": result.get("prompt_saved_files", []),
    }


# Regex for parsing DATA_FILE_SAVED markers (same as CodeExecutor)
_DATA_FILE_SAVED_RE = re.compile(
    r"\[DATA_FILE_SAVED\]\s*filename:\s*(.+?),\s*rows:\s*(\d+),\s*description:\s*(.+)"
)


def parse_data_file_saved_markers(stdout_text):
    """Parse [DATA_FILE_SAVED] marker lines — mirrors CodeExecutor._parse_data_file_saved_markers."""
    results = []
    for line in stdout_text.splitlines():
        m = _DATA_FILE_SAVED_RE.search(line)
        if m:
            results.append({
                "filename": m.group(1).strip(),
                "rows": int(m.group(2)),
                "description": m.group(3).strip(),
            })
    return results


# Evidence annotation regex (same as web/main.py).
# BUG FIX: this pattern was empty (r"") in the checked-in file — an empty
# pattern matches every paragraph and has no capture group, so
# extract_evidence_annotations would raise IndexError on group(1).
# Reconstructed from its usage (group(1) is a 1-based round number embedded
# in an HTML comment) — NOTE(review): confirm against web/main.py.
_EVIDENCE_PATTERN = re.compile(r"<!--\s*evidence:\s*round-(\d+)\s*-->")


def split_report_to_paragraphs(markdown_content):
    """Mirrors _split_report_to_paragraphs from web/main.py."""
    lines = markdown_content.split("\n")
    paragraphs = []
    current_block = []
    current_type = "text"
    para_id = 0

    def flush_block():
        # Emit the accumulated lines as one paragraph (if non-empty) and
        # reset the accumulator back to plain-text mode.
        nonlocal para_id, current_block, current_type
        text = "\n".join(current_block).strip()
        if text:
            paragraphs.append({
                "id": f"p-{para_id}",
                "type": current_type,
                "content": text,
            })
            para_id += 1
        current_block = []
        current_type = "text"

    in_table = False
    in_code = False
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("```"):
            if in_code:
                # Closing fence: include it in the code block, then flush.
                current_block.append(line)
                flush_block()
                in_code = False
                continue
            else:
                # Opening fence: flush any pending text first.
                flush_block()
                current_block.append(line)
                current_type = "code"
                in_code = True
                continue
        if in_code:
            current_block.append(line)
            continue
        if re.match(r"^#{1,6}\s", stripped):
            # Headings are single-line paragraphs of their own.
            flush_block()
            current_block.append(line)
            current_type = "heading"
            flush_block()
            continue
        if re.match(r"^!\[.*\]\(.*\)", stripped):
            # Images are single-line paragraphs of their own.
            flush_block()
            current_block.append(line)
            current_type = "image"
            flush_block()
            continue
        if stripped.startswith("|"):
            if not in_table:
                flush_block()
                in_table = True
                current_type = "table"
            current_block.append(line)
            continue
        else:
            if in_table:
                flush_block()
                in_table = False
        if not stripped:
            # Blank line terminates the current paragraph.
            flush_block()
            continue
        current_block.append(line)
    flush_block()
    return paragraphs


def extract_evidence_annotations(paragraphs, rounds):
    """Mirrors _extract_evidence_annotations from web/main.py, using a rounds list instead of session."""
    supporting_data = {}
    for para in paragraphs:
        content = para.get("content", "")
        match = _EVIDENCE_PATTERN.search(content)
        if match:
            round_num = int(match.group(1))
            idx = round_num - 1  # annotations are 1-based; rounds list is 0-based
            if 0 <= idx < len(rounds):
                evidence_rows = rounds[idx].get("evidence_rows", [])
                if evidence_rows:
                    supporting_data[para["id"]] = evidence_rows
    return supporting_data


# ===========================================================================
# Property 1: Round_Data Structural Completeness (Task 16.1)
# Feature: analysis-dashboard-redesign, Property 1: Round_Data structural completeness
# Validates: Requirements 1.1, 1.3, 1.4
# ===========================================================================

ROUND_DATA_REQUIRED_FIELDS = {
    "round": int,
    "reasoning": str,
    "code": str,
    "result_summary": str,
    "evidence_rows": list,
    "raw_log": str,
}


@settings(max_examples=100)
@given(
    num_rounds=st.integers(min_value=1, max_value=20),
    results=st.lists(execution_result_st, min_size=1, max_size=20),
    reasonings=st.lists(reasoning_st, min_size=1, max_size=20),
    codes=st.lists(code_st, min_size=1, max_size=20),
    feedbacks=st.lists(feedback_st, min_size=1, max_size=20),
)
def test_prop1_round_data_structural_completeness(num_rounds, results, reasonings, codes, feedbacks):
    """Round_Data objects must contain all required fields with correct types and preserve insertion order.

    **Validates: Requirements 1.1, 1.3, 1.4**
    """
    # Build a list of rounds using the same number of entries
    count = min(num_rounds, len(results), len(reasonings), len(codes), len(feedbacks))
    rounds_list = []
    for i in range(count):
        rd = build_round_data(i + 1, reasonings[i], codes[i], results[i], feedbacks[i])
        rounds_list.append(rd)

    # Verify all required fields present with correct types
    for rd in rounds_list:
        for field, expected_type in ROUND_DATA_REQUIRED_FIELDS.items():
            assert field in rd, f"Missing field: {field}"
            assert isinstance(rd[field], expected_type), (
                f"Field '{field}' expected {expected_type.__name__}, got {type(rd[field]).__name__}"
            )

    # Verify insertion order preserved
    for i in range(len(rounds_list) - 1):
        assert rounds_list[i]["round"] <= rounds_list[i + 1]["round"], (
            f"Insertion order violated: round {rounds_list[i]['round']} > {rounds_list[i + 1]['round']}"
        )


# ===========================================================================
# Property 2: Evidence Capture Bounded (Task 16.2)
# Feature: analysis-dashboard-redesign, Property 2: Evidence capture bounded
# Validates: Requirements 4.1, 4.2, 4.3
# ===========================================================================

# Strategy for generating random DataFrames with 0-10000 rows and 1-50 columns
col_name_st = st.text(
    min_size=1,
    max_size=10,
    alphabet=st.sampled_from("abcdefghijklmnopqrstuvwxyz_"),
).filter(lambda s: s[0] != "_")  # column names shouldn't start with _


@settings(max_examples=100)
@given(
    num_rows=st.integers(min_value=0, max_value=10000),
    num_cols=st.integers(min_value=1, max_value=50),
)
def test_prop2_evidence_capture_bounded(num_rows, num_cols):
    """Evidence capture must return at most 10 rows with keys matching DataFrame columns.

    **Validates: Requirements 4.1, 4.2, 4.3**
    """
    # Generate a DataFrame with the given dimensions
    import numpy as np

    columns = [f"col_{i}" for i in range(num_cols)]
    if num_rows == 0:
        df = pd.DataFrame(columns=columns)
    else:
        data = np.random.randint(0, 100, size=(num_rows, num_cols))
        df = pd.DataFrame(data, columns=columns)

    # Simulate the evidence capture logic: df.head(10).to_dict(orient='records')
    evidence_rows = df.head(10).to_dict(orient="records")

    # Verify length constraints
    assert len(evidence_rows) <= 10
    assert len(evidence_rows) == min(10, len(df))

    # Verify each row dict has keys matching the DataFrame's column names
    expected_keys = set(df.columns)
    for row in evidence_rows:
        assert set(row.keys()) == expected_keys


# ===========================================================================
# Property 3: Filename Deduplication (Task 16.3)
# Feature: analysis-dashboard-redesign, Property 3: Filename deduplication
# Validates: Requirements 5.3
# ===========================================================================

@settings(max_examples=100)
@given(
    num_exports=st.integers(min_value=1, max_value=20),
    var_name=st.text(min_size=1, max_size=20, alphabet="abcdefghijklmnopqrstuvwxyz_0123456789").filter(
        lambda s: s[0].isalpha()
    ),
)
def test_prop3_filename_deduplication(num_exports, var_name):
    """All generated filenames from same-name exports must be unique.

    **Validates: Requirements 5.3**
    """
    output_dir = tempfile.mkdtemp()
    generated_filenames = []
    for _ in range(num_exports):
        # Simulate _export_dataframe dedup logic
        base_filename = f"{var_name}.csv"
        filepath = os.path.join(output_dir, base_filename)
        if os.path.exists(filepath):
            suffix = 1
            while True:
                dedup_filename = f"{var_name}_{suffix}.csv"
                filepath = os.path.join(output_dir, dedup_filename)
                if not os.path.exists(filepath):
                    base_filename = dedup_filename
                    break
                suffix += 1
        # Create the file to simulate the export
        with open(filepath, "w") as f:
            f.write("dummy")
        generated_filenames.append(base_filename)

    # Verify all filenames are unique
    assert len(generated_filenames) == len(set(generated_filenames)), (
        f"Duplicate filenames found: {generated_filenames}"
    )


# ===========================================================================
# Property 4: Auto-Export Metadata Completeness (Task 16.4)
# Feature: analysis-dashboard-redesign, Property 4: Auto-export metadata completeness
# Validates: Requirements 5.4, 5.5
# ===========================================================================

@settings(max_examples=100)
@given(
    var_name=st.text(min_size=1, max_size=20, alphabet="abcdefghijklmnopqrstuvwxyz_0123456789").filter(
        lambda s: s[0].isalpha()
    ),
    num_rows=st.integers(min_value=0, max_value=1000),
    num_cols=st.integers(min_value=1, max_value=50),
)
def test_prop4_auto_export_metadata_completeness(var_name, num_rows, num_cols):
    """Auto-export metadata must contain all required fields with correct values.

    **Validates: Requirements 5.4, 5.5**
    """
    import numpy as np

    output_dir = tempfile.mkdtemp()
    columns = [f"col_{i}" for i in range(num_cols)]
    if num_rows == 0:
        df = pd.DataFrame(columns=columns)
    else:
        data = np.random.randint(0, 100, size=(num_rows, num_cols))
        df = pd.DataFrame(data, columns=columns)

    # Simulate _export_dataframe logic
    base_filename = f"{var_name}.csv"
    filepath = os.path.join(output_dir, base_filename)
    if os.path.exists(filepath):
        suffix = 1
        while True:
            dedup_filename = f"{var_name}_{suffix}.csv"
            filepath = os.path.join(output_dir, dedup_filename)
            if not os.path.exists(filepath):
                base_filename = dedup_filename
                break
            suffix += 1
    df.to_csv(filepath, index=False)

    metadata = {
        "variable_name": var_name,
        "filename": base_filename,
        "rows": len(df),
        "cols": len(df.columns),
        "columns": list(df.columns),
    }

    # Verify all required fields present
    for field in ("variable_name", "filename", "rows", "cols", "columns"):
        assert field in metadata, f"Missing field: {field}"

    # Verify values match the source DataFrame
    assert metadata["rows"] == len(df)
    assert metadata["cols"] == len(df.columns)
    assert metadata["columns"] == list(df.columns)
    assert metadata["variable_name"] == var_name


# ===========================================================================
# Property 5: DATA_FILE_SAVED Marker Parsing Round-Trip (Task 16.5)
# Feature: analysis-dashboard-redesign, Property 5: DATA_FILE_SAVED marker parsing round-trip
# Validates: Requirements 6.3
# ===========================================================================

# Strategy for filenames: alphanumeric + Chinese + underscores + hyphens, with extension
filename_base_st = st.text(
    min_size=1,
    max_size=30,
    alphabet=st.sampled_from(
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "0123456789"
        "_-"
        "数据分析结果汇总报告"
    ),
).filter(lambda s: len(s.strip()) > 0 and "," not in s)

filename_ext_st = st.sampled_from([".csv", ".xlsx"])

filename_st = st.builds(lambda base, ext: base.strip() + ext, filename_base_st, filename_ext_st)

description_st = st.text(
    min_size=1,
    max_size=100,
    alphabet=st.sampled_from(
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "0123456789 "
        "各类型问题聚合统计分析结果"
    ),
).filter(lambda s: len(s.strip()) > 0)


@settings(max_examples=100)
@given(
    filename=filename_st,
    rows=st.integers(min_value=1, max_value=1000000),
    description=description_st,
)
def test_prop5_data_file_saved_marker_round_trip(filename, rows, description):
    """Formatting then parsing a DATA_FILE_SAVED marker must recover original values.

    **Validates: Requirements 6.3**
    """
    # Format the marker.
    # BUG FIX: the marker must embed the generated filename; the literal
    # "(unknown)" placeholder that was here made the filename round-trip
    # assertion below fail unconditionally.
    marker = f"[DATA_FILE_SAVED] filename: {filename}, rows: {rows}, description: {description}"

    # Parse using the same logic as CodeExecutor
    parsed = parse_data_file_saved_markers(marker)

    assert len(parsed) == 1, f"Expected 1 parsed result, got {len(parsed)}"
    assert parsed[0]["filename"] == filename.strip()
    assert parsed[0]["rows"] == rows
    assert parsed[0]["description"] == description.strip()


# ===========================================================================
# Property 6: Data File Preview Bounded Rows (Task 16.6)
# Feature: analysis-dashboard-redesign, Property 6: Data file preview bounded rows
# Validates: Requirements 7.2
# ===========================================================================

@settings(max_examples=100)
@given(
    num_rows=st.integers(min_value=0, max_value=10000),
    num_cols=st.integers(min_value=1, max_value=50),
)
def test_prop6_data_file_preview_bounded_rows(num_rows, num_cols):
    """Preview of a CSV file must return at most 5 rows with correct column names.

    **Validates: Requirements 7.2**
    """
    import numpy as np

    columns = [f"col_{i}" for i in range(num_cols)]
    if num_rows == 0:
        df = pd.DataFrame(columns=columns)
    else:
        data = np.random.randint(0, 100, size=(num_rows, num_cols))
        df = pd.DataFrame(data, columns=columns)

    # Write to a temp CSV file
    tmp_dir = tempfile.mkdtemp()
    csv_path = os.path.join(tmp_dir, "test_data.csv")
    df.to_csv(csv_path, index=False)

    # Read back using the same logic as the preview endpoint
    preview_df = pd.read_csv(csv_path, nrows=5)

    # Verify at most 5 rows
    assert len(preview_df) <= 5
    assert len(preview_df) == min(5, num_rows)

    # Verify column names match exactly
    assert list(preview_df.columns) == columns


# ===========================================================================
# Property 7: Evidence Annotation Parsing (Task 16.7)
# Feature: analysis-dashboard-redesign, Property 7: Evidence annotation parsing
# Validates: Requirements 11.3, 11.4
# ===========================================================================

# Strategy for generating paragraphs with/without evidence annotations.
# BUG FIX: the annotation suffix was missing (the HTML comment had been
# stripped from the file), so every "annotated" paragraph was actually plain.
# Restored consistently with _EVIDENCE_PATTERN above.
annotated_paragraph_st = st.builds(
    lambda text, round_num: f"{text} <!-- evidence: round-{round_num} -->",
    st.text(min_size=1, max_size=100, alphabet="abcdefghijklmnopqrstuvwxyz .,!"),
    st.integers(min_value=1, max_value=100),
)

plain_paragraph_st = st.text(
    min_size=1,
    max_size=100,
    alphabet="abcdefghijklmnopqrstuvwxyz .,!",
).filter(lambda s: "evidence:" not in s and len(s.strip()) > 0)


@settings(max_examples=100)
@given(
    annotated=st.lists(annotated_paragraph_st, min_size=0, max_size=10),
    plain=st.lists(plain_paragraph_st, min_size=0, max_size=10),
)
def test_prop7_evidence_annotation_parsing(annotated, plain):
    """Annotated paragraphs must be correctly extracted; non-annotated must be excluded.

    **Validates: Requirements 11.3, 11.4**
    """
    assume(len(annotated) + len(plain) > 0)

    # Build markdown by interleaving annotated and plain paragraphs
    all_paragraphs = []
    for p in annotated:
        all_paragraphs.append(("annotated", p))
    for p in plain:
        all_paragraphs.append(("plain", p))

    # Build markdown content with blank lines between paragraphs
    markdown = "\n\n".join(text for _, text in all_paragraphs)

    # Parse into paragraphs
    paragraphs = split_report_to_paragraphs(markdown)

    # Build fake rounds data (up to 100 rounds, each with some evidence)
    rounds = [
        {"evidence_rows": [{"key": f"value_{i}"}]}
        for i in range(100)
    ]

    # Extract evidence annotations
    supporting_data = extract_evidence_annotations(paragraphs, rounds)

    # Verify: annotated paragraphs with valid round numbers should be in supporting_data
    for para in paragraphs:
        content = para.get("content", "")
        match = _EVIDENCE_PATTERN.search(content)
        if match:
            round_num = int(match.group(1))
            idx = round_num - 1
            if 0 <= idx < len(rounds) and rounds[idx].get("evidence_rows"):
                assert para["id"] in supporting_data, (
                    f"Annotated paragraph {para['id']} with round {round_num} not in supporting_data"
                )
        else:
            # Non-annotated paragraphs must NOT be in supporting_data
            assert para["id"] not in supporting_data, (
                f"Non-annotated paragraph {para['id']} should not be in supporting_data"
            )


# ===========================================================================
# Property 8: SessionData JSON Round-Trip (Task 16.8)
# Feature: analysis-dashboard-redesign, Property 8: SessionData JSON round-trip
# Validates: Requirements 12.4
# ===========================================================================

# Strategy for Round_Data dicts
round_data_st = st.fixed_dictionaries({
    "round": st.integers(min_value=1, max_value=100),
    "reasoning": st.text(min_size=0, max_size=200),
    "code": st.text(min_size=0, max_size=200),
    "result_summary": st.text(min_size=0, max_size=200),
    "evidence_rows": st.lists(
        st.dictionaries(
            keys=st.text(min_size=1, max_size=10, alphabet="abcdefghijklmnopqrstuvwxyz"),
            values=st.one_of(
                st.integers(min_value=-1000, max_value=1000),
                st.text(min_size=0, max_size=20),
            ),
            min_size=0,
            max_size=5,
        ),
        min_size=0,
        max_size=10,
    ),
    "raw_log": st.text(min_size=0, max_size=200),
})

# Strategy for file metadata dicts
file_metadata_st = st.fixed_dictionaries({
    "filename": st.text(min_size=1, max_size=30, alphabet="abcdefghijklmnopqrstuvwxyz0123456789_."),
    "description": st.text(min_size=0, max_size=100),
    "rows": st.integers(min_value=0, max_value=100000),
    "cols": st.integers(min_value=0, max_value=100),
    "columns": st.lists(st.text(min_size=1, max_size=10, alphabet="abcdefghijklmnopqrstuvwxyz"), max_size=10),
    "size_bytes": st.integers(min_value=0, max_value=10000000),
    "source": st.sampled_from(["auto", "prompt"]),
})


@settings(max_examples=100)
@given(
    rounds=st.lists(round_data_st, min_size=0, max_size=20),
    data_files=st.lists(file_metadata_st, min_size=0, max_size=20),
)
def test_prop8_session_data_json_round_trip(rounds, data_files):
    """Serializing rounds and data_files to JSON and back must produce equal data.

    **Validates: Requirements 12.4**
    """
    data = {
        "rounds": rounds,
        "data_files": data_files,
    }

    # Serialize using the same approach as the codebase
    serialized = json.dumps(data, default=str)
    deserialized = json.loads(serialized)

    assert deserialized["rounds"] == rounds
    assert deserialized["data_files"] == data_files