Previously only analyze() and direct read_csv_repaired() callers got the byte-level repair pass (BOM strip, NUL strip, smart-double-quote fold, unquoted-delimiter merge). The dedup CLI and any other read_file consumer silently missed it. read_file gains a repair=True default. CSV/TSV inputs run through repair_bytes before pandas sees them; Excel inputs still pass through unchanged. Chunked reads (chunk_size set) bypass repair because the pre- parse pass loads the whole file — preserving streaming behavior on huge files. Repair actions and unrepairable lines are logged at INFO/WARNING. cli_text_clean opts out (repair=False): the cleaner offers fine-grained control via --preset and per-op flags, and a byte-level smart-quote fold under the user's "minimal" preset would violate that contract. The cell-level cleaner does the equivalent work itself when its options ask for it. Tests: read_file default strips BOM and folds curly double quotes; repair=False preserves smart quotes; chunked reads still work and skip repair as documented. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
264 lines
9.4 KiB
Python
264 lines
9.4 KiB
Python
"""Tests for src.core.io — file reading, encoding/delimiter detection."""
|
||
|
||
import io
|
||
|
||
import pandas as pd
|
||
import pytest
|
||
from pathlib import Path
|
||
|
||
from src.core.io import (
|
||
detect_encoding,
|
||
detect_delimiter,
|
||
detect_header_row,
|
||
read_file,
|
||
write_file,
|
||
list_sheets,
|
||
repair_bytes,
|
||
read_csv_repaired,
|
||
)
|
||
|
||
|
||
class TestDetectEncoding:
|
||
def test_utf8_file(self, sample_csv_path):
|
||
enc = detect_encoding(sample_csv_path)
|
||
assert enc.lower().replace("-", "") in ("utf8", "ascii", "utf8sig")
|
||
|
||
def test_empty_file(self, tmp_path):
|
||
f = tmp_path / "empty.csv"
|
||
f.write_bytes(b"")
|
||
assert detect_encoding(f) == "utf-8"
|
||
|
||
def test_bom_file(self, tmp_path):
|
||
f = tmp_path / "bom.csv"
|
||
f.write_bytes(b"\xef\xbb\xbfname,email\nAlice,a@b.com\n")
|
||
assert detect_encoding(f) == "utf-8-sig"
|
||
|
||
def test_latin1_file(self, tmp_path):
|
||
f = tmp_path / "latin.csv"
|
||
content = "name,city\nJosé,São Paulo\n".encode("latin-1")
|
||
f.write_bytes(content)
|
||
enc = detect_encoding(f)
|
||
# Should detect something compatible with latin-1 family
|
||
assert enc in ("iso-8859-1", "latin-1", "windows-1252", "cp1252",
|
||
"iso-8859-9", "cp1250", "iso-8859-15", "utf-8")
|
||
|
||
|
||
class TestDetectDelimiter:
|
||
def test_comma(self, sample_csv_path):
|
||
assert detect_delimiter(sample_csv_path) == ","
|
||
|
||
def test_tab(self, tmp_path):
|
||
f = tmp_path / "tabs.tsv"
|
||
f.write_text("name\temail\nAlice\ta@b.com\n")
|
||
assert detect_delimiter(f) == "\t"
|
||
|
||
def test_semicolon(self, tmp_path):
|
||
f = tmp_path / "semi.csv"
|
||
f.write_text("name;email;phone\nAlice;a@b.com;555\n")
|
||
assert detect_delimiter(f) == ";"
|
||
|
||
def test_pipe(self, tmp_path):
|
||
f = tmp_path / "pipe.csv"
|
||
f.write_text("name|email|phone\nAlice|a@b.com|555\n")
|
||
assert detect_delimiter(f) == "|"
|
||
|
||
|
||
class TestDetectHeaderRow:
|
||
def test_standard_csv(self, sample_csv_path):
|
||
assert detect_header_row(sample_csv_path) == 0
|
||
|
||
def test_with_junk_rows(self, tmp_path):
|
||
f = tmp_path / "junk.csv"
|
||
f.write_text("Report generated 2024-01-01\n\nname,email,phone\nAlice,a@b.com,555\n")
|
||
# Row 0 has "Report generated..." which is a single non-numeric string
|
||
# Row 2 has "name,email,phone" which looks like headers
|
||
# The heuristic checks all cells, so row 0 may match if it's a single cell
|
||
hdr = detect_header_row(f)
|
||
assert hdr in (0, 2) # depends on delimiter detection
|
||
|
||
|
||
class TestReadFile:
|
||
def test_read_csv(self, sample_csv_path):
|
||
df = read_file(sample_csv_path)
|
||
assert isinstance(df, pd.DataFrame)
|
||
assert len(df) == 50
|
||
assert "customer_name" in df.columns
|
||
|
||
def test_read_nonexistent(self):
|
||
with pytest.raises(FileNotFoundError):
|
||
read_file("/tmp/nonexistent_file_xyz.csv")
|
||
|
||
def test_read_with_encoding_override(self, sample_csv_path):
|
||
df = read_file(sample_csv_path, encoding="utf-8")
|
||
assert len(df) == 50
|
||
|
||
def test_chunked_reading(self, sample_csv_path):
|
||
chunks = read_file(sample_csv_path, chunk_size=10)
|
||
# Should be a generator
|
||
all_chunks = list(chunks)
|
||
assert len(all_chunks) == 5
|
||
total_rows = sum(len(c) for c in all_chunks)
|
||
assert total_rows == 50
|
||
|
||
|
||
class TestWriteFile:
|
||
def test_write_csv(self, tmp_path, simple_df):
|
||
out = tmp_path / "output.csv"
|
||
write_file(simple_df, out)
|
||
assert out.exists()
|
||
# Read back
|
||
df = pd.read_csv(out, encoding="utf-8-sig")
|
||
assert len(df) == len(simple_df)
|
||
|
||
def test_write_xlsx(self, tmp_path, simple_df):
|
||
out = tmp_path / "output.xlsx"
|
||
write_file(simple_df, out)
|
||
assert out.exists()
|
||
df = pd.read_excel(out)
|
||
assert len(df) == len(simple_df)
|
||
|
||
def test_utf8_bom_default(self, tmp_path, simple_df):
|
||
out = tmp_path / "bom.csv"
|
||
write_file(simple_df, out)
|
||
raw = out.read_bytes()
|
||
assert raw[:3] == b"\xef\xbb\xbf"
|
||
|
||
|
||
class TestListSheets:
|
||
def test_list_sheets(self, tmp_path, simple_df):
|
||
path = tmp_path / "multi.xlsx"
|
||
with pd.ExcelWriter(path, engine="openpyxl") as writer:
|
||
simple_df.to_excel(writer, sheet_name="Sheet1", index=False)
|
||
simple_df.to_excel(writer, sheet_name="Sheet2", index=False)
|
||
sheets = list_sheets(path)
|
||
assert sheets == ["Sheet1", "Sheet2"]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Pre-parse repair
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestRepairBytes:
|
||
def test_strips_bom(self):
|
||
raw = b"\xef\xbb\xbfid,name\n1,Alice\n"
|
||
result = repair_bytes(raw)
|
||
assert result.repaired_bytes == b"id,name\n1,Alice\n"
|
||
assert any(a.kind == "strip_bom" for a in result.actions)
|
||
|
||
def test_strips_nul_bytes(self):
|
||
raw = b"id,name\n1,Hel\x00lo\n2,Wo\x00\x00rld\n"
|
||
result = repair_bytes(raw)
|
||
assert b"\x00" not in result.repaired_bytes
|
||
nul_action = next(a for a in result.actions if a.kind == "strip_nul")
|
||
assert "3" in nul_action.detail # 3 NUL bytes
|
||
|
||
def test_folds_smart_double_quotes(self):
|
||
raw = "id,note\n1,“hello”\n2,«bonjour»\n".encode("utf-8")
|
||
result = repair_bytes(raw)
|
||
text = result.repaired_bytes.decode("utf-8")
|
||
assert "“" not in text and "”" not in text
|
||
assert "«" not in text and "»" not in text
|
||
assert any(a.kind == "fold_smart_quote" for a in result.actions)
|
||
|
||
def test_does_not_fold_curly_singles(self):
|
||
# Single curly quotes should pass through; cell-level cleaner handles them.
|
||
raw = "id,note\n1,it’s fine\n".encode("utf-8")
|
||
result = repair_bytes(raw)
|
||
text = result.repaired_bytes.decode("utf-8")
|
||
assert "’" in text
|
||
assert not any(a.kind == "fold_smart_quote" for a in result.actions)
|
||
|
||
def test_no_changes_when_clean(self):
|
||
raw = b"id,name\n1,Alice\n2,Bob\n"
|
||
result = repair_bytes(raw)
|
||
assert result.repaired_bytes == raw
|
||
assert result.actions == []
|
||
assert result.changed is False
|
||
|
||
def test_repairs_unquoted_currency_comma(self):
|
||
raw = (
|
||
b"id,price,qty\n"
|
||
b"1,100,5\n"
|
||
b"2, $1,500.00 ,7\n" # 4 fields instead of 3
|
||
b"3,200,9\n"
|
||
)
|
||
result = repair_bytes(raw)
|
||
# After repair, every row should have 3 fields when re-parsed.
|
||
df = pd.read_csv(io.BytesIO(result.repaired_bytes))
|
||
assert list(df.columns) == ["id", "price", "qty"]
|
||
assert len(df) == 3
|
||
assert any(a.kind == "quote_unquoted_delim" and a.line == 3 for a in result.actions)
|
||
|
||
def test_logs_unrepairable_when_ambiguous(self):
|
||
# Two adjacent merge candidates -> bail out, log unrepairable.
|
||
raw = (
|
||
b"id,a,b,c\n"
|
||
b"1,foo,bar,baz\n"
|
||
b"2,1,2,3,4,5\n" # way too many extras, no clear merge
|
||
)
|
||
result = repair_bytes(raw)
|
||
assert 3 in result.unrepairable_lines
|
||
|
||
def test_summary_groups_by_kind(self):
|
||
raw = b"\xef\xbb\xbfid,name\n1,Hel\x00lo\n"
|
||
result = repair_bytes(raw)
|
||
summary = result.summary()
|
||
assert summary.get("strip_bom") == 1
|
||
assert summary.get("strip_nul") == 1
|
||
|
||
|
||
class TestReadFileWithRepair:
|
||
"""``read_file(repair=True)`` (default) routes CSV through repair_bytes."""
|
||
|
||
def test_default_strips_bom_via_repair(self, tmp_path):
|
||
f = tmp_path / "bom.csv"
|
||
f.write_bytes(b"\xef\xbb\xbfid,name\n1,Alice\n")
|
||
df = read_file(f)
|
||
# First column header must be 'id', not 'id'.
|
||
assert list(df.columns)[0] == "id"
|
||
|
||
def test_default_folds_smart_double_quotes(self, tmp_path):
|
||
# Curly quotes are *unquoted* here — outer ASCII quotes would create
|
||
# a CSV-quoting collision once the fold runs.
|
||
f = tmp_path / "quoted.csv"
|
||
f.write_bytes("id,note\n1,curly “hello” world\n".encode("utf-8"))
|
||
df = read_file(f)
|
||
assert df.iloc[0]["note"] == 'curly "hello" world'
|
||
|
||
def test_repair_false_preserves_smart_quotes(self, tmp_path):
|
||
f = tmp_path / "quoted.csv"
|
||
f.write_bytes("id,note\n1,curly “hello” world\n".encode("utf-8"))
|
||
df = read_file(f, repair=False)
|
||
assert "“" in df.iloc[0]["note"] or "”" in df.iloc[0]["note"]
|
||
|
||
def test_chunked_read_skips_repair(self, tmp_path):
|
||
# Chunked reads bypass repair (memory budget). Verify they still work.
|
||
rows = "id,name\n" + "\n".join(f"{i},Alice" for i in range(1, 21))
|
||
f = tmp_path / "chunked.csv"
|
||
f.write_text(rows)
|
||
chunks = list(read_file(f, chunk_size=5))
|
||
total = sum(len(c) for c in chunks)
|
||
assert total == 20
|
||
|
||
|
||
class TestReadCsvRepaired:
|
||
def test_recovers_malformed_currency_row(self, tmp_path):
|
||
f = tmp_path / "bad.csv"
|
||
f.write_bytes(
|
||
b"id,price,qty\n"
|
||
b"1,100,5\n"
|
||
b"2, $1,500.00 ,7\n"
|
||
b"3,200,9\n"
|
||
)
|
||
df, repair = read_csv_repaired(f)
|
||
assert len(df) == 3
|
||
assert "1,500.00" in df.iloc[1]["price"]
|
||
assert repair.changed
|
||
|
||
def test_passthrough_when_clean(self, tmp_path):
|
||
f = tmp_path / "ok.csv"
|
||
f.write_bytes(b"id,name\n1,Alice\n2,Bob\n")
|
||
df, repair = read_csv_repaired(f)
|
||
assert len(df) == 2
|
||
assert repair.changed is False
|