Files
datatools-dev/src/gui/components/_legacy.py
Michael 30e257cc44 fix(gui): move Quit button to sidebar so it shows on every page
The footer placement was easy to miss (below all tool cards) and only
rendered on the home page. Hook the button into hide_streamlit_chrome()
so every page that hides default chrome — home + all 9 tool pages — gets
the Quit button at the bottom of the sidebar without per-page edits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 13:33:32 +00:00

1247 lines
43 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Reusable Streamlit widgets for the DataTools GUI."""
from __future__ import annotations
import io
import os
import signal
from typing import Optional
import pandas as pd
import streamlit as st
from src.core.dedup import (
Algorithm,
ColumnMatchStrategy,
DeduplicationResult,
MatchResult,
MatchStrategy,
SurvivorRule,
)
from src.core.config import (
ColumnStrategyConfig,
DeduplicationConfig,
StrategyConfig,
)
from src.core.normalizers import NormalizerType
# ---------------------------------------------------------------------------
# App chrome — hide Streamlit default UI for app-like feel
# ---------------------------------------------------------------------------
# Raw <style> block injected verbatim by hide_streamlit_chrome() through
# st.markdown(..., unsafe_allow_html=True). The rationale for each rule lives
# in the CSS comments inside the string itself.
_HIDE_CHROME_CSS = """
<style>
/* Make the Streamlit header transparent and out of the way, but DO NOT
`display: none` it — the sidebar's collapsed-state expand button is
anchored in the header region, and removing the header makes a
collapsed sidebar impossible to reopen. */
header[data-testid="stHeader"] {
background: transparent !important;
height: 0 !important;
}
/* Hide main hamburger menu and deploy button explicitly (don't rely on
hiding the whole header). */
#MainMenu,
[data-testid="stMainMenu"],
[data-testid="stAppDeployButton"] {
display: none !important;
}
/* Keep the sidebar expand control visible and clickable above page content. */
[data-testid="stSidebarCollapsedControl"] {
display: flex !important;
visibility: visible !important;
z-index: 999 !important;
}
/* Hide footer */
footer {
display: none !important;
}
/* Reclaim top padding lost from hidden header */
.stAppViewBlockContainer,
[data-testid="stAppViewBlockContainer"] {
padding-top: 1rem !important;
}
/* Scale content to fit app window */
.stApp {
zoom: 0.85;
}
</style>
"""
def hide_streamlit_chrome() -> None:
    """Strip Streamlit's stock header, menu and footer via injected CSS.

    As a side effect a horizontal rule and a Quit button are appended to the
    sidebar, so any page that calls this helper also gets a way to shut the
    server down without per-page wiring.
    """
    st.markdown(_HIDE_CHROME_CSS, unsafe_allow_html=True)

    sidebar = st.sidebar
    with sidebar:
        # Visual separator between the page navigation and the Quit control.
        st.markdown("---")
        quit_button(key="quit_app_button_sidebar")
# ---------------------------------------------------------------------------
# Clean shutdown
# ---------------------------------------------------------------------------
def quit_button(label: str = "Quit app", *, key: str = "quit_app_button") -> None:
    """Render a button that shuts the Streamlit server down when clicked.

    Closing the browser tab does not stop the server process, so without
    this control the user has to Ctrl+C in the launching shell. On click
    the current process is signalled with SIGTERM (falling back to SIGINT,
    the Ctrl+C signal, if ``signal`` has no SIGTERM attribute) and a rerun
    is requested so the "shut down" notice can be displayed.

    Args:
        label: Text shown on the button.
        key: Streamlit widget key; must be unique per rendered button.
    """
    # Once shutdown was requested, show the notice and halt the script run.
    if st.session_state.get("_app_shutting_down"):
        st.success("App shut down. You can close this browser tab.")
        st.stop()

    clicked = st.button(label, key=key, type="secondary")
    if not clicked:
        return

    st.session_state["_app_shutting_down"] = True
    shutdown_signal = getattr(signal, "SIGTERM", signal.SIGINT)
    os.kill(os.getpid(), shutdown_signal)
    st.rerun()
# ---------------------------------------------------------------------------
# Config panel (advanced options)
# ---------------------------------------------------------------------------
def config_panel(df: pd.DataFrame) -> dict:
    """Render the "Advanced Options" expander and return the chosen settings.

    Args:
        df: DataFrame whose column names populate the column pickers.

    Returns:
        A dict with these keys:
            strategies: list[MatchStrategy] | None (None means auto-detect)
            survivor_rule: SurvivorRule
            date_column: str | None
            merge: bool
    """
    # Only the save/load controls need json; import it once for both.
    import json

    columns = list(df.columns)
    with st.expander("Advanced Options"):
        col_left, col_right = st.columns(2)
        with col_left:
            subset_cols = st.multiselect(
                "Match on columns",
                columns,
                default=[],
                help="Leave empty to auto-detect based on column names.",
            )
            key_cols = st.multiselect(
                "Strong keys",
                columns,
                default=[],
                help="Columns that uniquely identify records (e.g., EIN, SKU). Each is an independent exact-match strategy.",
            )
            fuzzy_cols = st.multiselect(
                "Fuzzy columns",
                columns,
                default=[],
                help="Columns to fuzzy-match. Others use exact matching.",
            )
        with col_right:
            algorithm = st.selectbox(
                "Fuzzy algorithm",
                ["jaro_winkler", "levenshtein", "token_set_ratio"],
                index=0,
                help="jaro_winkler: best for names. levenshtein: best for typos. token_set_ratio: best for addresses.",
            )
            threshold = st.slider(
                "Similarity threshold",
                min_value=50,
                max_value=100,
                value=85,
                help="Lower = more matches but more false positives.",
            )
            survivor = st.selectbox(
                "Survivor rule",
                ["first", "last", "most-complete", "most-recent"],
                index=0,
                help="Which row to keep when duplicates are found.",
            )

        # Second row of options
        col_a, col_b = st.columns(2)
        with col_a:
            normalizer_types = ["auto", "email", "phone", "name", "address", "string", "none"]
            normalize_map: dict[str, str] = {}
            if fuzzy_cols or subset_cols:
                # Normalizer pickers are offered for the fuzzy columns when
                # any are chosen, otherwise for the explicit match columns.
                target_cols = fuzzy_cols or subset_cols
                st.markdown("**Per-column normalizers**")
                for col_name in target_cols:
                    norm = st.selectbox(
                        f"Normalizer for '{col_name}'",
                        normalizer_types,
                        index=0,
                        key=f"norm_{col_name}",
                    )
                    # "auto" and "none" are both left out of the map.
                    # NOTE(review): "none" is therefore indistinguishable
                    # from "auto" downstream — confirm that is intended.
                    if norm not in ("auto", "none"):
                        normalize_map[col_name] = norm
        with col_b:
            merge = st.checkbox(
                "Merge mode",
                value=False,
                help="Fill missing fields in the surviving row from removed duplicates.",
            )
            date_column: Optional[str] = None
            if survivor == "most-recent":
                date_column = st.selectbox(
                    "Date column",
                    columns,
                    help="Required for most-recent survivor rule.",
                )

        # Config save/load
        st.divider()
        cfg_left, cfg_right = st.columns(2)
        with cfg_left:
            config_file = st.file_uploader(
                "Load config profile",
                type=["json"],
                help="Load previously saved settings.",
                key="config_upload",
            )
            if config_file is not None:
                # Deliberately broad: any parse/validation failure is shown
                # to the user instead of crashing the page.
                try:
                    data = json.loads(config_file.read())
                    loaded = DeduplicationConfig.from_dict(data)
                    # The parsed profile is only stashed here; this function
                    # does not itself apply it to the widgets above.
                    st.session_state["loaded_config"] = loaded
                    st.success("Config loaded.")
                except Exception as e:
                    st.error(f"Failed to load config: {e}")
        with cfg_right:
            if st.button("Save current settings"):
                cfg = _build_config(
                    subset_cols, key_cols, fuzzy_cols,
                    algorithm, threshold, normalize_map,
                    survivor, date_column, merge,
                )
                cfg_json = cfg.to_dict()
                st.download_button(
                    "Download config JSON",
                    data=json.dumps(cfg_json, indent=2),
                    file_name="dedup_config.json",
                    mime="application/json",
                )

    # Build strategies from selections
    strategies = _build_strategies(
        subset_cols, key_cols, fuzzy_cols,
        algorithm, threshold, normalize_map,
    )

    # Survivor rule mapping (UI label -> enum member)
    survivor_map = {
        "first": SurvivorRule.KEEP_FIRST,
        "last": SurvivorRule.KEEP_LAST,
        "most-complete": SurvivorRule.KEEP_MOST_COMPLETE,
        "most-recent": SurvivorRule.KEEP_MOST_RECENT,
    }
    return {
        "strategies": strategies,
        "survivor_rule": survivor_map[survivor],
        "date_column": date_column,
        "merge": merge,
    }
def _build_strategies(
    subset_cols: list[str],
    key_cols: list[str],
    fuzzy_cols: list[str],
    algorithm: str,
    threshold: int,
    normalize_map: dict[str, str],
) -> Optional[list[MatchStrategy]]:
    """Translate the GUI selections into a list of ``MatchStrategy`` objects.

    One combined strategy is produced for the explicitly chosen columns
    (``subset_cols`` when non-empty, otherwise ``fuzzy_cols``); columns that
    are also listed in ``fuzzy_cols`` use ``algorithm``/``threshold``, the
    rest use exact matching at 100. Each strong key then contributes its own
    single-column exact strategy.

    Returns:
        The strategies, or ``None`` when nothing was selected (auto-detect).
    """
    built: list[MatchStrategy] = []

    # Explicit column selection -> one multi-column strategy.
    columns_to_match = subset_cols or fuzzy_cols
    if columns_to_match:
        fuzzy_lookup = set(fuzzy_cols)
        per_column: list[ColumnMatchStrategy] = []
        for name in columns_to_match:
            normalizer = (
                NormalizerType(normalize_map[name])
                if name in normalize_map
                else None
            )
            if name in fuzzy_lookup:
                algo, cutoff = Algorithm(algorithm), float(threshold)
            else:
                algo, cutoff = Algorithm.EXACT, 100.0
            per_column.append(
                ColumnMatchStrategy(
                    column=name,
                    algorithm=algo,
                    threshold=cutoff,
                    normalizer=normalizer,
                )
            )
        built.append(MatchStrategy(column_strategies=per_column))

    # Strong keys -> one independent exact-match strategy each.
    built.extend(
        MatchStrategy(column_strategies=[
            ColumnMatchStrategy(
                column=key, algorithm=Algorithm.EXACT, threshold=100.0,
            )
        ])
        for key in (key_cols or [])
    )

    return built or None
def _build_config(
    subset_cols, key_cols, fuzzy_cols,
    algorithm, threshold, normalize_map,
    survivor, date_column, merge,
) -> DeduplicationConfig:
    """Assemble a ``DeduplicationConfig`` mirroring the current GUI state.

    Empty selections are stored as ``None``; the survivor label has its
    dashes replaced by underscores. When the selections yield explicit
    strategies they are converted to their config counterparts and attached
    as ``strategies``.
    """
    config = DeduplicationConfig(
        survivor_rule=survivor.replace("-", "_"),
        date_column=date_column,
        merge=merge,
        subset_columns=subset_cols or None,
        fuzzy_columns=fuzzy_cols or None,
        default_algorithm=algorithm,
        default_threshold=float(threshold),
        normalize_map=normalize_map or None,
    )

    strategies = _build_strategies(
        subset_cols, key_cols, fuzzy_cols,
        algorithm, threshold, normalize_map,
    )
    if not strategies:
        return config

    serialised = []
    for strategy in strategies:
        column_cfgs = [
            ColumnStrategyConfig(
                column=cs.column,
                algorithm=cs.algorithm.value,
                threshold=cs.threshold,
                normalizer=cs.normalizer.value if cs.normalizer else None,
            )
            for cs in strategy.column_strategies
        ]
        serialised.append(StrategyConfig(columns=column_cfgs))
    config.strategies = serialised
    return config
# ---------------------------------------------------------------------------
# Match group review card
# ---------------------------------------------------------------------------
def _find_differing_cols(
    group: MatchResult, df: pd.DataFrame, display_cols: list[str],
) -> list[str]:
    """Return the columns whose values differ across the rows of ``group``.

    Values are compared as stripped strings; a column absent from a row
    counts as the empty string.

    Args:
        group: Match group; only ``row_indices`` (positional indices) is read.
        df: DataFrame the indices refer to.
        display_cols: Candidate column names, checked in order.

    Returns:
        The subset of ``display_cols`` with more than one distinct value,
        in the original order.
    """
    # Materialise each row once instead of once per (column, row) pair.
    rows = [df.iloc[idx] for idx in group.row_indices]
    return [
        col
        for col in display_cols
        if len({str(row.get(col, "")).strip() for row in rows}) > 1
    ]
def match_group_card(
    group: MatchResult,
    df: pd.DataFrame,
    group_num: int,
) -> None:
    """Render an expandable match group card with side-by-side diff.

    Users select which rows to keep via checkboxes. When exactly one row
    is kept they can also cherry-pick column values from the other rows.

    Decision format stored in ``st.session_state["review_decisions"]``::

        {group_id: {"keep_indices": [int, ...], "overrides": {col: val}}}

    Args:
        group: The match group to render.
        df: DataFrame that ``group.row_indices`` index into positionally.
        group_num: Number shown in the card label.
    """
    confidence = group.confidence
    matched_on = ", ".join(group.matched_on)
    n_rows = len(group.row_indices)
    gid = group.group_id

    decisions = st.session_state.get("review_decisions", {})
    has_decision = gid in decisions
    decision_dict = decisions.get(gid, {})
    keep_indices = decision_dict.get("keep_indices", []) if has_decision else []
    overrides = decision_dict.get("overrides", {}) if has_decision else {}

    # Build label — append decision status if already decided
    label = (
        f"Group {group_num}: {n_rows} rows "
        f"(confidence: {confidence:.0f}%) "
        f"[{matched_on}]"
    )
    if has_decision:
        if len(keep_indices) == n_rows:
            label += " — Kept All"
        elif len(keep_indices) == 1:
            label += " — Merged (customized)" if overrides else " — Merged"
        else:
            label += f" — Split (kept {len(keep_indices)} of {n_rows})"

    # Decided groups collapse; undecided groups stay open
    expanded = not has_decision

    # Columns prefixed "_norm_" are not shown in the card.
    display_cols = [c for c in df.columns if not str(c).startswith("_norm_")]
    differing_cols = _find_differing_cols(group, df, display_cols)

    with st.expander(label, expanded=expanded):
        if has_decision:
            # --- Decided state: read-only table with diff highlighting ---
            rows_data = []
            for idx in group.row_indices:
                row = {"Row": idx + 1}
                for col in display_cols:
                    row[col] = df.iloc[idx].get(col, "")
                rows_data.append(row)
            compare_df = pd.DataFrame(rows_data).set_index("Row")

            def _highlight_diffs(s: pd.Series) -> list[str]:
                # Every cell is compared against the first row's value:
                # amber when both are non-empty and differ, red when this
                # cell is empty but the first row's is not.
                styles = []
                first_val = str(s.iloc[0]).strip() if len(s) > 0 else ""
                for val in s:
                    val_str = str(val).strip()
                    if val_str != first_val and val_str and first_val:
                        styles.append(
                            "background-color: rgba(245, 166, 35, 0.2)"
                        )
                    elif not val_str and first_val:
                        styles.append(
                            "background-color: rgba(240, 82, 82, 0.1)"
                        )
                    else:
                        styles.append("")
                return styles

            styled = compare_df.style.apply(_highlight_diffs, axis=0)
            st.dataframe(styled, use_container_width=True)

            if len(keep_indices) == n_rows:
                st.info("Decision: Kept All")
            elif len(keep_indices) == 1:
                msg = "Decision: Merge"
                if overrides:
                    msg += f" ({len(overrides)} column(s) customized)"
                st.success(msg)
            else:
                kept = ", ".join(str(i + 1) for i in sorted(keep_indices))
                st.success(
                    f"Decision: Keep rows {kept} "
                    f"(removing {n_rows - len(keep_indices)})"
                )

            def _undo(g=gid):
                # Drop the stored decision and the editor's widget state so
                # the card is rendered as undecided again.
                st.session_state["review_decisions"].pop(g, None)
                st.session_state.pop(f"editor_{g}", None)

            st.button("Undo", key=f"undo_{gid}", on_click=_undo)
        else:
            # --- Undecided: interactive editor with inline checkboxes & dropdowns ---
            editor_rows = []
            for idx in group.row_indices:
                # Only the engine-chosen survivor starts out checked.
                row_data = {"Keep": idx == group.survivor_index, "Row": idx + 1}
                for col in display_cols:
                    row_data[col] = str(df.iloc[idx].get(col, ""))
                editor_rows.append(row_data)
            editor_df = pd.DataFrame(editor_rows)

            col_config = {
                "Keep": st.column_config.CheckboxColumn(
                    "Keep", default=True, width="small",
                ),
                "Row": st.column_config.NumberColumn("Row", width="small"),
            }
            # Differing columns become dropdowns offering every distinct
            # value seen in the group, plus the empty string.
            for col in differing_cols:
                vals = []
                for idx in group.row_indices:
                    v = str(df.iloc[idx].get(col, "")).strip()
                    if v not in vals:
                        vals.append(v)
                if "" not in vals:
                    vals.append("")
                col_config[col] = st.column_config.SelectboxColumn(
                    col, options=vals, required=False,
                )

            disabled_cols = ["Row"] + [
                c for c in display_cols if c not in differing_cols
            ]
            edited = st.data_editor(
                editor_df,
                column_config=col_config,
                disabled=disabled_cols,
                use_container_width=True,
                hide_index=True,
                key=f"editor_{gid}",
            )

            # Read which rows are checked
            checked = [
                idx
                for i, idx in enumerate(group.row_indices)
                if edited.iloc[i]["Keep"]
            ]

            if differing_cols:
                st.caption(
                    f"Columns with differences (editable): "
                    f"{', '.join(differing_cols)}"
                )

            # Status + surviving rows preview
            if len(checked) == 0:
                st.warning("Select at least one row to keep.")
            else:
                if len(checked) == n_rows:
                    st.caption("Keeping all rows (no duplicates removed)")
                elif len(checked) == 1:
                    st.caption(
                        f"Merging into Row {checked[0] + 1}, "
                        f"removing {n_rows - 1} row(s)"
                    )
                else:
                    st.caption(
                        f"Keeping {len(checked)} rows, "
                        f"removing {n_rows - len(checked)}"
                    )

                # Build preview of surviving rows with edits applied
                checked_positions = [
                    i for i, idx in enumerate(group.row_indices)
                    if idx in checked
                ]
                preview = edited.iloc[checked_positions].drop(
                    columns=["Keep"],
                ).reset_index(drop=True)
                st.markdown("**Surviving rows preview:**")
                st.dataframe(preview, use_container_width=True, hide_index=True)

            # Confirm
            def _on_confirm(
                g=gid, indices=list(group.row_indices),
                diff=differing_cols, surv=group.survivor_index,
            ):
                # Decisions are rebuilt from the editor's widget state
                # ("edited_rows" maps row position -> changed cells).
                editor_state = st.session_state.get(f"editor_{g}", {})
                ed_rows = editor_state.get("edited_rows", {})

                # Determine which rows to keep
                keep = []
                for i, idx in enumerate(indices):
                    changes = ed_rows.get(i, {})
                    default_keep = idx == surv
                    if changes.get("Keep", default_keep):
                        keep.append(idx)
                if not keep:
                    keep = list(indices)

                # Column overrides (single-survivor merge only)
                ovr: dict[str, str] = {}
                if len(keep) == 1:
                    surv_idx = keep[0]
                    surv_pos = indices.index(surv_idx)
                    surv_changes = ed_rows.get(surv_pos, {})
                    # Prefer the session-stored frame; fall back to the one
                    # this card was rendered from when "df" is not stored.
                    the_df = st.session_state.get("df", df)
                    for c in diff:
                        if c in surv_changes:
                            new_val = (
                                str(surv_changes[c])
                                if surv_changes[c] is not None
                                else ""
                            )
                            orig = str(
                                the_df.iloc[surv_idx].get(c, "")
                            ).strip()
                            # Record only values that actually changed.
                            if new_val.strip() != orig:
                                ovr[c] = new_val

                # setdefault: the card tolerates a missing
                # "review_decisions" entry when reading, so writing must
                # not raise KeyError either.
                st.session_state.setdefault("review_decisions", {})[g] = {
                    "keep_indices": keep,
                    "overrides": ovr,
                }

            st.button(
                "Confirm",
                key=f"confirm_{gid}",
                type="primary",
                on_click=_on_confirm,
                disabled=(len(checked) == 0),
            )
# ---------------------------------------------------------------------------
# Results summary + downloads
# ---------------------------------------------------------------------------
def results_summary(
    result: DeduplicationResult,
    original_df: pd.DataFrame,
) -> None:
    """Show headline metrics for a dedup run plus CSV download buttons.

    Args:
        result: Outcome of the deduplication run.
        original_df: The input DataFrame; used for the match-groups report.
    """
    rows_out = len(result.deduplicated_df)
    rows_removed = result.original_row_count - rows_out

    # Summary metrics, one per column
    metrics = (
        ("Rows In", result.original_row_count),
        ("Rows Out", rows_out),
        ("Removed", rows_removed),
        ("Groups", len(result.match_groups)),
    )
    for slot, (title, value) in zip(st.columns(4), metrics):
        slot.metric(title, value)

    st.divider()

    # Download buttons; the removed-rows and report buttons only appear
    # when there is something to download.
    left, middle, right = st.columns(3)
    with left:
        st.download_button(
            "Download Deduplicated CSV",
            data=result.deduplicated_df.to_csv(index=False).encode("utf-8-sig"),
            file_name="deduplicated.csv",
            mime="text/csv",
        )
    with middle:
        if not result.removed_df.empty:
            st.download_button(
                "Download Removed Rows",
                data=result.removed_df.to_csv(index=False).encode("utf-8-sig"),
                file_name="removed_rows.csv",
                mime="text/csv",
            )
    with right:
        if result.match_groups:
            st.download_button(
                "Download Match Groups Report",
                data=_build_match_groups_csv(result, original_df),
                file_name="match_groups.csv",
                mime="text/csv",
            )
def apply_review_decisions(
    original_df: pd.DataFrame,
    match_groups: list[MatchResult],
    decisions: dict,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Apply the user's per-group review decisions to ``original_df``.

    Each group falls into one of four cases:

    - **Merge** (1 row kept): single survivor, optional column overrides.
    - **Split** (some rows kept): the selected rows survive, others go.
    - **Keep all** (all rows kept): nothing is removed.
    - **No decision**: the engine's ``survivor_index`` is the only survivor.

    A group is never emptied: if a decision keeps no rows, every row of the
    group is kept instead.

    Returns:
        ``(deduplicated_df, removed_df)``, both with a fresh 0..n-1 index.
        ``removed_df`` is an empty, column-less frame when nothing is removed.
    """
    doomed: set[int] = set()
    overrides_by_row: dict[int, dict[str, str]] = {}

    for group in match_groups:
        decision = decisions.get(group.group_id)

        if decision is None:
            # No decision yet — accept the engine default.
            survivors = {group.survivor_index}
        else:
            survivors = set(decision.get("keep_indices", group.row_indices))
        if not survivors:
            survivors = set(group.row_indices)

        doomed.update(
            idx for idx in group.row_indices if idx not in survivors
        )

        # Overrides only make sense when a single row survives.
        column_overrides = decision.get("overrides", {}) if decision else {}
        if column_overrides and len(survivors) == 1:
            (only_row,) = survivors
            overrides_by_row[only_row] = column_overrides

    surviving_positions = [
        pos for pos in range(len(original_df)) if pos not in doomed
    ]

    if overrides_by_row:
        patched = []
        for pos in surviving_positions:
            record = original_df.iloc[pos].copy()
            for col, val in overrides_by_row.get(pos, {}).items():
                # Overrides naming an unknown column are ignored.
                if col in record.index:
                    record[col] = val
            patched.append(record)
        deduped = pd.DataFrame(patched).reset_index(drop=True)
    else:
        deduped = original_df.iloc[surviving_positions].copy().reset_index(drop=True)

    if doomed:
        removed = original_df.iloc[sorted(doomed)].copy().reset_index(drop=True)
    else:
        removed = pd.DataFrame()

    return deduped, removed
