Files
datatools-dev/tests/test_pipeline.py
Michael 38616d69e2 test(pipeline): complete automated test suite for the pipeline feature
Adds ~115 tests pinning the Automated Workflows feature end to end:

- tests/test_pipeline.py (+43): per-adapter summary correctness on known
  inputs, multi-step data flow, error stop/continue contract, empty /
  single-column / all-disabled edges, dict+file serialization round-trips,
  recommended_pipeline(include=…), and a synthesized demo integration run.
- tests/test_cli_pipeline.py (new, 21): --recommend, dry-run-by-default,
  --apply output CSV + audit JSON, --steps, --strict abort, arg validation,
  --continue-on-error vs halt, and a save→load round-trip. Invokes the Typer
  app directly to bypass the license guard (house pattern).
- tests/gui/test_pipeline_builder.py (+9): reorder ▲/▼, disabled edge
  buttons, disabled-step persistence across reorder, restore-recommended,
  Advanced JSON export/import, and per-tool Configure panels emitting the
  correct option dicts (AppTest).
- tests/gui/test_pipeline_phrasing.py (new, 30): step_phrase/step_status and
  the adapter-key→friendly-name bridge as pure functions, incl. pluralization,
  column prose, and warn/error status derivation.

Full suite: 2565 passed, 91 skipped. No product bugs surfaced. Documents the
coverage in docs/DEVELOPER.md (test tree + a pipeline-coverage note).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 18:31:15 +00:00

821 lines
32 KiB
Python

"""Tests for src/core/pipeline.py."""
from __future__ import annotations
import json
import numpy as np
import pandas as pd
import pytest
from src.core.errors import ConfigError, InputValidationError
from src.core.pipeline import (
Pipeline,
PipelineResult,
SOFT_DEPENDENCIES,
Step,
StepResult,
TOOL_ADAPTERS,
TOOL_NAMES,
recommended_pipeline,
run_pipeline,
validate_pipeline,
)
# ---------------------------------------------------------------------------
# Step / Pipeline construction
# ---------------------------------------------------------------------------
class TestStep:
    """Construction-time behaviour of a single ``Step``."""

    def test_unknown_tool_raises(self):
        """An unrecognised tool name is rejected with ConfigError."""
        with pytest.raises(ConfigError):
            Step(tool="bogus_tool")

    def test_default_options_empty_dict(self):
        """With only a tool given, options is ``{}`` and the step is enabled."""
        step = Step(tool="text_clean")
        assert step.options == {}
        assert step.enabled is True

    def test_display_name_falls_back_to_tool(self):
        """display_name() returns the explicit name, else the tool key."""
        unnamed = Step(tool="dedup")
        assert unnamed.display_name() == "dedup"
        named = Step(tool="dedup", name="Final dedup")
        assert named.display_name() == "Final dedup"
class TestPipelineSerialization:
    """Dict / file round-trips for ``Pipeline`` and malformed-payload errors."""

    def test_roundtrip_dict(self):
        """to_dict() -> from_dict() keeps step count, tool and options."""
        original = Pipeline(steps=[
            Step("text_clean", {"trim": True}),
            Step("dedup", {"survivor_rule": "first"}),
        ])
        payload = original.to_dict()
        restored = Pipeline.from_dict(payload)
        assert len(restored.steps) == 2
        assert restored.steps[0].tool == "text_clean"
        assert restored.steps[1].options["survivor_rule"] == "first"

    def test_roundtrip_file(self, tmp_path):
        """to_file() -> from_file() keeps the step's tool."""
        target = tmp_path / "p.json"
        Pipeline(steps=[Step("text_clean")]).to_file(target)
        restored = Pipeline.from_file(target)
        assert restored.steps[0].tool == "text_clean"

    def test_from_dict_missing_steps_key(self):
        """A payload without a ``steps`` key raises ConfigError."""
        with pytest.raises(ConfigError):
            Pipeline.from_dict({})

    def test_from_dict_missing_tool(self):
        """A step entry without a ``tool`` key raises ConfigError."""
        payload = {"steps": [{"options": {}}]}
        with pytest.raises(ConfigError):
            Pipeline.from_dict(payload)
# ---------------------------------------------------------------------------
# recommended_pipeline
# ---------------------------------------------------------------------------
class TestRecommendedPipeline:
    """Default contents, overrides and validation of ``recommended_pipeline()``."""

    def test_default_order(self):
        """With no arguments the four default tools come back in this order."""
        p = recommended_pipeline()
        assert [s.tool for s in p.steps] == [
            "text_clean", "format_standardize", "missing", "dedup",
        ]

    def test_default_passes_validation(self):
        """The default pipeline produces no validation warnings."""
        p = recommended_pipeline()
        assert validate_pipeline(p) == []

    def test_include_overrides_default(self):
        """``include`` replaces the default tool list."""
        p = recommended_pipeline(include=["text_clean", "missing"])
        assert [s.tool for s in p.steps] == ["text_clean", "missing"]

    def test_options_seed_reaches_step(self):
        """Options keyed by tool name land on the matching step."""
        p = recommended_pipeline(options={"text_clean": {"trim": False}})
        assert p.steps[0].options == {"trim": False}

    def test_unknown_tool_raises(self):
        """An unknown name in ``include`` raises InputValidationError."""
        with pytest.raises(InputValidationError):
            recommended_pipeline(include=["bogus"])

    def test_can_place_column_map_first_or_last(self):
        """column_map at either end of the pipeline validates without warnings."""
        core = ["text_clean", "format_standardize", "missing", "dedup"]
        first = recommended_pipeline(include=["column_map", *core])
        # column_map goes at the very end here; the earlier fixture had it
        # before dedup, so the "last" case in the test name was never run.
        last = recommended_pipeline(include=[*core, "column_map"])
        # Pin the placements so the fixture cannot silently drift again.
        assert first.steps[0].tool == "column_map"
        assert last.steps[-1].tool == "column_map"
        # Neither placement is expected to produce an ordering warning.
        assert validate_pipeline(first) == []
        assert validate_pipeline(last) == []
