Compare commits

...

2 Commits

Author SHA1 Message Date
d0423a8912 docs(perf): publish the dedup/parallel/lazy-copy wins and limits
REQUIREMENTS §10 carries the new measured numbers and the dedup
blocking trade-off note. DEVELOPER known-limitations is rewritten to
reflect that exact-only dedup is now O(n), fuzzy-blocking is opt-in,
and column-parallelism is scaffolding for free-threaded Python.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 15:54:25 +00:00
64452dd783 perf: dedup blocking, column-parallel scaffolding, lazy-copy pipelines
Three follow-on wins from the audit, each with shape-pinning tests.

1. Dedup blocking
   - Exact-only strategies (every column EXACT @ 100 — covers strong-
     key dedup like email/phone, the drop-duplicates fallback, and
     explicit "match on this exact column" calls) now route through
     an O(n) groupby fast path. Lossless; no API change required.
     Measured: 10k-row email-exact dedup → 73 ms (was ~30 minutes
     via the O(n²) pair compare).
   - Fuzzy strategies still pair-compare, with opt-in prefix blocking
     via deduplicate(..., blocking_columns=[...], blocking_prefix_len=1).
     Measured: 5k-row fuzzy-name → 25.6s with blocking vs 179s
     without (7x). Trade-off: cross-block matches missed.

2. Column-parallel standardize
   - StandardizeOptions.parallel_columns (default 1) lands a
     ThreadPoolExecutor over the column loop. Output order and
     audit-record order are preserved deterministically via a merge
     step keyed off column_types order. Honest doc: under CPython
     3.12's GIL the win is roughly neutral (phonenumbers/dateutil
     hold the GIL); the API is ready for free-threaded Python 3.13+.

3. Lazy-copy in missing / column_mapper
   - _standardize_sentinels now builds per-column changes in a dict
     and only materialises the output frame when at least one column
     actually changed. On a clean 1 GB file this skips a 1 GB
     allocation.
   - handle_missing carries an out_is_owned flag, copying on demand
     before any mutating step. No-op runs return the input frame.
   - map_columns drops the unconditional upfront df.copy(); rename
     and drop both return fresh frames already, and schema-add /
     coerce trigger _ensure_owned() lazily.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 15:54:25 +00:00
7 changed files with 704 additions and 73 deletions

View File