def _build_match_groups_csv(
    result: DeduplicationResult,
    original_df: pd.DataFrame,
) -> bytes:
    """Serialise every match group into an audit CSV.

    One output row is written per member row of each group, carrying the
    1-based group id and row number, whether the row is the survivor, the
    group confidence, the matched-on columns, and the row's original values
    (columns prefixed ``_norm_`` are left out).

    Returns:
        The CSV encoded as ``utf-8-sig``.
    """
    visible_cols = [
        c for c in original_df.columns if not str(c).startswith("_norm_")
    ]
    total_rows = len(original_df)

    records = []
    for grp in result.match_groups:
        joined_cols = ", ".join(grp.matched_on)
        for idx in grp.row_indices:
            record = {
                "_group_id": grp.group_id + 1,
                "_is_survivor": idx == grp.survivor_index,
                "_confidence": grp.confidence,
                "_matched_on": joined_cols,
                "_original_row": idx + 1,
            }
            # Indices past the end of the frame yield blank values.
            source = original_df.iloc[idx] if idx < total_rows else None
            for col in visible_cols:
                record[col] = "" if source is None else source.get(col, "")
            records.append(record)

    report = pd.DataFrame(records)
    return report.to_csv(index=False).encode("utf-8-sig")
# ---------------------------------------------------------------------------
# Analyzer integration (upload-time data quality findings)
# ---------------------------------------------------------------------------
# Tool id -> friendly display name. Single source of truth for the GUI; the
# CLI keeps its own copy so each entrypoint stays self-contained.
# Ids not listed here are shown as-is by tool_display_name().
TOOL_DISPLAY_NAMES: dict[str, str] = {
    "01_deduplicator": "Deduplicator",
    "02_text_cleaner": "Text Cleaner",
    "03_format_standardizer": "Format Standardizer",
    "04_missing_handler": "Missing Value Handler",
    "05_column_mapper": "Column Mapper",
    "06_outlier_detector": "Outlier Detector",
    "07_multi_file_merger": "Multi-File Merger",
    "08_validator_reporter": "Validator & Reporter",
    "09_pipeline_runner": "Pipeline Runner",
}
# Severity -> icon placed in front of severity counts and finding headlines.
# Every severity needs a non-empty icon: the render sites format the text as
# "{icon} ..." and an empty icon leaves a stray leading space.
_SEVERITY_ICON: dict[str, str] = {
    "info": "ℹ️",
    "warn": "⚠️",
    "error": "🛑",
}
# Severity -> colour name used in the ``:color[...]`` markdown span that
# wraps a finding's id.
_SEVERITY_COLOR: dict[str, str] = {
    "info": "blue",
    "warn": "orange",
    "error": "red",
}
# Map tool id to the streamlit page path under src/gui/. Tool ids missing
# from this map resolve to an empty string in _tool_page_slug(), and the
# findings panel then omits the "Open" link for that tool.
_TOOL_PAGE_PATHS: dict[str, str] = {
    "01_deduplicator": "pages/1_Deduplicator.py",
    "02_text_cleaner": "pages/2_Text_Cleaner.py",
    "03_format_standardizer": "pages/3_Format_Standardizer.py",
    "04_missing_handler": "pages/4_Missing_Values.py",
    "05_column_mapper": "pages/5_Column_Mapper.py",
    "06_outlier_detector": "pages/6_Outlier_Detector.py",
    "07_multi_file_merger": "pages/7_Multi_File_Merger.py",
    "08_validator_reporter": "pages/8_Validator_Reporter.py",
    "09_pipeline_runner": "pages/9_Pipeline_Runner.py",
}
def tool_display_name(tool_id: str) -> str:
    """Return the GUI display name for a stable tool id.

    A falsy id (empty string or ``None``) yields ``"Informational"``; an id
    with no registered display name is returned unchanged.
    """
    if not tool_id:
        return "Informational"
    return TOOL_DISPLAY_NAMES.get(tool_id, tool_id)
