"""Integration tests for the pipeline CLI (src/cli_pipeline.py).

The Typer ``app`` is invoked directly via ``CliRunner`` to bypass the license
``guard(...)`` that ``main()`` runs before ``app()`` — matching the house
pattern in ``test_cli_text_clean.py``.
"""
from __future__ import annotations

import json

import pandas as pd
import pytest
from typer.testing import CliRunner

from src.cli_pipeline import app
from src.core.pipeline import Pipeline, _DEFAULT_ORDER

runner = CliRunner()


@pytest.fixture
def messy_csv(tmp_path):
    """A small messy CSV with duplicate / whitespace / mixed-case rows."""
    df = pd.DataFrame({
        "name": [" Alice ", "alice", "Bob", "Charlie"],
        "email": ["A@X.COM", "a@x.com", "bob@x.com", "charlie@x.com"],
        "phone": ["555-1234", "5551234", "555-9999", "555-0000"],
        "signup_date": ["2020-01-01", "2020-01-01", "2020-02-02", "2020-03-03"],
    })
    path = tmp_path / "messy.csv"
    df.to_csv(path, index=False)
    return path


def _pipeline_artifacts(csv_path):
    """The output CSV + audit JSON the CLI writes next to *csv_path*."""
    out_csv = csv_path.parent / f"{csv_path.stem}_pipeline.csv"
    audit = csv_path.parent / f"{csv_path.stem}_pipeline.json"
    return out_csv, audit


def _load_audit(csv_path):
    """Parse and return the audit JSON expected next to *csv_path*."""
    _, audit = _pipeline_artifacts(csv_path)
    return json.loads(audit.read_text())


# ---------------------------------------------------------------------------
# --recommend
# ---------------------------------------------------------------------------


