Files
datatools-dev/tests/test_format_streaming.py
Michael 966af8ef94 feat: 3 new tools, format streaming, distribution-ready demo + landing pages
Tools shipped this batch (4 → 6 of 9 Ready):
  04 Missing Value Handler   src/core/missing.py + cli_missing.py + GUI
  05 Column Mapper           src/core/column_mapper.py + cli_column_map.py + GUI
  09 Pipeline Runner         src/core/pipeline.py + cli_pipeline.py + GUI
                             with soft tool-dependency graph (recommended,
                             not enforced) and JSON save/load for repeatable
                             weekly cleanups.

Format Standardizer reworked for 1 GB international files:
  • Vectorised dispatch + LRU cache over phone/date/currency/boolean/email
  • Per-row country / address columns drive parsing
  • Audit cap (default 10 k rows, ~50 MB RAM)
  • standardize_file(): chunked streaming entry point (~165 k rows/sec)
  • currency_decimal="auto" for EU comma-decimal locales
  • R$ / kr / zł multi-char currency prefixes
  • cli_format.py with auto-stream above 100 MB inputs

Encoding detection arbiter + language-aware probe:
  Closes the last 4 xfails (cp1250 / mac_iceland / shift_jis_2004 / lying-BOM)
  via tied-confidence arbiter + Cyrillic / EE-Latin coverage probes.

Distribution-readiness assets:
  • streamlit_app.py — Streamlit Community Cloud entry shim
  • src/gui/app_demo.py — single-page demo, ?p=<persona> routing,
    100-row cap + watermark, free-vs-paid boundary enforced at surface
  • samples/demo/ — 3 niche datasets + pre-tuned pipeline JSONs
  • landing/ — 4 static HTML pages (apex chooser + 3 niche),
    shared CSS, deploy.py URL-substitution script,
    auto-generated robots.txt + sitemap.xml + 404.html + favicon
  • docs/PLAN.md, DEMO-PLAN.md, DEPLOYMENT.md, POST-LAUNCH.md, NEXT-STEPS.md
    — full strategy + measurement + deployment + master checklist

Test counts:
  before: 1,520 passed · 4 skipped · 17 xfailed
  after:  1,729 passed · 0 skipped · 0  xfailed

Tier-1 corpora added:
  • missing-corpus           3 use cases + 16 edge cases
  • column-mapper-corpus     3 use cases + 5 edge cases
  • format-cleaner intl      20-row 13-country stress fixture

Engine hardening flushed out by the corpora:
  • interpolate guards against object-dtype columns
  • mean/median skip all-NaN columns (silences numpy warning)
  • fillna runs under future.no_silent_downcasting (silences pandas warning)
  • mojibake test no longer skips when ftfy installed (monkeypatch path)
  • drop-row threshold semantics: strict-greater (consistent across rows / cols)
  • currency_decimal validator allow-set updated for "auto"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 22:31:26 +00:00

302 lines
11 KiB
Python