# ---------------------------------------------------------------------------
# validate_pipeline — soft dependencies
# ---------------------------------------------------------------------------
class TestValidatePipeline:
    """Ordering warnings returned by ``validate_pipeline``."""

    def test_in_order_no_warnings(self):
        """The recommended ordering validates cleanly."""
        assert validate_pipeline(recommended_pipeline()) == []

    def test_dedup_before_text_clean_warns(self):
        """dedup placed ahead of text_clean yields exactly one warning naming both."""
        out_of_order = Pipeline(steps=[Step("dedup"), Step("text_clean")])
        found = validate_pipeline(out_of_order)
        assert len(found) == 1
        message = found[0]
        assert "dedup" in message
        assert "text_clean" in message

    def test_format_before_text_clean_warns(self):
        """format_standardize ahead of text_clean is mentioned in some warning."""
        out_of_order = Pipeline(
            steps=[Step("format_standardize"), Step("text_clean")],
        )
        found = validate_pipeline(out_of_order)
        assert any("format_standardize" in message for message in found)

    def test_disabled_steps_ignored(self):
        """A disabled dedup in first position must not trigger a warning."""
        steps = [
            Step("dedup", enabled=False),
            Step("text_clean"),
        ]
        assert validate_pipeline(Pipeline(steps=steps)) == []

    def test_duplicate_tool_does_not_double_warn(self):
        """Running text_clean twice (two-pass cleaning) produces no warnings."""
        two_pass = Pipeline(steps=[Step("text_clean"), Step("text_clean")])
        assert validate_pipeline(two_pass) == []
# ---------------------------------------------------------------------------
# run_pipeline — execution
# ---------------------------------------------------------------------------
@pytest.fixture
def messy_df():
    """Five-row frame: padded/mixed-case names, mixed phone formats, one blank row."""
    columns = {
        "name": [" Alice ", "BOB", "N/A", "", "charlie "],
        "phone": [
            "(415) 555-1234",
            "+44 20 7946 0958",
            "03-3210-7000",
            "",
            "(415) 555-1234",
        ],
        "country": ["US", "GB", "JP", "", "US"],
    }
    return pd.DataFrame(columns)
