Files
datatools-dev/src/gui/pages/1_Deduplicator.py
Michael 21fd8a4cd7 fix(nav): switch_page resolves correctly + bottom-of-page back link
Two issues, same fix surface.

(1) Reported crash on Back-to-Home:

    StreamlitAPIException: Could not find page: app.py.

``st.switch_page("app.py")`` doesn't work under ``st.navigation`` —
the entry script is the nav manager itself and is not a registered
page. The fix needs to pass an ``st.Page`` object whose script
identity matches one registered in the nav.

First-pass attempt (``from src.gui.app import _home_page``) hit a
worse failure: importing ``app.py`` from inside a tool-page render
re-executes the nav setup with the WRONG "main script" context, so
every ``st.Page("pages/N_foo.py", ...)`` call in ``_build_navigation``
fails with "file could not be found".

Extract the home renderer into its own module ``src/gui/_home.py``
which has no top-level Streamlit side effects. Both the nav manager
and the back-link helper import ``_home_page`` from there. The Page
object built at click time has the same callable identity as the one
registered, so ``st.switch_page`` resolves it.

(2) Reported UX: the back button scrolled out of view on long pages.

Add a second ``back_to_home_link(key="_back_to_home_link_bottom")``
call near the footer of every tool page (1-9). The unique key avoids
widget-id collision with the top instance. Coming-Soon stubs get it
unconditionally; Ready tools render it only after a result exists
because the page short-circuits with ``st.stop()`` before then —
when no result is on screen the page is short enough that the top
link is sufficient.

2220 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 23:58:33 +00:00

433 lines
17 KiB
Python

