"""Upload-time data quality analyzer. Runs a fast, read-only scan over an uploaded file (or DataFrame) and returns a list of :class:`Finding` objects. Each finding names the issue, how many cells/rows are affected, and which downstream tool can address it. The GUI consumes findings to badge tool nav items; the CLI prints them as a table. The analyzer is *purely advisory*: it never mutates data, never runs a tool, and is safe to skip. Treat it as a guided onboarding step, not a hard gate on the upload flow. """ from __future__ import annotations import re import unicodedata from dataclasses import dataclass, field from pathlib import Path from typing import Any, Iterable, Literal, Optional import pandas as pd from .io import RepairResult, repair_bytes, detect_encoding, detect_delimiter Severity = Literal["info", "warn", "error"] # Tool identifiers — match the 0N_ convention used by the script set. # Listed here so detectors stay decoupled from the GUI's display layer. TOOL_TEXT_CLEANER = "02_text_cleaner" TOOL_MISSING_HANDLER = "04_missing_handler" TOOL_DEDUPLICATOR = "01_deduplicator" TOOL_FORMAT_STANDARDIZER = "03_format_standardizer" @dataclass class Finding: """One issue the analyzer surfaced. Attributes ---------- id Stable identifier (``"smart_quotes_in_data"``); used for GUI lookup and downloadable JSON exports. Never localized. severity ``"info"`` (FYI), ``"warn"`` (likely needs cleanup), ``"error"`` (will block downstream work). tool Tool id that can address the finding, or empty string for purely informational findings. count Number of cells (or rows) affected. description Single-sentence human summary used for banners and tooltips. column Column name when scoped to one column; ``None`` for whole-frame / file-level findings. samples Up to a handful of ``(row, column, value)`` tuples for the GUI to render. Cap at five so the JSON export stays compact. """ id: str severity: Severity tool: str count: int description: str column: Optional[str] = None samples: list[tuple[int, str, str]] = field(default_factory=list) # --------------------------------------------------------------------------- # Per-cell character classes (kept independent of text_clean to avoid an # import cycle and to keep the analyzer self-contained). # --------------------------------------------------------------------------- _SMART_QUOTE_CHARS = set("“”‘’„‟«»′″") _DASH_ELLIPSIS_CHARS = set("–—―−…") _NBSP_LIKE_CHARS = set("       ") _ZERO_WIDTH_CHARS = set("​‌‍⁠‎‏­") _NULL_LIKE = { "n/a", "na", "nan", "null", "none", "#n/a", "#na", "-", "--", "tbd", "unknown", "n.a.", "(null)", } # Mojibake fingerprints: classic UTF-8-as-cp1252 corruptions. _MOJIBAKE_PATTERNS = re.compile( r"Ã[©¨¢¤¶Œœ]" # café -> café, étage -> étage etc. r"|â€[™œžs˜“”]" # don't -> don’t r"|Â[ -¿]" ) _LEADING_ZERO_ID_RE = re.compile(r"^0\d{2,}$") _DIGITS_RE = re.compile(r"^\d+$") _EMAIL_LIKE_COL = re.compile(r"e?[ -_]?mail|^email|address$", re.IGNORECASE) def _has_any(text: str, chars: set[str]) -> bool: return any(c in chars for c in text) def _samples(rows: Iterable[tuple[int, str, str]], limit: int = 5) -> list[tuple[int, str, str]]: out: list[tuple[int, str, str]] = [] for item in rows: out.append(item) if len(out) >= limit: break return out # --------------------------------------------------------------------------- # Detectors # --------------------------------------------------------------------------- def _detect_smart_punctuation(df: pd.DataFrame) -> list[Finding]: affected_cells = 0 sample_rows: list[tuple[int, str, str]] = [] for col in df.columns: for row_idx, val in enumerate(df[col].tolist()): if not isinstance(val, str): continue if _has_any(val, _SMART_QUOTE_CHARS) or _has_any(val, _DASH_ELLIPSIS_CHARS): affected_cells += 1 if len(sample_rows) < 5: sample_rows.append((row_idx, str(col), val)) if not affected_cells: return [] return [Finding( id="smart_punctuation_in_data", severity="warn", tool=TOOL_TEXT_CLEANER, count=affected_cells, description=( f"{affected_cells} cell(s) contain curly quotes, em/en dashes, " f"or ellipsis characters. These break string equality joins and " f"regex patterns." ), samples=sample_rows, )] def _detect_invisible_chars(df: pd.DataFrame) -> list[Finding]: nbsp_cells = 0 zw_cells = 0 nbsp_samples: list[tuple[int, str, str]] = [] zw_samples: list[tuple[int, str, str]] = [] for col in df.columns: for row_idx, val in enumerate(df[col].tolist()): if not isinstance(val, str): continue if _has_any(val, _NBSP_LIKE_CHARS): nbsp_cells += 1 if len(nbsp_samples) < 5: nbsp_samples.append((row_idx, str(col), val)) if _has_any(val, _ZERO_WIDTH_CHARS): zw_cells += 1 if len(zw_samples) < 5: zw_samples.append((row_idx, str(col), val)) findings: list[Finding] = [] if nbsp_cells: findings.append(Finding( id="nbsp_or_unicode_whitespace", severity="warn", tool=TOOL_TEXT_CLEANER, count=nbsp_cells, description=( f"{nbsp_cells} cell(s) contain non-breaking or other Unicode " f"spaces. These look identical to a regular space but break " f"join keys." ), samples=nbsp_samples, )) if zw_cells: findings.append(Finding( id="zero_width_or_invisible", severity="warn", tool=TOOL_TEXT_CLEANER, count=zw_cells, description=( f"{zw_cells} cell(s) contain zero-width or invisible " f"characters (ZWSP, ZWJ, soft hyphen, BOM, bidi marks)." ), samples=zw_samples, )) # Headers carry the same risks; flag separately so the user sees that # df["Email"] vs df["Email​"] is the issue. bad_headers = [ c for c in df.columns if isinstance(c, str) and ( c != c.strip() or _has_any(c, _NBSP_LIKE_CHARS) or _has_any(c, _ZERO_WIDTH_CHARS) or _has_any(c, _SMART_QUOTE_CHARS) ) ] if bad_headers: findings.append(Finding( id="dirty_column_headers", severity="warn", tool=TOOL_TEXT_CLEANER, count=len(bad_headers), description=( f"{len(bad_headers)} column header(s) contain whitespace, " f"smart quotes, or invisible characters. These break " f"df['col'] lookups." ), samples=[(0, h, h) for h in bad_headers[:5]], )) return findings def _detect_whitespace_padding(df: pd.DataFrame) -> list[Finding]: affected = 0 samples: list[tuple[int, str, str]] = [] for col in df.columns: for row_idx, val in enumerate(df[col].tolist()): if not isinstance(val, str) or not val: continue if val != val.strip() or " " in val: affected += 1 if len(samples) < 5: samples.append((row_idx, str(col), val)) if not affected: return [] return [Finding( id="whitespace_padding", severity="warn", tool=TOOL_TEXT_CLEANER, count=affected, description=( f"{affected} cell(s) have leading/trailing whitespace or " f"multi-space internal runs. Common cause of failed joins." ), samples=samples, )] def _detect_null_like_sentinels(df: pd.DataFrame) -> list[Finding]: affected = 0 samples: list[tuple[int, str, str]] = [] cols_with_sentinels: set[str] = set() for col in df.columns: for row_idx, val in enumerate(df[col].tolist()): if not isinstance(val, str): continue if val.strip().lower() in _NULL_LIKE: affected += 1 cols_with_sentinels.add(str(col)) if len(samples) < 5: samples.append((row_idx, str(col), val)) if not affected: return [] return [Finding( id="null_like_sentinels", severity="info", tool=TOOL_MISSING_HANDLER, count=affected, description=( f"{affected} cell(s) across {len(cols_with_sentinels)} column(s) " f"look like disguised nulls (N/A, NaN, None, '-'). Decide what " f"counts as missing in the missing-value handler." ), samples=samples, )] def _detect_mojibake(df: pd.DataFrame) -> list[Finding]: affected = 0 samples: list[tuple[int, str, str]] = [] for col in df.columns: for row_idx, val in enumerate(df[col].tolist()): if not isinstance(val, str): continue if _MOJIBAKE_PATTERNS.search(val): affected += 1 if len(samples) < 5: samples.append((row_idx, str(col), val)) if not affected: return [] return [Finding( id="suspected_mojibake", severity="info", tool=TOOL_TEXT_CLEANER, count=affected, description=( f"{affected} cell(s) match common UTF-8-as-cp1252 mojibake " f"patterns (é, ’, etc.). Auto-repair is opt-in (Tier 2)." ), samples=samples, )] def _detect_mixed_case_email(df: pd.DataFrame) -> list[Finding]: findings: list[Finding] = [] for col in df.columns: if not isinstance(col, str) or not _EMAIL_LIKE_COL.search(col): continue values = [v for v in df[col].tolist() if isinstance(v, str) and v.strip()] if not values: continue has_upper = any(any(c.isupper() for c in v) for v in values) has_lower = any(any(c.islower() for c in v) for v in values) if has_upper and has_lower: samples = [(i, col, v) for i, v in enumerate(values[:5])] findings.append(Finding( id="mixed_case_email_column", severity="info", tool=TOOL_TEXT_CLEANER, count=len(values), description=( f"Column '{col}' has mixed case across email values. " f"Lowercasing emails before dedup avoids false negatives." ), column=col, samples=samples, )) return findings def _detect_leading_zero_ids(df: pd.DataFrame) -> list[Finding]: """Informational: a column where most values are zero-padded digit IDs. Worth surfacing because Excel re-opens often strip them — the user should know they're there before any Excel round-trip. """ findings: list[Finding] = [] for col in df.columns: values = [v for v in df[col].tolist() if isinstance(v, str) and v.strip()] if len(values) < 5: continue digit_count = sum(1 for v in values if _DIGITS_RE.match(v)) leading_zero_count = sum(1 for v in values if _LEADING_ZERO_ID_RE.match(v)) # >80% are zero-padded digit IDs of the same length-ish. if digit_count >= 0.8 * len(values) and leading_zero_count >= 0.5 * len(values): samples = [ (i, str(col), v) for i, v in enumerate(values[:5]) if _LEADING_ZERO_ID_RE.match(v) ][:5] findings.append(Finding( id="leading_zero_ids", severity="info", tool="", count=leading_zero_count, description=( f"Column '{col}' contains zero-padded numeric IDs " f"({leading_zero_count}/{len(values)}). Excel will strip " f"the zeros on round-trip unless saved as text." ), column=str(col), samples=samples, )) return findings def _findings_from_repair(repair: RepairResult) -> list[Finding]: """Synthesize findings from a :class:`RepairResult`. Each repair kind maps to a single info-severity finding so the GUI shows the user what the parser quietly fixed before they reached the tool pages. """ if not repair.changed and not repair.unrepairable_lines: return [] summary = repair.summary() findings: list[Finding] = [] if "strip_bom" in summary: findings.append(Finding( id="csv_bom_stripped", severity="info", tool=TOOL_TEXT_CLEANER, count=1, description="UTF-8 BOM at file start was removed before parsing.", )) if "strip_nul" in summary: nul_action = next(a for a in repair.actions if a.kind == "strip_nul") findings.append(Finding( id="csv_nul_stripped", severity="warn", tool=TOOL_TEXT_CLEANER, count=1, description=( f"Embedded NUL bytes in the file were stripped before " f"parsing ({nul_action.detail})." ), )) if "fold_smart_quote" in summary: action = next(a for a in repair.actions if a.kind == "fold_smart_quote") findings.append(Finding( id="csv_smart_quotes_folded", severity="info", tool=TOOL_TEXT_CLEANER, count=1, description=( f"Smart double quotes were folded to ASCII before parsing " f"({action.detail})." ), )) if "quote_unquoted_delim" in summary: n = summary["quote_unquoted_delim"] findings.append(Finding( id="csv_unquoted_delimiters_repaired", severity="warn", tool="", count=n, description=( f"{n} row(s) had a delimiter inside an unquoted field " f"(e.g. '$1,500.00') and were merged during pre-parse repair." ), )) if repair.unrepairable_lines: n = len(repair.unrepairable_lines) findings.append(Finding( id="csv_unrepairable_rows", severity="error", tool="", count=n, description=( f"{n} row(s) had ambiguous structural problems and were " f"left as-is. Inspect lines: " f"{repair.unrepairable_lines[:10]}" ), )) return findings # --------------------------------------------------------------------------- # Public entry point # --------------------------------------------------------------------------- def analyze( source: pd.DataFrame | str | Path, *, sample_rows: int = 1000, repair_result: Optional[RepairResult] = None, ) -> list[Finding]: """Run all detectors against *source* and return a list of findings. Parameters ---------- source Either a DataFrame already in memory or a path to a CSV/Excel file. Paths are read with the same encoding/delimiter detection as :func:`read_file`. Only the first *sample_rows* are scanned. sample_rows Cap on how many rows to scan. Defaults to 1000 — enough to detect every per-cell pollution pattern without paying for a multi-GB read. repair_result Optional :class:`RepairResult` from a prior pre-parse pass; used to synthesize ``csv_*`` findings so the user sees what the parser quietly fixed. """ if isinstance(source, (str, Path)): df, internal_repair = _load_for_analysis(Path(source), sample_rows=sample_rows) # Caller-supplied repair_result wins over the internally produced one, # since the caller may have used non-default repair flags. if repair_result is None: repair_result = internal_repair else: df = source.head(sample_rows).copy() if len(source) > sample_rows else source.copy() findings: list[Finding] = [] if repair_result is not None: findings.extend(_findings_from_repair(repair_result)) findings.extend(_detect_smart_punctuation(df)) findings.extend(_detect_invisible_chars(df)) findings.extend(_detect_whitespace_padding(df)) findings.extend(_detect_null_like_sentinels(df)) findings.extend(_detect_mojibake(df)) findings.extend(_detect_mixed_case_email(df)) findings.extend(_detect_leading_zero_ids(df)) return findings def _load_for_analysis( path: Path, *, sample_rows: int, ) -> tuple[pd.DataFrame, Optional[RepairResult]]: """Read just enough of *path* to scan, with the same robust pre-parse repair the tool pages will use. Returns ``(df, repair_result)``. The repair result is *None* for Excel files since the byte-level repair step (BOM/NUL/smart-quote folding) is CSV-specific. """ suffix = path.suffix.lower() if suffix in (".xlsx", ".xls"): df = pd.read_excel( path, dtype=str, keep_default_na=False, engine="openpyxl", nrows=sample_rows, ) return df, None enc = detect_encoding(path) delim = detect_delimiter(path, enc) raw = path.read_bytes() repair = repair_bytes(raw, encoding=enc, delimiter=delim) import io as _io df = pd.read_csv( _io.BytesIO(repair.repaired_bytes), encoding="utf-8", delimiter=delim, dtype=str, keep_default_na=False, on_bad_lines="warn", nrows=sample_rows, ) return df, repair def to_dict(finding: Finding) -> dict[str, Any]: """JSON-friendly representation; used by the CLI ``--json`` output.""" return { "id": finding.id, "severity": finding.severity, "tool": finding.tool, "count": finding.count, "description": finding.description, "column": finding.column, "samples": [ {"row": r, "column": c, "value": v} for r, c, v in finding.samples ], } def findings_by_tool(findings: list[Finding]) -> dict[str, list[Finding]]: """Group findings by tool id; useful for the GUI sidebar badges.""" out: dict[str, list[Finding]] = {} for f in findings: if not f.tool: continue out.setdefault(f.tool, []).append(f) return out