class TestRecommend:
    """``--recommend`` prints the default pipeline or saves it via ``--output``."""

    def test_recommend_prints_valid_json(self):
        result = runner.invoke(app, ["--recommend"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "steps" in data
        tools = [s["tool"] for s in data["steps"]]
        assert tools == list(_DEFAULT_ORDER)

    def test_recommend_default_tools_in_order(self):
        result = runner.invoke(app, ["--recommend"])
        # Fail on the exit code first so a CLI error is not masked by a
        # JSON decoding traceback.
        assert result.exit_code == 0
        data = json.loads(result.output)
        tools = [s["tool"] for s in data["steps"]]
        assert tools == ["text_clean", "format_standardize", "missing", "dedup"]
        assert len(tools) == 4

    def test_recommend_output_writes_loadable_file(self, tmp_path):
        out = tmp_path / "pipeline.json"
        result = runner.invoke(app, ["--recommend", "--output", str(out)])
        assert result.exit_code == 0
        assert out.exists()
        # Confirmation message printed instead of raw JSON.
        assert str(out) in result.output
        pipe = Pipeline.from_file(out)
        assert [s.tool for s in pipe.steps] == list(_DEFAULT_ORDER)

    def test_recommend_output_message_not_json(self, tmp_path):
        out = tmp_path / "pipeline.json"
        result = runner.invoke(app, ["--recommend", "--output", str(out)])
        assert result.exit_code == 0
        assert "saved to" in result.output.lower()
        # The test name promises the console output is a message, not the
        # pipeline JSON itself, so verify it does not parse as JSON.
        with pytest.raises(json.JSONDecodeError):
            json.loads(result.output)


# ---------------------------------------------------------------------------
# Argument / input validation
# ---------------------------------------------------------------------------


class TestArgValidation:
    """Bad or conflicting arguments produce a non-zero exit and a clear message."""

    def test_no_args_exits_2(self):
        result = runner.invoke(app, [])
        assert result.exit_code == 2
        assert "input file is required" in result.output.lower()

    def test_nonexistent_input_exits_1(self, tmp_path):
        missing = tmp_path / "does_not_exist_xyz.csv"
        result = runner.invoke(app, [str(missing)])
        assert result.exit_code == 1
        assert "not found" in result.output.lower()

    def test_pipeline_and_steps_together_exits_1(self, messy_csv, tmp_path):
        pj = tmp_path / "p.json"
        Pipeline.from_dict({"steps": [{"tool": "text_clean"}]}).to_file(pj)
        result = runner.invoke(
            app,
            [str(messy_csv), "--pipeline", str(pj), "--steps", "text_clean"],
        )
        assert result.exit_code == 1
        assert "not both" in result.output.lower()

    def test_pipeline_nonexistent_exits_1(self, messy_csv, tmp_path):
        missing = tmp_path / "no_such_pipeline.json"
        result = runner.invoke(
            app,
            [str(messy_csv), "--pipeline", str(missing)],
        )
        assert result.exit_code == 1
        assert "not found" in result.output.lower()

    def test_unknown_tool_in_steps_errors(self, messy_csv):
        result = runner.invoke(app, [str(messy_csv), "--steps", "bogus_tool"])
        assert result.exit_code != 0
        # Helpful error naming the offending value.
        assert "bogus_tool" in result.output


# ---------------------------------------------------------------------------
# Dry-run (default)
# ---------------------------------------------------------------------------


class TestDryRun:
    """Without ``--apply`` the CLI only prints the plan and writes nothing."""

    def test_dry_run_exit_0_and_plan_printed(self, messy_csv):
        result = runner.invoke(app, [str(messy_csv)])
        assert result.exit_code == 0
        assert "Pipeline plan:" in result.output
        assert "plan-only run" in result.output

    def test_dry_run_writes_no_artifacts(self, messy_csv):
        result = runner.invoke(app, [str(messy_csv)])
        assert result.exit_code == 0
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert not out_csv.exists()
        assert not audit.exists()


# ---------------------------------------------------------------------------
# --apply
# ---------------------------------------------------------------------------


class TestApply:
    """``--apply`` runs the pipeline and writes the output CSV and audit JSON."""

    def test_apply_default_pipeline_writes_outputs(self, messy_csv):
        result = runner.invoke(app, [str(messy_csv), "--apply"])
        assert result.exit_code == 0
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert out_csv.exists()
        assert audit.exists()
        # Output CSV is readable.
        df = pd.read_csv(out_csv)
        assert len(df.columns) >= 1

    def test_apply_audit_has_documented_keys(self, messy_csv):
        result = runner.invoke(app, [str(messy_csv), "--apply"])
        assert result.exit_code == 0
        data = _load_audit(messy_csv)
        for key in (
            "pipeline",
            "warnings",
            "initial_rows",
            "final_rows",
            "total_elapsed_seconds",
            "steps",
        ):
            assert key in data, f"missing audit key: {key}"
        # One step entry per pipeline step (default = 4).
        assert len(data["steps"]) == len(_DEFAULT_ORDER)
        for step in data["steps"]:
            for k in (
                "tool",
                "name",
                "enabled",
                "skipped",
                "elapsed_seconds",
                "summary",
                "error",
            ):
                assert k in step, f"missing step key: {k}"

    def test_apply_dedup_reduces_rows(self, messy_csv):
        result = runner.invoke(app, [str(messy_csv), "--apply"])
        assert result.exit_code == 0
        data = _load_audit(messy_csv)
        # 4 input rows; the first two are duplicates once cleaned/standardized.
        assert data["initial_rows"] == 4
        assert data["final_rows"] < data["initial_rows"]

    def test_apply_custom_output_path(self, messy_csv, tmp_path):
        out = tmp_path / "custom.csv"
        result = runner.invoke(
            app,
            [str(messy_csv), "--apply", "--output", str(out)],
        )
        assert result.exit_code == 0
        assert out.exists()
        default_csv, audit = _pipeline_artifacts(messy_csv)
        # Default-named CSV should NOT be written when --output is given.
        assert not default_csv.exists()
        # Audit JSON is still written next to the input.
        assert audit.exists()

    def test_apply_custom_steps_subset(self, messy_csv):
        result = runner.invoke(
            app,
            [str(messy_csv), "--apply", "--steps", "text_clean,missing"],
        )
        assert result.exit_code == 0
        data = _load_audit(messy_csv)
        tools = [s["tool"] for s in data["steps"]]
        assert tools == ["text_clean", "missing"]


# ---------------------------------------------------------------------------
# Strict mode
# ---------------------------------------------------------------------------


class TestStrict:
    """``--strict`` aborts an out-of-order pipeline before anything is written."""

    @staticmethod
    def _invoke_out_of_order(csv_path):
        """Run ``--strict --apply`` with ``dedup`` listed before ``text_clean``."""
        return runner.invoke(
            app,
            [str(csv_path), "--steps", "dedup,text_clean", "--strict", "--apply"],
        )

    def test_strict_out_of_order_exits_2(self, messy_csv):
        result = self._invoke_out_of_order(messy_csv)
        assert result.exit_code == 2
        assert "abort" in result.output.lower()

    def test_strict_out_of_order_writes_nothing(self, messy_csv):
        result = self._invoke_out_of_order(messy_csv)
        assert result.exit_code == 2
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert not out_csv.exists()
        assert not audit.exists()


# ---------------------------------------------------------------------------
# Round-trip: --recommend --output then --pipeline --apply
# ---------------------------------------------------------------------------


class TestRoundTrip:
    """A pipeline saved by ``--recommend --output`` can be run via ``--pipeline``."""

    def test_save_then_run_saved_pipeline(self, messy_csv, tmp_path):
        pj = tmp_path / "p.json"
        r1 = runner.invoke(app, ["--recommend", "--output", str(pj)])
        assert r1.exit_code == 0
        assert pj.exists()

        r2 = runner.invoke(
            app,
            [str(messy_csv), "--pipeline", str(pj), "--apply"],
        )
        assert r2.exit_code == 0
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert out_csv.exists()
        assert audit.exists()


# ---------------------------------------------------------------------------
# Step error handling (--continue-on-error)
# ---------------------------------------------------------------------------


class TestStepError:
    """A dedup step with an invalid survivor_rule raises a ConfigError at run
    time, letting us exercise the stop/continue-on-error contract."""

    def _bad_pipeline(self, tmp_path):
        """Write a one-step pipeline with an invalid dedup option; return its path."""
        pj = tmp_path / "bad.json"
        Pipeline.from_dict({
            "steps": [{
                "tool": "dedup",
                "options": {"survivor_rule": "not_a_real_rule"},
            }]
        }).to_file(pj)
        return pj

    def test_step_error_halts_without_continue(self, messy_csv, tmp_path):
        pj = self._bad_pipeline(tmp_path)
        result = runner.invoke(
            app,
            [str(messy_csv), "--pipeline", str(pj), "--apply"],
        )
        assert result.exit_code != 0
        out_csv, audit = _pipeline_artifacts(messy_csv)
        # Halted before writing output.
        assert not out_csv.exists()
        assert not audit.exists()

    def test_continue_on_error_completes_and_records_error(self, messy_csv, tmp_path):
        pj = self._bad_pipeline(tmp_path)
        result = runner.invoke(
            app,
            [str(messy_csv), "--pipeline", str(pj), "--apply", "--continue-on-error"],
        )
        assert result.exit_code == 0
        out_csv, audit = _pipeline_artifacts(messy_csv)
        assert out_csv.exists()
        assert audit.exists()
        data = json.loads(audit.read_text())
        assert len(data["steps"]) == 1
        assert data["steps"][0]["error"], "expected the failed step's error recorded"