"""Tests for src/core/pipeline.py.""" from __future__ import annotations import json import numpy as np import pandas as pd import pytest from src.core.errors import ConfigError, InputValidationError from src.core.pipeline import ( Pipeline, PipelineResult, SOFT_DEPENDENCIES, Step, StepResult, TOOL_ADAPTERS, TOOL_NAMES, recommended_pipeline, run_pipeline, validate_pipeline, ) # --------------------------------------------------------------------------- # Step / Pipeline construction # --------------------------------------------------------------------------- class TestStep: def test_unknown_tool_raises(self): with pytest.raises(ConfigError): Step(tool="bogus_tool") def test_default_options_empty_dict(self): s = Step(tool="text_clean") assert s.options == {} assert s.enabled is True def test_display_name_falls_back_to_tool(self): assert Step(tool="dedup").display_name() == "dedup" assert Step(tool="dedup", name="Final dedup").display_name() == "Final dedup" class TestPipelineSerialization: def test_roundtrip_dict(self): p = Pipeline(steps=[ Step("text_clean", {"trim": True}), Step("dedup", {"survivor_rule": "first"}), ]) out = p.to_dict() loaded = Pipeline.from_dict(out) assert len(loaded.steps) == 2 assert loaded.steps[0].tool == "text_clean" assert loaded.steps[1].options["survivor_rule"] == "first" def test_roundtrip_file(self, tmp_path): p = Pipeline(steps=[Step("text_clean")]) path = tmp_path / "p.json" p.to_file(path) loaded = Pipeline.from_file(path) assert loaded.steps[0].tool == "text_clean" def test_from_dict_missing_steps_key(self): with pytest.raises(ConfigError): Pipeline.from_dict({}) def test_from_dict_missing_tool(self): with pytest.raises(ConfigError): Pipeline.from_dict({"steps": [{"options": {}}]}) # --------------------------------------------------------------------------- # recommended_pipeline # --------------------------------------------------------------------------- class TestRecommendedPipeline: def test_default_order(self): p = 
recommended_pipeline() assert [s.tool for s in p.steps] == [ "text_clean", "format_standardize", "missing", "dedup", ] def test_default_passes_validation(self): p = recommended_pipeline() assert validate_pipeline(p) == [] def test_include_overrides_default(self): p = recommended_pipeline(include=["text_clean", "missing"]) assert [s.tool for s in p.steps] == ["text_clean", "missing"] def test_options_seed_reaches_step(self): p = recommended_pipeline(options={"text_clean": {"trim": False}}) assert p.steps[0].options == {"trim": False} def test_unknown_tool_raises(self): with pytest.raises(InputValidationError): recommended_pipeline(include=["bogus"]) def test_can_place_column_map_first_or_last(self): # Both placements must be acceptable per the docstring. first = recommended_pipeline(include=[ "column_map", "text_clean", "format_standardize", "missing", "dedup", ]) last = recommended_pipeline(include=[ "text_clean", "format_standardize", "missing", "column_map", "dedup", ]) # No soft-dependency rule names column_map, so neither warns. assert validate_pipeline(first) == [] assert validate_pipeline(last) == [] # --------------------------------------------------------------------------- # validate_pipeline — soft dependencies # --------------------------------------------------------------------------- class TestValidatePipeline: def test_in_order_no_warnings(self): p = recommended_pipeline() assert validate_pipeline(p) == [] def test_dedup_before_text_clean_warns(self): p = Pipeline(steps=[Step("dedup"), Step("text_clean")]) ws = validate_pipeline(p) assert len(ws) == 1 assert "dedup" in ws[0] and "text_clean" in ws[0] def test_format_before_text_clean_warns(self): p = Pipeline(steps=[Step("format_standardize"), Step("text_clean")]) ws = validate_pipeline(p) assert any("format_standardize" in w for w in ws) def test_disabled_steps_ignored(self): # Disabled dedup-first should not trigger a warning. 
p = Pipeline(steps=[ Step("dedup", enabled=False), Step("text_clean"), ]) assert validate_pipeline(p) == [] def test_duplicate_tool_does_not_double_warn(self): # text_clean twice (legitimate: two-pass cleaning) shouldn't # generate redundant warnings. p = Pipeline(steps=[ Step("text_clean"), Step("text_clean"), ]) assert validate_pipeline(p) == [] # --------------------------------------------------------------------------- # run_pipeline — execution # --------------------------------------------------------------------------- @pytest.fixture def messy_df(): return pd.DataFrame({ "name": [" Alice ", "BOB", "N/A", "", "charlie "], "phone": ["(415) 555-1234", "+44 20 7946 0958", "03-3210-7000", "", "(415) 555-1234"], "country": ["US", "GB", "JP", "", "US"], }) class TestRunPipeline: def test_recommended_pipeline_runs_end_to_end(self, messy_df): p = recommended_pipeline(options={ "format_standardize": { "column_types": {"phone": "phone"}, "phone_country_column": "country", }, "missing": {"strategy": "none"}, }) res = run_pipeline(messy_df, p) assert isinstance(res, PipelineResult) assert res.initial_rows == 5 # Dedup at the end removes the Alice/charlie duplicate (same phone). 
assert res.final_rows < res.initial_rows assert res.warnings == [] def test_initial_df_not_mutated(self, messy_df): snapshot = messy_df.copy(deep=True) run_pipeline(messy_df, recommended_pipeline()) pd.testing.assert_frame_equal(messy_df, snapshot) def test_disabled_step_skipped(self, messy_df): p = Pipeline(steps=[ Step("text_clean", enabled=False), Step("missing", options={"strategy": "none"}), ]) res = run_pipeline(messy_df, p) assert res.step_results[0].skipped is True assert res.step_results[1].skipped is False def test_step_results_ordered_and_timed(self, messy_df): p = recommended_pipeline(options={ "missing": {"strategy": "none"}, }) res = run_pipeline(messy_df, p) assert len(res.step_results) == 4 for sr in res.step_results: assert sr.elapsed_seconds >= 0 assert [sr.step.tool for sr in res.step_results] == [ "text_clean", "format_standardize", "missing", "dedup", ] def test_warnings_returned_but_run_proceeds(self, messy_df): p = Pipeline(steps=[ Step("dedup"), Step("text_clean"), ]) res = run_pipeline(messy_df, p) assert res.warnings # warnings present # Both steps still ran. assert all(not sr.skipped for sr in res.step_results) def test_progress_callback_fires_per_step(self, messy_df): seen: list[StepResult] = [] p = Pipeline(steps=[ Step("text_clean"), Step("missing", options={"strategy": "none"}), ]) run_pipeline(messy_df, p, on_step_complete=seen.append) assert len(seen) == 2 assert all(isinstance(s, StepResult) for s in seen) def test_progress_callback_exception_does_not_abort(self, messy_df): def bad(_sr): raise RuntimeError("boom") p = Pipeline(steps=[Step("text_clean")]) # Must not raise. res = run_pipeline(messy_df, p, on_step_complete=bad) assert res.final_rows == 5 def test_stop_on_error_default(self, messy_df): # Force an error by giving format_standardize a non-existent column. 
p = Pipeline(steps=[ Step("format_standardize", options={ "column_types": {"does_not_exist": "phone"}, }), ]) with pytest.raises(InputValidationError): run_pipeline(messy_df, p) def test_continue_on_error_carries_previous_df(self, messy_df): p = Pipeline(steps=[ Step("text_clean"), Step("format_standardize", options={ "column_types": {"does_not_exist": "phone"}, }), Step("missing", options={"strategy": "none"}), ]) res = run_pipeline(messy_df, p, stop_on_error=False) # Step 2 errored, step 3 still ran. assert res.step_results[1].error is not None assert res.step_results[2].error is None assert res.final_rows == 5 def test_non_dataframe_input(self): with pytest.raises(InputValidationError): run_pipeline([1, 2, 3], recommended_pipeline()) # type: ignore[arg-type] # --------------------------------------------------------------------------- # Per-tool adapter sanity # --------------------------------------------------------------------------- class TestAdapters: @pytest.mark.parametrize("tool", TOOL_NAMES) def test_adapter_with_default_options_runs(self, tool, messy_df): # Each adapter must accept an empty options dict and return a # (df, summary) pair. 
out_df, summary = TOOL_ADAPTERS[tool](messy_df, {}) assert isinstance(out_df, pd.DataFrame) assert isinstance(summary, dict) def test_format_standardize_adapter_passes_column_types(self, messy_df): out, summary = TOOL_ADAPTERS["format_standardize"]( messy_df, {"column_types": {"phone": "phone"}}, ) assert summary["columns_processed"] == ["phone"] def test_dedup_adapter_with_unknown_survivor_rule_raises(self, messy_df): with pytest.raises(ConfigError): TOOL_ADAPTERS["dedup"](messy_df, {"survivor_rule": "bogus"}) # --------------------------------------------------------------------------- # SOFT_DEPENDENCIES integrity # --------------------------------------------------------------------------- class TestSoftDependencies: def test_every_pair_uses_known_tools(self): for earlier, later, _ in SOFT_DEPENDENCIES: assert earlier in TOOL_NAMES assert later in TOOL_NAMES def test_all_reasons_non_empty(self): for _, _, why in SOFT_DEPENDENCIES: assert why and isinstance(why, str) # Reason should be a sentence — at least 20 chars. assert len(why) > 20 def test_dependencies_form_a_dag(self): # No cycles — there must exist a topological ordering of the # tools such that every soft dependency (earlier, later) # is satisfied. With 5 tools and 6 deps this is easy to verify. from collections import defaultdict, deque edges: dict[str, list[str]] = defaultdict(list) in_degree: dict[str, int] = {t: 0 for t in TOOL_NAMES} for e, l, _ in SOFT_DEPENDENCIES: edges[e].append(l) in_degree[l] += 1 queue = deque(t for t, d in in_degree.items() if d == 0) order = [] while queue: t = queue.popleft() order.append(t) for nxt in edges[t]: in_degree[nxt] -= 1 if in_degree[nxt] == 0: queue.append(nxt) assert len(order) == len(TOOL_NAMES), ( f"SOFT_DEPENDENCIES contain a cycle; topo order={order}" ) # --------------------------------------------------------------------------- # Per-adapter summary correctness — exact numbers on KNOWN-messy input. 
# Each adapter is also exercised through run_pipeline so the StepResult
# carries the adapter's summary verbatim.
# ---------------------------------------------------------------------------


