Files
datatools-dev/tests/test_column_mapper_corpus.py
Michael db5ec084da docs+code: rename tool labels everywhere
Sweep follow-up to 93e43fc. Display labels now consistent across docs,
landing pages, CLI output, code comments, docstrings, and test prose.
Five parallel surfaces touched:

- docs (EN + ES): README, USER-GUIDE, CLI-REFERENCE, and 11 internal
  design/planning docs
- landing pages: index + bookkeeper/revops/shopify-pet
- src: CLI module docstrings, _TOOL_DISPLAY dicts in cli_analyze.py
  and gui/components/_legacy.py, core module headers, every tool
  page's module docstring
- tests: class/method/module docstrings and section-header comments
- test-cases READMEs

Page slugs (1_Deduplicator etc.), tool_id strings (01_deduplicator
etc.), Python class names (TestDeduplicatorWorkflow, FeatureFlag.*),
URL paths, anchor IDs, CSS classes, and asset filenames were left
intact since they're code identifiers / structural references.

All 2033 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 19:50:09 +00:00

241 lines
8.9 KiB
Python

"""Acceptance corpus for the Map Columns tool.
Loads every fixture in ``test-cases/column-mapper-corpus/test_data/``
and asserts the documented behaviour against the documented schema.
"""
from __future__ import annotations
import json
from pathlib import Path
import pandas as pd
import pytest
from src.core.errors import InputValidationError
from src.core.column_mapper import (
MapOptions,
TargetField,
TargetSchema,
map_columns,
)
# Corpus layout, resolved relative to this file: parents[1] is the directory
# above the one that contains this module.
CORPUS = Path(__file__).resolve().parents[1].joinpath(
    "test-cases", "column-mapper-corpus"
)
TEST_DATA = CORPUS.joinpath("test_data")  # CSV fixtures
SCHEMAS = CORPUS.joinpath("schemas")  # target-schema files
def _read(name: str) -> pd.DataFrame:
    """Load the CSV fixture called *name* from ``TEST_DATA`` into a DataFrame."""
    fixture_path = TEST_DATA / name
    return pd.read_csv(fixture_path)
def _schema(name: str) -> TargetSchema:
    """Build a ``TargetSchema`` from the file called *name* under ``SCHEMAS``."""
    schema_path = SCHEMAS / name
    return TargetSchema.from_file(schema_path)
# ---------------------------------------------------------------------------
# UC01 — CRM import
# ---------------------------------------------------------------------------
class TestUC01CrmImport:
    """UC01: run the CRM import fixture through the ``strict-schema`` preset."""

    @staticmethod
    def _run_strict():
        """Map the UC01 fixture and return ``(schema, result)``."""
        source = _read("uc01_crm_import.csv")
        target = _schema("uc01_crm_target.json")
        options = MapOptions.from_preset("strict-schema")
        options.schema = target
        return target, map_columns(source, options)

    def test_strict_schema_round_trip(self):
        target, result = self._run_strict()
        mapped = result.mapped_df

        # Every required target is present after the run.
        required_names = [field.name for field in target.fields if field.required]
        for name in required_names:
            assert name in mapped.columns

        # 'owner' default added.
        assert "owner" in result.columns_added
        owner_values = mapped["owner"]
        assert owner_values.eq("unassigned").all()

        # No unmapped survivors (strict preset drops extras).
        assert result.unmapped_kept == []

        # Reordered to schema order.
        schema_order = [field.name for field in target.fields]
        leading_columns = list(mapped.columns)[: len(schema_order)]
        assert leading_columns == schema_order

    def test_types_coerced_from_strings(self):
        _, result = self._run_strict()
        mapped = result.mapped_df

        # annual_rev → integer (was numeric strings in the source).
        annual_rev = mapped["annual_rev"]
        assert pd.api.types.is_integer_dtype(annual_rev)

        # created_date → datetime64.
        created_date = mapped["created_date"]
        assert pd.api.types.is_datetime64_any_dtype(created_date)
