diff --git a/scripts/stress_1_25gb.py b/scripts/stress_1_25gb.py deleted file mode 100644 index 5c7bce8..0000000 --- a/scripts/stress_1_25gb.py +++ /dev/null @@ -1,289 +0,0 @@ -"""One-time 1.25 GB stress test for the analyzer + gate pipeline. - -Not part of the automated suite. Generates a synthetic messy CSV at the -target size, runs every pipeline stage end-to-end, captures wall-clock -+ peak RSS at each stage, and prints a summary. - -Run: - python scripts/stress_1_25gb.py [--keep] [--target-gb 1.25] - -The generated file lives in $TMPDIR (default /tmp). With --keep the file -is not deleted after the run (useful for re-runs without regenerating). -""" - -from __future__ import annotations - -import argparse -import gc -import io -import os -import resource -import sys -import time -import tracemalloc -from contextlib import contextmanager -from pathlib import Path -from typing import Iterator - -# Project root -sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) - - -# --------------------------------------------------------------------------- -# Generator — designed so every analyzer detector finds something. -# --------------------------------------------------------------------------- - -# 10 columns × ~180 bytes/row ≈ 1.25 GB at 7M rows. Tune via --target-gb. -HEADER = ( - "id,Name ,\"email​\",phone,city,notes,status,amount,date,“description”\n" -) - -NAMES = [ - "Alice Smith", " Bob Jones ", "Carol O’Connor", - "David Lee", "Eva Garcia", "Frank Miller", "Grace Kim", - "Henry Davis", "Iris Wong", -] -EMAILS = [ - "alice@example.com", "BOB@Example.COM", "carol@example.com", - "DAVID@example.com", "eva@Example.com", "Frank@example.COM", -] -CITIES = ["New York", "Köln", "São Paulo", "Zürich", "Düsseldorf", "Madrid", "Tokyo"] -NOTES = [ - "VIP — contact ASAP…", - "regular customer", - "follow up next quarter", - "needs “signed” agreement", - "5′ 11″ height noted", - "nice client", -] -STATUSES = ["active", "N/A", "TBD", "", "active", "unknown", "active"] -AMOUNTS = ["$1,500.00", "100.00", "250.50", "$50.00", "1,234.56", "75.00"] -DATES = ["2024-01-15", "2024-02-20", "2024-03-12", "2024-04-08", "2024-05-30"] -DESCRIPTIONS = [ - "first ‘contact’ made", - "long-time client", - "referred by partner", - "premium support tier", -] - - -def gen_chunk(start_id: int, n_rows: int) -> str: - """Build n_rows rows as a single string (cheap append to BufferedWriter).""" - out = [] - for i in range(n_rows): - rid = start_id + i - name = NAMES[rid % len(NAMES)] - email = EMAILS[rid % len(EMAILS)] - city = CITIES[rid % len(CITIES)] - note = NOTES[rid % len(NOTES)] - status = STATUSES[rid % len(STATUSES)] - amount = AMOUNTS[rid % len(AMOUNTS)] - date = DATES[rid % len(DATES)] - desc = DESCRIPTIONS[rid % len(DESCRIPTIONS)] - # Note: 'amount' contains a comma — quoted to avoid breaking CSV. - out.append( - f"{rid},{name},{email},555-0{rid % 10000:04d},{city}," - f"{note},{status},\"{amount}\",{date},{desc}\n" - ) - return "".join(out) - - -def generate_file(path: Path, target_gb: float) -> None: - """Stream the synthetic CSV to disk in chunks until target size hit.""" - target_bytes = int(target_gb * 1024**3) - rows_per_chunk = 50_000 - total_rows = 0 - written = 0 - print(f" writing → {path} (target {target_gb} GB)") - t0 = time.perf_counter() - with path.open("w", encoding="utf-8", newline="") as fh: - fh.write(HEADER) - written += len(HEADER.encode("utf-8")) - while written < target_bytes: - chunk = gen_chunk(total_rows, rows_per_chunk) - fh.write(chunk) - written += len(chunk.encode("utf-8")) - total_rows += rows_per_chunk - if total_rows % 1_000_000 == 0: - print( - f" …{total_rows:,} rows, " - f"{written / 1024**3:.2f} GB in " - f"{time.perf_counter() - t0:.1f}s" - ) - print( - f" done: {total_rows:,} rows, " - f"{written / 1024**3:.2f} GB in " - f"{time.perf_counter() - t0:.1f}s" - ) - - -# --------------------------------------------------------------------------- -# Stage runner — captures wall-clock + peak RSS delta per stage. -# --------------------------------------------------------------------------- - -def _rss_mb() -> float: - """Current process RSS in MB.""" - return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 - - -@contextmanager -def stage(name: str, results: list) -> Iterator[None]: - gc.collect() - rss_before = _rss_mb() - t0 = time.perf_counter() - err: Exception | None = None - try: - yield - except Exception as e: - err = e - finally: - wall = time.perf_counter() - t0 - rss_after = _rss_mb() - results.append({ - "stage": name, - "wall_s": wall, - "rss_before_mb": rss_before, - "rss_after_mb": rss_after, - "rss_delta_mb": rss_after - rss_before, - "error": repr(err) if err else "", - }) - print( - f" {name:<42} {wall:>8.2f}s " - f"RSS {rss_before:>7.0f} → {rss_after:>7.0f} MB " - f"(Δ {rss_after - rss_before:+.0f})" - + (f" ERROR {err!r}" if err else "") - ) - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - -def main() -> int: - ap = argparse.ArgumentParser() - ap.add_argument("--target-gb", type=float, default=1.25) - ap.add_argument("--keep", action="store_true", - help="Don't delete the test file at the end.") - ap.add_argument("--skip-generate", action="store_true", - help="Reuse an existing file at the target path.") - args = ap.parse_args() - - tmp = Path(os.environ.get("TMPDIR", "/tmp")) - path = tmp / f"stress_{args.target_gb}gb.csv" - - print(f"=== Stress test: {args.target_gb} GB ===") - print(f"Path: {path}") - print() - - if not args.skip_generate or not path.exists(): - print("[1/2] Generating fixture") - gen_t0 = time.perf_counter() - generate_file(path, args.target_gb) - gen_wall = time.perf_counter() - gen_t0 - print(f"Generation total: {gen_wall:.1f}s") - print() - else: - print(f"[1/2] Reusing existing fixture ({path.stat().st_size / 1024**3:.2f} GB)") - print() - - actual_gb = path.stat().st_size / 1024**3 - print(f"[2/2] Pipeline run on {actual_gb:.2f} GB file") - print(f" {'stage':<42} {'wall':>9} {'RSS':>23}") - - results: list[dict] = [] - from src.core.io import detect_encoding, detect_delimiter, repair_bytes - from src.core.analyze import analyze, _load_for_analysis - from src.core.normalize import auto_fix, apply_decisions, is_normalized - - with stage("detect_encoding", results): - enc = detect_encoding(path) - enc_used = enc if not results[-1]["error"] else "utf-8" - print(f" detected encoding: {enc_used!r}") - - with stage("detect_delimiter", results): - delim = detect_delimiter(path, enc_used) - delim_used = delim if not results[-1]["error"] else "," - print(f" detected delimiter: {delim_used!r}") - - raw_bytes = None - with stage("path.read_bytes (1.25GB → memory)", results): - raw_bytes = path.read_bytes() - - repair = None - if raw_bytes is not None: - with stage("repair_bytes (full file)", results): - repair = repair_bytes(raw_bytes, encoding=enc_used, delimiter=delim_used) - if repair: - print(f" repair actions: {repair.summary()}") - print(f" repaired size: {len(repair.repaired_bytes) / 1024**3:.2f} GB") - - # Free raw bytes before next stage so RSS deltas are honest. - del raw_bytes - gc.collect() - - findings = [] - with stage("analyze (default sample_rows=1000)", results): - findings = analyze(path, sample_rows=1000) - print(f" findings: {sorted({f.id for f in findings})}") - - df_sample = None - with stage("_load_for_analysis (1000 rows)", results): - df_sample, _, _ = _load_for_analysis(path, sample_rows=1000) - if df_sample is not None: - print(f" sample df: {df_sample.shape}") - - if df_sample is not None and findings: - with stage("auto_fix on 1000-row sample", results): - sample_result = auto_fix(df_sample, findings) - print(f" fixes applied: {len(sample_result.applied)}, cells changed: " - f"{sum(a.cells_changed for a in sample_result.applied)}") - - # Now the heavy run: parse the FULL file, then auto_fix. - print() - print(" --- full-file pass (no sample cap) ---") - full_df = None - with stage("full read_csv via repaired bytes (full file)", results): - if repair is not None: - import pandas as pd - full_df = pd.read_csv( - io.BytesIO(repair.repaired_bytes), - encoding="utf-8", delimiter=delim_used, - dtype=str, keep_default_na=False, on_bad_lines="warn", - ) - if full_df is not None: - print(f" full df: {full_df.shape}, " - f"approx mem: {full_df.memory_usage(deep=True).sum() / 1024**3:.2f} GB") - - if full_df is not None: - # Re-run analysis on the full DataFrame to get findings against - # actual content, not the 1000-row sample. - with stage("analyze full df (no sample cap)", results): - full_findings = analyze(full_df, sample_rows=10**9) - print(f" full findings: {sorted({f.id for f in full_findings})}") - - with stage("auto_fix full df (~7M rows)", results): - full_result = auto_fix(full_df, full_findings) - if full_result is not None: - print(f" full fixes applied: {len(full_result.applied)}, " - f"cells changed: {sum(a.cells_changed for a in full_result.applied)}") - print(f" cleaned_bytes: {len(full_result.cleaned_bytes) / 1024**3:.2f} GB") - - # Cleanup - if not args.keep and path.exists(): - path.unlink() - print() - print(f"Removed {path}") - - # Summary table - print() - print("=== Summary ===") - print(f"{'stage':<46} {'wall (s)':>10} {'RSS Δ (MB)':>12}") - for r in results: - suffix = " ⚠" if r["error"] else "" - print(f"{r['stage']:<46} {r['wall_s']:>10.2f} {r['rss_delta_mb']:>+12.0f}{suffix}") - print(f"\nPeak RSS: {max(r['rss_after_mb'] for r in results):.0f} MB") - return 0 - - -if __name__ == "__main__": - sys.exit(main())