"""File I/O: encoding/delimiter detection, CSV/Excel reading, output writing.""" from __future__ import annotations import csv import io import re from dataclasses import dataclass, field from pathlib import Path from typing import Generator, Optional import pandas as pd from charset_normalizer import from_bytes from loguru import logger # --------------------------------------------------------------------------- # Encoding detection # --------------------------------------------------------------------------- # charset-normalizer often picks an Eastern-European code page (cp1250, # cp1258) for byte-equivalent Western content, mac_iceland over mac_roman # in the Mac family, and shift_jis_2004 for short Cyrillic samples. The # arbiter below resolves these specific false positives without # overruling the detector when its top pick is genuinely the right # answer. # # Mapping is *over-picked encoding* → *more plausible substitutes (in # priority order)*. We accept either the candidate's primary encoding # name or any of its ``could_be_from_charset`` aliases. _ENCODING_FALLBACKS: dict[str, tuple[str, ...]] = { "cp1250": ("cp1252", "latin_1", "iso8859_15", "iso8859_2"), "cp1258": ("iso8859_2", "cp1250", "cp1252"), "mac_iceland": ("mac_roman",), "shift_jis_2004": ("koi8_r", "cp1251", "cp1252", "iso8859_2"), "shift_jisx0213": ("koi8_r", "cp1251", "cp1252", "iso8859_2"), } def _arbitrate_charset_match(matches) -> Optional[str]: """Pick the most plausible encoding from a charset-normalizer match list. Two distinguishing signals separate a false positive from a real pick when the top encoding is one we've recorded as over-picked: * If the top match's own ``could_be_from_charset`` alias list already names a preferred fallback (e.g. cp1250 with cp1252 as a sibling), we substitute — charset-normalizer has flagged the byte content as ambiguous. * If the second-ranked match shares identical *chaos* and *coherence* scores with the top — meaning the bytes decode byte-equivalently under both — we substitute when the second match is the preferred Western default. When neither signal fires (real cp1250 / cp1258 content where charset-normalizer is genuinely confident), the top pick is returned unchanged. """ ranked = list(matches) if not ranked: return None top = ranked[0] top_enc = top.encoding.lower() fallbacks = _ENCODING_FALLBACKS.get(top_enc) if not fallbacks: return top_enc # The decisive signal: a lower-ranked candidate that ties the top # pick on both chaos and coherence has decoded the bytes # *identically*, so the choice between them is byte-equivalent. When # one of those tied candidates is a preferred Western default, # substitute. We walk the fallbacks in priority order so the most # canonical alternative wins (cp1252 over iso8859_2 over iso8859_15). # # When no tied candidate matches, we leave the top pick alone — that # is the "real cp1250 / cp1258 content" path where charset-normalizer # is genuinely confident. top_chaos = getattr(top, "chaos", None) top_coherence = getattr(top, "coherence", None) tied: list = [] for m in ranked[1:]: if m.chaos != top_chaos or m.coherence != top_coherence: break # ranked list is monotonically less confident tied.append(m) if tied: for preferred in fallbacks: for m in tied: candidates = { m.encoding.lower(), *(a.lower() for a in m.could_be_from_charset), } if preferred in candidates: return preferred # No tied alternative — but charset-normalizer occasionally folds # the more popular Western alias into the *top pick's own* alias # list (cp1250 with cp1252 listed alongside). When that happens, # prefer the canonical Western form. top_aliases = {a.lower() for a in top.could_be_from_charset} for preferred in fallbacks: # Only honour an in-alias swap if the preferred encoding is a # different family from the top pick (cp1252 swap from cp1250 is # legitimate; iso8859_2 swap from cp1250 is not — they differ # bytewise on accented Eastern letters). if preferred in top_aliases and not _same_byte_family(top_enc, preferred): return preferred return top_enc # --------------------------------------------------------------------------- # Language-aware probe: distinguish KOI8-R from Shift_JIS, ISO-8859-2 from # cp1258 when charset-normalizer cannot. # --------------------------------------------------------------------------- # Unicode ranges that uniquely identify each language family. A candidate # encoding "wins" the probe when its decoding of the raw bytes produces # the highest *coverage ratio* (non-ASCII letters in the target range # divided by total non-ASCII letters). _CYRILLIC_RANGE = (0x0400, 0x04FF) _EE_LATIN_LETTERS = frozenset( "ąćęłńóśźżĄĆĘŁŃÓŚŹŻ" # Polish "áčďéěíňóřšťúůýžÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ" # Czech "áéíóöőúüűÁÉÍÓÖŐÚÜŰ" # Hungarian "äčďéíĺľňóôŕšťúýžÄČĎÉÍĹĽŇÓÔŔŠŤÚÝŽ" # Slovak ) # Encodings to probe when charset-normalizer fingerprints the file as # Japanese (a frequent misfire on short Cyrillic samples whose byte # patterns happen to coincide with shift_jis lead bytes). _CYRILLIC_PROBES: tuple[str, ...] = ("koi8_r", "cp1251", "iso8859_5") _EE_LATIN_PROBES: tuple[str, ...] = ("iso8859_2", "cp1250") def _cyrillic_coverage(text: str) -> float: """Fraction of *all non-ASCII characters* in *text* that are Cyrillic letters. Dividing by all non-ASCII (rather than only letters) penalises decodings that produce mostly symbols/box-drawing with a sprinkle of incidental Cyrillic glyphs — a real KOI8-R Russian text scores >0.7 because nearly every non-ASCII codepoint IS a Cyrillic letter, whereas a Japanese-shift_jis-decoded-as-koi8r text scores low. """ non_ascii = [c for c in text if ord(c) >= 0x80] if not non_ascii: return 0.0 cyr = sum( 1 for c in non_ascii if c.isalpha() and _CYRILLIC_RANGE[0] <= ord(c) <= _CYRILLIC_RANGE[1] ) return cyr / len(non_ascii) def _ee_latin_coverage(text: str) -> float: """Fraction of *all non-ASCII characters* in *text* that look like EE Latin.""" non_ascii = [c for c in text if ord(c) >= 0x80] if not non_ascii: return 0.0 ee = sum(1 for c in non_ascii if c in _EE_LATIN_LETTERS) return ee / len(non_ascii) def _probe_language(raw: bytes, top_enc: str) -> Optional[str]: """Try language-specific decodings when charset-normalizer guessed wrong. Returns a better encoding name when one of the probe candidates decodes the bytes into a language-coherent text (Cyrillic ≥ 70 % for Cyrillic probes, EE-Latin ≥ 50 % for EE Latin probes), else None. """ if top_enc in {"shift_jis_2004", "shift_jisx0213", "shift_jis", "cp932"}: probes, scorer, threshold = _CYRILLIC_PROBES, _cyrillic_coverage, 0.70 elif top_enc in {"cp1258", "iso8859_16"}: probes, scorer, threshold = _EE_LATIN_PROBES, _ee_latin_coverage, 0.50 else: return None # Score the top pick first. If the top encoding *itself* decodes the # bytes into reasonable Cyrillic / EE Latin text, the bytes are # genuinely in that script — don't override. try: top_decoded = raw.decode(top_enc, errors="replace") top_score = scorer(top_decoded) except LookupError: top_score = 0.0 best_enc: Optional[str] = None best_score = 0.0 for enc in probes: try: decoded = raw.decode(enc) except (UnicodeDecodeError, LookupError): continue score = scorer(decoded) if score > best_score: best_score = score best_enc = enc # Require both an absolute coverage threshold AND a clear margin over # the top pick — otherwise we risk hijacking real Japanese / Vietnamese # content whose decode happens to produce a few Cyrillic / EE-Latin # glyphs by coincidence. if best_enc and best_score >= threshold and best_score >= top_score + 0.30: return best_enc return None # Pairs of encoding names whose byte ranges DIFFER for accented letters. # Used to refuse spurious in-alias swaps (e.g. cp1250 vs iso8859_2 are # byte-distinct even though charset-normalizer lists them as siblings). _SAME_FAMILY: set[frozenset[str]] = { frozenset({"cp1250", "iso8859_2"}), frozenset({"mac_iceland", "mac_turkish"}), frozenset({"shift_jis_2004", "shift_jisx0213"}), } def _same_byte_family(a: str, b: str) -> bool: return frozenset({a, b}) in _SAME_FAMILY def detect_encoding(path: Path, sample_bytes: int = 65_536) -> str: """Detect file encoding by reading the first *sample_bytes*. Returns the best-guess encoding name (e.g. ``utf-8``, ``windows-1252``). Falls back to ``utf-8`` when detection is inconclusive. Reads only the head bytes (does not slurp the file). On a 1 GB input this is the difference between ~50 ms and a multi-GB allocation. """ with Path(path).open("rb") as fh: raw = fh.read(sample_bytes) if not raw: return "utf-8" # Check BOM first if raw[:3] == b"\xef\xbb\xbf": # A "lying" BOM: file claims utf-8 but the body bytes don't decode # as utf-8. Fall through to charset detection on the BOM-stripped # body so we don't hand back utf-8-sig that will then fail to read. body = raw[3:] try: body.decode("utf-8") return "utf-8-sig" except UnicodeDecodeError: logger.debug( "detect_encoding({}): file has UTF-8 BOM but body is not " "valid UTF-8 — falling through to charset detection", Path(path).name, ) raw = body elif raw[:2] in (b"\xff\xfe", b"\xfe\xff"): return "utf-16" # Strict UTF-8 wins. charset_normalizer fingerprints small files # dominated by short non-ASCII sequences (e.g. zero-width chars at # U+200B-class) as mac_latin2 / cp1250 / similar — but if the bytes # decode cleanly as UTF-8, that's the right answer regardless. try: raw.decode("utf-8") return "utf-8" except UnicodeDecodeError: pass matches = from_bytes(raw) enc = _arbitrate_charset_match(matches) if enc is None: return "utf-8" # Language-aware probe runs after the arbiter so we only spend cycles # on the cases where charset-normalizer fingerprinted the bytes as a # codepage that doesn't match the apparent script. Returns a better # encoding only when the probe finds a high-coverage match. probed = _probe_language(raw, enc) if probed: logger.debug( "detect_encoding({}): language probe overrode {} → {}", Path(path).name, enc, probed, ) enc = probed if enc in ("ascii", "us-ascii"): enc = "utf-8" return enc # --------------------------------------------------------------------------- # Delimiter detection # --------------------------------------------------------------------------- _COMMON_DELIMITERS = [",", "\t", ";", "|"] def detect_delimiter(path: Path, encoding: str = "utf-8") -> str: """Sniff the delimiter from the first 20 lines of a text file. Falls back to comma if csv.Sniffer cannot decide. """ raw_path = Path(path) lines: list[str] = [] with raw_path.open("r", encoding=encoding, errors="replace") as fh: for _ in range(20): line = fh.readline() if not line: break lines.append(line) if not lines: return "," sample = "".join(lines) try: dialect = csv.Sniffer().sniff(sample, delimiters="".join(_COMMON_DELIMITERS)) return dialect.delimiter except csv.Error: return "," # --------------------------------------------------------------------------- # Header-row detection # --------------------------------------------------------------------------- def detect_header_row(path: Path, encoding: str = "utf-8", delimiter: str = ",", max_scan: int = 20) -> int: """Return the 0-based index of the likely header row. Heuristic: the first row where *every* cell looks like a column name (non-numeric, non-empty string). Falls back to 0. """ raw_path = Path(path) with raw_path.open("r", encoding=encoding, errors="replace") as fh: reader = csv.reader(fh, delimiter=delimiter) for idx, row in enumerate(reader): if idx >= max_scan: break if not row: continue # Header heuristic: # - every non-empty cell looks like a header; # - at least 2 non-empty cells (or just 1 in a single-column # file). Without the count check, blank rows match # vacuously (``all([])`` is True) and metadata banners # like ``["Report 2024", "", ""]`` claim row 0 falsely. non_empty = [cell for cell in row if cell.strip()] min_required = 1 if len(row) <= 1 else 2 if ( len(non_empty) >= min_required and all(_looks_like_header(cell) for cell in non_empty) ): return idx return 0 def _looks_like_header(value: str) -> bool: """True if *value* looks like a column header, not a data value.""" v = value.strip() if not v: return False # Pure numbers are not headers try: float(v.replace(",", "")) return False except ValueError: pass return True # --------------------------------------------------------------------------- # Excel helpers # --------------------------------------------------------------------------- def list_sheets(path: Path) -> list[str]: """Return sheet names from an Excel workbook.""" xl = pd.ExcelFile(path, engine="openpyxl") return xl.sheet_names # --------------------------------------------------------------------------- # Reading # --------------------------------------------------------------------------- def read_file( path: str | Path, *, encoding: Optional[str] = None, delimiter: Optional[str] = None, header_row: Optional[int] = None, sheet_name: Optional[str | int] = 0, chunk_size: Optional[int] = None, repair: bool = True, ) -> pd.DataFrame | Generator[pd.DataFrame, None, None]: """Read a CSV, TSV, or Excel file into a DataFrame. Parameters ---------- path : file path encoding : override detected encoding (CSV only) delimiter : override detected delimiter (CSV only) header_row : 0-based row index for the header; auto-detected if *None* sheet_name : Excel sheet (name or 0-based index). Ignored for CSV. chunk_size : if set, return a generator of DataFrames (CSV only). When *chunk_size* is set, *repair* is forced off because the pre-parse pass loads the entire file into memory. repair : run :func:`repair_bytes` over the raw CSV before parsing (default ``True``). Excel files always skip this step. Pass ``repair=False`` when you specifically need pandas' raw view of the input. Returns a DataFrame (or generator when *chunk_size* is set). """ from .errors import FileAccessError, InputValidationError filepath = Path(path) if not filepath.exists(): raise FileAccessError( "Input file not found", path=filepath, operation="read_file", suggestion=( f"Check the path is correct. Parent directory " f"{filepath.parent} " f"{'exists' if filepath.parent.exists() else 'does NOT exist'}." ), ) if chunk_size is not None and chunk_size <= 0: raise InputValidationError( f"chunk_size must be positive; got {chunk_size}", operation="read_file", suggestion="Pass a positive integer (e.g., chunk_size=10000) or omit for non-streaming reads.", ) suffix = filepath.suffix.lower() logger.info( "read_file: {} (suffix={}, chunk_size={})", filepath, suffix, chunk_size, ) if suffix in (".xlsx", ".xls"): return _read_excel(filepath, header_row=header_row, sheet_name=sheet_name) else: return _read_csv( filepath, encoding=encoding, delimiter=delimiter, header_row=header_row, chunk_size=chunk_size, repair=repair, ) def _read_csv( path: Path, *, encoding: Optional[str] = None, delimiter: Optional[str] = None, header_row: Optional[int] = None, chunk_size: Optional[int] = None, repair: bool = True, ) -> pd.DataFrame | Generator[pd.DataFrame, None, None]: enc = encoding or detect_encoding(path) delim = delimiter or detect_delimiter(path, enc) hdr = header_row if header_row is not None else detect_header_row(path, enc, delim) logger.debug( "Reading CSV {} (encoding={}, delimiter={!r}, header_row={}, repair={})", path.name, enc, delim, hdr, repair, ) if chunk_size: # Streaming reads can't share memory with the repair pass; fall back # to direct pandas read so chunked workflows on huge files still # work. return pd.read_csv( filepath_or_buffer=path, encoding=enc, delimiter=delim, header=hdr, dtype=str, keep_default_na=False, on_bad_lines="warn", chunksize=chunk_size, ) if repair: raw = path.read_bytes() repair_result = repair_bytes(raw, encoding=enc, delimiter=delim) if repair_result.changed: logger.info( "Pre-parse repair on {}: {}", path.name, repair_result.summary(), ) if repair_result.unrepairable_lines: logger.warning( "Pre-parse repair on {}: {} unrepairable line(s) at {}", path.name, len(repair_result.unrepairable_lines), repair_result.unrepairable_lines[:10], ) return pd.read_csv( io.BytesIO(repair_result.repaired_bytes), encoding="utf-8", delimiter=delim, header=hdr, dtype=str, keep_default_na=False, on_bad_lines="warn", ) return pd.read_csv( filepath_or_buffer=path, encoding=enc, delimiter=delim, header=hdr, dtype=str, keep_default_na=False, on_bad_lines="warn", ) def _read_excel( path: Path, *, header_row: Optional[int] = None, sheet_name: Optional[str | int] = 0, ) -> pd.DataFrame: hdr = ( header_row if header_row is not None else _detect_excel_header_row(path, sheet_name) ) logger.debug("Reading Excel {} (sheet={}, header_row={})", path.name, sheet_name, hdr) try: return pd.read_excel( path, sheet_name=sheet_name, header=hdr, dtype=str, keep_default_na=False, engine="openpyxl", ) except ValueError as e: # pandas raises ValueError for "Worksheet named 'X' not found". from .errors import FileFormatError raise FileFormatError( "Could not read Excel sheet", path=path, operation=f"open sheet {sheet_name!r}", cause=e, suggestion=( "Check the sheet name exists. List available sheets with " "`from src.core.io import list_sheets; list_sheets(path)`." ), ) from e except Exception as e: # openpyxl can raise BadZipFile, InvalidFileException for # corrupt / non-xlsx inputs. Wrap with file context. from .errors import FileFormatError raise FileFormatError( "Excel file could not be parsed", path=path, operation="pd.read_excel", cause=e, suggestion=( "Confirm the file is a valid .xlsx workbook and not " "renamed/corrupted. Try opening it in Excel to verify." ), ) from e def _detect_excel_header_row( path: Path, sheet_name: Optional[str | int] = 0, max_scan: int = 20, ) -> int: """Mirror of :func:`detect_header_row` for Excel workbooks. Scans the first *max_scan* rows of *sheet_name* in read-only mode (so a 100 MB workbook doesn't get fully materialized) and returns the index of the first row where every non-empty cell looks like a column header. Falls back to 0 on parse failure (logged at debug — the caller's ``pd.read_excel`` will raise a useful FileFormatError with full context). """ try: from openpyxl import load_workbook from openpyxl.utils.exceptions import InvalidFileException except ImportError as e: logger.debug("openpyxl unavailable for header detection: {}", e) return 0 wb = None try: wb = load_workbook(path, read_only=True, data_only=True) if isinstance(sheet_name, int): names = wb.sheetnames target = names[sheet_name] if 0 <= sheet_name < len(names) else names[0] elif isinstance(sheet_name, str): target = sheet_name if sheet_name in wb.sheetnames else wb.sheetnames[0] else: target = wb.sheetnames[0] ws = wb[target] for idx, row in enumerate(ws.iter_rows(values_only=True)): if idx >= max_scan: break cells = ["" if v is None else str(v) for v in row] non_empty = [c for c in cells if c.strip()] min_required = 1 if len(cells) <= 1 else 2 if ( len(non_empty) >= min_required and all(_looks_like_header(c) for c in non_empty) ): return idx return 0 except (InvalidFileException, KeyError, IndexError, OSError) as e: # Corrupt workbook, missing sheet name, or read failure — fall # back to row 0 and let pd.read_excel raise the user-facing error # with full context. logger.debug( "Excel header detection failed for {} (sheet={}): {}", path, sheet_name, e, ) return 0 finally: if wb is not None: wb.close() # --------------------------------------------------------------------------- # Writing # --------------------------------------------------------------------------- def write_file( df: pd.DataFrame, path: str | Path, *, file_format: Optional[str] = None, encoding: str = "utf-8-sig", delimiter: Optional[str] = None, ) -> Path: """Write a DataFrame to CSV or Excel. Parameters ---------- df : DataFrame to write path : output file path file_format : ``"csv"``, ``"tsv"``, or ``"xlsx"``; auto-detected from *path* suffix if *None* encoding : output encoding (default ``utf-8-sig`` for Windows Excel compat) delimiter : field separator for delimited output. Defaults to ``,`` for ``.csv``, ``\\t`` for ``.tsv``, and the explicit value otherwise. Ignored for Excel formats. Returns the resolved output Path. """ from .errors import ensure_dataframe, wrap_file_write ensure_dataframe(df, function="write_file") out = Path(path) fmt = file_format or out.suffix.lstrip(".").lower() try: if fmt in ("xlsx", "xls"): df.to_excel(out, index=False, engine="openpyxl") else: sep = delimiter if delimiter is not None else ( "\t" if fmt == "tsv" else "," ) df.to_csv(out, index=False, encoding=encoding, sep=sep) except (OSError, PermissionError) as e: raise wrap_file_write(out, f"write_file (format={fmt})", e) from e logger.info("Wrote {} rows × {} cols to {}", len(df), len(df.columns), out) return out # --------------------------------------------------------------------------- # Pre-parse repair (CSV / delimited text) # --------------------------------------------------------------------------- # # Some pollution patterns confuse pandas' parser before the cleaner can ever # see the data. Smart double quotes inside an unquoted field, NUL bytes, and # unquoted delimiters embedded in numeric/currency cells all cause structural # parse failures or silent truncation. These helpers operate on raw bytes # (or decoded text) and produce a parseable byte stream plus an audit log. # # Design notes: # - Single curly quotes (U+2018/U+2019) are NOT folded here: they don't # conflict with the default CSV quote char and the cell-level cleaner # handles them more accurately. Only double-quote-equivalents are folded. # - Delimiter-row repair only attempts the unambiguous case (one extra # field, one merge candidate that looks like currency/thousands-sep). # Anything else is logged as unrepairable and the line is left alone. # Smart double-quote characters that confuse CSV parsing. _CSV_SMART_QUOTE_CHARS: tuple[str, ...] = ( "“", # LEFT DOUBLE QUOTATION MARK "”", # RIGHT DOUBLE QUOTATION MARK "„", # DOUBLE LOW-9 QUOTATION MARK "‟", # DOUBLE HIGH-REVERSED-9 QUOTATION MARK "«", # LEFT-POINTING DOUBLE ANGLE QUOTATION MARK "»", # RIGHT-POINTING DOUBLE ANGLE QUOTATION MARK "″", # DOUBLE PRIME ) # ``str.maketrans`` builds a codepoint→codepoint dict the C translate # uses directly. Iterating that dict yields ``int`` codepoints, which is # why we keep ``_CSV_SMART_QUOTE_CHARS`` separately for the ``.count`` # loop in the non-UTF-8 fold path. _CSV_SMART_QUOTE_TRANS = str.maketrans({c: '"' for c in _CSV_SMART_QUOTE_CHARS}) # Byte-level fast path: same characters but as UTF-8 byte sequences. Used # when the file is already valid UTF-8 — folds in C without ever # materializing a multi-GB decoded string. _CSV_SMART_QUOTE_BYTE_MAP: list[tuple[bytes, bytes]] = [ ("“".encode("utf-8"), b'"'), # E2 80 9C ("”".encode("utf-8"), b'"'), # E2 80 9D ("„".encode("utf-8"), b'"'), # E2 80 9E ("‟".encode("utf-8"), b'"'), # E2 80 9F ("«".encode("utf-8"), b'"'), # C2 AB ("»".encode("utf-8"), b'"'), # C2 BB ("″".encode("utf-8"), b'"'), # E2 80 B3 ] # Cheap probe: if none of these sentinel pairs appear in the bytes, # skip the smart-quote stage entirely. Probing one byte per family hits # the C-implemented ``bytes.__contains__`` which is sub-millisecond on a # 1 GB buffer. _CSV_SMART_QUOTE_PROBES = (b"\xe2\x80", b"\xc2\xab", b"\xc2\xbb") # A merged value is "currency-shaped" when it looks like $1,500.00 or 1.234,56 # (i.e., a sequence of digits, separators, and an optional currency sigil). _CURRENCY_SHAPED = re.compile(r"^\s*[$€£¥]?\s*\d{1,3}([,.\s]\d{3})+([,.]\d+)?\s*$") # Or a plain decimal with thousands grouping (no currency sigil). _THOUSANDS_SHAPED = re.compile(r"^\s*\d{1,3}(,\d{3})+(\.\d+)?\s*$") @dataclass class RepairAction: """One repair the pre-parse pass made to the raw bytes.""" kind: str # e.g. "strip_bom", "strip_nul", "fold_smart_quote", # "quote_unquoted_delim" line: Optional[int] # 1-indexed source line; None for file-level detail: str @dataclass class RepairResult: """Output of :func:`repair_bytes`.""" repaired_bytes: bytes actions: list[RepairAction] = field(default_factory=list) unrepairable_lines: list[int] = field(default_factory=list) @property def changed(self) -> bool: return bool(self.actions) def summary(self) -> dict[str, int]: """Action count grouped by kind.""" out: dict[str, int] = {} for a in self.actions: out[a.kind] = out.get(a.kind, 0) + 1 return out def _merge_score(left: str, right: str, delimiter: str) -> int: """Rank how plausible it is that ``left+delimiter+right`` is one field. Higher = more confident. ``0`` means the merge is implausible. - 3: merged value matches a currency-shaped or thousands-shaped pattern. - 1: loose heuristic (left has $/€/digit and right starts with digit, and delimiter is one of ``,``/``.``). - 0: no signal. Tiering matters because ``" $1,500.00 ,7"`` has two raw candidates (``$1+500.00`` and ``500.00+7``) but only the first produces a strict currency shape. """ merged = f"{left}{delimiter}{right}" if _CURRENCY_SHAPED.match(merged) or _THOUSANDS_SHAPED.match(merged): return 3 if delimiter in ".,": left_has_money = bool(re.search(r"[$€£¥]\s*\d", left)) or bool(re.search(r"\d\s*$", left)) right_starts_digits = bool(re.match(r"\s*\d", right)) if left_has_money and right_starts_digits: return 1 return 0 def _repair_extra_field_row( fields: list[str], expected: int, delimiter: str, ) -> Optional[list[str]]: """Try to merge one adjacent pair so the row has *expected* fields. Returns the repaired field list, or *None* if no unambiguous merge exists. """ if len(fields) != expected + 1: return None scores = [ (i, _merge_score(fields[i], fields[i + 1], delimiter)) for i in range(len(fields) - 1) ] best = max(s for _, s in scores) if best == 0: return None winners = [i for i, s in scores if s == best] if len(winners) != 1: return None i = winners[0] merged = f"{fields[i]}{delimiter}{fields[i + 1]}" return fields[:i] + [merged] + fields[i + 2:] def repair_bytes( raw: bytes, *, encoding: str = "utf-8", delimiter: str = ",", fold_quotes: bool = True, strip_nul: bool = True, repair_delims: bool = True, normalize_line_endings: bool = True, ) -> RepairResult: """Pre-parse repair on a raw delimited file. Performs (in order, each toggleable): 1. Strip a leading UTF-8 BOM. 2. Strip embedded NUL bytes (the C parser truncates fields at NUL). 3. Normalize line endings (CRLF and bare CR to LF). Bare CR confuses the C parser ("new-line character seen in unquoted field"); the text-cleaner contract also calls for LF inside multi-line cells. 4. Fold smart double quotes (curly, guillemet, double-prime) to ASCII ``"``. 5. Per-row repair when one rogue delimiter is embedded in a field that looks like currency or thousands-grouped digits — quote that field. Single curly quotes and other punctuation are deferred to the cell-level cleaner; this layer only fixes things that break CSV *parsing*. """ actions: list[RepairAction] = [] unrepairable: list[int] = [] data = raw # If the input is a UTF-16 / UTF-32 byte stream, transcode it to UTF-8 # up front. UTF-16 ASCII codepoints carry NUL as half of every 16-bit # unit, so the byte-level NUL-strip below would shred the file. Doing # the transcode here means the rest of the repair pipeline operates # on UTF-8 bytes regardless of the source encoding. enc_norm = encoding.lower().replace("-", "_") if encoding else "" is_wide = enc_norm.startswith(("utf_16", "utf_32")) # UTF-16 LE without a BOM that survives detection lands here too. if is_wide: try: decoded = data.decode(encoding) except (UnicodeDecodeError, LookupError): decoded = data.decode("utf-8", errors="replace") actions.append(RepairAction( kind="decode_replaced", line=None, detail=f"decode errors under {encoding}; replaced with U+FFFD", )) # Strip a leading UTF-16 BOM (decoded as U+FEFF) if present. if decoded and decoded[0] == "": decoded = decoded[1:] data = decoded.encode("utf-8") actions.append(RepairAction( kind="transcode_to_utf8", line=None, detail=f"transcoded {encoding} -> utf-8 ({len(raw)}B -> {len(data)}B)", )) encoding = "utf-8" # downstream steps now operate on UTF-8 # 1. BOM if data.startswith(b"\xef\xbb\xbf"): data = data[3:] actions.append(RepairAction(kind="strip_bom", line=None, detail="UTF-8 BOM removed")) # 2. NUL — only meaningful for single-byte / UTF-8 encodings. We've # already transcoded UTF-16/32 to UTF-8 above, so NUL here is genuine # corruption (truncated C strings, half-binary exports), not encoding. if strip_nul and b"\x00" in data: before = data.count(b"\x00") data = data.replace(b"\x00", b"") actions.append(RepairAction( kind="strip_nul", line=None, detail=f"removed {before} NUL byte(s)", )) # 3. Line endings: CRLF and bare CR -> LF. CRLF first so we don't # double-substitute. Done at the byte layer so it survives through # any subsequent decode failure. if normalize_line_endings and (b"\r" in data): n_crlf = data.count(b"\r\n") data = data.replace(b"\r\n", b"\n") n_cr = data.count(b"\r") if n_cr: data = data.replace(b"\r", b"\n") if n_crlf or n_cr: parts = [] if n_crlf: parts.append(f"{n_crlf} CRLF") if n_cr: parts.append(f"{n_cr} bare CR") actions.append(RepairAction( kind="normalize_line_endings", line=None, detail=f"normalized {', '.join(parts)} to LF", )) # Smart-quote fast path: when the bytes are already UTF-8 (which # they are after the wide-encoding transcode above), fold curly / # guillemet / double-prime quotes via ``bytes.replace`` — no decode, # no string allocation. The probe check skips this entirely on the # common case of files with no smart quotes. enc_norm = encoding.lower().replace("-", "_") if encoding else "" is_utf8 = enc_norm in ("utf_8", "utf_8_sig", "utf8", "ascii") smart_folded_bytes = False if fold_quotes and is_utf8: if any(p in data for p in _CSV_SMART_QUOTE_PROBES): replaced_total = 0 for src_bytes, dst in _CSV_SMART_QUOTE_BYTE_MAP: if src_bytes in data: n = data.count(src_bytes) if n: data = data.replace(src_bytes, dst) replaced_total += n if replaced_total: smart_folded_bytes = True actions.append(RepairAction( kind="fold_smart_quote", line=None, detail=f"replaced {replaced_total} smart double-quote char(s) with ASCII '\"'", )) # Always attempt the decode so we catch encoding errors (lying-BOM # case E30 needs the ``decode_replaced`` action to surface as the # ``encoding_decode_failed`` finding). The decode is O(N) memory but # CPython's UTF-8 decoder is C-implemented and runs at GB/s rates. decode_failed = False try: text = data.decode(encoding if not smart_folded_bytes else "utf-8") except (UnicodeDecodeError, LookupError): text = data.decode("utf-8", errors="replace") decode_failed = True actions.append(RepairAction( kind="decode_replaced", line=None, detail=f"decode errors under {encoding}; replaced with U+FFFD", )) # Smart-quote fold for non-UTF-8 inputs that bypassed the byte fast # path (the byte_map only covers the UTF-8 byte sequences). if fold_quotes and not is_utf8: # Count via ``str.count`` (C-implemented, ~GB/s) instead of a # Python-level char-by-char ``zip`` walk. On a 1 GB decoded # string the old path took ~100s of pure CPython iteration; the # ``count`` sum is microseconds because each call runs in C. n = sum(text.count(c) for c in _CSV_SMART_QUOTE_CHARS) if n: text = text.translate(_CSV_SMART_QUOTE_TRANS) actions.append(RepairAction( kind="fold_smart_quote", line=None, detail=f"replaced {n} smart double-quote char(s) with ASCII '\"'", )) # Per-row delimiter repair: skip the costly csv.reader walk on # well-formed files. Triggers, in cheap-to-expensive order: # 1. Currency sigil somewhere in the bytes (``$`` / € / £) — the # classic ``$1,500.00`` case. # 2. Non-comma delimiter (rare in the wild; opt in for safety). # 3. The decoder had to substitute U+FFFD (file is suspicious). # 4. Field-count mismatch: at least one data row has a different # delimiter count than the header. Costs O(N) but only on the # already-decoded ``text``. has_currency_sigil = ( b"$" in data or b"\xe2\x82\xac" in data or b"\xc2\xa3" in data ) needs_row_repair = repair_delims and ( has_currency_sigil or delimiter != "," or decode_failed or _has_field_count_mismatch(text, delimiter) ) if needs_row_repair: text, row_actions, unrepairable = _repair_rows(text, delimiter) actions.extend(row_actions) return RepairResult( repaired_bytes=text.encode("utf-8"), actions=actions, unrepairable_lines=unrepairable, ) def _has_field_count_mismatch(text: str, delimiter: str) -> bool: """Quick scan for rows whose unquoted-delimiter count differs from the header's. Walks the text once with a hand-rolled quote-state machine — much cheaper than running csv.reader, which materializes a list of every row. Returns True at the first mismatch. False negatives are acceptable here: the trigger only decides whether to run the (slower, exact) ``_repair_rows`` pass. False positives just mean we run the slow pass anyway. """ in_quote = False header_count: int | None = None current_count = 0 for ch in text: if ch == '"': in_quote = not in_quote continue if in_quote: continue if ch == delimiter: current_count += 1 continue if ch == "\n": if header_count is None: header_count = current_count elif current_count != header_count and current_count != 0: return True current_count = 0 # Trailing line without a newline. if ( header_count is not None and current_count != 0 and current_count != header_count ): return True return False def _repair_rows( text: str, delimiter: str, ) -> tuple[str, list[RepairAction], list[int]]: """Per-line field-count repair. Operates on already-decoded text.""" actions: list[RepairAction] = [] unrepairable: list[int] = [] reader = csv.reader(io.StringIO(text), delimiter=delimiter) rows = list(reader) if not rows: return text, actions, unrepairable expected = len(rows[0]) repaired_rows: list[list[str]] = [rows[0]] needs_rewrite = False for idx, row in enumerate(rows[1:], start=2): # 1-indexed; header is line 1 if len(row) == expected or not row: repaired_rows.append(row) continue if len(row) > expected: fixed = _repair_extra_field_row(row, expected, delimiter) if fixed is not None: repaired_rows.append(fixed) needs_rewrite = True actions.append(RepairAction( kind="quote_unquoted_delim", line=idx, detail=( f"line {idx}: merged adjacent fields to fix " f"unquoted '{delimiter}' (saw {len(row)} fields, " f"expected {expected})" ), )) continue unrepairable.append(idx) repaired_rows.append(row) else: # Too few fields: leave alone, log info-level only. unrepairable.append(idx) repaired_rows.append(row) if not needs_rewrite: return text, actions, unrepairable buf = io.StringIO() writer = csv.writer(buf, delimiter=delimiter, lineterminator="\n") for row in repaired_rows: writer.writerow(row) return buf.getvalue(), actions, unrepairable def read_csv_repaired( path: str | Path, *, encoding: Optional[str] = None, delimiter: Optional[str] = None, header_row: Optional[int] = None, fold_quotes: bool = True, strip_nul: bool = True, repair_delims: bool = True, ) -> tuple[pd.DataFrame, RepairResult]: """Read a CSV after running :func:`repair_bytes` on the raw file. Returns ``(df, repair_result)`` so callers can surface the action log. """ p = Path(path) enc = encoding or detect_encoding(p) delim = delimiter or detect_delimiter(p, enc) raw = p.read_bytes() repair = repair_bytes( raw, encoding=enc, delimiter=delim, fold_quotes=fold_quotes, strip_nul=strip_nul, repair_delims=repair_delims, ) hdr = header_row if header_row is not None else 0 df = pd.read_csv( io.BytesIO(repair.repaired_bytes), encoding="utf-8", delimiter=delim, header=hdr, dtype=str, keep_default_na=False, on_bad_lines="warn", ) if repair.actions: logger.info("Pre-parse repair on {}: {}", p.name, repair.summary()) return df, repair