class TestRunPipeline:
    """Execution contract of ``run_pipeline``: results, callbacks, errors."""

    def test_recommended_pipeline_runs_end_to_end(self, messy_df):
        """The seeded recommended pipeline runs, shrinks the frame, warns nothing."""
        seed = {
            "format_standardize": {
                "column_types": {"phone": "phone"},
                "phone_country_column": "country",
            },
            "missing": {"strategy": "none"},
        }
        outcome = run_pipeline(messy_df, recommended_pipeline(options=seed))
        assert isinstance(outcome, PipelineResult)
        assert outcome.initial_rows == 5
        # Dedup at the end removes the Alice/charlie duplicate (same phone).
        assert outcome.final_rows < outcome.initial_rows
        assert outcome.warnings == []

    def test_initial_df_not_mutated(self, messy_df):
        """The caller's DataFrame is left untouched by a run."""
        before = messy_df.copy(deep=True)
        run_pipeline(messy_df, recommended_pipeline())
        pd.testing.assert_frame_equal(messy_df, before)

    def test_disabled_step_skipped(self, messy_df):
        """A disabled step is reported as skipped; an enabled one is not."""
        pipeline = Pipeline(steps=[
            Step("text_clean", enabled=False),
            Step("missing", options={"strategy": "none"}),
        ])
        disabled, enabled = run_pipeline(messy_df, pipeline).step_results[:2]
        assert disabled.skipped is True
        assert enabled.skipped is False

    def test_step_results_ordered_and_timed(self, messy_df):
        """One StepResult per step, in pipeline order, each with a timing."""
        pipeline = recommended_pipeline(options={"missing": {"strategy": "none"}})
        results = run_pipeline(messy_df, pipeline).step_results
        assert len(results) == 4
        assert all(sr.elapsed_seconds >= 0 for sr in results)
        ran = [sr.step.tool for sr in results]
        assert ran == ["text_clean", "format_standardize", "missing", "dedup"]

    def test_warnings_returned_but_run_proceeds(self, messy_df):
        """Ordering warnings are surfaced but do not stop execution."""
        out_of_order = Pipeline(steps=[Step("dedup"), Step("text_clean")])
        outcome = run_pipeline(messy_df, out_of_order)
        assert outcome.warnings  # warnings present
        # Both steps still ran.
        assert not any(sr.skipped for sr in outcome.step_results)

    def test_progress_callback_fires_per_step(self, messy_df):
        """on_step_complete receives one StepResult per step."""
        seen: list[StepResult] = []
        pipeline = Pipeline(steps=[
            Step("text_clean"),
            Step("missing", options={"strategy": "none"}),
        ])
        run_pipeline(messy_df, pipeline, on_step_complete=seen.append)
        assert len(seen) == 2
        assert all(isinstance(item, StepResult) for item in seen)

    def test_progress_callback_exception_does_not_abort(self, messy_df):
        """An exception raised inside the callback is not propagated."""

        def bad(_sr):
            raise RuntimeError("boom")

        pipeline = Pipeline(steps=[Step("text_clean")])
        # Must not raise.
        outcome = run_pipeline(messy_df, pipeline, on_step_complete=bad)
        assert outcome.final_rows == 5

    def test_stop_on_error_default(self, messy_df):
        """By default a failing step's InputValidationError propagates."""
        # Force an error by giving format_standardize a non-existent column.
        failing = Step(
            "format_standardize",
            options={"column_types": {"does_not_exist": "phone"}},
        )
        with pytest.raises(InputValidationError):
            run_pipeline(messy_df, Pipeline(steps=[failing]))

    def test_continue_on_error_carries_previous_df(self, messy_df):
        """With stop_on_error=False a failed step is recorded and later steps run."""
        pipeline = Pipeline(steps=[
            Step("text_clean"),
            Step(
                "format_standardize",
                options={"column_types": {"does_not_exist": "phone"}},
            ),
            Step("missing", options={"strategy": "none"}),
        ])
        outcome = run_pipeline(messy_df, pipeline, stop_on_error=False)
        # Step 2 errored, step 3 still ran.
        assert outcome.step_results[1].error is not None
        assert outcome.step_results[2].error is None
        assert outcome.final_rows == 5

    def test_non_dataframe_input(self):
        """A non-DataFrame input is rejected with InputValidationError."""
        not_a_frame = [1, 2, 3]
        with pytest.raises(InputValidationError):
            run_pipeline(not_a_frame, recommended_pipeline())  # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# Per-tool adapter sanity
# ---------------------------------------------------------------------------
class TestAdapters:
    """Sanity checks on the callables registered in ``TOOL_ADAPTERS``."""

    @pytest.mark.parametrize("tool", TOOL_NAMES)
    def test_adapter_with_default_options_runs(self, tool, messy_df):
        """Every adapter accepts ``{}`` and returns a (DataFrame, dict) pair."""
        adapter = TOOL_ADAPTERS[tool]
        result_df, summary = adapter(messy_df, {})
        assert isinstance(result_df, pd.DataFrame)
        assert isinstance(summary, dict)

    def test_format_standardize_adapter_passes_column_types(self, messy_df):
        """The summary lists exactly the column named in ``column_types``."""
        adapter = TOOL_ADAPTERS["format_standardize"]
        _, summary = adapter(messy_df, {"column_types": {"phone": "phone"}})
        assert summary["columns_processed"] == ["phone"]

    def test_dedup_adapter_with_unknown_survivor_rule_raises(self, messy_df):
        """An unknown survivor_rule is rejected with ConfigError."""
        adapter = TOOL_ADAPTERS["dedup"]
        with pytest.raises(ConfigError):
            adapter(messy_df, {"survivor_rule": "bogus"})
# ---------------------------------------------------------------------------
# SOFT_DEPENDENCIES integrity
# ---------------------------------------------------------------------------
class TestSoftDependencies:
    """Structural integrity of the ``SOFT_DEPENDENCIES`` table."""

    def test_every_pair_uses_known_tools(self):
        """Both ends of every dependency are registered tool names."""
        for before, after, _reason in SOFT_DEPENDENCIES:
            assert before in TOOL_NAMES
            assert after in TOOL_NAMES

    def test_all_reasons_non_empty(self):
        """Each dependency carries a non-empty string reason."""
        for _before, _after, why in SOFT_DEPENDENCIES:
            assert why and isinstance(why, str)
            # Reason should be a sentence — more than 20 chars.
            assert len(why) > 20

    def test_dependencies_form_a_dag(self):
        """The dependency edges admit a topological ordering (no cycles)."""
        # Repeatedly peel off tools with no remaining incoming edge; if
        # every tool gets emitted, the edges contain no cycle.
        from collections import defaultdict, deque

        successors: dict[str, list[str]] = defaultdict(list)
        pending: dict[str, int] = {tool: 0 for tool in TOOL_NAMES}
        for before, after, _reason in SOFT_DEPENDENCIES:
            successors[before].append(after)
            pending[after] += 1

        ready = deque(tool for tool, count in pending.items() if count == 0)
        order = []
        while ready:
            current = ready.popleft()
            order.append(current)
            for follower in successors[current]:
                pending[follower] -= 1
                if pending[follower] == 0:
                    ready.append(follower)

        assert len(order) == len(TOOL_NAMES), (
            f"SOFT_DEPENDENCIES contain a cycle; topo order={order}"
        )
