"""Acceptance corpus for the Column Mapper. Loads every fixture in ``test-cases/column-mapper-corpus/test_data/`` and asserts the documented behaviour against the documented schema. """ from __future__ import annotations import json from pathlib import Path import pandas as pd import pytest from src.core.errors import InputValidationError from src.core.column_mapper import ( MapOptions, TargetField, TargetSchema, map_columns, ) CORPUS = Path(__file__).resolve().parents[1] / "test-cases" / "column-mapper-corpus" TEST_DATA = CORPUS / "test_data" SCHEMAS = CORPUS / "schemas" def _read(name: str) -> pd.DataFrame: return pd.read_csv(TEST_DATA / name) def _schema(name: str) -> TargetSchema: return TargetSchema.from_file(SCHEMAS / name) # --------------------------------------------------------------------------- # UC01 — CRM import # --------------------------------------------------------------------------- class TestUC01CrmImport: def test_strict_schema_round_trip(self): df = _read("uc01_crm_import.csv") schema = _schema("uc01_crm_target.json") opts = MapOptions.from_preset("strict-schema") opts.schema = schema res = map_columns(df, opts) # Every required target is present after the run. for f in schema.fields: if f.required: assert f.name in res.mapped_df.columns # 'owner' default added. assert "owner" in res.columns_added assert (res.mapped_df["owner"] == "unassigned").all() # No unmapped survivors (strict preset drops extras). assert res.unmapped_kept == [] # Reordered to schema order. expected_prefix = [f.name for f in schema.fields] assert list(res.mapped_df.columns)[: len(expected_prefix)] == expected_prefix def test_types_coerced_from_strings(self): df = _read("uc01_crm_import.csv") schema = _schema("uc01_crm_target.json") opts = MapOptions.from_preset("strict-schema") opts.schema = schema res = map_columns(df, opts) # annual_rev → integer (was numeric strings in the source). assert pd.api.types.is_integer_dtype(res.mapped_df["annual_rev"]) # created_date → datetime64. assert pd.api.types.is_datetime64_any_dtype(res.mapped_df["created_date"]) # --------------------------------------------------------------------------- # UC02 — Multi-vendor unification # --------------------------------------------------------------------------- class TestUC02MultiVendor: @pytest.mark.parametrize("vendor", ["a", "b", "c"]) def test_each_vendor_normalises_to_canonical(self, vendor): df = _read(f"uc02_vendor_{vendor}.csv") schema = _schema("uc02_canonical.json") opts = MapOptions.from_preset("lenient-schema") opts.schema = schema opts.fuzzy_threshold = 0.5 # vendor C uses obscure aliases ("FName", "Tel") res = map_columns(df, opts) # Every required canonical field landed in the output. for f in schema.fields: if f.required: assert f.name in res.mapped_df.columns, ( f"vendor {vendor}: missing {f.name}; mapping={res.mapping}" ) def test_concatenated_vendors_share_schema(self): # The point of unification: after each vendor goes through the # mapper, the resulting frames stack cleanly. schema = _schema("uc02_canonical.json") opts = MapOptions.from_preset("strict-schema") opts.schema = schema opts.fuzzy_threshold = 0.5 frames = [ map_columns(_read(f"uc02_vendor_{v}.csv"), opts).mapped_df for v in ("a", "b", "c") ] unified = pd.concat(frames, ignore_index=True) assert list(unified.columns) == [f.name for f in schema.fields] # Total rows = sum of inputs. assert len(unified) == sum(len(f) for f in frames) # --------------------------------------------------------------------------- # UC03 — Type coercion # --------------------------------------------------------------------------- class TestUC03TypeCoercion: def test_documented_failures_are_reported(self): df = _read("uc03_type_coercion.csv") schema = _schema("uc03_types.json") opts = MapOptions.from_preset("lenient-schema") opts.schema = schema res = map_columns(df, opts) # Bad rows survive as NaN, with counts recorded. assert res.coercion_failures.get("age") == 1 assert res.coercion_failures.get("score") == 1 assert res.coercion_failures.get("joined") == 1 def test_coerced_dtypes(self): df = _read("uc03_type_coercion.csv") schema = _schema("uc03_types.json") opts = MapOptions.from_preset("lenient-schema") opts.schema = schema res = map_columns(df, opts) out = res.mapped_df assert pd.api.types.is_integer_dtype(out["id"]) assert out["active"].dtype.name == "boolean" assert pd.api.types.is_datetime64_any_dtype(out["joined"]) # Float failures NaN-ify. assert pd.isna(out["score"].iloc[1]) # --------------------------------------------------------------------------- # Edge cases # --------------------------------------------------------------------------- class TestEC01DuplicateTarget: def test_two_sources_to_same_target_raises(self): df = _read("ec01_duplicate_target.csv") opts = MapOptions(mapping={"a": "x", "b": "x"}) with pytest.raises(InputValidationError): map_columns(df, opts) class TestEC02UnicodeColumns: def test_japanese_column_renamed(self): df = _read("ec02_unicode_columns.csv") opts = MapOptions(mapping={"名前": "name", "価格": "price"}) res = map_columns(df, opts) assert "name" in res.mapped_df.columns assert "price" in res.mapped_df.columns # Email passes through (unmapped, kept by default). assert "Email" in res.mapped_df.columns class TestEC03WhitespaceHeaders: def test_header_whitespace_does_not_block_match(self): df = _read("ec03_whitespace_headers.csv") schema = TargetSchema(fields=[ TargetField(name="first_name", aliases=["First Name"]), TargetField(name="last_name", aliases=["Last Name"]), TargetField(name="email", aliases=["EmailAddr"]), ]) opts = MapOptions(schema=schema, auto_infer=True) res = map_columns(df, opts) # All three columns should map despite the leading/trailing spaces. assert len(res.mapping) == 3 class TestEC04NoMatch: def test_zero_inferred_with_no_match(self): df = _read("ec04_no_match.csv") schema = TargetSchema(fields=[ TargetField(name="email"), TargetField(name="phone"), ]) opts = MapOptions(schema=schema, auto_infer=True, unmapped="keep") res = map_columns(df, opts) assert res.inferred_pairs == {} # Source columns survive as-is under keep. assert set(df.columns) <= set(res.mapped_df.columns) def test_no_match_with_unmapped_error(self): df = _read("ec04_no_match.csv") schema = TargetSchema(fields=[TargetField(name="email")]) opts = MapOptions( schema=schema, auto_infer=True, unmapped="error", enforce_required=False, ) with pytest.raises(InputValidationError): map_columns(df, opts) class TestEC05RequiredMissing: def test_required_missing_raises(self): df = _read("ec05_required_missing.csv") schema = TargetSchema(fields=[ TargetField(name="first_name", required=True), TargetField(name="email", required=True), ]) opts = MapOptions(schema=schema, auto_infer=True, enforce_required=True) with pytest.raises(InputValidationError): map_columns(df, opts) def test_disable_enforce_surfaces_in_result(self): df = _read("ec05_required_missing.csv") schema = TargetSchema(fields=[ TargetField(name="first_name", required=True), TargetField(name="email", required=True), ]) opts = MapOptions(schema=schema, auto_infer=True, enforce_required=False) res = map_columns(df, opts) assert "email" in res.missing_required_targets # --------------------------------------------------------------------------- # Whole-corpus property tests # --------------------------------------------------------------------------- ALL_FIXTURES = sorted(p.name for p in TEST_DATA.glob("*.csv")) @pytest.mark.parametrize("fixture", ALL_FIXTURES) def test_map_columns_does_not_mutate_input(fixture): df = pd.read_csv(TEST_DATA / fixture) snapshot = df.copy(deep=True) try: map_columns(df, MapOptions()) # identity run; default options. except InputValidationError: pass # ec01 / ec05 raise here — fine, mutation is what we care about. pd.testing.assert_frame_equal(df, snapshot)