# ---------------------------------------------------------------------------
# UC02 — Multi-vendor unification
# ---------------------------------------------------------------------------
class TestUC02MultiVendor:
    """UC02: three vendor exports are mapped onto one canonical schema."""

    @pytest.mark.parametrize("vendor", ["a", "b", "c"])
    def test_each_vendor_normalises_to_canonical(self, vendor):
        """Each vendor file on its own yields every required canonical field."""
        df = _read(f"uc02_vendor_{vendor}.csv")
        schema = _schema("uc02_canonical.json")
        opts = MapOptions.from_preset("lenient-schema")
        opts.schema = schema
        opts.fuzzy_threshold = 0.5  # vendor C uses obscure aliases ("FName", "Tel")
        res = map_columns(df, opts)

        # Every required canonical field landed in the output.
        for f in schema.fields:
            if f.required:
                assert f.name in res.mapped_df.columns, (
                    f"vendor {vendor}: missing {f.name}; mapping={res.mapping}"
                )

    def test_concatenated_vendors_share_schema(self):
        """Mapped vendor frames stack into one frame with the schema's columns.

        The row-count check compares the unified frame against the *raw*
        inputs.  Comparing against the mapped frames would always hold,
        because ``pd.concat`` never changes the total row count, and so
        could not detect the mapper dropping or duplicating rows.
        """
        # The point of unification: after each vendor goes through the
        # mapper, the resulting frames stack cleanly.
        schema = _schema("uc02_canonical.json")
        opts = MapOptions.from_preset("strict-schema")
        opts.schema = schema
        opts.fuzzy_threshold = 0.5

        sources = [_read(f"uc02_vendor_{v}.csv") for v in ("a", "b", "c")]
        frames = [map_columns(src, opts).mapped_df for src in sources]
        unified = pd.concat(frames, ignore_index=True)

        assert list(unified.columns) == [f.name for f in schema.fields]
        # Total rows = sum of inputs (measured on the unmapped source frames).
        assert len(unified) == sum(len(src) for src in sources)
# ---------------------------------------------------------------------------
# UC03 — Type coercion
# ---------------------------------------------------------------------------
class TestUC03TypeCoercion:
    """UC03: coerce the type-coercion fixture under the ``lenient-schema`` preset."""

    @staticmethod
    def _run_lenient():
        """Map the UC03 fixture against its schema and return the result."""
        source = _read("uc03_type_coercion.csv")
        options = MapOptions.from_preset("lenient-schema")
        options.schema = _schema("uc03_types.json")
        return map_columns(source, options)

    def test_documented_failures_are_reported(self):
        result = self._run_lenient()
        # Bad rows survive as NaN, with counts recorded.
        for column in ("age", "score", "joined"):
            assert result.coercion_failures.get(column) == 1

    def test_coerced_dtypes(self):
        mapped = self._run_lenient().mapped_df

        id_column = mapped["id"]
        assert pd.api.types.is_integer_dtype(id_column)

        active_dtype = mapped["active"].dtype
        assert active_dtype.name == "boolean"

        joined_column = mapped["joined"]
        assert pd.api.types.is_datetime64_any_dtype(joined_column)

        # Float failures NaN-ify.
        second_score = mapped["score"].iloc[1]
        assert pd.isna(second_score)
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
class TestEC01DuplicateTarget:
    """EC01: an explicit mapping that sends two sources to one target."""

    def test_two_sources_to_same_target_raises(self):
        frame = _read("ec01_duplicate_target.csv")
        colliding = {"a": "x", "b": "x"}
        # Options are built outside the ``raises`` block so that only the
        # map_columns call itself is expected to raise.
        options = MapOptions(mapping=colliding)
        with pytest.raises(InputValidationError):
            map_columns(frame, options)