# ---------------------------------------------------------------------------
# Per-adapter summary correctness — exact numbers on KNOWN-messy input.
# Each adapter is also exercised through run_pipeline so the StepResult
# carries the adapter's summary verbatim.
# ---------------------------------------------------------------------------
def _run_one(df, tool, options):
    """Run ``tool`` as a one-step pipeline; return (StepResult, PipelineResult)."""
    single_step = Pipeline(steps=[Step(tool, options)])
    outcome = run_pipeline(df, single_step)
    return outcome.step_results[0], outcome
class TestTextCleanSummary:
    """Exact summary counts reported by the ``text_clean`` adapter."""

    def test_two_trimmable_cells_counted(self):
        """Only the padded cells count as changed; every cell counts in the total."""
        df = pd.DataFrame({
            "a": [" x ", "y", " z"],       # 2 cells need trimming (" x ", " z")
            "b": ["ok", "fine", "good"],   # already clean
        })
        out, summary = TOOL_ADAPTERS["text_clean"](df, {"trim": True})
        assert summary["cells_total"] == 6
        assert summary["cells_changed"] == 2
        assert sorted(summary["columns_processed"]) == ["a", "b"]
        assert out["a"].tolist() == ["x", "y", "z"]

    def test_title_case_changes_all_cells(self):
        """Title-casing rewrites both the lowercase and the uppercase name."""
        df = pd.DataFrame({"name": ["alice smith", "BOB JONES"]})
        out, summary = TOOL_ADAPTERS["text_clean"](df, {"case": "title"})
        assert summary["cells_changed"] == 2
        assert out["name"].tolist() == ["Alice Smith", "Bob Jones"]

    def test_collapse_whitespace_counts_internal_runs(self):
        """Cells containing a multi-space run are collapsed and counted."""
        # The first and last cells need a real run of spaces: with single
        # spaces the input already equals the expected output, so nothing
        # could be counted as changed.
        df = pd.DataFrame({"name": ["a   b", "c d", "e  f"]})
        out, summary = TOOL_ADAPTERS["text_clean"](
            df, {"trim": True, "collapse_whitespace": True},
        )
        # "a   b" and "e  f" collapse; "c d" is already single-spaced.
        assert summary["cells_changed"] == 2
        assert out["name"].tolist() == ["a b", "c d", "e f"]

    def test_summary_visible_through_run_pipeline(self):
        """The adapter summary is exposed on the StepResult from run_pipeline."""
        df = pd.DataFrame({"a": [" x ", "y"]})
        sr, _res = _run_one(df, "text_clean", {"trim": True})
        assert sr.skipped is False
        assert sr.error is None
        assert sr.summary["cells_changed"] == 1
        assert sr.summary["cells_total"] == 2
class TestFormatStandardizeSummary:
    """Exact summary counts reported by the ``format_standardize`` adapter."""

    def test_one_unparseable_phone(self):
        """Two phones are rewritten; the unparseable one is counted and kept."""
        frame = pd.DataFrame({
            "phone": ["(415) 555-1234", "not a phone", "+44 20 7946 0958"],
        })
        adapter = TOOL_ADAPTERS["format_standardize"]
        result, summary = adapter(frame, {"column_types": {"phone": "phone"}})
        assert summary["cells_total"] == 3
        assert summary["cells_unparseable"] == 1
        assert summary["cells_changed"] == 2
        assert summary["columns_processed"] == ["phone"]
        expected = ["+14155551234", "not a phone", "+442079460958"]
        assert result["phone"].tolist() == expected

    def test_date_standardization_counts(self):
        """One date is rewritten, one is already canonical, one fails to parse."""
        frame = pd.DataFrame(
            {"signup_date": ["2024-01-05", "Jan 5 2024", "garbage"]},
        )
        adapter = TOOL_ADAPTERS["format_standardize"]
        result, summary = adapter(
            frame, {"column_types": {"signup_date": "date"}},
        )
        # "2024-01-05" already canonical; "Jan 5 2024" rewritten; "garbage" fails.
        assert summary["cells_unparseable"] == 1
        assert summary["cells_changed"] == 1
        first_two = result["signup_date"].tolist()[:2]
        assert first_two == ["2024-01-05", "2024-01-05"]

    def test_summary_visible_through_run_pipeline(self):
        """The adapter summary is exposed on the StepResult from run_pipeline."""
        frame = pd.DataFrame({"phone": ["(415) 555-1234", "bad"]})
        options = {"column_types": {"phone": "phone"}}
        step_result, _ = _run_one(frame, "format_standardize", options)
        assert step_result.summary["cells_unparseable"] == 1
        assert step_result.summary["columns_processed"] == ["phone"]
