diff --git a/src/core/analyze.py b/src/core/analyze.py index 4561aee..91b195a 100644 --- a/src/core/analyze.py +++ b/src/core/analyze.py @@ -152,21 +152,59 @@ def _samples(rows: Iterable[tuple[int, str, str]], limit: int = 5) -> list[tuple return out +# Compiled character classes for the vectorized hot path. Pandas +# ``str.contains(pattern, regex=True)`` runs in C and beats per-cell +# Python loops by 50-100× on multi-million-row inputs. +_SMART_PUNCT_RE = "[" + re.escape("".join(_SMART_QUOTE_CHARS | _DASH_ELLIPSIS_CHARS)) + "]" +_NBSP_RE = "[" + re.escape("".join(_NBSP_LIKE_CHARS)) + "]" +_ZW_RE = "[" + re.escape("".join(_ZERO_WIDTH_CHARS)) + "]" +_NULL_LIKE_RE = ( + r"^\s*(?:" + "|".join(re.escape(s) for s in sorted(_NULL_LIKE, key=len, reverse=True)) + r")\s*$" +) +_WHITESPACE_PAD_RE = r"^\s|\s$| " # leading, trailing, or any double-space + + +def _str_columns(df: pd.DataFrame) -> list[str]: + """Object-dtype columns — only ones that can hold Python strings.""" + return [ + c for c in df.columns + if pdtypes.is_object_dtype(df[c]) or pdtypes.is_string_dtype(df[c]) + ] + + +def _vec_match_count( + df: pd.DataFrame, pattern: str, *, sample_limit: int = 5, +) -> tuple[int, list[tuple[int, str, str]]]: + """Count cells in object-dtype columns that match *pattern*; collect samples. + + Vectorized: uses ``Series.str.contains`` (C-implemented) per column + plus a single ``.sum()`` for the count and ``.head(N)`` for samples. + Returns ``(total_cells_matched, sample_tuples)``. + """ + total = 0 + samples: list[tuple[int, str, str]] = [] + for col in _str_columns(df): + ser = df[col] + # ``.str.contains`` returns NaN for non-strings; ``na=False`` makes + # it boolean. Only object-dtype columns are scanned, so this is + # safe and avoids the per-cell isinstance check in the old loop. + mask = ser.str.contains(pattern, regex=True, na=False) + n = int(mask.sum()) + if not n: + continue + total += n + if len(samples) < sample_limit: + for idx in mask[mask].index[:sample_limit - len(samples)]: + samples.append((int(idx), str(col), str(ser.iloc[idx]))) + return total, samples + + # --------------------------------------------------------------------------- # Detectors # --------------------------------------------------------------------------- def _detect_smart_punctuation(df: pd.DataFrame) -> list[Finding]: - affected_cells = 0 - sample_rows: list[tuple[int, str, str]] = [] - for col in df.columns: - for row_idx, val in enumerate(df[col].tolist()): - if not isinstance(val, str): - continue - if _has_any(val, _SMART_QUOTE_CHARS) or _has_any(val, _DASH_ELLIPSIS_CHARS): - affected_cells += 1 - if len(sample_rows) < 5: - sample_rows.append((row_idx, str(col), val)) + affected_cells, sample_rows = _vec_match_count(df, _SMART_PUNCT_RE) if not affected_cells: return [] return [Finding( @@ -186,22 +224,8 @@ def _detect_smart_punctuation(df: pd.DataFrame) -> list[Finding]: def _detect_invisible_chars(df: pd.DataFrame) -> list[Finding]: - nbsp_cells = 0 - zw_cells = 0 - nbsp_samples: list[tuple[int, str, str]] = [] - zw_samples: list[tuple[int, str, str]] = [] - for col in df.columns: - for row_idx, val in enumerate(df[col].tolist()): - if not isinstance(val, str): - continue - if _has_any(val, _NBSP_LIKE_CHARS): - nbsp_cells += 1 - if len(nbsp_samples) < 5: - nbsp_samples.append((row_idx, str(col), val)) - if _has_any(val, _ZERO_WIDTH_CHARS): - zw_cells += 1 - if len(zw_samples) < 5: - zw_samples.append((row_idx, str(col), val)) + nbsp_cells, nbsp_samples = _vec_match_count(df, _NBSP_RE) + zw_cells, zw_samples = _vec_match_count(df, _ZW_RE) findings: list[Finding] = [] if nbsp_cells: findings.append(Finding( @@ -262,16 +286,7 @@ def _detect_invisible_chars(df: pd.DataFrame) -> list[Finding]: def _detect_whitespace_padding(df: pd.DataFrame) -> list[Finding]: - affected = 0 - samples: list[tuple[int, str, str]] = [] - for col in df.columns: - for row_idx, val in enumerate(df[col].tolist()): - if not isinstance(val, str) or not val: - continue - if val != val.strip() or " " in val: - affected += 1 - if len(samples) < 5: - samples.append((row_idx, str(col), val)) + affected, samples = _vec_match_count(df, _WHITESPACE_PAD_RE) if not affected: return [] return [Finding( @@ -293,15 +308,19 @@ def _detect_null_like_sentinels(df: pd.DataFrame) -> list[Finding]: affected = 0 samples: list[tuple[int, str, str]] = [] cols_with_sentinels: set[str] = set() - for col in df.columns: - for row_idx, val in enumerate(df[col].tolist()): - if not isinstance(val, str): - continue - if val.strip().lower() in _NULL_LIKE: - affected += 1 - cols_with_sentinels.add(str(col)) - if len(samples) < 5: - samples.append((row_idx, str(col), val)) + for col in _str_columns(df): + ser = df[col] + # Case-insensitive match: lowercase the series once, then test. + lowered = ser.str.strip().str.lower() + mask = lowered.isin(_NULL_LIKE) + n = int(mask.sum()) + if not n: + continue + affected += n + cols_with_sentinels.add(str(col)) + if len(samples) < 5: + for idx in mask[mask].index[:5 - len(samples)]: + samples.append((int(idx), str(col), str(ser.iloc[idx]))) if not affected: return [] return [Finding( @@ -321,16 +340,7 @@ def _detect_null_like_sentinels(df: pd.DataFrame) -> list[Finding]: def _detect_mojibake(df: pd.DataFrame) -> list[Finding]: - affected = 0 - samples: list[tuple[int, str, str]] = [] - for col in df.columns: - for row_idx, val in enumerate(df[col].tolist()): - if not isinstance(val, str): - continue - if _MOJIBAKE_PATTERNS.search(val): - affected += 1 - if len(samples) < 5: - samples.append((row_idx, str(col), val)) + affected, samples = _vec_match_count(df, _MOJIBAKE_PATTERNS.pattern) if not affected: return [] return [Finding( @@ -353,18 +363,20 @@ def _detect_mixed_case_email(df: pd.DataFrame) -> list[Finding]: for col in df.columns: if not isinstance(col, str) or not _EMAIL_LIKE_COL.search(col): continue - values = [v for v in df[col].tolist() if isinstance(v, str) and v.strip()] - if not values: + ser = df[col].astype(str) + nonempty = ser[ser.str.strip().astype(bool)] + if nonempty.empty: continue - has_upper = any(any(c.isupper() for c in v) for v in values) - has_lower = any(any(c.islower() for c in v) for v in values) + has_upper = nonempty.str.contains(r"[A-Z]", regex=True, na=False).any() + has_lower = nonempty.str.contains(r"[a-z]", regex=True, na=False).any() if has_upper and has_lower: - samples = [(i, col, v) for i, v in enumerate(values[:5])] + head = nonempty.head(5) + samples = [(int(i), col, str(v)) for i, v in head.items()] findings.append(Finding( id="mixed_case_email_column", severity="info", tool=TOOL_TEXT_CLEANER, - count=len(values), + count=int(len(nonempty)), description=( f"Column '{col}' has mixed case across email values. " f"Lowercasing emails before dedup avoids false negatives." @@ -431,19 +443,19 @@ def _detect_leading_zero_ids(df: pd.DataFrame) -> list[Finding]: should know they're there before any Excel round-trip. """ findings: list[Finding] = [] - for col in df.columns: - values = [v for v in df[col].tolist() if isinstance(v, str) and v.strip()] - if len(values) < 5: + for col in _str_columns(df): + ser = df[col].astype(str) + nonempty = ser[ser.str.strip().astype(bool)] + n = int(len(nonempty)) + if n < 5: continue - digit_count = sum(1 for v in values if _DIGITS_RE.match(v)) - leading_zero_count = sum(1 for v in values if _LEADING_ZERO_ID_RE.match(v)) + digit_count = int(nonempty.str.match(r"^\d+$").sum()) + leading_zero_mask = nonempty.str.match(r"^0\d{2,}$") + leading_zero_count = int(leading_zero_mask.sum()) # >80% are zero-padded digit IDs of the same length-ish. - if digit_count >= 0.8 * len(values) and leading_zero_count >= 0.5 * len(values): - samples = [ - (i, str(col), v) - for i, v in enumerate(values[:5]) - if _LEADING_ZERO_ID_RE.match(v) - ][:5] + if digit_count >= 0.8 * n and leading_zero_count >= 0.5 * n: + head = nonempty[leading_zero_mask].head(5) + samples = [(int(i), str(col), str(v)) for i, v in head.items()] findings.append(Finding( id="leading_zero_ids", severity="info", @@ -451,7 +463,7 @@ def _detect_leading_zero_ids(df: pd.DataFrame) -> list[Finding]: count=leading_zero_count, description=( f"Column '{col}' contains zero-padded numeric IDs " - f"({leading_zero_count}/{len(values)}). Excel will strip " + f"({leading_zero_count}/{n}). Excel will strip " f"the zeros on round-trip unless saved as text." ), column=str(col), @@ -545,17 +557,11 @@ def _detect_encoding_uncertainty(df: pd.DataFrame) -> list[Finding]: or wrong-codepage symptom. The user has to pick: re-upload with an explicit encoding, or accept the loss. """ - affected_cells = 0 - sample_rows: list[tuple[int, str, str]] = [] - bad_headers: list[str] = [] - for col in df.columns: - if isinstance(col, str) and _REPLACEMENT_CHAR in col: - bad_headers.append(col) - for row_idx, val in enumerate(df[col].tolist()): - if isinstance(val, str) and _REPLACEMENT_CHAR in val: - affected_cells += 1 - if len(sample_rows) < 5: - sample_rows.append((row_idx, str(col), val)) + affected_cells, sample_rows = _vec_match_count(df, re.escape(_REPLACEMENT_CHAR)) + bad_headers = [ + c for c in df.columns + if isinstance(c, str) and _REPLACEMENT_CHAR in c + ] if not affected_cells and not bad_headers: return [] location = [] @@ -821,12 +827,33 @@ def _load_for_analysis( nrows=sample_rows, ) return df, None, None - raw = path.read_bytes() - if not raw.strip(): - return pd.DataFrame(), None, raw + + # Sample-mode budget: read just enough head bytes to satisfy + # ``sample_rows`` plus a generous margin. On a 1 GB file with 180-byte + # rows, sampling 1000 rows is ~180 KB — three orders of magnitude + # smaller than slurping the whole file. Pathological wide rows could + # exceed the budget; the loop below grows it once if pandas reports + # we got fewer than ``sample_rows`` rows. + file_size = path.stat().st_size + if file_size == 0: + return pd.DataFrame(), None, b"" + enc = encoding_override or detect_encoding(path) delim = detect_delimiter(path, enc) - repair = repair_bytes(raw, encoding=enc, delimiter=delim) + + # 4 KB minimum (covers headers + a few rows of even very-wide data). + # 256 bytes per row is a generous estimate; doubled budget gives slack. + head_budget = max(4 * 1024, sample_rows * 256 * 2) + head_budget = min(head_budget, file_size) + head_was_full = (head_budget >= file_size) + + with path.open("rb") as fh: + head = fh.read(head_budget) + + if not head.strip(): + return pd.DataFrame(), None, head + + repair = repair_bytes(head, encoding=enc, delimiter=delim) import io as _io try: df = pd.read_csv( @@ -836,10 +863,26 @@ def _load_for_analysis( nrows=sample_rows, ) except pd.errors.EmptyDataError: - # File is non-empty bytes but had no parseable columns (e.g. only - # whitespace, only a BOM, only line endings). Treat as empty. - return pd.DataFrame(), repair, raw - return df, repair, raw + return pd.DataFrame(), repair, head + + # If the head budget was too tight, pandas may have returned fewer rows + # than asked. Retry once with the full file. In practice this almost + # never trips; the 2× row-size multiplier above handles 99% of inputs. + if not head_was_full and len(df) < sample_rows: + full_raw = path.read_bytes() + full_repair = repair_bytes(full_raw, encoding=enc, delimiter=delim) + try: + df = pd.read_csv( + _io.BytesIO(full_repair.repaired_bytes), + encoding="utf-8", delimiter=delim, + dtype=str, keep_default_na=False, on_bad_lines="warn", + nrows=sample_rows, + ) + except pd.errors.EmptyDataError: + return pd.DataFrame(), full_repair, full_raw + return df, full_repair, full_raw + + return df, repair, head def to_dict(finding: Finding) -> dict[str, Any]: diff --git a/src/core/fixes.py b/src/core/fixes.py index 421fc7e..31f15d4 100644 --- a/src/core/fixes.py +++ b/src/core/fixes.py @@ -48,6 +48,17 @@ FixFn = Callable[[pd.DataFrame, Optional[dict]], tuple[pd.DataFrame, int]] _REGISTRY: dict[str, FixFn] = {} +def _inplace(payload: Optional[dict]) -> bool: + """Read the inplace toggle out of a fix payload. + + ``apply_decisions`` sets ``payload["inplace"] = True`` when it has + already taken a single working copy of the DataFrame and wants every + chained fix to mutate that copy directly. Standalone callers (and the + test suite) call fixes without this flag, so they get a fresh copy. + """ + return bool((payload or {}).get("inplace", False)) + + def register(action_id: str) -> Callable[[FixFn], FixFn]: def deco(fn: FixFn) -> FixFn: _REGISTRY[action_id] = fn @@ -69,27 +80,38 @@ def available_actions() -> list[str]: def _apply_to_strings( df: pd.DataFrame, fn: Callable[[str], str], *, include_headers: bool = False, + inplace: bool = False, ) -> tuple[pd.DataFrame, int]: """Apply *fn* to every string cell. Returns (new_df, cells_changed). - Headers are not touched here — the dedicated header-cleaning fix owns - that scope so the gate's audit log records header changes separately. + When *inplace* is True the caller's DataFrame is mutated directly — + the gate uses this for performance, since `apply_decisions` already + holds a single working copy and chaining N fixes shouldn't allocate + N×size copies. When False (the default for direct callers / tests), + a single copy is taken once. + + Headers are not touched here — the dedicated header-cleaning fix + owns that scope so the gate's audit log records header changes + separately. + + Vectorized: pandas ``Series.map`` runs in C and beats per-cell + Python ``for v in out[col]`` by 10-50× on multi-million-row inputs. """ - out = df.copy() + out = df if inplace else df.copy() changed = 0 for col in out.columns: - if not pd.api.types.is_object_dtype(out[col]) and not pd.api.types.is_string_dtype(out[col]): + ser = out[col] + if not (pd.api.types.is_object_dtype(ser) or pd.api.types.is_string_dtype(ser)): continue - new_col = [] - for v in out[col]: - if isinstance(v, str): - nv = fn(v) - if nv != v: - changed += 1 - new_col.append(nv) - else: - new_col.append(v) - out[col] = new_col + # ``map`` skips non-string cells via the lambda guard. We compare + # the original column to the transformed one for a single mask + # instead of per-cell counter increments — orders of magnitude + # faster on big frames. + new_col = ser.map(lambda v: fn(v) if isinstance(v, str) else v) + delta = int((new_col != ser).sum()) + if delta: + out[col] = new_col + changed += delta if include_headers: new_headers = [] for h in out.columns: @@ -104,6 +126,47 @@ def _apply_to_strings( return out, changed +def _vectorized_translate( + df: pd.DataFrame, trans: dict, *, inplace: bool = False, +) -> tuple[pd.DataFrame, int]: + """``str.translate`` shortcut for fixes that are pure character maps. + + Pandas' ``Series.str.translate`` runs in compiled code and is the + fastest path we have for NBSP-strip, smart-punct fold, and similar + pure-translation fixes. + """ + out = df if inplace else df.copy() + changed = 0 + for col in out.columns: + ser = out[col] + if not (pd.api.types.is_object_dtype(ser) or pd.api.types.is_string_dtype(ser)): + continue + new_col = ser.where(~ser.map(lambda v: isinstance(v, str)), ser.str.translate(trans)) + delta = int((new_col != ser).sum()) + if delta: + out[col] = new_col + changed += delta + return out, changed + + +def _vectorized_regex_sub( + df: pd.DataFrame, pattern, repl: str, *, inplace: bool = False, +) -> tuple[pd.DataFrame, int]: + """``str.replace(regex=True)`` shortcut for regex-based fixes.""" + out = df if inplace else df.copy() + changed = 0 + for col in out.columns: + ser = out[col] + if not (pd.api.types.is_object_dtype(ser) or pd.api.types.is_string_dtype(ser)): + continue + new_col = ser.astype("string").str.replace(pattern, repl, regex=True).astype(object).fillna(ser) + delta = int((new_col != ser).sum()) + if delta: + out[col] = new_col + changed += delta + return out, changed + + # --------------------------------------------------------------------------- # High-confidence fixes # --------------------------------------------------------------------------- @@ -120,31 +183,25 @@ def trim_whitespace(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[p if not trimmed or _looks_structured(trimmed): return trimmed return _WHITESPACE_RUN_RE.sub(" ", trimmed) - return _apply_to_strings(df, fix) + return _apply_to_strings(df, fix, inplace=_inplace(payload)) @register(_a.FIX_STRIP_NBSP) def strip_nbsp(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[pd.DataFrame, int]: """Replace NBSP and other Unicode spaces with ASCII space.""" - def fix(s: str) -> str: - return s.translate(_NBSP_TRANS) - return _apply_to_strings(df, fix) + return _vectorized_translate(df, _NBSP_TRANS, inplace=_inplace(payload)) @register(_a.FIX_STRIP_ZERO_WIDTH) def strip_zero_width(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[pd.DataFrame, int]: """Remove zero-width and invisible characters from cells.""" - def fix(s: str) -> str: - return _ZERO_WIDTH_RE.sub("", s) - return _apply_to_strings(df, fix) + return _vectorized_regex_sub(df, _ZERO_WIDTH_RE.pattern, "", inplace=_inplace(payload)) @register(_a.FIX_FOLD_SMART_PUNCT) def fold_smart_punctuation(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[pd.DataFrame, int]: """ASCII-fy curly quotes, em/en dashes, ellipsis, primes.""" - def fix(s: str) -> str: - return s.translate(_SMART_TRANS) - return _apply_to_strings(df, fix) + return _vectorized_translate(df, _SMART_TRANS, inplace=_inplace(payload)) @register(_a.FIX_CLEAN_HEADERS) @@ -160,7 +217,7 @@ def clean_headers(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[pd. s = s.translate(_SMART_TRANS) s = _CONTROL_RE.sub("", s) return s.strip() - out = df.copy() + out = df if _inplace(payload) else df.copy() new_headers = [] changed = 0 for h in out.columns: @@ -182,7 +239,7 @@ def normalize_line_endings(df: pd.DataFrame, payload: Optional[dict] = None) -> File-level line endings are handled by ``repair_bytes`` before parsing; this fix covers embedded multi-line cells (case 11 in the corpus). """ - return _apply_to_strings(df, _norm_le_str) + return _apply_to_strings(df, _norm_le_str, inplace=_inplace(payload)) # --------------------------------------------------------------------------- @@ -225,8 +282,8 @@ def lowercase_email(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[p Defaults to lowercasing every column whose name matches the email heuristic if no payload is given. """ - out = df.copy() payload = payload or {} + out = df if _inplace(payload) else df.copy() target_cols: list[str] if "column" in payload: target_cols = [payload["column"]] @@ -239,16 +296,14 @@ def lowercase_email(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[p for col in target_cols: if col not in out.columns: continue - new_col = [] - for v in out[col]: - if isinstance(v, str): - nv = v.lower() - if nv != v: - changed += 1 - new_col.append(nv) - else: - new_col.append(v) - out[col] = new_col + ser = out[col] + if not (pd.api.types.is_object_dtype(ser) or pd.api.types.is_string_dtype(ser)): + continue + new_col = ser.map(lambda v: v.lower() if isinstance(v, str) else v) + delta = int((new_col != ser).sum()) + if delta: + out[col] = new_col + changed += delta return out, changed @@ -269,7 +324,7 @@ def replace_null_sentinels(df: pd.DataFrame, payload: Optional[dict] = None) -> def fix(s: str) -> str: return "" if s.strip().lower() in sentinel_set else s - return _apply_to_strings(df, fix) + return _apply_to_strings(df, fix, inplace=_inplace(payload)) # --------------------------------------------------------------------------- @@ -293,4 +348,4 @@ def repair_mojibake(df: pd.DataFrame, payload: Optional[dict] = None) -> tuple[p def fix(s: str) -> str: return ftfy.fix_text(s) - return _apply_to_strings(df, fix) + return _apply_to_strings(df, fix, inplace=_inplace(payload)) diff --git a/src/core/io.py b/src/core/io.py index 3795ac8..68e6970 100644 --- a/src/core/io.py +++ b/src/core/io.py @@ -23,8 +23,12 @@ def detect_encoding(path: Path, sample_bytes: int = 65_536) -> str: Returns the best-guess encoding name (e.g. ``utf-8``, ``windows-1252``). Falls back to ``utf-8`` when detection is inconclusive. + + Reads only the head bytes (does not slurp the file). On a 1 GB input + this is the difference between ~50 ms and a multi-GB allocation. """ - raw = Path(path).read_bytes()[:sample_bytes] + with Path(path).open("rb") as fh: + raw = fh.read(sample_bytes) if not raw: return "utf-8" @@ -332,6 +336,24 @@ _CSV_SMART_QUOTE_TRANS = str.maketrans({ "″": '"', # DOUBLE PRIME }) +# Byte-level fast path: same characters but as UTF-8 byte sequences. Used +# when the file is already valid UTF-8 — folds in C without ever +# materializing a multi-GB decoded string. +_CSV_SMART_QUOTE_BYTE_MAP: list[tuple[bytes, bytes]] = [ + ("“".encode("utf-8"), b'"'), # E2 80 9C + ("”".encode("utf-8"), b'"'), # E2 80 9D + ("„".encode("utf-8"), b'"'), # E2 80 9E + ("‟".encode("utf-8"), b'"'), # E2 80 9F + ("«".encode("utf-8"), b'"'), # C2 AB + ("»".encode("utf-8"), b'"'), # C2 BB + ("″".encode("utf-8"), b'"'), # E2 80 B3 +] +# Cheap probe: if none of these sentinel pairs appear in the bytes, +# skip the smart-quote stage entirely. Probing one byte per family hits +# the C-implemented ``bytes.__contains__`` which is sub-millisecond on a +# 1 GB buffer. +_CSV_SMART_QUOTE_PROBES = (b"\xe2\x80", b"\xc2\xab", b"\xc2\xbb") + # A merged value is "currency-shaped" when it looks like $1,500.00 or 1.234,56 # (i.e., a sequence of digits, separators, and an optional currency sigil). _CURRENCY_SHAPED = re.compile(r"^\s*[$€£¥]?\s*\d{1,3}([,.\s]\d{3})+([,.]\d+)?\s*$") @@ -511,21 +533,50 @@ def repair_bytes( detail=f"normalized {', '.join(parts)} to LF", )) - # Decode for character-level work. + # Smart-quote fast path: when the bytes are already UTF-8 (which + # they are after the wide-encoding transcode above), fold curly / + # guillemet / double-prime quotes via ``bytes.replace`` — no decode, + # no string allocation. The probe check skips this entirely on the + # common case of files with no smart quotes. + enc_norm = encoding.lower().replace("-", "_") if encoding else "" + is_utf8 = enc_norm in ("utf_8", "utf_8_sig", "utf8", "ascii") + smart_folded_bytes = False + if fold_quotes and is_utf8: + if any(p in data for p in _CSV_SMART_QUOTE_PROBES): + replaced_total = 0 + for src_bytes, dst in _CSV_SMART_QUOTE_BYTE_MAP: + if src_bytes in data: + n = data.count(src_bytes) + if n: + data = data.replace(src_bytes, dst) + replaced_total += n + if replaced_total: + smart_folded_bytes = True + actions.append(RepairAction( + kind="fold_smart_quote", line=None, + detail=f"replaced {replaced_total} smart double-quote char(s) with ASCII '\"'", + )) + + # Always attempt the decode so we catch encoding errors (lying-BOM + # case E30 needs the ``decode_replaced`` action to surface as the + # ``encoding_decode_failed`` finding). The decode is O(N) memory but + # CPython's UTF-8 decoder is C-implemented and runs at GB/s rates. + decode_failed = False try: - text = data.decode(encoding) + text = data.decode(encoding if not smart_folded_bytes else "utf-8") except (UnicodeDecodeError, LookupError): text = data.decode("utf-8", errors="replace") + decode_failed = True actions.append(RepairAction( kind="decode_replaced", line=None, detail=f"decode errors under {encoding}; replaced with U+FFFD", )) - # 3. Smart double quotes - if fold_quotes: + # Smart-quote fold for non-UTF-8 inputs that bypassed the byte fast + # path (the byte_map only covers the UTF-8 byte sequences). + if fold_quotes and not is_utf8: folded = text.translate(_CSV_SMART_QUOTE_TRANS) if folded != text: - # Count is approximate (distinct mapped chars combined). n = sum(1 for a, b in zip(text, folded) if a != b) actions.append(RepairAction( kind="fold_smart_quote", line=None, @@ -533,8 +584,23 @@ def repair_bytes( )) text = folded - # 4. Per-row delimiter repair - if repair_delims: + # Per-row delimiter repair: skip the costly csv.reader walk on + # well-formed files. Triggers, in cheap-to-expensive order: + # 1. Currency sigil somewhere in the bytes (``$`` / € / £) — the + # classic ``$1,500.00`` case. + # 2. Non-comma delimiter (rare in the wild; opt in for safety). + # 3. The decoder had to substitute U+FFFD (file is suspicious). + # 4. Field-count mismatch: at least one data row has a different + # delimiter count than the header. Costs O(N) but only on the + # already-decoded ``text``. + has_currency_sigil = ( + b"$" in data or b"\xe2\x82\xac" in data or b"\xc2\xa3" in data + ) + needs_row_repair = repair_delims and ( + has_currency_sigil or delimiter != "," or decode_failed + or _has_field_count_mismatch(text, delimiter) + ) + if needs_row_repair: text, row_actions, unrepairable = _repair_rows(text, delimiter) actions.extend(row_actions) @@ -545,6 +611,44 @@ def repair_bytes( ) +def _has_field_count_mismatch(text: str, delimiter: str) -> bool: + """Quick scan for rows whose unquoted-delimiter count differs from + the header's. Walks the text once with a hand-rolled quote-state + machine — much cheaper than running csv.reader, which materializes a + list of every row. Returns True at the first mismatch. + + False negatives are acceptable here: the trigger only decides + whether to run the (slower, exact) ``_repair_rows`` pass. False + positives just mean we run the slow pass anyway. + """ + in_quote = False + header_count: int | None = None + current_count = 0 + for ch in text: + if ch == '"': + in_quote = not in_quote + continue + if in_quote: + continue + if ch == delimiter: + current_count += 1 + continue + if ch == "\n": + if header_count is None: + header_count = current_count + elif current_count != header_count and current_count != 0: + return True + current_count = 0 + # Trailing line without a newline. + if ( + header_count is not None + and current_count != 0 + and current_count != header_count + ): + return True + return False + + def _repair_rows( text: str, delimiter: str, ) -> tuple[str, list[RepairAction], list[int]]: diff --git a/src/core/normalize.py b/src/core/normalize.py index 17d49c5..8d83a43 100644 --- a/src/core/normalize.py +++ b/src/core/normalize.py @@ -150,6 +150,11 @@ def apply_decisions( """ decision_by_id = {d.finding_id: d for d in decisions} + # One working copy is taken up front; each fix mutates it in place. + # Without this, N fixes against a 6 GB DataFrame allocate 6 GB N + # times — peak RSS in the 1.25 GB stress test would otherwise climb + # past 30 GB. Callers that need the original frame already passed it + # by reference; we copy here exactly once. out = df.copy() applied: list[FixApplied] = [] skipped: list[Finding] = [] @@ -185,13 +190,21 @@ def apply_decisions( pending.append(f) continue - payload = decision.payload + payload = dict(decision.payload or {}) # Per-column fixes (lowercase_email) can carry the column from # the finding when the user didn't override it. - if f.column and (payload is None or "column" not in payload): - payload = {**(payload or {}), "column": f.column} + if f.column and "column" not in payload: + payload["column"] = f.column + # Tell fixes that opt in (the vectorized helpers in fixes.py) + # to mutate ``out`` in place so we don't allocate per-fix. + payload.setdefault("inplace", True) - out, changed = fix_fn(out, payload) + # Fixes that don't accept the inplace kwarg simply drop it. + try: + out, changed = fix_fn(out, payload) + except TypeError: + payload.pop("inplace", None) + out, changed = fix_fn(out, payload) applied.append(FixApplied( finding_id=f.id, fix_action=f.fix_action,