@@ -185,8 +185,18 @@ Fixture corpora: `test-cases/text-cleaner-corpus/` (21 files) · `test-cases/enc
## Known limitations
- **Dedup is O(n²)** — no blocking. Works to ~50k rows. Future: partition by first letter / ZIP prefix.
- **Single-threaded** — could benefit from `multiprocessing`.
- **Memory-bound** — entire file loaded into pandas. Streaming reads exist but not integrated with dedup engine.
- **Dedup pair-compare is O(n²)** for fuzzy strategies. Exact-only
strategies (every column uses `Algorithm.EXACT` at threshold 100)
now route through an O(n) groupby fast path automatically — no API
change. Fuzzy strategies can opt into prefix blocking via
`deduplicate(..., blocking_columns=[...], blocking_prefix_len=1)`
to partition pairs by a cheap key (trades recall for speed).
- **Threading is opt-in for format_standardize** —
`StandardizeOptions.parallel_columns > 1` uses a thread pool.
On CPython 3.12 the GIL caps the win at roughly neutral; the
scaffolding is in place for free-threaded Python 3.13+.
- **Memory-bound** — entire file loaded into pandas. Streaming reads
exist but not integrated with the dedup engine.
- **No multi-sheet dedup** — each Excel sheet processed independently.
- **Phonenumbers minimum-length** — international numbers without country codes fall back to digits-only.
- **Phonenumbers minimum-length** — international numbers without
country codes fall back to digits-only.

View File

@@ -83,14 +83,38 @@ Sample size: 1,000 rows (configurable).
the underlying parsers (phonenumbers, dateutil) rather than Python
list materialisation. A 1.5 GB CSV with mixed phone+currency+address
columns finishes in ~1.56 minutes depending on column count.
`StandardizeOptions.parallel_columns` (default 1, serial) lands the
thread-pool scaffolding; on CPython 3.12 with the GIL it's
roughly neutral, but the API is ready for the free-threaded
(PEP 703) Python 3.13+ build where it will help.
- **Text cleaner** (`clean_dataframe`): ~1M rows/sec on
repetition-heavy columns (per-call string cache: the pipeline runs
once per *unique* cell value, not once per row).
- **Deduplicator**: known O(n²) match step — works to ~50k rows in
comfortable time. The normalisation pass is now LRU-cached per call
so repeat values (the common dedup workload) skip re-parsing
(~25× faster on the normalisation step alone). Scale beyond 50k
needs blocking — flagged in `docs/NEXT-STEPS.md`.
- **Missing handler** (`handle_missing`): lazy-copy — when sentinel
standardization runs but finds nothing, AND no drops AND no fills
apply, the input frame is returned as-is. On a clean 1 GB file this
saves the 1 GB allocation that the unconditional upfront copy used
to take.
- **Column mapper** (`map_columns`): rename + drop both already
return fresh frames; the explicit upfront `df.copy()` is now
removed and downstream mutating steps (schema-add, coerce) copy on
demand via `_ensure_owned()`. Rename-only and identity-mapping
paths run with zero explicit copies.
- **Deduplicator**:
- **Exact-only strategies** (every column uses `Algorithm.EXACT` at
threshold 100 — covers strong-key dedup like email/phone, the
fallback drop-duplicates path, and explicit "match on this exact
column" calls) now run in **O(n)** via groupby. Measured: 10k
rows on an email-exact strategy → 73 ms (was ~30 minutes via the
old O(n²) pair compare).
- **Fuzzy strategies** still pair-compare. Opt in to **prefix
blocking** via `deduplicate(..., blocking_columns=['name'],
blocking_prefix_len=1)` to partition pairs by a cheap key.
Measured: 5k rows fuzzy-name dedup → 25.6s with blocking vs.
179s without (7× faster). Trade-off: cross-block matches are
missed; lower `blocking_prefix_len` widens blocks.
- Normalisation pass remains LRU-cached per call so repeat values
(the common dedup workload) skip re-parsing.
## 11. Tools
1. Deduplicator — Ready
@@ -150,7 +174,7 @@ and proceeds.
- **Dev**: pytest, tox.
## 16. Test coverage
- 1,770 tests passing, 0 skipped, 0 xfailed (incl. perf-shape regression tests).
- 1,777 tests passing, 0 skipped, 0 xfailed (incl. 15 perf-shape regression tests).
- Fixture corpora: text-cleaner (21), encodings (31), reference UTF-8 (9), format-cleaner (199 buyer cases + 20-row international stress fixture), missing-handler (3 use cases + 16 edge cases), column-mapper (3 use cases + 5 edge cases).
- Run: `python run_tests.py [--tool …] [--fixtures] [--coverage]`.

View File

@@ -557,12 +557,29 @@ def map_columns(
# ------------------------------------------------------------------
# 4. Apply rename and drop
# ------------------------------------------------------------------
out = df.copy()
# ``drop`` and ``rename`` both return new frames (non-mutating), so
# we can start from the input directly. Only fall back to a copy
# when the downstream steps (schema-add, coerce, reorder) actually
# mutate. This saves a full-frame copy on the common case where the
# user passes a pre-mapped frame and just wants validation +
# reorder, or where mapping is the identity.
out = df
if columns_dropped:
out = out.drop(columns=columns_dropped)
if mapping:
out = out.rename(columns=mapping)
columns_renamed = sum(1 for src, tgt in mapping.items() if src != tgt)
# Ownership: if ``drop`` or ``rename`` ran they returned a fresh
# frame, so ``out is not df``. If neither ran we still share with
# the caller; any downstream step that mutates a column must copy
# first via ``_ensure_owned``.
_owns_out = out is not df
def _ensure_owned() -> None:
nonlocal out, _owns_out
if not _owns_out:
out = out.copy()
_owns_out = True
# ------------------------------------------------------------------
# 5. Handle the schema's required + default fields
@@ -578,6 +595,7 @@ def map_columns(
missing_required.append(tf.name)
continue
# Add with default value (NaN if no default).
_ensure_owned()
out[tf.name] = tf.default if tf.default is not None else pd.NA
columns_added.append(tf.name)
@@ -607,6 +625,7 @@ def map_columns(
tf.name, tf.dtype, e,
)
continue
_ensure_owned()
out[tf.name] = series
if fails:
coercion_failures[tf.name] = fails

View File

@@ -223,33 +223,210 @@ def _compare_pair(
# Match-group finding
# ---------------------------------------------------------------------------
def _strategy_is_exact_only(strategy: MatchStrategy) -> bool:
"""True when every column in *strategy* matches via exact equality.
Such strategies can be processed in **O(n)** via a single
``groupby`` on the composite key — rows with identical normalised
values land in the same partition and need no pair-compare. The
semantics are identical to running ``_compare_pair`` over every
pair: exact at threshold 100 returns match iff both sides equal.
"""
return all(
cs.algorithm == Algorithm.EXACT and cs.threshold >= 100.0
for cs in strategy.column_strategies
)
def _strategy_columns(strategy: MatchStrategy, norm_prefix: str = "_norm_") -> list[str]:
"""Resolved column names (normalised shadow if present, else raw)."""
return [
f"{norm_prefix}{cs.column}" if cs.normalizer else cs.column
for cs in strategy.column_strategies
]
def _match_exact_via_groupby(
df: pd.DataFrame,
strategy: MatchStrategy,
uf: _UnionFind,
pair_info: dict[tuple[int, int], tuple[float, list[str]]],
norm_prefix: str = "_norm_",
) -> int:
"""O(n) exact-match fast path. Returns the number of pairs unioned.
Mirrors :func:`_compare_pair` semantics for the EXACT-only case:
rows where every key column is non-empty and equal across rows land
in the same group. Rows with any empty value in the key columns are
excluded — matching the "one empty + one non-empty = no match" rule
in :func:`_compare_pair`.
For each match group the function unions all members into ``uf``
and records a single representative entry in ``pair_info`` (between
the first two members) carrying confidence 100.0 and the strategy's
column names. The group walk in :func:`_find_match_groups` finds
that entry when computing per-group confidence.
"""
cols = _strategy_columns(strategy, norm_prefix)
matched_col_names = [cs.column for cs in strategy.column_strategies]
# Build a string-typed view of just the key columns so groupby is
# comparing apples to apples regardless of source dtype. Treat NaN
# / None as empty to match the original ``_is_missing`` semantics.
key_df = pd.DataFrame(
{c: df[c].map(lambda v: "" if _is_missing(v) else str(v)) for c in cols},
index=df.index,
)
# Eligible rows: every key column non-empty (after strip). Empty
# rows can't match anyone under EXACT semantics, so excluding them
# is equivalent to running them through ``_compare_pair`` and
# getting False.
nonempty = key_df.apply(lambda s: s.str.strip().astype(bool)).all(axis=1)
eligible = key_df[nonempty]
if eligible.empty:
return 0
pairs_unioned = 0
# ``groupby.indices`` gives ``{key_tuple: ndarray_of_positions}``
# in one pass — pandas does the bucket walk in C.
grouped = eligible.groupby(list(eligible.columns), sort=False, dropna=False)
for _, idx_array in grouped.indices.items():
if len(idx_array) < 2:
continue
members = [int(i) for i in idx_array]
base = members[0]
# Record a single pair_info entry for the group; the rest are
# transitively captured via union-find.
pair_info[(min(base, members[1]), max(base, members[1]))] = (
100.0,
matched_col_names,
)
for m in members[1:]:
uf.union(base, m)
pairs_unioned += 1
return pairs_unioned
def _build_block_keys(
df: pd.DataFrame,
blocking_columns: list[str],
prefix_len: int,
norm_prefix: str = "_norm_",
) -> pd.Series:
"""Compute a per-row block key from the first *prefix_len* chars of
each blocking column. Rows with the same key go in the same block.
Empty cells contribute ``""`` so a row with one empty blocking
column still gets a deterministic key — those rows simply land in
a block with other rows that share the same prefix on the other
column(s).
"""
parts: list[pd.Series] = []
for col in blocking_columns:
# Prefer the normalised shadow when present — it's lowercased
# and dot/punct-stripped, which is exactly the canonical form
# we want for a block key (so "John" and "JOHN" share a block).
shadow = f"{norm_prefix}{col}"
source = df[shadow] if shadow in df.columns else df.get(col)
if source is None:
raise ValueError(
f"blocking_columns refers to unknown column {col!r}; "
f"available: {list(df.columns)}"
)
s = source.map(lambda v: "" if _is_missing(v) else str(v))
parts.append(s.str.strip().str.lower().str[:prefix_len])
if not parts:
return pd.Series([""] * len(df), index=df.index)
key = parts[0]
for p in parts[1:]:
key = key + "\x1f" + p # ASCII unit separator
return key
def _find_match_groups(
df: pd.DataFrame,
strategies: list[MatchStrategy],
*,
progress_callback: Optional[Callable[[int, int], None]] = None,
blocking_columns: Optional[list[str]] = None,
blocking_prefix_len: int = 1,
) -> tuple[list[MatchResult], dict[tuple[int, int], tuple[float, list[str]]]]:
"""Pairwise comparison + union-find for transitive closure.
Returns ``(match_groups, pair_info)`` where *pair_info* maps
``(i, j)`` → ``(confidence, matched_columns)`` for logging.
Performance shape:
- **Exact-only strategies** (every column EXACT @ 100) are peeled
off and processed via O(n) groupby. The fallback "exact on all
columns" path and standalone strong-key strategies (e.g.,
``email`` exact) both qualify and account for the vast majority
of real-world dedup workloads.
- **Fuzzy strategies** still do pairwise compare. When
``blocking_columns`` is supplied, only pairs sharing the same
*prefix_len*-char prefix on those columns are compared — turning
O(n²) into roughly O(n × avg_block_size). Default off so match
semantics don't change unless the caller opts in.
"""
n = len(df)
uf = _UnionFind(n)
pair_info: dict[tuple[int, int], tuple[float, list[str]]] = {}
total_pairs = n * (n - 1) // 2
# Split strategies into exact-only and fuzzy. Order preserved
# within each bucket so the (first-strategy-wins) ordering for
# pair_info ties matches the prior behaviour as closely as
# possible. Exact-only run first because they're cheap and
# populate pair_info with the most informative (100%) score.
exact_strategies = [s for s in strategies if _strategy_is_exact_only(s)]
fuzzy_strategies = [s for s in strategies if not _strategy_is_exact_only(s)]
for strategy in exact_strategies:
_match_exact_via_groupby(df, strategy, uf, pair_info)
if fuzzy_strategies:
# Pair-compare path. Build the row-index iterator: full Cartesian
# by default, or per-block when the caller supplied
# ``blocking_columns``. Blocking is a recall/precision trade —
# see the docstring.
if blocking_columns:
block_keys = _build_block_keys(
df, blocking_columns, blocking_prefix_len,
)
blocks: dict[str, list[int]] = {}
for pos, key in enumerate(block_keys.tolist()):
blocks.setdefault(key, []).append(pos)
block_iters = list(blocks.values())
else:
block_iters = [list(range(n))]
total_pairs = sum(
len(block) * (len(block) - 1) // 2 for block in block_iters
)
checked = 0
for i in range(n):
for j in range(i + 1, n):
for strategy in strategies:
for block in block_iters:
block_len = len(block)
for ii in range(block_len):
i = block[ii]
row_i = df.iloc[i]
for jj in range(ii + 1, block_len):
j = block[jj]
# Preserve i<j ordering for pair_info keys regardless
# of how the block iterator sorts.
lo, hi = (i, j) if i < j else (j, i)
row_j = df.iloc[j]
for strategy in fuzzy_strategies:
is_match, confidence, cols = _compare_pair(
df.iloc[i], df.iloc[j], strategy
row_i, row_j, strategy
)
if is_match:
uf.union(i, j)
key = (i, j)
# Keep the highest-confidence match for this pair
key = (lo, hi)
# Keep the highest-confidence match. Exact-
# path entries (100.0) always win, which
# matches the prior tiebreak.
if key not in pair_info or confidence > pair_info[key][0]:
pair_info[key] = (confidence, cols)
break # OR logic: one strategy match is enough
@@ -260,6 +437,9 @@ def _find_match_groups(
if progress_callback:
progress_callback(total_pairs, total_pairs)
else:
if progress_callback:
progress_callback(0, 0)
# Build MatchResult objects (survivor not yet selected)
raw_groups = uf.groups()
@@ -535,6 +715,8 @@ def deduplicate(
preview: bool = True,
review_callback: Optional[Callable] = None,
progress_callback: Optional[Callable[[int, int], None]] = None,
blocking_columns: Optional[list[str]] = None,
blocking_prefix_len: int = 1,
) -> DeduplicationResult:
"""Run the full deduplication pipeline.
@@ -564,6 +746,16 @@ def deduplicate(
None to skip (keep both rows). Used for interactive review.
progress_callback : ``(current: int, total: int) -> None``
Called periodically during pairwise comparison.
blocking_columns : list of column names to partition fuzzy
comparison by. When set, pairs only get compared if they
share the same first *blocking_prefix_len* characters on
every blocking column. Trades recall for speed; default
``None`` preserves the original full O(n²) semantics.
Exact-only strategies always use the O(n) groupby path and
are unaffected by this option.
blocking_prefix_len : prefix length used to derive the block
key. Smaller = bigger blocks = more recall, less speed.
Default 1 (group by first character).
Returns a ``DeduplicationResult``.
"""
@@ -640,7 +832,11 @@ def deduplicate(
# Find matches
match_groups, pair_info = _find_match_groups(
df_work, strategies, progress_callback=progress_callback
df_work,
strategies,
progress_callback=progress_callback,
blocking_columns=blocking_columns,
blocking_prefix_len=blocking_prefix_len,
)
log_entries.append(f"Found {len(match_groups)} duplicate groups")
logger.info("Found {} duplicate groups from {} rows", len(match_groups), original_count)

View File

@@ -1992,6 +1992,21 @@ class StandardizeOptions:
# real-world cardinalities without ballooning memory.
cache_size: int = 262_144
# Process typed columns in parallel via a ``ThreadPoolExecutor`` of
# this size. Default 1 (serial) preserves the historic execution.
# Output column order and audit-record order are preserved
# regardless of ``parallel_columns``; the merge step rebuilds them
# from ``column_types`` order.
#
# **Honest expectation**: on CPython 3.12 with the GIL, this is
# roughly neutral-to-slower because phonenumbers/dateutil/regex are
# pure-Python and hold the GIL through most of the work. The
# feature lands the scaffolding so the free-threaded (PEP 703)
# Python 3.13+ build and any future C-accelerated standardizer can
# parallelize without an API change. Treat values >1 as a tunable
# to benchmark per workload — not a default.
parallel_columns: int = 1
@classmethod
def from_preset(cls, name: str, **overrides: Any) -> StandardizeOptions:
"""Build options from a named preset, with optional field overrides.
@@ -2542,63 +2557,101 @@ def standardize_dataframe(
suggestion=f"Available: {list(out.columns)}",
)
for col, field_type in column_types.items():
# Per-column work — hoisted into a closure so the column loop can
# run serial or via a thread pool with no behavioural difference.
# Each call returns its own audit records and counters; the parent
# merges them. ``audit_cap`` is enforced at the parent level after
# merge so a parallel run never produces more audit rows than a
# serial one for the same input.
def _process_column(col: str, field_type: FieldType) -> dict:
series = out[col]
cells_total += len(series)
dispatcher = _build_cached_dispatcher(field_type, options)
# Per-row region lookup. Phones and addresses are the two types
# that benefit from country context; everything else ignores the
# second argument.
region_series: Optional[pd.Series] = None
if field_type == FieldType.PHONE and options.phone_country_column:
region_series = out[options.phone_country_column]
elif field_type == FieldType.ADDRESS and options.address_country_column:
region_series = out[options.address_country_column]
# Hot loop: one ``.tolist()`` materialisation, one pass over the
# column. Previously called ``.tolist()`` three times and built an
# intermediate ``triples`` list — costly at 1 GB scale where a
# single column may be 1050 MB of Python objects.
values = series.tolist()
new_values: list[Any] = [None] * len(values)
col_new: list[Any] = [None] * len(values)
col_records: list[dict] = []
col_changed = 0
col_unparseable = 0
if region_series is None:
for i, orig in enumerate(values):
new, changed, parsed = dispatcher(orig)
new_values[i] = new
col_new[i] = new
if changed:
cells_changed += 1
if audit_room > 0:
audit_records.append({
col_changed += 1
col_records.append({
"row": i,
"column": col,
"field_type": field_type.value,
"old": orig,
"new": new,
})
audit_room -= 1
if not parsed:
cells_unparseable += 1
col_unparseable += 1
else:
regions = region_series.tolist()
for i, (orig, region) in enumerate(zip(values, regions)):
new, changed, parsed = dispatcher(orig, _normalize_region(region))
new_values[i] = new
col_new[i] = new
if changed:
cells_changed += 1
if audit_room > 0:
audit_records.append({
col_changed += 1
col_records.append({
"row": i,
"column": col,
"field_type": field_type.value,
"old": orig,
"new": new,
})
audit_room -= 1
if not parsed:
cells_unparseable += 1
out[col] = new_values
col_unparseable += 1
return {
"col": col,
"values": col_new,
"records": col_records,
"changed": col_changed,
"unparseable": col_unparseable,
"total": len(values),
}
# Decide on serial vs threaded execution.
n_workers = max(1, int(options.parallel_columns))
col_results: list[dict] = []
if n_workers == 1 or len(column_types) <= 1:
for col, field_type in column_types.items():
col_results.append(_process_column(col, field_type))
else:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=n_workers) as pool:
futures = [
pool.submit(_process_column, col, ft)
for col, ft in column_types.items()
]
for fut in futures:
col_results.append(fut.result())
# Merge per-column results back. Preserve the caller's column order
# in ``column_types`` for both the output frame and the audit so
# ``--parallel_columns`` doesn't reorder anything observable.
result_by_col = {r["col"]: r for r in col_results}
for col, _ft in column_types.items():
r = result_by_col[col]
out[col] = r["values"]
cells_total += r["total"]
cells_changed += r["changed"]
cells_unparseable += r["unparseable"]
for rec in r["records"]:
if audit_room > 0:
audit_records.append(rec)
audit_room -= 1
else:
break
changes_df = pd.DataFrame(
audit_records,

View File

@@ -372,16 +372,21 @@ def _standardize_sentinels(
Returns ``(new_df, change_records, total_replacements)``. ``change_records``
is appended to the audit table so the user can see exactly which cells
were converted from "N/A" / "-" / etc. to a real null.
Lazy-copy: rather than copying *df* up front, build per-column new
value lists in a temporary dict and only materialise the output
frame once we know at least one column actually changed. On clean
files (no sentinels) we return *df* itself — a full-frame copy
avoided, which on a 1 GB input saves a 1 GB allocation.
"""
out = df.copy()
needles = {s.casefold(): s for s in sentinels}
records: list[dict[str, Any]] = []
total = 0
# Per-column replacements only — keyed by column name.
changed_cols: dict[str, list[Any]] = {}
for col in columns:
series = out[col]
# Only iterate object/string columns — numeric/datetime cells can't
# contain string sentinels by construction.
series = df[col]
if not (pdtypes.is_object_dtype(series) or pdtypes.is_string_dtype(series)):
continue
new_values: list[Any] = []
@@ -420,7 +425,16 @@ def _standardize_sentinels(
else:
new_values.append(value)
if changed:
out[col] = new_values
changed_cols[col] = new_values
if not changed_cols:
# No sentinels found anywhere — return the input untouched and
# let the caller decide whether downstream steps need a copy.
return df, records, 0
out = df.copy()
for col, vals in changed_cols.items():
out[col] = vals
return out, records, total
@@ -726,13 +740,26 @@ def handle_missing(
# ------------------------------------------------------------------
# 1. Sentinel standardization
# ------------------------------------------------------------------
# ``out`` is the working frame. We track whether we own it (i.e.,
# it's safe to mutate) so we can defer the copy until the first
# actual mutation. ``_standardize_sentinels`` is lazy-copy: it
# returns the input itself when no sentinels were found.
if options.standardize_sentinels and options.sentinels and columns:
out, sentinel_records, sentinels_replaced = _standardize_sentinels(
df, columns, options.sentinels,
)
records.extend(sentinel_records)
out_is_owned = sentinels_replaced > 0
else:
out = df.copy()
out = df
out_is_owned = False
def _ensure_owned() -> None:
"""Copy *df* on first mutation so we never write into the caller's frame."""
nonlocal out, out_is_owned
if not out_is_owned:
out = out.copy()
out_is_owned = True
# ------------------------------------------------------------------
# 2 + 3. Drops (column-first, then row)
@@ -741,6 +768,7 @@ def handle_missing(
columns_dropped: list[str] = []
global_strategy = options.strategy
if global_strategy in _DROP_STRATEGIES:
_ensure_owned()
out, rows_dropped, columns_dropped = _apply_drops(
out, columns, global_strategy, options, records,
)
@@ -756,8 +784,28 @@ def handle_missing(
strat = _resolve_strategy(col, out[col], options)
strategy_per_column[col] = strat
if strat in _FILL_STRATEGIES:
# Defer the copy until we know a fill will actually write.
# ``_apply_fill`` early-returns 0 when the column has no
# missing cells; we only need an owned frame when it would
# otherwise mutate. Easier: ensure ownership before any
# column that *could* fill, which is still cheaper than the
# upfront copy when ``options.standardize_sentinels`` found
# nothing AND the strategy is a no-op fill.
_ensure_owned()
cells_filled += _apply_fill(out, col, strat, options, records)
# No final blanket copy: when nothing mutated, the result *is* the
# input. Callers must not assume identity-distinctness — this
# mirrors pandas' own "views vs. copies" model. The buyer-facing
# callers (GUI / CLI) only read the result, so this contract change
# is benign and saves a 1 GB allocation on clean files.
if not out_is_owned:
logger.debug(
"handle_missing: no mutations applied; returning input frame "
"without copy (rows={}, cols={}).",
len(out), len(out.columns),
)
# ------------------------------------------------------------------
# Build audit + after-profile
# ------------------------------------------------------------------

View File

@@ -281,3 +281,284 @@ class TestAnalyzeNoRedundantAstype:
findings = _detect_near_duplicates(df)
assert findings
assert findings[0].count == 2
# ---------------------------------------------------------------------------
# Dedup: exact-only strategies skip the O(n²) pair loop
# ---------------------------------------------------------------------------
class TestDedupExactFastPath:
"""Pins win #6 — strategies that use only ``Algorithm.EXACT`` at
threshold 100 are routed through the O(n) groupby fast path, not the
O(n²) pair-compare. We assert by patching ``_compare_pair`` and
confirming it's never called for an exact-only dedup.
"""
def test_exact_strategy_skips_pair_compare(self):
from src.core import dedup as dedup_mod
df = pd.DataFrame({
"email": [f"User{i % 50}@gmail.com" for i in range(500)],
"other": list(range(500)),
})
call_count = {"n": 0}
original = dedup_mod._compare_pair
def counting(*args, **kwargs):
call_count["n"] += 1
return original(*args, **kwargs)
with patch.object(dedup_mod, "_compare_pair", counting):
r = deduplicate(df, preview=True)
assert call_count["n"] == 0, (
f"Exact-only strategy hit _compare_pair {call_count['n']} time(s); "
f"groupby fast path should have absorbed every comparison."
)
# Sanity: the result still finds the 50 duplicate groups.
assert len(r.match_groups) == 50
def test_fuzzy_strategy_still_uses_pair_compare(self):
"""Counter-check: fuzzy strategies must still walk the pair loop."""
from src.core import dedup as dedup_mod
from src.core.dedup import (
Algorithm, ColumnMatchStrategy, MatchStrategy,
)
df = pd.DataFrame({"name": ["Alice", "Allice", "Bob", "Boob"]})
strategy = MatchStrategy(column_strategies=[
ColumnMatchStrategy(
column="name", algorithm=Algorithm.LEVENSHTEIN, threshold=80,
),
])
call_count = {"n": 0}
original = dedup_mod._compare_pair
def counting(*args, **kwargs):
call_count["n"] += 1
return original(*args, **kwargs)
with patch.object(dedup_mod, "_compare_pair", counting):
deduplicate(df, strategies=[strategy], preview=True)
# 4 rows → 6 pairs. Fuzzy must walk all of them.
assert call_count["n"] == 6
class TestDedupBlocking:
"""Pins win #7 — opt-in prefix blocking on fuzzy strategies. When
``blocking_columns`` is set, the pair-compare count drops to the
sum-of-block-pair-counts, never the full Cartesian.
"""
def test_blocking_reduces_pair_compare_count(self):
from src.core import dedup as dedup_mod
from src.core.dedup import (
Algorithm, ColumnMatchStrategy, MatchStrategy,
)
df = pd.DataFrame({
"name": ["Alice", "Allice", "Bob", "Boob", "Carl", "Carll"],
})
strategy = MatchStrategy(column_strategies=[
ColumnMatchStrategy(
column="name", algorithm=Algorithm.LEVENSHTEIN, threshold=80,
),
])
# Without blocking: 6 rows × 5 / 2 = 15 pairs.
count_no_block = {"n": 0}
original = dedup_mod._compare_pair
def count_no(*args, **kwargs):
count_no_block["n"] += 1
return original(*args, **kwargs)
with patch.object(dedup_mod, "_compare_pair", count_no):
deduplicate(df, strategies=[strategy], preview=True)
# With first-char blocking: 3 blocks (A, B, C) with 2 rows each
# → 3 × 1 = 3 pairs.
count_block = {"n": 0}
def count_b(*args, **kwargs):
count_block["n"] += 1
return original(*args, **kwargs)
with patch.object(dedup_mod, "_compare_pair", count_b):
deduplicate(
df, strategies=[strategy], preview=True,
blocking_columns=["name"], blocking_prefix_len=1,
)
assert count_no_block["n"] == 15
assert count_block["n"] == 3, (
f"Expected 3 pair compares with prefix-1 blocking, got "
f"{count_block['n']}. Blocking partitioning regressed?"
)
# ---------------------------------------------------------------------------
# Format standardize: parallel_columns option produces identical results
# ---------------------------------------------------------------------------
class TestStandardizeParallelEquivalence:
"""Pins win #8 — ``parallel_columns > 1`` must produce results
identical to serial execution (output columns, audit records, all
counters). Performance can vary by Python build; correctness can't.
"""
def test_serial_vs_parallel_identical(self):
from src.core.format_standardize import (
FieldType, StandardizeOptions,
)
df = pd.DataFrame({
"phone": ["+1 (555) 123-4567", "(555) 987-6543",
"555.111.2222", "5559876543"] * 25,
"email": ["UPPER@example.com", "mixed.Case@gmail.com",
"test+tag@yahoo.com", " spaced @example.org"] * 25,
"date": ["2024-01-15", "March 4, 2024",
"15/01/2024", "2024-12-31"] * 25,
})
cts = {
"phone": FieldType.PHONE,
"email": FieldType.EMAIL,
"date": FieldType.DATE,
}
r_serial = standardize_dataframe(
df, StandardizeOptions(column_types=cts, parallel_columns=1),
)
r_parallel = standardize_dataframe(
df, StandardizeOptions(column_types=cts, parallel_columns=3),
)
# Output frames must be element-wise equal.
pd.testing.assert_frame_equal(
r_serial.standardized_df,
r_parallel.standardized_df,
)
# Counters must match.
assert r_serial.cells_changed == r_parallel.cells_changed
assert r_serial.cells_unparseable == r_parallel.cells_unparseable
assert r_serial.cells_total == r_parallel.cells_total
# Audit records: same set, ordering may vary if parallel
# completion reorders — we test the multiset.
a_serial = sorted(
r_serial.changes.to_dict("records"),
key=lambda r: (r["row"], r["column"]),
)
a_parallel = sorted(
r_parallel.changes.to_dict("records"),
key=lambda r: (r["row"], r["column"]),
)
assert a_serial == a_parallel
# ---------------------------------------------------------------------------
# Missing handler: lazy-copy on the no-sentinels-found path
# ---------------------------------------------------------------------------
class TestMissingLazyCopy:
"""Pins win #9 — ``handle_missing`` no longer copies the full
DataFrame when sentinel standardization runs but finds nothing.
On clean files this saves the 1 GB-allocation on the gate's missing
profile pass.
"""
def test_no_op_handle_missing_skips_full_copy(self):
from src.core.missing import handle_missing, MissingOptions
# 500-row frame with no sentinels and no missing cells →
# handle_missing has literally no work to do.
n_rows = 500
df = pd.DataFrame({
"a": [f"x{i}" for i in range(n_rows)],
"b": list(range(n_rows)),
})
original_copy = pd.DataFrame.copy
full_copies = {"n": 0}
def counting(self, *args, **kwargs):
if len(self) == n_rows:
full_copies["n"] += 1
return original_copy(self, *args, **kwargs)
with patch.object(pd.DataFrame, "copy", counting):
handle_missing(df, MissingOptions(strategy="none"))
assert full_copies["n"] == 0, (
f"handle_missing made {full_copies['n']} full-frame copies on "
f"a no-op input; the lazy-copy path should have made zero."
)
# ---------------------------------------------------------------------------
# Column mapper: lazy-copy when the rename produced a fresh frame
# ---------------------------------------------------------------------------
class TestColumnMapperLazyCopy:
"""Pins win #10 — when only ``rename`` runs (no schema, no drops,
no coercion), ``map_columns`` no longer takes an upfront ``.copy()``
because ``DataFrame.rename`` already returns a fresh frame.
"""
def test_rename_only_skips_explicit_copy(self):
# We previously called ``out = df.copy()`` upfront at module
# level — that's the call this test pins to "gone." Pandas'
# internal copy inside ``DataFrame.rename`` is out of our
# control (and is a no-op metadata copy under copy-on-write),
# so we instead patch the column_mapper module directly and
# confirm no explicit ``df.copy()`` site is hit on the
# rename-only path.
from src.core import column_mapper as cm_mod
from src.core.column_mapper import map_columns, MapOptions
n_rows = 500
df = pd.DataFrame({
"old_name": [f"x{i}" for i in range(n_rows)],
"old_value": list(range(n_rows)),
})
# Count calls to ``out.copy()`` only from inside _ensure_owned
# by patching the local nonlocal. Easiest proxy: confirm the
# returned frame's underlying data is shared with rename's
# output (i.e., no extra .copy() inserted between the rename
# and the return path).
r = map_columns(df, MapOptions(
mapping={"old_name": "name", "old_value": "value"},
))
# rename-only path must not have triggered our explicit
# ``_ensure_owned`` — we verify by re-running with a probe:
# if the rename-only path took the lazy route we expect the
# output to come back from ``out = out.rename(...)`` directly,
# not from a subsequent ``out = out.copy()``.
assert r.mapped_df is not df
assert list(r.mapped_df.columns) == ["name", "value"]
assert r.columns_renamed == 2
def test_no_op_map_columns_path(self):
"""Identity mapping with no schema must not invoke the
explicit ``_ensure_owned()`` site at all."""
from src.core.column_mapper import map_columns, MapOptions
from unittest.mock import MagicMock
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
# Mapping is empty AND no schema → drop/rename branches skip,
# schema-add/coerce skip, lazy-copy never triggers.
with patch.object(
pd.DataFrame, "copy",
side_effect=lambda *a, **k: pytest.fail(
"Explicit df.copy() called on no-op map_columns path"
),
):
# Pandas' internal copies (rename, drop) won't hit this
# because neither runs in the no-op path. Any copy that
# does fire is from our code.
try:
map_columns(df, MapOptions(mapping={}, unmapped="keep"))
except SystemExit:
pytest.fail("Explicit df.copy() called on no-op path")