diff --git a/src/gui/components/pipeline_modules.py b/src/gui/components/pipeline_modules.py new file mode 100644 index 0000000..f9c116c --- /dev/null +++ b/src/gui/components/pipeline_modules.py @@ -0,0 +1,376 @@ +"""Visual pipeline builder — per-step "module" cards + plain-language config panels. + +The Automated Workflows page (``9_Pipeline_Runner.py``) used to configure each +step through a raw ``options_json`` text column. This module replaces that with +one **module card** per step: a friendly name + caption, an enable toggle, +reorder/remove controls, and a **Configure** expander that renders that tool's +own controls in plain language (no JSON). Raw JSON survives only as the page's +Advanced import/export surface. + +Each config renderer takes the step's current ``options`` dict, renders the +curated controls from the design mockup (``layout-review/09_pipeline_runner.html``), +and returns an updated **JSON-serialisable** options dict — the same shape the +``TOOL_ADAPTERS`` in ``src/core/pipeline.py`` consume via ``Options.from_dict``. + +Two hard Streamlit constraints shaped this: + * No nested expanders — the per-step Configure expander means config renderers + here must NOT open their own expander, and the page must not wrap the card + stack in an outer expander. + * Widget identity must be stable across reorder/remove — every widget key is + derived from a step's stable ``id``, never its list position. +""" + +from __future__ import annotations + +from typing import Any, Callable, Optional + +import pandas as pd +import streamlit as st + +from src.gui.tools_registry import tool_name + + +# --------------------------------------------------------------------------- +# Adapter-key → registry tool_id bridge +# --------------------------------------------------------------------------- +# +# Pipeline steps are keyed by adapter name (``text_clean``); the tools registry +# and i18n packs are keyed by tool_id (``02_text_cleaner``). The registry has no +# reverse lookup, so we keep the bridge here. ``step_label`` resolves the +# localized friendly name; ``step_caption`` returns a short, plain-English "what +# this step does" line for the card body. + +PIPELINE_TOOL_META: dict[str, str] = { + "text_clean": "02_text_cleaner", + "format_standardize": "03_format_standardizer", + "missing": "04_missing_handler", + "column_map": "05_column_mapper", + "dedup": "01_deduplicator", +} + +_STEP_CAPTIONS: dict[str, str] = { + "text_clean": "Trim spaces, collapse repeats, strip invisible characters.", + "format_standardize": "Canonicalize phones, dates, currency, names per column.", + "missing": "Flag, fill, or drop blank cells (and disguised blanks).", + "column_map": "Rename source columns onto your target column names.", + "dedup": "Find duplicate rows and keep one survivor per group.", +} + + +def step_label(tool: str) -> str: + """Friendly, localized name for a pipeline adapter key (falls back to the key).""" + tool_id = PIPELINE_TOOL_META.get(tool) + return tool_name(tool_id) if tool_id else tool + + +def step_caption(tool: str) -> str: + return _STEP_CAPTIONS.get(tool, "") + + +# --------------------------------------------------------------------------- +# Per-tool config renderers +# --------------------------------------------------------------------------- +# +# Uniform signature: ``render__config(df, options, kp) -> options``. +# * ``df`` — the uploaded DataFrame (for column lists / type hints). +# * ``options`` — the step's current options dict (seed widget defaults). +# * ``kp`` — key prefix, unique per step (``f"{tool}_{id}"``). +# Returns a JSON-serialisable options dict. Renderers must not open expanders. + + +_CASE_LABELS: list[tuple[str, Optional[str]]] = [ + ("Leave as-is", None), + ("UPPERCASE", "upper"), + ("lowercase", "lower"), + ("Title Case", "title"), + ("Sentence case", "sentence"), +] + + +def render_text_clean_config(df: pd.DataFrame, options: dict, kp: str) -> dict: + trim = st.checkbox( + "Trim leading & trailing whitespace", + value=bool(options.get("trim", True)), key=f"{kp}_trim", + ) + collapse = st.checkbox( + "Collapse repeated spaces to one", + value=bool(options.get("collapse_whitespace", True)), key=f"{kp}_collapse", + ) + fold = st.checkbox( + "Normalize smart quotes & dashes to plain ASCII", + value=bool(options.get("fold_smart_chars", True)), key=f"{kp}_fold", + ) + strip_zw = st.checkbox( + "Strip zero-width / invisible characters", + value=bool(options.get("strip_zero_width", True)), key=f"{kp}_zw", + ) + + cur_case = options.get("case") + case_idx = next((i for i, (_, v) in enumerate(_CASE_LABELS) if v == cur_case), 0) + case_choice = st.selectbox( + "Letter case", + [lbl for lbl, _ in _CASE_LABELS], + index=case_idx, key=f"{kp}_case", + ) + case_val = next(v for lbl, v in _CASE_LABELS if lbl == case_choice) + + out: dict[str, Any] = { + "trim": trim, + "collapse_whitespace": collapse, + "fold_smart_chars": fold, + "strip_zero_width": strip_zw, + } + if case_val is not None: + out["case"] = case_val + return out + + +_FORMAT_LABELS: list[tuple[str, Optional[str]]] = [ + ("Leave as-is", None), + ("Date", "date"), + ("Phone number", "phone"), + ("Currency", "currency"), + ("Name", "name"), + ("Address", "address"), + ("Email", "email"), + ("Boolean (yes/no)", "boolean"), +] + + +def render_format_standardize_config(df: pd.DataFrame, options: dict, kp: str) -> dict: + st.caption( + "Pick a target format for each column. Columns left as “Leave as-is” " + "are untouched." + ) + current = dict(options.get("column_types", {})) + labels = [lbl for lbl, _ in _FORMAT_LABELS] + column_types: dict[str, str] = {} + for col in df.columns: + cur_val = current.get(col) + idx = next((i for i, (_, v) in enumerate(_FORMAT_LABELS) if v == cur_val), 0) + choice = st.selectbox( + str(col), labels, index=idx, key=f"{kp}_fmt__{col}", + ) + val = next(v for lbl, v in _FORMAT_LABELS if lbl == choice) + if val is not None: + column_types[str(col)] = val + return {"column_types": column_types} + + +# Plain-language blank-handling choices → core strategy values. "fill" is a UI +# token expanded to numeric median + categorical mode (MissingOptions handles +# the per-dtype split via ``categorical_strategy``). +_MISSING_CHOICES: list[tuple[str, str]] = [ + ("Flag them (mark blanks, change nothing)", "flag"), + ("Fill them in (numbers → median, text → most common)", "fill"), + ("Drop rows that have any blank", "drop"), +] + + +def _missing_mode_from_strategy(strategy: Optional[str]) -> str: + if strategy in ("drop_row", "drop_col", "drop_both"): + return "drop" + if strategy in ("mean", "median", "mode", "constant", "ffill", "bfill", "interpolate"): + return "fill" + return "flag" + + +def render_missing_config(df: pd.DataFrame, options: dict, kp: str) -> dict: + from src.core.missing import DEFAULT_SENTINELS + + cur_mode = _missing_mode_from_strategy(options.get("strategy")) + mode_idx = next((i for i, (_, v) in enumerate(_MISSING_CHOICES) if v == cur_mode), 0) + mode_choice = st.radio( + "What should happen to blank cells?", + [lbl for lbl, _ in _MISSING_CHOICES], + index=mode_idx, key=f"{kp}_strategy", + ) + mode = next(v for lbl, v in _MISSING_CHOICES if lbl == mode_choice) + + seed_sentinels = options.get("sentinels") or list(DEFAULT_SENTINELS) + sent_text = st.text_input( + "Treat these as blank (comma-separated)", + value=", ".join(seed_sentinels), key=f"{kp}_sentinels", + help="Matched case-insensitively after stripping whitespace.", + ) + sentinels = [s.strip() for s in sent_text.split(",") if s.strip()] + + out: dict[str, Any] = { + "standardize_sentinels": True, + "sentinels": sentinels, + } + if mode == "flag": + out["strategy"] = "none" + elif mode == "fill": + out["strategy"] = "median" + out["categorical_strategy"] = "mode" + else: # drop + out["strategy"] = "drop_row" + return out + + +_UNMAPPED_CHOICES = ["keep", "drop", "error"] + + +def render_column_map_config(df: pd.DataFrame, options: dict, kp: str) -> dict: + st.caption( + "Type the target name each source column should become. Leave a target " + "blank to keep that column's name unchanged." + ) + current = dict(options.get("mapping", {})) + table = pd.DataFrame( + { + "source": [str(c) for c in df.columns], + "target": [current.get(str(c), "") for c in df.columns], + } + ) + edited = st.data_editor( + table, + width="stretch", + hide_index=True, + disabled=["source"], + column_config={ + "source": st.column_config.TextColumn("Source column"), + "target": st.column_config.TextColumn("Rename to"), + }, + key=f"{kp}_mapping", + ) + mapping = { + str(r["source"]): str(r["target"]).strip() + for _, r in edited.iterrows() + if str(r.get("target") or "").strip() + } + + c1, c2 = st.columns(2) + with c1: + unmapped = st.selectbox( + "Columns with no rename", + _UNMAPPED_CHOICES, + index=_UNMAPPED_CHOICES.index(options.get("unmapped", "keep")) + if options.get("unmapped") in _UNMAPPED_CHOICES else 0, + key=f"{kp}_unmapped", + help="keep: leave them in place · drop: remove them · error: stop the run.", + ) + with c2: + coerce = st.checkbox( + "Coerce values to target types", + value=bool(options.get("coerce_types", False)), key=f"{kp}_coerce", + ) + return {"mapping": mapping, "unmapped": unmapped, "coerce_types": coerce} + + +_SURVIVOR_LABELS: list[tuple[str, str]] = [ + ("Keep the most complete row", "most_complete"), + ("Keep the first seen", "first"), + ("Keep the last seen", "last"), + ("Keep the most recent (by date)", "most_recent"), +] + + +def render_dedup_config(df: pd.DataFrame, options: dict, kp: str) -> dict: + cur_rule = options.get("survivor_rule", "first") + rule_idx = next((i for i, (_, v) in enumerate(_SURVIVOR_LABELS) if v == cur_rule), 0) + rule_choice = st.selectbox( + "When rows match, which one survives?", + [lbl for lbl, _ in _SURVIVOR_LABELS], + index=rule_idx, key=f"{kp}_survivor", + ) + survivor_rule = next(v for lbl, v in _SURVIVOR_LABELS if lbl == rule_choice) + + merge = st.checkbox( + "Merge matched rows (fill each survivor's blanks from its duplicates)", + value=bool(options.get("merge", False)), key=f"{kp}_merge", + ) + + # Recover the previously-selected match columns from the stored strategies + # (a single exact-match strategy over the chosen columns). + prev_cols: list[str] = [] + for strat in options.get("strategies", []) or []: + for c in strat.get("columns", []): + if c.get("column"): + prev_cols.append(c["column"]) + all_cols = [str(c) for c in df.columns] + match_cols = st.multiselect( + "Match on these columns", + all_cols, + default=[c for c in prev_cols if c in all_cols], + key=f"{kp}_matchcols", + help="Rows are duplicates when these columns all match. Leave empty to auto-detect.", + ) + + out: dict[str, Any] = {"survivor_rule": survivor_rule, "merge": merge} + if match_cols: + out["strategies"] = [ + {"columns": [ + {"column": c, "algorithm": "exact", "threshold": 100} + for c in match_cols + ]} + ] + if survivor_rule == "most_recent": + date_default = options.get("date_column") + date_idx = all_cols.index(date_default) if date_default in all_cols else 0 + out["date_column"] = st.selectbox( + "Date column (for most-recent)", + all_cols, index=date_idx, key=f"{kp}_datecol", + ) if all_cols else None + return out + + +CONFIG_RENDERERS: dict[str, Callable[[pd.DataFrame, dict, str], dict]] = { + "text_clean": render_text_clean_config, + "format_standardize": render_format_standardize_config, + "missing": render_missing_config, + "column_map": render_column_map_config, + "dedup": render_dedup_config, +} + + +# --------------------------------------------------------------------------- +# Module card +# --------------------------------------------------------------------------- + + +def render_step_card( + df: pd.DataFrame, step: dict, idx: int, total: int, +) -> Optional[str]: + """Render one pipeline step as a module card. + + Mutates ``step`` in place (``enabled`` toggle, ``options`` from the Configure + panel). Returns an action string (``"up"`` / ``"down"`` / ``"remove"``) when + the user clicks a reorder/remove control, else ``None`` — the caller applies + the action to the step list and reruns. + """ + sid = step["id"] + kp = f"{step['tool']}_{sid}" + action: Optional[str] = None + + with st.container(border=True): + head, toggle, up, down, rm = st.columns([0.66, 0.12, 0.07, 0.07, 0.08]) + with head: + st.markdown(f"**{idx + 1}. {step_label(step['tool'])}**") + st.caption(step_caption(step["tool"])) + with toggle: + step["enabled"] = st.toggle( + "On", value=step.get("enabled", True), key=f"{kp}_enabled", + help="Disabled steps are kept in the pipeline but skipped at run time.", + ) + with up: + if st.button("▲", key=f"{kp}_up", disabled=idx == 0, + help="Move up", width="stretch"): + action = "up" + with down: + if st.button("▼", key=f"{kp}_down", disabled=idx == total - 1, + help="Move down", width="stretch"): + action = "down" + with rm: + if st.button("✕", key=f"{kp}_rm", help="Remove step", width="stretch"): + action = "remove" + + renderer = CONFIG_RENDERERS.get(step["tool"]) + with st.expander(f"Configure: {step_label(step['tool'])}"): + if renderer is None: + st.caption("This step has no options.") + else: + step["options"] = renderer(df, step.get("options", {}) or {}, kp) + + return action diff --git a/src/gui/pages/9_Pipeline_Runner.py b/src/gui/pages/9_Pipeline_Runner.py index 7eb400c..0652213 100644 --- a/src/gui/pages/9_Pipeline_Runner.py +++ b/src/gui/pages/9_Pipeline_Runner.py @@ -32,6 +32,7 @@ from src.core.pipeline import ( run_pipeline, validate_pipeline, ) +from src.gui.components.pipeline_modules import render_step_card, step_label from src.license import FeatureFlag hide_streamlit_chrome() @@ -104,135 +105,186 @@ st.divider() # --------------------------------------------------------------------------- -# Pipeline builder +# Pipeline builder — visual module cards # --------------------------------------------------------------------------- # -# Wrapped in an outer expander whose default state mirrors the preview -# expander above: open before a result exists, folded once the user has -# clicked Run Pipeline. The pipeline editor is this page's "Options" -# section — structurally analogous to Text Cleaner's options block. +# Each step is a "module" card (src/gui/components/pipeline_modules.py) with a +# plain-language Configure panel — no raw JSON. Steps live in session state as +# an ordered list of dicts, each carrying a STABLE integer id so widget keys +# survive reorder/remove. Raw JSON is import/export only, under Advanced. +# +# NB: the builder is NOT wrapped in an outer expander — per-step Configure +# panels are expanders, and Streamlit forbids nesting expanders. -with st.expander("Options", expanded=not _has_result): - mode = st.radio( - "How would you like to define the pipeline?", - [ - "Use the recommended default (text-clean → format → missing → dedup)", - "Build interactively", - "Import a saved pipeline JSON", - ], - index=0, + +def _seed_steps_from(pipeline) -> None: + """Replace the session step list from a Pipeline, assigning fresh ids.""" + seq = st.session_state.get("pipeline_step_seq", 0) + steps: list[dict] = [] + for s in pipeline.steps: + steps.append({ + "id": seq, "tool": s.tool, + "enabled": s.enabled, "options": dict(s.options), + }) + seq += 1 + st.session_state["pipeline_steps"] = steps + st.session_state["pipeline_step_seq"] = seq + + +if "pipeline_steps" not in st.session_state: + _seed_steps_from(recommended_pipeline()) + +st.subheader("Build your pipeline") + +mode = st.radio( + "How would you like to define the pipeline?", + [ + "Use the recommended default (Clean Text → Standardize → Fix Missing → Find Duplicates)", + "Build interactively", + "Import a saved pipeline JSON", + ], + index=0, + key="pipeline_mode", +) + +if mode.startswith("Use the recommended"): + # Only reseed on an explicit click that lands here while the steps already + # diverge — otherwise every rerun would wipe edits. We detect "user just + # selected this mode" by comparing against the recommended default and + # offering a one-click restore rather than silently discarding. + rec_dict = recommended_pipeline().to_dict() + cur_dict = { + "steps": [ + {"tool": s["tool"], "options": s["options"], + "enabled": s["enabled"], "name": None} + for s in st.session_state["pipeline_steps"] + ] + } + if cur_dict != rec_dict: + st.info( + "You've edited the recommended steps, so they're now yours to " + "change — you're effectively in **Build interactively** mode. " + "Restore the suggested steps to discard your edits." + ) + if st.button("↺ Restore recommended steps"): + _seed_steps_from(recommended_pipeline()) + st.rerun() +elif mode.startswith("Import"): + pipeline_file = st.file_uploader( + "Pipeline JSON", type=["json"], key="pipeline_upload", + ) + if pipeline_file is not None: + try: + data = json.loads(pipeline_file.getvalue()) + _seed_steps_from(Pipeline.from_dict(data)) + st.success( + f"Loaded {len(st.session_state['pipeline_steps'])} step(s). " + "Switch to **Build interactively** to tweak them." + ) + except Exception as e: + from src.core.errors import format_for_user + st.error(f"**Could not parse pipeline**\n\n```\n{format_for_user(e)}\n```") + +st.caption( + "Each step is a module: toggle it on/off, reorder with ▲ ▼, remove with ✕, " + "and open **Configure** to set its options in plain language. Tool order is " + "recommended, not enforced — violations surface as warnings below." +) + +# Render the module stack. A reorder/remove action mutates the list and reruns. +steps = st.session_state["pipeline_steps"] +total = len(steps) +pending_action: tuple[str, int] | None = None +for i, step in enumerate(steps): + act = render_step_card(df, step, i, total) + if act is not None: + pending_action = (act, i) + +if pending_action is not None: + act, i = pending_action + if act == "remove": + steps.pop(i) + elif act == "up" and i > 0: + steps[i - 1], steps[i] = steps[i], steps[i - 1] + elif act == "down" and i < total - 1: + steps[i + 1], steps[i] = steps[i], steps[i + 1] + st.session_state["pipeline_steps"] = steps + st.rerun() + +# Add-step control. +add_col, btn_col = st.columns([0.7, 0.3]) +with add_col: + add_tool = st.selectbox( + "Add a step", + TOOL_NAMES, + format_func=step_label, + key="pipeline_add_tool", + label_visibility="collapsed", + ) +with btn_col: + if st.button("➕ Add step", width="stretch"): + seq = st.session_state.get("pipeline_step_seq", 0) + steps.append({"id": seq, "tool": add_tool, "enabled": True, "options": {}}) + st.session_state["pipeline_step_seq"] = seq + 1 + st.rerun() + +# Build a Pipeline object from the step list. +steps_list: list[Step] = [] +parse_errors: list[str] = [] +for i, step in enumerate(steps): + try: + steps_list.append(Step( + tool=str(step["tool"]), + options=dict(step.get("options") or {}), + enabled=bool(step.get("enabled", True)), + )) + except Exception as e: + parse_errors.append(f"Step {i + 1} ({step.get('tool')}): {e}") + +for err in parse_errors: + st.error(err) + +current_pipeline = Pipeline(steps=steps_list) if steps_list else None + +if current_pipeline is not None: + warnings = validate_pipeline(current_pipeline) + if warnings: + st.warning( + "Pipeline is out of recommended order:\n\n" + + "\n".join(f"- {w}" for w in warnings) + + "\n\nThe pipeline will still run — these are recommendations only." + ) + +with st.expander("Recommended tool order — why each step belongs where it does"): + st.markdown( + "\n".join( + f"- **{step_label(e)}** before **{step_label(l)}** — {why}" + for e, l, why in SOFT_DEPENDENCIES + ) ) - if "pipeline_rows" not in st.session_state: - default = recommended_pipeline() - st.session_state["pipeline_rows"] = pd.DataFrame([ - { - "tool": s.tool, "enabled": s.enabled, - "options_json": json.dumps(s.options), - } - for s in default.steps - ]) - - if mode.startswith("Use the recommended"): - default = recommended_pipeline() - st.session_state["pipeline_rows"] = pd.DataFrame([ - { - "tool": s.tool, "enabled": s.enabled, - "options_json": json.dumps(s.options), - } - for s in default.steps - ]) - elif mode.startswith("Import"): - pipeline_file = st.file_uploader( - "Pipeline JSON", type=["json"], key="pipeline_upload", - ) - if pipeline_file is not None: - try: - data = json.loads(pipeline_file.getvalue()) - uploaded_pipe = Pipeline.from_dict(data) - st.session_state["pipeline_rows"] = pd.DataFrame([ - { - "tool": s.tool, "enabled": s.enabled, - "options_json": json.dumps(s.options), - } - for s in uploaded_pipe.steps - ]) - st.success(f"Loaded {len(uploaded_pipe.steps)} step(s).") - except Exception as e: - from src.core.errors import format_for_user - st.error(f"**Could not parse pipeline**\n\n```\n{format_for_user(e)}\n```") - +with st.expander("Advanced — import / export pipeline as JSON"): st.caption( - "Edit the table to add, remove, reorder (drag the row index), enable, " - "or configure each step. Tool order is recommended, not enforced — " - "violations surface as warnings below the table." + "For sharing or version control. Editing is done in the step panels " + "above — this is just the saved form of the same settings. The same " + "JSON runs in the CLI via `--pipeline pipeline.json`." ) - edited = st.data_editor( - st.session_state["pipeline_rows"], - width="stretch", - num_rows="dynamic", - column_config={ - "tool": st.column_config.SelectboxColumn( - "Tool", options=TOOL_NAMES, required=True, - ), - "enabled": st.column_config.CheckboxColumn("Enabled"), - "options_json": st.column_config.TextColumn( - "Options (JSON)", - help='e.g. {"column_types": {"phone": "phone"}}', - ), - }, - key="pipeline_editor", + export_json = json.dumps( + current_pipeline.to_dict() if current_pipeline else {"steps": []}, + indent=2, default=str, ) - st.session_state["pipeline_rows"] = edited - - # Build a Pipeline object from the editor state. - steps_list: list[Step] = [] - parse_errors: list[str] = [] - for i, row in edited.iterrows(): - tool = row.get("tool") - if not tool or pd.isna(tool): - continue - raw_opts = row.get("options_json") or "{}" - if pd.isna(raw_opts): - raw_opts = "{}" + st.code(export_json, language="json") + adv_paste = st.text_area( + "Paste pipeline JSON to load it", key="pipeline_json_paste", height=140, + ) + if st.button("Load pasted JSON", disabled=not adv_paste.strip()): try: - opts = json.loads(raw_opts) if isinstance(raw_opts, str) else dict(raw_opts) - if not isinstance(opts, dict): - raise ValueError("options must be a JSON object") + _seed_steps_from(Pipeline.from_dict(json.loads(adv_paste))) + st.success("Loaded. Scroll up to see the steps.") + st.rerun() except Exception as e: - parse_errors.append(f"Step {i + 1}: {e}") - continue - try: - steps_list.append(Step( - tool=str(tool), - options=opts, - enabled=bool(row.get("enabled", True)), - )) - except Exception as e: - parse_errors.append(f"Step {i + 1}: {e}") - - if parse_errors: - for err in parse_errors: - st.error(err) - - current_pipeline = Pipeline(steps=steps_list) if steps_list else None - - if current_pipeline is not None: - warnings = validate_pipeline(current_pipeline) - if warnings: - st.warning( - "Pipeline is out of recommended order:\n\n" - + "\n".join(f"- {w}" for w in warnings) - + "\n\nThe pipeline will still run — these are recommendations only." - ) - - with st.expander("Recommended tool order — why each step belongs where it does"): - st.markdown( - "\n".join( - f"- **{e}** before **{l}** — {why}" - for e, l, why in SOFT_DEPENDENCIES - ) - ) + from src.core.errors import format_for_user + st.error(f"**Could not parse pipeline**\n\n```\n{format_for_user(e)}\n```") st.divider() @@ -257,14 +309,14 @@ if st.button( def _on_step(sr) -> None: completed[0] += 1 if sr.skipped: - log_lines.append(f"○ {sr.step.display_name()} (skipped)") + log_lines.append(f"○ {step_label(sr.step.tool)} (skipped)") elif sr.error: log_lines.append( - f"✗ {sr.step.display_name()} — {sr.error.splitlines()[0]}" + f"✗ {step_label(sr.step.tool)} — {sr.error.splitlines()[0]}" ) else: log_lines.append( - f"✓ {sr.step.display_name()} — {sr.elapsed_seconds*1000:.0f} ms" + f"✓ {step_label(sr.step.tool)} — {sr.elapsed_seconds*1000:.0f} ms" ) log_box.markdown("\n".join(log_lines)) progress.progress( @@ -330,11 +382,11 @@ m4.metric("Elapsed", f"{result.total_elapsed:.2f} s") st.markdown("**Per-step summary**") step_df = pd.DataFrame([ { - "step": sr.step.display_name(), + "step": step_label(sr.step.tool), "status": ( - "skipped" if sr.skipped - else "error" if sr.error - else "ok" + "⏭ skipped" if sr.skipped + else "✗ error" if sr.error + else "✓ ok" ), "elapsed_ms": int(sr.elapsed_seconds * 1000), "summary": json.dumps(sr.summary, default=str)[:200], diff --git a/tests/gui/test_pipeline_builder.py b/tests/gui/test_pipeline_builder.py new file mode 100644 index 0000000..d671933 --- /dev/null +++ b/tests/gui/test_pipeline_builder.py @@ -0,0 +1,91 @@ +"""Pipeline Runner — visual module-card builder contract (AppTest). + +Pins the behaviors the JSON-table → module-card rewrite introduced: +recommended steps seed as cards with friendly names, each step exposes a +plain-language Configure panel (no raw per-row JSON), steps can be toggled / +added / removed, JSON lives only under Advanced, and a run produces results +with friendly step names. The page's bare initial-render contract across junk +files is covered separately in ``tests/test_junk_corpus_tool_pages.py``. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest +from streamlit.testing.v1 import AppTest + +_PAGE = ( + Path(__file__).resolve().parent.parent.parent + / "src" / "gui" / "pages" / "9_Pipeline_Runner.py" +) + +_CSV = ( + b"name,email,phone,signup_date\n" + b" Jane Doe ,jane@acme.io,512-555-0190,2024-01-04\n" + b"jane doe,JANE@ACME.IO,(512) 555-0190,01/04/2024\n" + b"Bob Smith,bob@globex.com,720.555.7781,2024-02-11\n" +) + + +def _app() -> AppTest: + at = AppTest.from_file(str(_PAGE), default_timeout=30) + at.session_state["home_uploaded_bytes"] = _CSV + at.session_state["home_uploaded_name"] = "customers.csv" + at.session_state["home_uploaded_size"] = len(_CSV) + return at.run() + + +def test_recommended_steps_seed_as_named_cards(): + at = _app() + assert not at.exception + tools = [s["tool"] for s in at.session_state["pipeline_steps"]] + assert tools == ["text_clean", "format_standardize", "missing", "dedup"] + md = " ".join(m.value for m in at.markdown) + for friendly in ("Clean Text", "Standardize Formats", + "Fix Missing Values", "Find Duplicates"): + assert friendly in md + + +def test_each_step_has_a_configure_panel_and_json_is_advanced_only(): + at = _app() + labels = [e.label for e in at.get("expander")] + assert any(l.startswith("Configure: Clean Text") for l in labels) + assert any(l.startswith("Configure: Find Duplicates") for l in labels) + # Raw JSON is import/export only — never a per-step editing surface. + assert any("Advanced — import / export" in l for l in labels) + + +def test_toggle_disables_step_and_persists(): + at = _app() + at.toggle[0].set_value(False).run() + assert at.session_state["pipeline_steps"][0]["enabled"] is False + + +def test_add_step_appends_a_working_config_panel(): + at = _app() + [s for s in at.selectbox if s.key == "pipeline_add_tool"][0].set_value("column_map").run() + [b for b in at.button if "Add step" in b.label][0].click().run() + assert not at.exception + assert at.session_state["pipeline_steps"][-1]["tool"] == "column_map" + labels = [e.label for e in at.get("expander")] + assert any(l.startswith("Configure: Map Columns") for l in labels) + + +def test_remove_step_drops_it(): + at = _app() + before = len(at.session_state["pipeline_steps"]) + # The first ✕ remove button in the card stack. + [b for b in at.button if b.label == "✕"][0].click().run() + assert not at.exception + assert len(at.session_state["pipeline_steps"]) == before - 1 + + +def test_run_produces_results_with_friendly_names(): + at = _app() + [b for b in at.button if b.label == "Run Pipeline"][0].click().run() + assert not at.exception, at.exception + assert "pipeline_result" in at.session_state + res = at.session_state["pipeline_result"] + assert res.initial_rows == 3 and res.final_rows == 2 # the two Jane rows merge + assert all(sr.error is None for sr in res.step_results)