"""Tests for src/core/pipeline.py."""

from __future__ import annotations

import json
from collections import defaultdict, deque

import numpy as np
import pandas as pd
import pytest

from src.core.errors import ConfigError, InputValidationError
from src.core.pipeline import (
    Pipeline,
    PipelineResult,
    SOFT_DEPENDENCIES,
    Step,
    StepResult,
    TOOL_ADAPTERS,
    TOOL_NAMES,
    recommended_pipeline,
    run_pipeline,
    validate_pipeline,
)


# ---------------------------------------------------------------------------
# Step / Pipeline construction
# ---------------------------------------------------------------------------


class TestStep:
    """Construction-time behaviour of ``Step``: validation, defaults, naming."""

    def test_unknown_tool_raises(self):
        with pytest.raises(ConfigError):
            Step(tool="bogus_tool")

    def test_default_options_empty_dict(self):
        s = Step(tool="text_clean")
        assert s.options == {}
        assert s.enabled is True

    def test_display_name_falls_back_to_tool(self):
        assert Step(tool="dedup").display_name() == "dedup"
        assert (
            Step(tool="dedup", name="Final dedup").display_name()
            == "Final dedup"
        )


class TestPipelineSerialization:
    """Round-trips of ``Pipeline`` through dicts and files, plus bad input."""

    def test_roundtrip_dict(self):
        p = Pipeline(steps=[
            Step("text_clean", {"trim": True}),
            Step("dedup", {"survivor_rule": "first"}),
        ])
        out = p.to_dict()
        loaded = Pipeline.from_dict(out)
        assert len(loaded.steps) == 2
        assert loaded.steps[0].tool == "text_clean"
        assert loaded.steps[1].options["survivor_rule"] == "first"

    def test_roundtrip_file(self, tmp_path):
        p = Pipeline(steps=[Step("text_clean")])
        path = tmp_path / "p.json"
        p.to_file(path)
        loaded = Pipeline.from_file(path)
        assert loaded.steps[0].tool == "text_clean"

    def test_from_dict_missing_steps_key(self):
        with pytest.raises(ConfigError):
            Pipeline.from_dict({})

    def test_from_dict_missing_tool(self):
        with pytest.raises(ConfigError):
            Pipeline.from_dict({"steps": [{"options": {}}]})


# ---------------------------------------------------------------------------
# recommended_pipeline
# ---------------------------------------------------------------------------


class TestRecommendedPipeline:
    """Default ordering, ``include`` / ``options`` overrides, and validation."""

    def test_default_order(self):
        p = recommended_pipeline()
        assert [s.tool for s in p.steps] == [
            "text_clean",
            "format_standardize",
            "missing",
            "dedup",
        ]

    def test_default_passes_validation(self):
        p = recommended_pipeline()
        assert validate_pipeline(p) == []

    def test_include_overrides_default(self):
        p = recommended_pipeline(include=["text_clean", "missing"])
        assert [s.tool for s in p.steps] == ["text_clean", "missing"]

    def test_options_seed_reaches_step(self):
        p = recommended_pipeline(options={"text_clean": {"trim": False}})
        assert p.steps[0].options == {"trim": False}

    def test_unknown_tool_raises(self):
        with pytest.raises(InputValidationError):
            recommended_pipeline(include=["bogus"])

    def test_can_place_column_map_first_or_last(self):
        # Both placements must be acceptable per the docstring.
        first = recommended_pipeline(include=[
            "column_map",
            "text_clean",
            "format_standardize",
            "missing",
            "dedup",
        ])
        last = recommended_pipeline(include=[
            "text_clean",
            "format_standardize",
            "missing",
            "column_map",
            "dedup",
        ])
        # No soft-dependency rule names column_map, so neither warns.
        assert validate_pipeline(first) == []
        assert validate_pipeline(last) == []


# ---------------------------------------------------------------------------
# validate_pipeline — soft dependencies
# ---------------------------------------------------------------------------