def _tool_page_slug(tool_id: str) -> str:
    """Return the page path registered for ``tool_id``, or ``""`` if none."""
    page_path = _TOOL_PAGE_PATHS.get(tool_id, "")
    return page_path
def render_findings_panel(findings, *, header: str = "Detected issues") -> None:
    """Render a list of :class:`Finding` objects grouped by tool.

    Each tool gets an expander labelled with its name and finding count
    (opened by default when it contains an error), the findings themselves,
    and a link to the tool's page when one is registered. Findings without
    a tool are additionally listed under "Other / file-level".

    Args:
        findings: Findings to show; a falsy value renders a success notice.
        header: Heading displayed above the severity summary.
    """
    from src.core.analyze import findings_by_tool  # local import to avoid cycle
    from src.core.text_clean import hidden_char_css

    if not findings:
        st.success("No issues detected. Open any tool below to start working.")
        return

    # Inject the hidden-char badge styles once so every sample value below
    # can render leading/trailing whitespace and invisibles as visible badges.
    st.markdown(hidden_char_css() + _SAMPLE_TABLE_CSS, unsafe_allow_html=True)

    # Count findings per severity for the one-line summary.
    by_sev: dict[str, int] = {}
    for f in findings:
        by_sev[f.severity] = by_sev.get(f.severity, 0) + 1
    sev_summary = " · ".join(
        f"{_SEVERITY_ICON[s]} {by_sev[s]} {s}"
        for s in ("error", "warn", "info") if by_sev.get(s)
    )
    st.markdown(f"### {header}")
    st.caption(sev_summary)

    grouped = findings_by_tool(findings)
    untargeted = [f for f in findings if not f.tool]

    for tool_id in sorted(grouped):
        items = grouped[tool_id]
        display_name = tool_display_name(tool_id)
        with st.expander(
            # Separator matches the "Other / file-level" label below; without
            # it the name and the count run together ("Deduplicator3 ...").
            f"{display_name} — {len(items)} finding(s)",
            expanded=any(f.severity == "error" for f in items),
        ):
            for f in items:
                _render_one_finding(f)
            page_slug = _tool_page_slug(tool_id)
            if page_slug:
                # Streamlit resolves page paths relative to the entrypoint
                # (src/gui/app.py), so a leading ``src/gui/`` would point
                # outside the allowed page tree on Windows.
                st.page_link(page_slug, label=f"Open {display_name}")

    if untargeted:
        with st.expander(
            f"Other / file-level — {len(untargeted)} finding(s)",
            expanded=False,
        ):
            for f in untargeted:
                _render_one_finding(f)
