Move the shutdown control out of the inline sidebar widget and into its own page (pages/99_Close.py), so it appears in the sidebar nav alongside the tool pages. An explicit confirm button on the page prevents accidental nav clicks from killing a live session. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1246 lines
43 KiB
Python
1246 lines
43 KiB
Python
"""Reusable Streamlit widgets for the DataTools GUI."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import io
|
||
import os
|
||
import threading
|
||
import time
|
||
from typing import Optional
|
||
|
||
import pandas as pd
|
||
import streamlit as st
|
||
|
||
from src.core.dedup import (
|
||
Algorithm,
|
||
ColumnMatchStrategy,
|
||
DeduplicationResult,
|
||
MatchResult,
|
||
MatchStrategy,
|
||
SurvivorRule,
|
||
)
|
||
from src.core.config import (
|
||
ColumnStrategyConfig,
|
||
DeduplicationConfig,
|
||
StrategyConfig,
|
||
)
|
||
from src.core.normalizers import NormalizerType
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# App chrome — hide Streamlit default UI for app-like feel
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_HIDE_CHROME_CSS = """
|
||
<style>
|
||
/* Make the Streamlit header transparent and out of the way, but DO NOT
|
||
`display: none` it — the sidebar's collapsed-state expand button is
|
||
anchored in the header region, and removing the header makes a
|
||
collapsed sidebar impossible to reopen. */
|
||
header[data-testid="stHeader"] {
|
||
background: transparent !important;
|
||
height: 0 !important;
|
||
}
|
||
/* Hide main hamburger menu and deploy button explicitly (don't rely on
|
||
hiding the whole header). */
|
||
#MainMenu,
|
||
[data-testid="stMainMenu"],
|
||
[data-testid="stAppDeployButton"] {
|
||
display: none !important;
|
||
}
|
||
/* Keep the sidebar expand control visible and clickable above page content. */
|
||
[data-testid="stSidebarCollapsedControl"] {
|
||
display: flex !important;
|
||
visibility: visible !important;
|
||
z-index: 999 !important;
|
||
}
|
||
/* Hide footer */
|
||
footer {
|
||
display: none !important;
|
||
}
|
||
/* Reclaim top padding lost from hidden header */
|
||
.stAppViewBlockContainer,
|
||
[data-testid="stAppViewBlockContainer"] {
|
||
padding-top: 1rem !important;
|
||
}
|
||
/* Scale content to fit app window */
|
||
.stApp {
|
||
zoom: 0.85;
|
||
}
|
||
</style>
|
||
"""
|
||
|
||
|
||
def hide_streamlit_chrome() -> None:
    """Inject the app-chrome stylesheet into the current page.

    The stylesheet collapses the Streamlit header to a transparent,
    zero-height strip (it is deliberately not removed), hides the main
    menu, deploy button and footer, keeps the collapsed-sidebar control
    visible, and scales the app content.
    """
    chrome_css = _HIDE_CHROME_CSS
    st.markdown(chrome_css, unsafe_allow_html=True)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Clean shutdown
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def quit_button(label: str = "Quit app", *, key: str = "quit_app_button") -> None:
    """Show a button that shuts the Streamlit server process down.

    Rationale (as recorded by the original author): Streamlit offers no
    first-class shutdown hook, and sending SIGTERM/SIGINT is unreliable
    because Streamlit installs its own handlers and the tornado/asyncio
    loop swallows or defers the signal — the websocket drops but the
    python process survives. Instead, a daemon thread calls
    ``os._exit(0)`` after a short delay, which gives the rerun triggered
    below time to paint the "shutting down" message before the process
    is hard-killed.

    Args:
        label: Text shown on the button.
        key: Streamlit widget key for the button.
    """
    # Once the shutdown flag is set, render only the farewell message and
    # halt the script run.
    shutting_down = st.session_state.get("_app_shutting_down")
    if shutting_down:
        st.success("Shutting down… you can close this browser tab.")
        st.stop()

    clicked = st.button(label, key=key, type="secondary")
    if not clicked:
        return

    st.session_state["_app_shutting_down"] = True

    def _exit_after_delay() -> None:
        # Sleep first so the rerun below can render before the hard exit.
        time.sleep(0.5)
        os._exit(0)

    killer = threading.Thread(target=_exit_after_delay, daemon=True)
    killer.start()
    st.rerun()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config panel (advanced options)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def config_panel(df: pd.DataFrame) -> dict:
    """Render the Advanced Options expander. Returns a settings dict.

    Keys returned:
        strategies: list[MatchStrategy] | None
        survivor_rule: SurvivorRule
        date_column: str | None
        merge: bool

    Args:
        df: The loaded data; only its column names are used, to populate
            the column pickers.

    Notes:
        Uploading a config profile only parses it and stores the result in
        ``st.session_state["loaded_config"]``; this function does not apply
        it to the widgets.
    """
    import json  # needed only by the config profile load/save controls

    columns = list(df.columns)

    with st.expander("Advanced Options"):
        col_left, col_right = st.columns(2)

        with col_left:
            subset_cols = st.multiselect(
                "Match on columns",
                columns,
                default=[],
                help="Leave empty to auto-detect based on column names.",
            )
            key_cols = st.multiselect(
                "Strong keys",
                columns,
                default=[],
                help="Columns that uniquely identify records (e.g., EIN, SKU). Each is an independent exact-match strategy.",
            )
            fuzzy_cols = st.multiselect(
                "Fuzzy columns",
                columns,
                default=[],
                help="Columns to fuzzy-match. Others use exact matching.",
            )

        with col_right:
            algorithm = st.selectbox(
                "Fuzzy algorithm",
                ["jaro_winkler", "levenshtein", "token_set_ratio"],
                index=0,
                help="jaro_winkler: best for names. levenshtein: best for typos. token_set_ratio: best for addresses.",
            )
            threshold = st.slider(
                "Similarity threshold",
                min_value=50,
                max_value=100,
                value=85,
                help="Lower = more matches but more false positives.",
            )
            survivor = st.selectbox(
                "Survivor rule",
                ["first", "last", "most-complete", "most-recent"],
                index=0,
                help="Which row to keep when duplicates are found.",
            )

        # Second row of options
        col_a, col_b = st.columns(2)

        with col_a:
            normalizer_types = ["auto", "email", "phone", "name", "address", "string", "none"]

            normalize_map: dict[str, str] = {}
            # Offer normalizers for exactly the columns _build_strategies
            # will build the composite strategy from (explicit match
            # columns win over the fuzzy list), so every selector shown
            # here actually affects matching.
            target_cols = subset_cols if subset_cols else fuzzy_cols
            if target_cols:
                st.markdown("**Per-column normalizers**")
                for col_name in target_cols:
                    norm = st.selectbox(
                        f"Normalizer for '{col_name}'",
                        normalizer_types,
                        index=0,
                        key=f"norm_{col_name}",
                    )
                    # "auto" and "none" both mean "no explicit normalizer
                    # entry" as far as the returned map is concerned.
                    if norm not in ("auto", "none"):
                        normalize_map[col_name] = norm

        with col_b:
            merge = st.checkbox(
                "Merge mode",
                value=False,
                help="Fill missing fields in the surviving row from removed duplicates.",
            )
            date_column: Optional[str] = None
            if survivor == "most-recent":
                date_column = st.selectbox(
                    "Date column",
                    columns,
                    help="Required for most-recent survivor rule.",
                )

        # Config save/load
        st.divider()
        cfg_left, cfg_right = st.columns(2)

        with cfg_left:
            config_file = st.file_uploader(
                "Load config profile",
                type=["json"],
                help="Load previously saved settings.",
                key="config_upload",
            )
            if config_file is not None:
                # UI boundary: any parse/validation failure is reported to
                # the user instead of crashing the page.
                try:
                    data = json.loads(config_file.read())
                    loaded = DeduplicationConfig.from_dict(data)
                    st.session_state["loaded_config"] = loaded
                    st.success("Config loaded.")
                except Exception as e:
                    st.error(f"Failed to load config: {e}")

        with cfg_right:
            if st.button("Save current settings"):
                cfg = _build_config(
                    subset_cols, key_cols, fuzzy_cols,
                    algorithm, threshold, normalize_map,
                    survivor, date_column, merge,
                )
                cfg_json = cfg.to_dict()
                st.download_button(
                    "Download config JSON",
                    data=json.dumps(cfg_json, indent=2),
                    file_name="dedup_config.json",
                    mime="application/json",
                )

    # Build strategies from selections
    strategies = _build_strategies(
        subset_cols, key_cols, fuzzy_cols,
        algorithm, threshold, normalize_map,
    )

    # Survivor rule mapping
    survivor_map = {
        "first": SurvivorRule.KEEP_FIRST,
        "last": SurvivorRule.KEEP_LAST,
        "most-complete": SurvivorRule.KEEP_MOST_COMPLETE,
        "most-recent": SurvivorRule.KEEP_MOST_RECENT,
    }

    return {
        "strategies": strategies,
        "survivor_rule": survivor_map[survivor],
        "date_column": date_column,
        "merge": merge,
    }
|
||
|
||
|
||
def _build_strategies(
|
||
subset_cols: list[str],
|
||
key_cols: list[str],
|
||
fuzzy_cols: list[str],
|
||
algorithm: str,
|
||
threshold: int,
|
||
normalize_map: dict[str, str],
|
||
) -> Optional[list[MatchStrategy]]:
|
||
"""Build MatchStrategy list from GUI selections. Returns None for auto-detect."""
|
||
strategies: list[MatchStrategy] = []
|
||
|
||
# If user selected columns explicitly, build from those
|
||
if subset_cols or fuzzy_cols:
|
||
target_cols = subset_cols if subset_cols else fuzzy_cols
|
||
fuzzy_set = set(fuzzy_cols)
|
||
col_strats: list[ColumnMatchStrategy] = []
|
||
for col in target_cols:
|
||
norm = None
|
||
if col in normalize_map:
|
||
norm = NormalizerType(normalize_map[col])
|
||
if col in fuzzy_set:
|
||
algo = Algorithm(algorithm)
|
||
thresh = float(threshold)
|
||
else:
|
||
algo = Algorithm.EXACT
|
||
thresh = 100.0
|
||
col_strats.append(ColumnMatchStrategy(
|
||
column=col, algorithm=algo, threshold=thresh, normalizer=norm,
|
||
))
|
||
strategies.append(MatchStrategy(column_strategies=col_strats))
|
||
|
||
# Add strong key strategies
|
||
if key_cols:
|
||
for col in key_cols:
|
||
strategies.append(MatchStrategy(column_strategies=[
|
||
ColumnMatchStrategy(column=col, algorithm=Algorithm.EXACT, threshold=100.0)
|
||
]))
|
||
|
||
return strategies if strategies else None
|
||
|
||
|
||
def _build_config(
    subset_cols, key_cols, fuzzy_cols,
    algorithm, threshold, normalize_map,
    survivor, date_column, merge,
) -> DeduplicationConfig:
    """Assemble a ``DeduplicationConfig`` mirroring the current GUI state.

    The survivor label has its hyphens replaced with underscores, empty
    selections are stored as ``None``, and any strategies produced by
    ``_build_strategies`` are converted to their config counterparts.
    """
    config = DeduplicationConfig(
        survivor_rule=survivor.replace("-", "_"),
        date_column=date_column,
        merge=merge,
        subset_columns=subset_cols or None,
        fuzzy_columns=fuzzy_cols or None,
        default_algorithm=algorithm,
        default_threshold=float(threshold),
        normalize_map=normalize_map or None,
    )

    built = _build_strategies(
        subset_cols, key_cols, fuzzy_cols,
        algorithm, threshold, normalize_map,
    )
    if not built:
        return config

    def _as_column_config(col_strategy) -> ColumnStrategyConfig:
        normalizer = col_strategy.normalizer
        return ColumnStrategyConfig(
            column=col_strategy.column,
            algorithm=col_strategy.algorithm.value,
            threshold=col_strategy.threshold,
            normalizer=normalizer.value if normalizer else None,
        )

    strategy_configs = []
    for strategy in built:
        column_configs = [
            _as_column_config(cs) for cs in strategy.column_strategies
        ]
        strategy_configs.append(StrategyConfig(columns=column_configs))
    config.strategies = strategy_configs
    return config
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Match group review card
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _find_differing_cols(
|
||
group: MatchResult, df: pd.DataFrame, display_cols: list[str],
|
||
) -> list[str]:
|
||
"""Return columns where values differ across rows in the group."""
|
||
differing = []
|
||
for col in display_cols:
|
||
values = set()
|
||
for idx in group.row_indices:
|
||
values.add(str(df.iloc[idx].get(col, "")).strip())
|
||
if len(values) > 1:
|
||
differing.append(col)
|
||
return differing
|
||
|
||
|
||
def match_group_card(
    group: MatchResult,
    df: pd.DataFrame,
    group_num: int,
) -> None:
    """Render an expandable match group card with side-by-side diff.

    Users select which rows to keep via checkboxes. When exactly one row
    is kept they can also cherry-pick column values from the other rows.

    Decision format stored in ``st.session_state["review_decisions"]``::

        {group_id: {"keep_indices": [int, ...], "overrides": {col: val}}}

    Args:
        group: The match group to review (reads ``group_id``,
            ``row_indices``, ``survivor_index``, ``confidence`` and
            ``matched_on``).
        df: Frame that ``group.row_indices`` index into positionally.
        group_num: Number shown in the card title.
    """
    confidence = group.confidence
    matched_on = ", ".join(group.matched_on)
    n_rows = len(group.row_indices)
    gid = group.group_id

    decisions = st.session_state.get("review_decisions", {})
    has_decision = gid in decisions
    decision_dict = decisions.get(gid, {})
    keep_indices = decision_dict.get("keep_indices", []) if has_decision else []
    overrides = decision_dict.get("overrides", {}) if has_decision else {}

    # Build label — append decision status if already decided
    label = (
        f"Group {group_num}: {n_rows} rows "
        f"(confidence: {confidence:.0f}%) "
        f"[{matched_on}]"
    )
    if has_decision:
        if len(keep_indices) == n_rows:
            label += " — Kept All"
        elif len(keep_indices) == 1:
            label += " — Merged (customized)" if overrides else " — Merged"
        else:
            label += f" — Split (kept {len(keep_indices)} of {n_rows})"

    # Decided groups collapse; undecided groups stay open
    expanded = not has_decision

    # Internal "_norm_*" helper columns are never shown to the user.
    display_cols = [c for c in df.columns if not str(c).startswith("_norm_")]
    differing_cols = _find_differing_cols(group, df, display_cols)

    with st.expander(label, expanded=expanded):
        if has_decision:
            # --- Decided state: read-only table with diff highlighting ---
            rows_data = []
            for idx in group.row_indices:
                row = {"Row": idx + 1}
                for col in display_cols:
                    row[col] = df.iloc[idx].get(col, "")
                rows_data.append(row)
            compare_df = pd.DataFrame(rows_data).set_index("Row")

            def _highlight_diffs(s: pd.Series) -> list[str]:
                # Per column: compare every cell against the first row.
                # Amber = differs, red = blank where the first row has a
                # value, no style otherwise.
                styles = []
                first_val = str(s.iloc[0]).strip() if len(s) > 0 else ""
                for val in s:
                    val_str = str(val).strip()
                    if val_str != first_val and val_str and first_val:
                        styles.append(
                            "background-color: rgba(245, 166, 35, 0.2)"
                        )
                    elif not val_str and first_val:
                        styles.append(
                            "background-color: rgba(240, 82, 82, 0.1)"
                        )
                    else:
                        styles.append("")
                return styles

            styled = compare_df.style.apply(_highlight_diffs, axis=0)
            st.dataframe(styled, use_container_width=True)

            if len(keep_indices) == n_rows:
                st.info("Decision: Kept All")
            elif len(keep_indices) == 1:
                msg = "Decision: Merge"
                if overrides:
                    msg += f" ({len(overrides)} column(s) customized)"
                st.success(msg)
            else:
                kept = ", ".join(str(i + 1) for i in sorted(keep_indices))
                st.success(
                    f"Decision: Keep rows {kept} "
                    f"(removing {n_rows - len(keep_indices)})"
                )

            def _undo(g=gid):
                # Tolerate a missing decisions store (e.g. session state
                # was cleared between render and click).
                st.session_state.get("review_decisions", {}).pop(g, None)
                # Drop the editor's widget state so the card reopens with
                # the default checkbox selection.
                st.session_state.pop(f"editor_{g}", None)

            st.button("Undo", key=f"undo_{gid}", on_click=_undo)

        else:
            # --- Undecided: interactive editor with inline checkboxes & dropdowns ---
            editor_rows = []
            for idx in group.row_indices:
                # Only the engine's suggested survivor is pre-checked.
                row_data = {"Keep": idx == group.survivor_index, "Row": idx + 1}
                for col in display_cols:
                    row_data[col] = str(df.iloc[idx].get(col, ""))
                editor_rows.append(row_data)
            editor_df = pd.DataFrame(editor_rows)

            col_config = {
                "Keep": st.column_config.CheckboxColumn(
                    "Keep", default=True, width="small",
                ),
                "Row": st.column_config.NumberColumn("Row", width="small"),
            }
            # Differing columns become dropdowns listing each distinct
            # value in the group, plus a blank option.
            for col in differing_cols:
                vals = []
                for idx in group.row_indices:
                    v = str(df.iloc[idx].get(col, "")).strip()
                    if v not in vals:
                        vals.append(v)
                if "" not in vals:
                    vals.append("")
                col_config[col] = st.column_config.SelectboxColumn(
                    col, options=vals, required=False,
                )

            disabled_cols = ["Row"] + [
                c for c in display_cols if c not in differing_cols
            ]

            edited = st.data_editor(
                editor_df,
                column_config=col_config,
                disabled=disabled_cols,
                use_container_width=True,
                hide_index=True,
                key=f"editor_{gid}",
            )

            # Read which rows are checked
            checked = [
                idx
                for i, idx in enumerate(group.row_indices)
                if edited.iloc[i]["Keep"]
            ]

            if differing_cols:
                st.caption(
                    f"Columns with differences (editable): "
                    f"{', '.join(differing_cols)}"
                )

            # Status + surviving rows preview
            if len(checked) == 0:
                st.warning("Select at least one row to keep.")
            else:
                if len(checked) == n_rows:
                    st.caption("Keeping all rows (no duplicates removed)")
                elif len(checked) == 1:
                    st.caption(
                        f"Merging into Row {checked[0] + 1}, "
                        f"removing {n_rows - 1} row(s)"
                    )
                else:
                    st.caption(
                        f"Keeping {len(checked)} rows, "
                        f"removing {n_rows - len(checked)}"
                    )

                # Build preview of surviving rows with edits applied
                checked_positions = [
                    i for i, idx in enumerate(group.row_indices)
                    if idx in checked
                ]
                preview = edited.iloc[checked_positions].drop(
                    columns=["Keep"],
                ).reset_index(drop=True)
                st.markdown("**Surviving rows preview:**")
                st.dataframe(preview, use_container_width=True, hide_index=True)

            # Confirm
            def _on_confirm(
                g=gid, indices=list(group.row_indices),
                diff=differing_cols, surv=group.survivor_index,
                frame=df,
            ):
                # Runs as a button callback, so the selection is read from
                # the editor's widget state ("edited_rows" holds only the
                # cells the user changed, keyed by row position).
                editor_state = st.session_state.get(f"editor_{g}", {})
                ed_rows = editor_state.get("edited_rows", {})

                # Determine which rows to keep
                keep = []
                for i, idx in enumerate(indices):
                    changes = ed_rows.get(i, {})
                    default_keep = idx == surv
                    if changes.get("Keep", default_keep):
                        keep.append(idx)
                if not keep:
                    keep = list(indices)

                # Column overrides (single-survivor merge only)
                ovr: dict[str, str] = {}
                if len(keep) == 1:
                    surv_idx = keep[0]
                    surv_pos = indices.index(surv_idx)
                    surv_changes = ed_rows.get(surv_pos, {})
                    for c in diff:
                        if c in surv_changes:
                            new_val = (
                                str(surv_changes[c])
                                if surv_changes[c] is not None
                                else ""
                            )
                            # Compare against the frame the editor was
                            # built from, so an edit that restores the
                            # original value is not stored as an override.
                            orig = str(
                                frame.iloc[surv_idx].get(c, "")
                            ).strip()
                            if new_val.strip() != orig:
                                ovr[c] = new_val

                # Create the decisions store on demand rather than
                # assuming the page initialized it.
                store = st.session_state.get("review_decisions")
                if store is None:
                    store = {}
                    st.session_state["review_decisions"] = store
                store[g] = {
                    "keep_indices": keep,
                    "overrides": ovr,
                }

            st.button(
                "Confirm",
                key=f"confirm_{gid}",
                type="primary",
                on_click=_on_confirm,
                disabled=(len(checked) == 0),
            )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Results summary + downloads
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def results_summary(
    result: DeduplicationResult,
    original_df: pd.DataFrame,
) -> None:
    """Show the headline counts for a run and offer the CSV downloads.

    Four metrics are rendered (rows in, rows out, removed, groups),
    followed by up to three download buttons: the deduplicated data, the
    removed rows (only when there are any) and the match groups report
    (only when at least one group exists).
    """
    rows_in = result.original_row_count
    rows_out = len(result.deduplicated_df)
    removed = rows_in - rows_out

    # Summary metrics, one per column.
    metric_values = (
        ("Rows In", rows_in),
        ("Rows Out", rows_out),
        ("Removed", removed),
        ("Groups", len(result.match_groups)),
    )
    for slot, (title, value) in zip(st.columns(4), metric_values):
        slot.metric(title, value)

    st.divider()

    # Download buttons
    dl_left, dl_mid, dl_right = st.columns(3)

    with dl_left:
        deduped_csv = result.deduplicated_df.to_csv(index=False)
        st.download_button(
            "Download Deduplicated CSV",
            data=deduped_csv.encode("utf-8-sig"),
            file_name="deduplicated.csv",
            mime="text/csv",
        )

    with dl_mid:
        removed_rows = result.removed_df
        if not removed_rows.empty:
            removed_csv = removed_rows.to_csv(index=False)
            st.download_button(
                "Download Removed Rows",
                data=removed_csv.encode("utf-8-sig"),
                file_name="removed_rows.csv",
                mime="text/csv",
            )

    with dl_right:
        if result.match_groups:
            report_bytes = _build_match_groups_csv(result, original_df)
            st.download_button(
                "Download Match Groups Report",
                data=report_bytes,
                file_name="match_groups.csv",
                mime="text/csv",
            )
|
||
|
||
|
||
def apply_review_decisions(
    original_df: pd.DataFrame,
    match_groups: list[MatchResult],
    decisions: dict,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Build final DataFrames by applying user review decisions.

    Supports these modes per group:

    - **Merge** (1 row kept): single survivor with optional column overrides.
    - **Split** (some rows kept): selected rows survive, others removed.
    - **Keep all** (all rows kept): no rows removed.
    - **No decision**: engine default (single survivor).

    Args:
        original_df: Source data; group indices are positional into it.
        match_groups: Groups to resolve (reads ``group_id``,
            ``row_indices`` and ``survivor_index``).
        decisions: Mapping of group id to
            ``{"keep_indices": [...], "overrides": {col: val}}``.

    Returns ``(deduplicated_df, removed_df)``; ``removed_df`` is an empty,
    column-less frame when nothing was removed.
    """
    remove_indices: set[int] = set()
    row_overrides: dict[int, dict[str, str]] = {}

    for group in match_groups:
        members = set(group.row_indices)
        decision = decisions.get(group.group_id)

        # No decision yet — accept with engine defaults
        if decision is None:
            keep = {group.survivor_index}
        else:
            # Only indices that belong to this group count as "kept".
            # A stale decision pointing at rows outside the group would
            # otherwise bypass the guard below and drop the whole group.
            keep = set(decision.get("keep_indices", group.row_indices)) & members
            # Safety: never remove all rows in a group
            if not keep:
                keep = members

        remove_indices.update(members - keep)

        # Column overrides (only meaningful for single-survivor merge)
        ovr = decision.get("overrides", {}) if decision else {}
        if ovr and len(keep) == 1:
            row_overrides[next(iter(keep))] = ovr

    # Build output DataFrames
    kept = [i for i in range(len(original_df)) if i not in remove_indices]

    if row_overrides:
        # Rebuild row by row so an override can replace a cell with a
        # value of a different type than the source column.
        rows = []
        for i in kept:
            row = original_df.iloc[i].copy()
            for col, val in row_overrides.get(i, {}).items():
                # Overrides for columns not present in the data are ignored.
                if col in row.index:
                    row[col] = val
            rows.append(row)
        deduped = pd.DataFrame(rows).reset_index(drop=True)
    else:
        deduped = original_df.iloc[kept].copy().reset_index(drop=True)

    removed = (
        original_df.iloc[sorted(remove_indices)].copy().reset_index(drop=True)
        if remove_indices
        else pd.DataFrame()
    )

    return deduped, removed
