feat(pipeline): visual module-card builder for Automated Workflows

Replaces the raw options_json data-editor table with a per-step "module
card" builder matching the locked design mockup
(layout-review/09_pipeline_runner.html): each step shows a friendly name +
caption, an enable toggle, ▲/▼/✕ reorder/remove controls, and a Configure
expander that renders that tool's own controls in plain language. Raw JSON
is demoted to an Advanced import/export section.

New src/gui/components/pipeline_modules.py holds the adapter-key→tool_id
friendly-name bridge, one plain-language config renderer per tool
(text_clean, format_standardize, missing, column_map, dedup — emitting the
exact JSON option shapes the core adapters accept), and render_step_card.
Steps live in session state as an ordered list with stable ids so widget
keys survive reorder/remove. Reorder is ▲/▼ buttons (no JS drag dependency).

The on-disk/CLI pipeline JSON format is unchanged — CLI and src/core
untouched. Adds tests/gui/test_pipeline_builder.py (AppTest) covering seed,
configure panels, toggle/add/remove, and a full run.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-22 18:16:09 +00:00
parent fd9606c67b
commit 837f4b88b5
3 changed files with 645 additions and 126 deletions

View File

@@ -0,0 +1,376 @@
"""Visual pipeline builder — per-step "module" cards + plain-language config panels.
The Automated Workflows page (``9_Pipeline_Runner.py``) used to configure each
step through a raw ``options_json`` text column. This module replaces that with
one **module card** per step: a friendly name + caption, an enable toggle,
reorder/remove controls, and a **Configure** expander that renders that tool's
own controls in plain language (no JSON). Raw JSON survives only as the page's
Advanced import/export surface.
Each config renderer takes the step's current ``options`` dict, renders the
curated controls from the design mockup (``layout-review/09_pipeline_runner.html``),
and returns an updated **JSON-serialisable** options dict — the same shape the
``TOOL_ADAPTERS`` in ``src/core/pipeline.py`` consume via ``Options.from_dict``.
Two hard Streamlit constraints shaped this:
* No nested expanders — the per-step Configure expander means config renderers
here must NOT open their own expander, and the page must not wrap the card
stack in an outer expander.
* Widget identity must be stable across reorder/remove — every widget key is
derived from a step's stable ``id``, never its list position.
"""
from __future__ import annotations
from typing import Any, Callable, Optional
import pandas as pd
import streamlit as st
from src.gui.tools_registry import tool_name
# ---------------------------------------------------------------------------
# Adapter-key → registry tool_id bridge
# ---------------------------------------------------------------------------
#
# Pipeline steps are keyed by adapter name (``text_clean``); the tools registry
# and i18n packs are keyed by tool_id (``02_text_cleaner``). The registry has no
# reverse lookup, so we keep the bridge here. ``step_label`` resolves the
# localized friendly name; ``step_caption`` returns a short, plain-English "what
# this step does" line for the card body.
PIPELINE_TOOL_META: dict[str, str] = {
"text_clean": "02_text_cleaner",
"format_standardize": "03_format_standardizer",
"missing": "04_missing_handler",
"column_map": "05_column_mapper",
"dedup": "01_deduplicator",
}
_STEP_CAPTIONS: dict[str, str] = {
"text_clean": "Trim spaces, collapse repeats, strip invisible characters.",
"format_standardize": "Canonicalize phones, dates, currency, names per column.",
"missing": "Flag, fill, or drop blank cells (and disguised blanks).",
"column_map": "Rename source columns onto your target column names.",
"dedup": "Find duplicate rows and keep one survivor per group.",
}
def step_label(tool: str) -> str:
"""Friendly, localized name for a pipeline adapter key (falls back to the key)."""
tool_id = PIPELINE_TOOL_META.get(tool)
return tool_name(tool_id) if tool_id else tool
def step_caption(tool: str) -> str:
return _STEP_CAPTIONS.get(tool, "")
# ---------------------------------------------------------------------------
# Per-tool config renderers
# ---------------------------------------------------------------------------
#
# Uniform signature: ``render_<tool>_config(df, options, kp) -> options``.
# * ``df`` — the uploaded DataFrame (for column lists / type hints).
# * ``options`` — the step's current options dict (seed widget defaults).
# * ``kp`` — key prefix, unique per step (``f"{tool}_{id}"``).
# Returns a JSON-serialisable options dict. Renderers must not open expanders.
_CASE_LABELS: list[tuple[str, Optional[str]]] = [
("Leave as-is", None),
("UPPERCASE", "upper"),
("lowercase", "lower"),
("Title Case", "title"),
("Sentence case", "sentence"),
]
def render_text_clean_config(df: pd.DataFrame, options: dict, kp: str) -> dict:
trim = st.checkbox(
"Trim leading & trailing whitespace",
value=bool(options.get("trim", True)), key=f"{kp}_trim",
)
collapse = st.checkbox(
"Collapse repeated spaces to one",
value=bool(options.get("collapse_whitespace", True)), key=f"{kp}_collapse",
)
fold = st.checkbox(
"Normalize smart quotes & dashes to plain ASCII",
value=bool(options.get("fold_smart_chars", True)), key=f"{kp}_fold",
)
strip_zw = st.checkbox(
"Strip zero-width / invisible characters",
value=bool(options.get("strip_zero_width", True)), key=f"{kp}_zw",
)
cur_case = options.get("case")
case_idx = next((i for i, (_, v) in enumerate(_CASE_LABELS) if v == cur_case), 0)
case_choice = st.selectbox(
"Letter case",
[lbl for lbl, _ in _CASE_LABELS],
index=case_idx, key=f"{kp}_case",
)
case_val = next(v for lbl, v in _CASE_LABELS if lbl == case_choice)
out: dict[str, Any] = {
"trim": trim,
"collapse_whitespace": collapse,
"fold_smart_chars": fold,
"strip_zero_width": strip_zw,
}
if case_val is not None:
out["case"] = case_val
return out
_FORMAT_LABELS: list[tuple[str, Optional[str]]] = [
("Leave as-is", None),
("Date", "date"),
("Phone number", "phone"),
("Currency", "currency"),
("Name", "name"),
("Address", "address"),
("Email", "email"),
("Boolean (yes/no)", "boolean"),
]
def render_format_standardize_config(df: pd.DataFrame, options: dict, kp: str) -> dict:
st.caption(
"Pick a target format for each column. Columns left as “Leave as-is” "
"are untouched."
)
current = dict(options.get("column_types", {}))
labels = [lbl for lbl, _ in _FORMAT_LABELS]
column_types: dict[str, str] = {}
for col in df.columns:
cur_val = current.get(col)
idx = next((i for i, (_, v) in enumerate(_FORMAT_LABELS) if v == cur_val), 0)
choice = st.selectbox(
str(col), labels, index=idx, key=f"{kp}_fmt__{col}",
)
val = next(v for lbl, v in _FORMAT_LABELS if lbl == choice)
if val is not None:
column_types[str(col)] = val
return {"column_types": column_types}
# Plain-language blank-handling choices → core strategy values. "fill" is a UI
# token expanded to numeric median + categorical mode (MissingOptions handles
# the per-dtype split via ``categorical_strategy``).
_MISSING_CHOICES: list[tuple[str, str]] = [
("Flag them (mark blanks, change nothing)", "flag"),
("Fill them in (numbers → median, text → most common)", "fill"),
("Drop rows that have any blank", "drop"),
]
def _missing_mode_from_strategy(strategy: Optional[str]) -> str:
if strategy in ("drop_row", "drop_col", "drop_both"):
return "drop"
if strategy in ("mean", "median", "mode", "constant", "ffill", "bfill", "interpolate"):
return "fill"
return "flag"
def render_missing_config(df: pd.DataFrame, options: dict, kp: str) -> dict:
from src.core.missing import DEFAULT_SENTINELS
cur_mode = _missing_mode_from_strategy(options.get("strategy"))
mode_idx = next((i for i, (_, v) in enumerate(_MISSING_CHOICES) if v == cur_mode), 0)
mode_choice = st.radio(
"What should happen to blank cells?",
[lbl for lbl, _ in _MISSING_CHOICES],
index=mode_idx, key=f"{kp}_strategy",
)
mode = next(v for lbl, v in _MISSING_CHOICES if lbl == mode_choice)
seed_sentinels = options.get("sentinels") or list(DEFAULT_SENTINELS)
sent_text = st.text_input(
"Treat these as blank (comma-separated)",
value=", ".join(seed_sentinels), key=f"{kp}_sentinels",
help="Matched case-insensitively after stripping whitespace.",
)
sentinels = [s.strip() for s in sent_text.split(",") if s.strip()]
out: dict[str, Any] = {
"standardize_sentinels": True,
"sentinels": sentinels,
}
if mode == "flag":
out["strategy"] = "none"
elif mode == "fill":
out["strategy"] = "median"
out["categorical_strategy"] = "mode"
else: # drop
out["strategy"] = "drop_row"
return out
_UNMAPPED_CHOICES = ["keep", "drop", "error"]
def render_column_map_config(df: pd.DataFrame, options: dict, kp: str) -> dict:
st.caption(
"Type the target name each source column should become. Leave a target "
"blank to keep that column's name unchanged."
)
current = dict(options.get("mapping", {}))
table = pd.DataFrame(
{
"source": [str(c) for c in df.columns],
"target": [current.get(str(c), "") for c in df.columns],
}
)
edited = st.data_editor(
table,
width="stretch",
hide_index=True,
disabled=["source"],
column_config={
"source": st.column_config.TextColumn("Source column"),
"target": st.column_config.TextColumn("Rename to"),
},
key=f"{kp}_mapping",
)
mapping = {
str(r["source"]): str(r["target"]).strip()
for _, r in edited.iterrows()
if str(r.get("target") or "").strip()
}
c1, c2 = st.columns(2)
with c1:
unmapped = st.selectbox(
"Columns with no rename",
_UNMAPPED_CHOICES,
index=_UNMAPPED_CHOICES.index(options.get("unmapped", "keep"))
if options.get("unmapped") in _UNMAPPED_CHOICES else 0,
key=f"{kp}_unmapped",
help="keep: leave them in place · drop: remove them · error: stop the run.",
)
with c2:
coerce = st.checkbox(
"Coerce values to target types",
value=bool(options.get("coerce_types", False)), key=f"{kp}_coerce",
)
return {"mapping": mapping, "unmapped": unmapped, "coerce_types": coerce}
_SURVIVOR_LABELS: list[tuple[str, str]] = [
("Keep the most complete row", "most_complete"),
("Keep the first seen", "first"),
("Keep the last seen", "last"),
("Keep the most recent (by date)", "most_recent"),
]
def render_dedup_config(df: pd.DataFrame, options: dict, kp: str) -> dict:
cur_rule = options.get("survivor_rule", "first")
rule_idx = next((i for i, (_, v) in enumerate(_SURVIVOR_LABELS) if v == cur_rule), 0)
rule_choice = st.selectbox(
"When rows match, which one survives?",
[lbl for lbl, _ in _SURVIVOR_LABELS],
index=rule_idx, key=f"{kp}_survivor",
)
survivor_rule = next(v for lbl, v in _SURVIVOR_LABELS if lbl == rule_choice)
merge = st.checkbox(
"Merge matched rows (fill each survivor's blanks from its duplicates)",
value=bool(options.get("merge", False)), key=f"{kp}_merge",
)
# Recover the previously-selected match columns from the stored strategies
# (a single exact-match strategy over the chosen columns).
prev_cols: list[str] = []
for strat in options.get("strategies", []) or []:
for c in strat.get("columns", []):
if c.get("column"):
prev_cols.append(c["column"])
all_cols = [str(c) for c in df.columns]
match_cols = st.multiselect(
"Match on these columns",
all_cols,
default=[c for c in prev_cols if c in all_cols],
key=f"{kp}_matchcols",
help="Rows are duplicates when these columns all match. Leave empty to auto-detect.",
)
out: dict[str, Any] = {"survivor_rule": survivor_rule, "merge": merge}
if match_cols:
out["strategies"] = [
{"columns": [
{"column": c, "algorithm": "exact", "threshold": 100}
for c in match_cols
]}
]
if survivor_rule == "most_recent":
date_default = options.get("date_column")
date_idx = all_cols.index(date_default) if date_default in all_cols else 0
out["date_column"] = st.selectbox(
"Date column (for most-recent)",
all_cols, index=date_idx, key=f"{kp}_datecol",
) if all_cols else None
return out
CONFIG_RENDERERS: dict[str, Callable[[pd.DataFrame, dict, str], dict]] = {
"text_clean": render_text_clean_config,
"format_standardize": render_format_standardize_config,
"missing": render_missing_config,
"column_map": render_column_map_config,
"dedup": render_dedup_config,
}
# ---------------------------------------------------------------------------
# Module card
# ---------------------------------------------------------------------------
def render_step_card(
df: pd.DataFrame, step: dict, idx: int, total: int,
) -> Optional[str]:
"""Render one pipeline step as a module card.
Mutates ``step`` in place (``enabled`` toggle, ``options`` from the Configure
panel). Returns an action string (``"up"`` / ``"down"`` / ``"remove"``) when
the user clicks a reorder/remove control, else ``None`` — the caller applies
the action to the step list and reruns.
"""
sid = step["id"]
kp = f"{step['tool']}_{sid}"
action: Optional[str] = None
with st.container(border=True):
head, toggle, up, down, rm = st.columns([0.66, 0.12, 0.07, 0.07, 0.08])
with head:
st.markdown(f"**{idx + 1}. {step_label(step['tool'])}**")
st.caption(step_caption(step["tool"]))
with toggle:
step["enabled"] = st.toggle(
"On", value=step.get("enabled", True), key=f"{kp}_enabled",
help="Disabled steps are kept in the pipeline but skipped at run time.",
)
with up:
if st.button("", key=f"{kp}_up", disabled=idx == 0,
help="Move up", width="stretch"):
action = "up"
with down:
if st.button("", key=f"{kp}_down", disabled=idx == total - 1,
help="Move down", width="stretch"):
action = "down"
with rm:
if st.button("", key=f"{kp}_rm", help="Remove step", width="stretch"):
action = "remove"
renderer = CONFIG_RENDERERS.get(step["tool"])
with st.expander(f"Configure: {step_label(step['tool'])}"):
if renderer is None:
st.caption("This step has no options.")
else:
step["options"] = renderer(df, step.get("options", {}) or {}, kp)
return action

