"""One-time 1.25 GB stress test for the analyzer + gate pipeline. Not part of the automated suite. Generates a synthetic messy CSV at the target size, runs every pipeline stage end-to-end, captures wall-clock + peak RSS at each stage, and prints a summary. Run: python scripts/stress_1_25gb.py [--keep] [--target-gb 1.25] The generated file lives in $TMPDIR (default /tmp). With --keep the file is not deleted after the run (useful for re-runs without regenerating). """ from __future__ import annotations import argparse import gc import io import os import resource import sys import time import tracemalloc from contextlib import contextmanager from pathlib import Path from typing import Iterator # Project root sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) # --------------------------------------------------------------------------- # Generator — designed so every analyzer detector finds something. # --------------------------------------------------------------------------- # 10 columns × ~180 bytes/row ≈ 1.25 GB at 7M rows. Tune via --target-gb. 
# The fixture is deliberately messy: trailing space in a header name, an
# invisible character inside a quoted header, curly quotes, mixed-case
# e-mails, padded names, placeholder statuses and currency-formatted amounts.
HEADER = (
    "id,Name ,\"email​\",phone,city,notes,status,amount,date,“description”\n"
)
NAMES = [
    "Alice Smith", " Bob Jones ", "Carol O’Connor", "David Lee",
    "Eva Garcia", "Frank Miller", "Grace Kim", "Henry Davis", "Iris Wong",
]
EMAILS = [
    "alice@example.com", "BOB@Example.COM", "carol@example.com",
    "DAVID@example.com", "eva@Example.com", "Frank@example.COM",
]
CITIES = ["New York", "Köln", "São Paulo", "Zürich", "Düsseldorf", "Madrid", "Tokyo"]
NOTES = [
    "VIP — contact ASAP…", "regular customer", "follow up next quarter",
    "needs “signed” agreement", "5′ 11″ height noted", "nice client",
]
STATUSES = ["active", "N/A", "TBD", "", "active", "unknown", "active"]
AMOUNTS = ["$1,500.00", "100.00", "250.50", "$50.00", "1,234.56", "75.00"]
DATES = ["2024-01-15", "2024-02-20", "2024-03-12", "2024-04-08", "2024-05-30"]
DESCRIPTIONS = [
    "first ‘contact’ made", "long-time client", "referred by partner",
    "premium support tier",
]


def gen_chunk(start_id: int, n_rows: int) -> str:
    """Build ``n_rows`` consecutive CSV rows as one string.

    Row ids run from ``start_id`` to ``start_id + n_rows - 1``. Every column
    value is picked from the module-level pools by ``id % len(pool)``, so the
    output is fully deterministic and chunks concatenate seamlessly:
    ``gen_chunk(a, m) + gen_chunk(a + m, n) == gen_chunk(a, m + n)``.

    Args:
        start_id: Id of the first generated row.
        n_rows: Number of rows to generate; ``0`` (or less) yields ``""``.

    Returns:
        The rows joined into a single string, each terminated by ``"\\n"``.
    """
    out = []
    for rid in range(start_id, start_id + n_rows):
        name = NAMES[rid % len(NAMES)]
        email = EMAILS[rid % len(EMAILS)]
        city = CITIES[rid % len(CITIES)]
        note = NOTES[rid % len(NOTES)]
        status = STATUSES[rid % len(STATUSES)]
        amount = AMOUNTS[rid % len(AMOUNTS)]
        date = DATES[rid % len(DATES)]
        desc = DESCRIPTIONS[rid % len(DESCRIPTIONS)]
        # Note: 'amount' contains a comma — quoted to avoid breaking CSV.
        out.append(
            f"{rid},{name},{email},555-0{rid % 10000:04d},{city},"
            f"{note},{status},\"{amount}\",{date},{desc}\n"
        )
    return "".join(out)