class TestMissingSummary:
    """Exact summary counts reported by the ``missing`` adapter."""

    def test_median_fills_each_blank(self):
        """Median strategy fills both NaNs and drops no rows."""
        frame = pd.DataFrame({"val": [1.0, np.nan, 3.0, np.nan, 5.0]})
        result, summary = TOOL_ADAPTERS["missing"](frame, {"strategy": "median"})
        assert summary["cells_filled"] == 2  # exactly the 2 NaNs
        assert summary["rows_dropped"] == 0
        # median is 3.0
        assert result["val"].tolist() == [1.0, 3.0, 3.0, 3.0, 5.0]

    def test_drop_row_by_threshold(self):
        """The fully-null middle row is dropped at a 0.4 threshold."""
        # row_drop_threshold is the *fraction* of nulls needed to drop a row.
        frame = pd.DataFrame({
            "a": [1.0, np.nan, 3.0],
            "b": ["x", np.nan, "z"],  # middle row is 100% null
        })
        options = {"strategy": "drop_row", "row_drop_threshold": 0.4}
        result, summary = TOOL_ADAPTERS["missing"](frame, options)
        assert summary["rows_dropped"] == 1
        assert len(result) == 2

    def test_sentinel_standardization_count(self):
        """Both "N/A" sentinels are counted and become NaN."""
        frame = pd.DataFrame({"x": ["ok", "N/A", "fine", "N/A"]})
        options = {
            "strategy": "none",
            "sentinels": ["N/A"],
            "standardize_sentinels": True,
        }
        result, summary = TOOL_ADAPTERS["missing"](frame, options)
        assert summary["sentinels_standardized"] == 2
        # The two "N/A" cells became real NaN.
        assert result["x"].isna().sum() == 2
        assert result["x"].tolist()[0] == "ok"

    def test_summary_visible_through_run_pipeline(self):
        """The adapter summary is exposed on the StepResult from run_pipeline."""
        frame = pd.DataFrame({"val": [1.0, np.nan, 3.0]})
        step_result, _ = _run_one(frame, "missing", {"strategy": "median"})
        assert step_result.summary["cells_filled"] == 1
class TestColumnMapSummary:
    """Exact summary counts reported by the ``column_map`` adapter."""

    def test_single_rename(self):
        """One mapped column is renamed; the unmapped one is kept."""
        frame = pd.DataFrame({"old": [1, 2], "keep": [3, 4]})
        options = {"mapping": {"old": "new"}, "unmapped": "keep"}
        result, summary = TOOL_ADAPTERS["column_map"](frame, options)
        assert summary["columns_renamed"] == 1
        assert summary["columns_dropped"] == []
        assert list(result.columns) == ["new", "keep"]

    def test_unmapped_drop_reports_dropped_columns(self):
        """With unmapped="drop" the unmapped column is removed and reported."""
        frame = pd.DataFrame({"old": [1, 2], "keep": [3, 4]})
        options = {"mapping": {"old": "new"}, "unmapped": "drop"}
        result, summary = TOOL_ADAPTERS["column_map"](frame, options)
        assert summary["columns_renamed"] == 1
        assert summary["columns_dropped"] == ["keep"]
        assert list(result.columns) == ["new"]

    def test_summary_visible_through_run_pipeline(self):
        """The adapter summary is exposed on the StepResult from run_pipeline."""
        frame = pd.DataFrame({"old": [1], "keep": [2]})
        step_result, _ = _run_one(
            frame, "column_map", {"mapping": {"old": "new"}},
        )
        assert step_result.summary["columns_renamed"] == 1
class TestDedupSummary:
    """Exact summary counts reported by the ``dedup`` adapter."""

    def test_exact_duplicate_rows(self):
        """Three identical rows collapse to one; the distinct row survives."""
        frame = pd.DataFrame({
            "email": ["a@x.com", "b@x.com", "a@x.com", "a@x.com"],
            "name": ["A", "B", "A", "A"],
        })
        result, summary = TOOL_ADAPTERS["dedup"](frame, {"survivor_rule": "first"})
        assert summary["input_rows"] == 4
        assert summary["output_rows"] == 2
        assert summary["duplicates_removed"] == 2
        assert summary["groups"] == 1
        assert result["email"].tolist() == ["a@x.com", "b@x.com"]

    def test_explicit_exact_strategy_on_column(self):
        """An explicit exact strategy on email groups rows with differing names."""
        frame = pd.DataFrame({
            "email": ["a@x.com", "b@x.com", "a@x.com"],
            "name": ["A", "B", "C"],
        })
        email_exact = {"column": "email", "algorithm": "exact", "threshold": 100}
        options = {
            "survivor_rule": "first",
            "strategies": [{"columns": [email_exact]}],
        }
        _, summary = TOOL_ADAPTERS["dedup"](frame, options)
        assert summary["duplicates_removed"] == 1
        assert summary["groups"] == 1

    def test_most_complete_keeps_fuller_survivor(self):
        """most_complete keeps the row whose name is populated."""
        frame = pd.DataFrame({
            "email": ["a@x.com", "a@x.com"],
            "name": ["", "Alice"],  # second row is more complete
            "phone": ["111", "111"],
        })
        result, summary = TOOL_ADAPTERS["dedup"](
            frame, {"survivor_rule": "most_complete"},
        )
        assert summary["duplicates_removed"] == 1
        assert result.iloc[0]["name"] == "Alice"

    def test_no_duplicates_is_noop(self):
        """Distinct rows are all retained and nothing is removed."""
        frame = pd.DataFrame(
            {"email": ["a@x.com", "b@x.com"], "name": ["A", "B"]},
        )
        _, summary = TOOL_ADAPTERS["dedup"](frame, {"survivor_rule": "first"})
        assert summary["duplicates_removed"] == 0
        assert summary["output_rows"] == 2

    def test_summary_visible_through_run_pipeline(self):
        """The summary and final row count are exposed through run_pipeline."""
        frame = pd.DataFrame(
            {"email": ["a@x.com", "a@x.com"], "name": ["A", "A"]},
        )
        step_result, outcome = _run_one(frame, "dedup", {"survivor_rule": "first"})
        assert step_result.summary["duplicates_removed"] == 1
        assert outcome.final_rows == 1
