"""Tests for src.core.io — file reading, encoding/delimiter detection.""" import io import pandas as pd import pytest from pathlib import Path from src.core.io import ( detect_encoding, detect_delimiter, detect_header_row, read_file, write_file, list_sheets, repair_bytes, read_csv_repaired, ) class TestDetectEncoding: def test_utf8_file(self, sample_csv_path): enc = detect_encoding(sample_csv_path) assert enc.lower().replace("-", "") in ("utf8", "ascii", "utf8sig") def test_empty_file(self, tmp_path): f = tmp_path / "empty.csv" f.write_bytes(b"") assert detect_encoding(f) == "utf-8" def test_bom_file(self, tmp_path): f = tmp_path / "bom.csv" f.write_bytes(b"\xef\xbb\xbfname,email\nAlice,a@b.com\n") assert detect_encoding(f) == "utf-8-sig" def test_latin1_file(self, tmp_path): f = tmp_path / "latin.csv" content = "name,city\nJosé,São Paulo\n".encode("latin-1") f.write_bytes(content) enc = detect_encoding(f) # Should detect something compatible with latin-1 family assert enc in ("iso-8859-1", "latin-1", "windows-1252", "cp1252", "iso-8859-9", "cp1250", "iso-8859-15", "utf-8") class TestDetectDelimiter: def test_comma(self, sample_csv_path): assert detect_delimiter(sample_csv_path) == "," def test_tab(self, tmp_path): f = tmp_path / "tabs.tsv" f.write_text("name\temail\nAlice\ta@b.com\n") assert detect_delimiter(f) == "\t" def test_semicolon(self, tmp_path): f = tmp_path / "semi.csv" f.write_text("name;email;phone\nAlice;a@b.com;555\n") assert detect_delimiter(f) == ";" def test_pipe(self, tmp_path): f = tmp_path / "pipe.csv" f.write_text("name|email|phone\nAlice|a@b.com|555\n") assert detect_delimiter(f) == "|" class TestDetectHeaderRow: def test_standard_csv(self, sample_csv_path): assert detect_header_row(sample_csv_path) == 0 def test_with_junk_rows(self, tmp_path): f = tmp_path / "junk.csv" f.write_text("Report generated 2024-01-01\n\nname,email,phone\nAlice,a@b.com,555\n") # Row 0 has "Report generated..." 
which is a single non-numeric string # Row 2 has "name,email,phone" which looks like headers # The heuristic checks all cells, so row 0 may match if it's a single cell hdr = detect_header_row(f) assert hdr in (0, 2) # depends on delimiter detection class TestReadFile: def test_read_csv(self, sample_csv_path): df = read_file(sample_csv_path) assert isinstance(df, pd.DataFrame) assert len(df) == 50 assert "customer_name" in df.columns def test_read_nonexistent(self): with pytest.raises(FileNotFoundError): read_file("/tmp/nonexistent_file_xyz.csv") def test_read_with_encoding_override(self, sample_csv_path): df = read_file(sample_csv_path, encoding="utf-8") assert len(df) == 50 def test_chunked_reading(self, sample_csv_path): chunks = read_file(sample_csv_path, chunk_size=10) # Should be a generator all_chunks = list(chunks) assert len(all_chunks) == 5 total_rows = sum(len(c) for c in all_chunks) assert total_rows == 50 class TestWriteFile: def test_write_csv(self, tmp_path, simple_df): out = tmp_path / "output.csv" write_file(simple_df, out) assert out.exists() # Read back df = pd.read_csv(out, encoding="utf-8-sig") assert len(df) == len(simple_df) def test_write_xlsx(self, tmp_path, simple_df): out = tmp_path / "output.xlsx" write_file(simple_df, out) assert out.exists() df = pd.read_excel(out) assert len(df) == len(simple_df) def test_utf8_bom_default(self, tmp_path, simple_df): out = tmp_path / "bom.csv" write_file(simple_df, out) raw = out.read_bytes() assert raw[:3] == b"\xef\xbb\xbf" class TestListSheets: def test_list_sheets(self, tmp_path, simple_df): path = tmp_path / "multi.xlsx" with pd.ExcelWriter(path, engine="openpyxl") as writer: simple_df.to_excel(writer, sheet_name="Sheet1", index=False) simple_df.to_excel(writer, sheet_name="Sheet2", index=False) sheets = list_sheets(path) assert sheets == ["Sheet1", "Sheet2"] # --------------------------------------------------------------------------- # Pre-parse repair # 
# ---------------------------------------------------------------------------

# CSV bytes whose third physical line carries an unquoted "$1,500.00", so that
# line splits into 4 fields instead of the 3 announced by the header.
_CURRENCY_COMMA_CSV = b"".join(
    (
        b"id,price,qty\n",
        b"1,100,5\n",
        b"2, $1,500.00 ,7\n",
        b"3,200,9\n",
    )
)


