Introduces src/core/errors.py with a small structured error hierarchy
that every public entry point now uses. Each error carries the
context a user needs to fix it and the context a maintainer needs to
trace it.
The hierarchy:
DataToolsError (base — formats path, column, operation, suggestion)
InputValidationError (extends ValueError — bad arg / wrong type)
ConfigError (extends ValueError — bad config / options)
FileFormatError (extends ValueError — file is not what we expected)
FileAccessError (extends OSError — file I/O failure)
Subclassing the stdlib bases means existing `except OSError` /
`except ValueError` handlers still catch them — no breaking change.
Helpers:
- ensure_dataframe(value, function=...) — uniform DataFrame guard
- ensure_choice(value, name=, choices=) — uniform enum/literal guard
- wrap_file_read(path, op, exc) — tag OSError with hint + path
- wrap_file_write(path, op, exc) — same, with Windows-aware tip
- format_for_user(exc, context=) — user-facing string for st.error / stderr
Library hardening:
- io.read_file: missing files surface FileAccessError listing whether
the parent directory exists, and the suggestion to check the path.
- io.read_file: chunk_size <= 0 now raises InputValidationError with
a positive-integer suggestion.
- io._read_excel: openpyxl BadZipFile / InvalidFileException / pandas
ValueError ("sheet not found") wrapped as FileFormatError listing
the path and a "list sheets with list_sheets()" hint.
- io._detect_excel_header_row: bare except narrowed to specific
openpyxl exceptions; falls back gracefully and logs at debug so
the real error surfaces from pd.read_excel.
- io.write_file: OSError / PermissionError on to_csv/to_excel wrapped
with file path and Windows-aware "file may be open in another
program" hint.
- dedup._parse_date: bare `except Exception` narrowed to
(TypeError, ValueError, OutOfBoundsDatetime); failed values
logged at debug for survivor-selection forensics.
- dedup._select_survivor: KEEP_MOST_RECENT now raises
InputValidationError instead of silently falling back to keep_first.
- dedup.deduplicate: input validation errors are InputValidationError
with operation/column/suggestion fields.
- format_standardize.from_dict: invalid FieldType for a column raises
ConfigError naming the column AND the bad value AND listing valid
values; same for date_order / phone_format / etc.
- format_standardize.from_file: OSError / JSON decode wrapped with
path AND line/column where parsing failed.
- format_standardize.to_file: TypeError on json.dumps wrapped as
ConfigError with the suspected source (extra_abbreviations).
- format_standardize._apply_field_type: dispatcher's "unknown field
type" branch now raises AssertionError (it's an internal invariant,
not user error — a new enum value was added without a branch).
- format_standardize._resolve_column_types: missing-column error now
InputValidationError with a "check for typos / unparsed header"
suggestion.
- format_standardize.standardize_dataframe: ensure_dataframe at entry.
- text_clean.clean_dataframe: ensure_dataframe at entry.
- config.to_strategies: invalid Algorithm/NormalizerType wrapped as
ConfigError naming the strategy index AND the column.
- config.to_survivor_rule: invalid SurvivorRule wrapped as ConfigError
listing valid values.
- config.from_file: OSError / JSON decode wrapped (mirror of
StandardizeOptions.from_file).
- fixes.repair_mojibake: ImportError on ftfy now logged at info level
with the underlying ImportError so a corrupt-package vs not-installed
distinction is visible in the logs.
- normalizers.normalize_phone: phonenumbers.NumberParseException now
logged at debug when the digits-only fallback drops extension /
country-code information — gives a trail when matching results
look wrong.
GUI / CLI surfaces:
- All 9 page handlers (`except Exception as e: st.error(...)`) now
use format_for_user(), which renders DataToolsError fields nicely
and falls back to "ClassName: message" for unrecognized errors.
- 2_Text_Cleaner and 3_Format_Standardizer additionally distinguish
UnicodeDecodeError with an "re-save as UTF-8" suggestion before
the generic handler.
- cli.py's "Error reading file" handler now uses format_for_user()
and includes the input path in the prefix.
Tests:
- tests/test_errors.py — 22 new tests covering: base class formatting,
stdlib inheritance, ensure_dataframe / ensure_choice helpers,
wrap_file_read / wrap_file_write, format_for_user behavior, and
end-to-end integration (missing file, missing dir, bad JSON, bad
algorithm, bad enum, missing column).
- tests/test_audit_fixes.py + tests/test_io.py — updated 4 tests for
the new exception types (InputValidationError replaces TypeError,
FileAccessError extends OSError).
Full project suite: 1230 passed, 4 skipped, 17 xfailed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
342 lines
12 KiB
Python
342 lines
12 KiB
Python
"""Tests for src.core.io — file reading, encoding/delimiter detection."""
|
||
|
||
import io
|
||
|
||
import pandas as pd
|
||
import pytest
|
||
from pathlib import Path
|
||
|
||
from src.core.io import (
|
||
detect_encoding,
|
||
detect_delimiter,
|
||
detect_header_row,
|
||
read_file,
|
||
write_file,
|
||
list_sheets,
|
||
repair_bytes,
|
||
read_csv_repaired,
|
||
)
|
||
|
||
|
||
class TestDetectEncoding:
|
||
def test_utf8_file(self, sample_csv_path):
|
||
enc = detect_encoding(sample_csv_path)
|
||
assert enc.lower().replace("-", "") in ("utf8", "ascii", "utf8sig")
|
||
|
||
def test_empty_file(self, tmp_path):
|
||
f = tmp_path / "empty.csv"
|
||
f.write_bytes(b"")
|
||
assert detect_encoding(f) == "utf-8"
|
||
|
||
def test_bom_file(self, tmp_path):
|
||
f = tmp_path / "bom.csv"
|
||
f.write_bytes(b"\xef\xbb\xbfname,email\nAlice,a@b.com\n")
|
||
assert detect_encoding(f) == "utf-8-sig"
|
||
|
||
def test_latin1_file(self, tmp_path):
|
||
f = tmp_path / "latin.csv"
|
||
content = "name,city\nJosé,São Paulo\n".encode("latin-1")
|
||
f.write_bytes(content)
|
||
enc = detect_encoding(f)
|
||
# Should detect something compatible with latin-1 family
|
||
assert enc in ("iso-8859-1", "latin-1", "windows-1252", "cp1252",
|
||
"iso-8859-9", "cp1250", "iso-8859-15", "utf-8")
|
||
|
||
|
||
class TestDetectDelimiter:
|
||
def test_comma(self, sample_csv_path):
|
||
assert detect_delimiter(sample_csv_path) == ","
|
||
|
||
def test_tab(self, tmp_path):
|
||
f = tmp_path / "tabs.tsv"
|
||
f.write_text("name\temail\nAlice\ta@b.com\n")
|
||
assert detect_delimiter(f) == "\t"
|
||
|
||
def test_semicolon(self, tmp_path):
|
||
f = tmp_path / "semi.csv"
|
||
f.write_text("name;email;phone\nAlice;a@b.com;555\n")
|
||
assert detect_delimiter(f) == ";"
|
||
|
||
def test_pipe(self, tmp_path):
|
||
f = tmp_path / "pipe.csv"
|
||
f.write_text("name|email|phone\nAlice|a@b.com|555\n")
|
||
assert detect_delimiter(f) == "|"
|
||
|
||
|
||
class TestDetectHeaderRow:
|
||
def test_standard_csv(self, sample_csv_path):
|
||
assert detect_header_row(sample_csv_path) == 0
|
||
|
||
def test_with_junk_rows(self, tmp_path):
|
||
f = tmp_path / "junk.csv"
|
||
f.write_text("Report generated 2024-01-01\n\nname,email,phone\nAlice,a@b.com,555\n")
|
||
# Row 0 has "Report generated..." which is a single non-numeric string
|
||
# Row 2 has "name,email,phone" which looks like headers
|
||
# The heuristic checks all cells, so row 0 may match if it's a single cell
|
||
hdr = detect_header_row(f)
|
||
assert hdr in (0, 2) # depends on delimiter detection
|
||
|
||
|
||
class TestReadFile:
|
||
def test_read_csv(self, sample_csv_path):
|
||
df = read_file(sample_csv_path)
|
||
assert isinstance(df, pd.DataFrame)
|
||
assert len(df) == 50
|
||
assert "customer_name" in df.columns
|
||
|
||
def test_read_nonexistent(self):
|
||
# FileAccessError extends OSError so existing `except OSError`
|
||
# handlers still catch it.
|
||
from src.core.errors import FileAccessError
|
||
with pytest.raises((FileAccessError, OSError)):
|
||
read_file("/tmp/nonexistent_file_xyz.csv")
|
||
|
||
def test_read_with_encoding_override(self, sample_csv_path):
|
||
df = read_file(sample_csv_path, encoding="utf-8")
|
||
assert len(df) == 50
|
||
|
||
def test_chunked_reading(self, sample_csv_path):
|
||
chunks = read_file(sample_csv_path, chunk_size=10)
|
||
# Should be a generator
|
||
all_chunks = list(chunks)
|
||
assert len(all_chunks) == 5
|
||
total_rows = sum(len(c) for c in all_chunks)
|
||
assert total_rows == 50
|
||
|
||
|
||
class TestWriteFile:
|
||
def test_write_csv(self, tmp_path, simple_df):
|
||
out = tmp_path / "output.csv"
|
||
write_file(simple_df, out)
|
||
assert out.exists()
|
||
# Read back
|
||
df = pd.read_csv(out, encoding="utf-8-sig")
|
||
assert len(df) == len(simple_df)
|
||
|
||
def test_write_xlsx(self, tmp_path, simple_df):
|
||
out = tmp_path / "output.xlsx"
|
||
write_file(simple_df, out)
|
||
assert out.exists()
|
||
df = pd.read_excel(out)
|
||
assert len(df) == len(simple_df)
|
||
|
||
def test_utf8_bom_default(self, tmp_path, simple_df):
|
||
out = tmp_path / "bom.csv"
|
||
write_file(simple_df, out)
|
||
raw = out.read_bytes()
|
||
assert raw[:3] == b"\xef\xbb\xbf"
|
||
|
||
|
||
class TestListSheets:
|
||
def test_list_sheets(self, tmp_path, simple_df):
|
||
path = tmp_path / "multi.xlsx"
|
||
with pd.ExcelWriter(path, engine="openpyxl") as writer:
|
||
simple_df.to_excel(writer, sheet_name="Sheet1", index=False)
|
||
simple_df.to_excel(writer, sheet_name="Sheet2", index=False)
|
||
sheets = list_sheets(path)
|
||
assert sheets == ["Sheet1", "Sheet2"]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Pre-parse repair
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestRepairBytes:
|
||
def test_strips_bom(self):
|
||
raw = b"\xef\xbb\xbfid,name\n1,Alice\n"
|
||
result = repair_bytes(raw)
|
||
assert result.repaired_bytes == b"id,name\n1,Alice\n"
|
||
assert any(a.kind == "strip_bom" for a in result.actions)
|
||
|
||
def test_strips_nul_bytes(self):
|
||
raw = b"id,name\n1,Hel\x00lo\n2,Wo\x00\x00rld\n"
|
||
result = repair_bytes(raw)
|
||
assert b"\x00" not in result.repaired_bytes
|
||
nul_action = next(a for a in result.actions if a.kind == "strip_nul")
|
||
assert "3" in nul_action.detail # 3 NUL bytes
|
||
|
||
def test_folds_smart_double_quotes(self):
|
||
raw = "id,note\n1,“hello”\n2,«bonjour»\n".encode("utf-8")
|
||
result = repair_bytes(raw)
|
||
text = result.repaired_bytes.decode("utf-8")
|
||
assert "“" not in text and "”" not in text
|
||
assert "«" not in text and "»" not in text
|
||
assert any(a.kind == "fold_smart_quote" for a in result.actions)
|
||
|
||
def test_does_not_fold_curly_singles(self):
|
||
# Single curly quotes should pass through; cell-level cleaner handles them.
|
||
raw = "id,note\n1,it’s fine\n".encode("utf-8")
|
||
result = repair_bytes(raw)
|
||
text = result.repaired_bytes.decode("utf-8")
|
||
assert "’" in text
|
||
assert not any(a.kind == "fold_smart_quote" for a in result.actions)
|
||
|
||
def test_no_changes_when_clean(self):
|
||
raw = b"id,name\n1,Alice\n2,Bob\n"
|
||
result = repair_bytes(raw)
|
||
assert result.repaired_bytes == raw
|
||
assert result.actions == []
|
||
assert result.changed is False
|
||
|
||
def test_repairs_unquoted_currency_comma(self):
|
||
raw = (
|
||
b"id,price,qty\n"
|
||
b"1,100,5\n"
|
||
b"2, $1,500.00 ,7\n" # 4 fields instead of 3
|
||
b"3,200,9\n"
|
||
)
|
||
result = repair_bytes(raw)
|
||
# After repair, every row should have 3 fields when re-parsed.
|
||
df = pd.read_csv(io.BytesIO(result.repaired_bytes))
|
||
assert list(df.columns) == ["id", "price", "qty"]
|
||
assert len(df) == 3
|
||
assert any(a.kind == "quote_unquoted_delim" and a.line == 3 for a in result.actions)
|
||
|
||
def test_logs_unrepairable_when_ambiguous(self):
|
||
# Two adjacent merge candidates -> bail out, log unrepairable.
|
||
raw = (
|
||
b"id,a,b,c\n"
|
||
b"1,foo,bar,baz\n"
|
||
b"2,1,2,3,4,5\n" # way too many extras, no clear merge
|
||
)
|
||
result = repair_bytes(raw)
|
||
assert 3 in result.unrepairable_lines
|
||
|
||
def test_summary_groups_by_kind(self):
|
||
raw = b"\xef\xbb\xbfid,name\n1,Hel\x00lo\n"
|
||
result = repair_bytes(raw)
|
||
summary = result.summary()
|
||
assert summary.get("strip_bom") == 1
|
||
assert summary.get("strip_nul") == 1
|
||
|
||
|
||
class TestReadFileWithRepair:
|
||
"""``read_file(repair=True)`` (default) routes CSV through repair_bytes."""
|
||
|
||
def test_default_strips_bom_via_repair(self, tmp_path):
|
||
f = tmp_path / "bom.csv"
|
||
f.write_bytes(b"\xef\xbb\xbfid,name\n1,Alice\n")
|
||
df = read_file(f)
|
||
# First column header must be 'id', not 'id'.
|
||
assert list(df.columns)[0] == "id"
|
||
|
||
def test_default_folds_smart_double_quotes(self, tmp_path):
|
||
# Curly quotes are *unquoted* here — outer ASCII quotes would create
|
||
# a CSV-quoting collision once the fold runs.
|
||
f = tmp_path / "quoted.csv"
|
||
f.write_bytes("id,note\n1,curly “hello” world\n".encode("utf-8"))
|
||
df = read_file(f)
|
||
assert df.iloc[0]["note"] == 'curly "hello" world'
|
||
|
||
def test_repair_false_preserves_smart_quotes(self, tmp_path):
|
||
f = tmp_path / "quoted.csv"
|
||
f.write_bytes("id,note\n1,curly “hello” world\n".encode("utf-8"))
|
||
df = read_file(f, repair=False)
|
||
assert "“" in df.iloc[0]["note"] or "”" in df.iloc[0]["note"]
|
||
|
||
def test_chunked_read_skips_repair(self, tmp_path):
|
||
# Chunked reads bypass repair (memory budget). Verify they still work.
|
||
rows = "id,name\n" + "\n".join(f"{i},Alice" for i in range(1, 21))
|
||
f = tmp_path / "chunked.csv"
|
||
f.write_text(rows)
|
||
chunks = list(read_file(f, chunk_size=5))
|
||
total = sum(len(c) for c in chunks)
|
||
assert total == 20
|
||
|
||
|
||
class TestReadCsvRepaired:
|
||
def test_recovers_malformed_currency_row(self, tmp_path):
|
||
f = tmp_path / "bad.csv"
|
||
f.write_bytes(
|
||
b"id,price,qty\n"
|
||
b"1,100,5\n"
|
||
b"2, $1,500.00 ,7\n"
|
||
b"3,200,9\n"
|
||
)
|
||
df, repair = read_csv_repaired(f)
|
||
assert len(df) == 3
|
||
assert "1,500.00" in df.iloc[1]["price"]
|
||
assert repair.changed
|
||
|
||
def test_passthrough_when_clean(self, tmp_path):
|
||
f = tmp_path / "ok.csv"
|
||
f.write_bytes(b"id,name\n1,Alice\n2,Bob\n")
|
||
df, repair = read_csv_repaired(f)
|
||
assert len(df) == 2
|
||
assert repair.changed is False
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Round-trip integrity (audit GAP-19, GAP-21)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestRoundTrip:
|
||
def test_csv_roundtrip_preserves_values(self, tmp_path):
|
||
df = pd.DataFrame({
|
||
"id": ["1", "2", "3"],
|
||
"name": ["Alice", "Bob", "Carol"],
|
||
"amount": ["10.50", "20.25", "30.00"],
|
||
})
|
||
path = tmp_path / "rt.csv"
|
||
write_file(df, path)
|
||
loaded = read_file(path)
|
||
assert list(loaded.columns) == list(df.columns)
|
||
assert len(loaded) == len(df)
|
||
for col in df.columns:
|
||
assert list(loaded[col]) == list(df[col])
|
||
|
||
def test_tsv_roundtrip_via_extension(self, tmp_path):
|
||
df = pd.DataFrame({"a": ["1", "2"], "b": ["x", "y, z"]})
|
||
path = tmp_path / "rt.tsv"
|
||
write_file(df, path)
|
||
# Confirm tab is used and embedded comma in 'b' survives.
|
||
loaded = read_file(path)
|
||
assert list(loaded.columns) == ["a", "b"]
|
||
assert loaded.iloc[1]["b"] == "y, z"
|
||
|
||
def test_semicolon_roundtrip_via_explicit_delimiter(self, tmp_path):
|
||
df = pd.DataFrame({"a": ["1", "2"], "b": ["x", "y"]})
|
||
path = tmp_path / "rt.csv"
|
||
write_file(df, path, delimiter=";")
|
||
loaded = read_file(path)
|
||
assert list(loaded.columns) == ["a", "b"]
|
||
assert loaded.iloc[0]["a"] == "1"
|
||
|
||
def test_utf8_bom_non_ascii_roundtrip(self, tmp_path):
|
||
df = pd.DataFrame({"name": ["café", "naïve", "résumé"]})
|
||
path = tmp_path / "utf8.csv"
|
||
write_file(df, path)
|
||
loaded = read_file(path)
|
||
assert list(loaded["name"]) == ["café", "naïve", "résumé"]
|
||
|
||
|
||
class TestExcelHeaderDetection:
|
||
def test_excel_with_metadata_rows(self, tmp_path):
|
||
from openpyxl import Workbook
|
||
wb = Workbook()
|
||
ws = wb.active
|
||
# Two leading blank rows + header + data.
|
||
ws.append(["Report generated 2024-01-15", None, None])
|
||
ws.append([None, None, None])
|
||
ws.append(["name", "email", "phone"])
|
||
ws.append(["alice", "a@x.com", "555-1234"])
|
||
ws.append(["bob", "b@x.com", "555-5678"])
|
||
path = tmp_path / "report.xlsx"
|
||
wb.save(path)
|
||
df = read_file(path)
|
||
# Auto-detected header row 2 → columns are name/email/phone
|
||
assert list(df.columns) == ["name", "email", "phone"]
|
||
assert len(df) == 2
|
||
|
||
def test_excel_normal_header_row_zero(self, tmp_path):
|
||
from openpyxl import Workbook
|
||
wb = Workbook()
|
||
ws = wb.active
|
||
ws.append(["name", "email"])
|
||
ws.append(["alice", "a@x.com"])
|
||
path = tmp_path / "normal.xlsx"
|
||
wb.save(path)
|
||
df = read_file(path)
|
||
assert list(df.columns) == ["name", "email"]
|
||
assert len(df) == 1
|