From edf6ccf90b18ee8661b06c84175e7333b6408871 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 29 Apr 2026 15:41:36 +0000 Subject: [PATCH] feat(analyze): upload-time data quality analyzer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure, advisory scan over an uploaded file or DataFrame that returns a list of Finding objects naming each issue, the affected count, and which downstream tool can fix it. The GUI uses this to badge tool nav items at upload; the CLI will print findings as a table or JSON. src/core/analyze.py: Finding dataclass (id, severity, tool, count, description, column, samples) analyze(source, *, sample_rows=1000, repair_result=None) -> list[Finding] - source: DataFrame, path, or str. Path scans first 1000 rows. - When source is a path, runs the same pre-parse repair the tool pages will use; the resulting RepairResult is auto-surfaced as csv_* findings. A caller-supplied repair_result wins so non-default repair flags are respected. Detectors (each independent, samples capped at 5): - smart_punctuation_in_data -> 02 - nbsp_or_unicode_whitespace -> 02 - zero_width_or_invisible -> 02 - dirty_column_headers -> 02 - whitespace_padding -> 02 - null_like_sentinels -> 04 - suspected_mojibake -> 02 (Tier 2) - mixed_case_email_column -> 02 case op - leading_zero_ids -> informational, no tool Helpers: findings_by_tool() for sidebar grouping, to_dict() for JSON. Detectors are decoupled from the GUI display layer — they emit stable tool ids ("02_text_cleaner") and the GUI maps those to display names. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/core/__init__.py | 19 ++ src/core/analyze.py | 531 ++++++++++++++++++++++++++++++++++++++++++ tests/test_analyze.py | 268 +++++++++++++++++++++ 3 files changed, 818 insertions(+) create mode 100644 src/core/analyze.py create mode 100644 tests/test_analyze.py diff --git a/src/core/__init__.py b/src/core/__init__.py index 3966e3b..23e3302 100644 --- a/src/core/__init__.py +++ b/src/core/__init__.py @@ -51,8 +51,18 @@ from .io import ( detect_encoding, detect_header_row, list_sheets, + read_csv_repaired, read_file, + repair_bytes, write_file, + RepairAction, + RepairResult, +) +from .analyze import ( + Finding, + analyze, + findings_by_tool, + to_dict, ) from .config import ( ColumnStrategyConfig, @@ -105,6 +115,15 @@ __all__ = [ "detect_encoding", "detect_delimiter", "detect_header_row", + "read_csv_repaired", + "repair_bytes", + "RepairAction", + "RepairResult", + # Analyzer + "Finding", + "analyze", + "findings_by_tool", + "to_dict", # Config "DeduplicationConfig", "StrategyConfig", diff --git a/src/core/analyze.py b/src/core/analyze.py new file mode 100644 index 0000000..62ac87f --- /dev/null +++ b/src/core/analyze.py @@ -0,0 +1,531 @@ +"""Upload-time data quality analyzer. + +Runs a fast, read-only scan over an uploaded file (or DataFrame) and +returns a list of :class:`Finding` objects. Each finding names the issue, +how many cells/rows are affected, and which downstream tool can address +it. The GUI consumes findings to badge tool nav items; the CLI prints +them as a table. + +The analyzer is *purely advisory*: it never mutates data, never runs a +tool, and is safe to skip. Treat it as a guided onboarding step, not a +hard gate on the upload flow. 
@dataclass
class Finding:
    """One issue surfaced by the analyzer.

    Attributes
    ----------
    id
        Stable identifier (for example ``"smart_punctuation_in_data"`` or
        ``"whitespace_padding"``); used for GUI lookup and downloadable
        JSON exports. Never localized.
    severity
        ``"info"`` (FYI), ``"warn"`` (likely needs cleanup), or
        ``"error"`` (will block downstream work).
    tool
        Tool id that can address the finding, or an empty string for
        purely informational findings.
    count
        Number of cells (or rows) affected.
    description
        Single-sentence human summary used for banners and tooltips.
    column
        Column name when the finding is scoped to one column; ``None``
        for whole-frame / file-level findings.
    samples
        A handful of ``(row, column, value)`` tuples for the GUI to
        render. Producers cap this at five so the JSON export stays
        compact; the dataclass itself does not enforce the cap.
    """

    id: str
    severity: Severity
    tool: str
    count: int
    description: str
    column: Optional[str] = None
    # default_factory gives every instance its own list (no shared default).
    samples: list[tuple[int, str, str]] = field(default_factory=list)
def _detect_smart_punctuation(df: pd.DataFrame) -> list[Finding]:
    """Flag string cells containing curly quotes, typographic dashes or ellipses.

    Parameters
    ----------
    df
        Frame to scan. Non-string cells are ignored.

    Returns
    -------
    list[Finding]
        Empty when nothing matched; otherwise a single
        ``smart_punctuation_in_data`` finding whose ``count`` is the number
        of affected cells and whose ``samples`` hold the first five hits as
        ``(row position, column, value)``.
    """
    # One combined set so each cell is scanned once instead of twice.
    punctuation = _SMART_QUOTE_CHARS | _DASH_ELLIPSIS_CHARS
    affected_cells = 0
    sample_rows: list[tuple[int, str, str]] = []
    # Select columns by position: with duplicated labels ``df[col]`` is a
    # DataFrame, which has no ``tolist()``.
    for col_pos, col in enumerate(df.columns):
        for row_idx, val in enumerate(df.iloc[:, col_pos].tolist()):
            if not isinstance(val, str) or punctuation.isdisjoint(val):
                continue
            affected_cells += 1
            if len(sample_rows) < 5:
                sample_rows.append((row_idx, str(col), val))
    if not affected_cells:
        return []
    return [Finding(
        id="smart_punctuation_in_data",
        severity="warn",
        tool=TOOL_TEXT_CLEANER,
        count=affected_cells,
        description=(
            f"{affected_cells} cell(s) contain curly quotes, em/en dashes, "
            f"or ellipsis characters. These break string equality joins and "
            f"regex patterns."
        ),
        samples=sample_rows,
    )]
def _detect_whitespace_padding(df: pd.DataFrame) -> list[Finding]:
    """Flag string cells with edge whitespace or runs of two or more spaces.

    Parameters
    ----------
    df
        Frame to scan. Non-string and empty-string cells are ignored.

    Returns
    -------
    list[Finding]
        Empty when nothing matched; otherwise a single
        ``whitespace_padding`` finding whose ``count`` is the number of
        affected cells and whose ``samples`` hold the first five hits as
        ``(row position, column, value)``.
    """
    affected = 0
    samples: list[tuple[int, str, str]] = []
    # Select columns by position: with duplicated labels ``df[col]`` is a
    # DataFrame, which has no ``tolist()``.
    for col_pos, col in enumerate(df.columns):
        for row_idx, val in enumerate(df.iloc[:, col_pos].tolist()):
            if not isinstance(val, str) or not val:
                continue
            # The internal check must look for a *double* space. A single
            # space would flag every multi-word value ("also clean"),
            # contradicting the description below.
            if val != val.strip() or "  " in val:
                affected += 1
                if len(samples) < 5:
                    samples.append((row_idx, str(col), val))
    if not affected:
        return []
    return [Finding(
        id="whitespace_padding",
        severity="warn",
        tool=TOOL_TEXT_CLEANER,
        count=affected,
        description=(
            f"{affected} cell(s) have leading/trailing whitespace or "
            f"multi-space internal runs. Common cause of failed joins."
        ),
        samples=samples,
    )]
def _detect_mixed_case_email(df: pd.DataFrame) -> list[Finding]:
    """Flag email-like columns whose values mix upper- and lower-case letters.

    A column is considered when its (string) header matches
    ``_EMAIL_LIKE_COL``. It is flagged when, across its non-blank string
    values, at least one uppercase and at least one lowercase character
    occur.

    Returns
    -------
    list[Finding]
        One ``mixed_case_email_column`` finding per flagged column.
        ``count`` is the number of non-blank string values in the column;
        ``samples`` are the first five of them with their real row
        positions.
    """
    findings: list[Finding] = []
    # Positional selection keeps this working when column labels repeat.
    for col_pos, col in enumerate(df.columns):
        if not isinstance(col, str) or not _EMAIL_LIKE_COL.search(col):
            continue
        # Carry the row position with each value so samples point at the
        # actual row, not at an index into the blank-filtered list.
        cells = [
            (row_idx, val)
            for row_idx, val in enumerate(df.iloc[:, col_pos].tolist())
            if isinstance(val, str) and val.strip()
        ]
        if not cells:
            continue
        has_upper = any(any(ch.isupper() for ch in val) for _, val in cells)
        has_lower = any(any(ch.islower() for ch in val) for _, val in cells)
        if not (has_upper and has_lower):
            continue
        findings.append(Finding(
            id="mixed_case_email_column",
            severity="info",
            tool=TOOL_TEXT_CLEANER,
            count=len(cells),
            description=(
                f"Column '{col}' has mixed case across email values. "
                f"Lowercasing emails before dedup avoids false negatives."
            ),
            column=col,
            samples=[(row_idx, col, val) for row_idx, val in cells[:5]],
        ))
    return findings


def _detect_leading_zero_ids(df: pd.DataFrame) -> list[Finding]:
    """Informational: a column where most values are zero-padded digit IDs.

    Worth surfacing because Excel re-opens often strip them — the user
    should know they're there before any Excel round-trip.

    A column qualifies when it has at least five non-blank string values,
    at least 80% of them are digit-only, and at least 50% of them match
    ``_LEADING_ZERO_ID_RE``.

    Returns
    -------
    list[Finding]
        One ``leading_zero_ids`` finding per qualifying column. ``count``
        is the number of zero-padded values; ``samples`` are the first
        five zero-padded values with their real row positions.
    """
    findings: list[Finding] = []
    # Positional selection keeps this working when column labels repeat.
    for col_pos, col in enumerate(df.columns):
        cells = [
            (row_idx, val)
            for row_idx, val in enumerate(df.iloc[:, col_pos].tolist())
            if isinstance(val, str) and val.strip()
        ]
        if len(cells) < 5:
            continue
        digit_count = sum(1 for _, val in cells if _DIGITS_RE.match(val))
        padded = [
            (row_idx, val) for row_idx, val in cells
            if _LEADING_ZERO_ID_RE.match(val)
        ]
        leading_zero_count = len(padded)
        # >=80% digit-only values and >=50% zero-padded ones.
        if digit_count >= 0.8 * len(cells) and leading_zero_count >= 0.5 * len(cells):
            # Sample from every zero-padded value, not just those that
            # happen to sit among the first five non-blank cells.
            samples = [(row_idx, str(col), val) for row_idx, val in padded[:5]]
            findings.append(Finding(
                id="leading_zero_ids",
                severity="info",
                tool="",
                count=leading_zero_count,
                description=(
                    f"Column '{col}' contains zero-padded numeric IDs "
                    f"({leading_zero_count}/{len(cells)}). Excel will strip "
                    f"the zeros on round-trip unless saved as text."
                ),
                column=str(col),
                samples=samples,
            ))
    return findings
+ ), + )) + if "fold_smart_quote" in summary: + action = next(a for a in repair.actions if a.kind == "fold_smart_quote") + findings.append(Finding( + id="csv_smart_quotes_folded", + severity="info", + tool=TOOL_TEXT_CLEANER, + count=1, + description=( + f"Smart double quotes were folded to ASCII before parsing " + f"({action.detail})." + ), + )) + if "quote_unquoted_delim" in summary: + n = summary["quote_unquoted_delim"] + findings.append(Finding( + id="csv_unquoted_delimiters_repaired", + severity="warn", + tool="", + count=n, + description=( + f"{n} row(s) had a delimiter inside an unquoted field " + f"(e.g. '$1,500.00') and were merged during pre-parse repair." + ), + )) + if repair.unrepairable_lines: + n = len(repair.unrepairable_lines) + findings.append(Finding( + id="csv_unrepairable_rows", + severity="error", + tool="", + count=n, + description=( + f"{n} row(s) had ambiguous structural problems and were " + f"left as-is. Inspect lines: " + f"{repair.unrepairable_lines[:10]}" + ), + )) + return findings + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + +def analyze( + source: pd.DataFrame | str | Path, + *, + sample_rows: int = 1000, + repair_result: Optional[RepairResult] = None, +) -> list[Finding]: + """Run all detectors against *source* and return a list of findings. + + Parameters + ---------- + source + Either a DataFrame already in memory or a path to a CSV/Excel file. + Paths are read with the same encoding/delimiter detection as + :func:`read_file`. Only the first *sample_rows* are scanned. + sample_rows + Cap on how many rows to scan. Defaults to 1000 — enough to detect + every per-cell pollution pattern without paying for a multi-GB read. + repair_result + Optional :class:`RepairResult` from a prior pre-parse pass; used + to synthesize ``csv_*`` findings so the user sees what the parser + quietly fixed. 
def _load_for_analysis(
    path: Path, *, sample_rows: int,
) -> tuple[pd.DataFrame, Optional[RepairResult]]:
    """Load up to *sample_rows* rows of *path* for scanning.

    CSV-like files go through the same byte-level pre-parse repair
    (``repair_bytes``) before being parsed; Excel files are read directly.

    Parameters
    ----------
    path
        File to load. ``.xlsx`` / ``.xls`` suffixes (case-insensitive) are
        read as Excel; everything else is treated as delimited text.
    sample_rows
        Maximum number of data rows to parse.

    Returns
    -------
    tuple[pd.DataFrame, Optional[RepairResult]]
        ``(df, repair_result)``. All cells are kept as strings and default
        NA parsing is disabled so sentinels such as ``"N/A"`` stay visible
        to the detectors. ``repair_result`` is ``None`` for Excel files
        since the byte-level repair step is CSV-specific.
    """
    import io as _io

    suffix = path.suffix.lower()
    if suffix in (".xlsx", ".xls"):
        # openpyxl cannot open legacy ``.xls`` workbooks, so only pin it
        # for ``.xlsx`` and let pandas choose the engine otherwise.
        engine = "openpyxl" if suffix == ".xlsx" else None
        df = pd.read_excel(
            path, dtype=str, keep_default_na=False, engine=engine,
            nrows=sample_rows,
        )
        return df, None

    enc = detect_encoding(path)
    delim = detect_delimiter(path, enc)
    # NOTE(review): the whole file is read and repaired even though only
    # ``sample_rows`` rows are parsed below, so the repair result covers
    # the full file. Confirm this is acceptable for very large uploads.
    raw = path.read_bytes()
    repair = repair_bytes(raw, encoding=enc, delimiter=delim)
    # NOTE(review): parsing as UTF-8 assumes ``repair_bytes`` re-encodes
    # its output as UTF-8 regardless of ``enc`` — confirm in ``.io``.
    df = pd.read_csv(
        _io.BytesIO(repair.repaired_bytes),
        encoding="utf-8", delimiter=delim,
        dtype=str, keep_default_na=False, on_bad_lines="warn",
        nrows=sample_rows,
    )
    return df, repair
punctuation +# --------------------------------------------------------------------------- + +class TestSmartPunctuation: + def test_finds_curly_quotes(self): + df = pd.DataFrame({"note": ["plain", "“fancy”", "it’s"]}) + findings = analyze(df) + assert "smart_punctuation_in_data" in _ids(findings) + f = next(f for f in findings if f.id == "smart_punctuation_in_data") + assert f.severity == "warn" + assert f.tool == TOOL_TEXT_CLEANER + assert f.count == 2 + + def test_finds_dashes_and_ellipsis(self): + df = pd.DataFrame({"note": ["a—b", "wait…"]}) + findings = analyze(df) + assert "smart_punctuation_in_data" in _ids(findings) + + def test_clean_data_no_finding(self): + df = pd.DataFrame({"note": ["plain", "ASCII only", "no smart chars"]}) + findings = analyze(df) + assert "smart_punctuation_in_data" not in _ids(findings) + + +# --------------------------------------------------------------------------- +# Invisible / NBSP / dirty headers +# --------------------------------------------------------------------------- + +class TestInvisibleChars: + def test_finds_nbsp(self): + df = pd.DataFrame({"name": ["Alice ", "Bob"]}) + findings = analyze(df) + assert "nbsp_or_unicode_whitespace" in _ids(findings) + f = next(f for f in findings if f.id == "nbsp_or_unicode_whitespace") + assert f.count == 1 + + def test_finds_zero_width(self): + df = pd.DataFrame({"name": ["Alice​", "Bob"]}) + findings = analyze(df) + assert "zero_width_or_invisible" in _ids(findings) + + def test_flags_dirty_headers(self): + df = pd.DataFrame({" id ": [1], "Email​": ["a@b.com"]}) + findings = analyze(df) + assert "dirty_column_headers" in _ids(findings) + f = next(f for f in findings if f.id == "dirty_column_headers") + assert f.count == 2 + + def test_clean_headers_no_finding(self): + df = pd.DataFrame({"id": [1], "email": ["a@b.com"]}) + findings = analyze(df) + assert "dirty_column_headers" not in _ids(findings) + + +# --------------------------------------------------------------------------- 
+# Whitespace padding +# --------------------------------------------------------------------------- + +class TestWhitespacePadding: + def test_finds_leading_trailing_space(self): + df = pd.DataFrame({"x": [" padded ", "clean"]}) + findings = analyze(df) + assert "whitespace_padding" in _ids(findings) + + def test_finds_internal_double_space(self): + df = pd.DataFrame({"x": ["double space", "single space"]}) + findings = analyze(df) + assert "whitespace_padding" in _ids(findings) + + def test_no_finding_when_clean(self): + df = pd.DataFrame({"x": ["clean", "also clean"]}) + findings = analyze(df) + assert "whitespace_padding" not in _ids(findings) + + +# --------------------------------------------------------------------------- +# Null-like sentinels +# --------------------------------------------------------------------------- + +class TestNullLikeSentinels: + def test_finds_n_a_and_nan(self): + df = pd.DataFrame({"x": ["valid", "N/A", "nan", "None", "-"]}) + findings = analyze(df) + f = next(f for f in findings if f.id == "null_like_sentinels") + assert f.count == 4 + assert f.tool == TOOL_MISSING_HANDLER + assert f.severity == "info" + + def test_clean_data_no_finding(self): + df = pd.DataFrame({"x": ["a", "b", "c"]}) + findings = analyze(df) + assert "null_like_sentinels" not in _ids(findings) + + +# --------------------------------------------------------------------------- +# Mojibake +# --------------------------------------------------------------------------- + +class TestMojibake: + def test_finds_classic_pattern(self): + df = pd.DataFrame({"name": ["café", "café", "Müller"]}) + findings = analyze(df) + assert "suspected_mojibake" in _ids(findings) + + def test_clean_unicode_no_finding(self): + df = pd.DataFrame({"name": ["café", "naïve", "München"]}) + findings = analyze(df) + assert "suspected_mojibake" not in _ids(findings) + + +# --------------------------------------------------------------------------- +# Mixed-case email column +# 
--------------------------------------------------------------------------- + +class TestMixedCaseEmail: + def test_finds_mixed_case(self): + df = pd.DataFrame({"email": ["Alice@Example.COM", "bob@example.com"]}) + findings = analyze(df) + assert "mixed_case_email_column" in _ids(findings) + + def test_all_lower_no_finding(self): + df = pd.DataFrame({"email": ["a@b.com", "c@d.com"]}) + findings = analyze(df) + assert "mixed_case_email_column" not in _ids(findings) + + def test_non_email_column_ignored(self): + df = pd.DataFrame({"name": ["Alice", "bob"]}) + findings = analyze(df) + assert "mixed_case_email_column" not in _ids(findings) + + +# --------------------------------------------------------------------------- +# Leading-zero IDs +# --------------------------------------------------------------------------- + +class TestLeadingZeroIds: + def test_finds_zero_padded_ids(self): + df = pd.DataFrame({ + "sku": ["0001234", "0005678", "0009999", "0001111", "0002222", "0003333"], + }) + findings = analyze(df) + assert "leading_zero_ids" in _ids(findings) + + def test_no_finding_when_no_leading_zero(self): + df = pd.DataFrame({"id": [str(i) for i in range(1, 100)]}) + findings = analyze(df) + assert "leading_zero_ids" not in _ids(findings) + + +# --------------------------------------------------------------------------- +# Findings synthesized from RepairResult +# --------------------------------------------------------------------------- + +class TestFindingsFromRepair: + def test_bom_strip_surfaces(self): + repair = repair_bytes(b"\xef\xbb\xbfid,name\n1,Alice\n") + findings = analyze(pd.DataFrame({"id": ["1"], "name": ["Alice"]}), + repair_result=repair) + assert "csv_bom_stripped" in _ids(findings) + + def test_nul_strip_surfaces(self): + repair = repair_bytes(b"id,name\n1,Hel\x00lo\n") + findings = analyze(pd.DataFrame({"id": ["1"], "name": ["Hello"]}), + repair_result=repair) + assert "csv_nul_stripped" in _ids(findings) + + def 
test_unrepairable_surfaces_as_error(self): + # Synthesize a result with an unrepairable line. + repair = RepairResult( + repaired_bytes=b"id,a,b\n1,foo,bar\n", + actions=[], + unrepairable_lines=[3], + ) + findings = analyze(pd.DataFrame({"id": ["1"], "a": ["foo"], "b": ["bar"]}), + repair_result=repair) + f = next(f for f in findings if f.id == "csv_unrepairable_rows") + assert f.severity == "error" + + +# --------------------------------------------------------------------------- +# End-to-end on the corpus kitchen-sink fixture +# --------------------------------------------------------------------------- + +class TestEndToEnd: + def test_kitchen_sink_fixture_finds_pollution(self): + path = Path("test-cases/text-cleaner-corpus/test_data/20_kitchen_sink.csv") + if not path.exists(): + pytest.skip("corpus fixture not present") + findings = analyze(path) + ids = _ids(findings) + # Kitchen-sink has BOM, smart quotes, NBSP, ZWSP, and dirty headers. + # Pre-parse repair handles the file-level smart-quote/BOM, so they + # show up as csv_* findings; the cell-level NBSP/ZW remain as + # data findings. + assert "csv_bom_stripped" in ids or "csv_smart_quotes_folded" in ids + # NBSP-padded headers should still surface — pre-parse repair only + # touches double-quote characters. 
+ assert any(i.startswith("dirty_") or i.startswith("nbsp") or i.startswith("zero_width") + for i in ids) + + def test_clean_dataframe_returns_empty_findings(self): + df = pd.DataFrame({ + "id": ["1", "2", "3"], + "name": ["Alice", "Bob", "Carol"], + "email": ["a@x.com", "b@x.com", "c@x.com"], + }) + findings = analyze(df) + assert findings == [] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +class TestHelpers: + def test_findings_by_tool_groups_correctly(self): + df = pd.DataFrame({ + "name": [" padded ", "“smart”"], + "x": ["N/A", "valid"], + }) + findings = analyze(df) + grouped = findings_by_tool(findings) + assert TOOL_TEXT_CLEANER in grouped + assert TOOL_MISSING_HANDLER in grouped + + def test_findings_by_tool_skips_toolless(self): + repair = RepairResult( + repaired_bytes=b"", actions=[], unrepairable_lines=[5, 7], + ) + findings = analyze(pd.DataFrame({"x": ["a"]}), repair_result=repair) + grouped = findings_by_tool(findings) + # csv_unrepairable_rows has tool="" and should not appear. + assert all(t for t in grouped) + + def test_to_dict_is_json_serializable(self): + df = pd.DataFrame({"x": [" padded "]}) + findings = analyze(df) + d = to_dict(findings[0]) + import json + json.dumps(d) # would raise on non-serializable values + assert d["id"] == "whitespace_padding" + assert "samples" in d