|
||
|
||
|
||
def _build_match_groups_csv(
|
||
result: DeduplicationResult,
|
||
original_df: pd.DataFrame,
|
||
) -> bytes:
|
||
"""Build the match groups audit CSV as bytes."""
|
||
rows = []
|
||
for g in result.match_groups:
|
||
for idx in g.row_indices:
|
||
row_data = {
|
||
"_group_id": g.group_id + 1,
|
||
"_is_survivor": idx == g.survivor_index,
|
||
"_confidence": g.confidence,
|
||
"_matched_on": ", ".join(g.matched_on),
|
||
"_original_row": idx + 1,
|
||
}
|
||
for col in original_df.columns:
|
||
if not str(col).startswith("_norm_"):
|
||
row_data[col] = original_df.iloc[idx].get(col, "") if idx < len(original_df) else ""
|
||
rows.append(row_data)
|
||
|
||
groups_df = pd.DataFrame(rows)
|
||
return groups_df.to_csv(index=False).encode("utf-8-sig")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Analyzer integration (upload-time data quality findings)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Tool id -> friendly display name. Single source of truth for the GUI; the
# CLI keeps its own copy so each entrypoint stays self-contained.
# Read by tool_display_name(), which falls back to the raw id for unknown ids.
TOOL_DISPLAY_NAMES: dict[str, str] = {
    "01_deduplicator": "Deduplicator",
    "02_text_cleaner": "Text Cleaner",
    "03_format_standardizer": "Format Standardizer",
    "04_missing_handler": "Missing Value Handler",
    "05_column_mapper": "Column Mapper",
    "06_outlier_detector": "Outlier Detector",
    "07_multi_file_merger": "Multi-File Merger",
    "08_validator_reporter": "Validator & Reporter",
    "09_pipeline_runner": "Pipeline Runner",
}

