Closes 12 bugs and 8 gaps surfaced by parallel audits across all core modules, plus aligns the dedup-side normalizers with the new format_standardize behavior where they had silently diverged. Bugs (data integrity / correctness): - dedup: NaN/None values matched as duplicates because str(None)='None'. Two rows with missing email silently merged. - dedup: removed_df had 0 columns when nothing was removed; downstream code expecting matching schema broke. Now preserves column shape. - dedup: ColumnMatchStrategy threshold accepted any value; out-of-range silently broke matching. Validated to [0, 100] in __post_init__. - dedup: strategy referencing a missing column was silently skipped. Now raises ValueError listing available columns. - fixes: replace_null_sentinels crashed on non-string sentinels (int/None from JSON payload). Coerced to str. - fixes: _vectorized_regex_sub raised raw re.error on bad patterns. Now wraps as ValueError with clear message. - io: detect_header_row mis-identified all-empty and metadata-only rows as headers (all([]) is True). Now requires ≥2 non-empty cells. - config: from_dict crashed when JSON had unknown fields, breaking forward compat. Now filters to known fields. - analyze: mixed-case email detector flagged all-None columns because str(None)='None' contains both N and one. Now drops NaN before stringify. New features and gap closures: - io: _detect_excel_header_row mirrors detect_header_row for Excel via openpyxl read-only; _read_excel uses it when header_row=None. - io: write_file gains delimiter + encoding params; .tsv extension defaults to tab. - normalizers: normalize_phone preserves extensions as ;ext=N suffix. - normalizers: normalize_address folds spelled-out US state names to 2-letter codes (California ≡ CA). - normalizers: normalize_name drops surname particles (van, de, von) so "Charles de Gaulle" ≡ "Charles Gaulle" for matching. 
- analyze: new _detect_inconsistent_date_format detector flags columns with mixed ISO/US/EU date shapes; routes to format standardizer. - analyze: _NULL_LIKE recognizes "<na>" (pd.NA repr). - analyze: duplicate-row finding renamed count → n_extra (rows that would actually be removed) with clarified description. - dedup: group_confidence no longer falsely 100.0 when transitive group members lack a recorded direct pair; falls back to 100.0 only when truly no pairs were observed. - dedup: MatchResult / DeduplicationResult docstrings clarify that row_indices refer to the input frame's positional index (output index is reset). - text_clean: visualize_hidden_html(None) now returns None (matches visualize_hidden_text); strip_bom strips at most one BOM per call; sentence_case dead elif branch removed. Tests: - tests/test_audit_fixes.py — 28 regression tests, one or more per numbered finding, named after BUG/GAP/NIT tags so future readers can trace each test back to its audit. - tests/test_fixes_unit.py — 26 isolated unit tests for previously integration-only fix functions (trim_whitespace, strip_nbsp, strip_zero_width, normalize_line_endings, clean_headers, repair_mojibake — last skipped if ftfy unavailable). - tests/test_io.py — adds CSV / TSV / semicolon / UTF-8-BOM round-trip tests + Excel auto-header-detection tests. - tests/test_normalizers.py — adds 8 tests for the alignment work above (phone extension, state names, particles). Adds .claude/ to .gitignore (agent worktrees + local settings). Full project suite: 1197 passed, 4 skipped, 17 xfailed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
339 lines
12 KiB
Python
339 lines
12 KiB
Python
"""Tests for src.core.io — file reading, encoding/delimiter detection."""
|
||
|
||
import io
|
||
|
||
import pandas as pd
|
||
import pytest
|
||
from pathlib import Path
|
||
|
||
from src.core.io import (
|
||
detect_encoding,
|
||
detect_delimiter,
|
||
detect_header_row,
|
||
read_file,
|
||
write_file,
|
||
list_sheets,
|
||
repair_bytes,
|
||
read_csv_repaired,
|
||
)
|
||
|
||
|
||
class TestDetectEncoding:
    """Checks on the codec name reported by ``detect_encoding``."""

    def test_utf8_file(self, sample_csv_path):
        """The sample CSV fixture is reported as a UTF-8-compatible codec."""
        # Compare case- and hyphen-insensitively so "UTF-8" and "utf8" both pass.
        normalized = detect_encoding(sample_csv_path).lower().replace("-", "")
        assert normalized in ("utf8", "ascii", "utf8sig")

    def test_empty_file(self, tmp_path):
        """A zero-byte file is reported as plain "utf-8"."""
        empty_path = tmp_path / "empty.csv"
        empty_path.write_bytes(b"")
        assert detect_encoding(empty_path) == "utf-8"

    def test_bom_file(self, tmp_path):
        """A file starting with the UTF-8 byte-order mark yields "utf-8-sig"."""
        bom_path = tmp_path / "bom.csv"
        bom_path.write_bytes(b"\xef\xbb\xbfname,email\nAlice,a@b.com\n")
        assert detect_encoding(bom_path) == "utf-8-sig"

    def test_latin1_file(self, tmp_path):
        """Latin-1 encoded bytes are reported as one of the accepted codecs."""
        latin_path = tmp_path / "latin.csv"
        latin_path.write_bytes("name,city\nJosé,São Paulo\n".encode("latin-1"))
        # The test tolerates any of these single-byte-family names (and
        # "utf-8") rather than pinning one exact answer.
        acceptable = (
            "iso-8859-1", "latin-1", "windows-1252", "cp1252",
            "iso-8859-9", "cp1250", "iso-8859-15", "utf-8",
        )
        assert detect_encoding(latin_path) in acceptable
