Some pollution patterns block pandas before the cell-level cleaner can run.
Add a pre-parse pass on raw bytes that fixes only what breaks parsing, and
returns a structured action log the GUI/CLI can surface to the user.
repair_bytes(raw, *, encoding, delimiter, fold_quotes, strip_nul, repair_delims):
1. Strip leading UTF-8 BOM.
2. Strip embedded NUL bytes (the C parser truncates fields at NUL).
3. Fold smart double quotes (curly, guillemet, double-prime) to ASCII '"'.
Curly singles are NOT folded here; they don't conflict with CSV and the
cell-level cleaner handles them more accurately.
4. Per-row repair when one rogue delimiter is embedded in a field that
looks like currency or thousands-grouped digits. Tiered scoring keeps
" $1,500.00 ,7" unambiguous: the strict currency regex match wins
over the loose digit/sigil heuristic.
read_csv_repaired(path) -> (DataFrame, RepairResult). RepairResult exposes
.actions, .unrepairable_lines, and a summary() grouped by kind.
Out of scope for this pass: encoding repair, delimiter conversion, multi-
delimiter merges (k>1) — logged as unrepairable so callers can see what was
left alone instead of silently parsing wrong.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
230 lines
7.9 KiB
Python
230 lines
7.9 KiB
Python
"""Tests for src.core.io — file reading, encoding/delimiter detection."""
|
||
|
||
import io
|
||
|
||
import pandas as pd
|
||
import pytest
|
||
from pathlib import Path
|
||
|
||
from src.core.io import (
|
||
detect_encoding,
|
||
detect_delimiter,
|
||
detect_header_row,
|
||
read_file,
|
||
write_file,
|
||
list_sheets,
|
||
repair_bytes,
|
||
read_csv_repaired,
|
||
)
|
||
|
||
|
||
class TestDetectEncoding:
|
||
def test_utf8_file(self, sample_csv_path):
|
||
enc = detect_encoding(sample_csv_path)
|
||
assert enc.lower().replace("-", "") in ("utf8", "ascii", "utf8sig")
|
||
|
||
def test_empty_file(self, tmp_path):
|
||
f = tmp_path / "empty.csv"
|
||
f.write_bytes(b"")
|
||
assert detect_encoding(f) == "utf-8"
|
||
|
||
def test_bom_file(self, tmp_path):
|
||
f = tmp_path / "bom.csv"
|
||
f.write_bytes(b"\xef\xbb\xbfname,email\nAlice,a@b.com\n")
|
||
assert detect_encoding(f) == "utf-8-sig"
|
||
|
||
def test_latin1_file(self, tmp_path):
|
||
f = tmp_path / "latin.csv"
|
||
content = "name,city\nJosé,São Paulo\n".encode("latin-1")
|
||
f.write_bytes(content)
|
||
enc = detect_encoding(f)
|
||
# Should detect something compatible with latin-1 family
|
||
assert enc in ("iso-8859-1", "latin-1", "windows-1252", "cp1252",
|
||
"iso-8859-9", "cp1250", "iso-8859-15", "utf-8")
|
||
|
||
|
||
class TestDetectDelimiter:
|
||
def test_comma(self, sample_csv_path):
|
||
assert detect_delimiter(sample_csv_path) == ","
|
||
|
||
def test_tab(self, tmp_path):
|
||
f = tmp_path / "tabs.tsv"
|
||
f.write_text("name\temail\nAlice\ta@b.com\n")
|
||
assert detect_delimiter(f) == "\t"
|
||
|
||
def test_semicolon(self, tmp_path):
|
||
f = tmp_path / "semi.csv"
|
||
f.write_text("name;email;phone\nAlice;a@b.com;555\n")
|
||
assert detect_delimiter(f) == ";"
|
||
|
||
def test_pipe(self, tmp_path):
|
||
f = tmp_path / "pipe.csv"
|
||
f.write_text("name|email|phone\nAlice|a@b.com|555\n")
|
||
assert detect_delimiter(f) == "|"
|
||
|
||
|
||
class TestDetectHeaderRow:
|
||
def test_standard_csv(self, sample_csv_path):
|
||
assert detect_header_row(sample_csv_path) == 0
|
||
|
||
def test_with_junk_rows(self, tmp_path):
|
||
f = tmp_path / "junk.csv"
|
||
f.write_text("Report generated 2024-01-01\n\nname,email,phone\nAlice,a@b.com,555\n")
|
||
# Row 0 has "Report generated..." which is a single non-numeric string
|
||
# Row 2 has "name,email,phone" which looks like headers
|
||
# The heuristic checks all cells, so row 0 may match if it's a single cell
|
||
hdr = detect_header_row(f)
|
||
assert hdr in (0, 2) # depends on delimiter detection
|
||
|
||
|
||
class TestReadFile:
|
||
def test_read_csv(self, sample_csv_path):
|
||
df = read_file(sample_csv_path)
|
||
assert isinstance(df, pd.DataFrame)
|
||
assert len(df) == 50
|
||
assert "customer_name" in df.columns
|
||
|
||
def test_read_nonexistent(self):
|
||
with pytest.raises(FileNotFoundError):
|
||
read_file("/tmp/nonexistent_file_xyz.csv")
|
||
|
||
def test_read_with_encoding_override(self, sample_csv_path):
|
||
df = read_file(sample_csv_path, encoding="utf-8")
|
||
assert len(df) == 50
|
||
|
||
def test_chunked_reading(self, sample_csv_path):
|
||
chunks = read_file(sample_csv_path, chunk_size=10)
|
||
# Should be a generator
|
||
all_chunks = list(chunks)
|
||
assert len(all_chunks) == 5
|
||
total_rows = sum(len(c) for c in all_chunks)
|
||
assert total_rows == 50
|
||
|
||
|
||
class TestWriteFile:
|
||
def test_write_csv(self, tmp_path, simple_df):
|
||
out = tmp_path / "output.csv"
|
||
write_file(simple_df, out)
|
||
assert out.exists()
|
||
# Read back
|
||
df = pd.read_csv(out, encoding="utf-8-sig")
|
||
assert len(df) == len(simple_df)
|
||
|
||
def test_write_xlsx(self, tmp_path, simple_df):
|
||
out = tmp_path / "output.xlsx"
|
||
write_file(simple_df, out)
|
||
assert out.exists()
|
||
df = pd.read_excel(out)
|
||
assert len(df) == len(simple_df)
|
||
|
||
def test_utf8_bom_default(self, tmp_path, simple_df):
|
||
out = tmp_path / "bom.csv"
|
||
write_file(simple_df, out)
|
||
raw = out.read_bytes()
|
||
assert raw[:3] == b"\xef\xbb\xbf"
|
||
|
||
|
||
class TestListSheets:
|
||
def test_list_sheets(self, tmp_path, simple_df):
|
||
path = tmp_path / "multi.xlsx"
|
||
with pd.ExcelWriter(path, engine="openpyxl") as writer:
|
||
simple_df.to_excel(writer, sheet_name="Sheet1", index=False)
|
||
simple_df.to_excel(writer, sheet_name="Sheet2", index=False)
|
||
sheets = list_sheets(path)
|
||
assert sheets == ["Sheet1", "Sheet2"]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Pre-parse repair
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestRepairBytes:
|
||
def test_strips_bom(self):
|
||
raw = b"\xef\xbb\xbfid,name\n1,Alice\n"
|
||
result = repair_bytes(raw)
|
||
assert result.repaired_bytes == b"id,name\n1,Alice\n"
|
||
assert any(a.kind == "strip_bom" for a in result.actions)
|
||
|
||
def test_strips_nul_bytes(self):
|
||
raw = b"id,name\n1,Hel\x00lo\n2,Wo\x00\x00rld\n"
|
||
result = repair_bytes(raw)
|
||
assert b"\x00" not in result.repaired_bytes
|
||
nul_action = next(a for a in result.actions if a.kind == "strip_nul")
|
||
assert "3" in nul_action.detail # 3 NUL bytes
|
||
|
||
def test_folds_smart_double_quotes(self):
|
||
raw = "id,note\n1,“hello”\n2,«bonjour»\n".encode("utf-8")
|
||
result = repair_bytes(raw)
|
||
text = result.repaired_bytes.decode("utf-8")
|
||
assert "“" not in text and "”" not in text
|
||
assert "«" not in text and "»" not in text
|
||
assert any(a.kind == "fold_smart_quote" for a in result.actions)
|
||
|
||
def test_does_not_fold_curly_singles(self):
|
||
# Single curly quotes should pass through; cell-level cleaner handles them.
|
||
raw = "id,note\n1,it’s fine\n".encode("utf-8")
|
||
result = repair_bytes(raw)
|
||
text = result.repaired_bytes.decode("utf-8")
|
||
assert "’" in text
|
||
assert not any(a.kind == "fold_smart_quote" for a in result.actions)
|
||
|
||
def test_no_changes_when_clean(self):
|
||
raw = b"id,name\n1,Alice\n2,Bob\n"
|
||
result = repair_bytes(raw)
|
||
assert result.repaired_bytes == raw
|
||
assert result.actions == []
|
||
assert result.changed is False
|
||
|
||
def test_repairs_unquoted_currency_comma(self):
|
||
raw = (
|
||
b"id,price,qty\n"
|
||
b"1,100,5\n"
|
||
b"2, $1,500.00 ,7\n" # 4 fields instead of 3
|
||
b"3,200,9\n"
|
||
)
|
||
result = repair_bytes(raw)
|
||
# After repair, every row should have 3 fields when re-parsed.
|
||
df = pd.read_csv(io.BytesIO(result.repaired_bytes))
|
||
assert list(df.columns) == ["id", "price", "qty"]
|
||
assert len(df) == 3
|
||
assert any(a.kind == "quote_unquoted_delim" and a.line == 3 for a in result.actions)
|
||
|
||
def test_logs_unrepairable_when_ambiguous(self):
|
||
# Two adjacent merge candidates -> bail out, log unrepairable.
|
||
raw = (
|
||
b"id,a,b,c\n"
|
||
b"1,foo,bar,baz\n"
|
||
b"2,1,2,3,4,5\n" # way too many extras, no clear merge
|
||
)
|
||
result = repair_bytes(raw)
|
||
assert 3 in result.unrepairable_lines
|
||
|
||
def test_summary_groups_by_kind(self):
|
||
raw = b"\xef\xbb\xbfid,name\n1,Hel\x00lo\n"
|
||
result = repair_bytes(raw)
|
||
summary = result.summary()
|
||
assert summary.get("strip_bom") == 1
|
||
assert summary.get("strip_nul") == 1
|
||
|
||
|
||
class TestReadCsvRepaired:
|
||
def test_recovers_malformed_currency_row(self, tmp_path):
|
||
f = tmp_path / "bad.csv"
|
||
f.write_bytes(
|
||
b"id,price,qty\n"
|
||
b"1,100,5\n"
|
||
b"2, $1,500.00 ,7\n"
|
||
b"3,200,9\n"
|
||
)
|
||
df, repair = read_csv_repaired(f)
|
||
assert len(df) == 3
|
||
assert "1,500.00" in df.iloc[1]["price"]
|
||
assert repair.changed
|
||
|
||
def test_passthrough_when_clean(self, tmp_path):
|
||
f = tmp_path / "ok.csv"
|
||
f.write_bytes(b"id,name\n1,Alice\n2,Bob\n")
|
||
df, repair = read_csv_repaired(f)
|
||
assert len(df) == 2
|
||
assert repair.changed is False
|