Tools shipped this batch (4 → 6 of 9 Ready):
04 Missing Value Handler src/core/missing.py + cli_missing.py + GUI
05 Column Mapper src/core/column_mapper.py + cli_column_map.py + GUI
09 Pipeline Runner src/core/pipeline.py + cli_pipeline.py + GUI
with soft tool-dependency graph (recommended,
not enforced) and JSON save/load for repeatable
weekly cleanups.
Format Standardizer reworked for 1 GB international files:
• Vectorised dispatch + LRU cache over phone/date/currency/boolean/email
• Per-row country / address columns drive parsing
• Audit cap (default 10 k rows, ~50 MB RAM)
• standardize_file(): chunked streaming entry point (~165 k rows/sec)
• currency_decimal="auto" for EU comma-decimal locales
• R$ / kr / zł multi-char currency prefixes
• cli_format.py with auto-stream above 100 MB inputs
Encoding detection arbiter + language-aware probe:
Closes the last 4 xfails (cp1250 / mac_iceland / shift_jis_2004 / lying-BOM)
via tied-confidence arbiter + Cyrillic / EE-Latin coverage probes.
Distribution-readiness assets:
• streamlit_app.py — Streamlit Community Cloud entry shim
• src/gui/app_demo.py — single-page demo, ?p=<persona> routing,
100-row cap + watermark, free-vs-paid boundary enforced at surface
• samples/demo/ — 3 niche datasets + pre-tuned pipeline JSONs
• landing/ — 4 static HTML pages (apex chooser + 3 niche),
shared CSS, deploy.py URL-substitution script,
auto-generated robots.txt + sitemap.xml + 404.html + favicon
• docs/PLAN.md, DEMO-PLAN.md, DEPLOYMENT.md, POST-LAUNCH.md, NEXT-STEPS.md
— full strategy + measurement + deployment + master checklist
Test counts:
before: 1,520 passed · 4 skipped · 17 xfailed
after: 1,729 passed · 0 skipped · 0 xfailed
Tier-1 corpora added:
• missing-corpus 3 use cases + 16 edge cases
• column-mapper-corpus 3 use cases + 5 edge cases
• format-cleaner intl 20-row 13-country stress fixture
Engine hardening flushed out by the corpora:
• interpolate guards against object-dtype columns
• mean/median skip all-NaN columns (silences numpy warning)
• fillna runs under future.no_silent_downcasting (silences pandas warning)
• mojibake test no longer skips when ftfy installed (monkeypatch path)
• drop-row threshold semantics: strict-greater (consistent across rows / cols)
• currency_decimal validator allow-set updated for "auto"
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
302 lines
11 KiB
Python
302 lines
11 KiB
Python
"""Tests for the format-standardizer rework: cache, vectorized dispatch,
|
|
per-row country, audit cap, and streaming entry point."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
import pytest
|
|
|
|
from src.core.format_standardize import (
|
|
FieldType,
|
|
StandardizeOptions,
|
|
StreamingStandardizeResult,
|
|
_normalize_region,
|
|
standardize_dataframe,
|
|
standardize_file,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-row country / region
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPerRowCountry:
    """Phone standardization when ``phone_country_column`` names a per-row column."""

    def test_phone_uses_per_row_country(self):
        """GB / JP / US rows must come out with +44 / +81 / +1 prefixes."""
        frame = pd.DataFrame({
            "phone": ["020 7946 0958", "03-3210-7000", "(415) 555-1234"],
            "country": ["GB", "JP", "US"],
        })
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="country",
        )
        result = standardize_dataframe(frame, options)
        gb_phone, jp_phone, us_phone = result.standardized_df["phone"].tolist()
        assert gb_phone.startswith("+44")
        assert jp_phone.startswith("+81")
        assert us_phone.startswith("+1")

    def test_phone_country_full_name_resolved(self):
        """A country given as "United Kingdom" must still yield a +44 number."""
        frame = pd.DataFrame({
            "phone": ["020 7946 0958"],
            "country": ["United Kingdom"],
        })
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="country",
        )
        standardized = standardize_dataframe(frame, options).standardized_df
        first_phone = standardized["phone"].iloc[0]
        assert first_phone.startswith("+44")

    def test_blank_country_falls_back_to_default(self):
        """An empty country cell must be parsed with ``phone_region`` instead."""
        frame = pd.DataFrame({
            "phone": ["(415) 555-1234"],
            "country": [""],  # blank → use default region
        })
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="country",
            phone_region="US",
        )
        standardized = standardize_dataframe(frame, options).standardized_df
        assert standardized["phone"].iloc[0] == "+14155551234"

    def test_unknown_country_column_raises(self):
        """Naming a country column absent from the frame raises InputValidationError."""
        from src.core.errors import InputValidationError

        frame = pd.DataFrame({"phone": ["x"]})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="missing_col",
        )
        with pytest.raises(InputValidationError):
            standardize_dataframe(frame, options)
|
|
|
|
|
|
class TestNormalizeRegion:
    """Table-driven checks of ``_normalize_region`` for the accepted spellings."""

    def test_iso2_passthrough(self):
        """Two-letter codes come back upper-cased and stripped."""
        cases = (
            ("US", "US"),
            ("us", "US"),
            (" jp ", "JP"),
        )
        for raw, expected in cases:
            assert _normalize_region(raw) == expected

    def test_iso3_mapped(self):
        """Three-letter codes map onto their two-letter equivalents."""
        cases = (
            ("USA", "US"),
            ("GBR", "GB"),
            ("JPN", "JP"),
        )
        for raw, expected in cases:
            assert _normalize_region(raw) == expected

    def test_full_name(self):
        """Country names, including the local spellings listed here, resolve to codes."""
        cases = (
            ("United States", "US"),
            ("Japan", "JP"),
            ("Brazil", "BR"),
            ("brasil", "BR"),
            ("España", "ES"),
        )
        for raw, expected in cases:
            assert _normalize_region(raw) == expected

    def test_blank_or_unknown(self):
        """Empty, whitespace-only, ``None`` and unrecognised inputs give ``None``."""
        for raw in ("", " ", None, "xyz-no-such-country"):
            assert _normalize_region(raw) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Audit cap
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAuditCap:
    """Behaviour of ``audit_max_rows`` on the in-memory ``changes`` table."""

    def test_cap_truncates_change_rows(self):
        """With a cap of 5, 50 changed cells are counted but only 5 are audited."""
        numbers = [f"(415) 555-12{i:02d}" for i in range(50)]
        frame = pd.DataFrame({"phone": numbers})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=5,
        )
        result = standardize_dataframe(frame, options)
        # cells_changed counts everything; the audit table is capped.
        assert result.cells_changed == 50
        assert len(result.changes) == 5

    def test_unbounded_audit(self):
        """``audit_max_rows=None`` keeps one audit row for each of the 20 changes."""
        numbers = [f"(415) 555-12{i:02d}" for i in range(20)]
        frame = pd.DataFrame({"phone": numbers})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=None,
        )
        result = standardize_dataframe(frame, options)
        assert len(result.changes) == 20
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cache + vectorized dispatch (correctness)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCacheCorrectness:
    """Results must not depend on whether the cache is in play."""

    def test_repeated_phone_consistent(self):
        """1000 copies of the same phone should produce identical output."""
        frame = pd.DataFrame({"phone": ["(415) 555-1234"] * 1000})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=None,
        )
        result = standardize_dataframe(frame, options)
        matches = result.standardized_df["phone"] == "+14155551234"
        assert matches.all()
        assert result.cells_changed == 1000

    def test_cache_disabled_still_works(self):
        """``cache_size=0`` still standardizes the first (US-format) number."""
        frame = pd.DataFrame({"phone": ["(415) 555-1234", "020 7946 0958"]})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            cache_size=0,  # disabled
        )
        standardized = standardize_dataframe(frame, options).standardized_df
        assert standardized["phone"].iloc[0] == "+14155551234"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Streaming standardize_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStandardizeFile:
    """Checks for the chunked ``standardize_file`` entry point.

    Every fixture file is written and read back with an explicit UTF-8
    encoding. ``Path.write_text`` / ``Path.read_text`` otherwise fall back to
    the platform locale encoding, which would change the bytes of the
    non-ASCII currency symbols (£ ¥ €) from one machine to another.
    """

    def test_basic_streaming(self, tmp_path):
        """Four rows in chunks of two: row/chunk counts, output file, phone prefixes."""
        inp = tmp_path / "in.csv"
        inp.write_text(
            "phone,country,price\n"
            "(415) 555-1234,US,$1500.00\n"
            "020 7946 0958,GB,£99.99\n"
            "03-3210-7000,JP,¥12000\n"
            "+33 1 42 86 82 00,FR,€850.50\n",
            encoding="utf-8",
        )
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(
            column_types={"phone": FieldType.PHONE, "price": FieldType.CURRENCY},
            phone_country_column="country",
            currency_preserve_code=True,
        )
        res = standardize_file(inp, out, opts, chunk_size=2)
        assert isinstance(res, StreamingStandardizeResult)
        assert res.rows_processed == 4
        assert res.chunks_processed == 2
        assert out.exists()
        # Read everything back as raw strings so no value is re-interpreted.
        out_df = pd.read_csv(out, dtype=str, keep_default_na=False)
        assert out_df["phone"].iloc[0].startswith("+1")
        assert out_df["phone"].iloc[1].startswith("+44")
        assert out_df["phone"].iloc[2].startswith("+81")
        assert out_df["phone"].iloc[3].startswith("+33")

    def test_audit_capped_across_chunks(self, tmp_path):
        """60 rows, audit cap 10, chunks of 20 → audit must stop at 10."""
        inp = tmp_path / "in.csv"
        rows = ["phone\n"] + [f"(415) 555-12{i:02d}\n" for i in range(60)]
        inp.write_text("".join(rows), encoding="utf-8")
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=10,
        )
        res = standardize_file(inp, out, opts, chunk_size=20)
        # Audit file exists and has exactly 10 data rows + 1 header.
        audit_lines = res.audit_path.read_text(encoding="utf-8").splitlines()
        assert len(audit_lines) - 1 == 10

    def test_audit_row_indices_are_global(self, tmp_path):
        """Audit row numbers must reflect absolute file position, not chunk-local."""
        inp = tmp_path / "in.csv"
        rows = ["phone\n"] + [f"(415) 555-12{i:02d}\n" for i in range(30)]
        inp.write_text("".join(rows), encoding="utf-8")
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            audit_max_rows=None,
        )
        res = standardize_file(inp, out, opts, chunk_size=10)
        audit = pd.read_csv(res.audit_path)
        # Rows should be 0..29, monotonically increasing.
        assert audit["row"].tolist() == list(range(30))

    def test_progress_callback_fires(self, tmp_path):
        """20 rows in chunks of 5 → four callbacks, the last reporting (20, 4)."""
        inp = tmp_path / "in.csv"
        inp.write_text("phone\n" + "(415) 555-1234\n" * 20, encoding="utf-8")
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(column_types={"phone": FieldType.PHONE})
        seen: list[tuple[int, int]] = []

        def cb(rows, chunks):
            seen.append((rows, chunks))

        standardize_file(inp, out, opts, chunk_size=5, progress_callback=cb)
        assert len(seen) == 4
        assert seen[-1] == (20, 4)

    def test_progress_callback_exception_does_not_abort(self, tmp_path):
        """A callback that raises must not stop the file from being processed."""
        inp = tmp_path / "in.csv"
        inp.write_text("phone\n(415) 555-1234\n", encoding="utf-8")
        out = tmp_path / "out.csv"
        opts = StandardizeOptions(column_types={"phone": FieldType.PHONE})

        def bad_cb(*a, **k):
            raise RuntimeError("boom")

        # Must not raise.
        res = standardize_file(inp, out, opts, chunk_size=1, progress_callback=bad_cb)
        assert res.rows_processed == 1

    def test_missing_input_raises_clean_error(self, tmp_path):
        """A non-existent input path surfaces as the project's FileAccessError."""
        from src.core.errors import FileAccessError

        opts = StandardizeOptions(column_types={"phone": FieldType.PHONE})
        with pytest.raises(FileAccessError):
            standardize_file(
                tmp_path / "missing.csv",
                tmp_path / "out.csv",
                opts,
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# International coverage smoke
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestInternationalCoverage:
    """Smoke coverage of phone prefixes and currency codes across countries."""

    @pytest.mark.parametrize(
        ("number", "country", "prefix"),
        [
            ("020 7946 0958", "GB", "+44"),
            ("03-3210-7000", "JP", "+81"),
            ("+49 30 12345678", "DE", "+49"),
            ("01 42 86 82 00", "FR", "+33"),
            ("+39 06 6982", "IT", "+39"),
            ("+34 91 411 1111", "ES", "+34"),
            ("+86 10 1234 5678", "CN", "+86"),
            ("+91 11 2345 6789", "IN", "+91"),
            ("+61 2 9374 4000", "AU", "+61"),
            ("11 3071 0000", "BR", "+55"),
            ("+52 55 5555 0000", "MX", "+52"),
            ("+82 2 2287 0114", "KR", "+82"),
        ],
    )
    def test_phone_via_per_row_region(self, number, country, prefix):
        """A one-row frame with a country column must yield the expected dial prefix."""
        frame = pd.DataFrame({"phone": [number], "country": [country]})
        options = StandardizeOptions(
            column_types={"phone": FieldType.PHONE},
            phone_country_column="country",
        )
        standardized = standardize_dataframe(frame, options).standardized_df
        out = standardized["phone"].iloc[0]
        assert out.startswith(prefix), (
            f"{number!r} ({country}): expected to start with {prefix}, got {out!r}"
        )

    @pytest.mark.parametrize(
        ("price", "want_code"),
        [
            ("$1,500.00", "USD"),
            ("€850,50", "EUR"),
            ("£99.99", "GBP"),
            ("¥12000", "JPY"),
            ("R$ 250,00", "BRL"),
            ("CHF 1200.00", "CHF"),
        ],
    )
    def test_currency_codes_detected(self, price, want_code):
        """With code preservation on, the output must contain the expected code."""
        frame = pd.DataFrame({"price": [price]})
        options = StandardizeOptions(
            column_types={"price": FieldType.CURRENCY},
            currency_preserve_code=True,
            currency_decimal="auto",  # international mode
        )
        standardized = standardize_dataframe(frame, options).standardized_df
        assert want_code in standardized["price"].iloc[0]
|