def _run_one(df, tool, options):
    """Run a single-step pipeline and return (StepResult, PipelineResult)."""
    res = run_pipeline(df, Pipeline(steps=[Step(tool, options)]))
    return res.step_results[0], res


class TestTextCleanSummary:
    def test_two_trimmable_cells_counted(self):
        df = pd.DataFrame({
            "a": [" x ", "y", " z"],  # 2 cells need trimming (" x ", " z")
            "b": ["ok", "fine", "good"],  # already clean
        })
        out, summary = TOOL_ADAPTERS["text_clean"](df, {"trim": True})
        assert summary["cells_total"] == 6
        assert summary["cells_changed"] == 2
        assert sorted(summary["columns_processed"]) == ["a", "b"]
        assert out["a"].tolist() == ["x", "y", "z"]

    def test_title_case_changes_all_cells(self):
        df = pd.DataFrame({"name": ["alice smith", "BOB JONES"]})
        out, summary = TOOL_ADAPTERS["text_clean"](df, {"case": "title"})
        assert summary["cells_changed"] == 2
        assert out["name"].tolist() == ["Alice Smith", "Bob Jones"]

    def test_collapse_whitespace_counts_internal_runs(self):
        # The multi-space inputs are built explicitly so the run widths are
        # unambiguous: with only single spaces nothing could collapse and
        # cells_changed would be 0.
        two_spaces = "a" + " " * 2 + "b"
        three_spaces = "e" + " " * 3 + "f"
        df = pd.DataFrame({"name": [two_spaces, "c d", three_spaces]})
        out, summary = TOOL_ADAPTERS["text_clean"](
            df, {"trim": True, "collapse_whitespace": True},
        )
        # The two multi-space cells collapse; "c d" is already single-spaced.
        assert summary["cells_changed"] == 2
        assert out["name"].tolist() == ["a b", "c d", "e f"]

    def test_summary_visible_through_run_pipeline(self):
        df = pd.DataFrame({"a": [" x ", "y"]})
        sr, _res = _run_one(df, "text_clean", {"trim": True})
        assert sr.skipped is False
        assert sr.error is None
        assert sr.summary["cells_changed"] == 1
        assert sr.summary["cells_total"] == 2


