Files
datatools-dev/tests/test_cli_pipeline.py
Michael 38616d69e2 test(pipeline): complete automated test suite for the pipeline feature
Adds ~115 tests pinning the Automated Workflows feature end to end:

- tests/test_pipeline.py (+43): per-adapter summary correctness on known
  inputs, multi-step data flow, error stop/continue contract, empty /
  single-column / all-disabled edges, dict+file serialization round-trips,
  recommended_pipeline(include=…), and a synthesized demo integration run.
- tests/test_cli_pipeline.py (new, 21): --recommend, dry-run-by-default,
  --apply output CSV + audit JSON, --steps, --strict abort, arg validation,
  --continue-on-error vs halt, and a save→load round-trip. Invokes the Typer
  app directly to bypass the license guard (house pattern).
- tests/gui/test_pipeline_builder.py (+9): reorder ▲/▼, disabled edge
  buttons, disabled-step persistence across reorder, restore-recommended,
  Advanced JSON export/import, and per-tool Configure panels emitting the
  correct option dicts (AppTest).
- tests/gui/test_pipeline_phrasing.py (new, 30): step_phrase/step_status and
  the adapter-key→friendly-name bridge as pure functions, incl. pluralization,
  column prose, and warn/error status derivation.

Full suite: 2565 passed, 91 skipped. No product bugs surfaced. Documents the
coverage in docs/DEVELOPER.md (test tree + a pipeline-coverage note).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 18:31:15 +00:00

294 lines
11 KiB
Python

"""Integration tests for the pipeline CLI (src/cli_pipeline.py).
The Typer ``app`` is invoked directly via ``CliRunner`` to bypass the
license ``guard(...)`` that ``main()`` runs before ``app()`` — matching the
house pattern in ``test_cli_text_clean.py``.
"""
from __future__ import annotations
import json
import pandas as pd
import pytest
from typer.testing import CliRunner
from src.cli_pipeline import app
from src.core.pipeline import Pipeline, _DEFAULT_ORDER
# Shared CLI test runner: every test in this module calls
# ``runner.invoke(app, [...])`` and inspects the returned result.
runner = CliRunner()
@pytest.fixture
def messy_csv(tmp_path):
    """Write a small messy CSV into *tmp_path* and return its path.

    The four rows mix stray whitespace, letter case and phone punctuation:
    the first two rows carry the same name / email / phone / date apart from
    those cosmetic differences, while the last two rows are distinct.
    """
    df = pd.DataFrame({
        "name": [" Alice ", "alice", "Bob", "Charlie"],
        "email": ["A@X.COM", "a@x.com", "bob@x.com", "charlie@x.com"],
        "phone": ["555-1234", "5551234", "555-9999", "555-0000"],
        "signup_date": ["2020-01-01", "2020-01-01", "2020-02-02", "2020-03-03"],
    })
    path = tmp_path / "messy.csv"
    # index=False keeps the file to exactly the four named columns.
    df.to_csv(path, index=False)
    return path
def _pipeline_artifacts(csv_path):
"""The output CSV + audit JSON the CLI writes next to *csv_path*."""
out_csv = csv_path.parent / f"{csv_path.stem}_pipeline.csv"
audit = csv_path.parent / f"{csv_path.stem}_pipeline.json"
return out_csv, audit
# ---------------------------------------------------------------------------
# --recommend
# ---------------------------------------------------------------------------
class TestRecommend:
    """``--recommend`` prints the recommended pipeline, or saves it with ``--output``."""

    def test_recommend_prints_valid_json(self):
        """Without ``--output`` the pipeline is printed as parseable JSON."""
        result = runner.invoke(app, ["--recommend"])
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert "steps" in data
        tools = [s["tool"] for s in data["steps"]]
        assert tools == list(_DEFAULT_ORDER)

    def test_recommend_default_tools_in_order(self):
        """The default tool order is pinned literally, independent of ``_DEFAULT_ORDER``."""
        result = runner.invoke(app, ["--recommend"])
        # Check the exit code before parsing so a CLI failure is reported with
        # its output instead of as an opaque JSONDecodeError.
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        tools = [s["tool"] for s in data["steps"]]
        # List equality pins both the order and the count (four tools).
        assert tools == ["text_clean", "format_standardize", "missing", "dedup"]

    def test_recommend_output_writes_loadable_file(self, tmp_path):
        """``--output`` writes a file that ``Pipeline.from_file`` can load back."""
        out = tmp_path / "pipeline.json"
        result = runner.invoke(app, ["--recommend", "--output", str(out)])
        assert result.exit_code == 0, result.output
        assert out.exists()
        # Confirmation message printed instead of raw JSON.
        assert str(out) in result.output
        pipe = Pipeline.from_file(out)
        assert [s.tool for s in pipe.steps] == list(_DEFAULT_ORDER)

    def test_recommend_output_message_not_json(self, tmp_path):
        """With ``--output`` the CLI prints a confirmation, not the pipeline JSON."""
        out = tmp_path / "pipeline.json"
        result = runner.invoke(app, ["--recommend", "--output", str(out)])
        assert result.exit_code == 0, result.output
        assert "saved to" in result.output.lower()
        # The pipeline document itself (keyed by "steps") must not be echoed.
        assert '"steps"' not in result.output
