Files
datatools-dev/src/gui/pages/9_Pipeline_Runner.py
Michael 143c775cdf fix(footer,nav): left-justify buttons, drop per-page caption bar, hide sidebar Close
Three small follow-ups to the sticky-footer rework:

- Left-justify the footer buttons (and reposition the Help popover
  to anchor at the left edge so it lines up with its trigger).
- Remove the per-page ``st.divider() + st.caption("Runs locally…")``
  trailing block from all 9 tool pages. The new sticky footer
  covers that text, so it was rendering as an empty white bar at
  the bottom of each tool page.
- Hide the Close entry from the sidebar nav via CSS. The page stays
  registered with st.navigation so /close is still routable for the
  sticky-footer Close button — only the sidebar link + its section
  header are hidden (via :has() on stNavSection).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 15:04:12 +00:00

448 lines
15 KiB
Python

"""DataTools Automated Workflows — Streamlit page."""
from __future__ import annotations
import io
import json
import sys
from pathlib import Path
import pandas as pd
import streamlit as st
_project_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
from src.gui.components import (
back_to_home_link,
render_sticky_footer,
hide_streamlit_chrome,
html_download_button,
pickup_or_upload,
require_feature_or_render_upgrade,
)
from src.i18n import t
from src.core.pipeline import (
Pipeline,
SOFT_DEPENDENCIES,
Step,
TOOL_NAMES,
recommended_pipeline,
run_pipeline,
validate_pipeline,
)
from src.license import FeatureFlag
hide_streamlit_chrome()
render_sticky_footer()
back_to_home_link()
from src.audit import log_page_open
log_page_open("9_Pipeline_Runner")
require_feature_or_render_upgrade(FeatureFlag.PIPELINE_RUNNER)
# ---------------------------------------------------------------------------
# Header
# ---------------------------------------------------------------------------
st.title(t("tools.09_pipeline_runner.page_title"))
st.caption(t("tools.09_pipeline_runner.page_caption"))
# ---------------------------------------------------------------------------
# File upload
# ---------------------------------------------------------------------------
uploaded = pickup_or_upload(
label="Upload CSV or Excel file",
key="pipeline_file_upload",
types=["csv", "tsv", "xlsx", "xls"],
)
if uploaded is None:
st.info("Upload a CSV, TSV, or Excel file to begin.")
st.stop()
@st.cache_data(show_spinner=False)
def _read_uploaded(name: str, data: bytes) -> pd.DataFrame:
suffix = Path(name).suffix.lower()
bio = io.BytesIO(data)
if suffix in (".xlsx", ".xls"):
return pd.read_excel(bio)
for enc in ("utf-8", "utf-8-sig", "latin-1"):
try:
bio.seek(0)
sep = "\t" if suffix == ".tsv" else ","
return pd.read_csv(bio, encoding=enc, sep=sep, on_bad_lines="warn")
except UnicodeDecodeError:
continue
bio.seek(0)
return pd.read_csv(bio, encoding="latin-1")
try:
df = _read_uploaded(uploaded.name, uploaded.getvalue())
except Exception as e:
from src.core.errors import format_for_user
st.error(
f"**Could not read `{uploaded.name}`**\n\n"
f"```\n{format_for_user(e)}\n```"
)
st.stop()
# Collapse the input preview and pipeline editor once the user has clicked
# Run Pipeline so the Results section below is the primary visual focus.
# The user can re-expand either expander to re-inspect or adjust.
_has_result = st.session_state.get("pipeline_result") is not None
with st.expander(f"Preview: {uploaded.name}", expanded=not _has_result):
st.caption(f"{len(df)} rows, {len(df.columns)} columns")
st.dataframe(df.head(10), use_container_width=True)
st.divider()
# ---------------------------------------------------------------------------
# Pipeline builder
# ---------------------------------------------------------------------------
#
# Wrapped in an outer expander whose default state mirrors the preview
# expander above: open before a result exists, folded once the user has
# clicked Run Pipeline. The pipeline editor is this page's "Options"
# section — structurally analogous to Text Cleaner's options block.
with st.expander("Options", expanded=not _has_result):
mode = st.radio(
"How would you like to define the pipeline?",
[
"Use the recommended default (text-clean → format → missing → dedup)",
"Build interactively",
"Upload a saved pipeline JSON",
],
index=0,
)
if "pipeline_rows" not in st.session_state:
default = recommended_pipeline()
st.session_state["pipeline_rows"] = pd.DataFrame([
{
"tool": s.tool, "enabled": s.enabled,
"options_json": json.dumps(s.options),
}
for s in default.steps
])
if mode.startswith("Use the recommended"):
default = recommended_pipeline()
st.session_state["pipeline_rows"] = pd.DataFrame([
{
"tool": s.tool, "enabled": s.enabled,
"options_json": json.dumps(s.options),
}
for s in default.steps
])
elif mode.startswith("Upload"):
pipeline_file = st.file_uploader(
"Pipeline JSON", type=["json"], key="pipeline_upload",
)
if pipeline_file is not None:
try:
data = json.loads(pipeline_file.getvalue())
uploaded_pipe = Pipeline.from_dict(data)
st.session_state["pipeline_rows"] = pd.DataFrame([
{
"tool": s.tool, "enabled": s.enabled,
"options_json": json.dumps(s.options),
}
for s in uploaded_pipe.steps
])
st.success(f"Loaded {len(uploaded_pipe.steps)} step(s).")
except Exception as e:
from src.core.errors import format_for_user
st.error(f"**Could not parse pipeline**\n\n```\n{format_for_user(e)}\n```")
st.caption(
"Edit the table to add, remove, reorder (drag the row index), enable, "
"or configure each step. Tool order is recommended, not enforced — "
"violations surface as warnings below the table."
)
edited = st.data_editor(
st.session_state["pipeline_rows"],
use_container_width=True,
num_rows="dynamic",
column_config={
"tool": st.column_config.SelectboxColumn(
"Tool", options=TOOL_NAMES, required=True,
),
"enabled": st.column_config.CheckboxColumn("Enabled"),
"options_json": st.column_config.TextColumn(
"Options (JSON)",
help='e.g. {"column_types": {"phone": "phone"}}',
),
},
key="pipeline_editor",
)
st.session_state["pipeline_rows"] = edited
# Build a Pipeline object from the editor state.
steps_list: list[Step] = []
parse_errors: list[str] = []
for i, row in edited.iterrows():
tool = row.get("tool")
if not tool or pd.isna(tool):
continue
raw_opts = row.get("options_json") or "{}"
if pd.isna(raw_opts):
raw_opts = "{}"
try:
opts = json.loads(raw_opts) if isinstance(raw_opts, str) else dict(raw_opts)
if not isinstance(opts, dict):
raise ValueError("options must be a JSON object")
except Exception as e:
parse_errors.append(f"Step {i + 1}: {e}")
continue
try:
steps_list.append(Step(
tool=str(tool),
options=opts,
enabled=bool(row.get("enabled", True)),
))
except Exception as e:
parse_errors.append(f"Step {i + 1}: {e}")
if parse_errors:
for err in parse_errors:
st.error(err)
current_pipeline = Pipeline(steps=steps_list) if steps_list else None
if current_pipeline is not None:
warnings = validate_pipeline(current_pipeline)
if warnings:
st.warning(
"Pipeline is out of recommended order:\n\n"
+ "\n".join(f"- {w}" for w in warnings)
+ "\n\nThe pipeline will still run — these are recommendations only."
)
with st.expander("Recommended tool order — why each step belongs where it does"):
st.markdown(
"\n".join(
f"- **{e}** before **{l}** — {why}"
for e, l, why in SOFT_DEPENDENCIES
)
)
st.divider()
# ---------------------------------------------------------------------------
# Run
# ---------------------------------------------------------------------------
run_disabled = current_pipeline is None or not current_pipeline.steps
if st.button(
"Run Pipeline",
type="primary",
use_container_width=True,
disabled=run_disabled,
):
progress = st.progress(0.0, text="Starting...")
log_box = st.empty()
log_lines: list[str] = []
total_enabled = sum(1 for s in current_pipeline.steps if s.enabled)
completed = [0]
def _on_step(sr) -> None:
completed[0] += 1
if sr.skipped:
log_lines.append(f"{sr.step.display_name()} (skipped)")
elif sr.error:
log_lines.append(
f"{sr.step.display_name()}{sr.error.splitlines()[0]}"
)
else:
log_lines.append(
f"{sr.step.display_name()}{sr.elapsed_seconds*1000:.0f} ms"
)
log_box.markdown("\n".join(log_lines))
progress.progress(
completed[0] / max(total_enabled, 1),
text=f"Step {completed[0]}/{total_enabled}",
)
try:
result = run_pipeline(
df, current_pipeline,
on_step_complete=_on_step,
stop_on_error=False,
)
except Exception as e:
from src.core.errors import format_for_user
st.error(f"**Pipeline halted**\n\n```\n{format_for_user(e)}\n```")
st.stop()
progress.progress(1.0, text="Done")
st.session_state["pipeline_result"] = result
from src.audit import log_event
log_event("tool_run", "Automated Workflows run", page="9_Pipeline_Runner")
st.session_state["pipeline_input_name"] = uploaded.name
# One-shot flag picked up on the next pass to scroll the parent
# document to the Results anchor (see scroll snippet at end of file).
st.session_state["_pipeline_scroll_to_results"] = True
# Force a second rerun so the preview and options expanders see
# the new result on the NEXT script pass and collapse themselves.
# Without this they stay expanded until the user touches any
# other widget.
st.rerun()
result = st.session_state.get("pipeline_result")
if result is None:
st.info(
"Configure the pipeline above and click **Run Pipeline** to "
"execute it on your file."
)
st.stop()
# ---------------------------------------------------------------------------
# Results
# ---------------------------------------------------------------------------
# Anchor target for the auto-scroll snippet at the end of this block.
# A bare ``<div id="...">`` survives Streamlit's HTML sanitizer (only
# ``<script>`` is stripped), and a 1px-tall div doesn't visually shift
# anything. Placed before the subheader so the scrolled-to viewport
# starts a few pixels above the section heading rather than below it.
st.markdown(
'<div id="pipeline-results-anchor" style="height:1px"></div>',
unsafe_allow_html=True,
)
st.subheader("Results")
m1, m2, m3, m4 = st.columns(4)
m1.metric("Initial rows", result.initial_rows)
m2.metric("Final rows", result.final_rows)
m3.metric("Steps run", sum(1 for s in result.step_results if not s.skipped))
m4.metric("Elapsed", f"{result.total_elapsed:.2f} s")
st.markdown("**Per-step summary**")
step_df = pd.DataFrame([
{
"step": sr.step.display_name(),
"status": (
"skipped" if sr.skipped
else "error" if sr.error
else "ok"
),
"elapsed_ms": int(sr.elapsed_seconds * 1000),
"summary": json.dumps(sr.summary, default=str)[:200],
"error": sr.error or "",
}
for sr in result.step_results
])
st.dataframe(step_df, use_container_width=True, hide_index=True)
st.markdown("**Output preview (first 10 rows)**")
st.dataframe(result.final_df.head(10), use_container_width=True)
# ---------------------------------------------------------------------------
# Downloads
# ---------------------------------------------------------------------------
#
# All three byte buffers are prepared up front (outside the columns) so
# each ``st.download_button`` sees stable ``data`` across reruns and an
# explicit ``key`` — without those, Streamlit auto-derived widget IDs
# can collide for multiple download_buttons in adjacent columns and
# only the first one actually fires on click. The pipeline-JSON button
# now renders unconditionally (disabled when no pipeline is defined)
# so the layout stays steady.
st.divider()
stem = Path(st.session_state.get("pipeline_input_name", "input")).stem
cleaned_bytes = result.final_df.to_csv(index=False).encode("utf-8-sig")
pipeline_bytes = json.dumps(
current_pipeline.to_dict() if current_pipeline else {"steps": []},
indent=2, default=str,
).encode("utf-8")
audit_bytes = json.dumps({
"warnings": result.warnings,
"initial_rows": result.initial_rows,
"final_rows": result.final_rows,
"total_elapsed_seconds": result.total_elapsed,
"steps": [
{
"tool": sr.step.tool,
"name": sr.step.display_name(),
"enabled": sr.step.enabled,
"skipped": sr.skipped,
"elapsed_seconds": sr.elapsed_seconds,
"summary": sr.summary,
"error": sr.error,
}
for sr in result.step_results
],
}, indent=2, default=str).encode("utf-8")
_pipeline_empty = current_pipeline is None or not current_pipeline.steps
dl_a, dl_b, dl_c = st.columns(3)
with dl_a:
html_download_button(
"Download cleaned CSV",
cleaned_bytes,
file_name=f"{stem}_pipeline.csv",
mime="text/csv",
)
with dl_b:
html_download_button(
"Download pipeline JSON",
pipeline_bytes,
file_name="pipeline.json",
mime="application/json",
disabled=_pipeline_empty,
help=(
"No pipeline defined."
if _pipeline_empty
else "Save this and pass --pipeline pipeline.json to the CLI to re-run on next week's file."
),
)
with dl_c:
html_download_button(
"Download run audit",
audit_bytes,
file_name=f"{stem}_pipeline_audit.json",
mime="application/json",
)
# ---------------------------------------------------------------------------
# Post-run auto-scroll
# ---------------------------------------------------------------------------
#
# When the user clicks Run Pipeline, the preview + options collapse but
# Streamlit by itself doesn't scroll — the Results section is at the
# bottom of a tall script so the user has to find it. Inject a tiny
# component-html iframe that calls ``scrollIntoView`` on the parent's
# Results anchor. Streamlit's main page is same-origin with component
# iframes so ``window.parent.document`` access is allowed.
#
# The flag is one-shot (``pop`` removes it) so re-renders triggered by
# unrelated widgets in the Results section don't yank the viewport
# back to the top of Results.
if st.session_state.pop("_pipeline_scroll_to_results", False):
from streamlit.components.v1 import html as _components_html
_components_html(
"""
<script>
const doc = window.parent.document;
const target = doc.getElementById('pipeline-results-anchor');
if (target) target.scrollIntoView({behavior: 'smooth', block: 'start'});
</script>
""",
height=0,
)