class TestFormatStandardizeSummary:
    def test_one_unparseable_phone(self):
        df = pd.DataFrame({
            "phone": ["(415) 555-1234", "not a phone", "+44 20 7946 0958"],
        })
        out, summary = TOOL_ADAPTERS["format_standardize"](
            df, {"column_types": {"phone": "phone"}},
        )
        assert summary["cells_total"] == 3
        assert summary["cells_unparseable"] == 1
        assert summary["cells_changed"] == 2
        assert summary["columns_processed"] == ["phone"]
        assert out["phone"].tolist() == [
            "+14155551234", "not a phone", "+442079460958",
        ]

    def test_date_standardization_counts(self):
        df = pd.DataFrame({"signup_date": ["2024-01-05", "Jan 5 2024", "garbage"]})
        out, summary = TOOL_ADAPTERS["format_standardize"](
            df, {"column_types": {"signup_date": "date"}},
        )
        # "2024-01-05" already canonical; "Jan 5 2024" rewritten; "garbage" fails.
        assert summary["cells_unparseable"] == 1
        assert summary["cells_changed"] == 1
        assert out["signup_date"].tolist()[:2] == ["2024-01-05", "2024-01-05"]

    def test_summary_visible_through_run_pipeline(self):
        df = pd.DataFrame({"phone": ["(415) 555-1234", "bad"]})
        sr, _ = _run_one(df, "format_standardize", {"column_types": {"phone": "phone"}})
        assert sr.summary["cells_unparseable"] == 1
        assert sr.summary["columns_processed"] == ["phone"]


