refactor: dedup, consolidate, harden public APIs across core modules

Closes 16 high-value findings from a parallel cross-module review.

Refactors:
- New src/core/_constants.py centralizes USPS street-suffix
  abbreviations, US state names, and 2-letter postal codes — one source
  of truth for both normalize_address (matching keys) and
  standardize_address (display formatting). Eliminates ~80 lines of
  duplicated dicts across normalizers.py and format_standardize.py.
- format_standardize.py: collapse 4 identical nested _err() helpers
  into one shared _err_or_passthrough() module function; drop a dead
  duplicate `return _err("not a phone number")` branch in
  standardize_phone.
- format_standardize.py: precompile per-locale month-name regexes
  (_MONTH_LOCALE_PATTERNS) and per-state-name regexes
  (_STATE_NAME_PATTERNS) at import time — they were rebuilt on every
  cell, a measurable hot path on million-row inputs.
- dedup.py: extract _is_missing(value) helper; one definition of
  "this cell is None / NaN / pd.NA" instead of two.
- fixes.py: extract _is_string_column(ser) helper; one dtype check
  instead of three duplicates across _apply_to_strings,
  _vectorized_translate, _vectorized_regex_sub.

Production-readiness:
- format_standardize.standardize_dataframe now logs a warning when
  more than 10% of typed cells are unparseable — surfaces the
  silently-broken-pipeline failure mode.
- StandardizeOptions.from_dict validates date_order / phone_format /
  currency_decimal / name_case / boolean_style / *_error_policy
  enum values up front, with a clear error message instead of a deep
  crash inside the per-cell function.
- StandardizeOptions.from_file and DeduplicationConfig.from_file wrap
  read + json.loads with descriptive OSError / ValueError messages
  including the file path.
- standardize_date(month_locales=...) validates locale codes against
  the available set instead of silently passing through unknown ones.
- io.read_file rejects chunk_size <= 0 (was silently failing inside
  pandas) and logs the resolved suffix + chunk_size at info level so
  data-pipeline runs are debuggable.
- io.read_file's FileNotFoundError gains explanatory context.
- io.write_file, text_clean.clean_dataframe, and dedup.deduplicate
  now reject non-DataFrame inputs with clear TypeError instead of
  cryptic pandas tracebacks downstream.
- dedup.deduplicate validates that survivor_rule=KEEP_MOST_RECENT has
  a usable date_column up front; the helper _select_survivor now
  raises (instead of silently falling back to keep_first) when called
  directly with bad arguments.
- dedup.deduplicate gains a structured no-op return when strategies
  is empty after auto-detection — preserves schema instead of crashing.
- analyze._detect_inconsistent_date_format narrows its bare except to
  (TypeError, ValueError) and logs a debug line so genuine bugs don't
  hide behind silent skip.

Tests:
- tests/test_audit_fixes.py grows by 11 cases covering the new
  validation paths (chunk_size, DataFrame guards, KEEP_MOST_RECENT
  date_column, enum validation, locale validation, JSON error wrapping).

Full project suite: 1208 passed, 4 skipped, 17 xfailed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-01 02:23:09 +00:00
parent b23a27d4e3
commit 2eece6467d
10 changed files with 457 additions and 231 deletions

103
src/core/_constants.py Normal file
View File

