"""DataTools Column Mapper. Rename columns, enforce a target schema, coerce types, drop / add / reorder columns. Designed for the three buyer profiles the toolkit already serves: 1. **Schema enforcement** — analyst receives a CSV that has to fit a known target shape (a CRM import format, a database schema, a mailing-list contract). Map source columns to target names, coerce each to the declared type, drop the extras, fail clearly when a required target field is missing. 2. **Multi-source unification** — operator merges vendor/partner exports where every file uses different column names ("First Name" / "first_name" / "FirstName"). The fuzzy auto-mapper proposes a mapping; the user reviews and overrides. 3. **Type coercion** — quick conversion of mis-typed columns (string "123" → int, "true"/"yes" → bool, "2024-01-15" → date) without leaving the tool, with errors surfaced row-by-row. Public API ---------- Types: TargetField, TargetSchema, ColumnMapping, MapOptions, MapResult, ColumnDtype Functions: map_columns(df, options) -> MapResult infer_mapping(df, schema, *, threshold=0.6) -> dict[src, target] coerce_series(series, dtype) -> (Series, n_failures) Presets: PRESETS = {"rename-only", "strict-schema", "lenient-schema"} """ from __future__ import annotations import json import re from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Any, Iterable, Literal, Optional import numpy as np import pandas as pd from loguru import logger from pandas.api import types as pdtypes from .errors import ConfigError, InputValidationError, ensure_choice, ensure_dataframe # --------------------------------------------------------------------------- # Types # --------------------------------------------------------------------------- ColumnDtype = Literal[ "string", "integer", "float", "boolean", "date", "datetime", "category", "auto", # leave dtype alone ] _VALID_DTYPES: frozenset[str] = frozenset({ "string", "integer", "float", "boolean", "date", 
"datetime", "category", "auto", }) @dataclass class TargetField: """One field in a target schema. Required fields whose source column is missing produce a ``MapResult.missing_required_targets`` entry rather than silently creating a NaN column. """ name: str dtype: ColumnDtype = "auto" required: bool = False aliases: list[str] = field(default_factory=list) default: Any = None @dataclass class TargetSchema: """Ordered list of target fields. Ordering survives into the result DataFrame.""" fields: list[TargetField] def field_names(self) -> list[str]: return [f.name for f in self.fields] def get(self, name: str) -> Optional[TargetField]: return next((f for f in self.fields if f.name == name), None) def to_dict(self) -> dict: return {"fields": [asdict(f) for f in self.fields]} def to_file(self, path: str | Path) -> Path: out = Path(path) out.write_text(json.dumps(self.to_dict(), indent=2, default=str)) return out @classmethod def from_dict(cls, data: dict) -> TargetSchema: if "fields" not in data: raise ConfigError( "Target schema must contain a 'fields' list", operation="TargetSchema.from_dict", suggestion='Example: {"fields": [{"name": "email", "dtype": "string", "required": true}, ...]}', ) fields = [] for entry in data["fields"]: if isinstance(entry, str): fields.append(TargetField(name=entry)) continue if "name" not in entry: raise ConfigError( f"Schema field is missing 'name': {entry!r}", operation="TargetSchema.from_dict", ) dtype = entry.get("dtype", "auto") if dtype not in _VALID_DTYPES: raise ConfigError( f"Schema field {entry['name']!r}: unknown dtype {dtype!r}", operation="TargetSchema.from_dict", suggestion=f"Valid: {sorted(_VALID_DTYPES)}", ) fields.append(TargetField( name=entry["name"], dtype=dtype, required=bool(entry.get("required", False)), aliases=list(entry.get("aliases", [])), default=entry.get("default"), )) return cls(fields=fields) @classmethod def from_file(cls, path: str | Path) -> TargetSchema: return 
cls.from_dict(json.loads(Path(path).read_text())) # --------------------------------------------------------------------------- # Fuzzy column-name matching # --------------------------------------------------------------------------- # Whitespace, punctuation, and case all vary across vendors. We normalise # both sides to a token list before comparing. _NORM_RE = re.compile(r"[^a-z0-9]+") def _normalize_name(name: str) -> str: """Lowercase, strip non-alphanumerics — ``First Name`` → ``firstname``.""" if not isinstance(name, str): return "" return _NORM_RE.sub("", name.strip().lower()) def _token_set(name: str) -> frozenset[str]: """Tokenise a column name on non-alphanumeric boundaries.""" if not isinstance(name, str): return frozenset() parts = [p for p in _NORM_RE.split(name.strip().lower()) if p] return frozenset(parts) def _name_similarity(a: str, b: str) -> float: """Cheap similarity score in [0.0, 1.0]. Combines exact-after-normalisation, token Jaccard, and SequenceMatcher ratio. A real fuzzy library (rapidfuzz) is already a project dependency for the deduplicator — we use it when available, fall back to stdlib ``difflib`` otherwise so the mapper works in trimmed builds. """ if not a or not b: return 0.0 na, nb = _normalize_name(a), _normalize_name(b) if na == nb: return 1.0 ta, tb = _token_set(a), _token_set(b) jaccard = (len(ta & tb) / len(ta | tb)) if (ta or tb) else 0.0 try: from rapidfuzz import fuzz seq = fuzz.ratio(na, nb) / 100.0 except ImportError: from difflib import SequenceMatcher seq = SequenceMatcher(None, na, nb).ratio() return max(jaccard, seq) def infer_mapping( df: pd.DataFrame, schema: TargetSchema, *, threshold: float = 0.6, ) -> dict[str, str]: """Best-guess source-column → target-field mapping. Returns a dict keyed by source-column name. A source column is omitted from the result when no candidate scores above *threshold*. Each target is matched at most once: the highest-scoring source wins, ties broken by source-column order in *df*. 
Aliases declared on a :class:`TargetField` are scored as if they were target names — useful for vendor-specific synonyms (``["customer_id", "cust_id", "client_no"]``). """ ensure_dataframe(df, function="infer_mapping") sources = list(df.columns) targets = schema.fields # All (source, target) candidate scores; keep only those above # threshold, sorted descending so a greedy walk picks the best # available pairings first. scored: list[tuple[float, str, str]] = [] for src in sources: for tgt in targets: best = _name_similarity(src, tgt.name) for alias in tgt.aliases: s = _name_similarity(src, alias) if s > best: best = s if best >= threshold: scored.append((best, str(src), tgt.name)) scored.sort(key=lambda x: (-x[0], sources.index(x[1]))) mapping: dict[str, str] = {} used_targets: set[str] = set() for score, src, tgt in scored: if src in mapping or tgt in used_targets: continue mapping[src] = tgt used_targets.add(tgt) return mapping # --------------------------------------------------------------------------- # Type coercion # --------------------------------------------------------------------------- _TRUTHY = frozenset({"true", "t", "yes", "y", "1"}) _FALSY = frozenset({"false", "f", "no", "n", "0"}) def _coerce_boolean(value: Any) -> Any: if isinstance(value, bool): return value if value is None or (isinstance(value, float) and pd.isna(value)): return pd.NA if isinstance(value, (int, float)): return bool(value) if isinstance(value, str): v = value.strip().lower() if v in _TRUTHY: return True if v in _FALSY: return False raise ValueError(f"cannot coerce to boolean: {value!r}") def coerce_series(series: pd.Series, dtype: ColumnDtype) -> tuple[pd.Series, int]: """Coerce *series* to *dtype*, returning ``(coerced, n_failures)``. Failures are counted but never raised — the caller (``map_columns``) surfaces them through ``MapResult.coercion_failures`` so the user can inspect which rows didn't fit. Already-typed inputs are cheap no-ops. 
""" if dtype == "auto": return series, 0 if dtype == "string": return series.astype("string"), 0 if dtype == "category": return series.astype("category"), 0 if dtype == "integer": coerced = pd.to_numeric(series, errors="coerce") # Use nullable Int64 so NaN entries don't get cast to floats. rounded = coerced.round().astype("Int64") # Failures = original non-NaN cells whose numeric coercion produced NaN. original_filled = series.notna() failed = (rounded.isna() & original_filled).sum() return rounded, int(failed) if dtype == "float": coerced = pd.to_numeric(series, errors="coerce").astype("Float64") original_filled = series.notna() failed = (coerced.isna() & original_filled).sum() return coerced, int(failed) if dtype == "boolean": out: list[Any] = [] failed = 0 for v in series.tolist(): try: out.append(_coerce_boolean(v)) except ValueError: out.append(pd.NA) failed += 1 return pd.Series(out, index=series.index, dtype="boolean"), failed if dtype in {"date", "datetime"}: coerced = pd.to_datetime(series, errors="coerce", utc=False) original_filled = series.notna() failed = (coerced.isna() & original_filled).sum() if dtype == "date": # Drop the time component but keep dtype as datetime64 so # downstream operations (delta, sort) still work. coerced = coerced.dt.normalize() return coerced, int(failed) raise InputValidationError( f"Unknown dtype {dtype!r}", operation="coerce_series", suggestion=f"Valid: {sorted(_VALID_DTYPES)}", ) # --------------------------------------------------------------------------- # Options / result dataclasses # --------------------------------------------------------------------------- # Strategy for handling source columns that don't appear in the target # schema. ``keep`` preserves them at the end of the output; ``drop`` # removes them; ``error`` raises an InputValidationError. 
UnmappedStrategy = Literal["keep", "drop", "error"]

PRESETS: dict[str, dict[str, Any]] = {
    "rename-only": {
        "auto_infer": True,
        "unmapped": "keep",
        "coerce_types": False,
        "reorder_to_schema": False,
    },
    "strict-schema": {
        "auto_infer": True,
        "unmapped": "drop",
        "coerce_types": True,
        "reorder_to_schema": True,
    },
    "lenient-schema": {
        "auto_infer": True,
        "unmapped": "keep",
        "coerce_types": True,
        "reorder_to_schema": True,
    },
}


@dataclass
class MapOptions:
    """Toggles for column mapping.

    Defaults match the ``rename-only`` preset: best-effort fuzzy match
    against the schema (if provided), keep unmapped source columns after
    the mapped ones, no type coercion, no reorder.
    """

    # Either pass an explicit ``mapping`` dict or a ``schema`` (and let
    # the engine infer the mapping). Explicit mapping wins when both
    # are set.
    mapping: dict[str, str] = field(default_factory=dict)
    schema: Optional[TargetSchema] = None

    # When True (default), missing entries in ``mapping`` are filled in
    # by ``infer_mapping`` against ``schema``. When False, only the
    # explicit mapping is honoured.
    auto_infer: bool = True
    fuzzy_threshold: float = 0.6

    # What to do with source columns that aren't in the mapping.
    unmapped: UnmappedStrategy = "keep"

    # Apply target-field dtypes from the schema after rename.
    coerce_types: bool = False

    # Reorder output to match schema.fields order. Unmapped survivors
    # (when unmapped="keep") are appended at the end in their original
    # source order.
    reorder_to_schema: bool = False

    # Required-target enforcement. When True (default), a required
    # target field that has neither a source column nor a schema default
    # raises an InputValidationError. When False, the call succeeds and
    # the field is only reported in ``MapResult.missing_required_targets``
    # (no column is created for it).
    enforce_required: bool = True

    @classmethod
    def from_preset(cls, name: str) -> MapOptions:
        """Build options from one of the named ``PRESETS``.

        Raises:
            ConfigError: if *name* is not a known preset.
        """
        if name not in PRESETS:
            raise ConfigError(
                f"Unknown preset '{name}'",
                operation="MapOptions.from_preset",
                suggestion=f"Available: {sorted(PRESETS)}",
            )
        return cls(**PRESETS[name])

    @classmethod
    def from_dict(cls, data: dict) -> MapOptions:
        """Build options from a plain dict, ignoring unknown keys.

        A ``schema`` given as a dict is converted through
        ``TargetSchema.from_dict``. ``mapping`` is copied (null becomes an
        empty mapping) so later edits to the options never leak back into
        the caller's dict.
        """
        known = set(cls.__dataclass_fields__)
        kwargs = {k: v for k, v in data.items() if k in known}
        if isinstance(kwargs.get("schema"), dict):
            kwargs["schema"] = TargetSchema.from_dict(kwargs["schema"])
        if "mapping" in kwargs:
            kwargs["mapping"] = dict(kwargs["mapping"] or {})
        return cls(**kwargs)

    def to_dict(self) -> dict:
        """Serialise to a plain dict; ``schema`` is included only when set."""
        out: dict[str, Any] = {
            "mapping": dict(self.mapping),
            "auto_infer": self.auto_infer,
            "fuzzy_threshold": self.fuzzy_threshold,
            "unmapped": self.unmapped,
            "coerce_types": self.coerce_types,
            "reorder_to_schema": self.reorder_to_schema,
            "enforce_required": self.enforce_required,
        }
        if self.schema is not None:
            out["schema"] = self.schema.to_dict()
        return out

    def to_file(self, path: str | Path) -> Path:
        """Write the options as JSON to *path* and return the path written."""
        out = Path(path)
        # Explicit UTF-8 so the file round-trips regardless of platform locale.
        out.write_text(
            json.dumps(self.to_dict(), indent=2, default=str),
            encoding="utf-8",
        )
        return out

    @classmethod
    def from_file(cls, path: str | Path) -> MapOptions:
        """Load options from the JSON file at *path*."""
        return cls.from_dict(json.loads(Path(path).read_text(encoding="utf-8")))

    def validate(self) -> None:
        """Check option values.

        Raises:
            ConfigError: if ``fuzzy_threshold`` is outside [0.0, 1.0].
            (``ensure_choice`` reports an invalid ``unmapped`` value.)
        """
        ensure_choice(
            self.unmapped,
            name="unmapped",
            choices=("keep", "drop", "error"),
            function="MapOptions.validate",
        )
        if not (0.0 <= self.fuzzy_threshold <= 1.0):
            raise ConfigError(
                f"fuzzy_threshold must be in [0.0, 1.0], got {self.fuzzy_threshold!r}",
                operation="MapOptions.validate",
            )


@dataclass
class MapResult:
    """Output of ``map_columns``."""

    mapped_df: pd.DataFrame
    mapping: dict[str, str]  # source → target
    inferred_pairs: dict[str, str]  # subset of mapping that was auto-inferred
    columns_renamed: int
    columns_dropped: list[str]
    columns_added: list[str]  # schema fields absent from the input, added with their default (NA if none)
    coercion_failures: dict[str, int]  # column → n_rows_that_failed_coercion
    unmapped_kept: list[str]
    missing_required_targets: list[str]
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------


def map_columns(
    df: pd.DataFrame,
    options: Optional[MapOptions] = None,
) -> MapResult:
    """Apply *options* to *df* and return a :class:`MapResult`.

    Pipeline placement (recommended, not enforced)
    ----------------------------------------------
    Two natural slots:

    * **Early** — header alignment for multi-vendor unification. Each
      vendor uses different column names; rename to a canonical schema
      before any other tool runs.
    * **Late** — schema enforcement for output. After cleaning, coerce
      types and project to the target shape (CRM import contract,
      database schema). Run after format / missing so the coerced data
      is canonical first.

    The pipeline runner does not enforce a position; place by use case.

    Pipeline:
      1. Compose mapping (explicit ``options.mapping`` ∪ inferred pairs
         from ``options.schema``).
      2. Reject duplicate target names — two source columns mapped to
         the same target is a user error, not a silent overwrite. The
         same holds for a target that equals the name of an unmapped
         source column which is being kept: an explicit pair raises, an
         inferred pair is discarded.
      3. Decide what to do with unmapped source columns (``keep`` /
         ``drop`` / ``error``).
      4. Rename, then handle missing required targets, then coerce
         types, then reorder.

    The input frame is not modified; the result holds a renamed copy.

    Raises:
        InputValidationError: if the mapping references unknown source
            columns, two sources share a target, an explicit target
            collides with a kept source column, unmapped columns exist
            with ``unmapped="error"``, or a required field is missing
            while ``enforce_required`` is set.
    """
    ensure_dataframe(df, function="map_columns")
    options = options or MapOptions()
    options.validate()

    # ------------------------------------------------------------------
    # 1. Compose the effective mapping
    # ------------------------------------------------------------------
    explicit = dict(options.mapping)
    inferred: dict[str, str] = {}
    if options.schema is not None and options.auto_infer:
        all_inferred = infer_mapping(df, options.schema, threshold=options.fuzzy_threshold)
        # Explicit user pairings always win.
        used_targets = set(explicit.values())
        for src, tgt in all_inferred.items():
            if src in explicit or tgt in used_targets:
                continue
            inferred[src] = tgt
            used_targets.add(tgt)

    mapping: dict[str, str] = {**inferred, **explicit}

    # ------------------------------------------------------------------
    # 2. Validate mapping coherence
    # ------------------------------------------------------------------
    unknown_sources = [s for s in mapping if s not in df.columns]
    if unknown_sources:
        raise InputValidationError(
            f"Mapping references columns not in input: {unknown_sources}",
            operation="map_columns",
            suggestion=f"Available source columns: {list(df.columns)}",
        )

    target_counts: dict[str, int] = {}
    for tgt in mapping.values():
        target_counts[tgt] = target_counts.get(tgt, 0) + 1
    duplicates = [t for t, n in target_counts.items() if n > 1]
    if duplicates:
        raise InputValidationError(
            f"Multiple source columns mapped to the same target(s): {duplicates}",
            operation="map_columns",
            suggestion="Each target name must be unique. Drop or rename the conflicting source columns.",
        )

    # A target must not reuse the name of a source column that survives
    # untouched, otherwise the rename yields duplicate column labels. Only
    # ``keep`` lets unmapped columns survive (``drop`` removes them and
    # ``error`` raises below). Discarding an inferred pair makes its source
    # a survivor too, so repeat until the mapping is stable.
    if options.unmapped == "keep":
        while True:
            surviving = {c for c in df.columns if c not in mapping}
            clashes = [(s, t) for s, t in mapping.items() if t in surviving]
            if not clashes:
                break
            explicit_clashes = [t for s, t in clashes if s in explicit]
            if explicit_clashes:
                raise InputValidationError(
                    f"Mapping target(s) collide with unmapped source columns that are kept: {explicit_clashes}",
                    operation="map_columns",
                    suggestion=(
                        "Map or rename the existing column(s) as well, "
                        "or set unmapped='drop'."
                    ),
                )
            # Inferred pairs are only guesses: the column already carrying
            # the target name takes precedence over the guess.
            for s, _t in clashes:
                del mapping[s]
                del inferred[s]

    # ------------------------------------------------------------------
    # 3. Handle unmapped source columns
    # ------------------------------------------------------------------
    unmapped_sources = [c for c in df.columns if c not in mapping]
    unmapped_kept: list[str] = []
    columns_dropped: list[str] = []
    if unmapped_sources:
        if options.unmapped == "drop":
            columns_dropped = list(unmapped_sources)
        elif options.unmapped == "error":
            raise InputValidationError(
                f"Source columns have no mapping and unmapped='error': {unmapped_sources}",
                operation="map_columns",
                suggestion=(
                    "Either add explicit mapping entries, set unmapped='keep' / 'drop', "
                    "or include the columns in the target schema."
                ),
            )
        else:
            unmapped_kept = list(unmapped_sources)

    # ------------------------------------------------------------------
    # 4. Apply rename and drop
    # ------------------------------------------------------------------
    out = df.copy()
    if columns_dropped:
        out = out.drop(columns=columns_dropped)
    if mapping:
        out = out.rename(columns=mapping)
    # Identity pairs (source already has the target name) are not renames.
    columns_renamed = sum(1 for src, tgt in mapping.items() if src != tgt)

    # ------------------------------------------------------------------
    # 5. Handle the schema's required + default fields
    # ------------------------------------------------------------------
    columns_added: list[str] = []
    missing_required: list[str] = []
    if options.schema is not None:
        present = set(out.columns)
        for tf in options.schema.fields:
            if tf.name in present:
                continue
            if tf.required and tf.default is None:
                # Reported (and possibly raised below) instead of creating
                # an all-missing column.
                missing_required.append(tf.name)
                continue
            # Add with default value (NaN if no default).
            out[tf.name] = tf.default if tf.default is not None else pd.NA
            columns_added.append(tf.name)

    if missing_required and options.enforce_required:
        raise InputValidationError(
            f"Required target field(s) missing from input: {missing_required}",
            operation="map_columns",
            suggestion=(
                "Either add explicit mapping entries, lower fuzzy_threshold, "
                "supply a default in the schema, or set enforce_required=False."
            ),
        )

    # ------------------------------------------------------------------
    # 6. Coerce types per the schema
    # ------------------------------------------------------------------
    coercion_failures: dict[str, int] = {}
    if options.coerce_types and options.schema is not None:
        for tf in options.schema.fields:
            if tf.name not in out.columns or tf.dtype == "auto":
                continue
            try:
                series, fails = coerce_series(out[tf.name], tf.dtype)
            except (ValueError, TypeError) as e:
                # Best effort: leave the column as it was and keep going.
                logger.warning(
                    "map_columns: coerce of {!r} → {} failed: {}",
                    tf.name, tf.dtype, e,
                )
                continue
            out[tf.name] = series
            if fails:
                coercion_failures[tf.name] = fails

    # ------------------------------------------------------------------
    # 7. Reorder
    # ------------------------------------------------------------------
    if options.reorder_to_schema and options.schema is not None:
        ordered = [f.name for f in options.schema.fields if f.name in out.columns]
        # Append survivors (kept-unmapped originals) in their pre-rename order.
        ordered_names = set(ordered)
        survivors = [c for c in out.columns if c not in ordered_names]
        out = out.loc[:, ordered + survivors]

    return MapResult(
        mapped_df=out,
        mapping=mapping,
        inferred_pairs=inferred,
        columns_renamed=columns_renamed,
        columns_dropped=columns_dropped,
        columns_added=columns_added,
        coercion_failures=coercion_failures,
        unmapped_kept=unmapped_kept,
        missing_required_targets=missing_required,
    )