class TestMissingSummary:
    def test_median_fills_each_blank(self):
        df = pd.DataFrame({"val": [1.0, np.nan, 3.0, np.nan, 5.0]})
        out, summary = TOOL_ADAPTERS["missing"](df, {"strategy": "median"})
        assert summary["cells_filled"] == 2  # exactly the 2 NaNs
        assert summary["rows_dropped"] == 0
        assert out["val"].tolist() == [1.0, 3.0, 3.0, 3.0, 5.0]  # median is 3.0

    def test_drop_row_by_threshold(self):
        # row_drop_threshold is the *fraction* of nulls needed to drop a row.
        df = pd.DataFrame({
            "a": [1.0, np.nan, 3.0],
            "b": ["x", np.nan, "z"],  # middle row is 100% null
        })
        out, summary = TOOL_ADAPTERS["missing"](
            df, {"strategy": "drop_row", "row_drop_threshold": 0.4},
        )
        assert summary["rows_dropped"] == 1
        assert len(out) == 2

    def test_sentinel_standardization_count(self):
        df = pd.DataFrame({"x": ["ok", "N/A", "fine", "N/A"]})
        out, summary = TOOL_ADAPTERS["missing"](df, {
            "strategy": "none",
            "sentinels": ["N/A"],
            "standardize_sentinels": True,
        })
        assert summary["sentinels_standardized"] == 2
        # The two "N/A" cells became real NaN.
        assert out["x"].isna().sum() == 2
        assert out["x"].tolist()[0] == "ok"

    def test_summary_visible_through_run_pipeline(self):
        df = pd.DataFrame({"val": [1.0, np.nan, 3.0]})
        sr, _ = _run_one(df, "missing", {"strategy": "median"})
        assert sr.summary["cells_filled"] == 1