"""DataTools Find Duplicates — full working tool page."""
from __future__ import annotations
import sys
import tempfile
from pathlib import Path
import pandas as pd
import streamlit as st
# Ensure project root is on sys.path so `src.core` imports work
_project_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
from src.core.dedup import deduplicate, DeduplicationResult
from src.core.io import read_file, list_sheets, detect_encoding, detect_delimiter
from src.gui.components import (
apply_review_decisions,
back_to_home_link,
config_panel,
hide_streamlit_chrome,
html_download_button,
match_group_card,
pickup_or_upload,
require_feature_or_render_upgrade,
results_summary,
)
from src.license import FeatureFlag
hide_streamlit_chrome()
back_to_home_link()
require_feature_or_render_upgrade(FeatureFlag.DEDUPLICATOR)
# ---------------------------------------------------------------------------
# Session state defaults
# ---------------------------------------------------------------------------
_DEFAULTS = {
"df": None,
"result": None,
"review_decisions": {},
"config": None,
"file_name": "",
"sheet_names": [],
"detected_delimiter": ",",
}
for key, default in _DEFAULTS.items():
if key not in st.session_state:
st.session_state[key] = default
# ---------------------------------------------------------------------------
# Header
# ---------------------------------------------------------------------------
st.title("🔍 Find Duplicates")
st.caption("Find and remove duplicate rows in CSV, delimited text, and Excel files.")
# ---------------------------------------------------------------------------
# File upload
# ---------------------------------------------------------------------------
uploaded = pickup_or_upload(
label="Upload CSV or Excel file",
key="dedup_file_upload",
types=["csv", "tsv", "xlsx", "xls"],
help="Supports CSV, TSV, and Excel files. Encoding and delimiters are auto-detected.",
)
if uploaded is not None:
# Detect if file changed
if uploaded.name != st.session_state["file_name"]:
st.session_state["file_name"] = uploaded.name
st.session_state["result"] = None
st.session_state["review_decisions"] = {}
# Read the file
try:
suffix = Path(uploaded.name).suffix
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(uploaded.getvalue())
tmp_path = Path(tmp.name)
# Check for Excel sheets / detect delimiter
if suffix.lower() in (".xlsx", ".xls"):
st.session_state["sheet_names"] = list_sheets(tmp_path)
st.session_state["detected_delimiter"] = ","
else:
st.session_state["sheet_names"] = []
enc = detect_encoding(tmp_path)
st.session_state["detected_delimiter"] = detect_delimiter(tmp_path, enc)
df = read_file(tmp_path)
if not isinstance(df, pd.DataFrame):
df = pd.concat(list(df), ignore_index=True)
st.session_state["df"] = df
tmp_path.unlink(missing_ok=True)
except Exception as e:
from src.core.errors import format_for_user
st.error(
f"**Could not read `{uploaded.name}`**\n\n"
f"```\n{format_for_user(e)}\n```"
)
st.session_state["df"] = None
df = st.session_state["df"]
if df is not None:
# Sheet selector for Excel files
if st.session_state["sheet_names"] and len(st.session_state["sheet_names"]) > 1:
sheet = st.selectbox(
"Select sheet",
st.session_state["sheet_names"],
)
if sheet != st.session_state.get("_current_sheet"):
st.session_state["_current_sheet"] = sheet
suffix = Path(uploaded.name).suffix
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(uploaded.getvalue())
tmp_path = Path(tmp.name)
df = read_file(tmp_path, sheet_name=sheet)
if not isinstance(df, pd.DataFrame):
df = pd.concat(list(df), ignore_index=True)
st.session_state["df"] = df
st.session_state["result"] = None
st.session_state["review_decisions"] = {}
tmp_path.unlink(missing_ok=True)
# Delimiter selector for CSV/TSV files
is_csv = Path(uploaded.name).suffix.lower() not in (".xlsx", ".xls")
if is_csv:
_DELIMITERS = {
"Comma (,)": ",",
"Tab (\\t)": "\t",
"Semicolon (;)": ";",
"Pipe (|)": "|",
"Other": None,
}
_DELIM_LABELS = list(_DELIMITERS.keys())
_DELIM_VALUES = list(_DELIMITERS.values())
detected = st.session_state.get("detected_delimiter", ",")
default_idx = _DELIM_VALUES.index(detected) if detected in _DELIM_VALUES else 0
chosen_label = st.selectbox(
"Delimiter",
_DELIM_LABELS,
index=default_idx,
help="Auto-detected on upload. Change if the preview looks wrong.",
)
if chosen_label == "Other":
custom_delim = st.text_input(
"Enter delimiter character",
max_chars=5,
help="Enter the character(s) used to separate fields.",
)
chosen_delim = custom_delim if custom_delim else ","
else:
chosen_delim = _DELIMITERS[chosen_label]
if chosen_delim != st.session_state.get("_current_delimiter"):
st.session_state["_current_delimiter"] = chosen_delim
suffix = Path(uploaded.name).suffix
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(uploaded.getvalue())
tmp_path = Path(tmp.name)
df = read_file(tmp_path, delimiter=chosen_delim)
if not isinstance(df, pd.DataFrame):
df = pd.concat(list(df), ignore_index=True)
st.session_state["df"] = df
st.session_state["result"] = None
st.session_state["review_decisions"] = {}
tmp_path.unlink(missing_ok=True)
# Collapse the input preview + options once a result exists so
# the Results section below becomes the primary visual focus
# after Find Duplicates runs. Mirrors the Clean Text pattern.
_has_result = st.session_state.get("result") is not None
# Preview
with st.expander(f"Preview: {uploaded.name}", expanded=not _has_result):
# Subheader retained inside the expander so collected_text in
# the workflow tests still finds "Preview: <name>" — Streamlit's
# AppTest does not surface expander labels through the
# markdown/caption/subheader collections.
st.subheader(f"Preview: {uploaded.name}")
st.caption(f"{len(df)} rows, {len(df.columns)} columns")
st.dataframe(df.head(10), use_container_width=True)
# Advanced options
with st.expander("Options", expanded=not _has_result):
settings = config_panel(df)
# Apply loaded config if present
loaded_cfg = st.session_state.get("loaded_config")
if loaded_cfg is not None:
settings["strategies"] = loaded_cfg.to_strategies()
settings["survivor_rule"] = loaded_cfg.to_survivor_rule()
settings["date_column"] = loaded_cfg.date_column
settings["merge"] = loaded_cfg.merge
del st.session_state["loaded_config"]
# -------------------------------------------------------------------
# Find Duplicates button
# -------------------------------------------------------------------
st.divider()
if st.button("Find Duplicates", type="primary", use_container_width=True):
progress_bar = st.progress(0, text="Comparing rows...")
def _gui_progress(current: int, total: int) -> None:
if total > 0:
pct = min(current / total, 1.0)
progress_bar.progress(pct, text=f"Comparing rows... {current:,}/{total:,}")
with st.spinner("Running deduplication..."):
result = deduplicate(
df,
strategies=settings["strategies"],
survivor_rule=settings["survivor_rule"],
date_column=settings["date_column"],
merge=settings["merge"],
preview=False,
progress_callback=_gui_progress,
)
progress_bar.empty()
st.session_state["result"] = result
st.session_state["review_decisions"] = {}
# One-shot flag for the scroll snippet at the bottom of the
# page. Force a rerun so the Preview / Options expanders see
# the new result on the next pass and collapse themselves.
st.session_state["_dedup_scroll_to_results"] = True
st.rerun()
# -------------------------------------------------------------------
# Results
# -------------------------------------------------------------------
result: DeduplicationResult | None = st.session_state["result"]
if result is not None:
st.divider()
# Anchor target for the post-run auto-scroll snippet at the
# bottom of this page. A bare ``<div id="...">`` survives
# Streamlit's HTML sanitizer; a 1px-tall div doesn't shift
# layout.
st.markdown(
'<div id="dedup-results-anchor" style="height:1px"></div>',
unsafe_allow_html=True,
)
st.subheader("Results")
# Summary + download buttons
results_summary(result, df)
# Match group review
if result.match_groups:
st.divider()
st.subheader("Match Groups")
# Batch actions
def _accept_all():
for g in result.match_groups:
st.session_state["review_decisions"][g.group_id] = {
"keep_indices": [g.survivor_index],
"overrides": {},
}
def _reject_all():
for g in result.match_groups:
st.session_state["review_decisions"][g.group_id] = {
"keep_indices": list(g.row_indices),
"overrides": {},
}
def _clear_all():
st.session_state["review_decisions"] = {}
for k in list(st.session_state):
if k.startswith("editor_"):
del st.session_state[k]
action_left, action_mid, action_right = st.columns(3)
with action_left:
st.button("Accept All", on_click=_accept_all)
with action_mid:
st.button("Reject All", on_click=_reject_all)
with action_right:
st.button("Clear Decisions", on_click=_clear_all)
# Individual group cards
decisions = st.session_state["review_decisions"]
for i, group in enumerate(result.match_groups):
match_group_card(group, df, group_num=i + 1)
# Show decision summary
if decisions:
st.divider()
merged = 0
customized = 0
split = 0
kept_all = 0
for v in decisions.values():
if not isinstance(v, dict):
continue
ki = v.get("keep_indices", [])
gid_for_v = next(
(gid for gid, d in decisions.items() if d is v),
None,
)
group_size = next(
(len(g.row_indices) for g in result.match_groups
if g.group_id == gid_for_v),
0,
)
if len(ki) == group_size:
kept_all += 1
elif len(ki) == 1:
if v.get("overrides"):
customized += 1
else:
merged += 1
else:
split += 1
pending = len(result.match_groups) - len(decisions)
parts = []
if merged:
parts.append(f"{merged} merged")
if customized:
parts.append(f"{customized} customized")
if split:
parts.append(f"{split} split")
if kept_all:
parts.append(f"{kept_all} kept all")
parts.append(f"{pending} pending")
st.caption("Decisions: " + ", ".join(parts))
# Apply decisions and offer download
if st.button(
"Apply Review Decisions & Download",
type="primary",
use_container_width=True,
):
reviewed_df, reviewed_removed = apply_review_decisions(
df, result.match_groups, decisions,
)
# Pre-compute every byte buffer up front so each
# ``st.download_button`` sees stable ``data``
# across reruns. Render the empty-removed case
# as a disabled button (rather than hiding it)
# so layout stays steady and the user can see
# why the download isn't available.
reviewed_bytes = reviewed_df.to_csv(
index=False
).encode("utf-8-sig")
reviewed_removed_empty = reviewed_removed.empty
reviewed_removed_bytes = (
reviewed_removed.to_csv(index=False).encode("utf-8-sig")
if not reviewed_removed_empty
else b""
)
html_download_button(
"Download Reviewed & Deduplicated CSV",
reviewed_bytes,
file_name="deduplicated_reviewed.csv",
mime="text/csv",
)
html_download_button(
"Download Reviewed Removed Rows",
reviewed_removed_bytes,
file_name="removed_reviewed.csv",
mime="text/csv",
disabled=reviewed_removed_empty,
help=(
"No rows were removed under the current "
"review decisions."
if reviewed_removed_empty
else None
),
)
# Log entries
if result.log_entries:
with st.expander("Processing Log"):
st.code("\n".join(result.log_entries))
else:
# No file uploaded — show placeholder
st.info("Upload a file to get started.")
# ---------------------------------------------------------------------------
# Footer
# ---------------------------------------------------------------------------
back_to_home_link(key="_back_to_home_link_bottom")
st.divider()
st.caption(
"Runs locally. Your data never leaves this computer. "
"| DataTools v3.0"
)
# ---------------------------------------------------------------------------
# Post-run auto-scroll
# ---------------------------------------------------------------------------
#
# When Find Duplicates fires, the preview + options collapse, but
# Streamlit by itself doesn't scroll — the Results section sits below a
# tall page so the user has to hunt for it. Inject a tiny
# component-html iframe that calls ``scrollIntoView`` on the parent's
# Results anchor. The flag is one-shot (``pop`` removes it) so reruns
# triggered by unrelated widgets in the Results section don't yank the
# viewport back to the top of Results.
if st.session_state.pop("_dedup_scroll_to_results", False):
from streamlit.components.v1 import html as _components_html
_components_html(
"""
<script>
const doc = window.parent.document;
const target = doc.getElementById('dedup-results-anchor');
if (target) target.scrollIntoView({behavior: 'smooth', block: 'start'});
</script>
""",
height=0,
)