feat: implement text cleaner (script 02) with CLI, GUI, and tests

Builds 02_text_cleaner.py from stub to working: character-level hygiene
for CSV/Excel inputs covering trim, whitespace collapse, smart-character
folding, Unicode NFC/NFKC, BOM strip, zero-width strip, control-char
strip, line-ending normalization, and per-column case conversion. Three
presets (minimal/excel-hygiene/paranoid) keep the buyer surface small.

- src/core/text_clean.py: pure helpers + CleanOptions/CleanResult +
  clean_dataframe with dtype-safe column selection
- src/cli_text_clean.py: Typer CLI mirroring the dedup CLI shape
  (dry-run by default, --apply writes cleaned + changes audit, JSON
  config save/load)
- src/gui/pages/2_Text_Cleaner.py: real Streamlit page with preset
  picker, advanced toggles, preview, before/after metrics, and three
  download buttons
- tests/test_text_clean.py + test_cli_text_clean.py: 92 new tests
  covering edge cases E1-E50 from the spec
- samples/messy_text.csv: demo dataset surfacing UC1, UC3, UC6, UC10
  in 10 rows
- test-cases/uc16-uc26 + ec05-ec09: per-use-case and per-edge-case
  fixtures

Docs: TECHNICAL.md §10.2 (full Tier 1/2/3 spec), DECISIONS.md v1.7
entry locking the spec, CLI-REFERENCE.md gains the text cleaner
section, README.md gains a top-level Text Cleaner block, USER-GUIDE.md
status row 02 promoted Skeleton -> Working.

200/200 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-29 15:14:15 +00:00
parent b2ca04e6f4
commit 54f92ae47e
28 changed files with 2093 additions and 58 deletions

View File

@@ -0,0 +1,158 @@
"""Integration tests for the text-cleaner CLI."""
from __future__ import annotations
from pathlib import Path
import pandas as pd
import pytest
from typer.testing import CliRunner
from src.cli_text_clean import app
# Module-level Typer test runner; the tests below call runner.invoke(app, [...]).
runner = CliRunner()
@pytest.fixture
def messy_csv(tmp_path):
    """Write a small CSV with padded and smart-quoted cells; return its path.

    The file is created as ``messy.csv`` inside pytest's ``tmp_path`` and
    holds two text columns (``name``, ``city``) and one integer column
    (``qty``).
    """
    columns = {
        "name": [" Alice ", "“Bob”", "Charlie"],
        "city": ["NYC", " LA ", "SF"],
        "qty": [1, 2, 3],
    }
    target = tmp_path / "messy.csv"
    pd.DataFrame(columns).to_csv(target, index=False)
    return target
class TestPreview:
    """Dry-run behaviour: without ``--apply`` the CLI only reports."""

    def test_default_is_preview(self, messy_csv):
        """A bare invocation succeeds and prints the preview summary."""
        result = runner.invoke(app, [str(messy_csv)])
        assert result.exit_code == 0
        assert "preview" in result.output.lower()
        assert "Cells changed" in result.output

    def test_no_files_written_in_preview(self, messy_csv):
        """Preview mode must leave no output files next to the input."""
        result = runner.invoke(app, [str(messy_csv)])
        assert result.exit_code == 0
        out_dir = messy_csv.parent
        assert not (out_dir / f"{messy_csv.stem}_cleaned.csv").exists()
        # The changes audit is only written by --apply, so it must be absent too.
        assert not (out_dir / f"{messy_csv.stem}_changes.csv").exists()

    def test_file_not_found(self, tmp_path):
        """A missing input path exits non-zero with a "not found" message."""
        # Use a path under tmp_path rather than a hard-coded /tmp location:
        # it is guaranteed not to exist and works on every platform.
        missing = tmp_path / "does_not_exist_xyz.csv"
        result = runner.invoke(app, [str(missing)])
        assert result.exit_code != 0
        assert "not found" in result.output.lower()
