Generates a synthetic messy CSV at the target size, then runs every pipeline stage end-to-end (detect_encoding, repair_bytes, analyze, auto_fix on sample + full file) capturing wall-clock and peak RSS at each stage. Not part of the automated suite — invoke directly via ``python scripts/stress_1_25gb.py``. ``--keep`` to preserve the file between runs, ``--target-gb`` to tune the size. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
290 lines
10 KiB
Python
290 lines
10 KiB
Python
"""One-time 1.25 GB stress test for the analyzer + gate pipeline.
|
||
|
||
Not part of the automated suite. Generates a synthetic messy CSV at the
|
||
target size, runs every pipeline stage end-to-end, captures wall-clock
|
||
+ peak RSS at each stage, and prints a summary.
|
||
|
||
Run:
|
||
    python scripts/stress_1_25gb.py [--keep] [--skip-generate] [--target-gb 1.25]
|
||
|
||
The generated file lives in $TMPDIR (default /tmp). With --keep the file
|
||
is not deleted after the run; pass --skip-generate on a later run to reuse
it instead of regenerating.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import gc
|
||
import io
|
||
import os
|
||
import resource
|
||
import sys
|
||
import time
|
||
import tracemalloc
|
||
from contextlib import contextmanager
|
||
from pathlib import Path
|
||
from typing import Iterator
|
||
|
||
# Project root
|
||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Generator — designed so every analyzer detector finds something.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# 10 columns × ~180 bytes/row ≈ 1.25 GB at 7M rows. Tune via --target-gb.
# Header row, written once at the top of the fixture. Deliberately irregular:
# "Name " carries a trailing space, "email" is wrapped in straight double
# quotes and "description" in curly quotes.
HEADER = (
    "id,Name ,\"email\",phone,city,notes,status,amount,date,“description”\n"
)

# Name pool: includes a value padded with surrounding whitespace and one with
# a typographic (curly) apostrophe.
NAMES = [
    "Alice Smith", " Bob Jones ", "Carol O’Connor",
    "David Lee", "Eva Garcia", "Frank Miller", "Grace Kim",
    "Henry Davis", "Iris Wong",
]
# Email pool: same domain written with inconsistent letter case.
EMAILS = [
    "alice@example.com", "BOB@Example.COM", "carol@example.com",
    "DAVID@example.com", "eva@Example.com", "Frank@example.COM",
]
# City pool: several values contain non-ASCII letters (ö, ã, ü).
CITIES = ["New York", "Köln", "São Paulo", "Zürich", "Düsseldorf", "Madrid", "Tokyo"]
# Free-text pool: contains an em dash, an ellipsis character, curly double
# quotes and prime / double-prime marks alongside plain ASCII entries.
NOTES = [
    "VIP — contact ASAP…",
    "regular customer",
    "follow up next quarter",
    "needs “signed” agreement",
    "5′ 11″ height noted",
    "nice client",
]
# Status pool: mixes a real value with placeholder-style tokens and an empty
# string.
STATUSES = ["active", "N/A", "TBD", "", "active", "unknown", "active"]
# Amount pool: some values carry a "$" prefix and/or a thousands-separator
# comma, others are plain decimals.
AMOUNTS = ["$1,500.00", "100.00", "250.50", "$50.00", "1,234.56", "75.00"]
# Date pool: all values are written as YYYY-MM-DD.
DATES = ["2024-01-15", "2024-02-20", "2024-03-12", "2024-04-08", "2024-05-30"]
# Description pool: one entry uses curly single quotes, the rest are ASCII.
DESCRIPTIONS = [
    "first ‘contact’ made",
    "long-time client",
    "referred by partner",
    "premium support tier",
]
|
||
|
||
|
||
def gen_chunk(start_id: int, n_rows: int) -> str:
    """Return ``n_rows`` consecutive CSV rows joined into one string.

    Row ids run from ``start_id`` upward. Every other column is taken from
    its module-level value pool, indexed by the row id modulo the pool
    length, so the output is fully deterministic for a given id range.
    Building one large string lets the caller hand it to the file object in
    a single write.
    """

    def _pick(pool: list, row_id: int) -> str:
        # Cycle through the pool using the row id as the position.
        return pool[row_id % len(pool)]

    lines = []
    for row_id in range(start_id, start_id + n_rows):
        fields = (
            str(row_id),
            _pick(NAMES, row_id),
            _pick(EMAILS, row_id),
            f"555-0{row_id % 10000:04d}",
            _pick(CITIES, row_id),
            _pick(NOTES, row_id),
            _pick(STATUSES, row_id),
            # Amounts may contain a comma, so this field is always quoted
            # to keep the column count stable.
            '"' + _pick(AMOUNTS, row_id) + '"',
            _pick(DATES, row_id),
            _pick(DESCRIPTIONS, row_id),
        )
        lines.append(",".join(fields) + "\n")
    return "".join(lines)
|
||
|
||
|
||
def generate_file(path: Path, target_gb: float, rows_per_chunk: int = 50_000) -> None:
    """Stream the synthetic CSV to ``path`` until the target size is reached.

    The header is written first, then whole chunks produced by
    ``gen_chunk`` are appended until at least ``target_gb`` * 1024**3 bytes
    have been written. Because only whole chunks are written, the final
    file may exceed the target by up to one chunk. Progress is printed
    roughly every million rows, followed by a final summary line.

    Args:
        path: Destination file; created or truncated.
        target_gb: Minimum output size, in units of 1024**3 bytes.
        rows_per_chunk: Number of rows generated and written per iteration.

    Raises:
        ValueError: If ``rows_per_chunk`` is not positive (the write loop
            would otherwise never make progress).
    """
    if rows_per_chunk <= 0:
        raise ValueError(f"rows_per_chunk must be positive, got {rows_per_chunk}")

    target_bytes = int(target_gb * 1024**3)
    progress_every = 1_000_000
    next_report = progress_every
    total_rows = 0
    written = 0
    print(f" writing → {path} (target {target_gb} GB)")
    t0 = time.perf_counter()
    # Binary mode: each chunk is encoded exactly once and the same bytes are
    # used both for the write and for the size accounting. Text mode with
    # newline="" performs no newline translation, so the bytes on disk are
    # the same as before.
    with path.open("wb") as fh:
        header = HEADER.encode("utf-8")
        fh.write(header)
        written += len(header)
        while written < target_bytes:
            payload = gen_chunk(total_rows, rows_per_chunk).encode("utf-8")
            fh.write(payload)
            written += len(payload)
            total_rows += rows_per_chunk
            # Threshold-based reporting works for any chunk size, not only
            # those that divide one million evenly.
            if total_rows >= next_report:
                print(
                    f" …{total_rows:,} rows, "
                    f"{written / 1024**3:.2f} GB in "
                    f"{time.perf_counter() - t0:.1f}s"
                )
                while next_report <= total_rows:
                    next_report += progress_every
    print(
        f" done: {total_rows:,} rows, "
        f"{written / 1024**3:.2f} GB in "
        f"{time.perf_counter() - t0:.1f}s"
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stage runner — captures wall-clock + peak RSS delta per stage.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _rss_mb() -> float:
|
||
"""Current process RSS in MB."""
|
||
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
|
||
|
||
|
||
@contextmanager
def stage(name: str, results: list) -> Iterator[None]:
    """Time the wrapped block and record memory readings around it.

    A garbage collection is forced first, then the value of ``_rss_mb()``
    and a ``perf_counter`` timestamp are taken. Any ``Exception`` raised by
    the wrapped block is caught and NOT re-raised; its ``repr`` is stored in
    the record instead. On exit (success or failure) one dict with the keys
    ``stage``, ``wall_s``, ``rss_before_mb``, ``rss_after_mb``,
    ``rss_delta_mb`` and ``error`` is appended to ``results`` and a one-line
    report is printed.

    Args:
        name: Label used in the record and in the printed line.
        results: List that receives the per-stage record.
    """
    gc.collect()
    rss_start = _rss_mb()
    started = time.perf_counter()
    caught: Exception | None = None
    try:
        yield
    except Exception as exc:
        # Deliberately swallowed so later stages still run; the failure is
        # surfaced through the record and the printed line.
        caught = exc
    finally:
        elapsed = time.perf_counter() - started
        rss_end = _rss_mb()
        rss_delta = rss_end - rss_start
        record = {
            "stage": name,
            "wall_s": elapsed,
            "rss_before_mb": rss_start,
            "rss_after_mb": rss_end,
            "rss_delta_mb": rss_delta,
            "error": repr(caught) if caught else "",
        }
        results.append(record)
        error_suffix = f" ERROR {caught!r}" if caught else ""
        report = (
            f" {name:<42} {elapsed:>8.2f}s "
            f"RSS {rss_start:>7.0f} → {rss_end:>7.0f} MB "
            f"(Δ {rss_delta:+.0f})"
        )
        print(report + error_suffix)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> int:
    """Generate (or reuse) the fixture, run every pipeline stage, print a summary.

    Command-line flags:
        --target-gb      Size of the generated fixture (default 1.25).
        --keep           Do not delete the fixture at the end.
        --skip-generate  Reuse the fixture if it already exists at the
                         target path; otherwise it is generated anyway.

    Each pipeline call runs inside ``stage()``, which swallows exceptions
    and records them, so every result consumed after a stage is
    pre-initialised and checked before use. The fixture is removed in a
    ``finally`` block (unless ``--keep``), so it is also cleaned up when
    something propagates out of the run, e.g. Ctrl-C or a failed import.

    Returns:
        Always 0. Per-stage failures are reported in the printed output.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--target-gb", type=float, default=1.25)
    ap.add_argument("--keep", action="store_true",
                    help="Don't delete the test file at the end.")
    ap.add_argument("--skip-generate", action="store_true",
                    help="Reuse an existing file at the target path.")
    args = ap.parse_args()

    tmp = Path(os.environ.get("TMPDIR", "/tmp"))
    path = tmp / f"stress_{args.target_gb}gb.csv"

    print(f"=== Stress test: {args.target_gb} GB ===")
    print(f"Path: {path}")
    print()

    results: list[dict] = []

    try:
        if not args.skip_generate or not path.exists():
            print("[1/2] Generating fixture")
            gen_t0 = time.perf_counter()
            generate_file(path, args.target_gb)
            gen_wall = time.perf_counter() - gen_t0
            print(f"Generation total: {gen_wall:.1f}s")
            print()
        else:
            print(f"[1/2] Reusing existing fixture ({path.stat().st_size / 1024**3:.2f} GB)")
            print()

        actual_gb = path.stat().st_size / 1024**3
        print(f"[2/2] Pipeline run on {actual_gb:.2f} GB file")
        print(f" {'stage':<42} {'wall':>9} {'RSS':>23}")

        # Project imports are deferred so argument parsing and fixture
        # generation work even before the package is importable.
        from src.core.io import detect_encoding, detect_delimiter, repair_bytes
        from src.core.analyze import analyze, _load_for_analysis
        from src.core.normalize import auto_fix

        with stage("detect_encoding", results):
            enc = detect_encoding(path)
        # The conditional short-circuits, so ``enc`` is only read when the
        # stage succeeded and therefore bound it.
        enc_used = enc if not results[-1]["error"] else "utf-8"
        print(f" detected encoding: {enc_used!r}")

        with stage("detect_delimiter", results):
            delim = detect_delimiter(path, enc_used)
        delim_used = delim if not results[-1]["error"] else ","
        print(f" detected delimiter: {delim_used!r}")

        raw_bytes = None
        # Label reflects the real file size rather than a hard-coded 1.25GB.
        with stage(f"path.read_bytes ({actual_gb:.2f}GB → memory)", results):
            raw_bytes = path.read_bytes()

        repair = None
        if raw_bytes is not None:
            with stage("repair_bytes (full file)", results):
                repair = repair_bytes(raw_bytes, encoding=enc_used, delimiter=delim_used)
            if repair is not None:
                print(f" repair actions: {repair.summary()}")
                print(f" repaired size: {len(repair.repaired_bytes) / 1024**3:.2f} GB")

        # Drop the raw buffer before the next stage so it is not kept alive
        # alongside later allocations.
        del raw_bytes
        gc.collect()

        findings = []
        with stage("analyze (default sample_rows=1000)", results):
            findings = analyze(path, sample_rows=1000)
        print(f" findings: {sorted({f.id for f in findings})}")

        df_sample = None
        with stage("_load_for_analysis (1000 rows)", results):
            df_sample, _, _ = _load_for_analysis(path, sample_rows=1000)
        if df_sample is not None:
            print(f" sample df: {df_sample.shape}")

        if df_sample is not None and findings:
            # Pre-initialised: stage() swallows exceptions, so the name
            # would otherwise be unbound after a failed auto_fix.
            sample_result = None
            with stage("auto_fix on 1000-row sample", results):
                sample_result = auto_fix(df_sample, findings)
            if sample_result is not None:
                print(f" fixes applied: {len(sample_result.applied)}, cells changed: "
                      f"{sum(a.cells_changed for a in sample_result.applied)}")

        # Now the heavy run: parse the FULL file, then auto_fix.
        print()
        print(" --- full-file pass (no sample cap) ---")
        full_df = None
        with stage("full read_csv via repaired bytes (full file)", results):
            if repair is not None:
                import pandas as pd
                full_df = pd.read_csv(
                    io.BytesIO(repair.repaired_bytes),
                    encoding="utf-8", delimiter=delim_used,
                    dtype=str, keep_default_na=False, on_bad_lines="warn",
                )
        if full_df is not None:
            print(f" full df: {full_df.shape}, "
                  f"approx mem: {full_df.memory_usage(deep=True).sum() / 1024**3:.2f} GB")

        if full_df is not None:
            # Re-run analysis on the full DataFrame to get findings against
            # actual content, not the 1000-row sample.
            full_findings = None
            with stage("analyze full df (no sample cap)", results):
                full_findings = analyze(full_df, sample_rows=10**9)

            # Without findings there is nothing to feed auto_fix, so the
            # full-file fix stage only runs when analysis succeeded.
            if full_findings is not None:
                print(f" full findings: {sorted({f.id for f in full_findings})}")

                full_result = None
                # Label carries the real row count instead of "~7M rows".
                with stage(f"auto_fix full df ({len(full_df):,} rows)", results):
                    full_result = auto_fix(full_df, full_findings)
                if full_result is not None:
                    print(f" full fixes applied: {len(full_result.applied)}, "
                          f"cells changed: {sum(a.cells_changed for a in full_result.applied)}")
                    print(f" cleaned_bytes: {len(full_result.cleaned_bytes) / 1024**3:.2f} GB")
    finally:
        # Cleanup runs on every exit path so an aborted run does not leave a
        # multi-GB file behind.
        if not args.keep and path.exists():
            path.unlink()
            print()
            print(f"Removed {path}")

    # Summary table
    print()
    print("=== Summary ===")
    print(f"{'stage':<46} {'wall (s)':>10} {'RSS Δ (MB)':>12}")
    for r in results:
        suffix = " ⚠" if r["error"] else ""
        print(f"{r['stage']:<46} {r['wall_s']:>10.2f} {r['rss_delta_mb']:>+12.0f}{suffix}")
    print(f"\nPeak RSS: {max((r['rss_after_mb'] for r in results), default=0.0):.0f} MB")
    return 0
|
||
|
||
|
||
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|