# ---------------------------------------------------------------------------
# Data flow — a later step depends on an earlier step's output
# ---------------------------------------------------------------------------
class TestDataFlow:
    """Later steps observe the output of earlier steps."""

    def test_text_clean_enables_dedup_match(self):
        """After a trim, dedup's survivor carries the cleaned phone value."""
        # The two phones differ only by surrounding whitespace. Dedup on
        # its own also collapses them (see
        # test_dedup_default_matching_normalizes_whitespace), so the row
        # count alone does not prove data flow. What shows text_clean's
        # output reached dedup is that the survivor's value is trimmed.
        df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]})
        p = Pipeline(steps=[
            Step("text_clean", {"trim": True}),
            Step("dedup", {"survivor_rule": "first"}),
        ])
        res = run_pipeline(df, p)
        assert res.initial_rows == 2
        assert res.final_rows == 1
        assert res.final_df["phone"].tolist() == ["+14155551234"]

    def test_dedup_default_matching_normalizes_whitespace(self):
        """Dedup alone collapses the pair but leaves the survivor's text raw."""
        # Note: dedup's exact matcher already normalizes surrounding
        # whitespace, so the two phones collapse even WITHOUT a prior
        # text_clean. The survivor still carries the un-trimmed value.
        df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]})
        res = run_pipeline(
            df, Pipeline(steps=[Step("dedup", {"survivor_rule": "first"})]),
        )
        assert res.final_rows == 1
        # Survivor keeps the raw (still-padded) text — dedup does not clean.
        assert res.final_df["phone"].tolist() == [" +14155551234 "]

    def test_chained_initial_and_final_rows(self):
        """Trim + title-case makes two names equal, so dedup drops one row."""
        df = pd.DataFrame({
            "name": [" Al ", "al", "Bob"],
            "v": [1, 1, 2],
        })
        p = Pipeline(steps=[
            Step("text_clean", {"trim": True, "case": "title"}),
            Step("dedup", {"survivor_rule": "first"}),
        ])
        res = run_pipeline(df, p)
        # " Al " and "al" both become "Al" → duplicate rows collapse.
        assert res.initial_rows == 3
        assert res.final_rows == 2
        assert "Al" in res.final_df["name"].tolist()
# ---------------------------------------------------------------------------
# Error handling — stop_on_error semantics
# ---------------------------------------------------------------------------
class TestErrorHandling:
    """stop_on_error semantics when a step fails."""

    def test_most_recent_without_date_raises_by_default(self, messy_df):
        """By default the failing dedup step's InputValidationError propagates."""
        # dedup with survivor_rule="most_recent" but no date_column errors.
        failing = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})])
        with pytest.raises(InputValidationError):
            run_pipeline(messy_df, failing)

    def test_continue_on_error_sets_error_string(self):
        """With stop_on_error=False the failure is a non-blank error string."""
        frame = pd.DataFrame(
            {"email": ["a@x.com", "a@x.com"], "name": ["A", "B"]},
        )
        pipeline = Pipeline(steps=[
            Step("text_clean", {"trim": True}),
            Step("dedup", {"survivor_rule": "most_recent"}),  # will fail
            Step("missing", {"strategy": "none"}),
        ])
        outcome = run_pipeline(frame, pipeline, stop_on_error=False)
        failed = outcome.step_results[1]
        assert failed.error is not None
        assert isinstance(failed.error, str) and failed.error.strip()
        # The failed step did NOT change the row count — previous df carried.
        assert outcome.step_results[2].error is None
        assert outcome.final_rows == 2

    def test_failed_step_summary_is_empty(self):
        """A failed step has an empty summary and is not marked skipped."""
        frame = pd.DataFrame({"e": ["a", "a"], "n": ["x", "y"]})
        failing = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})])
        outcome = run_pipeline(frame, failing, stop_on_error=False)
        only = outcome.step_results[0]
        assert only.summary == {}
        assert only.skipped is False

    def test_config_error_on_bad_survivor_rule_propagates(self, messy_df):
        """An unknown survivor_rule surfaces as ConfigError from run_pipeline."""
        bad_config = Pipeline(steps=[Step("dedup", {"survivor_rule": "nonsense"})])
        with pytest.raises(ConfigError):
            run_pipeline(messy_df, bad_config)
