feat(analyze): add mixed_line_endings + near_duplicate_rows detectors

Two more detectors close the analyzer gap list:

mixed_line_endings (warn, tool=02): scans raw bytes for combinations of
  CRLF / LF / bare CR. This is a classic disaster pattern after
  multi-source concat (Windows + macOS + Linux exports stitched
  together). Operates on raw bytes only — DataFrame-mode analyze() skips
  it because raw bytes aren't available. _load_for_analysis now returns
  the raw bytes alongside the DataFrame and repair result so the
  detector has them.

near_duplicate_rows (info, tool=01): cheap dedup signal — strip and
  lowercase every string column, then count df.duplicated(). Catches the
  most common case (same customer entered twice with subtle formatting
  differences) without paying for fuzzy matching. Anything more
  sophisticated stays in tool 01.

Six new tests cover both detectors plus the dataframe-mode skip path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-29 16:09:42 +00:00
parent 0671ef277e
commit 8dfc6ad8ae
2 changed files with 146 additions and 7 deletions

View File

@@ -20,6 +20,7 @@ from pathlib import Path
from typing import Any, Iterable, Literal, Optional
import pandas as pd
from pandas.api import types as pdtypes
from .io import RepairResult, repair_bytes, detect_encoding, detect_delimiter
@@ -319,6 +320,51 @@ def _detect_mixed_case_email(df: pd.DataFrame) -> list[Finding]:
return findings
def _detect_near_duplicates(df: pd.DataFrame) -> list[Finding]:
    """Detect duplicate rows that differ only in case or padding.

    Cheap pass: lowercase + strip every string column, then count
    ``df.duplicated()``. Catches the most common dedup signal (the same
    customer entered twice with subtle formatting differences) without
    paying the cost of fuzzy matching. Anything more sophisticated belongs
    in tool 01.

    Rows that are already byte-for-byte identical are counted as well,
    because they still collide after normalization.

    Args:
        df: Frame to scan. It is never mutated; normalization is applied
            to a copy.

    Returns:
        An empty list when the frame has fewer than two rows or no rows
        collide. Otherwise a single ``near_duplicate_rows`` finding whose
        ``count`` is the total number of rows that belong to a duplicate
        group, with up to five samples of the form
        ``(row_position, column_name, value)`` where ``row_position`` is
        the 0-based position of the row in *df*.
    """
    if len(df) < 2:
        return []

    norm = df.copy()
    for col in norm.columns:
        if pdtypes.is_object_dtype(norm[col]) or pdtypes.is_string_dtype(norm[col]):
            norm[col] = norm[col].astype(str).str.strip().str.lower()

    # keep=False marks every member of a duplicate group, not only the
    # second and later occurrences.
    dup_mask = norm.duplicated(keep=False)
    n_dupes = int(dup_mask.sum())
    if n_dupes < 2:
        return []

    # Number of distinct duplicate groups; n_dupes above counts all of
    # their member rows.
    n_groups = int(norm[dup_mask].drop_duplicates().shape[0])

    # Work with positional offsets rather than index labels: labels are
    # only valid for positional access when the frame carries a default
    # RangeIndex, and they may not even be integers.
    positions = dup_mask.to_numpy().nonzero()[0][:5]

    samples: list[tuple[int, str, str]] = []
    for pos in positions:
        row_pos = int(pos)
        # Render the first textual column's value as a sample, falling
        # back to the first column when the row holds no strings.
        col_pos = next(
            (
                k for k in range(df.shape[1])
                if isinstance(df.iat[row_pos, k], str)
            ),
            0,
        )
        samples.append(
            (row_pos, str(df.columns[col_pos]), str(df.iat[row_pos, col_pos]))
        )

    return [Finding(
        id="near_duplicate_rows",
        severity="info",
        tool=TOOL_DEDUPLICATOR,
        count=n_dupes,
        description=(
            f"{n_dupes} row(s) across ~{n_groups} group(s) are duplicates "
            f"after stripping whitespace and lowercasing string columns. "
            f"Run the deduplicator to merge or remove."
        ),
        samples=samples,
    )]