|
||
|
||
|
||
class TestDetectDelimiter:
    """Checks that ``detect_delimiter`` returns the separator a file uses."""

    def test_comma(self, sample_csv_path):
        """The sample CSV fixture is comma-separated."""
        found = detect_delimiter(sample_csv_path)
        assert found == ","

    def test_tab(self, tmp_path):
        """A tab-separated file yields a tab character."""
        tsv_path = tmp_path / "tabs.tsv"
        tsv_path.write_text("name\temail\nAlice\ta@b.com\n")
        found = detect_delimiter(tsv_path)
        assert found == "\t"

    def test_semicolon(self, tmp_path):
        """A semicolon-separated file yields ";"."""
        semi_path = tmp_path / "semi.csv"
        semi_path.write_text("name;email;phone\nAlice;a@b.com;555\n")
        found = detect_delimiter(semi_path)
        assert found == ";"

    def test_pipe(self, tmp_path):
        """A pipe-separated file yields "|"."""
        pipe_path = tmp_path / "pipe.csv"
        pipe_path.write_text("name|email|phone\nAlice|a@b.com|555\n")
        found = detect_delimiter(pipe_path)
        assert found == "|"
|
||
|
||
|
||
class TestDetectHeaderRow:
    """Checks on the header-row index returned by ``detect_header_row``."""

    def test_standard_csv(self, sample_csv_path):
        """The sample CSV fixture has its header on the first row."""
        header_index = detect_header_row(sample_csv_path)
        assert header_index == 0

    def test_with_junk_rows(self, tmp_path):
        """A title line and a blank line precede the real column names."""
        junk_path = tmp_path / "junk.csv"
        junk_path.write_text(
            "Report generated 2024-01-01\n\nname,email,phone\nAlice,a@b.com,555\n"
        )
        # Row 0 is one free-text, non-numeric cell; row 2 ("name,email,phone")
        # is the row that looks like headers. Because the heuristic inspects
        # every cell of a row, a single-cell row 0 may also qualify, so either
        # index is accepted here — the outcome depends on delimiter detection.
        header_index = detect_header_row(junk_path)
        assert header_index in (0, 2)
|
||
|
||
|
||
class TestReadFile:
    """Checks on ``read_file`` against the sample CSV fixture."""

    def test_read_csv(self, sample_csv_path):
        """A plain read returns a 50-row DataFrame with the expected column."""
        frame = read_file(sample_csv_path)
        assert isinstance(frame, pd.DataFrame)
        assert len(frame) == 50
        assert "customer_name" in frame.columns

    def test_read_nonexistent(self):
        """A path that does not exist raises ``FileNotFoundError``."""
        missing = "/tmp/nonexistent_file_xyz.csv"
        with pytest.raises(FileNotFoundError):
            read_file(missing)

    def test_read_with_encoding_override(self, sample_csv_path):
        """An explicit ``encoding`` argument still reads every row."""
        frame = read_file(sample_csv_path, encoding="utf-8")
        assert len(frame) == 50

    def test_chunked_reading(self, sample_csv_path):
        """``chunk_size=10`` splits the 50 rows into five chunks."""
        # The return value is consumed as an iterable of chunks.
        pieces = list(read_file(sample_csv_path, chunk_size=10))
        assert len(pieces) == 5
        row_total = sum(len(piece) for piece in pieces)
        assert row_total == 50