"""Tests for the format-standardizer rework: cache, vectorized dispatch,
per-row country, audit cap, and streaming entry point."""
from __future__ import annotations
import csv
from pathlib import Path
import pandas as pd
import pytest
from src.core.format_standardize import (
FieldType,
StandardizeOptions,
StreamingStandardizeResult,
_normalize_region,
standardize_dataframe,
standardize_file,
)
# ---------------------------------------------------------------------------
# Per-row country / region
# ---------------------------------------------------------------------------
class TestPerRowCountry:
    """Phone standardisation when a sibling column names each row's country."""

    def test_phone_uses_per_row_country(self):
        """Each number gets the calling-code prefix of its own row's country."""
        frame = pd.DataFrame({
            "phone": ["020 7946 0958", "03-3210-7000", "(415) 555-1234"],
            "country": ["GB", "JP", "US"],
        })
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="country",
        )
        outcome = standardize_dataframe(frame, options)
        phones = outcome.standardized_df["phone"].tolist()
        assert phones[0].startswith("+44")
        assert phones[1].startswith("+81")
        assert phones[2].startswith("+1")

    def test_phone_country_full_name_resolved(self):
        """A full country name in the country column still yields the +44 prefix."""
        frame = pd.DataFrame({
            "phone": ["020 7946 0958"],
            "country": ["United Kingdom"],
        })
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="country",
        )
        outcome = standardize_dataframe(frame, options)
        first_phone = outcome.standardized_df["phone"].iloc[0]
        assert first_phone.startswith("+44")

    def test_blank_country_falls_back_to_default(self):
        """An empty country cell makes the parser use ``phone_region`` instead."""
        frame = pd.DataFrame({
            "phone": ["(415) 555-1234"],
            "country": [""],  # blank → use default region
        })
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="country",
            phone_region="US",
        )
        outcome = standardize_dataframe(frame, options)
        first_phone = outcome.standardized_df["phone"].iloc[0]
        assert first_phone == "+14155551234"

    def test_unknown_country_column_raises(self):
        """Naming a country column that is absent from the frame is rejected."""
        from src.core.errors import InputValidationError

        frame = pd.DataFrame({"phone": ["x"]})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="missing_col",
        )
        with pytest.raises(InputValidationError):
            standardize_dataframe(frame, options)
class TestNormalizeRegion:
    """Checks ``_normalize_region`` against ISO-2, ISO-3, names and junk input."""

    def test_iso2_passthrough(self):
        """Two-letter codes come back upper-cased with surrounding spaces removed."""
        cases = (
            ("US", "US"),
            ("us", "US"),
            (" jp ", "JP"),
        )
        for raw, expected in cases:
            assert _normalize_region(raw) == expected

    def test_iso3_mapped(self):
        """Three-letter codes are mapped to their two-letter equivalents."""
        cases = (
            ("USA", "US"),
            ("GBR", "GB"),
            ("JPN", "JP"),
        )
        for raw, expected in cases:
            assert _normalize_region(raw) == expected

    def test_full_name(self):
        """Country names, including lower-case and non-English spellings, resolve."""
        cases = (
            ("United States", "US"),
            ("Japan", "JP"),
            ("Brazil", "BR"),
            ("brasil", "BR"),
            ("España", "ES"),
        )
        for raw, expected in cases:
            assert _normalize_region(raw) == expected

    def test_blank_or_unknown(self):
        """Empty, whitespace-only, ``None`` and unrecognised values give ``None``."""
        for raw in ("", " ", None, "xyz-no-such-country"):
            assert _normalize_region(raw) is None
# ---------------------------------------------------------------------------
# Audit cap
# ---------------------------------------------------------------------------
class TestAuditCap:
    """``audit_max_rows`` limits the change table, not the change counter."""

    def test_cap_truncates_change_rows(self):
        """With a cap of 5, 50 changed cells are counted but only 5 are listed."""
        template = "(415) 555-12{:02d}"
        numbers = [template.format(n) for n in range(50)]
        frame = pd.DataFrame({"phone": numbers})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=5,
        )
        outcome = standardize_dataframe(frame, options)
        # cells_changed counts everything; the audit table is capped.
        assert outcome.cells_changed == 50
        assert len(outcome.changes) == 5

    def test_unbounded_audit(self):
        """``audit_max_rows=None`` keeps one audit entry per changed cell."""
        template = "(415) 555-12{:02d}"
        numbers = [template.format(n) for n in range(20)]
        frame = pd.DataFrame({"phone": numbers})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=None,
        )
        outcome = standardize_dataframe(frame, options)
        assert len(outcome.changes) == 20