class TestApply:
    """``--apply`` writes the cleaned file and, when needed, a changes audit."""

    def test_apply_writes_cleaned_file(self, messy_csv):  # E47
        """The default output is ``<stem>_cleaned.csv`` with trimmed names."""
        outcome = runner.invoke(app, [str(messy_csv), "--apply"])
        assert outcome.exit_code == 0
        cleaned_path = messy_csv.with_name(f"{messy_csv.stem}_cleaned.csv")
        assert cleaned_path.exists()
        first_name = pd.read_csv(cleaned_path)["name"].iloc[0]
        assert first_name == "Alice"

    def test_apply_writes_changes_audit(self, messy_csv):
        """A run that changes cells also produces ``<stem>_changes.csv``."""
        outcome = runner.invoke(app, [str(messy_csv), "--apply"])
        assert outcome.exit_code == 0
        audit_path = messy_csv.with_name(f"{messy_csv.stem}_changes.csv")
        assert audit_path.exists()

    def test_no_audit_when_no_changes(self, tmp_path):
        """Already-clean input yields no changes audit file."""
        source = tmp_path / "clean.csv"
        already_clean = pd.DataFrame({"a": ["x", "y"]})
        already_clean.to_csv(source, index=False)
        outcome = runner.invoke(app, [str(source), "--apply"])
        assert outcome.exit_code == 0
        audit_path = tmp_path / "clean_changes.csv"
        assert not audit_path.exists()

    def test_custom_output_path(self, messy_csv, tmp_path):
        """``-o`` redirects the cleaned output to the given path."""
        destination = tmp_path / "renamed.csv"
        argv = [str(messy_csv), "--apply", "-o", str(destination)]
        outcome = runner.invoke(app, argv)
        assert outcome.exit_code == 0
        assert destination.exists()
class TestPresets:
    """``--preset`` selects which cleaning operations run."""

    def test_minimal_does_not_fold_smart_chars(self, messy_csv):
        """Under ``minimal`` the curly quotes around Bob must survive."""
        result = runner.invoke(app, [str(messy_csv), "--apply", "--preset", "minimal"])
        assert result.exit_code == 0
        cleaned = messy_csv.parent / f"{messy_csv.stem}_cleaned.csv"
        df = pd.read_csv(cleaned)
        bob = df["name"].iloc[1]
        # Smart quotes preserved under minimal preset. The quotes are spelled
        # as escapes: an empty-string literal here would make `in` always true.
        assert "\u201c" in bob or "\u201d" in bob

    def test_excel_hygiene_default_folds_smart_chars(self, messy_csv):
        """With no ``--preset`` the curly quotes become straight quotes."""
        result = runner.invoke(app, [str(messy_csv), "--apply"])
        assert result.exit_code == 0
        cleaned = messy_csv.parent / f"{messy_csv.stem}_cleaned.csv"
        df = pd.read_csv(cleaned)
        assert df["name"].iloc[1] == '"Bob"'

    def test_unknown_preset_errors(self, messy_csv):
        """An unrecognised preset name exits non-zero and says so."""
        result = runner.invoke(app, [str(messy_csv), "--preset", "weird"])
        assert result.exit_code != 0
        assert "Unknown preset" in result.output
class TestColumnSelection:
    """``--columns`` and ``--skip`` restrict which columns get cleaned."""

    def test_columns_flag(self, messy_csv):
        """Only the column named by ``--columns`` is cleaned."""
        argv = [str(messy_csv), "--apply", "--columns", "name"]
        outcome = runner.invoke(app, argv)
        assert outcome.exit_code == 0
        frame = pd.read_csv(messy_csv.with_name(f"{messy_csv.stem}_cleaned.csv"))
        assert frame["name"].iloc[0] == "Alice"
        # city was not selected, so its padding must still be there
        assert frame["city"].iloc[1] == " LA "

    def test_skip_flag(self, messy_csv):
        """A column named by ``--skip`` is left as it was."""
        argv = [str(messy_csv), "--apply", "--skip", "name"]
        outcome = runner.invoke(app, argv)
        assert outcome.exit_code == 0
        frame = pd.read_csv(messy_csv.with_name(f"{messy_csv.stem}_cleaned.csv"))
        # name was skipped, so the leading space must still be there
        first_name = frame["name"].iloc[0]
        assert first_name.startswith(" ")