# Finding severity -> icon shown in the findings summary and on each finding.
_SEVERITY_ICON: dict[str, str] = {
    "info": "ℹ️",
    "warn": "⚠️",
    "error": "🛑",
}

# Finding severity -> color name used in the ":color[...]" markdown
# directive when a finding id is rendered.
_SEVERITY_COLOR: dict[str, str] = {
    "info": "blue",
    "warn": "orange",
    "error": "red",
}

# Map tool id to the streamlit page path under src/gui/. Skipped tools (no
# page yet) return empty string and the "Open" button is omitted.
_TOOL_PAGE_PATHS: dict[str, str] = {
    "01_deduplicator": "pages/1_Deduplicator.py",
    "02_text_cleaner": "pages/2_Text_Cleaner.py",
    "03_format_standardizer": "pages/3_Format_Standardizer.py",
    "04_missing_handler": "pages/4_Missing_Values.py",
    "05_column_mapper": "pages/5_Column_Mapper.py",
    "06_outlier_detector": "pages/6_Outlier_Detector.py",
    "07_multi_file_merger": "pages/7_Multi_File_Merger.py",
    "08_validator_reporter": "pages/8_Validator_Reporter.py",
    "09_pipeline_runner": "pages/9_Pipeline_Runner.py",
}
|
||
|
||
|
||
def tool_display_name(tool_id: str) -> str:
    """Return the GUI display name for a tool id.

    An empty/falsy id yields ``"Informational"``; an id with no entry in
    ``TOOL_DISPLAY_NAMES`` is returned unchanged.
    """
    if not tool_id:
        return "Informational"
    return TOOL_DISPLAY_NAMES.get(tool_id, tool_id)