def generate_file(path: Path, target_gb: float, rows_per_chunk: int = 50_000) -> None:
    """Stream the synthetic CSV to disk in chunks until the target size is hit.

    The header is written first, then whole chunks of ``rows_per_chunk`` rows
    until at least ``target_gb`` GiB have been written, so the final file may
    overshoot the target by up to one chunk. Progress is printed roughly every
    million rows.

    Args:
        path: Destination file; created or truncated.
        target_gb: Minimum size of the file in GiB (``1024**3`` bytes).
        rows_per_chunk: Rows generated per write.

    Raises:
        ValueError: If ``rows_per_chunk`` is not positive (the write loop
            could otherwise never reach the target).
    """
    if rows_per_chunk <= 0:
        raise ValueError("rows_per_chunk must be a positive integer")

    target_bytes = int(target_gb * 1024**3)
    report_every = 1_000_000
    next_report = report_every
    total_rows = 0

    print(f" writing → {path} (target {target_gb} GB)")
    t0 = time.perf_counter()
    # Binary mode: each chunk is encoded exactly once and the same bytes are
    # both counted and written, instead of encoding once for the size and a
    # second time inside a text-mode writer. The bytes on disk are the same
    # as a UTF-8 text write with newline="" would produce.
    with path.open("wb") as fh:
        header_bytes = HEADER.encode("utf-8")
        fh.write(header_bytes)
        written = len(header_bytes)
        while written < target_bytes:
            payload = gen_chunk(total_rows, rows_per_chunk).encode("utf-8")
            fh.write(payload)
            written += len(payload)
            total_rows += rows_per_chunk
            # Threshold rather than a modulo test so progress is still
            # reported when rows_per_chunk does not divide one million.
            if total_rows >= next_report:
                print(
                    f" …{total_rows:,} rows, "
                    f"{written / 1024**3:.2f} GB in "
                    f"{time.perf_counter() - t0:.1f}s"
                )
                next_report += report_every
    print(
        f" done: {total_rows:,} rows, "
        f"{written / 1024**3:.2f} GB in "
        f"{time.perf_counter() - t0:.1f}s"
    )


# ---------------------------------------------------------------------------
# Stage runner — captures wall-clock + peak RSS delta per stage.
# ---------------------------------------------------------------------------


def _rss_mb() -> float:
    """Return the process's peak resident set size so far, in MB.

    This reads ``ru_maxrss``, which is a high-water mark, not the current
    RSS: it never decreases, so a "delta" between two readings is the growth
    of the peak during that interval.
    """
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


@contextmanager
def stage(name: str, results: list) -> Iterator[None]:
    """Time one pipeline stage and record its peak-RSS growth.

    A garbage collection is forced before the measurement starts. Any
    ``Exception`` raised inside the ``with`` body is swallowed and stored in
    the record, so one failing stage does not abort the whole run; other
    ``BaseException`` types (e.g. ``KeyboardInterrupt``) still propagate after
    the record has been written.

    Args:
        name: Label used in the printed line and in the record.
        results: List that receives one dict per stage with the keys
            ``stage``, ``wall_s``, ``rss_before_mb``, ``rss_after_mb``,
            ``rss_delta_mb`` and ``error`` (``repr`` of the exception, or
            ``""`` on success).
    """
    gc.collect()
    rss_before = _rss_mb()
    t0 = time.perf_counter()
    err: Exception | None = None
    try:
        yield
    except Exception as e:
        err = e
    finally:
        wall = time.perf_counter() - t0
        rss_after = _rss_mb()
        failed = err is not None
        results.append({
            "stage": name,
            "wall_s": wall,
            "rss_before_mb": rss_before,
            "rss_after_mb": rss_after,
            "rss_delta_mb": rss_after - rss_before,
            "error": repr(err) if failed else "",
        })
        print(
            f" {name:<42} {wall:>8.2f}s "
            f"RSS {rss_before:>7.0f} → {rss_after:>7.0f} MB "
            f"(Δ {rss_after - rss_before:+.0f})"
            + (f" ERROR {err!r}" if failed else "")
        )


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def _run_pipeline(path: Path, results: list) -> None:
    """Run every pipeline stage against ``path``, appending to ``results``."""
    # Project imports stay local so the generator half of the script can be
    # used without the project being importable.
    from src.core.io import detect_encoding, detect_delimiter, repair_bytes
    from src.core.analyze import analyze, _load_for_analysis
    from src.core.normalize import auto_fix

    with stage("detect_encoding", results):
        enc = detect_encoding(path)
    # Fall back to sane defaults when detection failed inside the stage.
    enc_used = enc if not results[-1]["error"] else "utf-8"
    print(f" detected encoding: {enc_used!r}")

    with stage("detect_delimiter", results):
        delim = detect_delimiter(path, enc_used)
    delim_used = delim if not results[-1]["error"] else ","
    print(f" detected delimiter: {delim_used!r}")

    raw_bytes = None
    with stage("path.read_bytes (1.25GB → memory)", results):
        raw_bytes = path.read_bytes()

    repair = None
    if raw_bytes is not None:
        with stage("repair_bytes (full file)", results):
            repair = repair_bytes(raw_bytes, encoding=enc_used, delimiter=delim_used)
        if repair:
            print(f" repair actions: {repair.summary()}")
            print(f" repaired size: {len(repair.repaired_bytes) / 1024**3:.2f} GB")
    # Free raw bytes before next stage so RSS deltas are honest: the readings
    # are a high-water mark, so memory still held here would be stacked under
    # whatever the following stages allocate.
    del raw_bytes
    gc.collect()

    findings = []
    with stage("analyze (default sample_rows=1000)", results):
        findings = analyze(path, sample_rows=1000)
    print(f" findings: {sorted({f.id for f in findings})}")

    df_sample = None
    with stage("_load_for_analysis (1000 rows)", results):
        df_sample, _, _ = _load_for_analysis(path, sample_rows=1000)
        if df_sample is not None:
            print(f" sample df: {df_sample.shape}")

    if df_sample is not None and findings:
        with stage("auto_fix on 1000-row sample", results):
            sample_result = auto_fix(df_sample, findings)
            print(f" fixes applied: {len(sample_result.applied)}, cells changed: "
                  f"{sum(a.cells_changed for a in sample_result.applied)}")

    # Now the heavy run: parse the FULL file, then auto_fix.
    print()
    print(" --- full-file pass (no sample cap) ---")
    full_df = None
    with stage("full read_csv via repaired bytes (full file)", results):
        if repair is not None:
            import pandas as pd
            full_df = pd.read_csv(
                io.BytesIO(repair.repaired_bytes),
                encoding="utf-8",
                delimiter=delim_used,
                dtype=str,
                keep_default_na=False,
                on_bad_lines="warn",
            )
        if full_df is not None:
            print(f" full df: {full_df.shape}, "
                  f"approx mem: {full_df.memory_usage(deep=True).sum() / 1024**3:.2f} GB")

    if full_df is None:
        return

    # Re-run analysis on the full DataFrame to get findings against
    # actual content, not the 1000-row sample.
    full_findings = None
    with stage("analyze full df (no sample cap)", results):
        full_findings = analyze(full_df, sample_rows=10**9)
        print(f" full findings: {sorted({f.id for f in full_findings})}")

    if full_findings is None:
        # The analysis stage failed; without this guard the next stage would
        # only record a misleading NameError for the unbound findings.
        print(" skipping full auto_fix: full-file analysis failed")
        return

    with stage("auto_fix full df (~7M rows)", results):
        full_result = auto_fix(full_df, full_findings)
        if full_result is not None:
            print(f" full fixes applied: {len(full_result.applied)}, "
                  f"cells changed: {sum(a.cells_changed for a in full_result.applied)}")
            print(f" cleaned_bytes: {len(full_result.cleaned_bytes) / 1024**3:.2f} GB")


