"""Tests for the format-standardizer rework: cache, vectorized dispatch, per-row country, audit cap, and streaming entry point.""" from __future__ import annotations import csv from pathlib import Path import pandas as pd import pytest from src.core.format_standardize import ( FieldType, StandardizeOptions, StreamingStandardizeResult, _normalize_region, standardize_dataframe, standardize_file, ) # --------------------------------------------------------------------------- # Per-row country / region # --------------------------------------------------------------------------- class TestPerRowCountry: def test_phone_uses_per_row_country(self): df = pd.DataFrame({ "phone": ["020 7946 0958", "03-3210-7000", "(415) 555-1234"], "country": ["GB", "JP", "US"], }) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, phone_country_column="country", ) res = standardize_dataframe(df, opts) out = res.standardized_df["phone"].tolist() assert out[0].startswith("+44") assert out[1].startswith("+81") assert out[2].startswith("+1") def test_phone_country_full_name_resolved(self): df = pd.DataFrame({ "phone": ["020 7946 0958"], "country": ["United Kingdom"], }) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, phone_country_column="country", ) res = standardize_dataframe(df, opts) assert res.standardized_df["phone"].iloc[0].startswith("+44") def test_blank_country_falls_back_to_default(self): df = pd.DataFrame({ "phone": ["(415) 555-1234"], "country": [""], # blank → use default region }) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, phone_country_column="country", phone_region="US", ) res = standardize_dataframe(df, opts) assert res.standardized_df["phone"].iloc[0] == "+14155551234" def test_unknown_country_column_raises(self): df = pd.DataFrame({"phone": ["x"]}) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, phone_country_column="missing_col", ) from src.core.errors import InputValidationError with pytest.raises(InputValidationError): standardize_dataframe(df, opts) class TestNormalizeRegion: def test_iso2_passthrough(self): assert _normalize_region("US") == "US" assert _normalize_region("us") == "US" assert _normalize_region(" jp ") == "JP" def test_iso3_mapped(self): assert _normalize_region("USA") == "US" assert _normalize_region("GBR") == "GB" assert _normalize_region("JPN") == "JP" def test_full_name(self): assert _normalize_region("United States") == "US" assert _normalize_region("Japan") == "JP" assert _normalize_region("Brazil") == "BR" assert _normalize_region("brasil") == "BR" assert _normalize_region("España") == "ES" def test_blank_or_unknown(self): assert _normalize_region("") is None assert _normalize_region(" ") is None assert _normalize_region(None) is None assert _normalize_region("xyz-no-such-country") is None # --------------------------------------------------------------------------- # Audit cap # --------------------------------------------------------------------------- class TestAuditCap: def test_cap_truncates_change_rows(self): df = pd.DataFrame({ "phone": ["(415) 555-12{:02d}".format(i) for i in range(50)], }) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, audit_max_rows=5, ) res = standardize_dataframe(df, opts) # cells_changed counts everything; the audit table is capped. assert res.cells_changed == 50 assert len(res.changes) == 5 def test_unbounded_audit(self): df = pd.DataFrame({ "phone": ["(415) 555-12{:02d}".format(i) for i in range(20)], }) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, audit_max_rows=None, ) res = standardize_dataframe(df, opts) assert len(res.changes) == 20 # --------------------------------------------------------------------------- # Cache + vectorized dispatch (correctness) # --------------------------------------------------------------------------- class TestCacheCorrectness: def test_repeated_phone_consistent(self): # 1000 copies of the same phone should produce identical output. df = pd.DataFrame({"phone": ["(415) 555-1234"] * 1000}) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, audit_max_rows=None, ) res = standardize_dataframe(df, opts) assert (res.standardized_df["phone"] == "+14155551234").all() assert res.cells_changed == 1000 def test_cache_disabled_still_works(self): df = pd.DataFrame({"phone": ["(415) 555-1234", "020 7946 0958"]}) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, cache_size=0, # disabled ) res = standardize_dataframe(df, opts) assert res.standardized_df["phone"].iloc[0] == "+14155551234" # --------------------------------------------------------------------------- # Streaming standardize_file # --------------------------------------------------------------------------- class TestStandardizeFile: def test_basic_streaming(self, tmp_path): inp = tmp_path / "in.csv" inp.write_text( "phone,country,price\n" "(415) 555-1234,US,$1500.00\n" "020 7946 0958,GB,£99.99\n" "03-3210-7000,JP,¥12000\n" "+33 1 42 86 82 00,FR,€850.50\n" ) out = tmp_path / "out.csv" opts = StandardizeOptions( column_types={"phone": FieldType.PHONE, "price": FieldType.CURRENCY}, phone_country_column="country", currency_preserve_code=True, ) res = standardize_file(inp, out, opts, chunk_size=2) assert isinstance(res, StreamingStandardizeResult) assert res.rows_processed == 4 assert res.chunks_processed == 2 assert out.exists() out_df = pd.read_csv(out, dtype=str, keep_default_na=False) assert out_df["phone"].iloc[0].startswith("+1") assert out_df["phone"].iloc[1].startswith("+44") assert out_df["phone"].iloc[2].startswith("+81") assert out_df["phone"].iloc[3].startswith("+33") def test_audit_capped_across_chunks(self, tmp_path): # 60 rows, audit cap 10, chunks of 20 → audit must stop at 10. inp = tmp_path / "in.csv" rows = ["phone\n"] + [f"(415) 555-12{i:02d}\n" for i in range(60)] inp.write_text("".join(rows)) out = tmp_path / "out.csv" opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, audit_max_rows=10, ) res = standardize_file(inp, out, opts, chunk_size=20) # Audit file exists and has exactly 10 data rows + 1 header. audit_lines = res.audit_path.read_text().splitlines() assert len(audit_lines) - 1 == 10 def test_audit_row_indices_are_global(self, tmp_path): # Audit row numbers must reflect absolute file position, not chunk-local. inp = tmp_path / "in.csv" rows = ["phone\n"] + [f"(415) 555-12{i:02d}\n" for i in range(30)] inp.write_text("".join(rows)) out = tmp_path / "out.csv" opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, audit_max_rows=None, ) res = standardize_file(inp, out, opts, chunk_size=10) audit = pd.read_csv(res.audit_path) # Rows should be 0..29, monotonically increasing. assert audit["row"].tolist() == list(range(30)) def test_progress_callback_fires(self, tmp_path): inp = tmp_path / "in.csv" inp.write_text("phone\n" + "\n".join("(415) 555-1234" for _ in range(20)) + "\n") out = tmp_path / "out.csv" opts = StandardizeOptions(column_types={"phone": FieldType.PHONE}) seen: list[tuple[int, int]] = [] def cb(rows, chunks): seen.append((rows, chunks)) standardize_file(inp, out, opts, chunk_size=5, progress_callback=cb) assert len(seen) == 4 assert seen[-1] == (20, 4) def test_progress_callback_exception_does_not_abort(self, tmp_path): inp = tmp_path / "in.csv" inp.write_text("phone\n(415) 555-1234\n") out = tmp_path / "out.csv" opts = StandardizeOptions(column_types={"phone": FieldType.PHONE}) def bad_cb(*a, **k): raise RuntimeError("boom") # Must not raise. res = standardize_file(inp, out, opts, chunk_size=1, progress_callback=bad_cb) assert res.rows_processed == 1 def test_missing_input_raises_clean_error(self, tmp_path): from src.core.errors import FileAccessError opts = StandardizeOptions(column_types={"phone": FieldType.PHONE}) with pytest.raises(FileAccessError): standardize_file( tmp_path / "missing.csv", tmp_path / "out.csv", opts, ) # --------------------------------------------------------------------------- # International coverage smoke # --------------------------------------------------------------------------- class TestInternationalCoverage: @pytest.mark.parametrize("number,country,prefix", [ ("020 7946 0958", "GB", "+44"), ("03-3210-7000", "JP", "+81"), ("+49 30 12345678", "DE", "+49"), ("01 42 86 82 00", "FR", "+33"), ("+39 06 6982", "IT", "+39"), ("+34 91 411 1111", "ES", "+34"), ("+86 10 1234 5678", "CN", "+86"), ("+91 11 2345 6789", "IN", "+91"), ("+61 2 9374 4000", "AU", "+61"), ("11 3071 0000", "BR", "+55"), ("+52 55 5555 0000", "MX", "+52"), ("+82 2 2287 0114", "KR", "+82"), ]) def test_phone_via_per_row_region(self, number, country, prefix): df = pd.DataFrame({"phone": [number], "country": [country]}) opts = StandardizeOptions( column_types={"phone": FieldType.PHONE}, phone_country_column="country", ) res = standardize_dataframe(df, opts) out = res.standardized_df["phone"].iloc[0] assert out.startswith(prefix), ( f"{number!r} ({country}): expected to start with {prefix}, got {out!r}" ) @pytest.mark.parametrize("price,want_code", [ ("$1,500.00", "USD"), ("€850,50", "EUR"), ("£99.99", "GBP"), ("¥12000", "JPY"), ("R$ 250,00", "BRL"), ("CHF 1200.00", "CHF"), ]) def test_currency_codes_detected(self, price, want_code): df = pd.DataFrame({"price": [price]}) opts = StandardizeOptions( column_types={"price": FieldType.CURRENCY}, currency_preserve_code=True, currency_decimal="auto", # international mode ) res = standardize_dataframe(df, opts) assert want_code in res.standardized_df["price"].iloc[0]