class TestCaseFlag:
    """``--case`` accepts a bare mode or ``mode:column`` pairs."""

    def test_bare_case_applies_to_all(self, tmp_path):
        """A bare ``--case upper`` upper-cases every text column."""
        source = tmp_path / "names.csv"
        pd.DataFrame({"a": ["alice"], "b": ["bob"]}).to_csv(source, index=False)
        outcome = runner.invoke(app, [str(source), "--apply", "--case", "upper"])
        assert outcome.exit_code == 0
        first_row = pd.read_csv(tmp_path / "names_cleaned.csv").iloc[0]
        assert first_row["a"] == "ALICE"
        assert first_row["b"] == "BOB"

    def test_per_column_case(self, tmp_path):
        """``title:name,upper:code`` applies a different mode per column."""
        source = tmp_path / "names.csv"
        pd.DataFrame({"name": ["alice"], "code": ["abc"]}).to_csv(source, index=False)
        argv = [str(source), "--apply", "--case", "title:name,upper:code"]
        outcome = runner.invoke(app, argv)
        assert outcome.exit_code == 0
        first_row = pd.read_csv(tmp_path / "names_cleaned.csv").iloc[0]
        assert first_row["name"] == "Alice"
        assert first_row["code"] == "ABC"
class TestConfigRoundTrip:
    """Options saved with ``--save-config`` can be replayed with ``--config``."""

    def test_save_and_load(self, messy_csv, tmp_path):
        """Save a minimal/no-trim config, then apply it in a second run."""
        config_path = tmp_path / "opts.json"
        save_argv = [
            str(messy_csv),
            "--save-config", str(config_path),
            "--preset", "minimal",
            "--no-trim",
        ]
        saved = runner.invoke(app, save_argv)
        assert saved.exit_code == 0
        assert config_path.exists()

        # Second run: load the saved options and write the cleaned file
        applied = runner.invoke(
            app, [str(messy_csv), "--apply", "--config", str(config_path)]
        )
        assert applied.exit_code == 0
        frame = pd.read_csv(messy_csv.with_name(f"{messy_csv.stem}_cleaned.csv"))
        # --no-trim was saved, so the leading space must still be there
        assert frame["name"].iloc[0].startswith(" ")

482
tests/test_text_clean.py Normal file
View File

@@ -0,0 +1,482 @@
"""Tests for src/core/text_clean.py.
Covers edge cases E1-E50 from TECHNICAL.md Section 10.2 plan.
"""
from __future__ import annotations
import json
import numpy as np
import pandas as pd
import pytest
from src.core.text_clean import (
CleanOptions,
PRESETS,
apply_case,
clean_dataframe,
clean_value,
collapse_whitespace,
fold_smart_chars,
normalize_line_endings,
sentence_case,
smart_title_case,
strip_bom,
strip_control,
strip_zero_width,
to_nfc,
to_nfkc,
trim,
)
# ---------------------------------------------------------------------------
# Per-string helpers
# ---------------------------------------------------------------------------
class TestTrim:
    """``trim`` removes whitespace at both ends and leaves the middle alone."""

    def test_strips_leading_and_trailing(self):
        padded = " hello "
        assert trim(padded) == "hello"

    def test_preserves_internal_spaces(self):
        padded = " a b "
        assert trim(padded) == "a b"

    def test_empty_string(self):
        nothing = ""
        assert trim(nothing) == ""

    def test_idempotent(self):
        once = trim(" x ")
        assert trim(once) == once