@@ -0,0 +1,103 @@
"""Shared lookup tables used by both the dedup-side normalizers and the
display-side format standardizer.
Both modules need the same authoritative mapping of US state names →
2-letter postal codes and USPS street-suffix abbreviations. Keeping a
single source of truth avoids the bug-class where the two layers drift
("Drive""Dr" in one but "Drv" in the other).
Per-module aliases (``_US_STATE_NAMES_NORM`` lowercase-only for matching
keys vs. ``_US_STATE_NAMES`` casefold-on-lookup for display) are still
defined locally to preserve historical behavior; both derive from the
canonical maps below.
"""
from __future__ import annotations
# US state name (lowercase) → 2-letter postal code.
US_STATE_NAMES: dict[str, str] = {
"alabama": "AL", "alaska": "AK", "arizona": "AZ", "arkansas": "AR",
"california": "CA", "colorado": "CO", "connecticut": "CT",
"delaware": "DE", "florida": "FL", "georgia": "GA", "hawaii": "HI",
"idaho": "ID", "illinois": "IL", "indiana": "IN", "iowa": "IA",
"kansas": "KS", "kentucky": "KY", "louisiana": "LA", "maine": "ME",
"maryland": "MD", "massachusetts": "MA", "michigan": "MI",
"minnesota": "MN", "mississippi": "MS", "missouri": "MO",
"montana": "MT", "nebraska": "NE", "nevada": "NV",
"new hampshire": "NH", "new jersey": "NJ", "new mexico": "NM",
"new york": "NY", "north carolina": "NC", "north dakota": "ND",
"ohio": "OH", "oklahoma": "OK", "oregon": "OR", "pennsylvania": "PA",
"rhode island": "RI", "south carolina": "SC", "south dakota": "SD",
"tennessee": "TN", "texas": "TX", "utah": "UT", "vermont": "VT",
"virginia": "VA", "washington": "WA", "west virginia": "WV",
"wisconsin": "WI", "wyoming": "WY",
"district of columbia": "DC",
}
# 2-letter US state postal codes (incl. territories). The placeholder
# code "ST" appears in the corpus fixtures; preserved here so the
# standardizer doesn't expand it as a Street abbreviation.
US_STATE_CODES: frozenset[str] = frozenset({
"AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA",
"HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
"MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
"NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
"SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY",
"DC", "PR", "VI", "GU", "AS", "MP",
"ST",
})
# Long-form → USPS-canonical short form (compression). Used by the
# format standardizer when ``expand=False`` and by normalize_address
# when reducing to a matching key.
USPS_COMPRESSIONS: dict[str, str] = {
"street": "St", "avenue": "Ave", "boulevard": "Blvd",
"drive": "Dr", "lane": "Ln", "road": "Rd", "court": "Ct",
"place": "Pl", "circle": "Cir", "trail": "Trl", "terrace": "Ter",
"parkway": "Pkwy", "highway": "Hwy", "expressway": "Expy",
"freeway": "Fwy", "square": "Sq", "alley": "Aly",
"crossing": "Xing", "point": "Pt",
"north": "N", "south": "S", "east": "E", "west": "W",
"northeast": "NE", "northwest": "NW", "southeast": "SE",
"southwest": "SW",
"apartment": "Apt", "suite": "Ste", "building": "Bldg",
"floor": "Fl", "room": "Rm", "fort": "Ft", "mount": "Mt",
"heights": "Hts", "springs": "Spgs",
}
# Abbreviation → expansion (the inverse of USPS_COMPRESSIONS, plus a
# handful of legacy aliases like ``av`` → ``Avenue``). Used by the
# format standardizer when ``expand=True`` (default).
USPS_EXPANSIONS: dict[str, str] = {
"st": "Street",
"ave": "Avenue", "av": "Avenue",
"blvd": "Boulevard", "blv": "Boulevard",
"dr": "Drive",
"ln": "Lane",
"rd": "Road",
"ct": "Court",
"pl": "Place",
"cir": "Circle",
"trl": "Trail", "tr": "Trail",
"ter": "Terrace",
"pkwy": "Parkway",
"hwy": "Highway",
"expy": "Expressway",
"fwy": "Freeway",
"sq": "Square",
"aly": "Alley",
"xing": "Crossing",
"pt": "Point",
"n": "North", "s": "South", "e": "East", "w": "West",
"ne": "Northeast", "nw": "Northwest",
"se": "Southeast", "sw": "Southwest",
"apt": "Apartment",
"ste": "Suite",
"bldg": "Building",
"fl": "Floor",
"rm": "Room",
"ft": "Fort",
"mt": "Mount",
"hts": "Heights",
"spgs": "Springs",
}

View File

@@ -20,6 +20,7 @@ from pathlib import Path
from typing import Any, Iterable, Literal, Optional from typing import Any, Iterable, Literal, Optional
import pandas as pd import pandas as pd
from loguru import logger
from pandas.api import types as pdtypes from pandas.api import types as pdtypes
from .io import RepairResult, repair_bytes, detect_encoding, detect_delimiter from .io import RepairResult, repair_bytes, detect_encoding, detect_delimiter
@@ -384,7 +385,14 @@ def _detect_inconsistent_date_format(df: pd.DataFrame) -> list[Finding]:
for col in df.columns: for col in df.columns:
try: try:
ser = df[col].dropna().astype(str) ser = df[col].dropna().astype(str)
except Exception: except (TypeError, ValueError) as e:
# Some pandas extension dtypes (e.g., custom Decimal arrays)
# can refuse string coercion. Skip those columns but log the
# reason so a real bug doesn't hide behind silent skip.
logger.debug(
"date-format detector: skipping {!r} ({}): {}",
col, type(e).__name__, e,
)
continue continue
nonempty = ser[ser.str.strip().astype(bool)] nonempty = ser[ser.str.strip().astype(bool)]
if len(nonempty) < 4: if len(nonempty) < 4:

View File

@@ -86,7 +86,19 @@ class DeduplicationConfig:
@classmethod @classmethod
def from_file(cls, path: str | Path) -> DeduplicationConfig: def from_file(cls, path: str | Path) -> DeduplicationConfig:
"""Load configuration from a JSON file.""" """Load configuration from a JSON file."""
data = json.loads(Path(path).read_text()) path = Path(path)
try:
text = path.read_text()
except OSError as e:
raise OSError(
f"Could not read DeduplicationConfig from {path}: {e}"
) from e
try:
data = json.loads(text)
except json.JSONDecodeError as e:
raise ValueError(
f"Invalid JSON in DeduplicationConfig file {path}: {e}"
) from e
return cls.from_dict(data) return cls.from_dict(data)
@classmethod @classmethod

View File