# ---------------------------------------------------------------------------
# Cache + vectorized dispatch (correctness)
# ---------------------------------------------------------------------------
class TestCacheCorrectness:
    """Results must be the same whether or not the value cache is in play."""

    def test_repeated_phone_consistent(self):
        """1000 copies of one phone all standardise to the same E.164 string."""
        repeated = ["(415) 555-1234"] * 1000
        frame = pd.DataFrame({"phone": repeated})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=None,
        )
        outcome = standardize_dataframe(frame, options)
        phones = outcome.standardized_df["phone"]
        assert (phones == "+14155551234").all()
        # Every one of the duplicated cells still counts as a change.
        assert outcome.cells_changed == 1000

    def test_cache_disabled_still_works(self):
        """With ``cache_size=0`` the first row is still standardised correctly."""
        frame = pd.DataFrame({"phone": ["(415) 555-1234", "020 7946 0958"]})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            cache_size=0,  # disabled
        )
        outcome = standardize_dataframe(frame, options)
        first_phone = outcome.standardized_df["phone"].iloc[0]
        assert first_phone == "+14155551234"
# ---------------------------------------------------------------------------
# Streaming standardize_file
# ---------------------------------------------------------------------------
class TestStandardizeFile:
    """End-to-end checks of the chunked ``standardize_file`` entry point.

    Every fixture is written, and every text artefact read back, with an
    explicit ``encoding="utf-8"``. Without it ``Path.write_text`` /
    ``Path.read_text`` use the platform locale, so the non-ASCII currency
    symbols below would hit the disk as different bytes on e.g. Windows
    (cp1252) than on Linux, while ``pd.read_csv`` always defaults to UTF-8.
    """

    def test_basic_streaming(self, tmp_path):
        """Four rows in chunks of two: counts, output file and phone prefixes."""
        inp = tmp_path / "in.csv"
        inp.write_text(
            "phone,country,price\n"
            "(415) 555-1234,US,$1500.00\n"
            "020 7946 0958,GB,£99.99\n"
            "03-3210-7000,JP,¥12000\n"
            "+33 1 42 86 82 00,FR,€850.50\n",
            encoding="utf-8",  # fixture contains £ / ¥ / €; do not rely on locale
        )
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(
            column_types={"phone": FieldType.PHONE, "price": FieldType.CURRENCY},
            phone_country_column="country",
            currency_preserve_code=True,
        )
        res = standardize_file(inp, out, opts, chunk_size=2)
        assert isinstance(res, StreamingStandardizeResult)
        assert res.rows_processed == 4
        assert res.chunks_processed == 2
        assert out.exists()
        # Read everything back as raw strings so "+1..." is not parsed as a number.
        out_df = pd.read_csv(out, dtype=str, keep_default_na=False)
        # Only the phone column is asserted; price is exercised but not checked.
        assert out_df["phone"].iloc[0].startswith("+1")
        assert out_df["phone"].iloc[1].startswith("+44")
        assert out_df["phone"].iloc[2].startswith("+81")
        assert out_df["phone"].iloc[3].startswith("+33")

    def test_audit_capped_across_chunks(self, tmp_path):
        """The audit cap applies to the whole file, not to each chunk."""
        # 60 rows, audit cap 10, chunks of 20 → audit must stop at 10.
        inp = tmp_path / "in.csv"
        rows = ["phone\n"] + [f"(415) 555-12{i:02d}\n" for i in range(60)]
        inp.write_text("".join(rows), encoding="utf-8")
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=10,
        )
        res = standardize_file(inp, out, opts, chunk_size=20)
        # Audit file exists and has exactly 10 data rows + 1 header.
        audit_lines = res.audit_path.read_text(encoding="utf-8").splitlines()
        assert len(audit_lines) - 1 == 10

    def test_audit_row_indices_are_global(self, tmp_path):
        """Audit ``row`` values run 0..29 across three chunks of ten."""
        # Audit row numbers must reflect absolute file position, not chunk-local.
        inp = tmp_path / "in.csv"
        rows = ["phone\n"] + [f"(415) 555-12{i:02d}\n" for i in range(30)]
        inp.write_text("".join(rows), encoding="utf-8")
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=None,
        )
        res = standardize_file(inp, out, opts, chunk_size=10)
        audit = pd.read_csv(res.audit_path)
        # Rows should be 0..29, monotonically increasing.
        assert audit["row"].tolist() == list(range(30))

    def test_progress_callback_fires(self, tmp_path):
        """The callback runs once per chunk and ends on the final totals."""
        inp = tmp_path / "in.csv"
        # Header plus 20 identical data rows, each newline-terminated.
        inp.write_text("phone\n" + "(415) 555-1234\n" * 20, encoding="utf-8")
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(column_types={"phone": FieldType.PHONE})
        seen: list[tuple[int, int]] = []

        def cb(rows, chunks):
            seen.append((rows, chunks))

        standardize_file(inp, out, opts, chunk_size=5, progress_callback=cb)
        # 20 rows / 5 per chunk → 4 invocations; the last reports (rows, chunks).
        assert len(seen) == 4
        assert seen[-1] == (20, 4)

    def test_progress_callback_exception_does_not_abort(self, tmp_path):
        """A callback that raises must not stop the file from being processed."""
        inp = tmp_path / "in.csv"
        inp.write_text("phone\n(415) 555-1234\n", encoding="utf-8")
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(column_types={"phone": FieldType.PHONE})

        def bad_cb(*a, **k):
            raise RuntimeError("boom")

        # Must not raise.
        res = standardize_file(inp, out, opts, chunk_size=1, progress_callback=bad_cb)
        assert res.rows_processed == 1

    def test_missing_input_raises_clean_error(self, tmp_path):
        """A non-existent input path surfaces as the project's ``FileAccessError``."""
        from src.core.errors import FileAccessError

        opts = StandardizeOptions(column_types={"phone": FieldType.PHONE})
        with pytest.raises(FileAccessError):
            standardize_file(
                tmp_path / "missing.csv",
                tmp_path / "out.csv",
                opts,
            )