View File

@@ -32,6 +32,7 @@ from src.core.pipeline import (
run_pipeline,
validate_pipeline,
)
from src.gui.components.pipeline_modules import render_step_card, step_label
from src.license import FeatureFlag
hide_streamlit_chrome()
@@ -104,135 +105,186 @@ st.divider()
# ---------------------------------------------------------------------------
# Pipeline builder
# Pipeline builder — visual module cards
# ---------------------------------------------------------------------------
#
# Wrapped in an outer expander whose default state mirrors the preview
# expander above: open before a result exists, folded once the user has
# clicked Run Pipeline. The pipeline editor is this page's "Options"
# section — structurally analogous to Text Cleaner's options block.
# Each step is a "module" card (src/gui/components/pipeline_modules.py) with a
# plain-language Configure panel — no raw JSON. Steps live in session state as
# an ordered list of dicts, each carrying a STABLE integer id so widget keys
# survive reorder/remove. Raw JSON is import/export only, under Advanced.
#
# NB: the builder is NOT wrapped in an outer expander — per-step Configure
# panels are expanders, and Streamlit forbids nesting expanders.
with st.expander("Options", expanded=not _has_result):
mode = st.radio(
"How would you like to define the pipeline?",
[
"Use the recommended default (text-clean → format → missing → dedup)",
"Build interactively",
"Import a saved pipeline JSON",
],
index=0,
def _seed_steps_from(pipeline) -> None:
"""Replace the session step list from a Pipeline, assigning fresh ids."""
seq = st.session_state.get("pipeline_step_seq", 0)
steps: list[dict] = []
for s in pipeline.steps:
steps.append({
"id": seq, "tool": s.tool,
"enabled": s.enabled, "options": dict(s.options),
})
seq += 1
st.session_state["pipeline_steps"] = steps
st.session_state["pipeline_step_seq"] = seq
if "pipeline_steps" not in st.session_state:
_seed_steps_from(recommended_pipeline())
st.subheader("Build your pipeline")
mode = st.radio(
"How would you like to define the pipeline?",
[
"Use the recommended default (Clean Text → Standardize → Fix Missing → Find Duplicates)",
"Build interactively",
"Import a saved pipeline JSON",
],
index=0,
key="pipeline_mode",
)
if mode.startswith("Use the recommended"):
# Only reseed on an explicit click that lands here while the steps already
# diverge — otherwise every rerun would wipe edits. We detect "user just
# selected this mode" by comparing against the recommended default and
# offering a one-click restore rather than silently discarding.
rec_dict = recommended_pipeline().to_dict()
cur_dict = {
"steps": [
{"tool": s["tool"], "options": s["options"],
"enabled": s["enabled"], "name": None}
for s in st.session_state["pipeline_steps"]
]
}
if cur_dict != rec_dict:
st.info(
"You've edited the recommended steps, so they're now yours to "
"change — you're effectively in **Build interactively** mode. "
"Restore the suggested steps to discard your edits."
)
if st.button("↺ Restore recommended steps"):
_seed_steps_from(recommended_pipeline())
st.rerun()
elif mode.startswith("Import"):
pipeline_file = st.file_uploader(
"Pipeline JSON", type=["json"], key="pipeline_upload",
)
if pipeline_file is not None:
try:
data = json.loads(pipeline_file.getvalue())
_seed_steps_from(Pipeline.from_dict(data))
st.success(
f"Loaded {len(st.session_state['pipeline_steps'])} step(s). "
"Switch to **Build interactively** to tweak them."
)
except Exception as e:
from src.core.errors import format_for_user
st.error(f"**Could not parse pipeline**\n\n```\n{format_for_user(e)}\n```")
st.caption(
"Each step is a module: toggle it on/off, reorder with ▲ ▼, remove with ✕, "
"and open **Configure** to set its options in plain language. Tool order is "
"recommended, not enforced — violations surface as warnings below."
)
# Render the module stack. A reorder/remove action mutates the list and reruns.
steps = st.session_state["pipeline_steps"]
total = len(steps)
pending_action: tuple[str, int] | None = None
for i, step in enumerate(steps):
act = render_step_card(df, step, i, total)
if act is not None:
pending_action = (act, i)
if pending_action is not None:
act, i = pending_action
if act == "remove":
steps.pop(i)
elif act == "up" and i > 0:
steps[i - 1], steps[i] = steps[i], steps[i - 1]
elif act == "down" and i < total - 1:
steps[i + 1], steps[i] = steps[i], steps[i + 1]
st.session_state["pipeline_steps"] = steps
st.rerun()
# Add-step control.
add_col, btn_col = st.columns([0.7, 0.3])
with add_col:
add_tool = st.selectbox(
"Add a step",
TOOL_NAMES,
format_func=step_label,
key="pipeline_add_tool",
label_visibility="collapsed",
)
with btn_col:
if st.button(" Add step", width="stretch"):
seq = st.session_state.get("pipeline_step_seq", 0)
steps.append({"id": seq, "tool": add_tool, "enabled": True, "options": {}})
st.session_state["pipeline_step_seq"] = seq + 1
st.rerun()
# Build a Pipeline object from the step list.
steps_list: list[Step] = []
parse_errors: list[str] = []
for i, step in enumerate(steps):
try:
steps_list.append(Step(
tool=str(step["tool"]),
options=dict(step.get("options") or {}),
enabled=bool(step.get("enabled", True)),
))
except Exception as e:
parse_errors.append(f"Step {i + 1} ({step.get('tool')}): {e}")
for err in parse_errors:
st.error(err)
current_pipeline = Pipeline(steps=steps_list) if steps_list else None
if current_pipeline is not None:
warnings = validate_pipeline(current_pipeline)
if warnings:
st.warning(
"Pipeline is out of recommended order:\n\n"
+ "\n".join(f"- {w}" for w in warnings)
+ "\n\nThe pipeline will still run — these are recommendations only."
)
with st.expander("Recommended tool order — why each step belongs where it does"):
st.markdown(
"\n".join(
f"- **{step_label(e)}** before **{step_label(l)}** — {why}"
for e, l, why in SOFT_DEPENDENCIES
)
)
if "pipeline_rows" not in st.session_state:
default = recommended_pipeline()
st.session_state["pipeline_rows"] = pd.DataFrame([
{
"tool": s.tool, "enabled": s.enabled,
"options_json": json.dumps(s.options),
}
for s in default.steps
])
if mode.startswith("Use the recommended"):
default = recommended_pipeline()
st.session_state["pipeline_rows"] = pd.DataFrame([
{
"tool": s.tool, "enabled": s.enabled,
"options_json": json.dumps(s.options),
}
for s in default.steps
])
elif mode.startswith("Import"):
pipeline_file = st.file_uploader(
"Pipeline JSON", type=["json"], key="pipeline_upload",
)
if pipeline_file is not None:
try:
data = json.loads(pipeline_file.getvalue())
uploaded_pipe = Pipeline.from_dict(data)
st.session_state["pipeline_rows"] = pd.DataFrame([
{
"tool": s.tool, "enabled": s.enabled,
"options_json": json.dumps(s.options),
}
for s in uploaded_pipe.steps
])
st.success(f"Loaded {len(uploaded_pipe.steps)} step(s).")
except Exception as e:
from src.core.errors import format_for_user
st.error(f"**Could not parse pipeline**\n\n```\n{format_for_user(e)}\n```")
with st.expander("Advanced — import / export pipeline as JSON"):
st.caption(
"Edit the table to add, remove, reorder (drag the row index), enable, "
"or configure each step. Tool order is recommended, not enforced — "
"violations surface as warnings below the table."
"For sharing or version control. Editing is done in the step panels "
"above — this is just the saved form of the same settings. The same "
"JSON runs in the CLI via `--pipeline pipeline.json`."
)
edited = st.data_editor(
st.session_state["pipeline_rows"],
width="stretch",
num_rows="dynamic",
column_config={
"tool": st.column_config.SelectboxColumn(
"Tool", options=TOOL_NAMES, required=True,
),
"enabled": st.column_config.CheckboxColumn("Enabled"),
"options_json": st.column_config.TextColumn(
"Options (JSON)",
help='e.g. {"column_types": {"phone": "phone"}}',
),
},
key="pipeline_editor",
export_json = json.dumps(
current_pipeline.to_dict() if current_pipeline else {"steps": []},
indent=2, default=str,
)
st.session_state["pipeline_rows"] = edited
# Build a Pipeline object from the editor state.
steps_list: list[Step] = []
parse_errors: list[str] = []
for i, row in edited.iterrows():
tool = row.get("tool")
if not tool or pd.isna(tool):
continue
raw_opts = row.get("options_json") or "{}"
if pd.isna(raw_opts):
raw_opts = "{}"
st.code(export_json, language="json")
adv_paste = st.text_area(
"Paste pipeline JSON to load it", key="pipeline_json_paste", height=140,
)
if st.button("Load pasted JSON", disabled=not adv_paste.strip()):
try:
opts = json.loads(raw_opts) if isinstance(raw_opts, str) else dict(raw_opts)
if not isinstance(opts, dict):
raise ValueError("options must be a JSON object")
_seed_steps_from(Pipeline.from_dict(json.loads(adv_paste)))
st.success("Loaded. Scroll up to see the steps.")
st.rerun()
except Exception as e:
parse_errors.append(f"Step {i + 1}: {e}")
continue
try:
steps_list.append(Step(
tool=str(tool),
options=opts,
enabled=bool(row.get("enabled", True)),
))
except Exception as e:
parse_errors.append(f"Step {i + 1}: {e}")
if parse_errors:
for err in parse_errors:
st.error(err)
current_pipeline = Pipeline(steps=steps_list) if steps_list else None
if current_pipeline is not None:
warnings = validate_pipeline(current_pipeline)
if warnings:
st.warning(
"Pipeline is out of recommended order:\n\n"
+ "\n".join(f"- {w}" for w in warnings)
+ "\n\nThe pipeline will still run — these are recommendations only."
)
with st.expander("Recommended tool order — why each step belongs where it does"):
st.markdown(
"\n".join(
f"- **{e}** before **{l}** — {why}"
for e, l, why in SOFT_DEPENDENCIES
)
)
from src.core.errors import format_for_user
st.error(f"**Could not parse pipeline**\n\n```\n{format_for_user(e)}\n```")
st.divider()
@@ -257,14 +309,14 @@ if st.button(
def _on_step(sr) -> None:
completed[0] += 1
if sr.skipped:
log_lines.append(f"{sr.step.display_name()} (skipped)")
log_lines.append(f"{step_label(sr.step.tool)} (skipped)")
elif sr.error:
log_lines.append(
f"{sr.step.display_name()}{sr.error.splitlines()[0]}"
f"{step_label(sr.step.tool)}{sr.error.splitlines()[0]}"
)
else:
log_lines.append(
f"{sr.step.display_name()}{sr.elapsed_seconds*1000:.0f} ms"
f"{step_label(sr.step.tool)}{sr.elapsed_seconds*1000:.0f} ms"
)
log_box.markdown("\n".join(log_lines))
progress.progress(
@@ -330,11 +382,11 @@ m4.metric("Elapsed", f"{result.total_elapsed:.2f} s")
st.markdown("**Per-step summary**")
step_df = pd.DataFrame([
{
"step": sr.step.display_name(),
"step": step_label(sr.step.tool),
"status": (
"skipped" if sr.skipped
else "error" if sr.error
else "ok"
"skipped" if sr.skipped
else "error" if sr.error
else "ok"
),
"elapsed_ms": int(sr.elapsed_seconds * 1000),
"summary": json.dumps(sr.summary, default=str)[:200],

View File

@@ -0,0 +1,91 @@
"""Pipeline Runner — visual module-card builder contract (AppTest).
Pins the behaviors the JSON-table → module-card rewrite introduced:
recommended steps seed as cards with friendly names, each step exposes a
plain-language Configure panel (no raw per-row JSON), steps can be toggled /
added / removed, JSON lives only under Advanced, and a run produces results
with friendly step names. The page's bare initial-render contract across junk
files is covered separately in ``tests/test_junk_corpus_tool_pages.py``.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from streamlit.testing.v1 import AppTest
_PAGE = (
Path(__file__).resolve().parent.parent.parent
/ "src" / "gui" / "pages" / "9_Pipeline_Runner.py"
)
_CSV = (
b"name,email,phone,signup_date\n"
b" Jane Doe ,jane@acme.io,512-555-0190,2024-01-04\n"
b"jane doe,JANE@ACME.IO,(512) 555-0190,01/04/2024\n"
b"Bob Smith,bob@globex.com,720.555.7781,2024-02-11\n"
)
def _app() -> AppTest:
at = AppTest.from_file(str(_PAGE), default_timeout=30)
at.session_state["home_uploaded_bytes"] = _CSV
at.session_state["home_uploaded_name"] = "customers.csv"
at.session_state["home_uploaded_size"] = len(_CSV)
return at.run()
def test_recommended_steps_seed_as_named_cards():
at = _app()
assert not at.exception
tools = [s["tool"] for s in at.session_state["pipeline_steps"]]
assert tools == ["text_clean", "format_standardize", "missing", "dedup"]
md = " ".join(m.value for m in at.markdown)
for friendly in ("Clean Text", "Standardize Formats",
"Fix Missing Values", "Find Duplicates"):
assert friendly in md
def test_each_step_has_a_configure_panel_and_json_is_advanced_only():
at = _app()
labels = [e.label for e in at.get("expander")]
assert any(l.startswith("Configure: Clean Text") for l in labels)
assert any(l.startswith("Configure: Find Duplicates") for l in labels)
# Raw JSON is import/export only — never a per-step editing surface.
assert any("Advanced — import / export" in l for l in labels)
def test_toggle_disables_step_and_persists():
at = _app()
at.toggle[0].set_value(False).run()
assert at.session_state["pipeline_steps"][0]["enabled"] is False
def test_add_step_appends_a_working_config_panel():
at = _app()
[s for s in at.selectbox if s.key == "pipeline_add_tool"][0].set_value("column_map").run()
[b for b in at.button if "Add step" in b.label][0].click().run()
assert not at.exception
assert at.session_state["pipeline_steps"][-1]["tool"] == "column_map"
labels = [e.label for e in at.get("expander")]
assert any(l.startswith("Configure: Map Columns") for l in labels)
def test_remove_step_drops_it():
at = _app()
before = len(at.session_state["pipeline_steps"])
# The first ✕ remove button in the card stack.
[b for b in at.button if b.label == ""][0].click().run()
assert not at.exception
assert len(at.session_state["pipeline_steps"]) == before - 1
def test_run_produces_results_with_friendly_names():
at = _app()
[b for b in at.button if b.label == "Run Pipeline"][0].click().run()
assert not at.exception, at.exception
assert "pipeline_result" in at.session_state
res = at.session_state["pipeline_result"]
assert res.initial_rows == 3 and res.final_rows == 2 # the two Jane rows merge
assert all(sr.error is None for sr in res.step_results)