@@ -13,7 +13,7 @@ from __future__ import annotations
import re import re
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum from enum import Enum
from typing import Callable, Optional from typing import Any, Callable, Optional
import pandas as pd import pandas as pd
from loguru import logger from loguru import logger
@@ -162,6 +162,23 @@ def _compute_similarity(val_a: str, val_b: str, algorithm: Algorithm) -> float:
# Pair comparison # Pair comparison
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _is_missing(value: Any) -> bool:
"""True for None, NaN, or pd.NA — used everywhere we need to treat
a cell as "absent" rather than as the string ``"None"`` or ``"nan"``.
Centralizing the check avoids the silent-merge bug where
``str(None)`` and ``str(nan)`` would both compare equal to themselves
and produce a false 100% match in the deduplicator.
"""
if value is None:
return True
if value is pd.NA:
return True
if isinstance(value, float) and pd.isna(value):
return True
return False
def _compare_pair( def _compare_pair(
row_a: pd.Series, row_a: pd.Series,
row_b: pd.Series, row_b: pd.Series,
@@ -180,18 +197,8 @@ def _compare_pair(
raw_a = row_a.get(col, "") raw_a = row_a.get(col, "")
raw_b = row_b.get(col, "") raw_b = row_b.get(col, "")
# NaN / None always count as "empty" — never as the literal va = "" if _is_missing(raw_a) else str(raw_a)
# string "None" or "nan", which would otherwise let two rows vb = "" if _is_missing(raw_b) else str(raw_b)
# with missing data in this column match at 100% similarity.
a_missing = raw_a is None or (
isinstance(raw_a, float) and pd.isna(raw_a)
) or raw_a is pd.NA
b_missing = raw_b is None or (
isinstance(raw_b, float) and pd.isna(raw_b)
) or raw_b is pd.NA
va = "" if a_missing else str(raw_a)
vb = "" if b_missing else str(raw_b)
# Skip if both empty # Skip if both empty
if not va and not vb: if not va and not vb:
@@ -324,8 +331,14 @@ def _select_survivor(
if rule == SurvivorRule.KEEP_MOST_RECENT: if rule == SurvivorRule.KEEP_MOST_RECENT:
if not date_column or date_column not in df.columns: if not date_column or date_column not in df.columns:
logger.warning("date_column '{}' not found; falling back to keep_first", date_column) # The public ``deduplicate()`` validates this earlier, so
return indices[0] # reaching here means a caller invoked the helper directly
# with bad arguments — surface a clear error instead of a
# silent fallback that produces wrong-but-plausible output.
raise ValueError(
f"KEEP_MOST_RECENT requires date_column to be a column in df; "
f"got {date_column!r} (available: {list(df.columns)})"
)
best_idx = indices[0] best_idx = indices[0]
best_date = _parse_date(df.iloc[indices[0]].get(date_column, "")) best_date = _parse_date(df.iloc[indices[0]].get(date_column, ""))
for idx in indices[1:]: for idx in indices[1:]:
@@ -504,6 +517,24 @@ def deduplicate(
Returns a ``DeduplicationResult``. Returns a ``DeduplicationResult``.
""" """
if not isinstance(df, pd.DataFrame):
raise TypeError(
f"deduplicate() requires a pandas DataFrame; got {type(df).__name__}"
)
if survivor_rule == SurvivorRule.KEEP_MOST_RECENT and not date_column:
raise ValueError(
"survivor_rule=KEEP_MOST_RECENT requires date_column to be set"
)
if (
survivor_rule == SurvivorRule.KEEP_MOST_RECENT
and date_column
and date_column not in df.columns
):
raise ValueError(
f"date_column={date_column!r} not found in input. "
f"Available columns: {list(df.columns)}"
)
log_entries: list[str] = [] log_entries: list[str] = []
original_count = len(df) original_count = len(df)
@@ -511,6 +542,19 @@ def deduplicate(
strategies = build_default_strategies(df) strategies = build_default_strategies(df)
log_entries.append(f"Auto-detected {len(strategies)} match strategies") log_entries.append(f"Auto-detected {len(strategies)} match strategies")
if not strategies:
# Empty input or no detectable strategies — return the input
# unchanged with an empty match result, structurally consistent.
logger.info("deduplicate: no strategies, returning input unchanged")
return DeduplicationResult(
original_row_count=original_count,
deduplicated_df=df.reset_index(drop=True),
removed_df=df.iloc[0:0].copy().reset_index(drop=True),
match_groups=[],
log_entries=log_entries + ["No strategies → no-op"],
is_preview=preview,
)
# Validate every strategy references real columns — silent skip # Validate every strategy references real columns — silent skip
# would let a typo (``e_mail`` instead of ``email``) produce a # would let a typo (``e_mail`` instead of ``email``) produce a
# confidently-empty result. # confidently-empty result.

View File

@@ -78,6 +78,19 @@ def available_actions() -> list[str]:
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _is_string_column(ser: pd.Series) -> bool:
"""Return True for object- or pandas-string-typed Series.
Centralizes the dtype check so the three column-mutating helpers
(``_apply_to_strings``, ``_vectorized_translate``,
``_vectorized_regex_sub``) stay in sync if the rule changes.
"""
return (
pd.api.types.is_object_dtype(ser)
or pd.api.types.is_string_dtype(ser)
)
def _apply_to_strings( def _apply_to_strings(
df: pd.DataFrame, fn: Callable[[str], str], *, include_headers: bool = False, df: pd.DataFrame, fn: Callable[[str], str], *, include_headers: bool = False,
inplace: bool = False, inplace: bool = False,
@@ -101,7 +114,7 @@ def _apply_to_strings(
changed = 0 changed = 0
for col in out.columns: for col in out.columns:
ser = out[col] ser = out[col]
if not (pd.api.types.is_object_dtype(ser) or pd.api.types.is_string_dtype(ser)): if not _is_string_column(ser):
continue continue
# ``map`` skips non-string cells via the lambda guard. We compare # ``map`` skips non-string cells via the lambda guard. We compare
# the original column to the transformed one for a single mask # the original column to the transformed one for a single mask
@@ -139,7 +152,7 @@ def _vectorized_translate(
changed = 0 changed = 0
for col in out.columns: for col in out.columns:
ser = out[col] ser = out[col]
if not (pd.api.types.is_object_dtype(ser) or pd.api.types.is_string_dtype(ser)): if not _is_string_column(ser):
continue continue
new_col = ser.where(~ser.map(lambda v: isinstance(v, str)), ser.str.translate(trans)) new_col = ser.where(~ser.map(lambda v: isinstance(v, str)), ser.str.translate(trans))
delta = int((new_col != ser).sum()) delta = int((new_col != ser).sum())
@@ -167,7 +180,7 @@ def _vectorized_regex_sub(
changed = 0 changed = 0
for col in out.columns: for col in out.columns:
ser = out[col] ser = out[col]
if not (pd.api.types.is_object_dtype(ser) or pd.api.types.is_string_dtype(ser)): if not _is_string_column(ser):
continue continue
new_col = ser.astype("string").str.replace(pattern, repl, regex=True).astype(object).fillna(ser) new_col = ser.astype("string").str.replace(pattern, repl, regex=True).astype(object).fillna(ser)
delta = int((new_col != ser).sum()) delta = int((new_col != ser).sum())
@@ -307,7 +320,7 @@ def lowercase_email(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[p
if col not in out.columns: if col not in out.columns:
continue continue
ser = out[col] ser = out[col]
if not (pd.api.types.is_object_dtype(ser) or pd.api.types.is_string_dtype(ser)): if not _is_string_column(ser):
continue continue
new_col = ser.map(lambda v: v.lower() if isinstance(v, str) else v) new_col = ser.map(lambda v: v.lower() if isinstance(v, str) else v)
delta = int((new_col != ser).sum()) delta = int((new_col != ser).sum())

View File

@@ -23,6 +23,8 @@ from __future__ import annotations
import json import json
import re import re
from loguru import logger
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum from enum import Enum
@@ -51,6 +53,19 @@ class FieldType(str, Enum):
EMAIL = "email" EMAIL = "email"
# Shared error-policy helper used by every per-domain standardizer.
# Returns ``(<error: reason>, changed)`` under the ``"sentinel"`` policy
# and ``(value, False)`` under ``"passthrough"`` so unparseable input
# survives unchanged.
def _err_or_passthrough(
reason: str, value: str, policy: str,
) -> tuple[str, bool]:
if policy == "sentinel":
sentinel = f"<error: {reason}>"
return sentinel, sentinel != value
return value, False
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Date # Date
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -153,24 +168,49 @@ _MONTH_LOCALES: dict[str, dict[str, str]] = {
} }
def _build_month_locale_patterns() -> dict[str, list[tuple["re.Pattern[str]", str]]]:
"""Precompile per-locale (pattern, replacement) lists once at import.
The previous loop compiled every pattern for every input cell — at
millions of rows that's a measurable hot spot.
"""
out: dict[str, list[tuple[re.Pattern[str], str]]] = {}
for loc, table in _MONTH_LOCALES.items():
out[loc] = [
(
re.compile(
rf"(?<![A-Za-z]){re.escape(foreign)}(?![A-Za-z])",
re.IGNORECASE,
),
english,
)
for foreign, english in table.items()
]
return out
_MONTH_LOCALE_PATTERNS = _build_month_locale_patterns()
def _apply_month_locale(s: str, locales: list[str]) -> str: def _apply_month_locale(s: str, locales: list[str]) -> str:
"""Replace localized month names with English equivalents.""" """Replace localized month names with English equivalents.
Raises ``ValueError`` if any locale is unrecognized — silent skip
would mask typos like ``"FR"`` (uppercase) or ``"french"``.
"""
unknown = [
loc for loc in locales if loc != "en" and loc not in _MONTH_LOCALES
]
if unknown:
raise ValueError(
f"Unknown month locale(s): {unknown}. "
f"Available: {sorted(_MONTH_LOCALES) + ['en']}"
)
for loc in locales: for loc in locales:
if loc == "en": if loc == "en":
continue continue
table = _MONTH_LOCALES.get(loc) for pat, english in _MONTH_LOCALE_PATTERNS[loc]:
if not table: s = pat.sub(english, s)
continue
for foreign, english in table.items():
# Word-boundary match, case-insensitive — covers ``15 janvier
# 2024`` and ``15. Januar 2024`` alike. The replacement also
# strips a trailing period after a German abbreviation (``15.``
# is the day; the month is the next token).
pattern = re.compile(
rf"(?<![A-Za-z]){re.escape(foreign)}(?![A-Za-z])",
re.IGNORECASE,
)
s = pattern.sub(english, s)
return s return s
@@ -250,11 +290,7 @@ def standardize_date(
if not s: if not s:
return value, False return value, False
def _err(reason: str) -> tuple[str, bool]: _err = lambda reason: _err_or_passthrough(reason, value, error_policy)
if error_policy == "sentinel":
sentinel = f"<error: {reason}>"
return sentinel, sentinel != value
return value, False
# Excel serial dates and Unix timestamps don't survive the weekday- # Excel serial dates and Unix timestamps don't survive the weekday-
# prefix / time-tail strips, so try them first. They short-circuit # prefix / time-tail strips, so try them first. They short-circuit
@@ -395,11 +431,7 @@ def standardize_phone(
if not s: if not s:
return value, False return value, False
def _err(reason: str) -> tuple[str, bool]: _err = lambda reason: _err_or_passthrough(reason, value, error_policy)
if error_policy == "sentinel":
sentinel = f"<error: {reason}>"
return sentinel, sentinel != value
return value, False
if output_format == "DIGITS": if output_format == "DIGITS":
digits = re.sub(r"\D", "", s) digits = re.sub(r"\D", "", s)
@@ -437,13 +469,11 @@ def standardize_phone(
try: try:
parsed = phonenumbers.parse(s, default_region) parsed = phonenumbers.parse(s, default_region)
except phonenumbers.NumberParseException: except phonenumbers.NumberParseException:
# Only emit a sentinel for inputs that clearly contain digits # Anything that can't be parsed becomes a sentinel under the
# but failed to parse (corpus § 4.3 errors). Pure non-numeric # sentinel policy; passthrough returns the original. Both digit-
# strings pass through unchanged so a "TBD"-style placeholder # and-formatting failures and pure non-numeric ("TBD"-style) cells
# doesn't get reshaped into a phone error. # land here.
if re.search(r"\d", s):
return _err("not a phone number") return _err("not a phone number")
return _err("not a phone number") # symmetric — TBD/garbage flagged
if not phonenumbers.is_possible_number(parsed): if not phonenumbers.is_possible_number(parsed):
# Distinguish "too many digits" from generic invalidity for # Distinguish "too many digits" from generic invalidity for
@@ -594,11 +624,7 @@ def standardize_currency(
if not s: if not s:
return value, False return value, False
def _err(reason: str) -> tuple[str, bool]: _err = lambda reason: _err_or_passthrough(reason, value, error_policy)
if error_policy == "sentinel":
sentinel = f"<error: {reason}>"
return sentinel, sentinel != value
return value, False
if "%" in s: if "%" in s:
return _err("percentage not currency") return _err("percentage not currency")
@@ -941,51 +967,16 @@ def standardize_name(
# Address # Address
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Expansion table — the inverse of the dedup-side ``_USPS_ABBREVIATIONS``. # Expansion table — the inverse of the dedup-side compression set in
# These are the canonical long-form spellings the standardizer emits when # ``normalize_address``. We deliberately don't expand ``unit``, ``loop``,
# it sees the abbreviation. We deliberately don't expand ``unit``, ``loop``, # or ``way`` because those are already the long form. Canonical mappings
# or ``way`` because those are already the long form. # live in :mod:`src.core._constants` so both modules stay in sync.
_ADDRESS_EXPANSIONS: dict[str, str] = { from ._constants import (
"st": "Street", USPS_EXPANSIONS as _ADDRESS_EXPANSIONS,
"ave": "Avenue", USPS_COMPRESSIONS as _ADDRESS_COMPRESSIONS,
"av": "Avenue", US_STATE_CODES as _US_STATE_CODES_SHARED,
"blvd": "Boulevard", US_STATE_NAMES as _US_STATE_NAMES_SHARED,
"blv": "Boulevard", )
"dr": "Drive",
"ln": "Lane",
"rd": "Road",
"ct": "Court",
"pl": "Place",
"cir": "Circle",
"trl": "Trail",
"tr": "Trail",
"ter": "Terrace",
"pkwy": "Parkway",
"hwy": "Highway",
"expy": "Expressway",
"fwy": "Freeway",
"sq": "Square",
"aly": "Alley",
"xing": "Crossing",
"pt": "Point",
"n": "North",
"s": "South",
"e": "East",
"w": "West",
"ne": "Northeast",
"nw": "Northwest",
"se": "Southeast",
"sw": "Southwest",
"apt": "Apartment",
"ste": "Suite",
"bldg": "Building",
"fl": "Floor",
"rm": "Room",
"ft": "Fort",
"mt": "Mount",
"hts": "Heights",
"spgs": "Springs",
}
# Short tokens that look like directions but only mean a direction at the # Short tokens that look like directions but only mean a direction at the
# start or end of an address — never in the middle of a street name. This # start or end of an address — never in the middle of a street name. This
@@ -996,58 +987,23 @@ _DIRECTION_TOKENS = {"n", "s", "e", "w", "ne", "nw", "se", "sw"}
_TOKEN_RE = re.compile(r"\w+|[^\w\s]+|\s+") _TOKEN_RE = re.compile(r"\w+|[^\w\s]+|\s+")
# 2-letter US state postal codes — preserved verbatim so they don't get # Aliases over the shared constants — kept for the local module-level
# title-cased into ``Ny``/``Ca`` and don't collide with abbreviation # reads that already reference these names.
# entries (``ST`` no longer expands to ``Street`` when the surrounding _US_STATE_CODES = _US_STATE_CODES_SHARED
# context says it's a state code). _US_STATE_NAMES = _US_STATE_NAMES_SHARED
_US_STATE_CODES: set[str] = {
"AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA",
"HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
"MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
"NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
"SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY",
"DC", "PR", "VI", "GU", "AS", "MP",
# ``ST`` appears as a placeholder state in the corpus fixtures; keep
# it preserved so test rows don't trip the Street collision.
"ST",
}
# State name → 2-letter postal code. Used when ``state_to_code=True``. # Precompiled (pattern, code) list for the state-name → 2-letter
_US_STATE_NAMES: dict[str, str] = { # conversion. Sorted longest-first so ``new york`` matches before ``new``.
"alabama": "AL", "alaska": "AK", "arizona": "AZ", "arkansas": "AR", _STATE_NAME_PATTERNS: list[tuple[re.Pattern[str], str]] = [
"california": "CA", "colorado": "CO", "connecticut": "CT", (
"delaware": "DE", "florida": "FL", "georgia": "GA", "hawaii": "HI", re.compile(
"idaho": "ID", "illinois": "IL", "indiana": "IN", "iowa": "IA", rf"(,\s*){re.escape(full)}(\s+\d{{5}}(?:-\d{{4}})?)",
"kansas": "KS", "kentucky": "KY", "louisiana": "LA", "maine": "ME", re.IGNORECASE,
"maryland": "MD", "massachusetts": "MA", "michigan": "MI", ),
"minnesota": "MN", "mississippi": "MS", "missouri": "MO", code,
"montana": "MT", "nebraska": "NE", "nevada": "NV", )
"new hampshire": "NH", "new jersey": "NJ", "new mexico": "NM", for full, code in sorted(_US_STATE_NAMES.items(), key=lambda kv: -len(kv[0]))
"new york": "NY", "north carolina": "NC", "north dakota": "ND", ]
"ohio": "OH", "oklahoma": "OK", "oregon": "OR", "pennsylvania": "PA",
"rhode island": "RI", "south carolina": "SC", "south dakota": "SD",
"tennessee": "TN", "texas": "TX", "utah": "UT", "vermont": "VT",
"virginia": "VA", "washington": "WA", "west virginia": "WV",
"wisconsin": "WI", "wyoming": "WY",
"district of columbia": "DC",
}
# Inverse abbreviation table used when ``expand=False`` — compresses
# spelled-out forms back to their USPS abbreviations.
_ADDRESS_COMPRESSIONS: dict[str, str] = {
"street": "St", "avenue": "Ave", "boulevard": "Blvd",
"drive": "Dr", "lane": "Ln", "road": "Rd", "court": "Ct",
"place": "Pl", "circle": "Cir", "trail": "Trl", "terrace": "Ter",
"parkway": "Pkwy", "highway": "Hwy", "expressway": "Expy",
"freeway": "Fwy", "square": "Sq", "alley": "Aly",
"crossing": "Xing", "point": "Pt",
"north": "N", "south": "S", "east": "E", "west": "W",
"northeast": "NE", "northwest": "NW", "southeast": "SE",
"southwest": "SW",
"apartment": "Apt", "suite": "Ste", "building": "Bldg",
"floor": "Fl", "room": "Rm", "fort": "Ft", "mount": "Mt",
"heights": "Hts", "springs": "Spgs",
}
# PO Box variants normalize to a single canonical form. # PO Box variants normalize to a single canonical form.
_PO_BOX_RE = re.compile( _PO_BOX_RE = re.compile(
@@ -1144,15 +1100,10 @@ def standardize_address(
if state_to_code and is_us_shaped: if state_to_code and is_us_shaped:
# Only convert state names in the *state slot* — between a comma # Only convert state names in the *state slot* — between a comma
# and a US ZIP — so the city ``New York`` in ``…, New York, NY # and a US ZIP — so the city ``New York`` in ``…, New York, NY
# 10001`` is not shortened to ``NY``. # 10001`` is not shortened to ``NY``. Patterns are precompiled
for full, code in sorted( # at module load.
_US_STATE_NAMES.items(), key=lambda kv: -len(kv[0]) for pat, code in _STATE_NAME_PATTERNS:
): s = pat.sub(rf"\g<1>{code}\g<2>", s)
pattern = re.compile(
rf"(,\s*){re.escape(full)}(\s+\d{{5}}(?:-\d{{4}})?)",
re.IGNORECASE,
)
s = pattern.sub(rf"\g<1>{code}\g<2>", s)
if not expand: if not expand:
# Compression direction is only safe for US-shaped addresses. # Compression direction is only safe for US-shaped addresses.
@@ -1297,11 +1248,7 @@ def standardize_email(
if not s: if not s:
return value, False return value, False
def _err(reason: str) -> tuple[str, bool]: _err = lambda reason: _err_or_passthrough(reason, value, error_policy)
if error_policy == "sentinel":
sentinel = f"<error: {reason}>"
return sentinel, sentinel != value
return value, False
# Multi-email cell — error before we silently pick one. # Multi-email cell — error before we silently pick one.
if _EMAIL_MULTI_RE.search(s) and not s.startswith("<"): if _EMAIL_MULTI_RE.search(s) and not s.startswith("<"):
@@ -1583,10 +1530,34 @@ class StandardizeOptions:
known = {f for f in cls.__dataclass_fields__} known = {f for f in cls.__dataclass_fields__}
kwargs = {k: v for k, v in data.items() if k in known} kwargs = {k: v for k, v in data.items() if k in known}
column_types = kwargs.get("column_types") or {} column_types = kwargs.get("column_types") or {}
try:
kwargs["column_types"] = { kwargs["column_types"] = {
c: FieldType(t) if not isinstance(t, FieldType) else t c: FieldType(t) if not isinstance(t, FieldType) else t
for c, t in column_types.items() for c, t in column_types.items()
} }
except ValueError as e:
valid = ", ".join(sorted(t.value for t in FieldType))
raise ValueError(
f"Invalid field type in column_types: {e}. Valid: {valid}"
) from e
# Surface enum-string mismatches early — bad date_order ("xyz")
# would otherwise crash deep inside standardize_date.
for field_name, valid in (
("date_order", {"MDY", "DMY"}),
("phone_format", set(_PHONE_FORMAT_MAP) | {"DIGITS"}),
("currency_decimal", {"dot", "comma"}),
("name_case", {"title", "upper", "lower"}),
("boolean_style", set(_BOOL_OUTPUT)),
("date_error_policy", {"passthrough", "sentinel"}),
("phone_error_policy", {"passthrough", "sentinel"}),
("currency_error_policy", {"passthrough", "sentinel"}),
("email_error_policy", {"passthrough", "sentinel"}),
):
value = kwargs.get(field_name)
if value is not None and value not in valid:
raise ValueError(
f"Invalid {field_name}={value!r}. Valid: {sorted(valid)}"
)
return cls(**kwargs) return cls(**kwargs)
def to_dict(self) -> dict: def to_dict(self) -> dict:
@@ -1602,7 +1573,20 @@ class StandardizeOptions:
@classmethod @classmethod
def from_file(cls, path: str | Path) -> StandardizeOptions: def from_file(cls, path: str | Path) -> StandardizeOptions:
return cls.from_dict(json.loads(Path(path).read_text())) path = Path(path)
try:
text = path.read_text()
except OSError as e:
raise OSError(
f"Could not read StandardizeOptions config from {path}: {e}"
) from e
try:
data = json.loads(text)
except json.JSONDecodeError as e:
raise ValueError(
f"Invalid JSON in StandardizeOptions config {path}: {e}"
) from e
return cls.from_dict(data)
@dataclass @dataclass
@@ -1826,6 +1810,20 @@ def standardize_dataframe(
columns=["row", "column", "field_type", "old", "new"], columns=["row", "column", "field_type", "old", "new"],
) )
# Surface a warning when more than 10% of typed cells failed to
# parse — usually means the user mis-typed a column (text marked
# as DATE) or the data is genuinely garbage. Without this, a
# quietly-broken pipeline shows zero changes and silently lets bad
# data flow downstream.
if cells_total > 0 and cells_unparseable / cells_total > 0.1:
logger.warning(
"standardize_dataframe: {}/{} cells ({}%) in typed columns were "
"unparseable — check column_types for mismatches with the data.",
cells_unparseable,
cells_total,
int(100 * cells_unparseable / cells_total),
)
return StandardizeResult( return StandardizeResult(
standardized_df=out, standardized_df=out,
changes=changes_df, changes=changes_df,

View File

@@ -184,9 +184,18 @@ def read_file(
""" """
filepath = Path(path) filepath = Path(path)
if not filepath.exists(): if not filepath.exists():
raise FileNotFoundError(f"File not found: {filepath}") raise FileNotFoundError(
f"Input file not found: {filepath} "
f"(required for encoding/delimiter detection and reading)"
)
if chunk_size is not None and chunk_size <= 0:
raise ValueError(f"chunk_size must be positive; got {chunk_size}")
suffix = filepath.suffix.lower() suffix = filepath.suffix.lower()
logger.info(
"read_file: {} (suffix={}, chunk_size={})",
filepath, suffix, chunk_size,
)
if suffix in (".xlsx", ".xls"): if suffix in (".xlsx", ".xls"):
return _read_excel(filepath, header_row=header_row, sheet_name=sheet_name) return _read_excel(filepath, header_row=header_row, sheet_name=sheet_name)
else: else:
@@ -362,6 +371,10 @@ def write_file(
Returns the resolved output Path. Returns the resolved output Path.
""" """
if not isinstance(df, pd.DataFrame):
raise TypeError(
f"write_file() requires a pandas DataFrame; got {type(df).__name__}"
)
out = Path(path) out = Path(path)
fmt = file_format or out.suffix.lstrip(".").lower() fmt = file_format or out.suffix.lstrip(".").lower()
if fmt in ("xlsx", "xls"): if fmt in ("xlsx", "xls"):

View File

@@ -151,70 +151,22 @@ def normalize_name(value: Optional[str]) -> str:
# Address normalizer # Address normalizer
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Long-form → short-form (USPS) for normalization-side matching keys.
# Sourced from the shared canonical map; lower-cased + augmented with a
# few normalize-only aliases (``saint`` → ``st``, identity entries for
# ``unit`` / ``loop`` / ``way``, ``number`` → ``#``).
from ._constants import USPS_COMPRESSIONS as _SHARED_COMPRESSIONS, US_STATE_NAMES as _SHARED_STATE_NAMES
_USPS_ABBREVIATIONS: dict[str, str] = { _USPS_ABBREVIATIONS: dict[str, str] = {
"street": "st", **{k: v.lower() for k, v in _SHARED_COMPRESSIONS.items()},
"avenue": "ave", "way": "way", "loop": "loop", "unit": "unit",
"boulevard": "blvd",
"drive": "dr",
"lane": "ln",
"road": "rd",
"court": "ct",
"place": "pl",
"circle": "cir",
"trail": "trl",
"way": "way",
"terrace": "ter",
"parkway": "pkwy",
"highway": "hwy",
"expressway": "expy",
"freeway": "fwy",
"square": "sq",
"loop": "loop",
"alley": "aly",
"crossing": "xing",
"point": "pt",
"north": "n",
"south": "s",
"east": "e",
"west": "w",
"northeast": "ne",
"northwest": "nw",
"southeast": "se",
"southwest": "sw",
"apartment": "apt",
"suite": "ste",
"building": "bldg",
"floor": "fl",
"room": "rm",
"unit": "unit",
"number": "#", "number": "#",
"saint": "st", "saint": "st",
"fort": "ft",
"mount": "mt",
"heights": "hts",
"springs": "spgs",
} }
# Lowercase variant of the shared state-name map for matching keys.
# US state name → 2-letter postal code. Substituted before tokenization
# so ``California`` and ``CA`` normalize to the same key.
_US_STATE_NAMES_NORM: dict[str, str] = { _US_STATE_NAMES_NORM: dict[str, str] = {
"alabama": "al", "alaska": "ak", "arizona": "az", "arkansas": "ar", name: code.lower() for name, code in _SHARED_STATE_NAMES.items()
"california": "ca", "colorado": "co", "connecticut": "ct",
"delaware": "de", "florida": "fl", "georgia": "ga", "hawaii": "hi",
"idaho": "id", "illinois": "il", "indiana": "in", "iowa": "ia",
"kansas": "ks", "kentucky": "ky", "louisiana": "la", "maine": "me",
"maryland": "md", "massachusetts": "ma", "michigan": "mi",
"minnesota": "mn", "mississippi": "ms", "missouri": "mo",
"montana": "mt", "nebraska": "ne", "nevada": "nv",
"new hampshire": "nh", "new jersey": "nj", "new mexico": "nm",
"new york": "ny", "north carolina": "nc", "north dakota": "nd",
"ohio": "oh", "oklahoma": "ok", "oregon": "or", "pennsylvania": "pa",
"rhode island": "ri", "south carolina": "sc", "south dakota": "sd",
"tennessee": "tn", "texas": "tx", "utah": "ut", "vermont": "vt",
"virginia": "va", "washington": "wa", "west virginia": "wv",
"wisconsin": "wi", "wyoming": "wy",
"district of columbia": "dc",
} }

View File

@@ -18,6 +18,7 @@ from pathlib import Path
from typing import Any, Callable, Iterable, Literal, Optional from typing import Any, Callable, Iterable, Literal, Optional
import pandas as pd import pandas as pd
from loguru import logger
from pandas.api import types as pdtypes from pandas.api import types as pdtypes
@@ -535,7 +536,16 @@ def clean_dataframe(df: pd.DataFrame, options: Optional[CleanOptions] = None) ->
Numeric, datetime, and boolean columns are skipped by default. The input Numeric, datetime, and boolean columns are skipped by default. The input
DataFrame is not mutated; a copy is returned in ``CleanResult.cleaned_df``. DataFrame is not mutated; a copy is returned in ``CleanResult.cleaned_df``.
""" """
if not isinstance(df, pd.DataFrame):
raise TypeError(
f"clean_dataframe() requires a pandas DataFrame; "
f"got {type(df).__name__}"
)
options = options or CleanOptions() options = options or CleanOptions()
logger.debug(
"clean_dataframe: rows={}, cols={}, case={}",
len(df), len(df.columns), options.case,
)
out = df.copy() out = df.copy()
columns = _select_columns(out, options) columns = _select_columns(out, options)

View File

@@ -301,3 +301,76 @@ class TestNullLikeIncludesPandasNA:
# keep_default_na=False, pandas NA values appear as the literal # keep_default_na=False, pandas NA values appear as the literal
# string "<NA>" and the analyzer should flag them. # string "<NA>" and the analyzer should flag them.
assert "<na>" in _NULL_LIKE assert "<na>" in _NULL_LIKE
# ---------------------------------------------------------------------------
# Production-readiness review (refactor pass)
# ---------------------------------------------------------------------------
class TestProductionReadyValidation:
def test_chunk_size_must_be_positive(self, tmp_path: Path):
from src.core.io import read_file
f = tmp_path / "tiny.csv"
f.write_text("a,b\n1,2\n")
with pytest.raises(ValueError, match="chunk_size must be positive"):
list(read_file(f, chunk_size=0))
with pytest.raises(ValueError, match="chunk_size must be positive"):
list(read_file(f, chunk_size=-5))
def test_write_file_rejects_non_dataframe(self, tmp_path: Path):
from src.core.io import write_file
with pytest.raises(TypeError, match="requires a pandas DataFrame"):
write_file({"a": [1]}, tmp_path / "out.csv") # type: ignore[arg-type]
def test_clean_dataframe_rejects_non_dataframe(self):
from src.core.text_clean import clean_dataframe
with pytest.raises(TypeError, match="requires a pandas DataFrame"):
clean_dataframe([{"a": 1}]) # type: ignore[arg-type]
def test_deduplicate_rejects_non_dataframe(self):
from src.core.dedup import deduplicate
with pytest.raises(TypeError, match="requires a pandas DataFrame"):
deduplicate({"x": [1]}) # type: ignore[arg-type]
def test_keep_most_recent_requires_date_column(self):
from src.core.dedup import deduplicate, SurvivorRule
df = pd.DataFrame({"name": ["a", "b"]})
with pytest.raises(ValueError, match="date_column"):
deduplicate(df, survivor_rule=SurvivorRule.KEEP_MOST_RECENT)
def test_keep_most_recent_with_unknown_date_column(self):
from src.core.dedup import deduplicate, SurvivorRule
df = pd.DataFrame({"name": ["a", "b"]})
with pytest.raises(ValueError, match="not found in input"):
deduplicate(
df,
survivor_rule=SurvivorRule.KEEP_MOST_RECENT,
date_column="created_at",
)
def test_standardize_options_invalid_enum(self):
from src.core.format_standardize import StandardizeOptions
with pytest.raises(ValueError, match="Invalid date_order"):
StandardizeOptions.from_dict({"date_order": "XYZ"})
def test_standardize_options_invalid_field_type(self):
from src.core.format_standardize import StandardizeOptions
with pytest.raises(ValueError, match="Invalid field type"):
StandardizeOptions.from_dict({"column_types": {"x": "made_up"}})
def test_month_locale_validation(self):
from src.core.format_standardize import standardize_date
with pytest.raises(ValueError, match="Unknown month locale"):
standardize_date("15 januar 2024", month_locales=["DE"]) # uppercase typo
def test_config_from_file_missing(self, tmp_path: Path):
from src.core.config import DeduplicationConfig
with pytest.raises(OSError, match="Could not read"):
DeduplicationConfig.from_file(tmp_path / "missing.json")
def test_config_from_file_bad_json(self, tmp_path: Path):
from src.core.config import DeduplicationConfig
path = tmp_path / "bad.json"
path.write_text("{not: valid json")
with pytest.raises(ValueError, match="Invalid JSON"):
DeduplicationConfig.from_file(path)