# Styles for the HTML table emitted by render_hidden_aware_preview()
# (classes "hidden-aware-preview" and "hidden-aware-preview-wrap").
_PREVIEW_TABLE_CSS = """
<style>
.hidden-aware-preview {
width: 100%;
border-collapse: collapse;
font-size: 0.9em;
}
.hidden-aware-preview th,
.hidden-aware-preview td {
padding: 4px 8px;
border: 1px solid #eee;
text-align: left;
vertical-align: top;
font-family: ui-monospace, SFMono-Regular, monospace;
/* pre-wrap so internal ASCII whitespace and embedded newlines render
as the user wrote them; otherwise browsers collapse adjacent spaces. */
white-space: pre-wrap;
word-break: break-word;
max-width: 32em;
}
.hidden-aware-preview thead th {
background: #f6f8fa;
position: sticky;
top: 0;
}
.hidden-aware-preview tbody tr:nth-child(even) { background: #fafafa; }
.hidden-aware-preview .row-num {
color: #888;
font-family: inherit;
background: #f6f8fa;
text-align: right;
}
.hidden-aware-preview-wrap {
max-height: 26rem;
overflow: auto;
border: 1px solid #eee;
border-radius: 4px;
}
</style>
"""
def render_hidden_aware_preview(
    df,
    *,
    n_rows: int = 10,
    caption: str | None = None,
) -> None:
    """Render a DataFrame preview that shows hidden characters in every cell.

    Used for the Text Cleaner's "before" and "after" previews so the user
    can actually see the leading/trailing whitespace, NBSP padding,
    zero-width characters, and smart punctuation that the cleaner is going
    to remove (or just removed). A plain ``st.dataframe`` collapses outer
    ASCII whitespace and renders invisibles as nothing, defeating the
    point of a preview in a cleanup tool.

    Headers and string cell values are routed through
    :func:`visualize_hidden_html` with ``mark_outer_whitespace=True``.

    Args:
        df: DataFrame to preview; ``None`` or an empty frame shows a notice.
        n_rows: Maximum number of leading rows to display.
        caption: Optional caption rendered above the table.
    """
    import pandas as pd
    from src.core.text_clean import hidden_char_css, visualize_hidden_html

    if df is None or len(df) == 0:
        st.info("No rows to preview.")
        return

    sliced = df.head(n_rows) if len(df) > n_rows else df
    st.markdown(hidden_char_css() + _PREVIEW_TABLE_CSS, unsafe_allow_html=True)
    if caption:
        st.caption(caption)

    header_cells = "".join(
        f"<th>{visualize_hidden_html(str(c), mark_outer_whitespace=True)}</th>"
        for c in sliced.columns
    )

    body_rows: list[str] = []
    # itertuples keeps each column's own dtype (iterrows upcasts a whole row
    # to one dtype, turning ints into "1.0" beside a float column) and walks
    # cells positionally, so duplicate column labels are rendered correctly.
    for row_num, record in enumerate(
        sliced.itertuples(index=False, name=None), start=1
    ):
        cells = [f"<td class='row-num'>{row_num}</td>"]
        for value in record:
            if isinstance(value, str):
                rendered = visualize_hidden_html(value, mark_outer_whitespace=True)
            elif pd.isna(value):
                rendered = "<span style='color:#aaa'>NaN</span>"
            else:
                # Non-string scalars (numerics, bools) just stringify; they
                # won't have invisible chars but we still need html-escape.
                rendered = visualize_hidden_html(str(value))
            cells.append(f"<td>{rendered}</td>")
        body_rows.append("<tr>" + "".join(cells) + "</tr>")

    st.markdown(
        "<div class='hidden-aware-preview-wrap'>"
        "<table class='hidden-aware-preview'>"
        f"<thead><tr><th class='row-num'>#</th>{header_cells}</tr></thead>"
        f"<tbody>{''.join(body_rows)}</tbody>"
        "</table>"
        "</div>",
        unsafe_allow_html=True,
    )