class TestCollapseWhitespace:
    """``collapse_whitespace`` squeezes internal whitespace runs to one space."""

    def test_multiple_spaces(self):
        # Build the run with repetition so whitespace-normalising tools cannot
        # silently reduce the input to the already-collapsed form.
        wide = "a" + " " * 4 + "b"
        assert wide != "a b"
        assert collapse_whitespace(wide) == "a b"

    def test_tab_inside_cell(self):  # E2
        assert collapse_whitespace("a\tb") == "a b"

    def test_mixed_tabs_and_spaces(self):  # E3
        assert collapse_whitespace("a \t \t b") == "a b"

    def test_idempotent(self):
        once = collapse_whitespace("a" + " " * 4 + "b")
        assert collapse_whitespace(once) == once
class TestNFC:
    """``to_nfc`` composes base letter + combining mark into one code point."""

    def test_combining_acute(self):  # E6
        # Escapes keep the two forms distinguishable in source; written as
        # literal characters they look identical and are easily lost.
        decomposed = "e\u0301"  # e + U+0301 COMBINING ACUTE ACCENT
        composed = "\u00e9"  # U+00E9 LATIN SMALL LETTER E WITH ACUTE
        assert decomposed != composed
        assert to_nfc(decomposed) == composed

    def test_idempotent(self):
        s = "cafe\u0301"  # decomposed, so the first pass has work to do
        once = to_nfc(s)
        assert to_nfc(once) == once
class TestNFKC:
    """``to_nfkc`` replaces compatibility characters with plain equivalents."""

    def test_circled_digit(self):  # E7
        assert to_nfkc("\u2460") == "1"  # U+2460 CIRCLED DIGIT ONE

    def test_ligature(self):  # E7
        assert to_nfkc("\ufb01") == "fi"  # U+FB01 LATIN SMALL LIGATURE FI

    def test_idempotent(self):
        once = to_nfkc("\u2460\ufb01")
        assert to_nfkc(once) == once
class TestSmartChars:
    """``fold_smart_chars`` maps typographic characters to ASCII.

    Inputs are spelled as ``\\u`` escapes so look-alike or invisible
    characters cannot be confused with, or replaced by, their ASCII
    counterparts.
    """

    def test_curly_quotes(self):  # E11
        assert fold_smart_chars("\u2018hi\u2019") == "'hi'"  # single curly quotes
        assert fold_smart_chars("\u201chi\u201d") == '"hi"'  # double curly quotes

    def test_dashes(self):  # E12
        assert fold_smart_chars("a\u2014b") == "a-b"  # em dash
        assert fold_smart_chars("a\u2013b") == "a-b"  # en dash

    def test_ellipsis(self):  # E13
        assert fold_smart_chars("wait\u2026") == "wait..."

    def test_nbsp(self):  # E14
        assert fold_smart_chars("a\u00a0b") == "a b"  # no-break space

    def test_idempotent(self):
        s = "\u201chi\u201d \u2014 a\u00a0b"
        once = fold_smart_chars(s)
        assert fold_smart_chars(once) == once
class TestZeroWidth:
    """``strip_zero_width`` removes invisible formatting characters.

    Every input embeds its invisible character as a ``\\u`` escape; written
    literally they cannot be seen and the tests would compare a string with
    itself.
    """

    def test_zwsp_midword(self):  # E16
        assert strip_zero_width("foo\u200bbar") == "foobar"  # ZERO WIDTH SPACE

    def test_bidi_marks_stripped(self):  # E17
        # NOTE(review): assumes "bidi marks" means LRM (U+200E) and RLM
        # (U+200F) -- confirm against the set strip_zero_width removes.
        assert strip_zero_width("a\u200eb\u200fc") == "abc"

    def test_word_joiner(self):  # E18
        assert strip_zero_width("a\u2060b") == "ab"  # WORD JOINER

    def test_mid_string_feff(self):  # E22
        # U+FEFF away from position 0 is a zero-width no-break space, not a BOM
        assert strip_zero_width("foo\ufeffbar") == "foobar"
