"""DataTools Find Duplicates — full working tool page.""" from __future__ import annotations import sys import tempfile from pathlib import Path import pandas as pd import streamlit as st # Ensure project root is on sys.path so `src.core` imports work _project_root = Path(__file__).resolve().parent.parent.parent.parent if str(_project_root) not in sys.path: sys.path.insert(0, str(_project_root)) from src.core.dedup import deduplicate, DeduplicationResult from src.core.io import read_file, list_sheets, detect_encoding, detect_delimiter from src.gui.components import ( apply_review_decisions, back_to_home_link, render_sticky_footer, config_panel, hide_streamlit_chrome, html_download_button, match_group_card, pickup_or_upload, require_feature_or_render_upgrade, results_summary, ) from src.i18n import t from src.license import FeatureFlag hide_streamlit_chrome() render_sticky_footer() require_feature_or_render_upgrade(FeatureFlag.DEDUPLICATOR) # --------------------------------------------------------------------------- # Session state defaults # --------------------------------------------------------------------------- _DEFAULTS = { "df": None, "result": None, "review_decisions": {}, "config": None, "file_name": "", "sheet_names": [], "detected_delimiter": ",", } for key, default in _DEFAULTS.items(): if key not in st.session_state: st.session_state[key] = default # --------------------------------------------------------------------------- # Header # --------------------------------------------------------------------------- st.title(t("tools.01_deduplicator.page_title")) st.caption(t("tools.01_deduplicator.page_caption")) # --------------------------------------------------------------------------- # File upload # --------------------------------------------------------------------------- uploaded = pickup_or_upload( label="Upload CSV or Excel file", key="dedup_file_upload", types=["csv", "tsv", "xlsx", "xls"], help="Supports CSV, TSV, and Excel files. Encoding and delimiters are auto-detected.", ) if uploaded is not None: # Detect if file changed if uploaded.name != st.session_state["file_name"]: st.session_state["file_name"] = uploaded.name st.session_state["result"] = None st.session_state["review_decisions"] = {} # Read the file try: suffix = Path(uploaded.name).suffix with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp.write(uploaded.getvalue()) tmp_path = Path(tmp.name) # Check for Excel sheets / detect delimiter if suffix.lower() in (".xlsx", ".xls"): st.session_state["sheet_names"] = list_sheets(tmp_path) st.session_state["detected_delimiter"] = "," else: st.session_state["sheet_names"] = [] enc = detect_encoding(tmp_path) st.session_state["detected_delimiter"] = detect_delimiter(tmp_path, enc) df = read_file(tmp_path) if not isinstance(df, pd.DataFrame): df = pd.concat(list(df), ignore_index=True) st.session_state["df"] = df tmp_path.unlink(missing_ok=True) except Exception as e: from src.core.errors import format_for_user st.error( f"**Could not read `{uploaded.name}`**\n\n" f"```\n{format_for_user(e)}\n```" ) st.session_state["df"] = None df = st.session_state["df"] if df is not None: # Sheet selector for Excel files if st.session_state["sheet_names"] and len(st.session_state["sheet_names"]) > 1: sheet = st.selectbox( "Select sheet", st.session_state["sheet_names"], ) if sheet != st.session_state.get("_current_sheet"): st.session_state["_current_sheet"] = sheet suffix = Path(uploaded.name).suffix with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp.write(uploaded.getvalue()) tmp_path = Path(tmp.name) df = read_file(tmp_path, sheet_name=sheet) if not isinstance(df, pd.DataFrame): df = pd.concat(list(df), ignore_index=True) st.session_state["df"] = df st.session_state["result"] = None st.session_state["review_decisions"] = {} tmp_path.unlink(missing_ok=True) # Delimiter selector for CSV/TSV files is_csv = Path(uploaded.name).suffix.lower() not in (".xlsx", ".xls") if is_csv: _DELIMITERS = { "Comma (,)": ",", "Tab (\\t)": "\t", "Semicolon (;)": ";", "Pipe (|)": "|", "Other": None, } _DELIM_LABELS = list(_DELIMITERS.keys()) _DELIM_VALUES = list(_DELIMITERS.values()) detected = st.session_state.get("detected_delimiter", ",") default_idx = _DELIM_VALUES.index(detected) if detected in _DELIM_VALUES else 0 chosen_label = st.selectbox( "Delimiter", _DELIM_LABELS, index=default_idx, help="Auto-detected on upload. Change if the preview looks wrong.", ) if chosen_label == "Other": custom_delim = st.text_input( "Enter delimiter character", max_chars=5, help="Enter the character(s) used to separate fields.", ) chosen_delim = custom_delim if custom_delim else "," else: chosen_delim = _DELIMITERS[chosen_label] if chosen_delim != st.session_state.get("_current_delimiter"): st.session_state["_current_delimiter"] = chosen_delim suffix = Path(uploaded.name).suffix with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp.write(uploaded.getvalue()) tmp_path = Path(tmp.name) df = read_file(tmp_path, delimiter=chosen_delim) if not isinstance(df, pd.DataFrame): df = pd.concat(list(df), ignore_index=True) st.session_state["df"] = df st.session_state["result"] = None st.session_state["review_decisions"] = {} tmp_path.unlink(missing_ok=True) # Collapse the input preview + options once a result exists so # the Results section below becomes the primary visual focus # after Find Duplicates runs. Mirrors the Clean Text pattern. _has_result = st.session_state.get("result") is not None # Preview with st.expander(f"Preview: {uploaded.name}", expanded=not _has_result): # Subheader retained inside the expander so collected_text in # the workflow tests still finds "Preview: " — Streamlit's # AppTest does not surface expander labels through the # markdown/caption/subheader collections. st.subheader(f"Preview: {uploaded.name}") st.caption(f"{len(df)} rows, {len(df.columns)} columns") st.dataframe(df.head(10), use_container_width=True) # Advanced options with st.expander("Options", expanded=not _has_result): settings = config_panel(df) # Apply loaded config if present loaded_cfg = st.session_state.get("loaded_config") if loaded_cfg is not None: settings["strategies"] = loaded_cfg.to_strategies() settings["survivor_rule"] = loaded_cfg.to_survivor_rule() settings["date_column"] = loaded_cfg.date_column settings["merge"] = loaded_cfg.merge del st.session_state["loaded_config"] # ------------------------------------------------------------------- # Find Duplicates button # ------------------------------------------------------------------- st.divider() if st.button("Find Duplicates", type="primary", use_container_width=True): progress_bar = st.progress(0, text="Comparing rows...") def _gui_progress(current: int, total: int) -> None: if total > 0: pct = min(current / total, 1.0) progress_bar.progress(pct, text=f"Comparing rows... {current:,}/{total:,}") with st.spinner("Running deduplication..."): result = deduplicate( df, strategies=settings["strategies"], survivor_rule=settings["survivor_rule"], date_column=settings["date_column"], merge=settings["merge"], preview=False, progress_callback=_gui_progress, ) progress_bar.empty() st.session_state["result"] = result st.session_state["review_decisions"] = {} # One-shot flag for the scroll snippet at the bottom of the # page. Force a rerun so the Preview / Options expanders see # the new result on the next pass and collapse themselves. st.session_state["_dedup_scroll_to_results"] = True st.rerun() # ------------------------------------------------------------------- # Results # ------------------------------------------------------------------- result: DeduplicationResult | None = st.session_state["result"] if result is not None: st.divider() # Anchor target for the post-run auto-scroll snippet at the # bottom of this page. A bare ``
`` survives # Streamlit's HTML sanitizer; a 1px-tall div doesn't shift # layout. st.markdown( '
', unsafe_allow_html=True, ) st.subheader("Results") # Summary + download buttons results_summary(result, df) # Match group review if result.match_groups: st.divider() st.subheader("Match Groups") # Batch actions def _accept_all(): for g in result.match_groups: st.session_state["review_decisions"][g.group_id] = { "keep_indices": [g.survivor_index], "overrides": {}, } def _reject_all(): for g in result.match_groups: st.session_state["review_decisions"][g.group_id] = { "keep_indices": list(g.row_indices), "overrides": {}, } def _clear_all(): st.session_state["review_decisions"] = {} for k in list(st.session_state): if k.startswith("editor_"): del st.session_state[k] action_left, action_mid, action_right = st.columns(3) with action_left: st.button("Accept All", on_click=_accept_all) with action_mid: st.button("Reject All", on_click=_reject_all) with action_right: st.button("Clear Decisions", on_click=_clear_all) # Individual group cards decisions = st.session_state["review_decisions"] for i, group in enumerate(result.match_groups): match_group_card(group, df, group_num=i + 1) # Show decision summary if decisions: st.divider() merged = 0 customized = 0 split = 0 kept_all = 0 for v in decisions.values(): if not isinstance(v, dict): continue ki = v.get("keep_indices", []) gid_for_v = next( (gid for gid, d in decisions.items() if d is v), None, ) group_size = next( (len(g.row_indices) for g in result.match_groups if g.group_id == gid_for_v), 0, ) if len(ki) == group_size: kept_all += 1 elif len(ki) == 1: if v.get("overrides"): customized += 1 else: merged += 1 else: split += 1 pending = len(result.match_groups) - len(decisions) parts = [] if merged: parts.append(f"{merged} merged") if customized: parts.append(f"{customized} customized") if split: parts.append(f"{split} split") if kept_all: parts.append(f"{kept_all} kept all") parts.append(f"{pending} pending") st.caption("Decisions: " + ", ".join(parts)) # Apply decisions and offer download if st.button( "Apply Review Decisions & Download", type="primary", use_container_width=True, ): reviewed_df, reviewed_removed = apply_review_decisions( df, result.match_groups, decisions, ) # Pre-compute every byte buffer up front so each # ``st.download_button`` sees stable ``data`` # across reruns. Render the empty-removed case # as a disabled button (rather than hiding it) # so layout stays steady and the user can see # why the download isn't available. reviewed_bytes = reviewed_df.to_csv( index=False ).encode("utf-8-sig") reviewed_removed_empty = reviewed_removed.empty reviewed_removed_bytes = ( reviewed_removed.to_csv(index=False).encode("utf-8-sig") if not reviewed_removed_empty else b"" ) html_download_button( "Download Reviewed & Deduplicated CSV", reviewed_bytes, file_name="deduplicated_reviewed.csv", mime="text/csv", ) html_download_button( "Download Reviewed Removed Rows", reviewed_removed_bytes, file_name="removed_reviewed.csv", mime="text/csv", disabled=reviewed_removed_empty, help=( "No rows were removed under the current " "review decisions." if reviewed_removed_empty else None ), ) # Log entries if result.log_entries: with st.expander("Processing Log"): st.code("\n".join(result.log_entries)) else: # No file uploaded — show placeholder st.info("Upload a file to get started.") # --------------------------------------------------------------------------- # Footer # --------------------------------------------------------------------------- st.divider() st.caption( "Runs locally. Your data never leaves this computer. " "| DataTools v3.0" ) # --------------------------------------------------------------------------- # Post-run auto-scroll # --------------------------------------------------------------------------- # # When Find Duplicates fires, the preview + options collapse, but # Streamlit by itself doesn't scroll — the Results section sits below a # tall page so the user has to hunt for it. Inject a tiny # component-html iframe that calls ``scrollIntoView`` on the parent's # Results anchor. The flag is one-shot (``pop`` removes it) so reruns # triggered by unrelated widgets in the Results section don't yank the # viewport back to the top of Results. if st.session_state.pop("_dedup_scroll_to_results", False): from streamlit.components.v1 import html as _components_html _components_html( """ """, height=0, )