class TestValidatePipeline:
    """Warnings emitted by ``validate_pipeline`` for out-of-order steps."""

    def test_in_order_no_warnings(self):
        p = recommended_pipeline()
        assert validate_pipeline(p) == []

    def test_dedup_before_text_clean_warns(self):
        p = Pipeline(steps=[Step("dedup"), Step("text_clean")])
        ws = validate_pipeline(p)
        assert len(ws) == 1
        # Separate asserts so a failure shows which tool name is missing.
        assert "dedup" in ws[0]
        assert "text_clean" in ws[0]

    def test_format_before_text_clean_warns(self):
        p = Pipeline(steps=[Step("format_standardize"), Step("text_clean")])
        ws = validate_pipeline(p)
        assert any("format_standardize" in w for w in ws)

    def test_disabled_steps_ignored(self):
        # Disabled dedup-first should not trigger a warning.
        p = Pipeline(steps=[
            Step("dedup", enabled=False),
            Step("text_clean"),
        ])
        assert validate_pipeline(p) == []

    def test_duplicate_tool_does_not_double_warn(self):
        # text_clean twice (legitimate: two-pass cleaning) shouldn't
        # generate redundant warnings.
        p = Pipeline(steps=[
            Step("text_clean"),
            Step("text_clean"),
        ])
        assert validate_pipeline(p) == []


# ---------------------------------------------------------------------------
# run_pipeline — execution
# ---------------------------------------------------------------------------


@pytest.fixture
def messy_df():
    """Five-row frame of deliberately untidy contact data.

    Names carry stray whitespace, mixed case, an ``"N/A"`` placeholder and
    an empty string; phones use several formats; rows 0 and 4 share the
    same phone number and country.
    """
    return pd.DataFrame({
        "name": ["  Alice ", "BOB", "N/A", "", "charlie  "],
        "phone": [
            "(415) 555-1234",
            "+44 20 7946 0958",
            "03-3210-7000",
            "",
            "(415) 555-1234",
        ],
        "country": ["US", "GB", "JP", "", "US"],
    })


class TestRunPipeline:
    """End-to-end execution: results, skipping, callbacks, error policy."""

    def test_recommended_pipeline_runs_end_to_end(self, messy_df):
        p = recommended_pipeline(options={
            "format_standardize": {
                "column_types": {"phone": "phone"},
                "phone_country_column": "country",
            },
            "missing": {"strategy": "none"},
        })
        res = run_pipeline(messy_df, p)
        assert isinstance(res, PipelineResult)
        assert res.initial_rows == 5
        # Dedup at the end removes the Alice/charlie duplicate (same phone).
        assert res.final_rows < res.initial_rows
        assert res.warnings == []

    def test_initial_df_not_mutated(self, messy_df):
        snapshot = messy_df.copy(deep=True)
        run_pipeline(messy_df, recommended_pipeline())
        pd.testing.assert_frame_equal(messy_df, snapshot)

    def test_disabled_step_skipped(self, messy_df):
        p = Pipeline(steps=[
            Step("text_clean", enabled=False),
            Step("missing", options={"strategy": "none"}),
        ])
        res = run_pipeline(messy_df, p)
        assert res.step_results[0].skipped is True
        assert res.step_results[1].skipped is False

    def test_step_results_ordered_and_timed(self, messy_df):
        p = recommended_pipeline(options={
            "missing": {"strategy": "none"},
        })
        res = run_pipeline(messy_df, p)
        assert len(res.step_results) == 4
        for sr in res.step_results:
            assert sr.elapsed_seconds >= 0
        assert [sr.step.tool for sr in res.step_results] == [
            "text_clean",
            "format_standardize",
            "missing",
            "dedup",
        ]

    def test_warnings_returned_but_run_proceeds(self, messy_df):
        p = Pipeline(steps=[
            Step("dedup"),
            Step("text_clean"),
        ])
        res = run_pipeline(messy_df, p)
        assert res.warnings  # warnings present
        # Both steps still ran.
        assert all(not sr.skipped for sr in res.step_results)

    def test_progress_callback_fires_per_step(self, messy_df):
        seen: list[StepResult] = []
        p = Pipeline(steps=[
            Step("text_clean"),
            Step("missing", options={"strategy": "none"}),
        ])
        run_pipeline(messy_df, p, on_step_complete=seen.append)
        assert len(seen) == 2
        assert all(isinstance(s, StepResult) for s in seen)

    def test_progress_callback_exception_does_not_abort(self, messy_df):
        def bad(_sr):
            raise RuntimeError("boom")

        p = Pipeline(steps=[Step("text_clean")])
        # Must not raise.
        res = run_pipeline(messy_df, p, on_step_complete=bad)
        assert res.final_rows == 5

    def test_stop_on_error_default(self, messy_df):
        # Force an error by giving format_standardize a non-existent column.
        p = Pipeline(steps=[
            Step("format_standardize", options={
                "column_types": {"does_not_exist": "phone"},
            }),
        ])
        with pytest.raises(InputValidationError):
            run_pipeline(messy_df, p)

    def test_continue_on_error_carries_previous_df(self, messy_df):
        p = Pipeline(steps=[
            Step("text_clean"),
            Step("format_standardize", options={
                "column_types": {"does_not_exist": "phone"},
            }),
            Step("missing", options={"strategy": "none"}),
        ])
        res = run_pipeline(messy_df, p, stop_on_error=False)
        # Step 2 errored, step 3 still ran.
        assert res.step_results[1].error is not None
        assert res.step_results[2].error is None
        assert res.final_rows == 5

    def test_non_dataframe_input(self):
        with pytest.raises(InputValidationError):
            run_pipeline([1, 2, 3], recommended_pipeline())  # type: ignore[arg-type]