# ---------------------------------------------------------------------------
# Argument / input validation
# ---------------------------------------------------------------------------
class TestArgValidation:
    """Missing, nonexistent or conflicting arguments exit non-zero with a message."""

    def test_no_args_exits_2(self):
        """No arguments at all: exit code 2 and an 'input file is required' message."""
        result = runner.invoke(app, [])
        assert result.exit_code == 2, result.output
        assert "input file is required" in result.output.lower()

    def test_nonexistent_input_exits_1(self, tmp_path):
        """An input path that does not exist: exit code 1 and a 'not found' message."""
        missing = tmp_path / "does_not_exist_xyz.csv"
        result = runner.invoke(app, [str(missing)])
        assert result.exit_code == 1, result.output
        assert "not found" in result.output.lower()

    def test_pipeline_and_steps_together_exits_1(self, messy_csv, tmp_path):
        """``--pipeline`` and ``--steps`` are mutually exclusive."""
        pj = tmp_path / "p.json"
        # A valid one-step pipeline file, so only the flag conflict can fail.
        Pipeline.from_dict({"steps": [{"tool": "text_clean"}]}).to_file(pj)
        result = runner.invoke(
            app,
            [str(messy_csv), "--pipeline", str(pj), "--steps", "text_clean"],
        )
        assert result.exit_code == 1, result.output
        assert "not both" in result.output.lower()

    def test_pipeline_nonexistent_exits_1(self, messy_csv, tmp_path):
        """A ``--pipeline`` file that does not exist: exit code 1 and 'not found'."""
        missing = tmp_path / "no_such_pipeline.json"
        result = runner.invoke(
            app, [str(messy_csv), "--pipeline", str(missing)],
        )
        assert result.exit_code == 1, result.output
        assert "not found" in result.output.lower()

    def test_unknown_tool_in_steps_errors(self, messy_csv):
        """An unknown tool name in ``--steps`` fails and is named in the output."""
        result = runner.invoke(app, [str(messy_csv), "--steps", "bogus_tool"])
        assert result.exit_code != 0, result.output
        # Helpful error naming the offending value.
        assert "bogus_tool" in result.output
# ---------------------------------------------------------------------------
# Dry-run (default)
# ---------------------------------------------------------------------------
class TestDryRun:
    """Without ``--apply`` the CLI prints the plan and writes nothing."""

    def test_dry_run_exit_0_and_plan_printed(self, messy_csv):
        """The default run exits 0 and prints the plan plus a plan-only notice."""
        result = runner.invoke(app, [str(messy_csv)])
        assert result.exit_code == 0, result.output
        assert "Pipeline plan:" in result.output
        assert "plan-only run" in result.output

    def test_dry_run_writes_no_artifacts(self, messy_csv):
        """Neither the output CSV nor the audit JSON exists after a dry run."""
        result = runner.invoke(app, [str(messy_csv)])
        assert result.exit_code == 0, result.output
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert not out_csv.exists()
        assert not audit.exists()
# ---------------------------------------------------------------------------
# --apply
# ---------------------------------------------------------------------------
class TestApply:
    """``--apply`` runs the pipeline and writes the output CSV and audit JSON."""

    def test_apply_default_pipeline_writes_outputs(self, messy_csv):
        """The default pipeline writes both artifacts next to the input."""
        result = runner.invoke(app, [str(messy_csv), "--apply"])
        assert result.exit_code == 0, result.output
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert out_csv.exists()
        assert audit.exists()
        # Output CSV is readable.
        df = pd.read_csv(out_csv)
        assert len(df.columns) >= 1

    def test_apply_audit_has_documented_keys(self, messy_csv):
        """The audit JSON carries the expected top-level and per-step keys."""
        result = runner.invoke(app, [str(messy_csv), "--apply"])
        assert result.exit_code == 0, result.output
        _, audit = _pipeline_artifacts(messy_csv)
        data = json.loads(audit.read_text())
        for key in (
            "pipeline", "warnings", "initial_rows", "final_rows",
            "total_elapsed_seconds", "steps",
        ):
            assert key in data, f"missing audit key: {key}"
        # One step entry per pipeline step (default = 4).
        assert len(data["steps"]) == len(_DEFAULT_ORDER)
        for step in data["steps"]:
            for k in (
                "tool", "name", "enabled", "skipped",
                "elapsed_seconds", "summary", "error",
            ):
                assert k in step, f"missing step key: {k}"

    def test_apply_dedup_reduces_rows(self, messy_csv):
        """The default pipeline ends with fewer rows than it started with."""
        result = runner.invoke(app, [str(messy_csv), "--apply"])
        assert result.exit_code == 0, result.output
        _, audit = _pipeline_artifacts(messy_csv)
        data = json.loads(audit.read_text())
        # 4 input rows; the first two are duplicates once cleaned/standardized.
        assert data["initial_rows"] == 4
        assert data["final_rows"] < data["initial_rows"]

    def test_apply_custom_output_path(self, messy_csv, tmp_path):
        """``--output`` redirects the CSV; the audit JSON stays next to the input."""
        out = tmp_path / "custom.csv"
        result = runner.invoke(
            app, [str(messy_csv), "--apply", "--output", str(out)],
        )
        assert result.exit_code == 0, result.output
        assert out.exists()
        default_csv, audit = _pipeline_artifacts(messy_csv)
        # Default-named CSV should NOT be written when --output is given.
        assert not default_csv.exists()
        # Audit JSON is still written next to the input.
        assert audit.exists()

    def test_apply_custom_steps_subset(self, messy_csv):
        """``--steps`` restricts the run to the listed tools, in the listed order."""
        result = runner.invoke(
            app, [str(messy_csv), "--apply", "--steps", "text_clean,missing"],
        )
        assert result.exit_code == 0, result.output
        _, audit = _pipeline_artifacts(messy_csv)
        data = json.loads(audit.read_text())
        tools = [s["tool"] for s in data["steps"]]
        assert tools == ["text_clean", "missing"]