# ---------------------------------------------------------------------------
# Edge inputs
# ---------------------------------------------------------------------------
class TestEdgeInputs:
    """Degenerate inputs: empty frame, single column, nothing to run."""

    def test_empty_dataframe_runs_clean(self):
        """A zero-row frame passes through every executed step without error."""
        no_rows = pd.DataFrame({"name": [], "phone": []})
        pipeline = recommended_pipeline(options={"missing": {"strategy": "none"}})
        outcome = run_pipeline(no_rows, pipeline)
        assert outcome.initial_rows == 0
        assert outcome.final_rows == 0
        executed = (sr for sr in outcome.step_results if not sr.skipped)
        assert all(sr.error is None for sr in executed)

    def test_single_column_dataframe(self):
        """text_clean works on a one-column frame."""
        frame = pd.DataFrame({"name": [" Al ", "al"]})
        clean = Step("text_clean", {"trim": True, "case": "title"})
        outcome = run_pipeline(frame, Pipeline(steps=[clean]))
        assert outcome.final_df["name"].tolist() == ["Al", "Al"]

    def test_all_steps_disabled_returns_unchanged(self, messy_df):
        """With every step disabled the output equals the input."""
        before = messy_df.copy(deep=True)
        tools = ("text_clean", "format_standardize", "missing", "dedup")
        pipeline = Pipeline(steps=[Step(tool, enabled=False) for tool in tools])
        outcome = run_pipeline(messy_df, pipeline)
        assert all(sr.skipped is True for sr in outcome.step_results)
        assert outcome.final_rows == outcome.initial_rows == 5
        pd.testing.assert_frame_equal(outcome.final_df, before)

    def test_empty_pipeline_is_identity(self, messy_df):
        """A pipeline with no steps returns the frame as-is, with no results."""
        outcome = run_pipeline(messy_df, Pipeline(steps=[]))
        assert outcome.step_results == []
        assert outcome.final_rows == 5
        pd.testing.assert_frame_equal(outcome.final_df, messy_df)
# ---------------------------------------------------------------------------
# Serialization round-trips with disabled / named / nested-option steps
# ---------------------------------------------------------------------------
class TestSerializationRoundtrips:
    """Round-trips for disabled, named and nested-option steps."""

    def test_disabled_and_named_step_survive_dict(self):
        """enabled, name and options all survive to_dict() -> from_dict()."""
        original = Pipeline(steps=[
            Step("text_clean", {"trim": True}, enabled=False, name="Pre-clean"),
            Step("dedup", {"survivor_rule": "first"}, name="Final dedup"),
        ])
        restored = Pipeline.from_dict(original.to_dict())
        pre_clean = restored.steps[0]
        assert pre_clean.enabled is False
        assert pre_clean.name == "Pre-clean"
        assert pre_clean.options == {"trim": True}
        final_dedup = restored.steps[1]
        assert final_dedup.name == "Final dedup"
        assert final_dedup.display_name() == "Final dedup"

    def test_nested_options_survive_dict(self):
        """Nested dict / list option values come back equal."""
        format_options = {
            "column_types": {"phone": "phone", "signup_date": "date"},
        }
        dedup_options = {
            "survivor_rule": "most_complete",
            "strategies": [{"columns": [
                {"column": "email", "algorithm": "exact", "threshold": 100},
            ]}],
        }
        original = Pipeline(steps=[
            Step("format_standardize", format_options),
            Step("dedup", dedup_options),
        ])
        restored = Pipeline.from_dict(original.to_dict())
        restored_format = restored.steps[0].options
        assert restored_format["column_types"] == format_options["column_types"]
        restored_dedup = restored.steps[1].options
        assert restored_dedup["strategies"] == dedup_options["strategies"]

    def test_nested_options_survive_file(self, tmp_path):
        """enabled, name and nested options survive to_file() -> from_file()."""
        step = Step(
            "format_standardize",
            {"column_types": {"phone": "phone"}},
            enabled=False,
            name="formats",
        )
        target = tmp_path / "pipe.json"
        Pipeline(steps=[step]).to_file(target)
        restored = Pipeline.from_file(target).steps[0]
        assert restored.enabled is False
        assert restored.name == "formats"
        assert restored.options == {"column_types": {"phone": "phone"}}

    def test_roundtrip_is_idempotent(self):
        """A second round-trip yields the same dict as the first and the original."""
        original = Pipeline(steps=[
            Step("text_clean", {"trim": True}, enabled=False, name="x"),
            Step("missing", {"strategy": "median"}),
        ])
        once = Pipeline.from_dict(original.to_dict())
        twice = Pipeline.from_dict(once.to_dict())
        assert once.to_dict() == twice.to_dict() == original.to_dict()