|
||
|
||
|
||
def _tool_page_slug(tool_id: str) -> str:
    """Return the page path registered for ``tool_id``, or ``""`` if none."""
    try:
        return _TOOL_PAGE_PATHS[tool_id]
    except KeyError:
        return ""
|
||
|
||
|
||
def render_findings_panel(findings, *, header: str = "Detected issues") -> None:
    """Render analyzer findings, one expander per tool.

    A header and a per-severity count line come first. Each tool then
    gets an expander (opened automatically when it contains an error)
    listing its findings, plus a link to the tool's page when one is
    registered. Findings without a tool are collected in a final
    "Other / file-level" expander. With no findings at all, only a
    success message is shown.

    Args:
        findings: Iterable of finding objects (reads ``severity`` and
            ``tool`` here; the rest is consumed by ``_render_one_finding``).
        header: Heading text for the panel.
    """
    from src.core.analyze import findings_by_tool  # local import to avoid cycle
    from src.core.text_clean import hidden_char_css

    if not findings:
        st.success("No issues detected. Open any tool below to start working.")
        return

    # The badge and sample-table styles must be on the page before any
    # sample value is rendered below.
    st.markdown(hidden_char_css() + _SAMPLE_TABLE_CSS, unsafe_allow_html=True)

    # Tally findings per severity.
    by_sev: dict[str, int] = {}
    for finding in findings:
        severity = finding.severity
        by_sev[severity] = by_sev.get(severity, 0) + 1

    # Most severe first; severities with no findings are left out.
    summary_parts = []
    for severity in ("error", "warn", "info"):
        if by_sev.get(severity):
            summary_parts.append(
                f"{_SEVERITY_ICON[severity]} {by_sev[severity]} {severity}"
            )
    sev_summary = " · ".join(summary_parts)

    st.markdown(f"### {header}")
    st.caption(sev_summary)

    grouped = findings_by_tool(findings)
    untargeted = [finding for finding in findings if not finding.tool]

    for tool_id in sorted(grouped):
        items = grouped[tool_id]
        tool_name = tool_display_name(tool_id)
        has_error = any(item.severity == "error" for item in items)
        with st.expander(
            f"{tool_name} — {len(items)} finding(s)",
            expanded=has_error,
        ):
            for item in items:
                _render_one_finding(item)
            page_slug = _tool_page_slug(tool_id)
            if page_slug:
                # Streamlit resolves page paths relative to the entrypoint
                # (src/gui/app.py), so a leading ``src/gui/`` would point
                # outside the allowed page tree on Windows.
                st.page_link(page_slug, label=f"Open {tool_display_name(tool_id)} →")

    if not untargeted:
        return
    with st.expander(
        f"Other / file-level — {len(untargeted)} finding(s)",
        expanded=False,
    ):
        for item in untargeted:
            _render_one_finding(item)