class TestRepairBytes:
    """repair_bytes: BOM/NUL stripping, quote folding, delimiter repair."""

    def test_strips_bom(self):
        """A leading UTF-8 BOM is dropped and logged as "strip_bom"."""
        outcome = repair_bytes(b"\xef\xbb\xbfid,name\n1,Alice\n")
        assert outcome.repaired_bytes == b"id,name\n1,Alice\n"
        kinds = [action.kind for action in outcome.actions]
        assert "strip_bom" in kinds

    def test_strips_nul_bytes(self):
        """Embedded NUL bytes are removed and their count is reported."""
        payload = b"id,name\n1,Hel\x00lo\n2,Wo\x00\x00rld\n"
        outcome = repair_bytes(payload)
        assert b"\x00" not in outcome.repaired_bytes
        nul_action = next(
            action for action in outcome.actions if action.kind == "strip_nul"
        )
        # The payload holds three NUL bytes in total.
        assert "3" in nul_action.detail

    def test_folds_smart_double_quotes(self):
        """Curly and angle double quotes do not survive the repair."""
        payload = "id,note\n1,“hello”\n2,«bonjour»\n".encode("utf-8")
        outcome = repair_bytes(payload)
        decoded = outcome.repaired_bytes.decode("utf-8")
        for fancy_quote in ("“", "”", "«", "»"):
            assert fancy_quote not in decoded
        kinds = [action.kind for action in outcome.actions]
        assert "fold_smart_quote" in kinds

    def test_does_not_fold_curly_singles(self):
        """A curly apostrophe is left alone by the byte-level repair."""
        # Single curly quotes should pass through; cell-level cleaner handles them.
        payload = "id,note\n1,it’s fine\n".encode("utf-8")
        outcome = repair_bytes(payload)
        decoded = outcome.repaired_bytes.decode("utf-8")
        assert "’" in decoded
        kinds = [action.kind for action in outcome.actions]
        assert "fold_smart_quote" not in kinds

    def test_no_changes_when_clean(self):
        """Already-clean input comes back untouched with no actions logged."""
        payload = b"id,name\n1,Alice\n2,Bob\n"
        outcome = repair_bytes(payload)
        assert outcome.repaired_bytes == payload
        assert outcome.actions == []
        assert outcome.changed is False

    def test_repairs_unquoted_currency_comma(self):
        """A stray comma inside a currency value is repaired on line 3."""
        outcome = repair_bytes(_CURRENCY_COMMA_CSV)
        # After repair, every row should have 3 fields when re-parsed.
        frame = pd.read_csv(io.BytesIO(outcome.repaired_bytes))
        assert frame.columns.tolist() == ["id", "price", "qty"]
        assert frame.shape[0] == 3
        assert any(
            action.kind == "quote_unquoted_delim" and action.line == 3
            for action in outcome.actions
        )

    def test_logs_unrepairable_when_ambiguous(self):
        """A row with no single obvious merge is recorded as unrepairable."""
        # Two adjacent merge candidates -> bail out, log unrepairable.
        payload = b"".join(
            (
                b"id,a,b,c\n",
                b"1,foo,bar,baz\n",
                # Far more fields than the header, with no clear pair to merge.
                b"2,1,2,3,4,5\n",
            )
        )
        outcome = repair_bytes(payload)
        assert 3 in outcome.unrepairable_lines

    def test_summary_groups_by_kind(self):
        """summary() reports one entry each for the BOM and NUL repairs."""
        outcome = repair_bytes(b"\xef\xbb\xbfid,name\n1,Hel\x00lo\n")
        counts = outcome.summary()
        for kind in ("strip_bom", "strip_nul"):
            assert counts.get(kind) == 1


class TestReadCsvRepaired:
    """read_csv_repaired: repair-then-parse of files on disk."""

    def test_recovers_malformed_currency_row(self, tmp_path):
        """The malformed currency row is parsed with its amount intact."""
        csv_path = tmp_path / "bad.csv"
        csv_path.write_bytes(_CURRENCY_COMMA_CSV)
        frame, repair = read_csv_repaired(csv_path)
        assert len(frame) == 3
        assert "1,500.00" in frame["price"].iloc[1]
        assert repair.changed

    def test_passthrough_when_clean(self, tmp_path):
        """A well-formed file is read as-is and reports no change."""
        csv_path = tmp_path / "ok.csv"
        csv_path.write_bytes(b"id,name\n1,Alice\n2,Bob\n")
        frame, repair = read_csv_repaired(csv_path)
        assert len(frame) == 2
        assert repair.changed is False