test(pipeline): complete automated test suite for the pipeline feature

Adds ~115 tests pinning the Automated Workflows feature end to end:

- tests/test_pipeline.py (+43): per-adapter summary correctness on known
  inputs, multi-step data flow, error stop/continue contract, empty /
  single-column / all-disabled edges, dict+file serialization round-trips,
  recommended_pipeline(include=…), and a synthesized demo integration run.
- tests/test_cli_pipeline.py (new, 21): --recommend, dry-run-by-default,
  --apply output CSV + audit JSON, --steps, --strict abort, arg validation,
  --continue-on-error vs halt, and a save→load round-trip. Invokes the Typer
  app directly to bypass the license guard (house pattern).
- tests/gui/test_pipeline_builder.py (+9): reorder ▲/▼, disabled edge
  buttons, disabled-step persistence across reorder, restore-recommended,
  Advanced JSON export/import, and per-tool Configure panels emitting the
  correct option dicts (AppTest).
- tests/gui/test_pipeline_phrasing.py (new, 30): step_phrase/step_status and
  the adapter-key→friendly-name bridge as pure functions, incl. pluralization,
  column prose, and warn/error status derivation.

Full suite: 2565 passed, 91 skipped. No product bugs surfaced. Documents the
coverage in docs/DEVELOPER.md (test tree + a pipeline-coverage note).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-22 18:31:15 +00:00
parent 00d3f28865
commit 38616d69e2
5 changed files with 1223 additions and 0 deletions

View File