class TestStripBOM:
    """``strip_bom`` removes a leading U+FEFF byte-order mark."""

    def test_leading_bom(self):
        # The BOM is invisible, so write it as an escape to keep it in the input.
        assert strip_bom("\ufeffhello") == "hello"

    def test_no_bom(self):
        assert strip_bom("hello") == "hello"

    def test_idempotent(self):
        once = strip_bom("\ufeffx")
        assert strip_bom(once) == once
class TestStripControl:
    """``strip_control`` drops control characters but keeps tab, LF and CR."""

    def test_null_byte(self):  # E20
        with_nul = "a\x00b"
        assert strip_control(with_nul) == "ab"

    def test_preserves_tab_newline_cr(self):  # E19
        kept = "a\tb\nc\rd"
        assert strip_control(kept) == kept

    def test_strips_other_control(self):
        # Sample of the 0x01..0x1F range; only \t, \n and \r are kept.
        noisy = "a\x01b\x07c\x1fd"
        assert strip_control(noisy) == "abcd"

    def test_strips_del(self):
        with_del = "a\x7fb"
        assert strip_control(with_del) == "ab"
class TestLineEndings:
    """``normalize_line_endings`` rewrites CRLF and bare CR as LF."""

    def test_crlf(self):  # E23
        windows = "a\r\nb"
        assert normalize_line_endings(windows) == "a\nb"

    def test_bare_cr(self):  # E24
        bare_cr = "a\rb"
        assert normalize_line_endings(bare_cr) == "a\nb"

    def test_idempotent(self):
        once = normalize_line_endings("a\r\nb\rc")
        assert normalize_line_endings(once) == once
class TestSmartTitleCase:
    """``smart_title_case`` title-cases while respecting acronyms and particles."""

    def test_preserves_acronym(self):  # E26
        expectations = {
            "USA report": "USA Report",
            "nasa launch": "Nasa Launch",  # input was already lowercase
            "NASA launch": "NASA Launch",
        }
        for raw, titled in expectations.items():
            assert smart_title_case(raw) == titled

    def test_lowercases_particles_midstring(self):  # E27
        expectations = {
            "the lord of the rings": "The Lord of the Rings",
            "a tale of two cities": "A Tale of Two Cities",
        }
        for raw, titled in expectations.items():
            assert smart_title_case(raw) == titled

    def test_keeps_first_and_last_capitalized(self):
        # a trailing "of" is capitalized because it is the last word
        titled = smart_title_case("kingdom of")
        assert titled == "Kingdom Of"

    def test_apostrophe(self):
        titled = smart_title_case("o'neil")
        assert titled == "O'neil"
class TestSentenceCase:
    """``sentence_case`` capitalizes the first letter of each sentence."""

    def test_basic(self):  # E28
        raw = "hello. how are you? fine!"
        assert sentence_case(raw) == "Hello. How are you? Fine!"

    def test_preserves_punctuation(self):
        shouted = "WHAT? OK."
        assert sentence_case(shouted) == "What? Ok."
class TestApplyCase:
    """``apply_case`` dispatches on the mode name and rejects unknown modes."""

    def test_modes(self):
        table = [
            ("Hello World", "upper", "HELLO WORLD"),
            ("Hello World", "lower", "hello world"),
            ("hello world", "title", "Hello World"),
            ("hello. world.", "sentence", "Hello. World."),
        ]
        for text, mode, expected in table:
            assert apply_case(text, mode) == expected

    def test_unknown_mode_raises(self):
        with pytest.raises(ValueError):
            apply_case("x", "weird")  # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# clean_value composition