def _print_summary(results: list) -> None:
    """Print the per-stage table and the overall peak RSS."""
    print()
    print("=== Summary ===")
    print(f"{'stage':<46} {'wall (s)':>10} {'RSS Δ (MB)':>12}")
    for r in results:
        suffix = " ⚠" if r["error"] else ""
        print(f"{r['stage']:<46} {r['wall_s']:>10.2f} {r['rss_delta_mb']:>+12.0f}{suffix}")
    if results:
        print(f"\nPeak RSS: {max(r['rss_after_mb'] for r in results):.0f} MB")


def main() -> int:
    """Generate (or reuse) the fixture, run the pipeline, print a summary.

    Command-line flags:
        --target-gb      Size of the fixture in GB (default 1.25).
        --keep           Do not delete the fixture at the end.
        --skip-generate  Reuse an existing fixture; it is still generated
                         when no file exists at the target path.

    Returns:
        Process exit code, always ``0``; stage failures are reported in the
        summary rather than through the exit code.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--target-gb", type=float, default=1.25)
    ap.add_argument("--keep", action="store_true",
                    help="Don't delete the test file at the end.")
    ap.add_argument("--skip-generate", action="store_true",
                    help="Reuse an existing file at the target path.")
    args = ap.parse_args()

    tmp = Path(os.environ.get("TMPDIR", "/tmp"))
    path = tmp / f"stress_{args.target_gb}gb.csv"
    print(f"=== Stress test: {args.target_gb} GB ===")
    print(f"Path: {path}")
    print()

    results: list[dict] = []
    try:
        if not args.skip_generate or not path.exists():
            print("[1/2] Generating fixture")
            gen_t0 = time.perf_counter()
            generate_file(path, args.target_gb)
            gen_wall = time.perf_counter() - gen_t0
            print(f"Generation total: {gen_wall:.1f}s")
            print()
        else:
            print(f"[1/2] Reusing existing fixture ({path.stat().st_size / 1024**3:.2f} GB)")
            print()

        actual_gb = path.stat().st_size / 1024**3
        print(f"[2/2] Pipeline run on {actual_gb:.2f} GB file")
        print(f" {'stage':<42} {'wall':>9} {'RSS':>23}")
        _run_pipeline(path, results)
    finally:
        # Cleanup runs even when something outside a stage raises (failed
        # project import, interrupted generation), so a multi-GB or partial
        # fixture is not left behind unless --keep was given.
        if not args.keep and path.exists():
            path.unlink()
            print()
            print(f"Removed {path}")

    _print_summary(results)
    return 0


if __name__ == "__main__":
    sys.exit(main())