# Styles for the per-finding samples table (class "findings-sample-table");
# injected once by render_findings_panel() alongside the hidden-char CSS.
_SAMPLE_TABLE_CSS = """
<style>
.findings-sample-table {
width: 100%;
border-collapse: collapse;
font-size: 0.9em;
}
.findings-sample-table th,
.findings-sample-table td {
padding: 4px 8px;
border-bottom: 1px solid #eee;
text-align: left;
vertical-align: top;
}
.findings-sample-table td.value {
font-family: ui-monospace, SFMono-Regular, monospace;
/* pre-wrap so any ASCII whitespace inside the value is preserved
visually (browsers collapse adjacent spaces by default). */
white-space: pre-wrap;
word-break: break-word;
}
.findings-sample-table tbody tr:hover { background: #fafafa; }
</style>
"""
def _render_one_finding(f) -> None:
    """Render one finding: a headline plus an optional samples table.

    The headline shows the severity icon, the colour-coded finding id, the
    affected column (when the finding has one) and the description. When
    the finding carries samples, each ``(row, column, value)`` triple is
    rendered as a table row with hidden characters made visible.
    """
    import html
    from src.core.text_clean import visualize_hidden_html

    color = _SEVERITY_COLOR[f.severity]
    icon = _SEVERITY_ICON[f.severity]
    column_part = f" in `{f.column}`" if getattr(f, "column", None) else ""
    # Separate id/column from the description; previously they ran together.
    st.markdown(
        f"{icon} :{color}[**{f.id}**]{column_part} — {f.description}"
    )

    if not f.samples:
        return

    # Render samples as an HTML table so leading/trailing whitespace
    # and invisible characters in the value column show up as badges.
    # A plain st.dataframe collapses outer whitespace and renders
    # NBSP/ZWSP as nothing, defeating the point of the audit.
    rows_html = []
    for row, col, value in f.samples:
        rendered_value = visualize_hidden_html(
            str(value), mark_outer_whitespace=True,
        )
        rendered_col = visualize_hidden_html(
            str(col), mark_outer_whitespace=True,
        )
        # Integer rows are shown 1-based; anything else is shown as given.
        # The label is escaped because it is spliced into raw HTML.
        row_label = int(row) + 1 if isinstance(row, int) else row
        rows_html.append(
            "<tr>"
            f"<td>{html.escape(str(row_label))}</td>"
            f"<td><code>{rendered_col}</code></td>"
            f"<td class='value'>{rendered_value}</td>"
            "</tr>"
        )
    st.markdown(
        "<table class='findings-sample-table'>"
        "<thead><tr>"
        "<th>Row</th><th>Column</th><th>Value</th>"
        "</tr></thead>"
        f"<tbody>{''.join(rows_html)}</tbody>"
        "</table>",
        unsafe_allow_html=True,
    )