# ---------------------------------------------------------------------------
class TestCleanValue:
    """``clean_value`` runs the enabled operations on one cell.

    It returns ``(cleaned_value, ops)`` where ``ops`` lists the operations
    that changed the value.
    """

    def test_default_excel_hygiene(self):
        opts = CleanOptions()
        out, ops = clean_value("“Hello world” ", opts)
        assert out == '"Hello world"'
        assert "fold_smart_chars" in ops
        assert "trim" in ops

    def test_pure_whitespace_to_empty(self):  # E1
        opts = CleanOptions()
        out, ops = clean_value(" ", opts)
        assert out == ""

    def test_nbsp_only_cell(self):  # E5
        opts = CleanOptions()
        # Must be a real no-break space: with an ordinary space this test
        # would only repeat E1 above.
        out, _ = clean_value("\u00a0", opts)
        assert out == ""

    def test_non_string_passthrough(self):  # E32
        opts = CleanOptions()
        for val in (None, 42, 3.14, True, np.nan):
            out, ops = clean_value(val, opts)
            # NaN compares unequal to itself; check pd.isna for that case
            if isinstance(val, float) and pd.isna(val):
                assert pd.isna(out)
            else:
                assert out == val
            assert ops == []

    def test_empty_string(self):
        opts = CleanOptions()
        out, ops = clean_value("", opts)
        assert out == ""
        assert ops == []

    def test_only_unchanged_ops_not_logged(self):
        # trim and collapse are enabled but have nothing to do on "hello",
        # so neither may appear in the ops list.
        opts = CleanOptions(trim=True, collapse_whitespace=True, nfc=False, nfkc=False,
                            fold_smart_chars=False, strip_zero_width=False,
                            strip_bom=False, strip_control=False,
                            normalize_line_endings=False)
        out, ops = clean_value("hello", opts)
        assert out == "hello"
        assert ops == []
class TestIdempotency:
    """E40 — applying the pipeline twice yields the same result as once."""

    @pytest.mark.parametrize("preset", list(PRESETS.keys()))
    def test_preset_idempotent(self, preset):
        opts = CleanOptions.from_preset(preset)
        # Invisible and look-alike characters are written as escapes so each
        # case keeps the character it is meant to exercise.
        cases = [
            "“Hello world” ",
            " \t multi space \r\n ",
            "caf\u00e9",  # precomposed e-acute
            "e\u0301clair",  # decomposed: e + combining acute
            "\ufeffleading-bom",
            "USA and the Rings",
            "a\x00b\x01c",
            "",
            " ",
        ]
        for s in cases:
            once, _ = clean_value(s, opts)
            twice, _ = clean_value(once, opts)
            assert once == twice, f"not idempotent on {s!r} (preset {preset})"