|
||
|
||
|
||
_PREVIEW_TABLE_CSS = """
|
||
<style>
|
||
.hidden-aware-preview {
|
||
width: 100%;
|
||
border-collapse: collapse;
|
||
font-size: 0.9em;
|
||
}
|
||
.hidden-aware-preview th,
|
||
.hidden-aware-preview td {
|
||
padding: 4px 8px;
|
||
border: 1px solid #eee;
|
||
text-align: left;
|
||
vertical-align: top;
|
||
font-family: ui-monospace, SFMono-Regular, monospace;
|
||
/* pre-wrap so internal ASCII whitespace and embedded newlines render
|
||
as the user wrote them; otherwise browsers collapse adjacent spaces. */
|
||
white-space: pre-wrap;
|
||
word-break: break-word;
|
||
max-width: 32em;
|
||
}
|
||
.hidden-aware-preview thead th {
|
||
background: #f6f8fa;
|
||
position: sticky;
|
||
top: 0;
|
||
}
|
||
.hidden-aware-preview tbody tr:nth-child(even) { background: #fafafa; }
|
||
.hidden-aware-preview .row-num {
|
||
color: #888;
|
||
font-family: inherit;
|
||
background: #f6f8fa;
|
||
text-align: right;
|
||
}
|
||
.hidden-aware-preview-wrap {
|
||
max-height: 26rem;
|
||
overflow: auto;
|
||
border: 1px solid #eee;
|
||
border-radius: 4px;
|
||
}
|
||
</style>
|
||
"""
|
||
|
||
|
||
def render_hidden_aware_preview(
    df,
    *,
    n_rows: int = 10,
    caption: str | None = None,
) -> None:
    """Render the first rows of a DataFrame as an HTML table that exposes
    hidden characters.

    Intended for "before"/"after" previews in the Text Cleaner, where the
    user needs to see leading/trailing whitespace, NBSP padding,
    zero-width characters and smart punctuation. Per the original notes,
    a plain ``st.dataframe`` collapses outer whitespace and shows
    invisibles as nothing.

    Column headers and string cells go through
    :func:`visualize_hidden_html` with ``mark_outer_whitespace=True``;
    missing values are shown as a grey ``NaN``; other scalars are
    stringified and passed through the same helper.

    Args:
        df: Frame to preview; ``None`` or an empty frame shows an info
            message instead.
        n_rows: Maximum number of rows to show.
        caption: Optional caption rendered above the table.
    """
    import pandas as pd
    from src.core.text_clean import hidden_char_css, visualize_hidden_html

    if df is None or len(df) == 0:
        st.info("No rows to preview.")
        return

    sliced = df if len(df) <= n_rows else df.head(n_rows)

    st.markdown(hidden_char_css() + _PREVIEW_TABLE_CSS, unsafe_allow_html=True)
    if caption:
        st.caption(caption)

    def _cell_html(value) -> str:
        if isinstance(value, str):
            return visualize_hidden_html(value, mark_outer_whitespace=True)
        if pd.isna(value):
            return "<span style='color:#aaa'>NaN</span>"
        # Non-string scalars (numerics, bools) just stringify; they
        # won't have invisible chars but we still need html-escape.
        return visualize_hidden_html(str(value))

    header_parts = []
    for column in sliced.columns:
        shown = visualize_hidden_html(str(column), mark_outer_whitespace=True)
        header_parts.append(f"<th>{shown}</th>")
    header_cells = "".join(header_parts)

    body_rows: list[str] = []
    # The leading cell is a 1-based display counter, not the frame index.
    for display_num, (_, record) in enumerate(sliced.iterrows(), start=1):
        row_cells = ["<td class='row-num'>" + str(display_num) + "</td>"]
        row_cells.extend(
            f"<td>{_cell_html(record[column])}</td>"
            for column in sliced.columns
        )
        body_rows.append("<tr>" + "".join(row_cells) + "</tr>")

    table_html = (
        "<div class='hidden-aware-preview-wrap'>"
        "<table class='hidden-aware-preview'>"
        f"<thead><tr><th class='row-num'>#</th>{header_cells}</tr></thead>"
        f"<tbody>{''.join(body_rows)}</tbody>"
        "</table>"
        "</div>"
    )
    st.markdown(table_html, unsafe_allow_html=True)