# ---------------------------------------------------------------------------
# International coverage smoke
# ---------------------------------------------------------------------------
class TestInternationalCoverage:
    """Smoke tests over a spread of countries and currency notations."""

    @pytest.mark.parametrize("number,country,prefix", [
        ("020 7946 0958", "GB", "+44"),
        ("03-3210-7000", "JP", "+81"),
        ("+49 30 12345678", "DE", "+49"),
        ("01 42 86 82 00", "FR", "+33"),
        ("+39 06 6982", "IT", "+39"),
        ("+34 91 411 1111", "ES", "+34"),
        ("+86 10 1234 5678", "CN", "+86"),
        ("+91 11 2345 6789", "IN", "+91"),
        ("+61 2 9374 4000", "AU", "+61"),
        ("11 3071 0000", "BR", "+55"),
        ("+52 55 5555 0000", "MX", "+52"),
        ("+82 2 2287 0114", "KR", "+82"),
    ])
    def test_phone_via_per_row_region(self, number, country, prefix):
        """A single-row frame standardises to a number starting with ``prefix``."""
        frame = pd.DataFrame({"phone": [number], "country": [country]})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="country",
        )
        standardized = standardize_dataframe(frame, options).standardized_df
        out = standardized["phone"].iloc[0]
        # The message spells out input, country and actual value for triage.
        assert out.startswith(prefix), (
            f"{number!r} ({country}): expected to start with {prefix}, got {out!r}"
        )

    @pytest.mark.parametrize("price,want_code", [
        ("$1,500.00", "USD"),
        ("€850,50", "EUR"),
        ("£99.99", "GBP"),
        ("¥12000", "JPY"),
        ("R$ 250,00", "BRL"),
        ("CHF 1200.00", "CHF"),
    ])
    def test_currency_codes_detected(self, price, want_code):
        """The standardised price text contains the expected currency code."""
        frame = pd.DataFrame({"price": [price]})
        options = StandardizeOptions(
            column_types={"price": FieldType.CURRENCY},
            currency_preserve_code=True,
            currency_decimal="auto",  # international mode
        )
        standardized = standardize_dataframe(frame, options).standardized_df
        rendered = standardized["price"].iloc[0]
        assert want_code in rendered