class TestColumnMapSummary:
    def test_single_rename(self):
        df = pd.DataFrame({"old": [1, 2], "keep": [3, 4]})
        out, summary = TOOL_ADAPTERS["column_map"](
            df, {"mapping": {"old": "new"}, "unmapped": "keep"},
        )
        assert summary["columns_renamed"] == 1
        assert summary["columns_dropped"] == []
        assert list(out.columns) == ["new", "keep"]

    def test_unmapped_drop_reports_dropped_columns(self):
        df = pd.DataFrame({"old": [1, 2], "keep": [3, 4]})
        out, summary = TOOL_ADAPTERS["column_map"](
            df, {"mapping": {"old": "new"}, "unmapped": "drop"},
        )
        assert summary["columns_renamed"] == 1
        assert summary["columns_dropped"] == ["keep"]
        assert list(out.columns) == ["new"]

    def test_summary_visible_through_run_pipeline(self):
        df = pd.DataFrame({"old": [1], "keep": [2]})
        sr, _ = _run_one(df, "column_map", {"mapping": {"old": "new"}})
        assert sr.summary["columns_renamed"] == 1


class TestDedupSummary:
    def test_exact_duplicate_rows(self):
        df = pd.DataFrame({
            "email": ["a@x.com", "b@x.com", "a@x.com", "a@x.com"],
            "name": ["A", "B", "A", "A"],
        })
        out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "first"})
        assert summary["input_rows"] == 4
        assert summary["output_rows"] == 2
        assert summary["duplicates_removed"] == 2
        assert summary["groups"] == 1
        assert out["email"].tolist() == ["a@x.com", "b@x.com"]

    def test_explicit_exact_strategy_on_column(self):
        df = pd.DataFrame({
            "email": ["a@x.com", "b@x.com", "a@x.com"],
            "name": ["A", "B", "C"],
        })
        out, summary = TOOL_ADAPTERS["dedup"](df, {
            "survivor_rule": "first",
            "strategies": [{"columns": [
                {"column": "email", "algorithm": "exact", "threshold": 100},
            ]}],
        })
        assert summary["duplicates_removed"] == 1
        assert summary["groups"] == 1

    def test_most_complete_keeps_fuller_survivor(self):
        df = pd.DataFrame({
            "email": ["a@x.com", "a@x.com"],
            "name": ["", "Alice"],  # second row is more complete
            "phone": ["111", "111"],
        })
        out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "most_complete"})
        assert summary["duplicates_removed"] == 1
        assert out.iloc[0]["name"] == "Alice"

    def test_no_duplicates_is_noop(self):
        df = pd.DataFrame({"email": ["a@x.com", "b@x.com"], "name": ["A", "B"]})
        out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "first"})
        assert summary["duplicates_removed"] == 0
        assert summary["output_rows"] == 2

    def test_summary_visible_through_run_pipeline(self):
        df = pd.DataFrame({"email": ["a@x.com", "a@x.com"], "name": ["A", "A"]})
        sr, res = _run_one(df, "dedup", {"survivor_rule": "first"})
        assert sr.summary["duplicates_removed"] == 1
        assert res.final_rows == 1


# ---------------------------------------------------------------------------
# Data flow — a later step depends on an earlier step's output
# ---------------------------------------------------------------------------