|
||
|
||
|
||
_SAMPLE_TABLE_CSS = """
|
||
<style>
|
||
.findings-sample-table {
|
||
width: 100%;
|
||
border-collapse: collapse;
|
||
font-size: 0.9em;
|
||
}
|
||
.findings-sample-table th,
|
||
.findings-sample-table td {
|
||
padding: 4px 8px;
|
||
border-bottom: 1px solid #eee;
|
||
text-align: left;
|
||
vertical-align: top;
|
||
}
|
||
.findings-sample-table td.value {
|
||
font-family: ui-monospace, SFMono-Regular, monospace;
|
||
/* pre-wrap so any ASCII whitespace inside the value is preserved
|
||
visually (browsers collapse adjacent spaces by default). */
|
||
white-space: pre-wrap;
|
||
word-break: break-word;
|
||
}
|
||
.findings-sample-table tbody tr:hover { background: #fafafa; }
|
||
</style>
|
||
"""
|
||
|
||
|
||
def _render_one_finding(f) -> None:
    """Render a single finding: a headline plus an optional samples table.

    The headline shows the severity icon, the finding id colored by
    severity, the column (when the finding has one) and the description.
    When ``f.samples`` is non-empty, each ``(row, column, value)`` sample
    is rendered as a row of an HTML table.

    Args:
        f: Finding object; reads ``severity``, ``id``, ``description``,
            ``samples`` and, if present, ``column``.

    Raises:
        KeyError: If ``f.severity`` is not one of the known severities.
    """
    import html
    from numbers import Integral

    from src.core.text_clean import visualize_hidden_html

    color = _SEVERITY_COLOR[f.severity]
    icon = _SEVERITY_ICON[f.severity]
    column_part = f" in `{f.column}`" if getattr(f, "column", None) else ""
    st.markdown(
        f"{icon} :{color}[**{f.id}**]{column_part} — {f.description}"
    )
    if not f.samples:
        return

    # Render samples as an HTML table so leading/trailing whitespace
    # and invisible characters in the value column show up as badges.
    # A plain st.dataframe collapses outer whitespace and renders
    # NBSP/ZWSP as nothing, defeating the point of the audit.
    rows_html = []
    for row, col, value in f.samples:
        rendered_value = visualize_hidden_html(
            str(value), mark_outer_whitespace=True,
        )
        rendered_col = visualize_hidden_html(
            str(col), mark_outer_whitespace=True,
        )
        if isinstance(row, Integral):
            # Integral (not just int) so numpy integer positions coming
            # out of pandas are also shown 1-based.
            row_label = str(int(row) + 1)
        else:
            # Non-numeric row labels can originate from the uploaded data
            # and this markup is injected with unsafe_allow_html=True.
            row_label = html.escape(str(row))
        rows_html.append(
            "<tr>"
            f"<td>{row_label}</td>"
            f"<td><code>{rendered_col}</code></td>"
            f"<td class='value'>{rendered_value}</td>"
            "</tr>"
        )
    st.markdown(
        "<table class='findings-sample-table'>"
        "<thead><tr>"
        "<th>Row</th><th>Column</th><th>Value</th>"
        "</tr></thead>"
        f"<tbody>{''.join(rows_html)}</tbody>"
        "</table>",
        unsafe_allow_html=True,
    )
