diff --git a/scripts/stress_1_25gb.py b/scripts/stress_1_25gb.py new file mode 100644 index 0000000..5c7bce8 --- /dev/null +++ b/scripts/stress_1_25gb.py @@ -0,0 +1,289 @@ +"""One-time 1.25 GB stress test for the analyzer + gate pipeline. + +Not part of the automated suite. Generates a synthetic messy CSV at the +target size, runs every pipeline stage end-to-end, captures wall-clock ++ peak RSS at each stage, and prints a summary. + +Run: + python scripts/stress_1_25gb.py [--keep] [--target-gb 1.25] + +The generated file lives in $TMPDIR (default /tmp). With --keep the file +is not deleted after the run (useful for re-runs without regenerating). +""" + +from __future__ import annotations + +import argparse +import gc +import io +import os +import resource +import sys +import time +import tracemalloc +from contextlib import contextmanager +from pathlib import Path +from typing import Iterator + +# Project root +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + + +# --------------------------------------------------------------------------- +# Generator — designed so every analyzer detector finds something. +# --------------------------------------------------------------------------- + +# 10 columns × ~180 bytes/row ≈ 1.25 GB at 7M rows. Tune via --target-gb. 
# The header is deliberately messy: "Name " carries a trailing space, the
# quoted email column ends with a zero-width space (written as an explicit
# \u200b escape so it stays visible in source), and the last column is wrapped
# in curly quotes.
HEADER = (
    "id,Name ,\"email\u200b\",phone,city,notes,status,amount,date,“description”\n"
)

NAMES = [
    "Alice Smith", " Bob Jones ", "Carol O’Connor",
    "David Lee", "Eva Garcia", "Frank Miller", "Grace Kim",
    "Henry Davis", "Iris Wong",
]
EMAILS = [
    "alice@example.com", "BOB@Example.COM", "carol@example.com",
    "DAVID@example.com", "eva@Example.com", "Frank@example.COM",
]
CITIES = ["New York", "Köln", "São Paulo", "Zürich", "Düsseldorf", "Madrid", "Tokyo"]
NOTES = [
    "VIP — contact ASAP…",
    "regular customer",
    "follow up next quarter",
    "needs “signed” agreement",
    "5′ 11″ height noted",
    "nice client",
]
STATUSES = ["active", "N/A", "TBD", "", "active", "unknown", "active"]
AMOUNTS = ["$1,500.00", "100.00", "250.50", "$50.00", "1,234.56", "75.00"]
DATES = ["2024-01-15", "2024-02-20", "2024-03-12", "2024-04-08", "2024-05-30"]
DESCRIPTIONS = [
    "first ‘contact’ made",
    "long-time client",
    "referred by partner",
    "premium support tier",
]


def gen_chunk(start_id: int, n_rows: int) -> str:
    """Build ``n_rows`` CSV rows as one string.

    Row ids run from ``start_id`` to ``start_id + n_rows - 1``. Every other
    field is picked from the module-level value lists by ``id % len(list)``,
    so the output is fully deterministic.

    Args:
        start_id: id of the first generated row.
        n_rows: number of rows to generate; zero or negative yields ``""``.

    Returns:
        The rows concatenated, each terminated by ``"\\n"``.
    """
    rows = []
    for rid in range(start_id, start_id + n_rows):
        name = NAMES[rid % len(NAMES)]
        email = EMAILS[rid % len(EMAILS)]
        city = CITIES[rid % len(CITIES)]
        note = NOTES[rid % len(NOTES)]
        status = STATUSES[rid % len(STATUSES)]
        amount = AMOUNTS[rid % len(AMOUNTS)]
        date = DATES[rid % len(DATES)]
        desc = DESCRIPTIONS[rid % len(DESCRIPTIONS)]
        # 'amount' may contain a comma, so it is the one field that is quoted.
        rows.append(
            f"{rid},{name},{email},555-0{rid % 10000:04d},{city},"
            f"{note},{status},\"{amount}\",{date},{desc}\n"
        )
    return "".join(rows)