# ---------------------------------------------------------------------------
# clean_dataframe
# ---------------------------------------------------------------------------
class TestCleanDataframe:
    """``clean_dataframe``: column selection, dtype safety and change reporting."""

    def test_only_string_columns_touched(self):  # E31, E33, E35
        frame = pd.DataFrame({
            "name": [" Alice ", "Bob"],
            "age": [30, 25],
            "joined": pd.to_datetime(["2024-01-01", "2024-02-01"]),
            "active": [True, False],
        })
        report = clean_dataframe(frame)
        cleaned = report.cleaned_df
        assert cleaned["name"].tolist() == ["Alice", "Bob"]
        assert cleaned["age"].tolist() == [30, 25]
        assert cleaned["active"].tolist() == [True, False]
        assert "name" in report.columns_processed
        assert "age" not in report.columns_processed

    def test_explicit_columns(self):  # E41
        frame = pd.DataFrame({"a": [" x "], "b": [" y "]})
        report = clean_dataframe(frame, CleanOptions(columns=["a"]))
        first_row = report.cleaned_df.iloc[0]
        assert first_row["a"] == "x"
        assert first_row["b"] == " y "
        assert report.columns_processed == ["a"]

    def test_skip_columns(self):  # E42
        frame = pd.DataFrame({"name": [" A "], "notes": [" free text "]})
        report = clean_dataframe(frame, CleanOptions(skip_columns=["notes"]))
        first_row = report.cleaned_df.iloc[0]
        assert first_row["name"] == "A"
        assert first_row["notes"] == " free text "

    def test_unknown_column_raises(self):
        frame = pd.DataFrame({"a": ["x"]})
        options = CleanOptions(columns=["missing"])
        with pytest.raises(ValueError):
            clean_dataframe(frame, options)

    def test_empty_dataframe(self):  # E43
        report = clean_dataframe(pd.DataFrame())
        assert report.cells_changed == 0
        assert report.cells_total == 0
        assert report.cleaned_df.empty

    def test_single_column_file(self):  # E44
        frame = pd.DataFrame({"only": [" hello "]})
        cleaned = clean_dataframe(frame).cleaned_df
        assert cleaned["only"].iloc[0] == "hello"

    def test_all_numeric_no_op(self):  # E45
        frame = pd.DataFrame({"a": [1, 2], "b": [3.0, 4.0]})
        report = clean_dataframe(frame)
        assert report.columns_processed == []
        assert report.cells_changed == 0

    def test_mixed_object_column_strings_only(self):  # E34
        frame = pd.DataFrame({"mix": [" hello ", 42, None]})
        mixed = clean_dataframe(frame).cleaned_df["mix"]
        assert mixed.iloc[0] == "hello"
        assert mixed.iloc[1] == 42
        assert mixed.iloc[2] is None

    def test_nan_preserved(self):  # E32
        frame = pd.DataFrame({"a": [" x ", np.nan]})
        column = clean_dataframe(frame).cleaned_df["a"]
        assert column.iloc[0] == "x"
        assert pd.isna(column.iloc[1])

    def test_changes_audit_count(self):  # E48
        frame = pd.DataFrame({"a": [" x ", "y", " z"]})
        report = clean_dataframe(frame)
        assert report.cells_changed == 2
        assert len(report.changes) == 2
        changed_rows = set(report.changes["row"].tolist())
        assert changed_rows == {0, 2}

    def test_does_not_mutate_input(self):
        frame = pd.DataFrame({"a": [" x "]})
        snapshot = frame.copy()
        clean_dataframe(frame)
        assert frame.equals(snapshot)

    def test_per_column_case_via_case_columns(self):
        frame = pd.DataFrame({"name": ["alice"], "code": ["abc"]})
        options = CleanOptions(case_columns={"code": "upper"})
        first_row = clean_dataframe(frame, options).cleaned_df.iloc[0]
        assert first_row["name"] == "alice"
        assert first_row["code"] == "ABC"

    def test_global_case_applied_to_selected_only(self):
        frame = pd.DataFrame({"name": ["alice"], "notes": ["bob"]})
        options = CleanOptions(columns=["name"], case="upper")
        first_row = clean_dataframe(frame, options).cleaned_df.iloc[0]
        assert first_row["name"] == "ALICE"
        assert first_row["notes"] == "bob"
# ---------------------------------------------------------------------------
# Presets and config round-trip
# ---------------------------------------------------------------------------
class TestPresets:
    """``CleanOptions.from_preset`` returns the expected flag combinations."""

    def test_minimal_only_trim_collapse(self):
        minimal = CleanOptions.from_preset("minimal")
        assert minimal.trim is True
        assert minimal.collapse_whitespace is True
        assert minimal.nfc is False
        assert minimal.fold_smart_chars is False

    def test_excel_hygiene_smart_chars_on_nfkc_off(self):
        hygiene = CleanOptions.from_preset("excel-hygiene")
        assert hygiene.fold_smart_chars is True
        assert hygiene.nfc is True
        assert hygiene.nfkc is False

    def test_paranoid_includes_nfkc(self):
        paranoid = CleanOptions.from_preset("paranoid")
        assert paranoid.nfkc is True

    def test_unknown_preset_raises(self):
        bogus_name = "does-not-exist"
        with pytest.raises(ValueError):
            CleanOptions.from_preset(bogus_name)