def _detect_leading_zero_ids(df: pd.DataFrame) -> list[Finding]:
"""Informational: a column where most values are zero-padded digit IDs.
@@ -355,6 +401,42 @@ def _detect_leading_zero_ids(df: pd.DataFrame) -> list[Finding]:
return findings
def _detect_mixed_line_endings(raw: bytes) -> list[Finding]:
    """Flag files that mix CRLF, LF, and bare CR line terminators.

    Mixed endings are a classic disaster pattern after multi-source concat
    (Windows + macOS + Linux exports stitched together). Operates on raw
    bytes only — DataFrame-mode :func:`analyze` skips this detector.

    Every CR / LF byte is counted, including any that sit inside quoted
    fields; no CSV parsing is attempted here.

    Args:
        raw: The file content as bytes. Input that starts with a UTF-16 or
            UTF-32 byte-order mark is transcoded to UTF-8 before counting;
            anything else is scanned as-is.

    Returns:
        An empty list when *raw* is empty or uses at most one terminator
        style, otherwise a single ``mixed_line_endings`` finding whose
        ``count`` is the number of distinct styles present (2 or 3).
    """
    if not raw:
        return []

    # UTF-16/32 pad each terminator with NUL bytes, so b"\r\n" never
    # matches and a uniformly-CRLF file would be misreported as a mix of
    # bare CR and bare LF. Transcode BOM-marked wide encodings to UTF-8,
    # where CR and LF are always the single bytes 0x0D and 0x0A.
    # UTF-32 is tested first because its little-endian BOM begins with
    # the UTF-16 little-endian BOM.
    wide_boms = (
        (b"\xff\xfe\x00\x00", "utf-32-le"),
        (b"\x00\x00\xfe\xff", "utf-32-be"),
        (b"\xff\xfe", "utf-16-le"),
        (b"\xfe\xff", "utf-16-be"),
    )
    for bom, codec in wide_boms:
        if raw.startswith(bom):
            text = raw[len(bom):].decode(codec, errors="replace")
            raw = text.encode("utf-8")
            break

    n_crlf = raw.count(b"\r\n")
    # Count standalone \r and \n (not part of \r\n) by subtracting overlaps.
    n_lf = raw.count(b"\n") - n_crlf
    n_cr = raw.count(b"\r") - n_crlf

    kinds_present = sum(1 for n in (n_crlf, n_lf, n_cr) if n > 0)
    if kinds_present <= 1:
        return []

    breakdown = []
    if n_crlf:
        breakdown.append(f"{n_crlf} CRLF")
    if n_lf:
        breakdown.append(f"{n_lf} LF")
    if n_cr:
        breakdown.append(f"{n_cr} CR")

    return [Finding(
        id="mixed_line_endings",
        severity="warn",
        tool=TOOL_TEXT_CLEANER,
        count=kinds_present,
        description=(
            f"File mixes {kinds_present} line-ending styles "
            f"({', '.join(breakdown)}). Naive splits on one style produce "
            f"ghost rows or merged lines. Run the text cleaner to normalize."
        ),
    )]
def _findings_from_repair(repair: RepairResult) -> list[Finding]:
"""Synthesize findings from a :class:`RepairResult`.
@@ -452,8 +534,11 @@ def analyze(
to synthesize ``csv_*`` findings so the user sees what the parser
quietly fixed.
"""
raw_for_byte_scan: Optional[bytes] = None
if isinstance(source, (str, Path)):
df, internal_repair = _load_for_analysis(Path(source), sample_rows=sample_rows)
df, internal_repair, raw_for_byte_scan = _load_for_analysis(
Path(source), sample_rows=sample_rows,
)
# Caller-supplied repair_result wins over the internally produced one,
# since the caller may have used non-default repair flags.
if repair_result is None:
@@ -464,6 +549,8 @@ def analyze(
findings: list[Finding] = []
if repair_result is not None:
findings.extend(_findings_from_repair(repair_result))
if raw_for_byte_scan is not None:
findings.extend(_detect_mixed_line_endings(raw_for_byte_scan))
findings.extend(_detect_smart_punctuation(df))
findings.extend(_detect_invisible_chars(df))
findings.extend(_detect_whitespace_padding(df))
@@ -471,18 +558,19 @@ def analyze(
findings.extend(_detect_mojibake(df))
findings.extend(_detect_mixed_case_email(df))
findings.extend(_detect_leading_zero_ids(df))
findings.extend(_detect_near_duplicates(df))
return findings
def _load_for_analysis(
path: Path, *, sample_rows: int,
) -> tuple[pd.DataFrame, Optional[RepairResult]]:
) -> tuple[pd.DataFrame, Optional[RepairResult], Optional[bytes]]:
"""Read just enough of *path* to scan, with the same robust pre-parse
repair the tool pages will use.
Returns ``(df, repair_result)``. The repair result is *None* for Excel
files since the byte-level repair step (BOM/NUL/smart-quote folding)
is CSV-specific.
Returns ``(df, repair_result, raw_bytes)``. The repair result and raw
bytes are *None* for Excel files since the byte-level repair step
(BOM/NUL/smart-quote folding) and line-ending scan are CSV-specific.
"""
suffix = path.suffix.lower()
if suffix in (".xlsx", ".xls"):
@@ -490,7 +578,7 @@ def _load_for_analysis(
path, dtype=str, keep_default_na=False, engine="openpyxl",
nrows=sample_rows,
)
return df, None
return df, None, None
enc = detect_encoding(path)
delim = detect_delimiter(path, enc)
raw = path.read_bytes()
@@ -502,7 +590,7 @@ def _load_for_analysis(
dtype=str, keep_default_na=False, on_bad_lines="warn",
nrows=sample_rows,
)
return df, repair
return df, repair, raw
def to_dict(finding: Finding) -> dict[str, Any]:

View File

@@ -173,6 +173,57 @@ class TestLeadingZeroIds:
assert "leading_zero_ids" not in _ids(findings)
# ---------------------------------------------------------------------------
# Near-duplicate rows
# ---------------------------------------------------------------------------
class TestNearDuplicates:
    """``near_duplicate_rows`` should fire only when normalized rows collide."""

    def test_finds_case_insensitive_dupes(self):
        # The first two rows are identical once case and trailing padding
        # are ignored.
        names = ["Alice", "alice ", "Bob"]
        emails = ["a@b.com", "A@B.COM", "bob@b.com"]
        frame = pd.DataFrame({"name": names, "email": emails})
        assert "near_duplicate_rows" in _ids(analyze(frame))

    def test_unique_rows_no_finding(self):
        # Every row is distinct even after normalization.
        names = ["Alice", "Bob", "Carol"]
        emails = ["a@x.com", "b@x.com", "c@x.com"]
        frame = pd.DataFrame({"name": names, "email": emails})
        assert "near_duplicate_rows" not in _ids(analyze(frame))

    def test_single_row_no_finding(self):
        # A lone row has nothing to be a duplicate of.
        frame = pd.DataFrame({"x": ["only"]})
        assert "near_duplicate_rows" not in _ids(analyze(frame))
# ---------------------------------------------------------------------------
# Mixed line endings
# ---------------------------------------------------------------------------
class TestMixedLineEndings:
    """``mixed_line_endings`` needs raw bytes and at least two terminator styles."""

    def test_crlf_plus_lf_flagged(self, tmp_path):
        # The middle record ends with a bare LF; the other two use CRLF.
        csv_path = tmp_path / "mixed.csv"
        csv_path.write_bytes(b"id,name\r\n1,Alice\n2,Bob\r\n")
        reported = _ids(analyze(csv_path))
        assert "mixed_line_endings" in reported

    def test_uniform_lf_not_flagged(self, tmp_path):
        # A single terminator style throughout must stay silent.
        csv_path = tmp_path / "uniform.csv"
        csv_path.write_bytes(b"id,name\n1,Alice\n2,Bob\n")
        reported = _ids(analyze(csv_path))
        assert "mixed_line_endings" not in reported

    def test_dataframe_mode_skips_detector(self):
        # No raw bytes -> mixed_line_endings cannot be detected.
        frame = pd.DataFrame({"id": ["1"], "name": ["Alice"]})
        reported = _ids(analyze(frame))
        assert "mixed_line_endings" not in reported
# ---------------------------------------------------------------------------
# Findings synthesized from RepairResult
# ---------------------------------------------------------------------------