|
||
|
||
|
||
class TestWriteFile:
    """Checks on ``write_file`` output for CSV and Excel targets."""

    def test_write_csv(self, tmp_path, simple_df):
        """Writing to a ``.csv`` path creates a file with the same row count."""
        target = tmp_path / "output.csv"
        write_file(simple_df, target)
        assert target.exists()
        # Read it back with pandas directly, independent of read_file.
        reloaded = pd.read_csv(target, encoding="utf-8-sig")
        assert len(reloaded) == len(simple_df)

    def test_write_xlsx(self, tmp_path, simple_df):
        """Writing to an ``.xlsx`` path creates a file with the same row count."""
        target = tmp_path / "output.xlsx"
        write_file(simple_df, target)
        assert target.exists()
        reloaded = pd.read_excel(target)
        assert len(reloaded) == len(simple_df)

    def test_utf8_bom_default(self, tmp_path, simple_df):
        """With default arguments the CSV starts with the UTF-8 BOM bytes."""
        target = tmp_path / "bom.csv"
        write_file(simple_df, target)
        leading = target.read_bytes()[:3]
        assert leading == b"\xef\xbb\xbf"
|
||
|
||
|
||
class TestListSheets:
    """Checks that ``list_sheets`` reports workbook sheet names in order."""

    def test_list_sheets(self, tmp_path, simple_df):
        """A two-sheet workbook yields both names in write order."""
        workbook_path = tmp_path / "multi.xlsx"
        with pd.ExcelWriter(workbook_path, engine="openpyxl") as writer:
            for sheet in ("Sheet1", "Sheet2"):
                simple_df.to_excel(writer, sheet_name=sheet, index=False)
        found = list_sheets(workbook_path)
        assert found == ["Sheet1", "Sheet2"]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Pre-parse repair
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestRepairBytes:
    """Checks on the byte-level repairs performed by ``repair_bytes``."""

    def test_strips_bom(self):
        """A leading UTF-8 BOM is removed and logged as ``strip_bom``."""
        payload = b"\xef\xbb\xbfid,name\n1,Alice\n"
        outcome = repair_bytes(payload)
        assert outcome.repaired_bytes == b"id,name\n1,Alice\n"
        kinds = [action.kind for action in outcome.actions]
        assert "strip_bom" in kinds

    def test_strips_nul_bytes(self):
        """NUL bytes are removed and the action detail mentions the count."""
        payload = b"id,name\n1,Hel\x00lo\n2,Wo\x00\x00rld\n"
        outcome = repair_bytes(payload)
        assert b"\x00" not in outcome.repaired_bytes
        nul_action = next(
            action for action in outcome.actions if action.kind == "strip_nul"
        )
        # The payload holds three NUL bytes in total.
        assert "3" in nul_action.detail

    def test_folds_smart_double_quotes(self):
        """Curly double quotes and guillemets do not survive the repair."""
        payload = "id,note\n1,“hello”\n2,«bonjour»\n".encode("utf-8")
        outcome = repair_bytes(payload)
        decoded = outcome.repaired_bytes.decode("utf-8")
        for fancy in ("“", "”", "«", "»"):
            assert fancy not in decoded
        kinds = [action.kind for action in outcome.actions]
        assert "fold_smart_quote" in kinds

    def test_does_not_fold_curly_singles(self):
        """A curly apostrophe passes through untouched."""
        # Single curly quotes are left for the cell-level cleaner to handle.
        payload = "id,note\n1,it’s fine\n".encode("utf-8")
        outcome = repair_bytes(payload)
        decoded = outcome.repaired_bytes.decode("utf-8")
        assert "’" in decoded
        kinds = [action.kind for action in outcome.actions]
        assert "fold_smart_quote" not in kinds

    def test_no_changes_when_clean(self):
        """Clean input comes back byte-identical with no actions recorded."""
        payload = b"id,name\n1,Alice\n2,Bob\n"
        outcome = repair_bytes(payload)
        assert outcome.repaired_bytes == payload
        assert outcome.actions == []
        assert outcome.changed is False

    def test_repairs_unquoted_currency_comma(self):
        """A currency value with a bare thousands comma is repaired."""
        payload = (
            b"id,price,qty\n"
            b"1,100,5\n"
            b"2, $1,500.00 ,7\n"  # splits into 4 fields instead of 3
            b"3,200,9\n"
        )
        outcome = repair_bytes(payload)
        # Once repaired, pandas must parse three columns and three data rows.
        parsed = pd.read_csv(io.BytesIO(outcome.repaired_bytes))
        assert list(parsed.columns) == ["id", "price", "qty"]
        assert len(parsed) == 3
        # The fix is logged against line 3 of the input.
        assert any(
            action.kind == "quote_unquoted_delim" and action.line == 3
            for action in outcome.actions
        )

    def test_logs_unrepairable_when_ambiguous(self):
        """A row with no clear merge point is listed as unrepairable."""
        # Several adjacent merge candidates: the repair bails out and logs
        # the line rather than guessing.
        payload = (
            b"id,a,b,c\n"
            b"1,foo,bar,baz\n"
            b"2,1,2,3,4,5\n"  # far too many extra fields, no obvious merge
        )
        outcome = repair_bytes(payload)
        assert 3 in outcome.unrepairable_lines

    def test_summary_groups_by_kind(self):
        """``summary()`` reports one entry per action kind."""
        payload = b"\xef\xbb\xbfid,name\n1,Hel\x00lo\n"
        counts = repair_bytes(payload).summary()
        assert counts.get("strip_bom") == 1
        assert counts.get("strip_nul") == 1