def generate_file(
    path: Path, target_gb: float, *, rows_per_chunk: int = 50_000
) -> None:
    """Stream the synthetic CSV to ``path`` until the target size is reached.

    The file is written in whole chunks, so the final size may overshoot the
    target by up to one chunk. Each chunk is UTF-8 encoded exactly once and
    written in binary mode; the same bytes are used for the size accounting.

    Args:
        path: destination file; overwritten if it exists.
        target_gb: target size in GiB (multiples of 1024**3 bytes).
        rows_per_chunk: rows generated per write. Progress is only printed
            when the running row count is a multiple of 1,000,000.

    Raises:
        ValueError: if ``rows_per_chunk`` is not positive (the write loop
            could never make progress).
    """
    if rows_per_chunk <= 0:
        raise ValueError(f"rows_per_chunk must be positive, got {rows_per_chunk}")

    target_bytes = int(target_gb * 1024**3)
    total_rows = 0
    written = 0
    print(f" writing → {path} (target {target_gb} GB)")
    t0 = time.perf_counter()
    # Binary mode: no newline translation, and no second encode inside write().
    with path.open("wb") as fh:
        header_bytes = HEADER.encode("utf-8")
        fh.write(header_bytes)
        written += len(header_bytes)
        while written < target_bytes:
            payload = gen_chunk(total_rows, rows_per_chunk).encode("utf-8")
            fh.write(payload)
            written += len(payload)
            total_rows += rows_per_chunk
            if total_rows % 1_000_000 == 0:
                print(
                    f" …{total_rows:,} rows, "
                    f"{written / 1024**3:.2f} GB in "
                    f"{time.perf_counter() - t0:.1f}s"
                )
    print(
        f" done: {total_rows:,} rows, "
        f"{written / 1024**3:.2f} GB in "
        f"{time.perf_counter() - t0:.1f}s"
    )


