From b8a9fa1b09f1b22ede1e4bbad398f8315d872c16 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 29 Apr 2026 15:37:49 +0000 Subject: [PATCH] feat(io): pre-parse CSV repair (BOM/NUL/smart-quotes/unquoted-delim) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Some pollution patterns block pandas before the cell-level cleaner can run. Add a pre-parse pass on raw bytes that fixes only what breaks parsing, and returns a structured action log the GUI/CLI can surface to the user. repair_bytes(raw, *, encoding, delimiter, fold_quotes, strip_nul, repair_delims): 1. Strip leading UTF-8 BOM. 2. Strip embedded NUL bytes (the C parser truncates fields at NUL). 3. Fold smart double quotes (curly, guillemet, double-prime) to ASCII '"'. Curly singles are NOT folded here; they don't conflict with CSV and the cell-level cleaner handles them more accurately. 4. Per-row repair when one rogue delimiter is embedded in a field that looks like currency or thousands-grouped digits. Tiered scoring keeps " $1,500.00 ,7" unambiguous: the strict currency regex match wins over the loose digit/sigil heuristic. read_csv_repaired(path) -> (DataFrame, RepairResult). RepairResult exposes .actions, .unrepairable_lines, and a summary() grouped by kind. Out of scope for this pass: encoding repair, delimiter conversion, multi- delimiter merges (k>1) — logged as unrepairable so callers can see what was left alone instead of silently parsing wrong. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/core/io.py | 281 +++++++++++++++++++++++++++++++++++++++++++++++ tests/test_io.py | 99 +++++++++++++++++ 2 files changed, 380 insertions(+) diff --git a/src/core/io.py b/src/core/io.py index 54e4904..d668e45 100644 --- a/src/core/io.py +++ b/src/core/io.py @@ -4,6 +4,8 @@ from __future__ import annotations import csv import io +import re +from dataclasses import dataclass, field from pathlib import Path from typing import Generator, Optional @@ -245,3 +247,282 @@ def write_file( df.to_csv(out, index=False, encoding=encoding) logger.info("Wrote {} rows to {}", len(df), out) return out + + +# --------------------------------------------------------------------------- +# Pre-parse repair (CSV / delimited text) +# --------------------------------------------------------------------------- +# +# Some pollution patterns confuse pandas' parser before the cleaner can ever +# see the data. Smart double quotes inside an unquoted field, NUL bytes, and +# unquoted delimiters embedded in numeric/currency cells all cause structural +# parse failures or silent truncation. These helpers operate on raw bytes +# (or decoded text) and produce a parseable byte stream plus an audit log. +# +# Design notes: +# - Single curly quotes (U+2018/U+2019) are NOT folded here: they don't +# conflict with the default CSV quote char and the cell-level cleaner +# handles them more accurately. Only double-quote-equivalents are folded. +# - Delimiter-row repair only attempts the unambiguous case (one extra +# field, one merge candidate that looks like currency/thousands-sep). +# Anything else is logged as unrepairable and the line is left alone. + +# Smart double-quote characters that confuse CSV parsing. +_CSV_SMART_QUOTE_TRANS = str.maketrans({ + "“": '"', # LEFT DOUBLE QUOTATION MARK + "”": '"', # RIGHT DOUBLE QUOTATION MARK + "„": '"', # DOUBLE LOW-9 QUOTATION MARK + "‟": '"', # DOUBLE HIGH-REVERSED-9 QUOTATION MARK + "«": '"', # LEFT-POINTING DOUBLE ANGLE QUOTATION MARK + "»": '"', # RIGHT-POINTING DOUBLE ANGLE QUOTATION MARK + "″": '"', # DOUBLE PRIME +}) + +# A merged value is "currency-shaped" when it looks like $1,500.00 or 1.234,56 +# (i.e., a sequence of digits, separators, and an optional currency sigil). +_CURRENCY_SHAPED = re.compile(r"^\s*[$€£¥]?\s*\d{1,3}([,.\s]\d{3})+([,.]\d+)?\s*$") +# Or a plain decimal with thousands grouping (no currency sigil). +_THOUSANDS_SHAPED = re.compile(r"^\s*\d{1,3}(,\d{3})+(\.\d+)?\s*$") + + +@dataclass +class RepairAction: + """One repair the pre-parse pass made to the raw bytes.""" + + kind: str # e.g. "strip_bom", "strip_nul", "fold_smart_quote", + # "quote_unquoted_delim" + line: Optional[int] # 1-indexed source line; None for file-level + detail: str + + +@dataclass +class RepairResult: + """Output of :func:`repair_bytes`.""" + + repaired_bytes: bytes + actions: list[RepairAction] = field(default_factory=list) + unrepairable_lines: list[int] = field(default_factory=list) + + @property + def changed(self) -> bool: + return bool(self.actions) + + def summary(self) -> dict[str, int]: + """Action count grouped by kind.""" + out: dict[str, int] = {} + for a in self.actions: + out[a.kind] = out.get(a.kind, 0) + 1 + return out + + +def _merge_score(left: str, right: str, delimiter: str) -> int: + """Rank how plausible it is that ``left+delimiter+right`` is one field. + + Higher = more confident. ``0`` means the merge is implausible. + + - 3: merged value matches a currency-shaped or thousands-shaped pattern. + - 1: loose heuristic (left has $/€/digit and right starts with digit, and + delimiter is one of ``,``/``.``). + - 0: no signal. + + Tiering matters because ``" $1,500.00 ,7"`` has two raw candidates + (``$1+500.00`` and ``500.00+7``) but only the first produces a strict + currency shape. + """ + merged = f"{left}{delimiter}{right}" + if _CURRENCY_SHAPED.match(merged) or _THOUSANDS_SHAPED.match(merged): + return 3 + if delimiter in ".,": + left_has_money = bool(re.search(r"[$€£¥]\s*\d", left)) or bool(re.search(r"\d\s*$", left)) + right_starts_digits = bool(re.match(r"\s*\d", right)) + if left_has_money and right_starts_digits: + return 1 + return 0 + + +def _repair_extra_field_row( + fields: list[str], expected: int, delimiter: str, +) -> Optional[list[str]]: + """Try to merge one adjacent pair so the row has *expected* fields. + + Returns the repaired field list, or *None* if no unambiguous merge exists. + """ + if len(fields) != expected + 1: + return None + scores = [ + (i, _merge_score(fields[i], fields[i + 1], delimiter)) + for i in range(len(fields) - 1) + ] + best = max(s for _, s in scores) + if best == 0: + return None + winners = [i for i, s in scores if s == best] + if len(winners) != 1: + return None + i = winners[0] + merged = f"{fields[i]}{delimiter}{fields[i + 1]}" + return fields[:i] + [merged] + fields[i + 2:] + + +def repair_bytes( + raw: bytes, + *, + encoding: str = "utf-8", + delimiter: str = ",", + fold_quotes: bool = True, + strip_nul: bool = True, + repair_delims: bool = True, +) -> RepairResult: + """Pre-parse repair on a raw delimited file. + + Performs (in order, each toggleable): + + 1. Strip a leading UTF-8 BOM. + 2. Strip embedded NUL bytes (the C parser truncates fields at NUL). + 3. Fold smart double quotes (curly, guillemet, double-prime) to ASCII ``"``. + 4. Per-row repair when one rogue delimiter is embedded in a field that + looks like currency or thousands-grouped digits — quote that field. + + Single curly quotes and other punctuation are deferred to the cell-level + cleaner; this layer only fixes things that break CSV *parsing*. + """ + actions: list[RepairAction] = [] + unrepairable: list[int] = [] + data = raw + + # 1. BOM + if data.startswith(b"\xef\xbb\xbf"): + data = data[3:] + actions.append(RepairAction(kind="strip_bom", line=None, detail="UTF-8 BOM removed")) + + # 2. NUL + if strip_nul and b"\x00" in data: + before = data.count(b"\x00") + data = data.replace(b"\x00", b"") + actions.append(RepairAction( + kind="strip_nul", line=None, + detail=f"removed {before} NUL byte(s)", + )) + + # Decode for character-level work. + try: + text = data.decode(encoding) + except (UnicodeDecodeError, LookupError): + text = data.decode("utf-8", errors="replace") + actions.append(RepairAction( + kind="decode_replaced", line=None, + detail=f"decode errors under {encoding}; replaced with U+FFFD", + )) + + # 3. Smart double quotes + if fold_quotes: + folded = text.translate(_CSV_SMART_QUOTE_TRANS) + if folded != text: + # Count is approximate (distinct mapped chars combined). + n = sum(1 for a, b in zip(text, folded) if a != b) + actions.append(RepairAction( + kind="fold_smart_quote", line=None, + detail=f"replaced {n} smart double-quote char(s) with ASCII '\"'", + )) + text = folded + + # 4. Per-row delimiter repair + if repair_delims: + text, row_actions, unrepairable = _repair_rows(text, delimiter) + actions.extend(row_actions) + + return RepairResult( + repaired_bytes=text.encode("utf-8"), + actions=actions, + unrepairable_lines=unrepairable, + ) + + +def _repair_rows( + text: str, delimiter: str, +) -> tuple[str, list[RepairAction], list[int]]: + """Per-line field-count repair. Operates on already-decoded text.""" + actions: list[RepairAction] = [] + unrepairable: list[int] = [] + + reader = csv.reader(io.StringIO(text), delimiter=delimiter) + rows = list(reader) + if not rows: + return text, actions, unrepairable + + expected = len(rows[0]) + repaired_rows: list[list[str]] = [rows[0]] + needs_rewrite = False + + for idx, row in enumerate(rows[1:], start=2): # 1-indexed; header is line 1 + if len(row) == expected or not row: + repaired_rows.append(row) + continue + if len(row) > expected: + fixed = _repair_extra_field_row(row, expected, delimiter) + if fixed is not None: + repaired_rows.append(fixed) + needs_rewrite = True + actions.append(RepairAction( + kind="quote_unquoted_delim", line=idx, + detail=( + f"line {idx}: merged adjacent fields to fix " + f"unquoted '{delimiter}' (saw {len(row)} fields, " + f"expected {expected})" + ), + )) + continue + unrepairable.append(idx) + repaired_rows.append(row) + else: + # Too few fields: leave alone, log info-level only. + unrepairable.append(idx) + repaired_rows.append(row) + + if not needs_rewrite: + return text, actions, unrepairable + + buf = io.StringIO() + writer = csv.writer(buf, delimiter=delimiter, lineterminator="\n") + for row in repaired_rows: + writer.writerow(row) + return buf.getvalue(), actions, unrepairable + + +def read_csv_repaired( + path: str | Path, + *, + encoding: Optional[str] = None, + delimiter: Optional[str] = None, + header_row: Optional[int] = None, + fold_quotes: bool = True, + strip_nul: bool = True, + repair_delims: bool = True, +) -> tuple[pd.DataFrame, RepairResult]: + """Read a CSV after running :func:`repair_bytes` on the raw file. + + Returns ``(df, repair_result)`` so callers can surface the action log. + """ + p = Path(path) + enc = encoding or detect_encoding(p) + delim = delimiter or detect_delimiter(p, enc) + raw = p.read_bytes() + + repair = repair_bytes( + raw, encoding=enc, delimiter=delim, + fold_quotes=fold_quotes, strip_nul=strip_nul, repair_delims=repair_delims, + ) + + hdr = header_row if header_row is not None else 0 + df = pd.read_csv( + io.BytesIO(repair.repaired_bytes), + encoding="utf-8", + delimiter=delim, + header=hdr, + dtype=str, + keep_default_na=False, + on_bad_lines="warn", + ) + if repair.actions: + logger.info("Pre-parse repair on {}: {}", p.name, repair.summary()) + return df, repair diff --git a/tests/test_io.py b/tests/test_io.py index eae5620..598b5ae 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -1,5 +1,7 @@ """Tests for src.core.io — file reading, encoding/delimiter detection.""" +import io + import pandas as pd import pytest from pathlib import Path @@ -11,6 +13,8 @@ from src.core.io import ( read_file, write_file, list_sheets, + repair_bytes, + read_csv_repaired, ) @@ -128,3 +132,98 @@ class TestListSheets: simple_df.to_excel(writer, sheet_name="Sheet2", index=False) sheets = list_sheets(path) assert sheets == ["Sheet1", "Sheet2"] + + +# --------------------------------------------------------------------------- +# Pre-parse repair +# --------------------------------------------------------------------------- + +class TestRepairBytes: + def test_strips_bom(self): + raw = b"\xef\xbb\xbfid,name\n1,Alice\n" + result = repair_bytes(raw) + assert result.repaired_bytes == b"id,name\n1,Alice\n" + assert any(a.kind == "strip_bom" for a in result.actions) + + def test_strips_nul_bytes(self): + raw = b"id,name\n1,Hel\x00lo\n2,Wo\x00\x00rld\n" + result = repair_bytes(raw) + assert b"\x00" not in result.repaired_bytes + nul_action = next(a for a in result.actions if a.kind == "strip_nul") + assert "3" in nul_action.detail # 3 NUL bytes + + def test_folds_smart_double_quotes(self): + raw = "id,note\n1,“hello”\n2,«bonjour»\n".encode("utf-8") + result = repair_bytes(raw) + text = result.repaired_bytes.decode("utf-8") + assert "“" not in text and "”" not in text + assert "«" not in text and "»" not in text + assert any(a.kind == "fold_smart_quote" for a in result.actions) + + def test_does_not_fold_curly_singles(self): + # Single curly quotes should pass through; cell-level cleaner handles them. + raw = "id,note\n1,it’s fine\n".encode("utf-8") + result = repair_bytes(raw) + text = result.repaired_bytes.decode("utf-8") + assert "’" in text + assert not any(a.kind == "fold_smart_quote" for a in result.actions) + + def test_no_changes_when_clean(self): + raw = b"id,name\n1,Alice\n2,Bob\n" + result = repair_bytes(raw) + assert result.repaired_bytes == raw + assert result.actions == [] + assert result.changed is False + + def test_repairs_unquoted_currency_comma(self): + raw = ( + b"id,price,qty\n" + b"1,100,5\n" + b"2, $1,500.00 ,7\n" # 4 fields instead of 3 + b"3,200,9\n" + ) + result = repair_bytes(raw) + # After repair, every row should have 3 fields when re-parsed. + df = pd.read_csv(io.BytesIO(result.repaired_bytes)) + assert list(df.columns) == ["id", "price", "qty"] + assert len(df) == 3 + assert any(a.kind == "quote_unquoted_delim" and a.line == 3 for a in result.actions) + + def test_logs_unrepairable_when_ambiguous(self): + # Two adjacent merge candidates -> bail out, log unrepairable. + raw = ( + b"id,a,b,c\n" + b"1,foo,bar,baz\n" + b"2,1,2,3,4,5\n" # way too many extras, no clear merge + ) + result = repair_bytes(raw) + assert 3 in result.unrepairable_lines + + def test_summary_groups_by_kind(self): + raw = b"\xef\xbb\xbfid,name\n1,Hel\x00lo\n" + result = repair_bytes(raw) + summary = result.summary() + assert summary.get("strip_bom") == 1 + assert summary.get("strip_nul") == 1 + + +class TestReadCsvRepaired: + def test_recovers_malformed_currency_row(self, tmp_path): + f = tmp_path / "bad.csv" + f.write_bytes( + b"id,price,qty\n" + b"1,100,5\n" + b"2, $1,500.00 ,7\n" + b"3,200,9\n" + ) + df, repair = read_csv_repaired(f) + assert len(df) == 3 + assert "1,500.00" in df.iloc[1]["price"] + assert repair.changed + + def test_passthrough_when_clean(self, tmp_path): + f = tmp_path / "ok.csv" + f.write_bytes(b"id,name\n1,Alice\n2,Bob\n") + df, repair = read_csv_repaired(f) + assert len(df) == 2 + assert repair.changed is False