class TestDataFlow:
    def test_text_clean_enables_dedup_match(self):
        # The two phones differ only by surrounding whitespace. dedup's exact
        # matcher already ignores that padding, so the rows collapse with or
        # without the trim (see test_dedup_default_matching_normalizes_whitespace).
        # What text_clean contributes is the survivor value: dedup keeps the
        # first row unchanged, and only the upstream trim makes it the clean
        # "+14155551234".
        df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]})
        p = Pipeline(steps=[
            Step("text_clean", {"trim": True}),
            Step("dedup", {"survivor_rule": "first"}),
        ])
        res = run_pipeline(df, p)
        assert res.initial_rows == 2
        assert res.final_rows == 1
        assert res.final_df["phone"].tolist() == ["+14155551234"]

    def test_dedup_default_matching_normalizes_whitespace(self):
        # Note: dedup's exact matcher already normalizes surrounding
        # whitespace, so the two phones collapse even WITHOUT a prior
        # text_clean. The survivor still carries the un-trimmed value.
        df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]})
        res = run_pipeline(df, Pipeline(steps=[Step("dedup", {"survivor_rule": "first"})]))
        assert res.final_rows == 1
        # Survivor keeps the raw (still-padded) text — dedup does not clean.
        assert res.final_df["phone"].tolist() == [" +14155551234 "]

    def test_chained_initial_and_final_rows(self):
        df = pd.DataFrame({
            "name": [" Al ", "al", "Bob"],
            "v": [1, 1, 2],
        })
        p = Pipeline(steps=[
            Step("text_clean", {"trim": True, "case": "title"}),
            Step("dedup", {"survivor_rule": "first"}),
        ])
        res = run_pipeline(df, p)
        # " Al " and "al" both become "Al" → duplicate rows collapse.
        assert res.initial_rows == 3
        assert res.final_rows == 2
        assert "Al" in res.final_df["name"].tolist()


# ---------------------------------------------------------------------------
# Error handling — stop_on_error semantics
# ---------------------------------------------------------------------------


class TestErrorHandling:
    def test_most_recent_without_date_raises_by_default(self, messy_df):
        # dedup with survivor_rule="most_recent" but no date_column errors.
        p = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})])
        with pytest.raises(InputValidationError):
            run_pipeline(messy_df, p)

    def test_continue_on_error_sets_error_string(self):
        df = pd.DataFrame({"email": ["a@x.com", "a@x.com"], "name": ["A", "B"]})
        p = Pipeline(steps=[
            Step("text_clean", {"trim": True}),
            Step("dedup", {"survivor_rule": "most_recent"}),  # will fail
            Step("missing", {"strategy": "none"}),
        ])
        res = run_pipeline(df, p, stop_on_error=False)
        bad = res.step_results[1]
        assert bad.error is not None
        assert isinstance(bad.error, str) and bad.error.strip()
        # The failed step did NOT change the row count — previous df carried.
        assert res.step_results[2].error is None
        assert res.final_rows == 2

    def test_failed_step_summary_is_empty(self):
        df = pd.DataFrame({"e": ["a", "a"], "n": ["x", "y"]})
        p = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})])
        res = run_pipeline(df, p, stop_on_error=False)
        assert res.step_results[0].summary == {}
        assert res.step_results[0].skipped is False

    def test_config_error_on_bad_survivor_rule_propagates(self, messy_df):
        p = Pipeline(steps=[Step("dedup", {"survivor_rule": "nonsense"})])
        with pytest.raises(ConfigError):
            run_pipeline(messy_df, p)


# ---------------------------------------------------------------------------
# Edge inputs
# ---------------------------------------------------------------------------


class TestEdgeInputs:
    def test_empty_dataframe_runs_clean(self):
        empty = pd.DataFrame({"name": [], "phone": []})
        res = run_pipeline(
            empty, recommended_pipeline(options={"missing": {"strategy": "none"}}),
        )
        assert res.initial_rows == 0
        assert res.final_rows == 0
        assert all(sr.error is None for sr in res.step_results if not sr.skipped)

    def test_single_column_dataframe(self):
        df = pd.DataFrame({"name": [" Al ", "al"]})
        res = run_pipeline(
            df, Pipeline(steps=[Step("text_clean", {"trim": True, "case": "title"})]),
        )
        assert res.final_df["name"].tolist() == ["Al", "Al"]

    def test_all_steps_disabled_returns_unchanged(self, messy_df):
        snapshot = messy_df.copy(deep=True)
        p = Pipeline(steps=[
            Step("text_clean", enabled=False),
            Step("format_standardize", enabled=False),
            Step("missing", enabled=False),
            Step("dedup", enabled=False),
        ])
        res = run_pipeline(messy_df, p)
        assert all(sr.skipped is True for sr in res.step_results)
        assert res.final_rows == res.initial_rows == 5
        pd.testing.assert_frame_equal(res.final_df, snapshot)

    def test_empty_pipeline_is_identity(self, messy_df):
        res = run_pipeline(messy_df, Pipeline(steps=[]))
        assert res.step_results == []
        assert res.final_rows == 5
        pd.testing.assert_frame_equal(res.final_df, messy_df)