|
||
|
||
|
||
def upload_and_analyze_section() -> None:
    """Draw the home page's "upload, then analyze" panel.

    The uploaded file's name, size and raw bytes are stored in session
    state, together with the analysis findings, so that tool pages may
    reuse them instead of asking for the file again. The tool pages keep
    their own uploaders, so this panel only adds a shortcut.
    """
    intro_caption = (
        "Optional: scan an uploaded file for data quality issues and see "
        "which tools can fix each one. Skip if you already know what you need."
    )
    formats_caption = (
        "**Up to 1 GB.** Formats: CSV, TSV, XLSX, XLS. "
        "Delimiters auto-detected: comma, tab, semicolon, pipe. "
        "Encodings auto-detected: UTF-8 (with/without BOM), UTF-16, "
        "cp1252, Latin-1/9, cp1250, ISO-8859-2, cp1251, KOI8-R, "
        "Mac Roman, Shift_JIS, GB18030, Big5, EUC-KR — and override on the Review page."
    )
    uploader_help = (
        "Up to 1 GB. Comma / tab / semicolon / pipe delimiters all "
        "auto-detected. Encoding auto-detected with override on the "
        "Review page if needed."
    )

    st.markdown("### 📤 Upload a file to start")
    st.caption(intro_caption)
    st.caption(formats_caption)

    uploaded = st.file_uploader(
        "Upload CSV or Excel",
        type=["csv", "tsv", "xlsx", "xls"],
        key="home_upload",
        help=uploader_help,
    )
    if uploaded is None:
        return

    state = st.session_state

    # A change in name or size marks a fresh upload: re-stash it for the
    # tool pages and forget whatever was computed for the previous file.
    already_stashed = (
        state.get("home_uploaded_name") == uploaded.name
        and state.get("home_uploaded_size") == uploaded.size
    )
    if not already_stashed:
        state["home_uploaded_name"] = uploaded.name
        state["home_uploaded_size"] = uploaded.size
        state["home_uploaded_bytes"] = uploaded.getvalue()
        state.pop("home_findings", None)
        state.pop("home_skipped", None)

    run_col, skip_col, _spacer = st.columns([1, 1, 4])
    run_clicked = run_col.button(
        "Run analysis", type="primary", key="home_run_analysis",
    )
    skip_clicked = skip_col.button("Skip", key="home_skip_analysis")

    if skip_clicked:
        # An empty findings list plus the flag distinguishes "skipped"
        # from "not decided yet" (no findings entry at all).
        state["home_findings"] = []
        state["home_skipped"] = True

    if run_clicked:
        with st.spinner("Scanning…"):
            scan_result = _run_analysis_on_upload(uploaded)
        state["home_findings"] = scan_result
        state["home_skipped"] = False

    stored_findings = state.get("home_findings")
    if stored_findings is None:
        return

    if state.get("home_skipped"):
        st.info("Analysis skipped. Open any tool below to start working.")
        return

    st.divider()
    render_findings_panel(stored_findings)
|
||
|
||
|
||
def _run_analysis_on_upload(uploaded):
    """Parse the uploaded file and return the analyzer's findings.

    Excel uploads (``.xlsx`` / ``.xls``) are read directly with every cell
    kept as a string. Anything else is treated as delimited text: the raw
    bytes go through ``repair_bytes`` first so the analyzer can also report
    on the pre-parse repair, and the repaired bytes are then parsed.

    Args:
        uploaded: Upload object exposing ``.name`` and ``.getvalue()``.

    Returns:
        Whatever ``src.core.analyze.analyze`` returns for the parsed frame.
    """
    from src.core.analyze import analyze
    from src.core.io import repair_bytes

    name = uploaded.name
    data = uploaded.getvalue()
    suffix = name.rsplit(".", 1)[-1].lower() if "." in name else ""

    if suffix in ("xlsx", "xls"):
        df = pd.read_excel(io.BytesIO(data), dtype=str, keep_default_na=False)
        return analyze(df)

    # CSV / TSV: run repair_bytes so the user sees csv_* findings.
    text_head = data[:4096].decode("utf-8", errors="replace")
    delim = "\t" if suffix == "tsv" else ","
    if delim == ",":
        # Compare the *most frequent* alternative against the comma count.
        # Taking the first candidate over the threshold would let a single
        # stray tab or semicolon win in a file that has no commas at all.
        # ``max`` keeps the earlier candidate on ties.
        strongest = max(("\t", ";", "|"), key=text_head.count)
        if text_head.count(strongest) > text_head.count(",") * 1.5:
            delim = strongest

    # NOTE(review): both calls assume UTF-8; presumably ``repair_bytes``
    # copes with other encodings — confirm, since ``read_csv`` would raise
    # on undecodable bytes.
    repair = repair_bytes(data, encoding="utf-8", delimiter=delim)
    df = pd.read_csv(
        io.BytesIO(repair.repaired_bytes),
        encoding="utf-8",
        delimiter=delim,
        dtype=str,
        keep_default_na=False,
        on_bad_lines="warn",
    )
    return analyze(df, repair_result=repair)