def upload_and_analyze_section() -> None:
    """Render the home page's upload + analyze panel.

    The uploaded file's name, size and bytes are stored in session state
    (``home_uploaded_*``) along with the analysis findings
    (``home_findings``) and a skip flag (``home_skipped``), so other pages
    can read them. Any previous findings are dropped whenever the uploaded
    file's name or size changes.
    """
    session = st.session_state

    st.markdown("### 📤 Upload a file to start")
    st.caption(
        "Optional: scan an uploaded file for data quality issues and see "
        "which tools can fix each one. Skip if you already know what you need."
    )
    st.caption(
        "**Up to 1 GB.** Formats: CSV, TSV, XLSX, XLS. "
        "Delimiters auto-detected: comma, tab, semicolon, pipe. "
        "Encodings auto-detected: UTF-8 (with/without BOM), UTF-16, "
        "cp1252, Latin-1/9, cp1250, ISO-8859-2, cp1251, KOI8-R, "
        "Mac Roman, Shift_JIS, GB18030, Big5, EUC-KR — and override on the Review page."
    )
    uploaded = st.file_uploader(
        "Upload CSV or Excel",
        type=["csv", "tsv", "xlsx", "xls"],
        key="home_upload",
        help=(
            "Up to 1 GB. Comma / tab / semicolon / pipe delimiters all "
            "auto-detected. Encoding auto-detected with override on the "
            "Review page if needed."
        ),
    )
    if uploaded is None:
        return

    # A different name or size is treated as a fresh upload: re-stash the
    # file and forget anything derived from the previous one.
    is_new_upload = (
        session.get("home_uploaded_name") != uploaded.name
        or session.get("home_uploaded_size") != uploaded.size
    )
    if is_new_upload:
        session["home_uploaded_name"] = uploaded.name
        session["home_uploaded_size"] = uploaded.size
        session["home_uploaded_bytes"] = uploaded.getvalue()
        session.pop("home_findings", None)
        session.pop("home_skipped", None)

    run_col, skip_col, _spacer = st.columns([1, 1, 4])
    with run_col:
        run_clicked = st.button("Run analysis", type="primary", key="home_run_analysis")
    with skip_col:
        skip_clicked = st.button("Skip", key="home_skip_analysis")

    if skip_clicked:
        session["home_findings"] = []
        session["home_skipped"] = True
    if run_clicked:
        with st.spinner("Scanning…"):
            fresh_findings = _run_analysis_on_upload(uploaded)
        session["home_findings"] = fresh_findings
        session["home_skipped"] = False

    stored_findings = session.get("home_findings")
    if stored_findings is None:
        # Neither "Run analysis" nor "Skip" has been used for this upload.
        return
    if session.get("home_skipped"):
        st.info("Analysis skipped. Open any tool below to start working.")
        return

    st.divider()
    render_findings_panel(stored_findings)