# ---------------------------------------------------------------------------
# Serialization round-trips with disabled / named / nested-option steps
# ---------------------------------------------------------------------------


class TestSerializationRoundtrips:
    """Dict and file serialization must preserve every step field verbatim."""

    @staticmethod
    def _via_dict(pipeline):
        """Serialize *pipeline* with to_dict and rebuild it with from_dict."""
        return Pipeline.from_dict(pipeline.to_dict())

    def test_disabled_and_named_step_survive_dict(self):
        original = Pipeline(steps=[
            Step("text_clean", {"trim": True}, enabled=False, name="Pre-clean"),
            Step("dedup", {"survivor_rule": "first"}, name="Final dedup"),
        ])
        restored = self._via_dict(original)

        pre_clean = restored.steps[0]
        assert pre_clean.enabled is False
        assert pre_clean.name == "Pre-clean"
        assert pre_clean.options == {"trim": True}

        final_dedup = restored.steps[1]
        assert final_dedup.name == "Final dedup"
        assert final_dedup.display_name() == "Final dedup"

    def test_nested_options_survive_dict(self):
        format_opts = {
            "column_types": {"phone": "phone", "signup_date": "date"},
        }
        dedup_opts = {
            "survivor_rule": "most_complete",
            "strategies": [{"columns": [
                {"column": "email", "algorithm": "exact", "threshold": 100},
            ]}],
        }
        restored = self._via_dict(Pipeline(steps=[
            Step("format_standardize", format_opts),
            Step("dedup", dedup_opts),
        ]))
        assert restored.steps[0].options["column_types"] == format_opts["column_types"]
        assert restored.steps[1].options["strategies"] == dedup_opts["strategies"]

    def test_nested_options_survive_file(self, tmp_path):
        original = Pipeline(steps=[
            Step(
                "format_standardize",
                {"column_types": {"phone": "phone"}},
                enabled=False,
                name="formats",
            ),
        ])
        target = tmp_path / "pipe.json"
        original.to_file(target)

        step = Pipeline.from_file(target).steps[0]
        assert step.enabled is False
        assert step.name == "formats"
        assert step.options == {"column_types": {"phone": "phone"}}

    def test_roundtrip_is_idempotent(self):
        pipeline = Pipeline(steps=[
            Step("text_clean", {"trim": True}, enabled=False, name="x"),
            Step("missing", {"strategy": "median"}),
        ])
        first_pass = self._via_dict(pipeline)
        second_pass = self._via_dict(first_pass)
        # Chained comparison: first == second and second == original.
        assert first_pass.to_dict() == second_pass.to_dict() == pipeline.to_dict()


# ---------------------------------------------------------------------------
# recommended_pipeline(include=...) — subsetting, ordering, option seeding
# ---------------------------------------------------------------------------