@@ -352,6 +352,8 @@ tests/
├── test_analyze.py · test_normalize.py · test_text_clean.py
├── test_format_standardize.py
├── test_format_standardize_corpus.py # 199-row buyer corpus
├── test_pipeline.py # pipeline engine: adapters, run, validate, serialize
├── test_cli_pipeline.py # pipeline CLI: recommend/apply/strict/audit
├── test_audit_fixes.py · test_errors.py · test_fixes_unit.py
├── test_corpus.py · test_encodings_corpus.py · test_fixtures_sweep.py
├── test_cli.py · test_cli_*.py · test_e2e.py · test_install.py
@@ -365,10 +367,27 @@ tests/
├── test_workflows.py # happy path per Ready tool
├── test_dedup_review.py # match-group card interactions
├── test_advanced_panels.py # config_panel widgets
├── test_pipeline_builder.py # module-card builder: cards, reorder, JSON, run
├── test_pipeline_phrasing.py # step_phrase/step_status + name bridge (pure fns)
├── test_errors.py # malformed-upload error paths
└── test_findings_panel.py # analyzer findings rendering
```
### Pipeline (Automated Workflows) coverage
The pipeline feature is pinned end to end across four files (~115 tests):
`test_pipeline.py` (core engine — every adapter's summary numbers, step
data-flow, error stop/continue, empty/single-column/all-disabled edges,
dict + file serialization round-trips, `recommended_pipeline(include=…)`,
soft-dependency validation), `test_cli_pipeline.py` (CLI — `--recommend`,
dry-run-by-default, `--apply` output + audit JSON, `--steps`, `--strict`,
`--continue-on-error`, arg validation, save→load round-trip),
`test_pipeline_builder.py` (the visual builder via AppTest — card seeding,
toggle, reorder ▲/▼, add/remove, restore-recommended, Advanced JSON
import/export, per-tool Configure panels emitting the right option dicts),
and `test_pipeline_phrasing.py` (the plain-English `step_phrase`/`step_status`
helpers and the adapter-key→friendly-name bridge as pure functions).
### GUI test layer
GUI tests drive pages with `streamlit.testing.v1.AppTest` —

View File

@@ -10,6 +10,7 @@ files is covered separately in ``tests/test_junk_corpus_tool_pages.py``.
from __future__ import annotations
import json
from pathlib import Path
import pytest
@@ -118,3 +119,163 @@ def test_step_phrase_is_plain_english_not_json():
# a clean step is "ok" with no detail
assert step_status("text_clean", {"cells_changed": 5})[1] == "ok"
# ---------------------------------------------------------------------------
# Helpers for the reorder / config tests below
# ---------------------------------------------------------------------------
def _ids(at) -> dict:
"""Map tool name → that step's stable id (assumes unique tools)."""
return {s["tool"]: s["id"] for s in at.session_state["pipeline_steps"]}
def _tools(at) -> list:
return [s["tool"] for s in at.session_state["pipeline_steps"]]
# ---------------------------------------------------------------------------
# Reorder
# ---------------------------------------------------------------------------
def test_reorder_down_swaps_with_next_step():
at = _app()
sid = _ids(at)["text_clean"]
before = _tools(at)
assert before == ["text_clean", "format_standardize", "missing", "dedup"]
[b for b in at.button if b.key == f"text_clean_{sid}_down"][0].click().run()
assert not at.exception
assert _tools(at) == ["format_standardize", "text_clean", "missing", "dedup"]
def test_reorder_up_swaps_with_previous_step():
at = _app()
sid = _ids(at)["missing"]
[b for b in at.button if b.key == f"missing_{sid}_up"][0].click().run()
assert not at.exception
assert _tools(at) == ["text_clean", "missing", "format_standardize", "dedup"]
def test_first_up_and_last_down_buttons_are_disabled():
at = _app()
ids = _ids(at)
first_up = [b for b in at.button if b.key == f"text_clean_{ids['text_clean']}_up"][0]
last_down = [b for b in at.button if b.key == f"dedup_{ids['dedup']}_down"][0]
assert first_up.disabled is True
assert last_down.disabled is True
# interior steps are freely movable
mid_up = [b for b in at.button if b.key == f"missing_{ids['missing']}_up"][0]
assert mid_up.disabled is False
def test_disabled_step_stays_disabled_after_reorder():
at = _app()
sid = _ids(at)["text_clean"]
at.toggle[0].set_value(False).run()
assert at.session_state["pipeline_steps"][0]["enabled"] is False
# move the now-disabled first step down one slot
[b for b in at.button if b.key == f"text_clean_{sid}_down"][0].click().run()
assert not at.exception
steps = at.session_state["pipeline_steps"]
moved = [s for s in steps if s["tool"] == "text_clean"][0]
assert steps.index(moved) == 1 # it moved
assert moved["enabled"] is False # ...and stayed disabled
# ---------------------------------------------------------------------------
# Restore recommended steps
# ---------------------------------------------------------------------------
def test_restore_recommended_steps_button():
at = _app()
# Diverge from the recommended default by removing a step.
[b for b in at.button if b.label == ""][0].click().run()
assert _tools(at) == ["format_standardize", "missing", "dedup"]
restore = [b for b in at.button if "Restore recommended steps" in b.label]
assert len(restore) == 1
restore[0].click().run()
assert not at.exception
assert _tools(at) == ["text_clean", "format_standardize", "missing", "dedup"]
def test_restore_button_absent_when_steps_match_default():
at = _app()
# Untouched recommended steps → no restore prompt.
assert not [b for b in at.button if "Restore recommended steps" in b.label]
# ---------------------------------------------------------------------------
# Advanced JSON export / import
# ---------------------------------------------------------------------------
def test_advanced_json_export_reflects_current_steps():
at = _app()
exported = json.loads(at.code[0].value)
assert [s["tool"] for s in exported["steps"]] == \
["text_clean", "format_standardize", "missing", "dedup"]
# Remove a step and confirm the exported JSON drops it too.
[b for b in at.button if b.label == ""][0].click().run()
exported = json.loads(at.code[0].value)
assert [s["tool"] for s in exported["steps"]] == \
["format_standardize", "missing", "dedup"]
def test_load_pasted_json_replaces_the_step_list():
at = _app()
one_step = json.dumps(
{"steps": [{"tool": "dedup", "options": {}, "enabled": True}]}
)
[t for t in at.text_area if t.key == "pipeline_json_paste"][0].set_value(
one_step
).run()
[b for b in at.button if b.label == "Load pasted JSON"][0].click().run()
assert not at.exception
assert _tools(at) == ["dedup"]
# ---------------------------------------------------------------------------
# Config renderers emit the right options
# ---------------------------------------------------------------------------
def test_format_standardize_config_emits_column_types():
at = _app()
fid = _ids(at)["format_standardize"]
[s for s in at.selectbox if s.key == f"format_standardize_{fid}_fmt__phone"][0] \
.set_value("Phone number").run()
[b for b in at.button if b.label == "Run Pipeline"][0].click().run()
assert not at.exception
step = [s for s in at.session_state["pipeline_steps"]
if s["tool"] == "format_standardize"][0]
assert step["options"]["column_types"].get("phone") == "phone"
def test_missing_config_drop_radio_emits_drop_row_strategy():
at = _app()
mid = _ids(at)["missing"]
[r for r in at.radio if r.key == f"missing_{mid}_strategy"][0] \
.set_value("Drop rows that have any blank").run()
[b for b in at.button if b.label == "Run Pipeline"][0].click().run()
assert not at.exception
step = [s for s in at.session_state["pipeline_steps"]
if s["tool"] == "missing"][0]
assert step["options"]["strategy"] == "drop_row"
def test_dedup_config_multiselect_builds_strategies():
at = _app()
did = _ids(at)["dedup"]
[m for m in at.multiselect if m.key == f"dedup_{did}_matchcols"][0] \
.set_value(["email"]).run()
[b for b in at.button if b.label == "Run Pipeline"][0].click().run()
assert not at.exception
step = [s for s in at.session_state["pipeline_steps"]
if s["tool"] == "dedup"][0]
strategies = step["options"]["strategies"]
cols = [c["column"] for c in strategies[0]["columns"]]
assert cols == ["email"]
assert strategies[0]["columns"][0]["algorithm"] == "exact"

View File

@@ -0,0 +1,254 @@
"""Pure-function tests for pipeline_modules phrasing helpers.
These cover the adapter-key → tool bridge, the plain-English ``step_phrase``
wording, ``step_status`` pill levels, and the column-prose / pluralization
helpers (``_fmt_cols`` / ``_n``). No Streamlit / AppTest needed — every symbol
under test is a pure function over plain dicts/lists.
"""
from __future__ import annotations
import pytest
from src.core.pipeline import TOOL_NAMES
from src.gui.components.pipeline_modules import (
CONFIG_RENDERERS,
PIPELINE_TOOL_META,
_fmt_cols,
_n,
step_label,
step_phrase,
step_status,
)
# ---------------------------------------------------------------------------
# Bridge completeness
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("tool", TOOL_NAMES)
def test_pipeline_tool_meta_covers_every_tool(tool):
assert tool in PIPELINE_TOOL_META
assert PIPELINE_TOOL_META[tool] # non-empty tool_id
@pytest.mark.parametrize("tool", TOOL_NAMES)
def test_step_label_is_friendly_and_not_the_raw_key(tool):
label = step_label(tool)
assert isinstance(label, str)
assert label
assert label != tool
@pytest.mark.parametrize("tool", TOOL_NAMES)
def test_every_tool_has_a_config_renderer(tool):
assert tool in CONFIG_RENDERERS
assert callable(CONFIG_RENDERERS[tool])
def test_step_label_falls_back_to_raw_key_for_unknown_tool():
assert step_label("not_a_tool") == "not_a_tool"
# ---------------------------------------------------------------------------
# step_phrase — populated + no-op cases for all five tools
# ---------------------------------------------------------------------------
def test_step_phrase_text_clean_populated_and_noop():
assert step_phrase("text_clean", {
"cells_changed": 1204, "columns_processed": ["name", "city"],
}) == "1,204 cells cleaned in name & city"
assert step_phrase("text_clean", {"cells_changed": 0}) == "No changes needed."
assert step_phrase("text_clean", {}) == "No changes needed."
def test_step_phrase_format_standardize_populated_and_noop():
assert step_phrase("format_standardize", {
"cells_changed": 50, "columns_processed": ["phone"],
}) == "50 cells standardized in phone"
# unparseable cells append a "left unchanged" tail
assert step_phrase("format_standardize", {
"cells_changed": 50, "cells_unparseable": 3, "columns_processed": ["phone"],
}) == "50 cells standardized in phone (3 left unchanged)"
assert step_phrase("format_standardize", {}) == "Nothing to standardize."
assert step_phrase("format_standardize", {
"cells_changed": 0, "cells_unparseable": 0,
}) == "Nothing to standardize."
def test_step_phrase_missing_populated_and_noop():
assert step_phrase("missing", {
"cells_filled": 12, "rows_dropped": 4, "columns_dropped": ["x", "y"],
}) == "12 cells filled, 4 rows dropped, 2 columns dropped"
assert step_phrase("missing", {}) == "No missing values to handle."
# sentinel-only flagging path
assert step_phrase("missing", {
"sentinels_standardized": 7,
}) == "7 blank cells flagged"
def test_step_phrase_column_map_populated_and_noop():
assert step_phrase("column_map", {
"columns_renamed": 3, "columns_added": ["new"], "columns_dropped": ["old", "gone"],
}) == "3 columns renamed, 1 column added, 2 columns dropped"
assert step_phrase("column_map", {}) == "Columns already aligned."
def test_step_phrase_dedup_mockup_case():
assert step_phrase("dedup", {
"input_rows": 18442, "output_rows": 18130,
"duplicates_removed": 312, "groups": 147,
}) == "312 duplicates removed across 147 groups (18,442 → 18,130 rows)"
def test_step_phrase_dedup_noop():
assert step_phrase("dedup", {"duplicates_removed": 0}) == "No duplicates found."
assert step_phrase("dedup", {}) == "No duplicates found."
# ---------------------------------------------------------------------------
# Pluralization (_n) through step_phrase
# ---------------------------------------------------------------------------
def test_step_phrase_dedup_singular():
assert step_phrase("dedup", {
"input_rows": 10, "output_rows": 9,
"duplicates_removed": 1, "groups": 1,
}) == "1 duplicate removed across 1 group (10 → 9 rows)"
def test_step_phrase_missing_singular():
assert step_phrase("missing", {
"rows_dropped": 1, "columns_dropped": ["x"],
}) == "1 row dropped, 1 column dropped"
def test_n_singular_vs_plural_every_noun():
assert _n(1, "cell") == "1 cell"
assert _n(2, "cell") == "2 cells"
assert _n(1, "row") == "1 row"
assert _n(3, "row") == "3 rows"
assert _n(1, "column") == "1 column"
assert _n(5, "column") == "5 columns"
assert _n(1, "duplicate") == "1 duplicate"
assert _n(9, "duplicate") == "9 duplicates"
assert _n(1, "group") == "1 group"
assert _n(4, "group") == "4 groups"
def test_n_thousands_separator():
assert _n(1204, "cell") == "1,204 cells"
assert _n(18442, "row") == "18,442 rows"
# ---------------------------------------------------------------------------
# Column prose (_fmt_cols)
# ---------------------------------------------------------------------------
def test_fmt_cols_zero():
assert _fmt_cols([]) == ""
def test_fmt_cols_one():
assert _fmt_cols(["name"]) == "name"
def test_fmt_cols_two():
assert _fmt_cols(["name", "city"]) == "name & city"
def test_fmt_cols_three():
assert _fmt_cols(["a", "b", "c"]) == "a, b & c"
def test_fmt_cols_four_or_more():
assert _fmt_cols(["a", "b", "c", "d"]) == "a, b & 2 more"
assert _fmt_cols(["a", "b", "c", "d", "e"]) == "a, b & 3 more"
def test_fmt_cols_coerces_non_strings():
assert _fmt_cols([1, 2]) == "1 & 2"
# ---------------------------------------------------------------------------
# step_status — pill levels + details
# ---------------------------------------------------------------------------
def test_step_status_clean_is_ok():
assert step_status("text_clean", {"cells_changed": 5}) == ("✓ ok", "ok", "")
def test_step_status_skipped():
label, level, detail = step_status("text_clean", {"cells_changed": 5}, skipped=True)
assert level == "skipped"
assert detail == ""
assert "skipped" in label
def test_step_status_error_uses_first_line_only():
label, level, detail = step_status(
"dedup", {}, error="X: msg\nline2\nline3",
)
assert level == "error"
assert detail == "X: msg"
assert "error" in label
def test_step_status_error_takes_precedence_over_skipped():
label, level, detail = step_status(
"text_clean", {}, skipped=True, error="boom\nsecond",
)
assert level == "error"
assert detail == "boom"
def test_step_status_format_standardize_unparseable_warns():
label, level, detail = step_status(
"format_standardize", {"cells_changed": 100, "cells_unparseable": 141},
)
assert level == "warn"
assert "141 skipped" in label
assert detail # non-empty inline detail
def test_step_status_format_standardize_no_unparseable_is_ok():
assert step_status(
"format_standardize", {"cells_changed": 100},
) == ("✓ ok", "ok", "")
def test_step_status_column_map_coercion_failures_warn():
label, level, detail = step_status(
"column_map", {"coercion_failures": {"age": 4}},
)
assert level == "warn"
assert "4 not coerced" in label
assert detail
def test_step_status_column_map_missing_required_targets_warn():
label, level, detail = step_status(
"column_map", {"missing_required_targets": ["email"]},
)
assert level == "warn"
assert "missing targets" in label
assert "email" in detail
def test_step_status_column_map_missing_targets_take_precedence_over_coercion():
# both present → missing-targets branch wins
label, level, detail = step_status(
"column_map",
{"missing_required_targets": ["email"], "coercion_failures": {"age": 4}},
)
assert level == "warn"
assert "missing targets" in label
def test_step_status_unknown_tool_is_ok():
assert step_status("mystery", {"foo": 1}) == ("✓ ok", "ok", "")

293
tests/test_cli_pipeline.py Normal file
View File

@@ -0,0 +1,293 @@
"""Integration tests for the pipeline CLI (src/cli_pipeline.py).
The Typer ``app`` is invoked directly via ``CliRunner`` to bypass the
license ``guard(...)`` that ``main()`` runs before ``app()`` — matching the
house pattern in ``test_cli_text_clean.py``.
"""
from __future__ import annotations
import json
import pandas as pd
import pytest
from typer.testing import CliRunner
from src.cli_pipeline import app
from src.core.pipeline import Pipeline, _DEFAULT_ORDER
runner = CliRunner()
@pytest.fixture
def messy_csv(tmp_path):
"""A small messy CSV with duplicate / whitespace / mixed-case rows."""
df = pd.DataFrame({
"name": [" Alice ", "alice", "Bob", "Charlie"],
"email": ["A@X.COM", "a@x.com", "bob@x.com", "charlie@x.com"],
"phone": ["555-1234", "5551234", "555-9999", "555-0000"],
"signup_date": ["2020-01-01", "2020-01-01", "2020-02-02", "2020-03-03"],
})
path = tmp_path / "messy.csv"
df.to_csv(path, index=False)
return path
def _pipeline_artifacts(csv_path):
"""The output CSV + audit JSON the CLI writes next to *csv_path*."""
out_csv = csv_path.parent / f"{csv_path.stem}_pipeline.csv"
audit = csv_path.parent / f"{csv_path.stem}_pipeline.json"
return out_csv, audit
# ---------------------------------------------------------------------------
# --recommend
# ---------------------------------------------------------------------------
class TestRecommend:
def test_recommend_prints_valid_json(self):
result = runner.invoke(app, ["--recommend"])
assert result.exit_code == 0
data = json.loads(result.output)
assert "steps" in data
tools = [s["tool"] for s in data["steps"]]
assert tools == list(_DEFAULT_ORDER)
def test_recommend_default_tools_in_order(self):
result = runner.invoke(app, ["--recommend"])
data = json.loads(result.output)
tools = [s["tool"] for s in data["steps"]]
assert tools == ["text_clean", "format_standardize", "missing", "dedup"]
assert len(tools) == 4
def test_recommend_output_writes_loadable_file(self, tmp_path):
out = tmp_path / "pipeline.json"
result = runner.invoke(app, ["--recommend", "--output", str(out)])
assert result.exit_code == 0
assert out.exists()
# Confirmation message printed instead of raw JSON.
assert str(out) in result.output
pipe = Pipeline.from_file(out)
assert [s.tool for s in pipe.steps] == list(_DEFAULT_ORDER)
def test_recommend_output_message_not_json(self, tmp_path):
out = tmp_path / "pipeline.json"
result = runner.invoke(app, ["--recommend", "--output", str(out)])
assert "saved to" in result.output.lower()
# ---------------------------------------------------------------------------
# Argument / input validation
# ---------------------------------------------------------------------------
class TestArgValidation:
def test_no_args_exits_2(self):
result = runner.invoke(app, [])
assert result.exit_code == 2
assert "input file is required" in result.output.lower()
def test_nonexistent_input_exits_1(self, tmp_path):
missing = tmp_path / "does_not_exist_xyz.csv"
result = runner.invoke(app, [str(missing)])
assert result.exit_code == 1
assert "not found" in result.output.lower()
def test_pipeline_and_steps_together_exits_1(self, messy_csv, tmp_path):
pj = tmp_path / "p.json"
Pipeline.from_dict({"steps": [{"tool": "text_clean"}]}).to_file(pj)
result = runner.invoke(
app,
[str(messy_csv), "--pipeline", str(pj), "--steps", "text_clean"],
)
assert result.exit_code == 1
assert "not both" in result.output.lower()
def test_pipeline_nonexistent_exits_1(self, messy_csv, tmp_path):
missing = tmp_path / "no_such_pipeline.json"
result = runner.invoke(
app, [str(messy_csv), "--pipeline", str(missing)],
)
assert result.exit_code == 1
assert "not found" in result.output.lower()
def test_unknown_tool_in_steps_errors(self, messy_csv):
result = runner.invoke(app, [str(messy_csv), "--steps", "bogus_tool"])
assert result.exit_code != 0
# Helpful error naming the offending value.
assert "bogus_tool" in result.output
# ---------------------------------------------------------------------------
# Dry-run (default)
# ---------------------------------------------------------------------------
class TestDryRun:
def test_dry_run_exit_0_and_plan_printed(self, messy_csv):
result = runner.invoke(app, [str(messy_csv)])
assert result.exit_code == 0
assert "Pipeline plan:" in result.output
assert "plan-only run" in result.output
def test_dry_run_writes_no_artifacts(self, messy_csv):
result = runner.invoke(app, [str(messy_csv)])
assert result.exit_code == 0
out_csv, audit = _pipeline_artifacts(messy_csv)
assert not out_csv.exists()
assert not audit.exists()
# ---------------------------------------------------------------------------
# --apply
# ---------------------------------------------------------------------------
class TestApply:
def test_apply_default_pipeline_writes_outputs(self, messy_csv):
result = runner.invoke(app, [str(messy_csv), "--apply"])
assert result.exit_code == 0
out_csv, audit = _pipeline_artifacts(messy_csv)
assert out_csv.exists()
assert audit.exists()
# Output CSV is readable.
df = pd.read_csv(out_csv)
assert len(df.columns) >= 1
def test_apply_audit_has_documented_keys(self, messy_csv):
result = runner.invoke(app, [str(messy_csv), "--apply"])
assert result.exit_code == 0
_, audit = _pipeline_artifacts(messy_csv)
data = json.loads(audit.read_text())
for key in (
"pipeline", "warnings", "initial_rows", "final_rows",
"total_elapsed_seconds", "steps",
):
assert key in data, f"missing audit key: {key}"
# One step entry per pipeline step (default = 4).
assert len(data["steps"]) == len(_DEFAULT_ORDER)
for step in data["steps"]:
for k in (
"tool", "name", "enabled", "skipped",
"elapsed_seconds", "summary", "error",
):
assert k in step, f"missing step key: {k}"
def test_apply_dedup_reduces_rows(self, messy_csv):
result = runner.invoke(app, [str(messy_csv), "--apply"])
assert result.exit_code == 0
_, audit = _pipeline_artifacts(messy_csv)
data = json.loads(audit.read_text())
# 4 input rows; the first two are duplicates once cleaned/standardized.
assert data["initial_rows"] == 4
assert data["final_rows"] < data["initial_rows"]
def test_apply_custom_output_path(self, messy_csv, tmp_path):
out = tmp_path / "custom.csv"
result = runner.invoke(
app, [str(messy_csv), "--apply", "--output", str(out)],
)
assert result.exit_code == 0
assert out.exists()
# Default-named CSV should NOT be written when --output is given.
default_csv, _ = _pipeline_artifacts(messy_csv)
assert not default_csv.exists()
# Audit JSON is still written next to the input.
_, audit = _pipeline_artifacts(messy_csv)
assert audit.exists()
def test_apply_custom_steps_subset(self, messy_csv):
result = runner.invoke(
app, [str(messy_csv), "--apply", "--steps", "text_clean,missing"],
)
assert result.exit_code == 0
_, audit = _pipeline_artifacts(messy_csv)
data = json.loads(audit.read_text())
tools = [s["tool"] for s in data["steps"]]
assert tools == ["text_clean", "missing"]
# ---------------------------------------------------------------------------
# Strict mode
# ---------------------------------------------------------------------------
class TestStrict:
def test_strict_out_of_order_exits_2(self, messy_csv):
result = runner.invoke(
app,
[str(messy_csv), "--steps", "dedup,text_clean", "--strict", "--apply"],
)
assert result.exit_code == 2
assert "abort" in result.output.lower()
def test_strict_out_of_order_writes_nothing(self, messy_csv):
result = runner.invoke(
app,
[str(messy_csv), "--steps", "dedup,text_clean", "--strict", "--apply"],
)
assert result.exit_code == 2
out_csv, audit = _pipeline_artifacts(messy_csv)
assert not out_csv.exists()
assert not audit.exists()
# ---------------------------------------------------------------------------
# Round-trip: --recommend --output then --pipeline --apply
# ---------------------------------------------------------------------------
class TestRoundTrip:
def test_save_then_run_saved_pipeline(self, messy_csv, tmp_path):
pj = tmp_path / "p.json"
r1 = runner.invoke(app, ["--recommend", "--output", str(pj)])
assert r1.exit_code == 0
assert pj.exists()
r2 = runner.invoke(
app, [str(messy_csv), "--pipeline", str(pj), "--apply"],
)
assert r2.exit_code == 0
out_csv, audit = _pipeline_artifacts(messy_csv)
assert out_csv.exists()
assert audit.exists()
# ---------------------------------------------------------------------------
# Step error handling (--continue-on-error)
# ---------------------------------------------------------------------------
class TestStepError:
"""A dedup step with an invalid survivor_rule raises a ConfigError at
run time, letting us exercise the stop/continue-on-error contract."""
def _bad_pipeline(self, tmp_path):
pj = tmp_path / "bad.json"
Pipeline.from_dict({
"steps": [{
"tool": "dedup",
"options": {"survivor_rule": "not_a_real_rule"},
}]
}).to_file(pj)
return pj
def test_step_error_halts_without_continue(self, messy_csv, tmp_path):
pj = self._bad_pipeline(tmp_path)
result = runner.invoke(
app, [str(messy_csv), "--pipeline", str(pj), "--apply"],
)
assert result.exit_code != 0
out_csv, audit = _pipeline_artifacts(messy_csv)
# Halted before writing output.
assert not out_csv.exists()
assert not audit.exists()
def test_continue_on_error_completes_and_records_error(self, messy_csv, tmp_path):
pj = self._bad_pipeline(tmp_path)
result = runner.invoke(
app,
[str(messy_csv), "--pipeline", str(pj), "--apply",
"--continue-on-error"],
)
assert result.exit_code == 0
out_csv, audit = _pipeline_artifacts(messy_csv)
assert out_csv.exists()
assert audit.exists()
data = json.loads(audit.read_text())
assert len(data["steps"]) == 1
assert data["steps"][0]["error"], "expected the failed step's error recorded"

View File

@@ -322,3 +322,499 @@ class TestSoftDependencies:
assert len(order) == len(TOOL_NAMES), (
f"SOFT_DEPENDENCIES contain a cycle; topo order={order}"
)
# ---------------------------------------------------------------------------
# Per-adapter summary correctness — exact numbers on KNOWN-messy input.
# Each adapter is also exercised through run_pipeline so the StepResult
# carries the adapter's summary verbatim.
# ---------------------------------------------------------------------------
def _run_one(df, tool, options):
"""Run a single-step pipeline and return (StepResult, PipelineResult)."""
res = run_pipeline(df, Pipeline(steps=[Step(tool, options)]))
return res.step_results[0], res
class TestTextCleanSummary:
def test_two_trimmable_cells_counted(self):
df = pd.DataFrame({
"a": [" x ", "y", " z"], # 2 cells need trimming (" x ", " z")
"b": ["ok", "fine", "good"], # already clean
})
out, summary = TOOL_ADAPTERS["text_clean"](df, {"trim": True})
assert summary["cells_total"] == 6
assert summary["cells_changed"] == 2
assert sorted(summary["columns_processed"]) == ["a", "b"]
assert out["a"].tolist() == ["x", "y", "z"]
def test_title_case_changes_all_cells(self):
df = pd.DataFrame({"name": ["alice smith", "BOB JONES"]})
out, summary = TOOL_ADAPTERS["text_clean"](df, {"case": "title"})
assert summary["cells_changed"] == 2
assert out["name"].tolist() == ["Alice Smith", "Bob Jones"]
def test_collapse_whitespace_counts_internal_runs(self):
df = pd.DataFrame({"name": ["a b", "c d", "e f"]})
out, summary = TOOL_ADAPTERS["text_clean"](
df, {"trim": True, "collapse_whitespace": True},
)
# "a b" and "e f" collapse; "c d" is already single-spaced.
assert summary["cells_changed"] == 2
assert out["name"].tolist() == ["a b", "c d", "e f"]
def test_summary_visible_through_run_pipeline(self):
df = pd.DataFrame({"a": [" x ", "y"]})
sr, _res = _run_one(df, "text_clean", {"trim": True})
assert sr.skipped is False
assert sr.error is None
assert sr.summary["cells_changed"] == 1
assert sr.summary["cells_total"] == 2
class TestFormatStandardizeSummary:
def test_one_unparseable_phone(self):
df = pd.DataFrame({
"phone": ["(415) 555-1234", "not a phone", "+44 20 7946 0958"],
})
out, summary = TOOL_ADAPTERS["format_standardize"](
df, {"column_types": {"phone": "phone"}},
)
assert summary["cells_total"] == 3
assert summary["cells_unparseable"] == 1
assert summary["cells_changed"] == 2
assert summary["columns_processed"] == ["phone"]
assert out["phone"].tolist() == [
"+14155551234", "not a phone", "+442079460958",
]
def test_date_standardization_counts(self):
df = pd.DataFrame({"signup_date": ["2024-01-05", "Jan 5 2024", "garbage"]})
out, summary = TOOL_ADAPTERS["format_standardize"](
df, {"column_types": {"signup_date": "date"}},
)
# "2024-01-05" already canonical; "Jan 5 2024" rewritten; "garbage" fails.
assert summary["cells_unparseable"] == 1
assert summary["cells_changed"] == 1
assert out["signup_date"].tolist()[:2] == ["2024-01-05", "2024-01-05"]
def test_summary_visible_through_run_pipeline(self):
df = pd.DataFrame({"phone": ["(415) 555-1234", "bad"]})
sr, _ = _run_one(df, "format_standardize", {"column_types": {"phone": "phone"}})
assert sr.summary["cells_unparseable"] == 1
assert sr.summary["columns_processed"] == ["phone"]
class TestMissingSummary:
def test_median_fills_each_blank(self):
df = pd.DataFrame({"val": [1.0, np.nan, 3.0, np.nan, 5.0]})
out, summary = TOOL_ADAPTERS["missing"](df, {"strategy": "median"})
assert summary["cells_filled"] == 2 # exactly the 2 NaNs
assert summary["rows_dropped"] == 0
assert out["val"].tolist() == [1.0, 3.0, 3.0, 3.0, 5.0] # median is 3.0
def test_drop_row_by_threshold(self):
# row_drop_threshold is the *fraction* of nulls needed to drop a row.
df = pd.DataFrame({
"a": [1.0, np.nan, 3.0],
"b": ["x", np.nan, "z"], # middle row is 100% null
})
out, summary = TOOL_ADAPTERS["missing"](
df, {"strategy": "drop_row", "row_drop_threshold": 0.4},
)
assert summary["rows_dropped"] == 1
assert len(out) == 2
def test_sentinel_standardization_count(self):
df = pd.DataFrame({"x": ["ok", "N/A", "fine", "N/A"]})
out, summary = TOOL_ADAPTERS["missing"](df, {
"strategy": "none",
"sentinels": ["N/A"],
"standardize_sentinels": True,
})
assert summary["sentinels_standardized"] == 2
# The two "N/A" cells became real NaN.
assert out["x"].isna().sum() == 2
assert out["x"].tolist()[0] == "ok"
def test_summary_visible_through_run_pipeline(self):
df = pd.DataFrame({"val": [1.0, np.nan, 3.0]})
sr, _ = _run_one(df, "missing", {"strategy": "median"})
assert sr.summary["cells_filled"] == 1
class TestColumnMapSummary:
def test_single_rename(self):
df = pd.DataFrame({"old": [1, 2], "keep": [3, 4]})
out, summary = TOOL_ADAPTERS["column_map"](
df, {"mapping": {"old": "new"}, "unmapped": "keep"},
)
assert summary["columns_renamed"] == 1
assert summary["columns_dropped"] == []
assert list(out.columns) == ["new", "keep"]
def test_unmapped_drop_reports_dropped_columns(self):
df = pd.DataFrame({"old": [1, 2], "keep": [3, 4]})
out, summary = TOOL_ADAPTERS["column_map"](
df, {"mapping": {"old": "new"}, "unmapped": "drop"},
)
assert summary["columns_renamed"] == 1
assert summary["columns_dropped"] == ["keep"]
assert list(out.columns) == ["new"]
def test_summary_visible_through_run_pipeline(self):
df = pd.DataFrame({"old": [1], "keep": [2]})
sr, _ = _run_one(df, "column_map", {"mapping": {"old": "new"}})
assert sr.summary["columns_renamed"] == 1
class TestDedupSummary:
def test_exact_duplicate_rows(self):
df = pd.DataFrame({
"email": ["a@x.com", "b@x.com", "a@x.com", "a@x.com"],
"name": ["A", "B", "A", "A"],
})
out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "first"})
assert summary["input_rows"] == 4
assert summary["output_rows"] == 2
assert summary["duplicates_removed"] == 2
assert summary["groups"] == 1
assert out["email"].tolist() == ["a@x.com", "b@x.com"]
def test_explicit_exact_strategy_on_column(self):
df = pd.DataFrame({
"email": ["a@x.com", "b@x.com", "a@x.com"],
"name": ["A", "B", "C"],
})
out, summary = TOOL_ADAPTERS["dedup"](df, {
"survivor_rule": "first",
"strategies": [{"columns": [
{"column": "email", "algorithm": "exact", "threshold": 100},
]}],
})
assert summary["duplicates_removed"] == 1
assert summary["groups"] == 1
def test_most_complete_keeps_fuller_survivor(self):
df = pd.DataFrame({
"email": ["a@x.com", "a@x.com"],
"name": ["", "Alice"], # second row is more complete
"phone": ["111", "111"],
})
out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "most_complete"})
assert summary["duplicates_removed"] == 1
assert out.iloc[0]["name"] == "Alice"
def test_no_duplicates_is_noop(self):
df = pd.DataFrame({"email": ["a@x.com", "b@x.com"], "name": ["A", "B"]})
out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "first"})
assert summary["duplicates_removed"] == 0
assert summary["output_rows"] == 2
def test_summary_visible_through_run_pipeline(self):
df = pd.DataFrame({"email": ["a@x.com", "a@x.com"], "name": ["A", "A"]})
sr, res = _run_one(df, "dedup", {"survivor_rule": "first"})
assert sr.summary["duplicates_removed"] == 1
assert res.final_rows == 1
# ---------------------------------------------------------------------------
# Data flow — a later step depends on an earlier step's output
# ---------------------------------------------------------------------------
class TestDataFlow:
def test_text_clean_enables_dedup_match(self):
# The two phones differ only by surrounding whitespace; without
# the trim they are distinct, so dedup alone would keep both.
df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]})
p = Pipeline(steps=[
Step("text_clean", {"trim": True}),
Step("dedup", {"survivor_rule": "first"}),
])
res = run_pipeline(df, p)
assert res.initial_rows == 2
assert res.final_rows == 1
assert res.final_df["phone"].tolist() == ["+14155551234"]
def test_dedup_default_matching_normalizes_whitespace(self):
# Note: dedup's exact matcher already normalizes surrounding
# whitespace, so the two phones collapse even WITHOUT a prior
# text_clean. The survivor still carries the un-trimmed value.
df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]})
res = run_pipeline(df, Pipeline(steps=[Step("dedup", {"survivor_rule": "first"})]))
assert res.final_rows == 1
# Survivor keeps the raw (still-padded) text — dedup does not clean.
assert res.final_df["phone"].tolist() == [" +14155551234 "]
def test_chained_initial_and_final_rows(self):
df = pd.DataFrame({
"name": [" Al ", "al", "Bob"],
"v": [1, 1, 2],
})
p = Pipeline(steps=[
Step("text_clean", {"trim": True, "case": "title"}),
Step("dedup", {"survivor_rule": "first"}),
])
res = run_pipeline(df, p)
# " Al " and "al" both become "Al" → duplicate rows collapse.
assert res.initial_rows == 3
assert res.final_rows == 2
assert "Al" in res.final_df["name"].tolist()
# ---------------------------------------------------------------------------
# Error handling — stop_on_error semantics
# ---------------------------------------------------------------------------
class TestErrorHandling:
def test_most_recent_without_date_raises_by_default(self, messy_df):
# dedup with survivor_rule="most_recent" but no date_column errors.
p = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})])
with pytest.raises(InputValidationError):
run_pipeline(messy_df, p)
def test_continue_on_error_sets_error_string(self):
df = pd.DataFrame({"email": ["a@x.com", "a@x.com"], "name": ["A", "B"]})
p = Pipeline(steps=[
Step("text_clean", {"trim": True}),
Step("dedup", {"survivor_rule": "most_recent"}), # will fail
Step("missing", {"strategy": "none"}),
])
res = run_pipeline(df, p, stop_on_error=False)
bad = res.step_results[1]
assert bad.error is not None
assert isinstance(bad.error, str) and bad.error.strip()
# The failed step did NOT change the row count — previous df carried.
assert res.step_results[2].error is None
assert res.final_rows == 2
def test_failed_step_summary_is_empty(self):
df = pd.DataFrame({"e": ["a", "a"], "n": ["x", "y"]})
p = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})])
res = run_pipeline(df, p, stop_on_error=False)
assert res.step_results[0].summary == {}
assert res.step_results[0].skipped is False
def test_config_error_on_bad_survivor_rule_propagates(self, messy_df):
p = Pipeline(steps=[Step("dedup", {"survivor_rule": "nonsense"})])
with pytest.raises(ConfigError):
run_pipeline(messy_df, p)
# ---------------------------------------------------------------------------
# Edge inputs
# ---------------------------------------------------------------------------
class TestEdgeInputs:
def test_empty_dataframe_runs_clean(self):
empty = pd.DataFrame({"name": [], "phone": []})
res = run_pipeline(
empty,
recommended_pipeline(options={"missing": {"strategy": "none"}}),
)
assert res.initial_rows == 0
assert res.final_rows == 0
assert all(sr.error is None for sr in res.step_results if not sr.skipped)
def test_single_column_dataframe(self):
df = pd.DataFrame({"name": [" Al ", "al"]})
res = run_pipeline(
df, Pipeline(steps=[Step("text_clean", {"trim": True, "case": "title"})]),
)
assert res.final_df["name"].tolist() == ["Al", "Al"]
def test_all_steps_disabled_returns_unchanged(self, messy_df):
snapshot = messy_df.copy(deep=True)
p = Pipeline(steps=[
Step("text_clean", enabled=False),
Step("format_standardize", enabled=False),
Step("missing", enabled=False),
Step("dedup", enabled=False),
])
res = run_pipeline(messy_df, p)
assert all(sr.skipped is True for sr in res.step_results)
assert res.final_rows == res.initial_rows == 5
pd.testing.assert_frame_equal(res.final_df, snapshot)
def test_empty_pipeline_is_identity(self, messy_df):
res = run_pipeline(messy_df, Pipeline(steps=[]))
assert res.step_results == []
assert res.final_rows == 5
pd.testing.assert_frame_equal(res.final_df, messy_df)
# ---------------------------------------------------------------------------
# Serialization round-trips with disabled / named / nested-option steps
# ---------------------------------------------------------------------------
class TestSerializationRoundtrips:
def test_disabled_and_named_step_survive_dict(self):
p = Pipeline(steps=[
Step("text_clean", {"trim": True}, enabled=False, name="Pre-clean"),
Step("dedup", {"survivor_rule": "first"}, name="Final dedup"),
])
loaded = Pipeline.from_dict(p.to_dict())
assert loaded.steps[0].enabled is False
assert loaded.steps[0].name == "Pre-clean"
assert loaded.steps[0].options == {"trim": True}
assert loaded.steps[1].name == "Final dedup"
assert loaded.steps[1].display_name() == "Final dedup"
def test_nested_options_survive_dict(self):
nested = {
"column_types": {"phone": "phone", "signup_date": "date"},
}
strat = {
"survivor_rule": "most_complete",
"strategies": [{"columns": [
{"column": "email", "algorithm": "exact", "threshold": 100},
]}],
}
p = Pipeline(steps=[
Step("format_standardize", nested),
Step("dedup", strat),
])
loaded = Pipeline.from_dict(p.to_dict())
assert loaded.steps[0].options["column_types"] == nested["column_types"]
assert loaded.steps[1].options["strategies"] == strat["strategies"]
def test_nested_options_survive_file(self, tmp_path):
p = Pipeline(steps=[
Step("format_standardize",
{"column_types": {"phone": "phone"}},
enabled=False, name="formats"),
])
path = tmp_path / "pipe.json"
p.to_file(path)
loaded = Pipeline.from_file(path)
assert loaded.steps[0].enabled is False
assert loaded.steps[0].name == "formats"
assert loaded.steps[0].options == {"column_types": {"phone": "phone"}}
def test_roundtrip_is_idempotent(self):
p = Pipeline(steps=[
Step("text_clean", {"trim": True}, enabled=False, name="x"),
Step("missing", {"strategy": "median"}),
])
once = Pipeline.from_dict(p.to_dict())
twice = Pipeline.from_dict(once.to_dict())
assert once.to_dict() == twice.to_dict() == p.to_dict()
# ---------------------------------------------------------------------------
# recommended_pipeline(include=...) — subsetting, ordering, option seeding
# ---------------------------------------------------------------------------
class TestRecommendedInclude:
def test_subset_preserves_given_order(self):
p = recommended_pipeline(include=["dedup", "text_clean"])
assert [s.tool for s in p.steps] == ["dedup", "text_clean"]
def test_column_map_first(self):
p = recommended_pipeline(include=[
"column_map", "text_clean", "format_standardize", "missing", "dedup",
])
assert p.steps[0].tool == "column_map"
assert len(p.steps) == 5
def test_column_map_last(self):
p = recommended_pipeline(include=[
"text_clean", "format_standardize", "missing", "dedup", "column_map",
])
assert p.steps[-1].tool == "column_map"
def test_unknown_tool_in_include_raises(self):
with pytest.raises(InputValidationError):
recommended_pipeline(include=["text_clean", "not_a_tool"])
def test_options_seeding_only_targets_named_tool(self):
p = recommended_pipeline(
include=["text_clean", "dedup"],
options={"dedup": {"survivor_rule": "last"}},
)
assert p.steps[0].options == {} # text_clean unseeded
assert p.steps[1].options == {"survivor_rule": "last"}
def test_empty_include_yields_no_steps(self):
p = recommended_pipeline(include=[])
assert p.steps == []
def test_seeded_options_are_independent_copies(self):
seed = {"text_clean": {"trim": True}}
p = recommended_pipeline(include=["text_clean"], options=seed)
# Mutating the produced step must not leak back into the seed.
p.steps[0].options["trim"] = False
assert seed["text_clean"]["trim"] is True
# ---------------------------------------------------------------------------
# Realistic demo integration — messy customers table end-to-end
# ---------------------------------------------------------------------------
class TestDemoIntegration:
@pytest.fixture
def customers_df(self):
return pd.DataFrame({
"Full Name": [" alice smith ", "BOB JONES", "alice smith", ""],
"Email": ["alice@x.com ", "bob@x.com", "alice@x.com", "carol@x.com"],
"Phone": [" +14155551234 ", "+442079460958",
"+14155551234", "+13035551111"],
})
def test_full_recommended_plus_column_map(self, customers_df):
p = recommended_pipeline(
include=["text_clean", "format_standardize", "missing",
"dedup", "column_map"],
options={
"text_clean": {"trim": True, "collapse_whitespace": True},
"missing": {"strategy": "none"},
"dedup": {
"survivor_rule": "most_complete",
"strategies": [{"columns": [
{"column": "Phone", "algorithm": "exact", "threshold": 100},
]}],
},
"column_map": {
"mapping": {"Full Name": "name", "Email": "email",
"Phone": "phone"},
"unmapped": "keep",
},
},
)
res = run_pipeline(customers_df, p)
# Two rows share the same phone after trimming → one duplicate removed.
assert res.initial_rows == 4
assert res.final_rows == 3
assert res.final_rows < res.initial_rows
# Headers were renamed by the trailing column_map step.
assert list(res.final_df.columns) == ["name", "email", "phone"]
# The surviving Alice row kept its (trimmed) phone.
phones = res.final_df["phone"].tolist()
assert "+14155551234" in phones
assert phones.count("+14155551234") == 1 # only one Alice survives
# Every executed step succeeded.
assert all(sr.error is None for sr in res.step_results if not sr.skipped)
# column_map reported the three renames.
cm = res.step_results[-1]
assert cm.step.tool == "column_map"
assert cm.summary["columns_renamed"] == 3
def test_demo_dedup_step_reports_one_duplicate(self, customers_df):
p = recommended_pipeline(options={
"text_clean": {"trim": True},
"missing": {"strategy": "none"},
"dedup": {
"survivor_rule": "most_complete",
"strategies": [{"columns": [
{"column": "Phone", "algorithm": "exact", "threshold": 100},
]}],
},
})
res = run_pipeline(customers_df, p)
dedup_sr = next(s for s in res.step_results if s.step.tool == "dedup")
assert dedup_sr.summary["duplicates_removed"] == 1
assert dedup_sr.summary["groups"] == 1