def _run_analysis_on_upload(uploaded):
    """Read the uploaded file with pre-parse repair, then analyze.

    Excel files (``.xlsx``/``.xls``) are read directly. Everything else is
    treated as delimited text: the delimiter is guessed from the first 4 KiB,
    the bytes go through ``repair_bytes``, and the repaired bytes are parsed
    with every column kept as a string.

    Args:
        uploaded: Object exposing ``.name`` and ``.getvalue()``.

    Returns:
        Whatever ``analyze`` returns for the parsed frame.
    """
    from src.core.analyze import analyze
    from src.core.io import repair_bytes

    name = uploaded.name
    data = uploaded.getvalue()
    suffix = name.rsplit(".", 1)[-1].lower() if "." in name else ""

    if suffix in ("xlsx", "xls"):
        df = pd.read_excel(io.BytesIO(data), dtype=str, keep_default_na=False)
        return analyze(df)

    # CSV / TSV: run repair_bytes so the user sees csv_* findings.
    text_head = data[:4096].decode("utf-8", errors="replace")
    delim = _sniff_delimiter(text_head, suffix)

    # NOTE(review): the encoding is fixed to UTF-8 here although the upload
    # caption advertises encoding auto-detection — confirm whether
    # repair_bytes transcodes, otherwise non-UTF-8 files fail in read_csv.
    repair = repair_bytes(data, encoding="utf-8", delimiter=delim)
    df = pd.read_csv(
        io.BytesIO(repair.repaired_bytes),
        encoding="utf-8", delimiter=delim,
        dtype=str, keep_default_na=False, on_bad_lines="warn",
    )
    return analyze(df, repair_result=repair)