class TestRecommendedInclude:
    """recommended_pipeline builds exactly the steps/options it is given."""

    # The four core tools in their recommended order.
    _CORE = ("text_clean", "format_standardize", "missing", "dedup")

    def test_subset_preserves_given_order(self):
        pipeline = recommended_pipeline(include=["dedup", "text_clean"])
        tools = [step.tool for step in pipeline.steps]
        assert tools == ["dedup", "text_clean"]

    def test_column_map_first(self):
        steps = recommended_pipeline(include=["column_map", *self._CORE]).steps
        assert steps[0].tool == "column_map"
        assert len(steps) == 5

    def test_column_map_last(self):
        steps = recommended_pipeline(include=[*self._CORE, "column_map"]).steps
        assert steps[-1].tool == "column_map"

    def test_unknown_tool_in_include_raises(self):
        with pytest.raises(InputValidationError):
            recommended_pipeline(include=["text_clean", "not_a_tool"])

    def test_options_seeding_only_targets_named_tool(self):
        pipeline = recommended_pipeline(
            include=["text_clean", "dedup"],
            options={"dedup": {"survivor_rule": "last"}},
        )
        text_step, dedup_step = pipeline.steps[0], pipeline.steps[1]
        assert text_step.options == {}  # text_clean unseeded
        assert dedup_step.options == {"survivor_rule": "last"}

    def test_empty_include_yields_no_steps(self):
        assert recommended_pipeline(include=[]).steps == []

    def test_seeded_options_are_independent_copies(self):
        seed = {"text_clean": {"trim": True}}
        produced = recommended_pipeline(include=["text_clean"], options=seed)
        # Mutating the produced step must not leak back into the seed.
        produced.steps[0].options["trim"] = False
        assert seed["text_clean"]["trim"] is True


# ---------------------------------------------------------------------------
# Realistic demo integration — messy customers table end-to-end
# ---------------------------------------------------------------------------


class TestDemoIntegration:
    """End-to-end runs over a small, deliberately messy customers table."""

    @pytest.fixture
    def customers_df(self):
        return pd.DataFrame({
            "Full Name": [" alice smith ", "BOB JONES", "alice smith", ""],
            "Email": ["alice@x.com ", "bob@x.com", "alice@x.com", "carol@x.com"],
            "Phone": [" +14155551234 ", "+442079460958", "+14155551234", "+13035551111"],
        })

    @staticmethod
    def _phone_exact_dedup():
        """Return fresh dedup options: exact match on Phone, keep the most complete row."""
        return {
            "survivor_rule": "most_complete",
            "strategies": [{"columns": [
                {"column": "Phone", "algorithm": "exact", "threshold": 100},
            ]}],
        }

    def test_full_recommended_plus_column_map(self, customers_df):
        seeded = {
            "text_clean": {"trim": True, "collapse_whitespace": True},
            "missing": {"strategy": "none"},
            "dedup": self._phone_exact_dedup(),
            "column_map": {
                "mapping": {"Full Name": "name", "Email": "email", "Phone": "phone"},
                "unmapped": "keep",
            },
        }
        pipeline = recommended_pipeline(
            include=["text_clean", "format_standardize", "missing", "dedup", "column_map"],
            options=seeded,
        )
        result = run_pipeline(customers_df, pipeline)

        # Two rows share the same phone after trimming → one duplicate removed.
        assert result.initial_rows == 4
        assert result.final_rows == 3
        assert result.final_rows < result.initial_rows

        # Headers were renamed by the trailing column_map step.
        assert list(result.final_df.columns) == ["name", "email", "phone"]

        # The surviving Alice row kept its (trimmed) phone; only one Alice survives.
        alice_phone = "+14155551234"
        phones = result.final_df["phone"].tolist()
        assert alice_phone in phones
        assert phones.count(alice_phone) == 1

        # Every executed step succeeded.
        executed = [sr for sr in result.step_results if not sr.skipped]
        assert all(sr.error is None for sr in executed)

        # column_map (the final step) reported the three renames.
        column_map_result = result.step_results[-1]
        assert column_map_result.step.tool == "column_map"
        assert column_map_result.summary["columns_renamed"] == 3

    def test_demo_dedup_step_reports_one_duplicate(self, customers_df):
        pipeline = recommended_pipeline(options={
            "text_clean": {"trim": True},
            "missing": {"strategy": "none"},
            "dedup": self._phone_exact_dedup(),
        })
        result = run_pipeline(customers_df, pipeline)

        dedup_results = [sr for sr in result.step_results if sr.step.tool == "dedup"]
        dedup_sr = dedup_results[0]
        assert dedup_sr.summary["duplicates_removed"] == 1
        assert dedup_sr.summary["groups"] == 1