|
||
|
||
|
||
class TestReadFileWithRepair:
    """``read_file`` with its default ``repair=True`` sends CSV through repair_bytes."""

    def test_default_strips_bom_via_repair(self, tmp_path):
        """A BOM-prefixed CSV loads with a clean first column name."""
        bom_path = tmp_path / "bom.csv"
        bom_path.write_bytes(b"\xef\xbb\xbfid,name\n1,Alice\n")
        frame = read_file(bom_path)
        # The first header must be 'id', not '\ufeffid' (BOM glued on).
        first_column = list(frame.columns)[0]
        assert first_column == "id"

    def test_default_folds_smart_double_quotes(self, tmp_path):
        """Curly double quotes inside a cell become ASCII double quotes."""
        # The curly quotes are deliberately *unquoted*: wrapping the cell in
        # ASCII quotes would collide with CSV quoting once the fold runs.
        quoted_path = tmp_path / "quoted.csv"
        quoted_path.write_bytes("id,note\n1,curly “hello” world\n".encode("utf-8"))
        frame = read_file(quoted_path)
        note = frame.iloc[0]["note"]
        assert note == 'curly "hello" world'

    def test_repair_false_preserves_smart_quotes(self, tmp_path):
        """With ``repair=False`` the curly quotes are still in the cell."""
        quoted_path = tmp_path / "quoted.csv"
        quoted_path.write_bytes("id,note\n1,curly “hello” world\n".encode("utf-8"))
        frame = read_file(quoted_path, repair=False)
        note = frame.iloc[0]["note"]
        assert any(curly in note for curly in ("“", "”"))

    def test_chunked_read_skips_repair(self, tmp_path):
        """Chunked reads still return every row."""
        # Chunked reads bypass repair (memory budget); this only checks that
        # they keep working.
        data_lines = "\n".join(f"{i},Alice" for i in range(1, 21))
        chunked_path = tmp_path / "chunked.csv"
        chunked_path.write_text("id,name\n" + data_lines)
        pieces = list(read_file(chunked_path, chunk_size=5))
        row_total = sum(len(piece) for piece in pieces)
        assert row_total == 20