# ---------------------------------------------------------------------------
# Strict mode
# ---------------------------------------------------------------------------
class TestStrict:
    """``--strict`` with ``dedup`` listed before ``text_clean`` exits 2 and writes nothing."""

    def _invoke_strict_out_of_order(self, csv_path):
        """Run ``--steps dedup,text_clean --strict --apply`` on *csv_path*."""
        return runner.invoke(
            app,
            [str(csv_path), "--steps", "dedup,text_clean", "--strict", "--apply"],
        )

    def test_strict_out_of_order_exits_2(self, messy_csv):
        """The run exits with code 2 and the output mentions the abort."""
        result = self._invoke_strict_out_of_order(messy_csv)
        assert result.exit_code == 2, result.output
        assert "abort" in result.output.lower()

    def test_strict_out_of_order_writes_nothing(self, messy_csv):
        """No output CSV or audit JSON is left behind after the abort."""
        result = self._invoke_strict_out_of_order(messy_csv)
        assert result.exit_code == 2, result.output
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert not out_csv.exists()
        assert not audit.exists()
# ---------------------------------------------------------------------------
# Round-trip: --recommend --output then --pipeline --apply
# ---------------------------------------------------------------------------
class TestRoundTrip:
    """A pipeline saved by ``--recommend --output`` can be run via ``--pipeline``."""

    def test_save_then_run_saved_pipeline(self, messy_csv, tmp_path):
        """Save the recommended pipeline, then apply that saved file to the CSV."""
        pj = tmp_path / "p.json"
        r1 = runner.invoke(app, ["--recommend", "--output", str(pj)])
        assert r1.exit_code == 0, r1.output
        assert pj.exists()
        r2 = runner.invoke(
            app, [str(messy_csv), "--pipeline", str(pj), "--apply"],
        )
        assert r2.exit_code == 0, r2.output
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert out_csv.exists()
        assert audit.exists()
# ---------------------------------------------------------------------------
# Step error handling (--continue-on-error)
# ---------------------------------------------------------------------------
class TestStepError:
    """A dedup step with an invalid survivor_rule raises a ConfigError at
    run time, letting us exercise the stop/continue-on-error contract."""

    def _bad_pipeline(self, tmp_path):
        """Write a one-step pipeline with an invalid dedup option; return its path."""
        pj = tmp_path / "bad.json"
        Pipeline.from_dict({
            "steps": [{
                "tool": "dedup",
                "options": {"survivor_rule": "not_a_real_rule"},
            }]
        }).to_file(pj)
        return pj

    def test_step_error_halts_without_continue(self, messy_csv, tmp_path):
        """Without ``--continue-on-error`` the run fails and writes no artifacts."""
        pj = self._bad_pipeline(tmp_path)
        result = runner.invoke(
            app, [str(messy_csv), "--pipeline", str(pj), "--apply"],
        )
        assert result.exit_code != 0, result.output
        out_csv, audit = _pipeline_artifacts(messy_csv)
        # Halted before writing output.
        assert not out_csv.exists()
        assert not audit.exists()

    def test_continue_on_error_completes_and_records_error(self, messy_csv, tmp_path):
        """With ``--continue-on-error`` the run exits 0 and the audit records the error."""
        pj = self._bad_pipeline(tmp_path)
        result = runner.invoke(
            app,
            [str(messy_csv), "--pipeline", str(pj), "--apply",
             "--continue-on-error"],
        )
        assert result.exit_code == 0, result.output
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert out_csv.exists()
        assert audit.exists()
        data = json.loads(audit.read_text())
        # The pipeline has a single (failing) step, so exactly one audit entry.
        assert len(data["steps"]) == 1
        assert data["steps"][0]["error"], "expected the failed step's error recorded"