diff --git a/docs/DEVELOPER.md b/docs/DEVELOPER.md index 67e3147..81b54bd 100644 --- a/docs/DEVELOPER.md +++ b/docs/DEVELOPER.md @@ -352,6 +352,8 @@ tests/ ├── test_analyze.py · test_normalize.py · test_text_clean.py ├── test_format_standardize.py ├── test_format_standardize_corpus.py # 199-row buyer corpus +├── test_pipeline.py # pipeline engine: adapters, run, validate, serialize +├── test_cli_pipeline.py # pipeline CLI: recommend/apply/strict/audit ├── test_audit_fixes.py · test_errors.py · test_fixes_unit.py ├── test_corpus.py · test_encodings_corpus.py · test_fixtures_sweep.py ├── test_cli.py · test_cli_*.py · test_e2e.py · test_install.py @@ -365,10 +367,27 @@ tests/ ├── test_workflows.py # happy path per Ready tool ├── test_dedup_review.py # match-group card interactions ├── test_advanced_panels.py # config_panel widgets + ├── test_pipeline_builder.py # module-card builder: cards, reorder, JSON, run + ├── test_pipeline_phrasing.py # step_phrase/step_status + name bridge (pure fns) ├── test_errors.py # malformed-upload error paths └── test_findings_panel.py # analyzer findings rendering ``` +### Pipeline (Automated Workflows) coverage + +The pipeline feature is pinned end to end across four files (~115 tests): +`test_pipeline.py` (core engine — every adapter's summary numbers, step +data-flow, error stop/continue, empty/single-column/all-disabled edges, +dict + file serialization round-trips, `recommended_pipeline(include=…)`, +soft-dependency validation), `test_cli_pipeline.py` (CLI — `--recommend`, +dry-run-by-default, `--apply` output + audit JSON, `--steps`, `--strict`, +`--continue-on-error`, arg validation, save→load round-trip), +`test_pipeline_builder.py` (the visual builder via AppTest — card seeding, +toggle, reorder ▲/▼, add/remove, restore-recommended, Advanced JSON +import/export, per-tool Configure panels emitting the right option dicts), +and `test_pipeline_phrasing.py` (the plain-English `step_phrase`/`step_status` +helpers and the adapter-key→friendly-name bridge as pure functions). + ### GUI test layer GUI tests drive pages with `streamlit.testing.v1.AppTest` — diff --git a/tests/gui/test_pipeline_builder.py b/tests/gui/test_pipeline_builder.py index 3dfc98f..2729572 100644 --- a/tests/gui/test_pipeline_builder.py +++ b/tests/gui/test_pipeline_builder.py @@ -10,6 +10,7 @@ files is covered separately in ``tests/test_junk_corpus_tool_pages.py``. from __future__ import annotations +import json from pathlib import Path import pytest @@ -118,3 +119,163 @@ def test_step_phrase_is_plain_english_not_json(): # a clean step is "ok" with no detail assert step_status("text_clean", {"cells_changed": 5})[1] == "ok" + + +# --------------------------------------------------------------------------- +# Helpers for the reorder / config tests below +# --------------------------------------------------------------------------- + + +def _ids(at) -> dict: + """Map tool name → that step's stable id (assumes unique tools).""" + return {s["tool"]: s["id"] for s in at.session_state["pipeline_steps"]} + + +def _tools(at) -> list: + return [s["tool"] for s in at.session_state["pipeline_steps"]] + + +# --------------------------------------------------------------------------- +# Reorder +# --------------------------------------------------------------------------- + + +def test_reorder_down_swaps_with_next_step(): + at = _app() + sid = _ids(at)["text_clean"] + before = _tools(at) + assert before == ["text_clean", "format_standardize", "missing", "dedup"] + [b for b in at.button if b.key == f"text_clean_{sid}_down"][0].click().run() + assert not at.exception + assert _tools(at) == ["format_standardize", "text_clean", "missing", "dedup"] + + +def test_reorder_up_swaps_with_previous_step(): + at = _app() + sid = _ids(at)["missing"] + [b for b in at.button if b.key == f"missing_{sid}_up"][0].click().run() + assert not at.exception + assert _tools(at) == ["text_clean", "missing", "format_standardize", "dedup"] + + +def test_first_up_and_last_down_buttons_are_disabled(): + at = _app() + ids = _ids(at) + first_up = [b for b in at.button if b.key == f"text_clean_{ids['text_clean']}_up"][0] + last_down = [b for b in at.button if b.key == f"dedup_{ids['dedup']}_down"][0] + assert first_up.disabled is True + assert last_down.disabled is True + # interior steps are freely movable + mid_up = [b for b in at.button if b.key == f"missing_{ids['missing']}_up"][0] + assert mid_up.disabled is False + + +def test_disabled_step_stays_disabled_after_reorder(): + at = _app() + sid = _ids(at)["text_clean"] + at.toggle[0].set_value(False).run() + assert at.session_state["pipeline_steps"][0]["enabled"] is False + # move the now-disabled first step down one slot + [b for b in at.button if b.key == f"text_clean_{sid}_down"][0].click().run() + assert not at.exception + steps = at.session_state["pipeline_steps"] + moved = [s for s in steps if s["tool"] == "text_clean"][0] + assert steps.index(moved) == 1 # it moved + assert moved["enabled"] is False # ...and stayed disabled + + +# --------------------------------------------------------------------------- +# Restore recommended steps +# --------------------------------------------------------------------------- + + +def test_restore_recommended_steps_button(): + at = _app() + # Diverge from the recommended default by removing a step. + [b for b in at.button if b.label == "✕"][0].click().run() + assert _tools(at) == ["format_standardize", "missing", "dedup"] + restore = [b for b in at.button if "Restore recommended steps" in b.label] + assert len(restore) == 1 + restore[0].click().run() + assert not at.exception + assert _tools(at) == ["text_clean", "format_standardize", "missing", "dedup"] + + +def test_restore_button_absent_when_steps_match_default(): + at = _app() + # Untouched recommended steps → no restore prompt. + assert not [b for b in at.button if "Restore recommended steps" in b.label] + + +# --------------------------------------------------------------------------- +# Advanced JSON export / import +# --------------------------------------------------------------------------- + + +def test_advanced_json_export_reflects_current_steps(): + at = _app() + exported = json.loads(at.code[0].value) + assert [s["tool"] for s in exported["steps"]] == \ + ["text_clean", "format_standardize", "missing", "dedup"] + # Remove a step and confirm the exported JSON drops it too. + [b for b in at.button if b.label == "✕"][0].click().run() + exported = json.loads(at.code[0].value) + assert [s["tool"] for s in exported["steps"]] == \ + ["format_standardize", "missing", "dedup"] + + +def test_load_pasted_json_replaces_the_step_list(): + at = _app() + one_step = json.dumps( + {"steps": [{"tool": "dedup", "options": {}, "enabled": True}]} + ) + [t for t in at.text_area if t.key == "pipeline_json_paste"][0].set_value( + one_step + ).run() + [b for b in at.button if b.label == "Load pasted JSON"][0].click().run() + assert not at.exception + assert _tools(at) == ["dedup"] + + +# --------------------------------------------------------------------------- +# Config renderers emit the right options +# --------------------------------------------------------------------------- + + +def test_format_standardize_config_emits_column_types(): + at = _app() + fid = _ids(at)["format_standardize"] + [s for s in at.selectbox if s.key == f"format_standardize_{fid}_fmt__phone"][0] \ + .set_value("Phone number").run() + [b for b in at.button if b.label == "Run Pipeline"][0].click().run() + assert not at.exception + step = [s for s in at.session_state["pipeline_steps"] + if s["tool"] == "format_standardize"][0] + assert step["options"]["column_types"].get("phone") == "phone" + + +def test_missing_config_drop_radio_emits_drop_row_strategy(): + at = _app() + mid = _ids(at)["missing"] + [r for r in at.radio if r.key == f"missing_{mid}_strategy"][0] \ + .set_value("Drop rows that have any blank").run() + [b for b in at.button if b.label == "Run Pipeline"][0].click().run() + assert not at.exception + step = [s for s in at.session_state["pipeline_steps"] + if s["tool"] == "missing"][0] + assert step["options"]["strategy"] == "drop_row" + + +def test_dedup_config_multiselect_builds_strategies(): + at = _app() + did = _ids(at)["dedup"] + [m for m in at.multiselect if m.key == f"dedup_{did}_matchcols"][0] \ + .set_value(["email"]).run() + [b for b in at.button if b.label == "Run Pipeline"][0].click().run() + assert not at.exception + step = [s for s in at.session_state["pipeline_steps"] + if s["tool"] == "dedup"][0] + strategies = step["options"]["strategies"] + cols = [c["column"] for c in strategies[0]["columns"]] + assert cols == ["email"] + assert strategies[0]["columns"][0]["algorithm"] == "exact" diff --git a/tests/gui/test_pipeline_phrasing.py b/tests/gui/test_pipeline_phrasing.py new file mode 100644 index 0000000..78bc10a --- /dev/null +++ b/tests/gui/test_pipeline_phrasing.py @@ -0,0 +1,254 @@ +"""Pure-function tests for pipeline_modules phrasing helpers. + +These cover the adapter-key → tool bridge, the plain-English ``step_phrase`` +wording, ``step_status`` pill levels, and the column-prose / pluralization +helpers (``_fmt_cols`` / ``_n``). No Streamlit / AppTest needed — every symbol +under test is a pure function over plain dicts/lists. +""" + +from __future__ import annotations + +import pytest + +from src.core.pipeline import TOOL_NAMES +from src.gui.components.pipeline_modules import ( + CONFIG_RENDERERS, + PIPELINE_TOOL_META, + _fmt_cols, + _n, + step_label, + step_phrase, + step_status, +) + + +# --------------------------------------------------------------------------- +# Bridge completeness +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("tool", TOOL_NAMES) +def test_pipeline_tool_meta_covers_every_tool(tool): + assert tool in PIPELINE_TOOL_META + assert PIPELINE_TOOL_META[tool] # non-empty tool_id + + +@pytest.mark.parametrize("tool", TOOL_NAMES) +def test_step_label_is_friendly_and_not_the_raw_key(tool): + label = step_label(tool) + assert isinstance(label, str) + assert label + assert label != tool + + +@pytest.mark.parametrize("tool", TOOL_NAMES) +def test_every_tool_has_a_config_renderer(tool): + assert tool in CONFIG_RENDERERS + assert callable(CONFIG_RENDERERS[tool]) + + +def test_step_label_falls_back_to_raw_key_for_unknown_tool(): + assert step_label("not_a_tool") == "not_a_tool" + + +# --------------------------------------------------------------------------- +# step_phrase — populated + no-op cases for all five tools +# --------------------------------------------------------------------------- + + +def test_step_phrase_text_clean_populated_and_noop(): + assert step_phrase("text_clean", { + "cells_changed": 1204, "columns_processed": ["name", "city"], + }) == "1,204 cells cleaned in name & city" + assert step_phrase("text_clean", {"cells_changed": 0}) == "No changes needed." + assert step_phrase("text_clean", {}) == "No changes needed." + + +def test_step_phrase_format_standardize_populated_and_noop(): + assert step_phrase("format_standardize", { + "cells_changed": 50, "columns_processed": ["phone"], + }) == "50 cells standardized in phone" + # unparseable cells append a "left unchanged" tail + assert step_phrase("format_standardize", { + "cells_changed": 50, "cells_unparseable": 3, "columns_processed": ["phone"], + }) == "50 cells standardized in phone (3 left unchanged)" + assert step_phrase("format_standardize", {}) == "Nothing to standardize." + assert step_phrase("format_standardize", { + "cells_changed": 0, "cells_unparseable": 0, + }) == "Nothing to standardize." + + +def test_step_phrase_missing_populated_and_noop(): + assert step_phrase("missing", { + "cells_filled": 12, "rows_dropped": 4, "columns_dropped": ["x", "y"], + }) == "12 cells filled, 4 rows dropped, 2 columns dropped" + assert step_phrase("missing", {}) == "No missing values to handle." + # sentinel-only flagging path + assert step_phrase("missing", { + "sentinels_standardized": 7, + }) == "7 blank cells flagged" + + +def test_step_phrase_column_map_populated_and_noop(): + assert step_phrase("column_map", { + "columns_renamed": 3, "columns_added": ["new"], "columns_dropped": ["old", "gone"], + }) == "3 columns renamed, 1 column added, 2 columns dropped" + assert step_phrase("column_map", {}) == "Columns already aligned." + + +def test_step_phrase_dedup_mockup_case(): + assert step_phrase("dedup", { + "input_rows": 18442, "output_rows": 18130, + "duplicates_removed": 312, "groups": 147, + }) == "312 duplicates removed across 147 groups (18,442 → 18,130 rows)" + + +def test_step_phrase_dedup_noop(): + assert step_phrase("dedup", {"duplicates_removed": 0}) == "No duplicates found." + assert step_phrase("dedup", {}) == "No duplicates found." + + +# --------------------------------------------------------------------------- +# Pluralization (_n) through step_phrase +# --------------------------------------------------------------------------- + + +def test_step_phrase_dedup_singular(): + assert step_phrase("dedup", { + "input_rows": 10, "output_rows": 9, + "duplicates_removed": 1, "groups": 1, + }) == "1 duplicate removed across 1 group (10 → 9 rows)" + + +def test_step_phrase_missing_singular(): + assert step_phrase("missing", { + "rows_dropped": 1, "columns_dropped": ["x"], + }) == "1 row dropped, 1 column dropped" + + +def test_n_singular_vs_plural_every_noun(): + assert _n(1, "cell") == "1 cell" + assert _n(2, "cell") == "2 cells" + assert _n(1, "row") == "1 row" + assert _n(3, "row") == "3 rows" + assert _n(1, "column") == "1 column" + assert _n(5, "column") == "5 columns" + assert _n(1, "duplicate") == "1 duplicate" + assert _n(9, "duplicate") == "9 duplicates" + assert _n(1, "group") == "1 group" + assert _n(4, "group") == "4 groups" + + +def test_n_thousands_separator(): + assert _n(1204, "cell") == "1,204 cells" + assert _n(18442, "row") == "18,442 rows" + + +# --------------------------------------------------------------------------- +# Column prose (_fmt_cols) +# --------------------------------------------------------------------------- + + +def test_fmt_cols_zero(): + assert _fmt_cols([]) == "" + + +def test_fmt_cols_one(): + assert _fmt_cols(["name"]) == "name" + + +def test_fmt_cols_two(): + assert _fmt_cols(["name", "city"]) == "name & city" + + +def test_fmt_cols_three(): + assert _fmt_cols(["a", "b", "c"]) == "a, b & c" + + +def test_fmt_cols_four_or_more(): + assert _fmt_cols(["a", "b", "c", "d"]) == "a, b & 2 more" + assert _fmt_cols(["a", "b", "c", "d", "e"]) == "a, b & 3 more" + + +def test_fmt_cols_coerces_non_strings(): + assert _fmt_cols([1, 2]) == "1 & 2" + + +# --------------------------------------------------------------------------- +# step_status — pill levels + details +# --------------------------------------------------------------------------- + + +def test_step_status_clean_is_ok(): + assert step_status("text_clean", {"cells_changed": 5}) == ("✓ ok", "ok", "") + + +def test_step_status_skipped(): + label, level, detail = step_status("text_clean", {"cells_changed": 5}, skipped=True) + assert level == "skipped" + assert detail == "" + assert "skipped" in label + + +def test_step_status_error_uses_first_line_only(): + label, level, detail = step_status( + "dedup", {}, error="X: msg\nline2\nline3", + ) + assert level == "error" + assert detail == "X: msg" + assert "error" in label + + +def test_step_status_error_takes_precedence_over_skipped(): + label, level, detail = step_status( + "text_clean", {}, skipped=True, error="boom\nsecond", + ) + assert level == "error" + assert detail == "boom" + + +def test_step_status_format_standardize_unparseable_warns(): + label, level, detail = step_status( + "format_standardize", {"cells_changed": 100, "cells_unparseable": 141}, + ) + assert level == "warn" + assert "141 skipped" in label + assert detail # non-empty inline detail + + +def test_step_status_format_standardize_no_unparseable_is_ok(): + assert step_status( + "format_standardize", {"cells_changed": 100}, + ) == ("✓ ok", "ok", "") + + +def test_step_status_column_map_coercion_failures_warn(): + label, level, detail = step_status( + "column_map", {"coercion_failures": {"age": 4}}, + ) + assert level == "warn" + assert "4 not coerced" in label + assert detail + + +def test_step_status_column_map_missing_required_targets_warn(): + label, level, detail = step_status( + "column_map", {"missing_required_targets": ["email"]}, + ) + assert level == "warn" + assert "missing targets" in label + assert "email" in detail + + +def test_step_status_column_map_missing_targets_take_precedence_over_coercion(): + # both present → missing-targets branch wins + label, level, detail = step_status( + "column_map", + {"missing_required_targets": ["email"], "coercion_failures": {"age": 4}}, + ) + assert level == "warn" + assert "missing targets" in label + + +def test_step_status_unknown_tool_is_ok(): + assert step_status("mystery", {"foo": 1}) == ("✓ ok", "ok", "") diff --git a/tests/test_cli_pipeline.py b/tests/test_cli_pipeline.py new file mode 100644 index 0000000..e132ec6 --- /dev/null +++ b/tests/test_cli_pipeline.py @@ -0,0 +1,293 @@ +"""Integration tests for the pipeline CLI (src/cli_pipeline.py). + +The Typer ``app`` is invoked directly via ``CliRunner`` to bypass the +license ``guard(...)`` that ``main()`` runs before ``app()`` — matching the +house pattern in ``test_cli_text_clean.py``. +""" + +from __future__ import annotations + +import json + +import pandas as pd +import pytest +from typer.testing import CliRunner + +from src.cli_pipeline import app +from src.core.pipeline import Pipeline, _DEFAULT_ORDER + +runner = CliRunner() + + +@pytest.fixture +def messy_csv(tmp_path): + """A small messy CSV with duplicate / whitespace / mixed-case rows.""" + df = pd.DataFrame({ + "name": [" Alice ", "alice", "Bob", "Charlie"], + "email": ["A@X.COM", "a@x.com", "bob@x.com", "charlie@x.com"], + "phone": ["555-1234", "5551234", "555-9999", "555-0000"], + "signup_date": ["2020-01-01", "2020-01-01", "2020-02-02", "2020-03-03"], + }) + path = tmp_path / "messy.csv" + df.to_csv(path, index=False) + return path + + +def _pipeline_artifacts(csv_path): + """The output CSV + audit JSON the CLI writes next to *csv_path*.""" + out_csv = csv_path.parent / f"{csv_path.stem}_pipeline.csv" + audit = csv_path.parent / f"{csv_path.stem}_pipeline.json" + return out_csv, audit + + +# --------------------------------------------------------------------------- +# --recommend +# --------------------------------------------------------------------------- + +class TestRecommend: + def test_recommend_prints_valid_json(self): + result = runner.invoke(app, ["--recommend"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert "steps" in data + tools = [s["tool"] for s in data["steps"]] + assert tools == list(_DEFAULT_ORDER) + + def test_recommend_default_tools_in_order(self): + result = runner.invoke(app, ["--recommend"]) + data = json.loads(result.output) + tools = [s["tool"] for s in data["steps"]] + assert tools == ["text_clean", "format_standardize", "missing", "dedup"] + assert len(tools) == 4 + + def test_recommend_output_writes_loadable_file(self, tmp_path): + out = tmp_path / "pipeline.json" + result = runner.invoke(app, ["--recommend", "--output", str(out)]) + assert result.exit_code == 0 + assert out.exists() + # Confirmation message printed instead of raw JSON. + assert str(out) in result.output + pipe = Pipeline.from_file(out) + assert [s.tool for s in pipe.steps] == list(_DEFAULT_ORDER) + + def test_recommend_output_message_not_json(self, tmp_path): + out = tmp_path / "pipeline.json" + result = runner.invoke(app, ["--recommend", "--output", str(out)]) + assert "saved to" in result.output.lower() + + +# --------------------------------------------------------------------------- +# Argument / input validation +# --------------------------------------------------------------------------- + +class TestArgValidation: + def test_no_args_exits_2(self): + result = runner.invoke(app, []) + assert result.exit_code == 2 + assert "input file is required" in result.output.lower() + + def test_nonexistent_input_exits_1(self, tmp_path): + missing = tmp_path / "does_not_exist_xyz.csv" + result = runner.invoke(app, [str(missing)]) + assert result.exit_code == 1 + assert "not found" in result.output.lower() + + def test_pipeline_and_steps_together_exits_1(self, messy_csv, tmp_path): + pj = tmp_path / "p.json" + Pipeline.from_dict({"steps": [{"tool": "text_clean"}]}).to_file(pj) + result = runner.invoke( + app, + [str(messy_csv), "--pipeline", str(pj), "--steps", "text_clean"], + ) + assert result.exit_code == 1 + assert "not both" in result.output.lower() + + def test_pipeline_nonexistent_exits_1(self, messy_csv, tmp_path): + missing = tmp_path / "no_such_pipeline.json" + result = runner.invoke( + app, [str(messy_csv), "--pipeline", str(missing)], + ) + assert result.exit_code == 1 + assert "not found" in result.output.lower() + + def test_unknown_tool_in_steps_errors(self, messy_csv): + result = runner.invoke(app, [str(messy_csv), "--steps", "bogus_tool"]) + assert result.exit_code != 0 + # Helpful error naming the offending value. + assert "bogus_tool" in result.output + + +# --------------------------------------------------------------------------- +# Dry-run (default) +# --------------------------------------------------------------------------- + +class TestDryRun: + def test_dry_run_exit_0_and_plan_printed(self, messy_csv): + result = runner.invoke(app, [str(messy_csv)]) + assert result.exit_code == 0 + assert "Pipeline plan:" in result.output + assert "plan-only run" in result.output + + def test_dry_run_writes_no_artifacts(self, messy_csv): + result = runner.invoke(app, [str(messy_csv)]) + assert result.exit_code == 0 + out_csv, audit = _pipeline_artifacts(messy_csv) + assert not out_csv.exists() + assert not audit.exists() + + +# --------------------------------------------------------------------------- +# --apply +# --------------------------------------------------------------------------- + +class TestApply: + def test_apply_default_pipeline_writes_outputs(self, messy_csv): + result = runner.invoke(app, [str(messy_csv), "--apply"]) + assert result.exit_code == 0 + out_csv, audit = _pipeline_artifacts(messy_csv) + assert out_csv.exists() + assert audit.exists() + # Output CSV is readable. + df = pd.read_csv(out_csv) + assert len(df.columns) >= 1 + + def test_apply_audit_has_documented_keys(self, messy_csv): + result = runner.invoke(app, [str(messy_csv), "--apply"]) + assert result.exit_code == 0 + _, audit = _pipeline_artifacts(messy_csv) + data = json.loads(audit.read_text()) + for key in ( + "pipeline", "warnings", "initial_rows", "final_rows", + "total_elapsed_seconds", "steps", + ): + assert key in data, f"missing audit key: {key}" + # One step entry per pipeline step (default = 4). + assert len(data["steps"]) == len(_DEFAULT_ORDER) + for step in data["steps"]: + for k in ( + "tool", "name", "enabled", "skipped", + "elapsed_seconds", "summary", "error", + ): + assert k in step, f"missing step key: {k}" + + def test_apply_dedup_reduces_rows(self, messy_csv): + result = runner.invoke(app, [str(messy_csv), "--apply"]) + assert result.exit_code == 0 + _, audit = _pipeline_artifacts(messy_csv) + data = json.loads(audit.read_text()) + # 4 input rows; the first two are duplicates once cleaned/standardized. + assert data["initial_rows"] == 4 + assert data["final_rows"] < data["initial_rows"] + + def test_apply_custom_output_path(self, messy_csv, tmp_path): + out = tmp_path / "custom.csv" + result = runner.invoke( + app, [str(messy_csv), "--apply", "--output", str(out)], + ) + assert result.exit_code == 0 + assert out.exists() + # Default-named CSV should NOT be written when --output is given. + default_csv, _ = _pipeline_artifacts(messy_csv) + assert not default_csv.exists() + # Audit JSON is still written next to the input. + _, audit = _pipeline_artifacts(messy_csv) + assert audit.exists() + + def test_apply_custom_steps_subset(self, messy_csv): + result = runner.invoke( + app, [str(messy_csv), "--apply", "--steps", "text_clean,missing"], + ) + assert result.exit_code == 0 + _, audit = _pipeline_artifacts(messy_csv) + data = json.loads(audit.read_text()) + tools = [s["tool"] for s in data["steps"]] + assert tools == ["text_clean", "missing"] + + +# --------------------------------------------------------------------------- +# Strict mode +# --------------------------------------------------------------------------- + +class TestStrict: + def test_strict_out_of_order_exits_2(self, messy_csv): + result = runner.invoke( + app, + [str(messy_csv), "--steps", "dedup,text_clean", "--strict", "--apply"], + ) + assert result.exit_code == 2 + assert "abort" in result.output.lower() + + def test_strict_out_of_order_writes_nothing(self, messy_csv): + result = runner.invoke( + app, + [str(messy_csv), "--steps", "dedup,text_clean", "--strict", "--apply"], + ) + assert result.exit_code == 2 + out_csv, audit = _pipeline_artifacts(messy_csv) + assert not out_csv.exists() + assert not audit.exists() + + +# --------------------------------------------------------------------------- +# Round-trip: --recommend --output then --pipeline --apply +# --------------------------------------------------------------------------- + +class TestRoundTrip: + def test_save_then_run_saved_pipeline(self, messy_csv, tmp_path): + pj = tmp_path / "p.json" + r1 = runner.invoke(app, ["--recommend", "--output", str(pj)]) + assert r1.exit_code == 0 + assert pj.exists() + + r2 = runner.invoke( + app, [str(messy_csv), "--pipeline", str(pj), "--apply"], + ) + assert r2.exit_code == 0 + out_csv, audit = _pipeline_artifacts(messy_csv) + assert out_csv.exists() + assert audit.exists() + + +# --------------------------------------------------------------------------- +# Step error handling (--continue-on-error) +# --------------------------------------------------------------------------- + +class TestStepError: + """A dedup step with an invalid survivor_rule raises a ConfigError at + run time, letting us exercise the stop/continue-on-error contract.""" + + def _bad_pipeline(self, tmp_path): + pj = tmp_path / "bad.json" + Pipeline.from_dict({ + "steps": [{ + "tool": "dedup", + "options": {"survivor_rule": "not_a_real_rule"}, + }] + }).to_file(pj) + return pj + + def test_step_error_halts_without_continue(self, messy_csv, tmp_path): + pj = self._bad_pipeline(tmp_path) + result = runner.invoke( + app, [str(messy_csv), "--pipeline", str(pj), "--apply"], + ) + assert result.exit_code != 0 + out_csv, audit = _pipeline_artifacts(messy_csv) + # Halted before writing output. + assert not out_csv.exists() + assert not audit.exists() + + def test_continue_on_error_completes_and_records_error(self, messy_csv, tmp_path): + pj = self._bad_pipeline(tmp_path) + result = runner.invoke( + app, + [str(messy_csv), "--pipeline", str(pj), "--apply", + "--continue-on-error"], + ) + assert result.exit_code == 0 + out_csv, audit = _pipeline_artifacts(messy_csv) + assert out_csv.exists() + assert audit.exists() + data = json.loads(audit.read_text()) + assert len(data["steps"]) == 1 + assert data["steps"][0]["error"], "expected the failed step's error recorded" diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 86203be..7dc90c7 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -322,3 +322,499 @@ class TestSoftDependencies: assert len(order) == len(TOOL_NAMES), ( f"SOFT_DEPENDENCIES contain a cycle; topo order={order}" ) + + +# --------------------------------------------------------------------------- +# Per-adapter summary correctness — exact numbers on KNOWN-messy input. +# Each adapter is also exercised through run_pipeline so the StepResult +# carries the adapter's summary verbatim. +# --------------------------------------------------------------------------- + +def _run_one(df, tool, options): + """Run a single-step pipeline and return (StepResult, PipelineResult).""" + res = run_pipeline(df, Pipeline(steps=[Step(tool, options)])) + return res.step_results[0], res + + +class TestTextCleanSummary: + def test_two_trimmable_cells_counted(self): + df = pd.DataFrame({ + "a": [" x ", "y", " z"], # 2 cells need trimming (" x ", " z") + "b": ["ok", "fine", "good"], # already clean + }) + out, summary = TOOL_ADAPTERS["text_clean"](df, {"trim": True}) + assert summary["cells_total"] == 6 + assert summary["cells_changed"] == 2 + assert sorted(summary["columns_processed"]) == ["a", "b"] + assert out["a"].tolist() == ["x", "y", "z"] + + def test_title_case_changes_all_cells(self): + df = pd.DataFrame({"name": ["alice smith", "BOB JONES"]}) + out, summary = TOOL_ADAPTERS["text_clean"](df, {"case": "title"}) + assert summary["cells_changed"] == 2 + assert out["name"].tolist() == ["Alice Smith", "Bob Jones"] + + def test_collapse_whitespace_counts_internal_runs(self): + df = pd.DataFrame({"name": ["a b", "c d", "e f"]}) + out, summary = TOOL_ADAPTERS["text_clean"]( + df, {"trim": True, "collapse_whitespace": True}, + ) + # "a b" and "e f" collapse; "c d" is already single-spaced. + assert summary["cells_changed"] == 2 + assert out["name"].tolist() == ["a b", "c d", "e f"] + + def test_summary_visible_through_run_pipeline(self): + df = pd.DataFrame({"a": [" x ", "y"]}) + sr, _res = _run_one(df, "text_clean", {"trim": True}) + assert sr.skipped is False + assert sr.error is None + assert sr.summary["cells_changed"] == 1 + assert sr.summary["cells_total"] == 2 + + +class TestFormatStandardizeSummary: + def test_one_unparseable_phone(self): + df = pd.DataFrame({ + "phone": ["(415) 555-1234", "not a phone", "+44 20 7946 0958"], + }) + out, summary = TOOL_ADAPTERS["format_standardize"]( + df, {"column_types": {"phone": "phone"}}, + ) + assert summary["cells_total"] == 3 + assert summary["cells_unparseable"] == 1 + assert summary["cells_changed"] == 2 + assert summary["columns_processed"] == ["phone"] + assert out["phone"].tolist() == [ + "+14155551234", "not a phone", "+442079460958", + ] + + def test_date_standardization_counts(self): + df = pd.DataFrame({"signup_date": ["2024-01-05", "Jan 5 2024", "garbage"]}) + out, summary = TOOL_ADAPTERS["format_standardize"]( + df, {"column_types": {"signup_date": "date"}}, + ) + # "2024-01-05" already canonical; "Jan 5 2024" rewritten; "garbage" fails. + assert summary["cells_unparseable"] == 1 + assert summary["cells_changed"] == 1 + assert out["signup_date"].tolist()[:2] == ["2024-01-05", "2024-01-05"] + + def test_summary_visible_through_run_pipeline(self): + df = pd.DataFrame({"phone": ["(415) 555-1234", "bad"]}) + sr, _ = _run_one(df, "format_standardize", {"column_types": {"phone": "phone"}}) + assert sr.summary["cells_unparseable"] == 1 + assert sr.summary["columns_processed"] == ["phone"] + + +class TestMissingSummary: + def test_median_fills_each_blank(self): + df = pd.DataFrame({"val": [1.0, np.nan, 3.0, np.nan, 5.0]}) + out, summary = TOOL_ADAPTERS["missing"](df, {"strategy": "median"}) + assert summary["cells_filled"] == 2 # exactly the 2 NaNs + assert summary["rows_dropped"] == 0 + assert out["val"].tolist() == [1.0, 3.0, 3.0, 3.0, 5.0] # median is 3.0 + + def test_drop_row_by_threshold(self): + # row_drop_threshold is the *fraction* of nulls needed to drop a row. + df = pd.DataFrame({ + "a": [1.0, np.nan, 3.0], + "b": ["x", np.nan, "z"], # middle row is 100% null + }) + out, summary = TOOL_ADAPTERS["missing"]( + df, {"strategy": "drop_row", "row_drop_threshold": 0.4}, + ) + assert summary["rows_dropped"] == 1 + assert len(out) == 2 + + def test_sentinel_standardization_count(self): + df = pd.DataFrame({"x": ["ok", "N/A", "fine", "N/A"]}) + out, summary = TOOL_ADAPTERS["missing"](df, { + "strategy": "none", + "sentinels": ["N/A"], + "standardize_sentinels": True, + }) + assert summary["sentinels_standardized"] == 2 + # The two "N/A" cells became real NaN. + assert out["x"].isna().sum() == 2 + assert out["x"].tolist()[0] == "ok" + + def test_summary_visible_through_run_pipeline(self): + df = pd.DataFrame({"val": [1.0, np.nan, 3.0]}) + sr, _ = _run_one(df, "missing", {"strategy": "median"}) + assert sr.summary["cells_filled"] == 1 + + +class TestColumnMapSummary: + def test_single_rename(self): + df = pd.DataFrame({"old": [1, 2], "keep": [3, 4]}) + out, summary = TOOL_ADAPTERS["column_map"]( + df, {"mapping": {"old": "new"}, "unmapped": "keep"}, + ) + assert summary["columns_renamed"] == 1 + assert summary["columns_dropped"] == [] + assert list(out.columns) == ["new", "keep"] + + def test_unmapped_drop_reports_dropped_columns(self): + df = pd.DataFrame({"old": [1, 2], "keep": [3, 4]}) + out, summary = TOOL_ADAPTERS["column_map"]( + df, {"mapping": {"old": "new"}, "unmapped": "drop"}, + ) + assert summary["columns_renamed"] == 1 + assert summary["columns_dropped"] == ["keep"] + assert list(out.columns) == ["new"] + + def test_summary_visible_through_run_pipeline(self): + df = pd.DataFrame({"old": [1], "keep": [2]}) + sr, _ = _run_one(df, "column_map", {"mapping": {"old": "new"}}) + assert sr.summary["columns_renamed"] == 1 + + +class TestDedupSummary: + def test_exact_duplicate_rows(self): + df = pd.DataFrame({ + "email": ["a@x.com", "b@x.com", "a@x.com", "a@x.com"], + "name": ["A", "B", "A", "A"], + }) + out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "first"}) + assert summary["input_rows"] == 4 + assert summary["output_rows"] == 2 + assert summary["duplicates_removed"] == 2 + assert summary["groups"] == 1 + assert out["email"].tolist() == ["a@x.com", "b@x.com"] + + def test_explicit_exact_strategy_on_column(self): + df = pd.DataFrame({ + "email": ["a@x.com", "b@x.com", "a@x.com"], + "name": ["A", "B", "C"], + }) + out, summary = TOOL_ADAPTERS["dedup"](df, { + "survivor_rule": "first", + "strategies": [{"columns": [ + {"column": "email", "algorithm": "exact", "threshold": 100}, + ]}], + }) + assert summary["duplicates_removed"] == 1 + assert summary["groups"] == 1 + + def test_most_complete_keeps_fuller_survivor(self): + df = pd.DataFrame({ + "email": ["a@x.com", "a@x.com"], + "name": ["", "Alice"], # second row is more complete + "phone": ["111", "111"], + }) + out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "most_complete"}) + assert summary["duplicates_removed"] == 1 + assert out.iloc[0]["name"] == "Alice" + + def test_no_duplicates_is_noop(self): + df = pd.DataFrame({"email": ["a@x.com", "b@x.com"], "name": ["A", "B"]}) + out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "first"}) + assert summary["duplicates_removed"] == 0 + assert summary["output_rows"] == 2 + + def test_summary_visible_through_run_pipeline(self): + df = pd.DataFrame({"email": ["a@x.com", "a@x.com"], "name": ["A", "A"]}) + sr, res = _run_one(df, "dedup", {"survivor_rule": "first"}) + assert sr.summary["duplicates_removed"] == 1 + assert res.final_rows == 1 + + +# --------------------------------------------------------------------------- +# Data flow — a later step depends on an earlier step's output +# --------------------------------------------------------------------------- + +class TestDataFlow: + def test_text_clean_enables_dedup_match(self): + # The two phones differ only by surrounding whitespace; without + # the trim they are distinct, so dedup alone would keep both. + df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]}) + p = Pipeline(steps=[ + Step("text_clean", {"trim": True}), + Step("dedup", {"survivor_rule": "first"}), + ]) + res = run_pipeline(df, p) + assert res.initial_rows == 2 + assert res.final_rows == 1 + assert res.final_df["phone"].tolist() == ["+14155551234"] + + def test_dedup_default_matching_normalizes_whitespace(self): + # Note: dedup's exact matcher already normalizes surrounding + # whitespace, so the two phones collapse even WITHOUT a prior + # text_clean. The survivor still carries the un-trimmed value. + df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]}) + res = run_pipeline(df, Pipeline(steps=[Step("dedup", {"survivor_rule": "first"})])) + assert res.final_rows == 1 + # Survivor keeps the raw (still-padded) text — dedup does not clean. + assert res.final_df["phone"].tolist() == [" +14155551234 "] + + def test_chained_initial_and_final_rows(self): + df = pd.DataFrame({ + "name": [" Al ", "al", "Bob"], + "v": [1, 1, 2], + }) + p = Pipeline(steps=[ + Step("text_clean", {"trim": True, "case": "title"}), + Step("dedup", {"survivor_rule": "first"}), + ]) + res = run_pipeline(df, p) + # " Al " and "al" both become "Al" → duplicate rows collapse. + assert res.initial_rows == 3 + assert res.final_rows == 2 + assert "Al" in res.final_df["name"].tolist() + + +# --------------------------------------------------------------------------- +# Error handling — stop_on_error semantics +# --------------------------------------------------------------------------- + +class TestErrorHandling: + def test_most_recent_without_date_raises_by_default(self, messy_df): + # dedup with survivor_rule="most_recent" but no date_column errors. + p = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})]) + with pytest.raises(InputValidationError): + run_pipeline(messy_df, p) + + def test_continue_on_error_sets_error_string(self): + df = pd.DataFrame({"email": ["a@x.com", "a@x.com"], "name": ["A", "B"]}) + p = Pipeline(steps=[ + Step("text_clean", {"trim": True}), + Step("dedup", {"survivor_rule": "most_recent"}), # will fail + Step("missing", {"strategy": "none"}), + ]) + res = run_pipeline(df, p, stop_on_error=False) + bad = res.step_results[1] + assert bad.error is not None + assert isinstance(bad.error, str) and bad.error.strip() + # The failed step did NOT change the row count — previous df carried. + assert res.step_results[2].error is None + assert res.final_rows == 2 + + def test_failed_step_summary_is_empty(self): + df = pd.DataFrame({"e": ["a", "a"], "n": ["x", "y"]}) + p = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})]) + res = run_pipeline(df, p, stop_on_error=False) + assert res.step_results[0].summary == {} + assert res.step_results[0].skipped is False + + def test_config_error_on_bad_survivor_rule_propagates(self, messy_df): + p = Pipeline(steps=[Step("dedup", {"survivor_rule": "nonsense"})]) + with pytest.raises(ConfigError): + run_pipeline(messy_df, p) + + +# --------------------------------------------------------------------------- +# Edge inputs +# --------------------------------------------------------------------------- + +class TestEdgeInputs: + def test_empty_dataframe_runs_clean(self): + empty = pd.DataFrame({"name": [], "phone": []}) + res = run_pipeline( + empty, + recommended_pipeline(options={"missing": {"strategy": "none"}}), + ) + assert res.initial_rows == 0 + assert res.final_rows == 0 + assert all(sr.error is None for sr in res.step_results if not sr.skipped) + + def test_single_column_dataframe(self): + df = pd.DataFrame({"name": [" Al ", "al"]}) + res = run_pipeline( + df, Pipeline(steps=[Step("text_clean", {"trim": True, "case": "title"})]), + ) + assert res.final_df["name"].tolist() == ["Al", "Al"] + + def test_all_steps_disabled_returns_unchanged(self, messy_df): + snapshot = messy_df.copy(deep=True) + p = Pipeline(steps=[ + Step("text_clean", enabled=False), + Step("format_standardize", enabled=False), + Step("missing", enabled=False), + Step("dedup", enabled=False), + ]) + res = run_pipeline(messy_df, p) + assert all(sr.skipped is True for sr in res.step_results) + assert res.final_rows == res.initial_rows == 5 + pd.testing.assert_frame_equal(res.final_df, snapshot) + + def test_empty_pipeline_is_identity(self, messy_df): + res = run_pipeline(messy_df, Pipeline(steps=[])) + assert res.step_results == [] + assert res.final_rows == 5 + pd.testing.assert_frame_equal(res.final_df, messy_df) + + +# --------------------------------------------------------------------------- +# Serialization round-trips with disabled / named / nested-option steps +# --------------------------------------------------------------------------- + +class TestSerializationRoundtrips: + def test_disabled_and_named_step_survive_dict(self): + p = Pipeline(steps=[ + Step("text_clean", {"trim": True}, enabled=False, name="Pre-clean"), + Step("dedup", {"survivor_rule": "first"}, name="Final dedup"), + ]) + loaded = Pipeline.from_dict(p.to_dict()) + assert loaded.steps[0].enabled is False + assert loaded.steps[0].name == "Pre-clean" + assert loaded.steps[0].options == {"trim": True} + assert loaded.steps[1].name == "Final dedup" + assert loaded.steps[1].display_name() == "Final dedup" + + def test_nested_options_survive_dict(self): + nested = { + "column_types": {"phone": "phone", "signup_date": "date"}, + } + strat = { + "survivor_rule": "most_complete", + "strategies": [{"columns": [ + {"column": "email", "algorithm": "exact", "threshold": 100}, + ]}], + } + p = Pipeline(steps=[ + Step("format_standardize", nested), + Step("dedup", strat), + ]) + loaded = Pipeline.from_dict(p.to_dict()) + assert loaded.steps[0].options["column_types"] == nested["column_types"] + assert loaded.steps[1].options["strategies"] == strat["strategies"] + + def test_nested_options_survive_file(self, tmp_path): + p = Pipeline(steps=[ + Step("format_standardize", + {"column_types": {"phone": "phone"}}, + enabled=False, name="formats"), + ]) + path = tmp_path / "pipe.json" + p.to_file(path) + loaded = Pipeline.from_file(path) + assert loaded.steps[0].enabled is False + assert loaded.steps[0].name == "formats" + assert loaded.steps[0].options == {"column_types": {"phone": "phone"}} + + def test_roundtrip_is_idempotent(self): + p = Pipeline(steps=[ + Step("text_clean", {"trim": True}, enabled=False, name="x"), + Step("missing", {"strategy": "median"}), + ]) + once = Pipeline.from_dict(p.to_dict()) + twice = Pipeline.from_dict(once.to_dict()) + assert once.to_dict() == twice.to_dict() == p.to_dict() + + +# --------------------------------------------------------------------------- +# recommended_pipeline(include=...) — subsetting, ordering, option seeding +# --------------------------------------------------------------------------- + +class TestRecommendedInclude: + def test_subset_preserves_given_order(self): + p = recommended_pipeline(include=["dedup", "text_clean"]) + assert [s.tool for s in p.steps] == ["dedup", "text_clean"] + + def test_column_map_first(self): + p = recommended_pipeline(include=[ + "column_map", "text_clean", "format_standardize", "missing", "dedup", + ]) + assert p.steps[0].tool == "column_map" + assert len(p.steps) == 5 + + def test_column_map_last(self): + p = recommended_pipeline(include=[ + "text_clean", "format_standardize", "missing", "dedup", "column_map", + ]) + assert p.steps[-1].tool == "column_map" + + def test_unknown_tool_in_include_raises(self): + with pytest.raises(InputValidationError): + recommended_pipeline(include=["text_clean", "not_a_tool"]) + + def test_options_seeding_only_targets_named_tool(self): + p = recommended_pipeline( + include=["text_clean", "dedup"], + options={"dedup": {"survivor_rule": "last"}}, + ) + assert p.steps[0].options == {} # text_clean unseeded + assert p.steps[1].options == {"survivor_rule": "last"} + + def test_empty_include_yields_no_steps(self): + p = recommended_pipeline(include=[]) + assert p.steps == [] + + def test_seeded_options_are_independent_copies(self): + seed = {"text_clean": {"trim": True}} + p = recommended_pipeline(include=["text_clean"], options=seed) + # Mutating the produced step must not leak back into the seed. + p.steps[0].options["trim"] = False + assert seed["text_clean"]["trim"] is True + + +# --------------------------------------------------------------------------- +# Realistic demo integration — messy customers table end-to-end +# --------------------------------------------------------------------------- + +class TestDemoIntegration: + @pytest.fixture + def customers_df(self): + return pd.DataFrame({ + "Full Name": [" alice smith ", "BOB JONES", "alice smith", ""], + "Email": ["alice@x.com ", "bob@x.com", "alice@x.com", "carol@x.com"], + "Phone": [" +14155551234 ", "+442079460958", + "+14155551234", "+13035551111"], + }) + + def test_full_recommended_plus_column_map(self, customers_df): + p = recommended_pipeline( + include=["text_clean", "format_standardize", "missing", + "dedup", "column_map"], + options={ + "text_clean": {"trim": True, "collapse_whitespace": True}, + "missing": {"strategy": "none"}, + "dedup": { + "survivor_rule": "most_complete", + "strategies": [{"columns": [ + {"column": "Phone", "algorithm": "exact", "threshold": 100}, + ]}], + }, + "column_map": { + "mapping": {"Full Name": "name", "Email": "email", + "Phone": "phone"}, + "unmapped": "keep", + }, + }, + ) + res = run_pipeline(customers_df, p) + + # Two rows share the same phone after trimming → one duplicate removed. + assert res.initial_rows == 4 + assert res.final_rows == 3 + assert res.final_rows < res.initial_rows + + # Headers were renamed by the trailing column_map step. + assert list(res.final_df.columns) == ["name", "email", "phone"] + + # The surviving Alice row kept its (trimmed) phone. + phones = res.final_df["phone"].tolist() + assert "+14155551234" in phones + assert phones.count("+14155551234") == 1 # only one Alice survives + + # Every executed step succeeded. + assert all(sr.error is None for sr in res.step_results if not sr.skipped) + # column_map reported the three renames. + cm = res.step_results[-1] + assert cm.step.tool == "column_map" + assert cm.summary["columns_renamed"] == 3 + + def test_demo_dedup_step_reports_one_duplicate(self, customers_df): + p = recommended_pipeline(options={ + "text_clean": {"trim": True}, + "missing": {"strategy": "none"}, + "dedup": { + "survivor_rule": "most_complete", + "strategies": [{"columns": [ + {"column": "Phone", "algorithm": "exact", "threshold": 100}, + ]}], + }, + }) + res = run_pipeline(customers_df, p) + dedup_sr = next(s for s in res.step_results if s.step.tool == "dedup") + assert dedup_sr.summary["duplicates_removed"] == 1 + assert dedup_sr.summary["groups"] == 1