|
||
|
||
|
||
class TestReadCsvRepaired:
    """Checks on ``read_csv_repaired``, which returns a frame and a repair result."""

    def test_recovers_malformed_currency_row(self, tmp_path):
        """A row broken by a bare thousands comma is parsed and flagged."""
        bad_path = tmp_path / "bad.csv"
        bad_path.write_bytes(
            b"id,price,qty\n"
            b"1,100,5\n"
            b"2, $1,500.00 ,7\n"
            b"3,200,9\n"
        )
        frame, repair = read_csv_repaired(bad_path)
        assert len(frame) == 3
        # The currency amount must land in one cell with its comma intact.
        price = frame.iloc[1]["price"]
        assert "1,500.00" in price
        assert repair.changed

    def test_passthrough_when_clean(self, tmp_path):
        """A well-formed file loads with ``changed`` left False."""
        ok_path = tmp_path / "ok.csv"
        ok_path.write_bytes(b"id,name\n1,Alice\n2,Bob\n")
        frame, repair = read_csv_repaired(ok_path)
        assert len(frame) == 2
        assert repair.changed is False
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Round-trip integrity (audit GAP-19, GAP-21)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestRoundTrip:
    """Write-then-read round trips through ``write_file`` and ``read_file``."""

    def test_csv_roundtrip_preserves_values(self, tmp_path):
        """Columns, row count and every string value survive a CSV round trip."""
        df = pd.DataFrame({
            "id": ["1", "2", "3"],
            "name": ["Alice", "Bob", "Carol"],
            "amount": ["10.50", "20.25", "30.00"],
        })
        path = tmp_path / "rt.csv"
        write_file(df, path)
        loaded = read_file(path)
        assert list(loaded.columns) == list(df.columns)
        assert len(loaded) == len(df)
        for col in df.columns:
            assert list(loaded[col]) == list(df[col])

    def test_tsv_roundtrip_via_extension(self, tmp_path):
        """A ``.tsv`` path is written tab-delimited and reads back intact."""
        df = pd.DataFrame({"a": ["1", "2"], "b": ["x", "y, z"]})
        path = tmp_path / "rt.tsv"
        write_file(df, path)
        # Inspect the raw bytes: the round trip alone would also pass for a
        # comma-delimited file with a quoted "y, z" cell, so it cannot prove
        # that a tab was actually used.
        assert b"a\tb" in path.read_bytes()
        # The embedded comma in column 'b' must survive the round trip.
        loaded = read_file(path)
        assert list(loaded.columns) == ["a", "b"]
        assert loaded.iloc[1]["b"] == "y, z"

    def test_semicolon_roundtrip_via_explicit_delimiter(self, tmp_path):
        """An explicit ``delimiter=";"`` is honoured on write and detected on read."""
        df = pd.DataFrame({"a": ["1", "2"], "b": ["x", "y"]})
        path = tmp_path / "rt.csv"
        write_file(df, path, delimiter=";")
        # Confirm the requested delimiter reached the file itself.
        assert b"a;b" in path.read_bytes()
        loaded = read_file(path)
        assert list(loaded.columns) == ["a", "b"]
        assert loaded.iloc[0]["a"] == "1"

    def test_utf8_bom_non_ascii_roundtrip(self, tmp_path):
        """Accented characters survive the default write/read encoding."""
        df = pd.DataFrame({"name": ["café", "naïve", "résumé"]})
        path = tmp_path / "utf8.csv"
        write_file(df, path)
        loaded = read_file(path)
        assert list(loaded["name"]) == ["café", "naïve", "résumé"]
|
||
|
||
|
||
class TestExcelHeaderDetection:
    """Checks that ``read_file`` finds the header row of an Excel sheet."""

    def test_excel_with_metadata_rows(self, tmp_path):
        """A metadata row and a blank row above the header are skipped."""
        from openpyxl import Workbook

        book = Workbook()
        sheet = book.active
        # Layout: one metadata row, one blank row, the header, two data rows.
        sheet_rows = [
            ["Report generated 2024-01-15", None, None],
            [None, None, None],
            ["name", "email", "phone"],
            ["alice", "a@x.com", "555-1234"],
            ["bob", "b@x.com", "555-5678"],
        ]
        for row in sheet_rows:
            sheet.append(row)
        workbook_path = tmp_path / "report.xlsx"
        book.save(workbook_path)

        frame = read_file(workbook_path)
        # With the header auto-detected on row index 2, the columns are
        # name/email/phone and only the two data rows remain.
        assert list(frame.columns) == ["name", "email", "phone"]
        assert len(frame) == 2

    def test_excel_normal_header_row_zero(self, tmp_path):
        """A sheet whose first row is the header loads unchanged."""
        from openpyxl import Workbook

        book = Workbook()
        sheet = book.active
        for row in (["name", "email"], ["alice", "a@x.com"]):
            sheet.append(row)
        workbook_path = tmp_path / "normal.xlsx"
        book.save(workbook_path)

        frame = read_file(workbook_path)
        assert list(frame.columns) == ["name", "email"]
        assert len(frame) == 1
|