# ---------------------------------------------------------------------------
# Per-tool adapter sanity
# ---------------------------------------------------------------------------


class TestAdapters:
    """Direct calls into ``TOOL_ADAPTERS`` entries, bypassing the pipeline."""

    @pytest.mark.parametrize("tool", TOOL_NAMES)
    def test_adapter_with_default_options_runs(self, tool, messy_df):
        # Each adapter must accept an empty options dict and return a
        # (df, summary) pair.
        out_df, summary = TOOL_ADAPTERS[tool](messy_df, {})
        assert isinstance(out_df, pd.DataFrame)
        assert isinstance(summary, dict)

    def test_format_standardize_adapter_passes_column_types(self, messy_df):
        # Only the summary is inspected; the returned frame is discarded.
        _, summary = TOOL_ADAPTERS["format_standardize"](
            messy_df,
            {"column_types": {"phone": "phone"}},
        )
        assert summary["columns_processed"] == ["phone"]

    def test_dedup_adapter_with_unknown_survivor_rule_raises(self, messy_df):
        with pytest.raises(ConfigError):
            TOOL_ADAPTERS["dedup"](messy_df, {"survivor_rule": "bogus"})


# ---------------------------------------------------------------------------
# SOFT_DEPENDENCIES integrity
# ---------------------------------------------------------------------------


class TestSoftDependencies:
    """Structural checks on the ``SOFT_DEPENDENCIES`` (earlier, later, why) table."""

    def test_every_pair_uses_known_tools(self):
        for earlier, later, _ in SOFT_DEPENDENCIES:
            assert earlier in TOOL_NAMES
            assert later in TOOL_NAMES

    def test_all_reasons_non_empty(self):
        for _, _, why in SOFT_DEPENDENCIES:
            assert why
            assert isinstance(why, str)
            # Reason should be a sentence — more than 20 chars.
            assert len(why) > 20

    def test_dependencies_form_a_dag(self):
        # No cycles — there must exist a topological ordering of the
        # tools such that every soft dependency (earlier, later) is
        # satisfied. Kahn's algorithm: repeatedly emit tools whose
        # predecessors have all been emitted; if a cycle exists, the
        # tools on it never reach in-degree zero and are left out.
        edges: dict[str, list[str]] = defaultdict(list)
        in_degree: dict[str, int] = {t: 0 for t in TOOL_NAMES}
        for earlier, later, _ in SOFT_DEPENDENCIES:
            edges[earlier].append(later)
            in_degree[later] += 1

        queue = deque(t for t, d in in_degree.items() if d == 0)
        order = []
        while queue:
            t = queue.popleft()
            order.append(t)
            for nxt in edges[t]:
                in_degree[nxt] -= 1
                if in_degree[nxt] == 0:
                    queue.append(nxt)

        assert len(order) == len(TOOL_NAMES), (
            f"SOFT_DEPENDENCIES contain a cycle; topo order={order}"
        )