Sweep follow-up to 93e43fc. Display labels now consistent across docs,
landing pages, CLI output, code comments, docstrings, and test prose.
Five parallel surfaces touched:
- docs (EN + ES): README, USER-GUIDE, CLI-REFERENCE, and 11 internal
design/planning docs
- landing pages: index + bookkeeper/revops/shopify-pet
- src: CLI module docstrings, _TOOL_DISPLAY dicts in cli_analyze.py
and gui/components/_legacy.py, core module headers, every tool
page's module docstring
- tests: class/method/module docstrings and section-header comments
- test-cases READMEs
Page slugs (1_Deduplicator etc.), tool_id strings (01_deduplicator
etc.), Python class names (TestDeduplicatorWorkflow, FeatureFlag.*),
URL paths, anchor IDs, CSS classes, and asset filenames were left
intact since they're code identifiers / structural references.
All 2033 tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
241 lines
8.9 KiB
Python
241 lines
8.9 KiB
Python
"""Acceptance corpus for the Map Columns tool.
|
|
|
|
Loads every fixture in ``test-cases/column-mapper-corpus/test_data/``
|
|
and asserts the documented behaviour against the documented schema.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
import pytest
|
|
|
|
from src.core.errors import InputValidationError
|
|
from src.core.column_mapper import (
|
|
MapOptions,
|
|
TargetField,
|
|
TargetSchema,
|
|
map_columns,
|
|
)
|
|
|
|
# Fixture locations. The corpus directory is resolved relative to the parent
# of the directory that contains this file, so the tests do not depend on the
# current working directory.
CORPUS = Path(__file__).resolve().parents[1].joinpath(
    "test-cases", "column-mapper-corpus"
)
TEST_DATA = CORPUS.joinpath("test_data")  # CSV inputs read by the tests
SCHEMAS = CORPUS.joinpath("schemas")  # target-schema files read by the tests
|
|
|
|
|
|
def _read(name: str) -> pd.DataFrame:
    """Load the CSV fixture called *name* from ``TEST_DATA`` as a DataFrame."""
    fixture_path = TEST_DATA / name
    return pd.read_csv(fixture_path)
|
|
|
|
|
|
def _schema(name: str) -> TargetSchema:
    """Build a ``TargetSchema`` from the file called *name* under ``SCHEMAS``."""
    schema_path = SCHEMAS / name
    return TargetSchema.from_file(schema_path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UC01 — CRM import
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUC01CrmImport:
    """UC01: map the CRM import fixture onto its target schema (strict preset)."""

    @staticmethod
    def _map_with_strict_preset():
        """Run the UC01 fixture through ``map_columns`` with the strict preset.

        Returns:
            A ``(schema, result)`` pair: the schema loaded from
            ``uc01_crm_target.json`` and the value ``map_columns`` returned.
        """
        source = _read("uc01_crm_import.csv")
        target = _schema("uc01_crm_target.json")
        options = MapOptions.from_preset("strict-schema")
        options.schema = target
        return target, map_columns(source, options)

    def test_strict_schema_round_trip(self):
        """Check required fields, the added default, kept extras and ordering."""
        target, result = self._map_with_strict_preset()
        mapped = result.mapped_df

        # Each field the schema marks as required must be an output column.
        for spec in target.fields:
            if not spec.required:
                continue
            assert spec.name in mapped.columns

        # 'owner' is reported as an added column and holds "unassigned" in
        # every row.
        assert "owner" in result.columns_added
        owner_is_default = mapped["owner"] == "unassigned"
        assert owner_is_default.all()

        # Nothing unmapped is carried over (strict preset drops extras).
        assert result.unmapped_kept == []

        # The leading output columns follow the schema's field order.
        schema_order = [spec.name for spec in target.fields]
        leading_columns = list(mapped.columns)[: len(schema_order)]
        assert leading_columns == schema_order

    def test_types_coerced_from_strings(self):
        """Check the dtypes of two typed target columns after mapping."""
        _, result = self._map_with_strict_preset()
        mapped = result.mapped_df
        dtype_checks = pd.api.types

        # annual_rev → integer (was numeric strings in the source).
        assert dtype_checks.is_integer_dtype(mapped["annual_rev"])
        # created_date → datetime64.
        assert dtype_checks.is_datetime64_any_dtype(mapped["created_date"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UC02 — Multi-vendor unification
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUC02MultiVendor:
    """UC02: three vendor exports are mapped onto one canonical schema."""

    @pytest.mark.parametrize("vendor", ["a", "b", "c"])
    def test_each_vendor_normalises_to_canonical(self, vendor):
        """Every required canonical field is present after mapping one vendor."""
        df = _read(f"uc02_vendor_{vendor}.csv")
        schema = _schema("uc02_canonical.json")
        opts = MapOptions.from_preset("lenient-schema")
        opts.schema = schema
        opts.fuzzy_threshold = 0.5  # vendor C uses obscure aliases ("FName", "Tel")
        res = map_columns(df, opts)
        # Every required canonical field landed in the output.
        for f in schema.fields:
            if f.required:
                assert f.name in res.mapped_df.columns, (
                    f"vendor {vendor}: missing {f.name}; mapping={res.mapping}"
                )

    def test_concatenated_vendors_share_schema(self):
        """Mapped vendor frames stack into one frame with the schema's columns.

        Also checks that no rows are gained or lost: the stacked frame must
        have as many rows as the three raw vendor files combined.
        """
        # The point of unification: after each vendor goes through the
        # mapper, the resulting frames stack cleanly.
        schema = _schema("uc02_canonical.json")
        opts = MapOptions.from_preset("strict-schema")
        opts.schema = schema
        opts.fuzzy_threshold = 0.5
        sources = [_read(f"uc02_vendor_{v}.csv") for v in ("a", "b", "c")]
        frames = [map_columns(src, opts).mapped_df for src in sources]
        unified = pd.concat(frames, ignore_index=True)
        assert list(unified.columns) == [f.name for f in schema.fields]
        # Total rows = sum of inputs. Measure the raw inputs rather than the
        # mapped frames: a row-wise concat is always as long as its pieces
        # combined, so comparing against `frames` could never fail.
        assert len(unified) == sum(len(src) for src in sources)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UC03 — Type coercion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUC03TypeCoercion:
    """UC03: type coercion of the ``uc03_type_coercion.csv`` fixture."""

    @staticmethod
    def _map_with_lenient_preset():
        """Map the UC03 fixture with the lenient preset and the UC03 schema."""
        source = _read("uc03_type_coercion.csv")
        target = _schema("uc03_types.json")
        options = MapOptions.from_preset("lenient-schema")
        options.schema = target
        return map_columns(source, options)

    def test_documented_failures_are_reported(self):
        """Exactly one coercion failure is recorded for each checked column."""
        result = self._map_with_lenient_preset()
        # Bad rows survive as NaN, with counts recorded.
        for column in ("age", "score", "joined"):
            assert result.coercion_failures.get(column) == 1

    def test_coerced_dtypes(self):
        """Output columns carry the expected dtypes after coercion."""
        mapped = self._map_with_lenient_preset().mapped_df
        dtype_checks = pd.api.types

        assert dtype_checks.is_integer_dtype(mapped["id"])
        assert mapped["active"].dtype.name == "boolean"
        assert dtype_checks.is_datetime64_any_dtype(mapped["joined"])
        # Float failures NaN-ify: the second row's score must be missing.
        second_score = mapped["score"].iloc[1]
        assert pd.isna(second_score)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEC01DuplicateTarget:
    """EC01: an explicit mapping that sends two sources to one target."""

    def test_two_sources_to_same_target_raises(self):
        """``map_columns`` must raise ``InputValidationError`` on the collision."""
        source = _read("ec01_duplicate_target.csv")
        # Both source columns point at the same target name, "x".
        colliding_mapping = {"a": "x", "b": "x"}
        options = MapOptions(mapping=colliding_mapping)
        with pytest.raises(InputValidationError):
            map_columns(source, options)
|
|
|
|
|
|
class TestEC02UnicodeColumns:
    """EC02: explicit mapping whose source column names are Japanese."""

    def test_japanese_column_renamed(self):
        """Both mapped targets appear, and the unmapped column is still there."""
        source = _read("ec02_unicode_columns.csv")
        renames = {"名前": "name", "価格": "price"}
        result = map_columns(source, MapOptions(mapping=renames))
        output_columns = result.mapped_df.columns

        assert "name" in output_columns
        assert "price" in output_columns
        # Email passes through (unmapped, kept by default).
        assert "Email" in output_columns
|
|
|
|
|
|
class TestEC03WhitespaceHeaders:
    """EC03: alias matching against the whitespace-headers fixture."""

    def test_header_whitespace_does_not_block_match(self):
        """All three schema fields get a mapping via auto-inference."""
        source = _read("ec03_whitespace_headers.csv")
        # (target name, alias) pairs, in schema order.
        alias_pairs = (
            ("first_name", "First Name"),
            ("last_name", "Last Name"),
            ("email", "EmailAddr"),
        )
        target_fields = [
            TargetField(name=target, aliases=[alias])
            for target, alias in alias_pairs
        ]
        options = MapOptions(
            schema=TargetSchema(fields=target_fields),
            auto_infer=True,
        )
        result = map_columns(source, options)
        # All three columns should map despite the leading/trailing spaces.
        mapped_count = len(result.mapping)
        assert mapped_count == 3
|
|
|
|
|
|
class TestEC04NoMatch:
    """EC04: auto-inference against a fixture that matches no schema field."""

    def test_zero_inferred_with_no_match(self):
        """With ``unmapped="keep"`` nothing is inferred and sources remain."""
        source = _read("ec04_no_match.csv")
        targets = [TargetField(name="email"), TargetField(name="phone")]
        options = MapOptions(
            schema=TargetSchema(fields=targets),
            auto_infer=True,
            unmapped="keep",
        )
        result = map_columns(source, options)

        assert result.inferred_pairs == {}
        # Source columns survive as-is under keep.
        assert set(source.columns).issubset(result.mapped_df.columns)

    def test_no_match_with_unmapped_error(self):
        """With ``unmapped="error"`` the run raises ``InputValidationError``."""
        source = _read("ec04_no_match.csv")
        email_only = TargetSchema(fields=[TargetField(name="email")])
        options = MapOptions(
            schema=email_only,
            auto_infer=True,
            unmapped="error",
            enforce_required=False,
        )
        with pytest.raises(InputValidationError):
            map_columns(source, options)
|
|
|
|
|
|
class TestEC05RequiredMissing:
    """EC05: a schema with required fields run against the EC05 fixture."""

    @staticmethod
    def _required_schema():
        """Return a schema whose ``first_name`` and ``email`` are both required."""
        required_names = ("first_name", "email")
        return TargetSchema(
            fields=[TargetField(name=n, required=True) for n in required_names]
        )

    def test_required_missing_raises(self):
        """With ``enforce_required=True`` the run raises ``InputValidationError``."""
        source = _read("ec05_required_missing.csv")
        options = MapOptions(
            schema=self._required_schema(),
            auto_infer=True,
            enforce_required=True,
        )
        with pytest.raises(InputValidationError):
            map_columns(source, options)

    def test_disable_enforce_surfaces_in_result(self):
        """With enforcement off, ``email`` is listed as a missing required target."""
        source = _read("ec05_required_missing.csv")
        options = MapOptions(
            schema=self._required_schema(),
            auto_infer=True,
            enforce_required=False,
        )
        result = map_columns(source, options)
        assert "email" in result.missing_required_targets
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Whole-corpus property tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# File names (not full paths) of every ``*.csv`` directly under TEST_DATA,
# sorted so the parametrised test IDs come out in a stable order.
ALL_FIXTURES = sorted(csv_path.name for csv_path in TEST_DATA.glob("*.csv"))
|
|
|
|
|
|
@pytest.mark.parametrize("fixture", ALL_FIXTURES)
def test_map_columns_does_not_mutate_input(fixture):
    """Running ``map_columns`` must leave the caller's DataFrame unchanged.

    Each corpus CSV is loaded, deep-copied, and passed through a
    default-options run; afterwards the frame is compared to the copy.
    """
    frame = pd.read_csv(TEST_DATA / fixture)
    pristine = frame.copy(deep=True)
    try:
        # Identity run with default options; the return value is not inspected.
        map_columns(frame, MapOptions())
    except InputValidationError:
        # ec01 / ec05 raise here — fine, mutation is what we care about.
        pass
    pd.testing.assert_frame_equal(frame, pristine)
|