def _sniff_delimiter(text_head: str, suffix: str) -> str:
    """Guess the field delimiter from a text sample and the file suffix.

    ``.tsv`` files always use a tab. Otherwise the most frequent of tab,
    semicolon and pipe wins, provided it occurs more than 1.5x as often as
    the comma; if none does, the comma is kept. Choosing the most frequent
    candidate (rather than the first one over the threshold) stops a single
    stray ";" or tab from hijacking a pipe-delimited file.
    """
    if suffix == "tsv":
        return "\t"
    comma_floor = text_head.count(",") * 1.5
    counts = {cand: text_head.count(cand) for cand in ("\t", ";", "|")}
    # max() returns the first maximal key, so ties resolve in the order
    # tab, semicolon, pipe.
    best = max(counts, key=counts.get)
    return best if counts[best] > comma_floor else ","
def findings_count_for_tool(tool_id: str) -> int:
    """Count the stored findings whose ``tool`` equals *tool_id*.

    Reads ``home_findings`` from session state, so no analysis is re-run;
    returns 0 when nothing is stored or nothing matches.
    """
    stored = st.session_state.get("home_findings") or []
    matching = [finding for finding in stored if finding.tool == tool_id]
    return len(matching)
# ---------------------------------------------------------------------------
# Cross-page upload pickup
# ---------------------------------------------------------------------------
class _StashedUpload:
"""Duck-types ``st.runtime.uploaded_file_manager.UploadedFile`` enough
for the tool pages: ``.name``, ``.size``, ``.getvalue()``.
Tool pages that previously consumed a Streamlit ``UploadedFile`` can
accept this in its place without changes.
"""
__slots__ = ("name", "size", "_data")
def __init__(self, name: str, data: bytes) -> None:
self.name = name
self.size = len(data)
self._data = data
def getvalue(self) -> bytes:
return self._data
def read(self) -> bytes:
return self._data
def require_normalization_gate() -> None:
    """Halt the calling tool page unless the session upload passed the gate.

    Intended to be called by tool pages right after their imports. The
    function returns normally in two cases:

    - no bytes are stored under ``home_uploaded_bytes`` (the page's own
      uploader takes over; the gate applies once a file is present), or
    - ``normalization_result`` exists, ``normalization_for`` equals the
      SHA-256 hex digest of the stored bytes, and the result's ``passed``
      attribute is truthy.

    Otherwise a warning banner and a button leading to the Review page are
    rendered and the rest of the page is cut off with ``st.stop()``.

    Pages that don't need a clean dataframe opt out by not calling this.
    """
    import hashlib

    payload = st.session_state.get("home_uploaded_bytes")
    if payload is None:
        # Nothing uploaded yet, so there is nothing to gate.
        return

    digest = hashlib.sha256(payload).hexdigest()
    outcome = st.session_state.get("normalization_result")
    if (
        outcome is not None
        and st.session_state.get("normalization_for") == digest
        and getattr(outcome, "passed", False)
    ):
        return

    name = st.session_state.get("home_uploaded_name", "the uploaded file")
    message = (
        f"**{name}** must pass the CSV-normalization gate before you can "
        f"use this tool. Open the Review page to apply the fixes our "
        f"analyzer recommends."
    )
    st.warning(message)
    go_to_review = st.button("Go to Review & Normalize", type="primary")
    if go_to_review:
        st.switch_page("pages/0_Review.py")
    st.stop()
def pickup_or_upload(
    *,
    label: str,
    key: str,
    types: list[str],
    help: str | None = None,
):
    """Return an upload object, preferring the home-page upload when present.

    Behavior:

    - If ``st.session_state['home_uploaded_bytes']`` is set and the user
      hasn't asked for a different file on this page, render a banner
      ("Using *<name>* from the upload screen.") plus a "Use a different
      file" button, and return a :class:`_StashedUpload` shim.
    - Otherwise render the standard ``st.file_uploader`` with the supplied
      *label*, *key*, *types* and *help*. Returns the Streamlit
      ``UploadedFile`` directly (or ``None`` if nothing uploaded). While
      nothing is uploaded and a session upload still exists, a "Switch
      back to upload-screen file" button is offered as well.

    The ``_StashedUpload`` shim exposes ``.name``, ``.size``, and
    ``.getvalue()`` so existing tool-page code that consumes a Streamlit
    upload object works without changes.

    Args:
        label: Label for the file uploader widget.
        key: Widget key; also the prefix of the per-page session-state
            keys ``<key>__override``, ``<key>__pick_diff`` and
            ``<key>__switch_back``.
        types: Accepted file extensions, passed to ``st.file_uploader``.
        help: Optional tooltip text, passed to ``st.file_uploader``.

    Returns:
        A :class:`_StashedUpload`, a Streamlit ``UploadedFile``, or ``None``.
    """
    override_key = f"{key}__override"
    has_session_upload = st.session_state.get("home_uploaded_bytes") is not None
    overridden = bool(st.session_state.get(override_key, False))

    if has_session_upload and not overridden:
        name = st.session_state.get("home_uploaded_name", "uploaded file")
        st.info(f"Using **{name}** from the upload screen.")
        if st.button("Use a different file", key=f"{key}__pick_diff"):
            st.session_state[override_key] = True
            st.rerun()
        return _StashedUpload(name, st.session_state["home_uploaded_bytes"])

    # Only advertise the tabular-file limits when the uploader accepts at
    # least one tabular extension.
    if {"csv", "tsv", "xlsx", "xls"} & set(types):
        st.caption(
            "Up to 1 GB. Delimiters auto-detected: comma, tab, semicolon, pipe. "
            "Encoding auto-detected (UTF-8 / UTF-16 / cp1252 / Latin-1 family / "
            "cp1250 / cp1251 / KOI8-R / Mac Roman / Shift_JIS / GB18030 / Big5 / "
            "EUC-KR), with override on the Review page."
        )

    uploaded = st.file_uploader(label, type=types, key=key, help=help)

    # The override flag stays set once the user uploads their own file:
    # clearing it would make the next rerun take the session-upload branch
    # above and return the upload-screen file instead of the one just
    # uploaded here.
    if uploaded is None and overridden and has_session_upload:
        if st.button("Switch back to upload-screen file", key=f"{key}__switch_back"):
            st.session_state[override_key] = False
            st.rerun()
    return uploaded