|
||
|
||
|
||
def findings_count_for_tool(tool_id: str) -> int:
    """Count the stored findings whose ``tool`` equals *tool_id*.

    Reads the findings already kept in session state, so the home-page
    tool grid can badge cards without running the analyzer again. Returns
    0 when nothing is stored.
    """
    stored = st.session_state.get("home_findings") or []
    matching = [finding for finding in stored if finding.tool == tool_id]
    return len(matching)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cross-page upload pickup
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class _StashedUpload:
    """In-memory stand-in for Streamlit's ``UploadedFile``.

    Carries the stashed file's ``name``, its ``size`` in bytes and the raw
    payload, reachable through ``getvalue()`` or ``read()``. That is the
    surface this module relies on, so code written against a real upload
    object can be handed one of these instead.
    """

    __slots__ = ("name", "size", "_data")

    def __init__(self, name: str, data: bytes) -> None:
        self._data = data
        self.size = len(data)
        self.name = name

    def getvalue(self) -> bytes:
        """Return the complete payload."""
        return self._data

    def read(self) -> bytes:
        """Return the complete payload; there is no read cursor to advance."""
        return self.getvalue()
|
||
|
||
|
||
def require_normalization_gate() -> None:
    """Stop the calling tool page unless the session upload passed the gate.

    Meant to be called by tool pages right after their imports. With no
    session upload present the function returns at once, leaving the
    page's own uploader in charge. Otherwise the page continues only when
    a ``normalization_result`` exists, was recorded for this exact upload
    (matched by SHA-256 of the bytes) and reports ``passed``. In every
    other case a warning and a button leading to the Review page are
    shown, and ``st.stop()`` ends the run.

    A page that does not need a normalized dataframe opts out by not
    calling this function.
    """
    import hashlib

    payload = st.session_state.get("home_uploaded_bytes")
    if payload is None:
        # Nothing uploaded yet; the gate applies once a file is present.
        return

    digest = hashlib.sha256(payload).hexdigest()
    outcome = st.session_state.get("normalization_result")
    if (
        outcome is not None
        and st.session_state.get("normalization_for") == digest
        and getattr(outcome, "passed", False)
    ):
        return

    name = st.session_state.get("home_uploaded_name", "the uploaded file")
    message = (
        f"**{name}** must pass the CSV-normalization gate before you can "
        "use this tool. Open the Review page to apply the fixes our "
        "analyzer recommends."
    )
    st.warning(message)
    go_to_review = st.button("Go to Review & Normalize", type="primary")
    if go_to_review:
        st.switch_page("pages/0_Review.py")
    st.stop()
|
||
|
||
|
||
def pickup_or_upload(
    *,
    label: str,
    key: str,
    types: list[str],
    help: str | None = None,
):
    """Return an upload object, preferring the home-page upload when present.

    Behavior:

    - If ``st.session_state['home_uploaded_bytes']`` is set and the user
      has not asked for a different file on this page, show a banner
      naming the stashed file plus a "Use a different file" button, and
      return a :class:`_StashedUpload` shim.
    - Otherwise render the standard ``st.file_uploader`` with the supplied
      *label*, *key*, *types* and *help*, and return its result (the
      Streamlit upload object, or ``None`` when nothing is uploaded).
      While no page-local file is chosen, a "Switch back to upload-screen
      file" button lets the user return to the stashed upload.

    Args:
        label: Label for the page-local file uploader.
        key: Widget key; also the prefix for this page's override flag
            (``<key>__override``) and button keys.
        types: Accepted file extensions for the page-local uploader.
        help: Optional tooltip for the page-local uploader.

    Returns:
        A :class:`_StashedUpload`, a Streamlit upload object, or ``None``.
    """
    override_key = f"{key}__override"
    has_session_upload = st.session_state.get("home_uploaded_bytes") is not None
    # Read the flag once: nothing between here and its uses changes it
    # without an immediate rerun.
    override_active = bool(st.session_state.get(override_key, False))

    if has_session_upload and not override_active:
        name = st.session_state.get("home_uploaded_name", "uploaded file")
        st.info(f"Using **{name}** from the upload screen.")
        if st.button("Use a different file", key=f"{key}__pick_diff"):
            st.session_state[override_key] = True
            st.rerun()
        return _StashedUpload(name, st.session_state["home_uploaded_bytes"])

    if {"csv", "tsv", "xlsx", "xls"} & set(types):
        st.caption(
            "Up to 1 GB. Delimiters auto-detected: comma, tab, semicolon, pipe. "
            "Encoding auto-detected (UTF-8 / UTF-16 / cp1252 / Latin-1 family / "
            "cp1250 / cp1251 / KOI8-R / Mac Roman / Shift_JIS / GB18030 / Big5 / "
            "EUC-KR), with override on the Review page."
        )
    uploaded = st.file_uploader(label, type=types, key=key, help=help)

    # The override flag is intentionally left set after a page-local upload.
    # Clearing it would make the next rerun take the stashed-upload branch
    # above and ignore the file the user just picked here.
    if uploaded is None and override_active and has_session_upload:
        if st.button("Switch back to upload-screen file", key=f"{key}__switch_back"):
            st.session_state[override_key] = False
            st.rerun()
    return uploaded
|