# ---------------------------------------------------------------------------
# Stage runner — captures wall-clock + peak RSS delta per stage.
+# --------------------------------------------------------------------------- + +def _rss_mb() -> float: + """Current process RSS in MB.""" + return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 + + +@contextmanager +def stage(name: str, results: list) -> Iterator[None]: + gc.collect() + rss_before = _rss_mb() + t0 = time.perf_counter() + err: Exception | None = None + try: + yield + except Exception as e: + err = e + finally: + wall = time.perf_counter() - t0 + rss_after = _rss_mb() + results.append({ + "stage": name, + "wall_s": wall, + "rss_before_mb": rss_before, + "rss_after_mb": rss_after, + "rss_delta_mb": rss_after - rss_before, + "error": repr(err) if err else "", + }) + print( + f" {name:<42} {wall:>8.2f}s " + f"RSS {rss_before:>7.0f} → {rss_after:>7.0f} MB " + f"(Δ {rss_after - rss_before:+.0f})" + + (f" ERROR {err!r}" if err else "") + ) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--target-gb", type=float, default=1.25) + ap.add_argument("--keep", action="store_true", + help="Don't delete the test file at the end.") + ap.add_argument("--skip-generate", action="store_true", + help="Reuse an existing file at the target path.") + args = ap.parse_args() + + tmp = Path(os.environ.get("TMPDIR", "/tmp")) + path = tmp / f"stress_{args.target_gb}gb.csv" + + print(f"=== Stress test: {args.target_gb} GB ===") + print(f"Path: {path}") + print() + + if not args.skip_generate or not path.exists(): + print("[1/2] Generating fixture") + gen_t0 = time.perf_counter() + generate_file(path, args.target_gb) + gen_wall = time.perf_counter() - gen_t0 + print(f"Generation total: {gen_wall:.1f}s") + print() + else: + print(f"[1/2] Reusing existing fixture ({path.stat().st_size / 1024**3:.2f} GB)") + print() + + actual_gb = path.stat().st_size / 1024**3 + print(f"[2/2] 
Pipeline run on {actual_gb:.2f} GB file") + print(f" {'stage':<42} {'wall':>9} {'RSS':>23}") + + results: list[dict] = [] + from src.core.io import detect_encoding, detect_delimiter, repair_bytes + from src.core.analyze import analyze, _load_for_analysis + from src.core.normalize import auto_fix, apply_decisions, is_normalized + + with stage("detect_encoding", results): + enc = detect_encoding(path) + enc_used = enc if not results[-1]["error"] else "utf-8" + print(f" detected encoding: {enc_used!r}") + + with stage("detect_delimiter", results): + delim = detect_delimiter(path, enc_used) + delim_used = delim if not results[-1]["error"] else "," + print(f" detected delimiter: {delim_used!r}") + + raw_bytes = None + with stage("path.read_bytes (1.25GB → memory)", results): + raw_bytes = path.read_bytes() + + repair = None + if raw_bytes is not None: + with stage("repair_bytes (full file)", results): + repair = repair_bytes(raw_bytes, encoding=enc_used, delimiter=delim_used) + if repair: + print(f" repair actions: {repair.summary()}") + print(f" repaired size: {len(repair.repaired_bytes) / 1024**3:.2f} GB") + + # Free raw bytes before next stage so RSS deltas are honest. + del raw_bytes + gc.collect() + + findings = [] + with stage("analyze (default sample_rows=1000)", results): + findings = analyze(path, sample_rows=1000) + print(f" findings: {sorted({f.id for f in findings})}") + + df_sample = None + with stage("_load_for_analysis (1000 rows)", results): + df_sample, _, _ = _load_for_analysis(path, sample_rows=1000) + if df_sample is not None: + print(f" sample df: {df_sample.shape}") + + if df_sample is not None and findings: + with stage("auto_fix on 1000-row sample", results): + sample_result = auto_fix(df_sample, findings) + print(f" fixes applied: {len(sample_result.applied)}, cells changed: " + f"{sum(a.cells_changed for a in sample_result.applied)}") + + # Now the heavy run: parse the FULL file, then auto_fix. 
+ print() + print(" --- full-file pass (no sample cap) ---") + full_df = None + with stage("full read_csv via repaired bytes (full file)", results): + if repair is not None: + import pandas as pd + full_df = pd.read_csv( + io.BytesIO(repair.repaired_bytes), + encoding="utf-8", delimiter=delim_used, + dtype=str, keep_default_na=False, on_bad_lines="warn", + ) + if full_df is not None: + print(f" full df: {full_df.shape}, " + f"approx mem: {full_df.memory_usage(deep=True).sum() / 1024**3:.2f} GB") + + if full_df is not None: + # Re-run analysis on the full DataFrame to get findings against + # actual content, not the 1000-row sample. + with stage("analyze full df (no sample cap)", results): + full_findings = analyze(full_df, sample_rows=10**9) + print(f" full findings: {sorted({f.id for f in full_findings})}") + + with stage("auto_fix full df (~7M rows)", results): + full_result = auto_fix(full_df, full_findings) + if full_result is not None: + print(f" full fixes applied: {len(full_result.applied)}, " + f"cells changed: {sum(a.cells_changed for a in full_result.applied)}") + print(f" cleaned_bytes: {len(full_result.cleaned_bytes) / 1024**3:.2f} GB") + + # Cleanup + if not args.keep and path.exists(): + path.unlink() + print() + print(f"Removed {path}") + + # Summary table + print() + print("=== Summary ===") + print(f"{'stage':<46} {'wall (s)':>10} {'RSS Δ (MB)':>12}") + for r in results: + suffix = " ⚠" if r["error"] else "" + print(f"{r['stage']:<46} {r['wall_s']:>10.2f} {r['rss_delta_mb']:>+12.0f}{suffix}") + print(f"\nPeak RSS: {max(r['rss_after_mb'] for r in results):.0f} MB") + return 0 + + +if __name__ == "__main__": + sys.exit(main())