# ---------------------------------------------------------------------------
# recommended_pipeline(include=...) — subsetting, ordering, option seeding
# ---------------------------------------------------------------------------
class TestRecommendedInclude:
    """Subsetting, ordering and option seeding via ``include=`` / ``options=``."""

    def test_subset_preserves_given_order(self):
        """Steps follow the order of ``include``, not the default order."""
        pipeline = recommended_pipeline(include=["dedup", "text_clean"])
        tools = [step.tool for step in pipeline.steps]
        assert tools == ["dedup", "text_clean"]

    def test_column_map_first(self):
        """column_map can lead a five-step pipeline."""
        wanted = [
            "column_map", "text_clean", "format_standardize", "missing", "dedup",
        ]
        pipeline = recommended_pipeline(include=wanted)
        assert pipeline.steps[0].tool == "column_map"
        assert len(pipeline.steps) == 5

    def test_column_map_last(self):
        """column_map can close the pipeline."""
        wanted = [
            "text_clean", "format_standardize", "missing", "dedup", "column_map",
        ]
        pipeline = recommended_pipeline(include=wanted)
        assert pipeline.steps[-1].tool == "column_map"

    def test_unknown_tool_in_include_raises(self):
        """One unknown name among valid ones still raises InputValidationError."""
        with pytest.raises(InputValidationError):
            recommended_pipeline(include=["text_clean", "not_a_tool"])

    def test_options_seeding_only_targets_named_tool(self):
        """Seeded options land on the named tool; other steps stay empty."""
        pipeline = recommended_pipeline(
            include=["text_clean", "dedup"],
            options={"dedup": {"survivor_rule": "last"}},
        )
        text_clean, dedup = pipeline.steps[0], pipeline.steps[1]
        assert text_clean.options == {}  # text_clean unseeded
        assert dedup.options == {"survivor_rule": "last"}

    def test_empty_include_yields_no_steps(self):
        """An empty ``include`` list produces a pipeline with no steps."""
        assert recommended_pipeline(include=[]).steps == []

    def test_seeded_options_are_independent_copies(self):
        """Mutating a produced step's options must not leak into the seed."""
        seed = {"text_clean": {"trim": True}}
        pipeline = recommended_pipeline(include=["text_clean"], options=seed)
        pipeline.steps[0].options["trim"] = False
        assert seed["text_clean"]["trim"] is True
# ---------------------------------------------------------------------------
# Realistic demo integration — messy customers table end-to-end
# ---------------------------------------------------------------------------
class TestDemoIntegration:
    """End-to-end run over a small, messy customers table."""

    @pytest.fixture
    def customers_df(self):
        """Four customers; rows 0 and 2 share a phone once whitespace is trimmed."""
        columns = {
            "Full Name": [" alice smith ", "BOB JONES", "alice smith", ""],
            "Email": ["alice@x.com ", "bob@x.com", "alice@x.com", "carol@x.com"],
            "Phone": [
                " +14155551234 ",
                "+442079460958",
                "+14155551234",
                "+13035551111",
            ],
        }
        return pd.DataFrame(columns)

    def test_full_recommended_plus_column_map(self, customers_df):
        """Clean, dedup on Phone, then rename headers: 4 rows in, 3 rows out."""
        phone_exact = {"column": "Phone", "algorithm": "exact", "threshold": 100}
        seed = {
            "text_clean": {"trim": True, "collapse_whitespace": True},
            "missing": {"strategy": "none"},
            "dedup": {
                "survivor_rule": "most_complete",
                "strategies": [{"columns": [phone_exact]}],
            },
            "column_map": {
                "mapping": {
                    "Full Name": "name",
                    "Email": "email",
                    "Phone": "phone",
                },
                "unmapped": "keep",
            },
        }
        pipeline = recommended_pipeline(
            include=[
                "text_clean", "format_standardize", "missing",
                "dedup", "column_map",
            ],
            options=seed,
        )
        outcome = run_pipeline(customers_df, pipeline)

        # Two rows share the same phone after trimming → one duplicate removed.
        assert outcome.initial_rows == 4
        assert outcome.final_rows == 3
        assert outcome.final_rows < outcome.initial_rows

        # Headers were renamed by the trailing column_map step.
        assert list(outcome.final_df.columns) == ["name", "email", "phone"]

        # The surviving Alice row kept its (trimmed) phone.
        surviving_phones = outcome.final_df["phone"].tolist()
        assert "+14155551234" in surviving_phones
        # only one Alice survives
        assert surviving_phones.count("+14155551234") == 1

        # Every executed step succeeded.
        executed = (sr for sr in outcome.step_results if not sr.skipped)
        assert all(sr.error is None for sr in executed)

        # column_map reported the three renames.
        column_map_result = outcome.step_results[-1]
        assert column_map_result.step.tool == "column_map"
        assert column_map_result.summary["columns_renamed"] == 3

    def test_demo_dedup_step_reports_one_duplicate(self, customers_df):
        """The dedup step's own summary reports one group and one removal."""
        phone_exact = {"column": "Phone", "algorithm": "exact", "threshold": 100}
        seed = {
            "text_clean": {"trim": True},
            "missing": {"strategy": "none"},
            "dedup": {
                "survivor_rule": "most_complete",
                "strategies": [{"columns": [phone_exact]}],
            },
        }
        outcome = run_pipeline(customers_df, recommended_pipeline(options=seed))
        dedup_result = next(
            sr for sr in outcome.step_results if sr.step.tool == "dedup"
        )
        assert dedup_result.summary["duplicates_removed"] == 1
        assert dedup_result.summary["groups"] == 1