class TestConfigRoundTrip:
    """``CleanOptions`` survives dict and JSON-file serialization unchanged."""

    def test_dict_roundtrip(self):  # E49
        original = CleanOptions(
            trim=False,
            nfc=True,
            columns=["a", "b"],
            skip_columns=["c"],
            case="upper",
        )
        as_dict = original.to_dict()
        assert CleanOptions.from_dict(as_dict) == original

    def test_file_roundtrip(self, tmp_path):
        config_path = tmp_path / "opts.json"
        original = CleanOptions(case_columns={"code": "upper"}, fold_smart_chars=False)
        original.to_file(config_path)
        assert CleanOptions.from_file(config_path) == original

    def test_unknown_keys_ignored(self):  # E50
        payload = {"trim": True, "totally_made_up_key": 42}
        parsed = CleanOptions.from_dict(payload)
        assert parsed.trim is True
# ---------------------------------------------------------------------------
# Use-case smoke tests (whole-pipeline)
# ---------------------------------------------------------------------------
class TestUseCases:
    """Whole-pipeline smoke tests, one per real-world use case.

    Invisible characters (BOM, ZWSP, NBSP) are written as ``\\u`` escapes so
    the inputs really contain them and the assertions really look for them.
    """

    def test_excel_save_as_csv_utf8_bom(self):
        # UC3: BOM at start of first cell
        df = pd.DataFrame({"name": ["\ufeffAlice", "Bob"], "city": ["NYC", "LA"]})
        result = clean_dataframe(df)
        assert result.cleaned_df["name"].iloc[0] == "Alice"

    def test_word_smart_quotes_in_product_titles(self):
        # UC2
        df = pd.DataFrame({"title": ["“Best Dog Collar”", "Cat Toy — Red"]})
        result = clean_dataframe(df)
        assert result.cleaned_df["title"].iloc[0] == '"Best Dog Collar"'
        assert result.cleaned_df["title"].iloc[1] == "Cat Toy - Red"

    def test_nbsp_in_email_field(self):
        # UC10: invisible Unicode hiding in emails
        df = pd.DataFrame(
            {"email": ["alice\u200b@test.com", "bob\u00a0@test.com"]}
        )
        result = clean_dataframe(df)
        # ZWSP stripped; NBSP folded to space then collapsed but trim won't remove
        # internal space. So "bob @test.com" remains. That's correct: the cleaner
        # doesn't know that's an email — script 03 owns email format. Just confirm
        # the invisible char is gone.
        assert "\u200b" not in result.cleaned_df["email"].iloc[0]
        assert "\u00a0" not in result.cleaned_df["email"].iloc[1]

    def test_quickbooks_trailing_spaces(self):
        # UC6: VLOOKUP fails because of trailing spaces
        df = pd.DataFrame({"vendor": ["ACME Corp ", "ACME Corp"]})
        result = clean_dataframe(df)
        assert result.cleaned_df["vendor"].iloc[0] == result.cleaned_df["vendor"].iloc[1]

    def test_bank_export_crlf_in_memo(self):
        # UC5: \r\n inside multi-line memo cells
        df = pd.DataFrame({"memo": ["line one\r\nline two\r\nline three"]})
        result = clean_dataframe(df)
        assert "\r" not in result.cleaned_df["memo"].iloc[0]
        assert result.cleaned_df["memo"].iloc[0].count("\n") == 2
# ---------------------------------------------------------------------------
# Reporting / dtype edge cases
# ---------------------------------------------------------------------------
class TestReporting:
    """Shape and counters of the result returned by ``clean_dataframe``."""

    def test_changes_columns_present(self):
        report = clean_dataframe(pd.DataFrame({"a": [" x "]}))
        expected_header = ["row", "column", "old", "new", "ops_applied"]
        assert list(report.changes.columns) == expected_header

    def test_changes_empty_when_no_changes(self):
        report = clean_dataframe(pd.DataFrame({"a": ["x", "y"]}))
        assert report.cells_changed == 0
        assert report.changes.empty

    def test_cells_total_counts_only_processed_columns(self):
        frame = pd.DataFrame({"a": ["x", "y", "z"], "n": [1, 2, 3]})
        report = clean_dataframe(frame)
        # the numeric column "n" is not processed, so only "a" is counted
        assert report.cells_total == 3