Adds ~115 tests pinning the Automated Workflows feature end to end: - tests/test_pipeline.py (+43): per-adapter summary correctness on known inputs, multi-step data flow, error stop/continue contract, empty / single-column / all-disabled edges, dict+file serialization round-trips, recommended_pipeline(include=…), and a synthesized demo integration run. - tests/test_cli_pipeline.py (new, 21): --recommend, dry-run-by-default, --apply output CSV + audit JSON, --steps, --strict abort, arg validation, --continue-on-error vs halt, and a save→load round-trip. Invokes the Typer app directly to bypass the license guard (house pattern). - tests/gui/test_pipeline_builder.py (+9): reorder ▲/▼, disabled edge buttons, disabled-step persistence across reorder, restore-recommended, Advanced JSON export/import, and per-tool Configure panels emitting the correct option dicts (AppTest). - tests/gui/test_pipeline_phrasing.py (new, 30): step_phrase/step_status and the adapter-key→friendly-name bridge as pure functions, incl. pluralization, column prose, and warn/error status derivation. Full suite: 2565 passed, 91 skipped. No product bugs surfaced. Documents the coverage in docs/DEVELOPER.md (test tree + a pipeline-coverage note). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
821 lines
32 KiB
Python
821 lines
32 KiB
Python
"""Tests for src/core/pipeline.py."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import pytest
|
|
|
|
from src.core.errors import ConfigError, InputValidationError
|
|
from src.core.pipeline import (
|
|
Pipeline,
|
|
PipelineResult,
|
|
SOFT_DEPENDENCIES,
|
|
Step,
|
|
StepResult,
|
|
TOOL_ADAPTERS,
|
|
TOOL_NAMES,
|
|
recommended_pipeline,
|
|
run_pipeline,
|
|
validate_pipeline,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step / Pipeline construction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStep:
|
|
def test_unknown_tool_raises(self):
|
|
with pytest.raises(ConfigError):
|
|
Step(tool="bogus_tool")
|
|
|
|
def test_default_options_empty_dict(self):
|
|
s = Step(tool="text_clean")
|
|
assert s.options == {}
|
|
assert s.enabled is True
|
|
|
|
def test_display_name_falls_back_to_tool(self):
|
|
assert Step(tool="dedup").display_name() == "dedup"
|
|
assert Step(tool="dedup", name="Final dedup").display_name() == "Final dedup"
|
|
|
|
|
|
class TestPipelineSerialization:
|
|
def test_roundtrip_dict(self):
|
|
p = Pipeline(steps=[
|
|
Step("text_clean", {"trim": True}),
|
|
Step("dedup", {"survivor_rule": "first"}),
|
|
])
|
|
out = p.to_dict()
|
|
loaded = Pipeline.from_dict(out)
|
|
assert len(loaded.steps) == 2
|
|
assert loaded.steps[0].tool == "text_clean"
|
|
assert loaded.steps[1].options["survivor_rule"] == "first"
|
|
|
|
def test_roundtrip_file(self, tmp_path):
|
|
p = Pipeline(steps=[Step("text_clean")])
|
|
path = tmp_path / "p.json"
|
|
p.to_file(path)
|
|
loaded = Pipeline.from_file(path)
|
|
assert loaded.steps[0].tool == "text_clean"
|
|
|
|
def test_from_dict_missing_steps_key(self):
|
|
with pytest.raises(ConfigError):
|
|
Pipeline.from_dict({})
|
|
|
|
def test_from_dict_missing_tool(self):
|
|
with pytest.raises(ConfigError):
|
|
Pipeline.from_dict({"steps": [{"options": {}}]})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# recommended_pipeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRecommendedPipeline:
|
|
def test_default_order(self):
|
|
p = recommended_pipeline()
|
|
assert [s.tool for s in p.steps] == [
|
|
"text_clean", "format_standardize", "missing", "dedup",
|
|
]
|
|
|
|
def test_default_passes_validation(self):
|
|
p = recommended_pipeline()
|
|
assert validate_pipeline(p) == []
|
|
|
|
def test_include_overrides_default(self):
|
|
p = recommended_pipeline(include=["text_clean", "missing"])
|
|
assert [s.tool for s in p.steps] == ["text_clean", "missing"]
|
|
|
|
def test_options_seed_reaches_step(self):
|
|
p = recommended_pipeline(options={"text_clean": {"trim": False}})
|
|
assert p.steps[0].options == {"trim": False}
|
|
|
|
def test_unknown_tool_raises(self):
|
|
with pytest.raises(InputValidationError):
|
|
recommended_pipeline(include=["bogus"])
|
|
|
|
def test_can_place_column_map_first_or_last(self):
|
|
# Both placements must be acceptable per the docstring.
|
|
first = recommended_pipeline(include=[
|
|
"column_map", "text_clean", "format_standardize", "missing", "dedup",
|
|
])
|
|
last = recommended_pipeline(include=[
|
|
"text_clean", "format_standardize", "missing", "column_map", "dedup",
|
|
])
|
|
# No soft-dependency rule names column_map, so neither warns.
|
|
assert validate_pipeline(first) == []
|
|
assert validate_pipeline(last) == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# validate_pipeline — soft dependencies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestValidatePipeline:
|
|
def test_in_order_no_warnings(self):
|
|
p = recommended_pipeline()
|
|
assert validate_pipeline(p) == []
|
|
|
|
def test_dedup_before_text_clean_warns(self):
|
|
p = Pipeline(steps=[Step("dedup"), Step("text_clean")])
|
|
ws = validate_pipeline(p)
|
|
assert len(ws) == 1
|
|
assert "dedup" in ws[0] and "text_clean" in ws[0]
|
|
|
|
def test_format_before_text_clean_warns(self):
|
|
p = Pipeline(steps=[Step("format_standardize"), Step("text_clean")])
|
|
ws = validate_pipeline(p)
|
|
assert any("format_standardize" in w for w in ws)
|
|
|
|
def test_disabled_steps_ignored(self):
|
|
# Disabled dedup-first should not trigger a warning.
|
|
p = Pipeline(steps=[
|
|
Step("dedup", enabled=False),
|
|
Step("text_clean"),
|
|
])
|
|
assert validate_pipeline(p) == []
|
|
|
|
def test_duplicate_tool_does_not_double_warn(self):
|
|
# text_clean twice (legitimate: two-pass cleaning) shouldn't
|
|
# generate redundant warnings.
|
|
p = Pipeline(steps=[
|
|
Step("text_clean"),
|
|
Step("text_clean"),
|
|
])
|
|
assert validate_pipeline(p) == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# run_pipeline — execution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
|
|
def messy_df():
|
|
return pd.DataFrame({
|
|
"name": [" Alice ", "BOB", "N/A", "", "charlie "],
|
|
"phone": ["(415) 555-1234", "+44 20 7946 0958", "03-3210-7000", "", "(415) 555-1234"],
|
|
"country": ["US", "GB", "JP", "", "US"],
|
|
})
|
|
|
|
|
|
class TestRunPipeline:
|
|
def test_recommended_pipeline_runs_end_to_end(self, messy_df):
|
|
p = recommended_pipeline(options={
|
|
"format_standardize": {
|
|
"column_types": {"phone": "phone"},
|
|
"phone_country_column": "country",
|
|
},
|
|
"missing": {"strategy": "none"},
|
|
})
|
|
res = run_pipeline(messy_df, p)
|
|
assert isinstance(res, PipelineResult)
|
|
assert res.initial_rows == 5
|
|
# Dedup at the end removes the Alice/charlie duplicate (same phone).
|
|
assert res.final_rows < res.initial_rows
|
|
assert res.warnings == []
|
|
|
|
def test_initial_df_not_mutated(self, messy_df):
|
|
snapshot = messy_df.copy(deep=True)
|
|
run_pipeline(messy_df, recommended_pipeline())
|
|
pd.testing.assert_frame_equal(messy_df, snapshot)
|
|
|
|
def test_disabled_step_skipped(self, messy_df):
|
|
p = Pipeline(steps=[
|
|
Step("text_clean", enabled=False),
|
|
Step("missing", options={"strategy": "none"}),
|
|
])
|
|
res = run_pipeline(messy_df, p)
|
|
assert res.step_results[0].skipped is True
|
|
assert res.step_results[1].skipped is False
|
|
|
|
def test_step_results_ordered_and_timed(self, messy_df):
|
|
p = recommended_pipeline(options={
|
|
"missing": {"strategy": "none"},
|
|
})
|
|
res = run_pipeline(messy_df, p)
|
|
assert len(res.step_results) == 4
|
|
for sr in res.step_results:
|
|
assert sr.elapsed_seconds >= 0
|
|
assert [sr.step.tool for sr in res.step_results] == [
|
|
"text_clean", "format_standardize", "missing", "dedup",
|
|
]
|
|
|
|
def test_warnings_returned_but_run_proceeds(self, messy_df):
|
|
p = Pipeline(steps=[
|
|
Step("dedup"),
|
|
Step("text_clean"),
|
|
])
|
|
res = run_pipeline(messy_df, p)
|
|
assert res.warnings # warnings present
|
|
# Both steps still ran.
|
|
assert all(not sr.skipped for sr in res.step_results)
|
|
|
|
def test_progress_callback_fires_per_step(self, messy_df):
|
|
seen: list[StepResult] = []
|
|
p = Pipeline(steps=[
|
|
Step("text_clean"),
|
|
Step("missing", options={"strategy": "none"}),
|
|
])
|
|
run_pipeline(messy_df, p, on_step_complete=seen.append)
|
|
assert len(seen) == 2
|
|
assert all(isinstance(s, StepResult) for s in seen)
|
|
|
|
def test_progress_callback_exception_does_not_abort(self, messy_df):
|
|
def bad(_sr):
|
|
raise RuntimeError("boom")
|
|
p = Pipeline(steps=[Step("text_clean")])
|
|
# Must not raise.
|
|
res = run_pipeline(messy_df, p, on_step_complete=bad)
|
|
assert res.final_rows == 5
|
|
|
|
def test_stop_on_error_default(self, messy_df):
|
|
# Force an error by giving format_standardize a non-existent column.
|
|
p = Pipeline(steps=[
|
|
Step("format_standardize", options={
|
|
"column_types": {"does_not_exist": "phone"},
|
|
}),
|
|
])
|
|
with pytest.raises(InputValidationError):
|
|
run_pipeline(messy_df, p)
|
|
|
|
def test_continue_on_error_carries_previous_df(self, messy_df):
|
|
p = Pipeline(steps=[
|
|
Step("text_clean"),
|
|
Step("format_standardize", options={
|
|
"column_types": {"does_not_exist": "phone"},
|
|
}),
|
|
Step("missing", options={"strategy": "none"}),
|
|
])
|
|
res = run_pipeline(messy_df, p, stop_on_error=False)
|
|
# Step 2 errored, step 3 still ran.
|
|
assert res.step_results[1].error is not None
|
|
assert res.step_results[2].error is None
|
|
assert res.final_rows == 5
|
|
|
|
def test_non_dataframe_input(self):
|
|
with pytest.raises(InputValidationError):
|
|
run_pipeline([1, 2, 3], recommended_pipeline()) # type: ignore[arg-type]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-tool adapter sanity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAdapters:
|
|
@pytest.mark.parametrize("tool", TOOL_NAMES)
|
|
def test_adapter_with_default_options_runs(self, tool, messy_df):
|
|
# Each adapter must accept an empty options dict and return a
|
|
# (df, summary) pair.
|
|
out_df, summary = TOOL_ADAPTERS[tool](messy_df, {})
|
|
assert isinstance(out_df, pd.DataFrame)
|
|
assert isinstance(summary, dict)
|
|
|
|
def test_format_standardize_adapter_passes_column_types(self, messy_df):
|
|
out, summary = TOOL_ADAPTERS["format_standardize"](
|
|
messy_df, {"column_types": {"phone": "phone"}},
|
|
)
|
|
assert summary["columns_processed"] == ["phone"]
|
|
|
|
def test_dedup_adapter_with_unknown_survivor_rule_raises(self, messy_df):
|
|
with pytest.raises(ConfigError):
|
|
TOOL_ADAPTERS["dedup"](messy_df, {"survivor_rule": "bogus"})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SOFT_DEPENDENCIES integrity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSoftDependencies:
|
|
def test_every_pair_uses_known_tools(self):
|
|
for earlier, later, _ in SOFT_DEPENDENCIES:
|
|
assert earlier in TOOL_NAMES
|
|
assert later in TOOL_NAMES
|
|
|
|
def test_all_reasons_non_empty(self):
|
|
for _, _, why in SOFT_DEPENDENCIES:
|
|
assert why and isinstance(why, str)
|
|
# Reason should be a sentence — at least 20 chars.
|
|
assert len(why) > 20
|
|
|
|
def test_dependencies_form_a_dag(self):
|
|
# No cycles — there must exist a topological ordering of the
|
|
# tools such that every soft dependency (earlier, later)
|
|
# is satisfied. With 5 tools and 6 deps this is easy to verify.
|
|
from collections import defaultdict, deque
|
|
edges: dict[str, list[str]] = defaultdict(list)
|
|
in_degree: dict[str, int] = {t: 0 for t in TOOL_NAMES}
|
|
for e, l, _ in SOFT_DEPENDENCIES:
|
|
edges[e].append(l)
|
|
in_degree[l] += 1
|
|
queue = deque(t for t, d in in_degree.items() if d == 0)
|
|
order = []
|
|
while queue:
|
|
t = queue.popleft()
|
|
order.append(t)
|
|
for nxt in edges[t]:
|
|
in_degree[nxt] -= 1
|
|
if in_degree[nxt] == 0:
|
|
queue.append(nxt)
|
|
assert len(order) == len(TOOL_NAMES), (
|
|
f"SOFT_DEPENDENCIES contain a cycle; topo order={order}"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-adapter summary correctness — exact numbers on KNOWN-messy input.
|
|
# Each adapter is also exercised through run_pipeline so the StepResult
|
|
# carries the adapter's summary verbatim.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _run_one(df, tool, options):
|
|
"""Run a single-step pipeline and return (StepResult, PipelineResult)."""
|
|
res = run_pipeline(df, Pipeline(steps=[Step(tool, options)]))
|
|
return res.step_results[0], res
|
|
|
|
|
|
class TestTextCleanSummary:
|
|
def test_two_trimmable_cells_counted(self):
|
|
df = pd.DataFrame({
|
|
"a": [" x ", "y", " z"], # 2 cells need trimming (" x ", " z")
|
|
"b": ["ok", "fine", "good"], # already clean
|
|
})
|
|
out, summary = TOOL_ADAPTERS["text_clean"](df, {"trim": True})
|
|
assert summary["cells_total"] == 6
|
|
assert summary["cells_changed"] == 2
|
|
assert sorted(summary["columns_processed"]) == ["a", "b"]
|
|
assert out["a"].tolist() == ["x", "y", "z"]
|
|
|
|
def test_title_case_changes_all_cells(self):
|
|
df = pd.DataFrame({"name": ["alice smith", "BOB JONES"]})
|
|
out, summary = TOOL_ADAPTERS["text_clean"](df, {"case": "title"})
|
|
assert summary["cells_changed"] == 2
|
|
assert out["name"].tolist() == ["Alice Smith", "Bob Jones"]
|
|
|
|
def test_collapse_whitespace_counts_internal_runs(self):
|
|
df = pd.DataFrame({"name": ["a b", "c d", "e f"]})
|
|
out, summary = TOOL_ADAPTERS["text_clean"](
|
|
df, {"trim": True, "collapse_whitespace": True},
|
|
)
|
|
# "a b" and "e f" collapse; "c d" is already single-spaced.
|
|
assert summary["cells_changed"] == 2
|
|
assert out["name"].tolist() == ["a b", "c d", "e f"]
|
|
|
|
def test_summary_visible_through_run_pipeline(self):
|
|
df = pd.DataFrame({"a": [" x ", "y"]})
|
|
sr, _res = _run_one(df, "text_clean", {"trim": True})
|
|
assert sr.skipped is False
|
|
assert sr.error is None
|
|
assert sr.summary["cells_changed"] == 1
|
|
assert sr.summary["cells_total"] == 2
|
|
|
|
|
|
class TestFormatStandardizeSummary:
|
|
def test_one_unparseable_phone(self):
|
|
df = pd.DataFrame({
|
|
"phone": ["(415) 555-1234", "not a phone", "+44 20 7946 0958"],
|
|
})
|
|
out, summary = TOOL_ADAPTERS["format_standardize"](
|
|
df, {"column_types": {"phone": "phone"}},
|
|
)
|
|
assert summary["cells_total"] == 3
|
|
assert summary["cells_unparseable"] == 1
|
|
assert summary["cells_changed"] == 2
|
|
assert summary["columns_processed"] == ["phone"]
|
|
assert out["phone"].tolist() == [
|
|
"+14155551234", "not a phone", "+442079460958",
|
|
]
|
|
|
|
def test_date_standardization_counts(self):
|
|
df = pd.DataFrame({"signup_date": ["2024-01-05", "Jan 5 2024", "garbage"]})
|
|
out, summary = TOOL_ADAPTERS["format_standardize"](
|
|
df, {"column_types": {"signup_date": "date"}},
|
|
)
|
|
# "2024-01-05" already canonical; "Jan 5 2024" rewritten; "garbage" fails.
|
|
assert summary["cells_unparseable"] == 1
|
|
assert summary["cells_changed"] == 1
|
|
assert out["signup_date"].tolist()[:2] == ["2024-01-05", "2024-01-05"]
|
|
|
|
def test_summary_visible_through_run_pipeline(self):
|
|
df = pd.DataFrame({"phone": ["(415) 555-1234", "bad"]})
|
|
sr, _ = _run_one(df, "format_standardize", {"column_types": {"phone": "phone"}})
|
|
assert sr.summary["cells_unparseable"] == 1
|
|
assert sr.summary["columns_processed"] == ["phone"]
|
|
|
|
|
|
class TestMissingSummary:
|
|
def test_median_fills_each_blank(self):
|
|
df = pd.DataFrame({"val": [1.0, np.nan, 3.0, np.nan, 5.0]})
|
|
out, summary = TOOL_ADAPTERS["missing"](df, {"strategy": "median"})
|
|
assert summary["cells_filled"] == 2 # exactly the 2 NaNs
|
|
assert summary["rows_dropped"] == 0
|
|
assert out["val"].tolist() == [1.0, 3.0, 3.0, 3.0, 5.0] # median is 3.0
|
|
|
|
def test_drop_row_by_threshold(self):
|
|
# row_drop_threshold is the *fraction* of nulls needed to drop a row.
|
|
df = pd.DataFrame({
|
|
"a": [1.0, np.nan, 3.0],
|
|
"b": ["x", np.nan, "z"], # middle row is 100% null
|
|
})
|
|
out, summary = TOOL_ADAPTERS["missing"](
|
|
df, {"strategy": "drop_row", "row_drop_threshold": 0.4},
|
|
)
|
|
assert summary["rows_dropped"] == 1
|
|
assert len(out) == 2
|
|
|
|
def test_sentinel_standardization_count(self):
|
|
df = pd.DataFrame({"x": ["ok", "N/A", "fine", "N/A"]})
|
|
out, summary = TOOL_ADAPTERS["missing"](df, {
|
|
"strategy": "none",
|
|
"sentinels": ["N/A"],
|
|
"standardize_sentinels": True,
|
|
})
|
|
assert summary["sentinels_standardized"] == 2
|
|
# The two "N/A" cells became real NaN.
|
|
assert out["x"].isna().sum() == 2
|
|
assert out["x"].tolist()[0] == "ok"
|
|
|
|
def test_summary_visible_through_run_pipeline(self):
|
|
df = pd.DataFrame({"val": [1.0, np.nan, 3.0]})
|
|
sr, _ = _run_one(df, "missing", {"strategy": "median"})
|
|
assert sr.summary["cells_filled"] == 1
|
|
|
|
|
|
class TestColumnMapSummary:
|
|
def test_single_rename(self):
|
|
df = pd.DataFrame({"old": [1, 2], "keep": [3, 4]})
|
|
out, summary = TOOL_ADAPTERS["column_map"](
|
|
df, {"mapping": {"old": "new"}, "unmapped": "keep"},
|
|
)
|
|
assert summary["columns_renamed"] == 1
|
|
assert summary["columns_dropped"] == []
|
|
assert list(out.columns) == ["new", "keep"]
|
|
|
|
def test_unmapped_drop_reports_dropped_columns(self):
|
|
df = pd.DataFrame({"old": [1, 2], "keep": [3, 4]})
|
|
out, summary = TOOL_ADAPTERS["column_map"](
|
|
df, {"mapping": {"old": "new"}, "unmapped": "drop"},
|
|
)
|
|
assert summary["columns_renamed"] == 1
|
|
assert summary["columns_dropped"] == ["keep"]
|
|
assert list(out.columns) == ["new"]
|
|
|
|
def test_summary_visible_through_run_pipeline(self):
|
|
df = pd.DataFrame({"old": [1], "keep": [2]})
|
|
sr, _ = _run_one(df, "column_map", {"mapping": {"old": "new"}})
|
|
assert sr.summary["columns_renamed"] == 1
|
|
|
|
|
|
class TestDedupSummary:
|
|
def test_exact_duplicate_rows(self):
|
|
df = pd.DataFrame({
|
|
"email": ["a@x.com", "b@x.com", "a@x.com", "a@x.com"],
|
|
"name": ["A", "B", "A", "A"],
|
|
})
|
|
out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "first"})
|
|
assert summary["input_rows"] == 4
|
|
assert summary["output_rows"] == 2
|
|
assert summary["duplicates_removed"] == 2
|
|
assert summary["groups"] == 1
|
|
assert out["email"].tolist() == ["a@x.com", "b@x.com"]
|
|
|
|
def test_explicit_exact_strategy_on_column(self):
|
|
df = pd.DataFrame({
|
|
"email": ["a@x.com", "b@x.com", "a@x.com"],
|
|
"name": ["A", "B", "C"],
|
|
})
|
|
out, summary = TOOL_ADAPTERS["dedup"](df, {
|
|
"survivor_rule": "first",
|
|
"strategies": [{"columns": [
|
|
{"column": "email", "algorithm": "exact", "threshold": 100},
|
|
]}],
|
|
})
|
|
assert summary["duplicates_removed"] == 1
|
|
assert summary["groups"] == 1
|
|
|
|
def test_most_complete_keeps_fuller_survivor(self):
|
|
df = pd.DataFrame({
|
|
"email": ["a@x.com", "a@x.com"],
|
|
"name": ["", "Alice"], # second row is more complete
|
|
"phone": ["111", "111"],
|
|
})
|
|
out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "most_complete"})
|
|
assert summary["duplicates_removed"] == 1
|
|
assert out.iloc[0]["name"] == "Alice"
|
|
|
|
def test_no_duplicates_is_noop(self):
|
|
df = pd.DataFrame({"email": ["a@x.com", "b@x.com"], "name": ["A", "B"]})
|
|
out, summary = TOOL_ADAPTERS["dedup"](df, {"survivor_rule": "first"})
|
|
assert summary["duplicates_removed"] == 0
|
|
assert summary["output_rows"] == 2
|
|
|
|
def test_summary_visible_through_run_pipeline(self):
|
|
df = pd.DataFrame({"email": ["a@x.com", "a@x.com"], "name": ["A", "A"]})
|
|
sr, res = _run_one(df, "dedup", {"survivor_rule": "first"})
|
|
assert sr.summary["duplicates_removed"] == 1
|
|
assert res.final_rows == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data flow — a later step depends on an earlier step's output
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDataFlow:
|
|
def test_text_clean_enables_dedup_match(self):
|
|
# The two phones differ only by surrounding whitespace; without
|
|
# the trim they are distinct, so dedup alone would keep both.
|
|
df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]})
|
|
p = Pipeline(steps=[
|
|
Step("text_clean", {"trim": True}),
|
|
Step("dedup", {"survivor_rule": "first"}),
|
|
])
|
|
res = run_pipeline(df, p)
|
|
assert res.initial_rows == 2
|
|
assert res.final_rows == 1
|
|
assert res.final_df["phone"].tolist() == ["+14155551234"]
|
|
|
|
def test_dedup_default_matching_normalizes_whitespace(self):
|
|
# Note: dedup's exact matcher already normalizes surrounding
|
|
# whitespace, so the two phones collapse even WITHOUT a prior
|
|
# text_clean. The survivor still carries the un-trimmed value.
|
|
df = pd.DataFrame({"phone": [" +14155551234 ", "+14155551234"]})
|
|
res = run_pipeline(df, Pipeline(steps=[Step("dedup", {"survivor_rule": "first"})]))
|
|
assert res.final_rows == 1
|
|
# Survivor keeps the raw (still-padded) text — dedup does not clean.
|
|
assert res.final_df["phone"].tolist() == [" +14155551234 "]
|
|
|
|
def test_chained_initial_and_final_rows(self):
|
|
df = pd.DataFrame({
|
|
"name": [" Al ", "al", "Bob"],
|
|
"v": [1, 1, 2],
|
|
})
|
|
p = Pipeline(steps=[
|
|
Step("text_clean", {"trim": True, "case": "title"}),
|
|
Step("dedup", {"survivor_rule": "first"}),
|
|
])
|
|
res = run_pipeline(df, p)
|
|
# " Al " and "al" both become "Al" → duplicate rows collapse.
|
|
assert res.initial_rows == 3
|
|
assert res.final_rows == 2
|
|
assert "Al" in res.final_df["name"].tolist()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Error handling — stop_on_error semantics
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestErrorHandling:
|
|
def test_most_recent_without_date_raises_by_default(self, messy_df):
|
|
# dedup with survivor_rule="most_recent" but no date_column errors.
|
|
p = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})])
|
|
with pytest.raises(InputValidationError):
|
|
run_pipeline(messy_df, p)
|
|
|
|
def test_continue_on_error_sets_error_string(self):
|
|
df = pd.DataFrame({"email": ["a@x.com", "a@x.com"], "name": ["A", "B"]})
|
|
p = Pipeline(steps=[
|
|
Step("text_clean", {"trim": True}),
|
|
Step("dedup", {"survivor_rule": "most_recent"}), # will fail
|
|
Step("missing", {"strategy": "none"}),
|
|
])
|
|
res = run_pipeline(df, p, stop_on_error=False)
|
|
bad = res.step_results[1]
|
|
assert bad.error is not None
|
|
assert isinstance(bad.error, str) and bad.error.strip()
|
|
# The failed step did NOT change the row count — previous df carried.
|
|
assert res.step_results[2].error is None
|
|
assert res.final_rows == 2
|
|
|
|
def test_failed_step_summary_is_empty(self):
|
|
df = pd.DataFrame({"e": ["a", "a"], "n": ["x", "y"]})
|
|
p = Pipeline(steps=[Step("dedup", {"survivor_rule": "most_recent"})])
|
|
res = run_pipeline(df, p, stop_on_error=False)
|
|
assert res.step_results[0].summary == {}
|
|
assert res.step_results[0].skipped is False
|
|
|
|
def test_config_error_on_bad_survivor_rule_propagates(self, messy_df):
|
|
p = Pipeline(steps=[Step("dedup", {"survivor_rule": "nonsense"})])
|
|
with pytest.raises(ConfigError):
|
|
run_pipeline(messy_df, p)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Edge inputs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEdgeInputs:
|
|
def test_empty_dataframe_runs_clean(self):
|
|
empty = pd.DataFrame({"name": [], "phone": []})
|
|
res = run_pipeline(
|
|
empty,
|
|
recommended_pipeline(options={"missing": {"strategy": "none"}}),
|
|
)
|
|
assert res.initial_rows == 0
|
|
assert res.final_rows == 0
|
|
assert all(sr.error is None for sr in res.step_results if not sr.skipped)
|
|
|
|
def test_single_column_dataframe(self):
|
|
df = pd.DataFrame({"name": [" Al ", "al"]})
|
|
res = run_pipeline(
|
|
df, Pipeline(steps=[Step("text_clean", {"trim": True, "case": "title"})]),
|
|
)
|
|
assert res.final_df["name"].tolist() == ["Al", "Al"]
|
|
|
|
def test_all_steps_disabled_returns_unchanged(self, messy_df):
|
|
snapshot = messy_df.copy(deep=True)
|
|
p = Pipeline(steps=[
|
|
Step("text_clean", enabled=False),
|
|
Step("format_standardize", enabled=False),
|
|
Step("missing", enabled=False),
|
|
Step("dedup", enabled=False),
|
|
])
|
|
res = run_pipeline(messy_df, p)
|
|
assert all(sr.skipped is True for sr in res.step_results)
|
|
assert res.final_rows == res.initial_rows == 5
|
|
pd.testing.assert_frame_equal(res.final_df, snapshot)
|
|
|
|
def test_empty_pipeline_is_identity(self, messy_df):
|
|
res = run_pipeline(messy_df, Pipeline(steps=[]))
|
|
assert res.step_results == []
|
|
assert res.final_rows == 5
|
|
pd.testing.assert_frame_equal(res.final_df, messy_df)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Serialization round-trips with disabled / named / nested-option steps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSerializationRoundtrips:
|
|
def test_disabled_and_named_step_survive_dict(self):
|
|
p = Pipeline(steps=[
|
|
Step("text_clean", {"trim": True}, enabled=False, name="Pre-clean"),
|
|
Step("dedup", {"survivor_rule": "first"}, name="Final dedup"),
|
|
])
|
|
loaded = Pipeline.from_dict(p.to_dict())
|
|
assert loaded.steps[0].enabled is False
|
|
assert loaded.steps[0].name == "Pre-clean"
|
|
assert loaded.steps[0].options == {"trim": True}
|
|
assert loaded.steps[1].name == "Final dedup"
|
|
assert loaded.steps[1].display_name() == "Final dedup"
|
|
|
|
def test_nested_options_survive_dict(self):
|
|
nested = {
|
|
"column_types": {"phone": "phone", "signup_date": "date"},
|
|
}
|
|
strat = {
|
|
"survivor_rule": "most_complete",
|
|
"strategies": [{"columns": [
|
|
{"column": "email", "algorithm": "exact", "threshold": 100},
|
|
]}],
|
|
}
|
|
p = Pipeline(steps=[
|
|
Step("format_standardize", nested),
|
|
Step("dedup", strat),
|
|
])
|
|
loaded = Pipeline.from_dict(p.to_dict())
|
|
assert loaded.steps[0].options["column_types"] == nested["column_types"]
|
|
assert loaded.steps[1].options["strategies"] == strat["strategies"]
|
|
|
|
def test_nested_options_survive_file(self, tmp_path):
|
|
p = Pipeline(steps=[
|
|
Step("format_standardize",
|
|
{"column_types": {"phone": "phone"}},
|
|
enabled=False, name="formats"),
|
|
])
|
|
path = tmp_path / "pipe.json"
|
|
p.to_file(path)
|
|
loaded = Pipeline.from_file(path)
|
|
assert loaded.steps[0].enabled is False
|
|
assert loaded.steps[0].name == "formats"
|
|
assert loaded.steps[0].options == {"column_types": {"phone": "phone"}}
|
|
|
|
def test_roundtrip_is_idempotent(self):
|
|
p = Pipeline(steps=[
|
|
Step("text_clean", {"trim": True}, enabled=False, name="x"),
|
|
Step("missing", {"strategy": "median"}),
|
|
])
|
|
once = Pipeline.from_dict(p.to_dict())
|
|
twice = Pipeline.from_dict(once.to_dict())
|
|
assert once.to_dict() == twice.to_dict() == p.to_dict()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# recommended_pipeline(include=...) — subsetting, ordering, option seeding
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRecommendedInclude:
|
|
def test_subset_preserves_given_order(self):
|
|
p = recommended_pipeline(include=["dedup", "text_clean"])
|
|
assert [s.tool for s in p.steps] == ["dedup", "text_clean"]
|
|
|
|
def test_column_map_first(self):
|
|
p = recommended_pipeline(include=[
|
|
"column_map", "text_clean", "format_standardize", "missing", "dedup",
|
|
])
|
|
assert p.steps[0].tool == "column_map"
|
|
assert len(p.steps) == 5
|
|
|
|
def test_column_map_last(self):
|
|
p = recommended_pipeline(include=[
|
|
"text_clean", "format_standardize", "missing", "dedup", "column_map",
|
|
])
|
|
assert p.steps[-1].tool == "column_map"
|
|
|
|
def test_unknown_tool_in_include_raises(self):
|
|
with pytest.raises(InputValidationError):
|
|
recommended_pipeline(include=["text_clean", "not_a_tool"])
|
|
|
|
def test_options_seeding_only_targets_named_tool(self):
|
|
p = recommended_pipeline(
|
|
include=["text_clean", "dedup"],
|
|
options={"dedup": {"survivor_rule": "last"}},
|
|
)
|
|
assert p.steps[0].options == {} # text_clean unseeded
|
|
assert p.steps[1].options == {"survivor_rule": "last"}
|
|
|
|
def test_empty_include_yields_no_steps(self):
|
|
p = recommended_pipeline(include=[])
|
|
assert p.steps == []
|
|
|
|
def test_seeded_options_are_independent_copies(self):
|
|
seed = {"text_clean": {"trim": True}}
|
|
p = recommended_pipeline(include=["text_clean"], options=seed)
|
|
# Mutating the produced step must not leak back into the seed.
|
|
p.steps[0].options["trim"] = False
|
|
assert seed["text_clean"]["trim"] is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Realistic demo integration — messy customers table end-to-end
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDemoIntegration:
|
|
@pytest.fixture
|
|
def customers_df(self):
|
|
return pd.DataFrame({
|
|
"Full Name": [" alice smith ", "BOB JONES", "alice smith", ""],
|
|
"Email": ["alice@x.com ", "bob@x.com", "alice@x.com", "carol@x.com"],
|
|
"Phone": [" +14155551234 ", "+442079460958",
|
|
"+14155551234", "+13035551111"],
|
|
})
|
|
|
|
def test_full_recommended_plus_column_map(self, customers_df):
|
|
p = recommended_pipeline(
|
|
include=["text_clean", "format_standardize", "missing",
|
|
"dedup", "column_map"],
|
|
options={
|
|
"text_clean": {"trim": True, "collapse_whitespace": True},
|
|
"missing": {"strategy": "none"},
|
|
"dedup": {
|
|
"survivor_rule": "most_complete",
|
|
"strategies": [{"columns": [
|
|
{"column": "Phone", "algorithm": "exact", "threshold": 100},
|
|
]}],
|
|
},
|
|
"column_map": {
|
|
"mapping": {"Full Name": "name", "Email": "email",
|
|
"Phone": "phone"},
|
|
"unmapped": "keep",
|
|
},
|
|
},
|
|
)
|
|
res = run_pipeline(customers_df, p)
|
|
|
|
# Two rows share the same phone after trimming → one duplicate removed.
|
|
assert res.initial_rows == 4
|
|
assert res.final_rows == 3
|
|
assert res.final_rows < res.initial_rows
|
|
|
|
# Headers were renamed by the trailing column_map step.
|
|
assert list(res.final_df.columns) == ["name", "email", "phone"]
|
|
|
|
# The surviving Alice row kept its (trimmed) phone.
|
|
phones = res.final_df["phone"].tolist()
|
|
assert "+14155551234" in phones
|
|
assert phones.count("+14155551234") == 1 # only one Alice survives
|
|
|
|
# Every executed step succeeded.
|
|
assert all(sr.error is None for sr in res.step_results if not sr.skipped)
|
|
# column_map reported the three renames.
|
|
cm = res.step_results[-1]
|
|
assert cm.step.tool == "column_map"
|
|
assert cm.summary["columns_renamed"] == 3
|
|
|
|
def test_demo_dedup_step_reports_one_duplicate(self, customers_df):
|
|
p = recommended_pipeline(options={
|
|
"text_clean": {"trim": True},
|
|
"missing": {"strategy": "none"},
|
|
"dedup": {
|
|
"survivor_rule": "most_complete",
|
|
"strategies": [{"columns": [
|
|
{"column": "Phone", "algorithm": "exact", "threshold": 100},
|
|
]}],
|
|
},
|
|
})
|
|
res = run_pipeline(customers_df, p)
|
|
dedup_sr = next(s for s in res.step_results if s.step.tool == "dedup")
|
|
assert dedup_sr.summary["duplicates_removed"] == 1
|
|
assert dedup_sr.summary["groups"] == 1
|