class TestEC02UnicodeColumns:
    """EC02: explicit renames of non-ASCII source headers."""

    def test_japanese_column_renamed(self):
        frame = _read("ec02_unicode_columns.csv")
        renames = {"名前": "name", "価格": "price"}
        result = map_columns(frame, MapOptions(mapping=renames))

        output_columns = result.mapped_df.columns
        assert "name" in output_columns
        assert "price" in output_columns
        # Email passes through (unmapped, kept by default).
        assert "Email" in output_columns
class TestEC03WhitespaceHeaders:
    """EC03: alias-based inference on the whitespace-headers fixture."""

    def test_header_whitespace_does_not_block_match(self):
        frame = _read("ec03_whitespace_headers.csv")
        target_fields = [
            TargetField(name="first_name", aliases=["First Name"]),
            TargetField(name="last_name", aliases=["Last Name"]),
            TargetField(name="email", aliases=["EmailAddr"]),
        ]
        options = MapOptions(
            schema=TargetSchema(fields=target_fields),
            auto_infer=True,
        )
        result = map_columns(frame, options)
        # All three columns should map despite the leading/trailing spaces.
        mapped_count = len(result.mapping)
        assert mapped_count == 3
class TestEC04NoMatch:
    """EC04: auto-inference against a schema nothing in the fixture matches."""

    def test_zero_inferred_with_no_match(self):
        frame = _read("ec04_no_match.csv")
        target = TargetSchema(
            fields=[TargetField(name="email"), TargetField(name="phone")],
        )
        options = MapOptions(schema=target, auto_infer=True, unmapped="keep")
        result = map_columns(frame, options)

        assert result.inferred_pairs == {}
        # Source columns survive as-is under keep.
        source_columns = set(frame.columns)
        assert source_columns.issubset(result.mapped_df.columns)

    def test_no_match_with_unmapped_error(self):
        frame = _read("ec04_no_match.csv")
        target = TargetSchema(fields=[TargetField(name="email")])
        settings = {
            "schema": target,
            "auto_infer": True,
            "unmapped": "error",
            "enforce_required": False,
        }
        options = MapOptions(**settings)
        with pytest.raises(InputValidationError):
            map_columns(frame, options)
class TestEC05RequiredMissing:
    """EC05: required targets absent from the fixture, with enforcement on and off."""

    @staticmethod
    def _required_schema():
        """Return a schema whose ``first_name`` and ``email`` are both required."""
        return TargetSchema(fields=[
            TargetField(name="first_name", required=True),
            TargetField(name="email", required=True),
        ])

    def test_required_missing_raises(self):
        frame = _read("ec05_required_missing.csv")
        options = MapOptions(
            schema=self._required_schema(),
            auto_infer=True,
            enforce_required=True,
        )
        with pytest.raises(InputValidationError):
            map_columns(frame, options)

    def test_disable_enforce_surfaces_in_result(self):
        frame = _read("ec05_required_missing.csv")
        options = MapOptions(
            schema=self._required_schema(),
            auto_infer=True,
            enforce_required=False,
        )
        result = map_columns(frame, options)
        missing = result.missing_required_targets
        assert "email" in missing
# ---------------------------------------------------------------------------
# Whole-corpus property tests
# ---------------------------------------------------------------------------
# File names of every CSV under TEST_DATA, sorted for a deterministic order.
ALL_FIXTURES = sorted(csv_path.name for csv_path in TEST_DATA.glob("*.csv"))
@pytest.mark.parametrize("fixture", ALL_FIXTURES)
def test_map_columns_does_not_mutate_input(fixture):
    """Check that ``map_columns`` leaves the caller's DataFrame untouched."""
    frame = _read(fixture)
    before = frame.copy(deep=True)
    try:
        # Identity run with default options.
        map_columns(frame, MapOptions())
    except InputValidationError:
        # A fixture that fails validation is fine here; only mutation of
        # the input matters to this test.
        pass
    pd.testing.assert_frame_equal(frame, before)