Files
datatools-dev/src/gui/components/_legacy.py
Michael 24ee021314 fix(footer): hide the helper page_link row that was leaking into pages
Same wrong-testid bug as the Close click handler: the CSS rule
that's supposed to position the hidden ``st.page_link`` off-screen
was selecting ``a[data-testid="stPageLink"]``, but the bare
``stPageLink`` testid is on the OUTER wrapper div — the anchor
uses ``stPageLink-NavLink``. ``:has(a[data-testid="stPageLink"]...)``
matched nothing, so the helper rendered as a full-size visible
row at the bottom of every page (the "large white bar blocking
content" the user reported).

Fix: switch both the ``:has()`` rule and the no-:has() fallback
to ``a[data-testid="stPageLink-NavLink"][href*="close"]``. The
``href*="close"`` form also works for base-path deployments
(``/myapp/close``), matching the click handler's selector.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 16:07:07 +00:00

2307 lines
84 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Reusable Streamlit widgets for the DataTools GUI."""
from __future__ import annotations
import io
import os
import sys
import threading
import time
from typing import Optional
import pandas as pd
import streamlit as st
from src.i18n import t as _t
from src.core.dedup import (
Algorithm,
ColumnMatchStrategy,
DeduplicationResult,
MatchResult,
MatchStrategy,
SurvivorRule,
)
from src.core.config import (
ColumnStrategyConfig,
DeduplicationConfig,
StrategyConfig,
)
from src.core.normalizers import NormalizerType
# ---------------------------------------------------------------------------
# App chrome — hide Streamlit default UI for app-like feel
# ---------------------------------------------------------------------------
_HIDE_CHROME_CSS = """
<style>
/* Make the Streamlit header transparent and out of the way, but DO NOT
`display: none` it — the sidebar's collapsed-state expand button is
anchored in the header region, and removing the header makes a
collapsed sidebar impossible to reopen. */
header[data-testid="stHeader"] {
background: transparent !important;
height: 0 !important;
}
/* Hide main hamburger menu and deploy button explicitly (don't rely on
hiding the whole header). */
#MainMenu,
[data-testid="stMainMenu"],
[data-testid="stAppDeployButton"] {
display: none !important;
}
/* Keep the sidebar expand control visible and clickable above page content. */
[data-testid="stSidebarCollapsedControl"] {
display: flex !important;
visibility: visible !important;
z-index: 999 !important;
}
/* Hide footer */
footer {
display: none !important;
}
/* Hide the Activate + Close entries from the sidebar nav. Both
pages stay registered (so /activate and /close remain
URL-routable) but are reached from the sticky-footer Help
popover instead of the sidebar. They are grouped under the
unlabeled section alongside Home in ``app.py`` so hiding the
two links here leaves no orphan section header behind. We
target the LinkContainer (Streamlit's per-entry wrapper) so the
list item collapses, not just the inner anchor — otherwise the
container's spacing would still occupy a row. */
[data-testid="stSidebarNav"] [data-testid="stSidebarNavLinkContainer"]:has(a[href$="/activate"]),
[data-testid="stSidebarNav"] [data-testid="stSidebarNavLinkContainer"]:has(a[href$="/activate/"]),
[data-testid="stSidebarNav"] [data-testid="stSidebarNavLinkContainer"]:has(a[href$="/close"]),
[data-testid="stSidebarNav"] [data-testid="stSidebarNavLinkContainer"]:has(a[href$="/close/"]) {
display: none !important;
}
/* Defensive fallback for browsers without :has() support — at
least hide the anchor itself so the entry isn't clickable. */
[data-testid="stSidebarNav"] a[href$="/activate"],
[data-testid="stSidebarNav"] a[href$="/activate/"],
[data-testid="stSidebarNav"] a[href$="/close"],
[data-testid="stSidebarNav"] a[href$="/close/"] {
display: none !important;
}
/* Reclaim top padding lost from hidden header. Slim the bottom too —
Streamlit's default leaves several rems below the last widget. */
.stAppViewBlockContainer,
[data-testid="stAppViewBlockContainer"] {
padding-top: 0.5rem !important;
padding-bottom: 0.75rem !important;
}
/* Scale content to fit app window */
.stApp {
zoom: 0.85;
}
/* ---------- Compact-spacing layer ---------- */
/* Streamlit ships generous vertical rhythm (~1rem gap between every
block, 1.5rem+ above each heading, 1rem on dividers). For a desktop
data app that's a lot of empty space. Tighten the gaps without
making the layout look cramped. */
[data-testid="stVerticalBlock"] { gap: 0.5rem !important; }
[data-testid="stHorizontalBlock"] { gap: 0.5rem !important; }
/* Headings — tighter top space + a hair less below. */
.stApp h1 { margin-top: 0.25rem !important; margin-bottom: 0.5rem !important; }
.stApp h2 { margin-top: 0.5rem !important; margin-bottom: 0.4rem !important; }
.stApp h3 { margin-top: 0.4rem !important; margin-bottom: 0.3rem !important; }
.stApp h4 { margin-top: 0.3rem !important; margin-bottom: 0.25rem !important; }
/* st.divider() — Streamlit's default hr has 1rem above and below. */
[data-testid="stMarkdownContainer"] hr,
hr { margin-top: 0.4rem !important; margin-bottom: 0.4rem !important; }
/* Markdown paragraphs + captions — slim trailing space. */
[data-testid="stMarkdownContainer"] p { margin-bottom: 0.25rem; }
[data-testid="stCaption"],
[data-testid="stCaptionContainer"] { margin-bottom: 0.25rem; }
/* Expander header padding — Streamlit's default is roomy. */
[data-testid="stExpander"] details > summary {
padding-top: 0.35rem;
padding-bottom: 0.35rem;
}
/* Buttons / file-uploader / metric tiles — tighter spacing. */
[data-testid="stButton"],
[data-testid="stDownloadButton"] { margin-top: 0; margin-bottom: 0; }
[data-testid="stFileUploader"] { margin-bottom: 0.25rem; }
[data-testid="stMetric"] {
padding-top: 0.25rem;
padding-bottom: 0.25rem;
}
</style>
"""
def hide_streamlit_chrome(*, gate_license: bool = True) -> None:
"""Inject CSS to hide Streamlit's default header, menu, and footer.
Also renders the sidebar language selector + license status badge,
since every entrypoint that hides the default chrome wants those
visible in the same place. Pages that want a clean chrome without
them can inject ``_HIDE_CHROME_CSS`` themselves instead of calling
this.
When *gate_license* is True (the default) the function calls
:func:`require_license_or_render_activation` after the sidebar
widgets render. If no valid license is present, the activation
form replaces the page body and the page short-circuits via
``st.stop()``. The Activate page itself passes ``False`` so it
can render its own form without recursion.
"""
st.markdown(_HIDE_CHROME_CSS, unsafe_allow_html=True)
# Stamp a session-start record into the audit log the first time
# any page renders. Idempotent — subsequent calls are no-ops.
# Wrapped because a broken audit log MUST NOT take the GUI down.
try:
from src.audit import log_session_start
log_session_start()
except Exception:
import traceback, sys
print("DataTools: audit log session-start failed:", file=sys.stderr)
traceback.print_exc()
# Production-safe check runs first so a misconfigured shipped
# build refuses to render anything (rather than rendering a
# broken activation form that doesn't accept real blobs).
# No-op in source / pytest runs.
from src.license import assert_production_safe
assert_production_safe()
# Imported lazily so this module stays importable in environments
# where the i18n packs haven't been laid out (e.g. unit tests of
# individual legacy helpers).
from src.i18n import render_language_selector
render_language_selector()
# License chrome: sidebar status badge + inline gate.
from .activation import (
render_license_status_sidebar,
require_license_or_render_activation,
)
render_license_status_sidebar()
# Diagnostics sidebar is DISABLED — the async-writer redesign
# didn't actually fix the blank-pages symptom on the user's
# machine. The sidebar calls ``audit_log_path()`` which is pure
# now, so the failure mode must be elsewhere; keep this off
# while we diagnose so the user has a working GUI.
if False:
try:
_render_diagnostics_sidebar()
except Exception:
import traceback, sys
print("DataTools: diagnostics sidebar render failed:", file=sys.stderr)
traceback.print_exc()
if gate_license:
require_license_or_render_activation()
def _render_diagnostics_sidebar() -> None:
"""Render a small Diagnostics expander in the sidebar.
Shows the path to the current session's audit log and an "Open
folder" button. Lives behind an expander so it doesn't take
screen space until the user opens it; the support flow is
"client mails us the file, we tell them what went wrong."
"""
from src.audit import audit_log_dir, audit_log_path
log_path = audit_log_path()
with st.sidebar:
with st.expander("🩺 Diagnostics", expanded=False):
st.caption("Audit log for this session:")
st.code(str(log_path), language=None)
if st.button(
"📂 Open log folder",
key="_diag_open_logs",
type="secondary",
use_container_width=True,
):
opened = _open_in_file_manager(audit_log_dir(), select=log_path)
if not opened:
st.warning(
"Could not open the file manager from here. "
"Path is above — paste it into your file manager."
)
# ---------------------------------------------------------------------------
# Clean shutdown
# ---------------------------------------------------------------------------
_FAREWELL_SCRIPT_TEMPLATE = """
<script>
(function () {
// Strategy: append a full-screen overlay directly to the parent's
// document.body (Streamlit's component iframes carry
// allow-same-origin, so cross-frame DOM access is permitted).
//
// Closing the tab via JavaScript only works in windows JS opened —
// Chrome/Edge --app windows qualify; a regular browser tab does
// NOT, and there's no way to override that from page JS (no flag,
// no API, no keystroke injection — synthesized keydown events
// never reach the browser chrome or the OS). When close fails we
// navigate the window to ``about:blank`` so the user at least
// sees a clean blank tab instead of the connection-error overlay
// Streamlit shows when the websocket drops.
//
// Display-mode detection (``standalone`` for --app windows,
// ``browser`` for regular tabs) lets us skip the futile close
// attempt on regular tabs and route straight to the about:blank
// fallback.
function isStandalone(win) {
try {
return win.matchMedia('(display-mode: standalone)').matches
|| win.matchMedia('(display-mode: minimal-ui)').matches
|| win.matchMedia('(display-mode: fullscreen)').matches;
} catch (e) { return false; }
}
function buildOverlay(doc) {
var overlay = doc.createElement('div');
overlay.id = 'datatools-farewell-overlay';
overlay.style.cssText =
'position:fixed;inset:0;background:#0f1115;color:#e8eaed;' +
'z-index:2147483647;display:flex;align-items:center;' +
'justify-content:center;font-family:system-ui,-apple-system,sans-serif;';
overlay.innerHTML =
'<div style="text-align:center;padding:32px 40px;border:1px solid #252a36;' +
'border-radius:12px;background:#161922;max-width:480px;">' +
'<h1 style="margin:0 0 8px 0;font-weight:600;letter-spacing:-0.01em;">' +
'__TITLE__</h1>' +
'<p style="opacity:0.7;margin:0 0 20px 0;">__SUBTITLE__</p>' +
'<button id="datatools-close-btn" style="' +
'background:#6ee7b7;color:#052e1a;font-weight:600;' +
'padding:10px 20px;border-radius:8px;border:none;' +
'font-size:15px;cursor:pointer;font-family:inherit;">' +
'__CLOSE_BTN__</button>' +
'<p id="datatools-close-hint" style="' +
'display:none;font-size:13px;opacity:0.6;margin:14px 0 0 0;">' +
'__CLOSE_HINT__</p>' +
'</div>';
return overlay;
}
function tryClose(win) {
// Escalating attempts. None of these can override the browser's
// close-restriction policy on regular tabs.
try { win.close(); } catch (e) {}
if (win.closed) return true;
try {
var w = win.open('', '_self', '');
if (w) {
try { w.close(); } catch (e) {}
}
} catch (e) {}
if (win.closed) return true;
try { win.top.close(); } catch (e) {}
return win.closed;
}
function fallbackToBlank(win) {
// Navigate to about:blank so the user sees a clean empty tab
// instead of the farewell overlay frozen on a connection-error
// page. They can still close the tab themselves (Ctrl+W /
// ⌘W / clicking the tab's X). Done as a single fast call — no
// history entry pollution because location.replace doesn't
// push to history.
try { win.location.replace('about:blank'); } catch (e) {}
}
function wireClose(doc, win) {
var btn = doc.getElementById('datatools-close-btn');
if (!btn) return;
btn.onclick = function () {
var standalone = isStandalone(win);
if (tryClose(win)) return;
// Close failed (or definitely will fail in a regular tab).
// Surface the hint immediately, then redirect to about:blank
// after a short delay so the user has a moment to read why.
var hint = doc.getElementById('datatools-close-hint');
if (hint) hint.style.display = 'block';
setTimeout(function () {
if (!win.closed) fallbackToBlank(win);
}, standalone ? 250 : 1500);
};
}
try {
var doc = window.top.document;
var win = window.top;
if (!doc.getElementById('datatools-farewell-overlay')) {
doc.body.appendChild(buildOverlay(doc));
}
wireClose(doc, win);
// Auto-close attempt on first paint — succeeds in Chrome --app
// windows, fails silently on regular tabs (and we don't redirect
// automatically here; the manual button drives that path so the
// user is in control).
tryClose(win);
} catch (e) {
// Cross-origin access denied (shouldn't happen given Streamlit's
// sandbox flags, but fall back gracefully): cover this iframe.
document.body.appendChild(buildOverlay(document));
wireClose(document, window);
}
})();
</script>
"""
def _js_html_safe(s: str) -> str:
"""Escape *s* so it can be embedded inside the farewell overlay's
JS-single-quoted, innerHTML-bound payload.
Order matters: backslash first (so subsequent escapes don't get
re-escaped), then the JS string-terminator, then HTML-special chars.
"""
return (
s.replace("\\", "\\\\")
.replace("'", "\\'")
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
)
def _farewell_script() -> str:
"""Render the farewell overlay JS with the current language's strings."""
return (
_FAREWELL_SCRIPT_TEMPLATE
.replace("__TITLE__", _js_html_safe(_t("quit.farewell_title")))
.replace("__SUBTITLE__", _js_html_safe(_t("quit.farewell_subtitle")))
.replace("__CLOSE_BTN__", _js_html_safe(_t("quit.close_window_button")))
.replace("__CLOSE_HINT__", _js_html_safe(_t("quit.close_hint")))
)
def _downloads_dir() -> "Path":
"""Return the user's Downloads folder.
Defaults to ``~/Downloads``. Overrideable via the
``DATATOOLS_DOWNLOADS_DIR`` environment variable so tests can write
to a temp directory instead of polluting the developer's home.
"""
import os
from pathlib import Path
override = os.environ.get("DATATOOLS_DOWNLOADS_DIR")
if override:
return Path(override)
return Path.home() / "Downloads"
def _open_in_file_manager(folder: "Path", *, select: "Path | None" = None) -> bool:
"""Open the OS file manager at *folder*, optionally highlighting *select*.
Windows
``explorer <folder>`` only. We deliberately do NOT use
``explorer /select,<file>``: when the path contains a space
(e.g. ``C:\\Users\\Michael Dombaugh\\Downloads``), Python's
``subprocess.Popen`` quotes the ``/select,...`` argument as one
unit, and Explorer's ``/select`` parser does not handle that
form — it silently falls back to opening the user's default
view (typically Documents). Opening the bare folder works
reliably regardless of spaces. ``os.startfile`` is kept as a
last-resort fallback only.
macOS
``open -R <file>`` reveals the file in Finder when ``select``
is given; otherwise just opens the folder.
Linux / *BSD
``xdg-open`` on the folder. No reliable cross-distro way to
highlight a specific file.
Returns ``True`` if any of the dispatch attempts succeeded
(no guarantee the window actually surfaced — the caller should
surface a fallback path so the user can paste it manually).
"""
import os
import subprocess
if sys.platform == "win32":
try:
subprocess.Popen(["explorer", str(folder)])
return True
except Exception:
pass
try:
os.startfile(str(folder)) # type: ignore[attr-defined]
return True
except Exception:
return False
if sys.platform == "darwin":
try:
if select is not None:
subprocess.Popen(["open", "-R", str(select)])
else:
subprocess.Popen(["open", str(folder)])
return True
except Exception:
return False
# Linux / *BSD / etc.
try:
subprocess.Popen(["xdg-open", str(folder)])
return True
except Exception:
return False
def local_download_button(
label: str,
data: bytes,
*,
file_name: str,
mime: str = "application/octet-stream", # noqa: ARG001 — kept for API compat
disabled: bool = False,
help: str | None = None,
use_container_width: bool = True,
) -> None:
"""Save bytes directly to the user's Downloads folder.
DataTools runs as a local Streamlit app, so the "server" IS the
user's machine — we can write straight to ``~/Downloads/<file_name>``
instead of going through the browser save dialog. On click:
1. Bytes are written to ``Path.home() / "Downloads" / file_name``
(overwriting any existing file with the same name).
2. The page reruns and renders a success caption naming the exact
absolute path the file landed at.
3. An "Open Downloads folder" button appears that pops the OS file
manager (Explorer / Finder / xdg-open) at the parent directory.
Why not ``st.download_button`` or an HTML data: URL anchor?
- ``st.download_button`` has a long-standing failure mode where
only the first button on the page fires when multiple are
stacked together.
- Data: URLs balloon by 33% (base64) and leave the user guessing
where the browser saved it (default Downloads folder or wherever
they last picked — varies per browser).
The save-server-side path is unambiguous, works the same regardless
of browser settings, and gives the user a real link to the file.
The ``mime`` parameter is accepted for backwards compatibility with
the previous helper signature; it is no longer relevant because
nothing on the wire knows the bytes' content type.
"""
import hashlib
from pathlib import Path
# Stable widget keys, namespaced by file_name + content digest so
# repeated renders of the same content keep their saved-state
# banner, but a re-run that produced different bytes gets a fresh
# button with no stale success message.
digest = hashlib.sha1(data, usedforsecurity=False).hexdigest()[:8]
btn_key = f"_dl_btn_{file_name}_{digest}"
saved_key = f"_dl_saved_{file_name}_{digest}"
open_key = f"_dl_open_{file_name}_{digest}"
clicked = st.button(
label,
key=btn_key,
disabled=disabled,
help=help,
type="secondary",
use_container_width=use_container_width,
)
if clicked:
target_dir = _downloads_dir()
try:
target_dir.mkdir(parents=True, exist_ok=True)
target = target_dir / file_name
target.write_bytes(data)
st.session_state[saved_key] = str(target)
except Exception as e:
st.error(
f"Could not save **{file_name}** to `{target_dir}`: {e}"
)
return
saved_path_str = st.session_state.get(saved_key)
if saved_path_str:
saved_path = Path(saved_path_str)
st.success(f"✓ Saved to `{saved_path_str}`")
if st.button(
"📂 Open Downloads folder",
key=open_key,
type="secondary",
):
opened = _open_in_file_manager(saved_path.parent, select=saved_path)
if opened:
# The dispatch returned non-zero; the OS may still have
# opened the window behind the active one. Surface a
# confirmation so the user knows we tried.
st.toast(f"Opening {saved_path.parent}", icon="📂")
else:
st.warning(
f"Could not open the file manager from here. "
f"The file is at:\n\n`{saved_path_str}`"
)
# Back-compat alias: existing call sites use the old name. New code
# should prefer ``local_download_button``.
html_download_button = local_download_button
def render_sticky_footer() -> None:
"""Slim fixed-position footer with Close and Help controls.
Mounted as a direct child of ``<body>`` via a component-iframe so
it lives outside every Streamlit container — required because
``.stApp`` carries ``zoom: 0.85`` and Streamlit's content
columns add padding/positioning context that would otherwise
distort or clip the bar.
Close is a full-page ``<a href="./close">`` link to the Close
page, which runs ``shutdown_app`` on render. State loss is fine
here — the process is terminating. (This was the reason the
Back-to-Home variant of this footer was retired; that case
needed a soft nav widget. Close does not.)
Help is pure UI: clicking toggles a small overlay panel
containing the version and support email — no navigation, so
no state loss.
"""
import html as _html
import json as _json
from src import __version__
close_label = _html.escape(_t("footer.close"))
help_label = _html.escape(_t("footer.help"))
help_title = _html.escape(_t("footer.help_title"))
help_version = _html.escape(
_t("footer.help_version").format(version=__version__)
)
support_email = "support@unalogix.com"
help_support_text = _t("footer.help_support").format(email=support_email)
help_support_html = _html.escape(help_support_text).replace(
_html.escape(support_email),
f'<a href="mailto:{_html.escape(support_email)}">'
f'{_html.escape(support_email)}</a>',
)
license_label = _html.escape(_t("footer.help_license_label"))
help_dismiss = _html.escape(_t("footer.help_dismiss"))
# License section — read state and branch on activated/valid. The
# query is wrapped because a corrupted license file MUST NOT stop
# the footer from rendering; in that case we fall back to the
# "ask to activate" branch.
try:
from src.license import current_state as _license_state
state = _license_state()
except Exception:
state = None
if state is not None and state.activated and state.valid:
active_line = _t("footer.help_license_active").format(
name=state.name or state.email or "",
)
expires_line = _t("footer.help_license_expires").format(
date=(state.expires_at or "")[:10],
days=state.days_remaining,
)
manage_link = _html.escape(_t("footer.help_manage_link"))
license_html = (
f'<div class="dt-help-row"><span class="dt-help-key">'
f'{license_label}:</span> {_html.escape(active_line)}</div>'
f'<div class="dt-help-row dt-help-sub">'
f'{_html.escape(expires_line)}</div>'
f'<div class="dt-help-row">'
f'<a href="./activate" target="_self">{manage_link}</a></div>'
)
else:
inactive_line = _html.escape(_t("footer.help_license_inactive"))
activate_link = _html.escape(_t("footer.help_activate_link"))
license_html = (
f'<div class="dt-help-row"><span class="dt-help-key">'
f'{license_label}:</span> {inactive_line}</div>'
f'<div class="dt-help-row">'
f'<a href="./activate" target="_self">{activate_link}</a></div>'
)
popover_html = (
f'<div class="dt-help-title">{help_title}</div>'
f'<div class="dt-help-row">{help_version}</div>'
f'{license_html}'
f'<div class="dt-help-row">{help_support_html}</div>'
f'<button type="button" class="dt-help-dismiss">{help_dismiss}</button>'
)
st.markdown(
"""
<style>
[data-testid="stAppViewBlockContainer"] {
padding-bottom: 3rem !important;
}
#datatools-sticky-footer {
position: fixed !important;
bottom: 0 !important;
left: 0 !important;
right: 0 !important;
background: rgba(255, 255, 255, 0.97) !important;
backdrop-filter: blur(8px);
-webkit-backdrop-filter: blur(8px);
border-top: 1px solid rgba(49, 51, 63, 0.2) !important;
padding: 0.25rem 0.75rem !important;
z-index: 2147483646 !important;
display: flex !important;
align-items: center !important;
justify-content: flex-start !important;
gap: 0.4rem !important;
font-family: system-ui, -apple-system, sans-serif !important;
box-sizing: border-box !important;
min-height: 32px !important;
}
#datatools-sticky-footer .datatools-footer-btn {
display: inline-block !important;
color: rgb(38, 39, 48) !important;
background: rgb(240, 242, 246) !important;
text-decoration: none !important;
padding: 0.2rem 0.6rem !important;
border-radius: 0.35rem !important;
border: 1px solid rgba(49, 51, 63, 0.22) !important;
font-size: 12px !important;
font-weight: 500 !important;
line-height: 1.3 !important;
cursor: pointer !important;
transition: background 0.12s ease, border-color 0.12s ease;
}
#datatools-sticky-footer .datatools-footer-btn:hover {
background: rgb(225, 228, 235) !important;
border-color: rgba(49, 51, 63, 0.35) !important;
}
#datatools-sticky-footer .datatools-footer-btn.close {
color: rgb(176, 0, 32) !important;
border-color: rgba(176, 0, 32, 0.35) !important;
}
#datatools-sticky-footer .datatools-footer-btn.close:hover {
background: rgba(176, 0, 32, 0.08) !important;
}
#datatools-help-popover {
position: fixed !important;
left: 0.75rem !important;
bottom: 44px !important;
background: white !important;
border: 1px solid rgba(49, 51, 63, 0.25) !important;
border-radius: 0.5rem !important;
box-shadow: 0 8px 20px rgba(0,0,0,0.12) !important;
padding: 0.75rem 0.9rem !important;
z-index: 2147483647 !important;
font-family: system-ui, -apple-system, sans-serif !important;
font-size: 13px !important;
color: rgb(38, 39, 48) !important;
min-width: 220px !important;
max-width: 320px !important;
}
#datatools-help-popover[hidden] { display: none !important; }
#datatools-help-popover .dt-help-title {
font-weight: 600 !important;
margin-bottom: 0.35rem !important;
}
#datatools-help-popover .dt-help-row {
margin: 0.15rem 0 !important;
line-height: 1.4 !important;
}
#datatools-help-popover .dt-help-row.dt-help-sub {
color: rgb(90, 95, 110) !important;
font-size: 12px !important;
margin-left: 0.65rem !important;
}
#datatools-help-popover .dt-help-key {
color: rgb(90, 95, 110) !important;
font-weight: 500 !important;
}
#datatools-help-popover .dt-help-row a {
color: rgb(0, 102, 204) !important;
text-decoration: none !important;
}
#datatools-help-popover .dt-help-row a:hover {
text-decoration: underline !important;
}
#datatools-help-popover .dt-help-dismiss {
margin-top: 0.5rem !important;
font-size: 11px !important;
color: rgb(90, 95, 110) !important;
background: none !important;
border: none !important;
cursor: pointer !important;
padding: 0 !important;
}
#datatools-help-popover .dt-help-dismiss:hover {
color: rgb(38, 39, 48) !important;
}
/* Hide the sticky-footer's helper st.page_link off-screen but
keep it in the DOM + clickable. The footer's Close button
dispatches a programmatic click on this link so navigation uses
Streamlit's soft nav (preserves the websocket, no visible page
reload) instead of the browser hard-nav an ``<a href="./close">``
would trigger. Off-screen (rather than ``display:none``) so
React event delegation works reliably across browsers.
NOTE on the selector: Streamlit's page_link renders an outer
wrapper div with ``data-testid="stPageLink"`` and an inner anchor
with ``data-testid="stPageLink-NavLink"`` — the NavLink suffix
is required to match the anchor (the bare testid is on the
wrapper). ``href*="close"`` works across both root (``/close``)
and base-path (``/myapp/close``) deployments. */
[data-testid="stElementContainer"]:has(a[data-testid="stPageLink-NavLink"][href*="close"]) {
position: absolute !important;
left: -9999px !important;
top: -9999px !important;
width: 1px !important;
height: 1px !important;
overflow: hidden !important;
opacity: 0 !important;
pointer-events: none !important;
}
/* Defensive fallback for browsers without :has() — at least
shrink the inline page_link so it doesn't render a visible row.
Same testid note as above. */
a[data-testid="stPageLink-NavLink"][href*="close"] {
visibility: hidden !important;
height: 0 !important;
padding: 0 !important;
margin: 0 !important;
}
</style>
""",
unsafe_allow_html=True,
)
# Hidden Streamlit page_link to the close page. The footer's
# Close button programmatically clicks the anchor this renders,
# which triggers Streamlit's soft navigation (same code path
# the previous sidebar Close entry used). The link is positioned
# off-screen via the CSS above so it doesn't take page space
# but remains reachable to the JS click dispatch.
#
# Wrapped because ``st.page_link`` raises ``KeyError('url_pathname')``
# under ``AppTest`` (the test harness does not populate the page-nav
# session keys ``page_link`` needs to mark itself active/inactive).
# The JS click handler has a hard-nav fallback when this helper
# link isn't present, so a failure here only costs the soft-nav
# optimization — Close still works.
try:
st.page_link(
"pages/99_Close.py",
label=_t("footer.close"),
)
except Exception:
pass
st.iframe(
f"""
<script>
(function () {{
var labels = {_json.dumps({
"close": close_label,
"help": help_label,
"popover_html": popover_html,
})};
function build(doc) {{
var prev = doc.getElementById('datatools-sticky-footer');
if (prev) prev.remove();
var prevPop = doc.getElementById('datatools-help-popover');
if (prevPop) prevPop.remove();
var div = doc.createElement('div');
div.id = 'datatools-sticky-footer';
var helpBtn = doc.createElement('button');
helpBtn.type = 'button';
helpBtn.className = 'datatools-footer-btn help';
helpBtn.textContent = labels.help;
var closeBtn = doc.createElement('button');
closeBtn.type = 'button';
closeBtn.className = 'datatools-footer-btn close';
closeBtn.textContent = labels.close;
// Soft-nav via the hidden ``st.page_link`` that
// ``render_sticky_footer`` injects. Streamlit owns its click
// handler and will route through ``st.switch_page`` (same
// code path the old sidebar Close entry used) — no full-page
// reload, no websocket churn. Fall back to a hard nav if the
// helper link hasn't rendered yet (first paint race) so the
// button is never a no-op.
//
// The page_link's anchor uses ``data-testid="stPageLink-NavLink"``
// (the outer wrapper div carries the bare ``stPageLink`` testid;
// dispatching click on the wrapper doesn't fire Streamlit's
// React onClick handler). ``href*="close"`` covers both root
// (/close) and base-path (e.g. /myapp/close) deployments.
closeBtn.addEventListener('click', function (e) {{
e.preventDefault();
var helper = doc.querySelector(
'a[data-testid="stPageLink-NavLink"][href*="close"]'
);
if (helper) {{
helper.click();
return;
}}
// Hard-nav fallback. ``window`` inside this script is the
// component iframe's window — changing ITS location only
// navigates the iframe (which lives in srcdoc and is
// invisible). Use the parent doc's location so the whole
// app navigates.
var topWin = (doc.defaultView) || window.parent || window.top || window;
try {{ topWin.location.href = './close'; }}
catch (err) {{ window.top.location.href = './close'; }}
}});
div.appendChild(helpBtn);
div.appendChild(closeBtn);
var pop = doc.createElement('div');
pop.id = 'datatools-help-popover';
pop.hidden = true;
pop.innerHTML = labels.popover_html;
helpBtn.addEventListener('click', function (e) {{
e.preventDefault();
pop.hidden = !pop.hidden;
}});
pop.querySelector('.dt-help-dismiss').addEventListener('click', function () {{
pop.hidden = true;
}});
doc.addEventListener('click', function (e) {{
if (pop.hidden) return;
if (pop.contains(e.target) || helpBtn.contains(e.target)) return;
pop.hidden = true;
}});
doc.body.appendChild(div);
doc.body.appendChild(pop);
}}
try {{
build(window.parent.document);
}} catch (e) {{
build(document);
}}
}})();
</script>
""",
height=1,
)
def _render_sticky_footer_DISABLED() -> None:
"""Slim fixed-position footer at the bottom of the viewport.
Contains a "Back to Home" link that's always visible regardless of
scroll position. The footer is mounted as a direct child of
``<body>`` via a component-iframe script so it lives OUTSIDE every
Streamlit container — that matters because ``.stApp`` carries
``zoom: 0.85`` (our compact-layout scaler) and Streamlit's content
columns add their own padding/positioning context that previously
swallowed the in-place ``st.markdown`` footer.
The implementation is two-pass:
1. ``st.markdown`` injects the CSS rules into the parent document.
Class-targeted, so the rules apply once the footer DOM node
exists regardless of where it lives.
2. ``st.iframe`` renders a zero-height iframe
whose JS reaches ``window.parent.document`` and creates / moves
a ``#datatools-sticky-footer`` div directly under ``<body>``.
This bypasses every Streamlit container.
The anchor uses ``href="home"`` (relative) so Streamlit's URL
routing resolves it to the Home page and the link works correctly
behind a reverse proxy or non-root mount.
"""
import html as _html
import json as _json
label_raw = _t("nav.back_to_home")
label_esc = _html.escape(label_raw)
# CSS rules live in the parent document. Class selector so a
# re-rendered/relocated footer div picks them up automatically.
st.markdown(
"""
<style>
[data-testid="stAppViewBlockContainer"] {
padding-bottom: 4rem !important;
}
#datatools-sticky-footer {
position: fixed !important;
bottom: 0 !important;
left: 0 !important;
right: 0 !important;
background: rgba(255, 255, 255, 0.97) !important;
backdrop-filter: blur(8px);
-webkit-backdrop-filter: blur(8px);
border-top: 1px solid rgba(49, 51, 63, 0.25) !important;
padding: 0.5rem 1.25rem !important;
z-index: 2147483646 !important;
display: flex !important;
align-items: center !important;
justify-content: flex-start !important;
font-family: system-ui, -apple-system, sans-serif !important;
box-sizing: border-box !important;
}
#datatools-sticky-footer a.datatools-sticky-footer-link {
display: inline-block !important;
color: rgb(38, 39, 48) !important;
text-decoration: none !important;
padding: 0.4rem 0.9rem !important;
border-radius: 0.5rem !important;
border: 1px solid rgba(49, 51, 63, 0.28) !important;
background: rgb(240, 242, 246) !important;
font-size: 14px !important;
font-weight: 500 !important;
line-height: 1.4 !important;
cursor: pointer !important;
transition: background 0.12s ease, border-color 0.12s ease;
}
#datatools-sticky-footer a.datatools-sticky-footer-link:hover {
background: rgb(225, 228, 235) !important;
border-color: rgba(49, 51, 63, 0.4) !important;
}
#datatools-sticky-footer a.datatools-sticky-footer-link:active {
background: rgb(210, 214, 222) !important;
}
</style>
""",
unsafe_allow_html=True,
)
# Move the footer to <body> directly via component iframe. The
# iframe carries allow-same-origin so window.parent.document is
# reachable; if a sandbox config ever blocks that we fall back to
# rendering inside the iframe itself (still visible, just sized
# to the iframe rather than the viewport).
st.iframe(
f"""
<script>
(function () {{
var label = {_json.dumps(label_raw)};
function build(doc) {{
var prev = doc.getElementById('datatools-sticky-footer');
if (prev) prev.remove();
var div = doc.createElement('div');
div.id = 'datatools-sticky-footer';
var a = doc.createElement('a');
a.className = 'datatools-sticky-footer-link';
// Navigate to the app root (``/``) instead of ``/home``. The
// home page is registered with ``default=True``, which serves
// it at the root URL. ``/home`` is NOT a recognized URL on
// every Streamlit minor version even with ``url_path="home"``
// — some builds reserve the alias only for non-default pages.
// Using ``./`` is robust against both: it resolves to the
// current document's directory, which on a single-segment
// tool-page URL like ``/01_deduplicator`` is the server root.
a.href = './';
a.target = '_self';
a.textContent = label;
div.appendChild(a);
return div;
}}
try {{
var doc = window.parent.document;
doc.body.appendChild(build(doc));
}} catch (e) {{
document.body.appendChild(build(document));
}}
}})();
</script>
""",
height=1,
)
def back_to_home_link(*, key: str = "_back_to_home_link") -> None:
"""Render a "← Back to Home" affordance on a tool page.
Tool pages reached from the home findings panel benefit from an
explicit return-to-home control so a user working through findings
on multiple uploaded files can hop between files without hunting
through the sidebar. Call this twice on each tool page — once
near the top (default key) and once at the bottom with
``key="_back_to_home_link_bottom"`` so the control stays reachable
after the user scrolls through long results.
Implementation: ``st.switch_page`` under ``st.navigation`` requires
either a file path to a page in ``pages/`` or a ``StreamlitPage``
object whose script identity matches one registered in the nav.
The entry script ``app.py`` is the nav manager itself — it cannot
be switched-to by filename. So we import the home callable from
``src.gui.app`` and rebuild the same ``st.Page`` registration here.
Streamlit identifies pages by the underlying callable's qualified
name, so a freshly-constructed Page resolves to the registered one.
"""
if st.button(_t("nav.back_to_home"), key=key, type="secondary"):
# Import from the renderer module (not from app.py — importing
# app.py would re-execute its navigation setup with the wrong
# "main script" context and blow up the pages/ path resolution).
from src.gui._home import _home_page
st.switch_page(
st.Page(_home_page, title="Home", icon="🧹", url_path="home"),
)
def shutdown_app() -> None:
"""Terminate the Streamlit server immediately, no confirm.
Designed to be called from a page whose mere act of rendering means
the user wants to quit (e.g., the sidebar Close entry). Schedules
``os._exit(0)`` on a daemon thread so the process terminates after
the farewell overlay has had a chance to paint, then injects the
overlay JS and short-circuits the rest of the page via ``st.stop``.
Streamlit has no first-class shutdown hook, and signalling the
process (SIGTERM/SIGINT) does not reliably terminate it — Streamlit
installs its own handlers and the tornado/asyncio loop swallows or
defers the signal, so the browser sees the websocket drop while the
python process stays alive. ``os._exit`` is the only reliable kill.
The hard-exit thread is skipped under pytest so the test suite does
not suicide when a test renders this page. The overlay + caption
still render so test assertions about content work.
"""
if not st.session_state.get("_app_shutting_down"):
st.session_state["_app_shutting_down"] = True
# Drain the audit log queue to disk before the process dies.
# Bounded by a 500ms timeout so a stuck disk can't delay
# shutdown beyond the daemon-thread's own 1s grace period.
try:
from src.audit import flush_audit_log, log_event
log_event("session", "Session ending")
flush_audit_log(timeout_s=0.5)
except Exception:
pass
if "pytest" not in sys.modules:
def _hard_exit() -> None:
time.sleep(1.0)
os._exit(0)
threading.Thread(target=_hard_exit, daemon=True).start()
st.iframe(_farewell_script(), height=1)
st.success(_t("quit.shutting_down"))
st.stop()
# ---------------------------------------------------------------------------
# Config panel (advanced options)
# ---------------------------------------------------------------------------
def config_panel(df: pd.DataFrame) -> dict:
"""Render the Advanced Options expander. Returns a settings dict.
Keys returned:
strategies: list[MatchStrategy] | None
survivor_rule: SurvivorRule
date_column: str | None
merge: bool
"""
columns = list(df.columns)
with st.expander("Advanced Options"):
col_left, col_right = st.columns(2)
with col_left:
subset_cols = st.multiselect(
"Match on columns",
columns,
default=[],
help="Leave empty to auto-detect based on column names.",
)
key_cols = st.multiselect(
"Strong keys",
columns,
default=[],
help="Columns that uniquely identify records (e.g., EIN, SKU). Each is an independent exact-match strategy.",
)
fuzzy_cols = st.multiselect(
"Fuzzy columns",
columns,
default=[],
help="Columns to fuzzy-match. Others use exact matching.",
)
with col_right:
algorithm = st.selectbox(
"Fuzzy algorithm",
["jaro_winkler", "levenshtein", "token_set_ratio"],
index=0,
help="jaro_winkler: best for names. levenshtein: best for typos. token_set_ratio: best for addresses.",
)
threshold = st.slider(
"Similarity threshold",
min_value=50,
max_value=100,
value=85,
help="Lower = more matches but more false positives.",
)
survivor = st.selectbox(
"Survivor rule",
["first", "last", "most-complete", "most-recent"],
index=0,
help="Which row to keep when duplicates are found.",
)
# Second row of options
col_a, col_b = st.columns(2)
with col_a:
normalize_options = {c: "auto" for c in columns}
normalizer_types = ["auto", "email", "phone", "name", "address", "string", "none"]
normalize_map: dict[str, str] = {}
if fuzzy_cols or subset_cols:
target_cols = fuzzy_cols or subset_cols
st.markdown("**Per-column normalizers**")
for col_name in target_cols:
norm = st.selectbox(
f"Normalizer for '{col_name}'",
normalizer_types,
index=0,
key=f"norm_{col_name}",
)
if norm not in ("auto", "none"):
normalize_map[col_name] = norm
with col_b:
merge = st.checkbox(
"Merge mode",
value=False,
help="Fill missing fields in the surviving row from removed duplicates.",
)
date_column: Optional[str] = None
if survivor == "most-recent":
date_column = st.selectbox(
"Date column",
columns,
help="Required for most-recent survivor rule.",
)
# Config save/load
st.divider()
cfg_left, cfg_right = st.columns(2)
with cfg_left:
config_file = st.file_uploader(
"Load config profile",
type=["json"],
help="Load previously saved settings.",
key="config_upload",
)
if config_file is not None:
import json
try:
data = json.loads(config_file.read())
loaded = DeduplicationConfig.from_dict(data)
st.session_state["loaded_config"] = loaded
st.success("Config loaded.")
except Exception as e:
st.error(f"Failed to load config: {e}")
with cfg_right:
if st.button("Save current settings"):
cfg = _build_config(
subset_cols, key_cols, fuzzy_cols,
algorithm, threshold, normalize_map,
survivor, date_column, merge,
)
cfg_json = cfg.to_dict()
import json
html_download_button(
"Download config JSON",
json.dumps(cfg_json, indent=2).encode("utf-8"),
file_name="dedup_config.json",
mime="application/json",
)
# Build strategies from selections
strategies = _build_strategies(
subset_cols, key_cols, fuzzy_cols,
algorithm, threshold, normalize_map,
)
# Survivor rule mapping
survivor_map = {
"first": SurvivorRule.KEEP_FIRST,
"last": SurvivorRule.KEEP_LAST,
"most-complete": SurvivorRule.KEEP_MOST_COMPLETE,
"most-recent": SurvivorRule.KEEP_MOST_RECENT,
}
return {
"strategies": strategies,
"survivor_rule": survivor_map[survivor],
"date_column": date_column,
"merge": merge,
}
def _build_strategies(
subset_cols: list[str],
key_cols: list[str],
fuzzy_cols: list[str],
algorithm: str,
threshold: int,
normalize_map: dict[str, str],
) -> Optional[list[MatchStrategy]]:
"""Build MatchStrategy list from GUI selections. Returns None for auto-detect."""
strategies: list[MatchStrategy] = []
# If user selected columns explicitly, build from those
if subset_cols or fuzzy_cols:
target_cols = subset_cols if subset_cols else fuzzy_cols
fuzzy_set = set(fuzzy_cols)
col_strats: list[ColumnMatchStrategy] = []
for col in target_cols:
norm = None
if col in normalize_map:
norm = NormalizerType(normalize_map[col])
if col in fuzzy_set:
algo = Algorithm(algorithm)
thresh = float(threshold)
else:
algo = Algorithm.EXACT
thresh = 100.0
col_strats.append(ColumnMatchStrategy(
column=col, algorithm=algo, threshold=thresh, normalizer=norm,
))
strategies.append(MatchStrategy(column_strategies=col_strats))
# Add strong key strategies
if key_cols:
for col in key_cols:
strategies.append(MatchStrategy(column_strategies=[
ColumnMatchStrategy(column=col, algorithm=Algorithm.EXACT, threshold=100.0)
]))
return strategies if strategies else None
def _build_config(
subset_cols, key_cols, fuzzy_cols,
algorithm, threshold, normalize_map,
survivor, date_column, merge,
) -> DeduplicationConfig:
"""Build a DeduplicationConfig from GUI state."""
cfg = DeduplicationConfig(
survivor_rule=survivor.replace("-", "_"),
date_column=date_column,
merge=merge,
subset_columns=subset_cols or None,
fuzzy_columns=fuzzy_cols or None,
default_algorithm=algorithm,
default_threshold=float(threshold),
normalize_map=normalize_map or None,
)
strategies = _build_strategies(
subset_cols, key_cols, fuzzy_cols,
algorithm, threshold, normalize_map,
)
if strategies:
cfg.strategies = [
StrategyConfig(columns=[
ColumnStrategyConfig(
column=cs.column,
algorithm=cs.algorithm.value,
threshold=cs.threshold,
normalizer=cs.normalizer.value if cs.normalizer else None,
)
for cs in s.column_strategies
])
for s in strategies
]
return cfg
# ---------------------------------------------------------------------------
# Match group review card
# ---------------------------------------------------------------------------
def _find_differing_cols(
group: MatchResult, df: pd.DataFrame, display_cols: list[str],
) -> list[str]:
"""Return columns where values differ across rows in the group."""
differing = []
for col in display_cols:
values = set()
for idx in group.row_indices:
values.add(str(df.iloc[idx].get(col, "")).strip())
if len(values) > 1:
differing.append(col)
return differing
def match_group_card(
group: MatchResult,
df: pd.DataFrame,
group_num: int,
) -> None:
"""Render an expandable match group card with side-by-side diff.
Users select which rows to keep via checkboxes. When exactly one row
is kept they can also cherry-pick column values from the other rows.
Decision format stored in ``st.session_state["review_decisions"]``::
{group_id: {"keep_indices": [int, ...], "overrides": {col: val}}}
"""
confidence = group.confidence
matched_on = ", ".join(group.matched_on)
n_rows = len(group.row_indices)
gid = group.group_id
decisions = st.session_state.get("review_decisions", {})
has_decision = gid in decisions
decision_dict = decisions.get(gid, {})
keep_indices = decision_dict.get("keep_indices", []) if has_decision else []
overrides = decision_dict.get("overrides", {}) if has_decision else {}
# Build label — append decision status if already decided
label = (
f"Group {group_num}: {n_rows} rows "
f"(confidence: {confidence:.0f}%) "
f"[{matched_on}]"
)
if has_decision:
if len(keep_indices) == n_rows:
label += " — Kept All"
elif len(keep_indices) == 1:
label += " — Merged (customized)" if overrides else " — Merged"
else:
label += f" — Split (kept {len(keep_indices)} of {n_rows})"
# Decided groups collapse; undecided groups stay open
expanded = not has_decision
display_cols = [c for c in df.columns if not str(c).startswith("_norm_")]
differing_cols = _find_differing_cols(group, df, display_cols)
with st.expander(label, expanded=expanded):
if has_decision:
# --- Decided state: read-only table with diff highlighting ---
rows_data = []
for idx in group.row_indices:
row = {"Row": idx + 1}
for col in display_cols:
row[col] = df.iloc[idx].get(col, "")
rows_data.append(row)
compare_df = pd.DataFrame(rows_data).set_index("Row")
def _highlight_diffs(s: pd.Series) -> list[str]:
styles = []
first_val = str(s.iloc[0]).strip() if len(s) > 0 else ""
for val in s:
val_str = str(val).strip()
if val_str != first_val and val_str and first_val:
styles.append(
"background-color: rgba(245, 166, 35, 0.2)"
)
elif not val_str and first_val:
styles.append(
"background-color: rgba(240, 82, 82, 0.1)"
)
else:
styles.append("")
return styles
styled = compare_df.style.apply(_highlight_diffs, axis=0)
st.dataframe(styled, use_container_width=True)
if len(keep_indices) == n_rows:
st.info("Decision: Kept All")
elif len(keep_indices) == 1:
msg = "Decision: Merge"
if overrides:
msg += f" ({len(overrides)} column(s) customized)"
st.success(msg)
else:
kept = ", ".join(str(i + 1) for i in sorted(keep_indices))
st.success(
f"Decision: Keep rows {kept} "
f"(removing {n_rows - len(keep_indices)})"
)
def _undo(g=gid):
st.session_state["review_decisions"].pop(g, None)
st.session_state.pop(f"editor_{g}", None)
st.button("Undo", key=f"undo_{gid}", on_click=_undo)
else:
# --- Undecided: interactive editor with inline checkboxes & dropdowns ---
editor_rows = []
for idx in group.row_indices:
row_data = {"Keep": idx == group.survivor_index, "Row": idx + 1}
for col in display_cols:
row_data[col] = str(df.iloc[idx].get(col, ""))
editor_rows.append(row_data)
editor_df = pd.DataFrame(editor_rows)
col_config = {
"Keep": st.column_config.CheckboxColumn(
"Keep", default=True, width="small",
),
"Row": st.column_config.NumberColumn("Row", width="small"),
}
for col in differing_cols:
vals = []
for idx in group.row_indices:
v = str(df.iloc[idx].get(col, "")).strip()
if v not in vals:
vals.append(v)
if "" not in vals:
vals.append("")
col_config[col] = st.column_config.SelectboxColumn(
col, options=vals, required=False,
)
disabled_cols = ["Row"] + [
c for c in display_cols if c not in differing_cols
]
edited = st.data_editor(
editor_df,
column_config=col_config,
disabled=disabled_cols,
use_container_width=True,
hide_index=True,
key=f"editor_{gid}",
)
# Read which rows are checked
checked = [
idx
for i, idx in enumerate(group.row_indices)
if edited.iloc[i]["Keep"]
]
if differing_cols:
st.caption(
f"Columns with differences (editable): "
f"{', '.join(differing_cols)}"
)
# Status + surviving rows preview
if len(checked) == 0:
st.warning("Select at least one row to keep.")
else:
if len(checked) == n_rows:
st.caption("Keeping all rows (no duplicates removed)")
elif len(checked) == 1:
st.caption(
f"Merging into Row {checked[0] + 1}, "
f"removing {n_rows - 1} row(s)"
)
else:
st.caption(
f"Keeping {len(checked)} rows, "
f"removing {n_rows - len(checked)}"
)
# Build preview of surviving rows with edits applied
checked_positions = [
i for i, idx in enumerate(group.row_indices)
if idx in checked
]
preview = edited.iloc[checked_positions].drop(
columns=["Keep"],
).reset_index(drop=True)
st.markdown("**Surviving rows preview:**")
st.dataframe(preview, use_container_width=True, hide_index=True)
# Confirm
def _on_confirm(
g=gid, indices=list(group.row_indices),
diff=differing_cols, surv=group.survivor_index,
):
editor_state = st.session_state.get(f"editor_{g}", {})
ed_rows = editor_state.get("edited_rows", {})
# Determine which rows to keep
keep = []
for i, idx in enumerate(indices):
changes = ed_rows.get(i, {})
default_keep = idx == surv
if changes.get("Keep", default_keep):
keep.append(idx)
if not keep:
keep = list(indices)
# Column overrides (single-survivor merge only)
ovr: dict[str, str] = {}
if len(keep) == 1:
surv_idx = keep[0]
surv_pos = indices.index(surv_idx)
surv_changes = ed_rows.get(surv_pos, {})
the_df = st.session_state["df"]
for c in diff:
if c in surv_changes:
new_val = (
str(surv_changes[c])
if surv_changes[c] is not None
else ""
)
orig = str(
the_df.iloc[surv_idx].get(c, "")
).strip()
if new_val.strip() != orig:
ovr[c] = new_val
st.session_state["review_decisions"][g] = {
"keep_indices": keep,
"overrides": ovr,
}
st.button(
"Confirm",
key=f"confirm_{gid}",
type="primary",
on_click=_on_confirm,
disabled=(len(checked) == 0),
)
# ---------------------------------------------------------------------------
# Results summary + downloads
# ---------------------------------------------------------------------------
def results_summary(
result: DeduplicationResult,
original_df: pd.DataFrame,
) -> None:
"""Render summary stats and download buttons."""
removed = result.original_row_count - len(result.deduplicated_df)
# Summary metrics
col1, col2, col3, col4 = st.columns(4)
col1.metric("Rows In", result.original_row_count)
col2.metric("Rows Out", len(result.deduplicated_df))
col3.metric("Removed", removed)
col4.metric("Groups", len(result.match_groups))
st.divider()
# Download buttons
dl_left, dl_mid, dl_right = st.columns(3)
with dl_left:
csv_bytes = result.deduplicated_df.to_csv(index=False).encode("utf-8-sig")
html_download_button(
"Download Deduplicated CSV",
csv_bytes,
file_name="deduplicated.csv",
mime="text/csv",
)
with dl_mid:
if not result.removed_df.empty:
removed_bytes = result.removed_df.to_csv(index=False).encode("utf-8-sig")
html_download_button(
"Download Removed Rows",
removed_bytes,
file_name="removed_rows.csv",
mime="text/csv",
)
with dl_right:
if result.match_groups:
groups_data = _build_match_groups_csv(result, original_df)
html_download_button(
"Download Match Groups Report",
groups_data,
file_name="match_groups.csv",
mime="text/csv",
)
def apply_review_decisions(
original_df: pd.DataFrame,
match_groups: list[MatchResult],
decisions: dict,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Build final DataFrames by applying user review decisions.
Supports three modes per group:
- **Merge** (1 row kept): single survivor with optional column overrides.
- **Split** (some rows kept): selected rows survive, others removed.
- **Keep all** (all rows kept): no rows removed.
- **No decision**: engine default (single survivor).
Returns ``(deduplicated_df, removed_df)``.
"""
remove_indices: set[int] = set()
row_overrides: dict[int, dict[str, str]] = {}
for group in match_groups:
gid = group.group_id
decision = decisions.get(gid)
# No decision yet — accept with engine defaults
if decision is None:
keep = {group.survivor_index}
else:
keep = set(decision.get("keep_indices", group.row_indices))
# Safety: never remove all rows in a group
if not keep:
keep = set(group.row_indices)
for idx in group.row_indices:
if idx not in keep:
remove_indices.add(idx)
# Column overrides (only meaningful for single-survivor merge)
ovr = decision.get("overrides", {}) if decision else {}
if ovr and len(keep) == 1:
row_overrides[next(iter(keep))] = ovr
# Build output DataFrames
kept = [i for i in range(len(original_df)) if i not in remove_indices]
if row_overrides:
rows = []
for i in kept:
row = original_df.iloc[i].copy()
if i in row_overrides:
for col, val in row_overrides[i].items():
if col in row.index:
row[col] = val
rows.append(row)
deduped = pd.DataFrame(rows).reset_index(drop=True)
else:
deduped = original_df.iloc[kept].copy().reset_index(drop=True)
removed = (
original_df.iloc[sorted(remove_indices)].copy().reset_index(drop=True)
if remove_indices
else pd.DataFrame()
)
return deduped, removed
def _build_match_groups_csv(
result: DeduplicationResult,
original_df: pd.DataFrame,
) -> bytes:
"""Build the match groups audit CSV as bytes."""
rows = []
for g in result.match_groups:
for idx in g.row_indices:
row_data = {
"_group_id": g.group_id + 1,
"_is_survivor": idx == g.survivor_index,
"_confidence": g.confidence,
"_matched_on": ", ".join(g.matched_on),
"_original_row": idx + 1,
}
for col in original_df.columns:
if not str(col).startswith("_norm_"):
row_data[col] = original_df.iloc[idx].get(col, "") if idx < len(original_df) else ""
rows.append(row_data)
groups_df = pd.DataFrame(rows)
return groups_df.to_csv(index=False).encode("utf-8-sig")
# ---------------------------------------------------------------------------
# Analyzer integration (upload-time data quality findings)
# ---------------------------------------------------------------------------
# Tool id -> friendly display name. Single source of truth for the GUI; the
# CLI keeps its own copy so each entrypoint stays self-contained.
TOOL_DISPLAY_NAMES: dict[str, str] = {
"01_deduplicator": "Find Duplicates",
"02_text_cleaner": "Clean Text",
"03_format_standardizer": "Standardize Formats",
"04_missing_handler": "Fix Missing Values",
"05_column_mapper": "Map Columns",
"06_outlier_detector": "Find Unusual Values",
"07_multi_file_merger": "Combine Files",
"08_validator_reporter": "Quality Check",
"09_pipeline_runner": "Automated Workflows",
}
_SEVERITY_ICON: dict[str, str] = {
"info": "",
"warn": "⚠️",
"error": "🛑",
}
_SEVERITY_COLOR: dict[str, str] = {
"info": "blue",
"warn": "orange",
"error": "red",
}
# Map tool id to the streamlit page path under src/gui/. Skipped tools (no
# page yet) return empty string and the "Open" button is omitted.
_TOOL_PAGE_PATHS: dict[str, str] = {
"01_deduplicator": "pages/1_Deduplicator.py",
"02_text_cleaner": "pages/2_Text_Cleaner.py",
"03_format_standardizer": "pages/3_Format_Standardizer.py",
"04_missing_handler": "pages/4_Missing_Values.py",
"05_column_mapper": "pages/5_Column_Mapper.py",
"06_outlier_detector": "pages/6_Outlier_Detector.py",
"07_multi_file_merger": "pages/7_Multi_File_Merger.py",
"08_validator_reporter": "pages/8_Validator_Reporter.py",
"09_pipeline_runner": "pages/9_Pipeline_Runner.py",
}
def tool_display_name(tool_id: str) -> str:
"""Map a stable tool id to its GUI display name; falls back to the id.
Routes through the active language pack so the home grid, findings
panel headers, and "Open tool" buttons all stay in sync with the
sidebar's language selection.
"""
if not tool_id:
return _t("findings.untargeted_label")
translated = _t(f"tools.{tool_id}.name")
if translated != f"tools.{tool_id}.name":
return translated
return TOOL_DISPLAY_NAMES.get(tool_id, tool_id)
def _tool_page_slug(tool_id: str) -> str:
return _TOOL_PAGE_PATHS.get(tool_id, "")
def render_findings_panel(
findings,
*,
header: str | None = None,
key_namespace: str = "",
) -> None:
"""Render a list of :class:`Finding` objects grouped by tool.
Each tool gets a header with the count, an open-tool button, and a list
of the findings underneath. Severity icon + count are shown inline so
the user can decide which tool to open first.
"""
from src.core.analyze import findings_by_tool # local import to avoid cycle
from src.core.text_clean import hidden_char_css
if header is None:
header = _t("findings.header")
if not findings:
st.success(_t("findings.none"))
return
# Inject the hidden-char badge styles once so every sample value below
# can render leading/trailing whitespace and invisibles as visible badges.
st.markdown(hidden_char_css() + _SAMPLE_TABLE_CSS, unsafe_allow_html=True)
by_sev: dict[str, int] = {}
for f in findings:
by_sev[f.severity] = by_sev.get(f.severity, 0) + 1
sev_summary = " · ".join(
_t(
"findings.severity_summary_segment",
icon=_SEVERITY_ICON[s], n=by_sev[s], severity=s,
)
for s in ("error", "warn", "info") if by_sev.get(s)
)
st.markdown(f"### {header}")
st.caption(sev_summary)
grouped = findings_by_tool(findings)
untargeted = [f for f in findings if not f.tool]
for tool_id in sorted(grouped):
items = grouped[tool_id]
name = tool_display_name(tool_id)
with st.expander(
_t("findings.tool_section_label", tool=name, n=len(items)),
expanded=any(f.severity == "error" for f in items),
):
for f in items:
_render_one_finding(f)
page_slug = _tool_page_slug(tool_id)
if page_slug:
# Render as a primary (red) ``st.button`` rather than the
# subtle ``st.page_link`` we used before — the previous
# rendering blended into the page, making the per-tool
# jump non-obvious. The button triggers ``st.switch_page``
# so navigation is still a soft switch (no full reload).
#
# ``key_namespace`` is hashed into the widget key so the
# home page (which calls this once PER uploaded file)
# doesn't collide on the shared tool_id — two files both
# having Clean Text findings would otherwise produce two
# buttons with the same key and Streamlit refuses.
import hashlib as _hashlib
ns = _hashlib.sha1(
(key_namespace or "").encode("utf-8"),
usedforsecurity=False,
).hexdigest()[:8]
if st.button(
_t("findings.open_tool", tool=name),
key=f"_findings_open_{tool_id}_{ns}",
type="primary",
use_container_width=False,
):
st.switch_page(page_slug)
if untargeted:
with st.expander(
_t("findings.other_section_label", n=len(untargeted)),
expanded=False,
):
for f in untargeted:
_render_one_finding(f)
_PREVIEW_TABLE_CSS = """
<style>
.hidden-aware-preview {
width: 100%;
border-collapse: collapse;
font-size: 0.9em;
}
.hidden-aware-preview th,
.hidden-aware-preview td {
padding: 4px 8px;
border: 1px solid #eee;
text-align: left;
vertical-align: top;
font-family: ui-monospace, SFMono-Regular, monospace;
/* pre-wrap so internal ASCII whitespace and embedded newlines render
as the user wrote them; otherwise browsers collapse adjacent spaces. */
white-space: pre-wrap;
word-break: break-word;
max-width: 32em;
}
.hidden-aware-preview thead th {
background: #f6f8fa;
position: sticky;
top: 0;
}
.hidden-aware-preview tbody tr:nth-child(even) { background: #fafafa; }
.hidden-aware-preview .row-num {
color: #888;
font-family: inherit;
background: #f6f8fa;
text-align: right;
}
.hidden-aware-preview-wrap {
max-height: 26rem;
overflow: auto;
border: 1px solid #eee;
border-radius: 4px;
}
</style>
"""
def render_hidden_aware_preview(
df,
*,
n_rows: int = 10,
caption: str | None = None,
) -> None:
"""Render a DataFrame preview that shows hidden characters in every cell.
Used for the Clean Text tool's "before" and "after" previews so the user
can actually see the leading/trailing whitespace, NBSP padding,
zero-width characters, and smart punctuation that the cleaner is going
to remove (or just removed). A plain ``st.dataframe`` collapses outer
ASCII whitespace and renders invisibles as nothing, defeating the
point of a preview in a cleanup tool.
Headers and cell values are both routed through
:func:`visualize_hidden_html` with ``mark_outer_whitespace=True``.
"""
import pandas as pd
from src.core.text_clean import hidden_char_css, visualize_hidden_html
if df is None or len(df) == 0:
st.info("No rows to preview.")
return
sliced = df.head(n_rows) if len(df) > n_rows else df
st.markdown(hidden_char_css() + _PREVIEW_TABLE_CSS, unsafe_allow_html=True)
if caption:
st.caption(caption)
header_cells = "".join(
f"<th>{visualize_hidden_html(str(c), mark_outer_whitespace=True)}</th>"
for c in sliced.columns
)
body_rows: list[str] = []
for row_idx, (orig_idx, row) in enumerate(sliced.iterrows(), start=1):
cells = ["<td class='row-num'>" + str(row_idx) + "</td>"]
for col in sliced.columns:
value = row[col]
if isinstance(value, str):
rendered = visualize_hidden_html(value, mark_outer_whitespace=True)
elif pd.isna(value):
rendered = "<span style='color:#aaa'>NaN</span>"
else:
# Non-string scalars (numerics, bools) just stringify; they
# won't have invisible chars but we still need html-escape.
rendered = visualize_hidden_html(str(value))
cells.append(f"<td>{rendered}</td>")
body_rows.append("<tr>" + "".join(cells) + "</tr>")
st.markdown(
"<div class='hidden-aware-preview-wrap'>"
"<table class='hidden-aware-preview'>"
f"<thead><tr><th class='row-num'>#</th>{header_cells}</tr></thead>"
f"<tbody>{''.join(body_rows)}</tbody>"
"</table>"
"</div>",
unsafe_allow_html=True,
)
_SAMPLE_TABLE_CSS = """
<style>
.findings-sample-table {
width: 100%;
border-collapse: collapse;
font-size: 0.9em;
}
.findings-sample-table th,
.findings-sample-table td {
padding: 4px 8px;
border-bottom: 1px solid #eee;
text-align: left;
vertical-align: top;
}
.findings-sample-table td.value {
font-family: ui-monospace, SFMono-Regular, monospace;
/* pre-wrap so any ASCII whitespace inside the value is preserved
visually (browsers collapse adjacent spaces by default). */
white-space: pre-wrap;
word-break: break-word;
}
.findings-sample-table tbody tr:hover { background: #fafafa; }
</style>
"""
def _render_one_finding(f) -> None:
from src.core.text_clean import visualize_hidden_html
color = _SEVERITY_COLOR[f.severity]
icon = _SEVERITY_ICON[f.severity]
column_part = f" in `{f.column}`" if getattr(f, "column", None) else ""
st.markdown(
f"{icon} :{color}[**{f.id}**]{column_part}{f.description}"
)
if f.samples:
# Render samples as an HTML table so leading/trailing whitespace
# and invisible characters in the value column show up as badges.
# A plain st.dataframe collapses outer whitespace and renders
# NBSP/ZWSP as nothing, defeating the point of the audit.
rows_html = []
for row, col, value in f.samples:
rendered_value = visualize_hidden_html(
str(value), mark_outer_whitespace=True,
)
rendered_col = visualize_hidden_html(
str(col), mark_outer_whitespace=True,
)
rows_html.append(
"<tr>"
f"<td>{int(row) + 1 if isinstance(row, int) else row}</td>"
f"<td><code>{rendered_col}</code></td>"
f"<td class='value'>{rendered_value}</td>"
"</tr>"
)
st.markdown(
"<table class='findings-sample-table'>"
"<thead><tr>"
"<th>Row</th><th>Column</th><th>Value</th>"
"</tr></thead>"
f"<tbody>{''.join(rows_html)}</tbody>"
"</table>",
unsafe_allow_html=True,
)
def upload_and_analyze_section() -> None:
"""Render the upload + analyze panel for the home page.
Stashes the uploaded file (name + bytes) and findings in session state
so individual tool pages can pick them up if they want to skip their
own uploader. Each tool page already has its own uploader today, so
this is purely additive.
"""
st.markdown(f"### {_t('upload.heading')}")
st.caption(_t("upload.intro"))
st.caption(_t("upload.limits"))
uploaded = st.file_uploader(
_t("upload.uploader_label"),
type=["csv", "tsv", "xlsx", "xls"],
key="home_upload",
help=_t("upload.uploader_help"),
)
if uploaded is None:
return
# Stash on every fresh upload so all tool pages can pick it up.
if (
st.session_state.get("home_uploaded_name") != uploaded.name
or st.session_state.get("home_uploaded_size") != uploaded.size
):
st.session_state["home_uploaded_name"] = uploaded.name
st.session_state["home_uploaded_size"] = uploaded.size
st.session_state["home_uploaded_bytes"] = uploaded.getvalue()
# Drop stale findings on a new upload.
st.session_state.pop("home_findings", None)
st.session_state.pop("home_skipped", None)
col_run, col_skip, _ = st.columns([1, 1, 4])
with col_run:
run_clicked = st.button(_t("upload.run_button"), type="primary", key="home_run_analysis")
with col_skip:
skip_clicked = st.button(_t("upload.skip_button"), key="home_skip_analysis")
if skip_clicked:
st.session_state["home_findings"] = []
st.session_state["home_skipped"] = True
if run_clicked:
with st.spinner(_t("upload.scanning")):
findings = _run_analysis_on_upload(uploaded)
st.session_state["home_findings"] = findings
st.session_state["home_skipped"] = False
findings = st.session_state.get("home_findings")
if findings is None:
return
if st.session_state.get("home_skipped"):
st.info(_t("upload.skipped_notice"))
return
st.divider()
render_findings_panel(findings)
def _run_analysis_on_upload(uploaded):
"""Read the uploaded file with pre-parse repair, then analyze.
Errors are caught and surfaced as a single synthetic ``Finding``
instead of bubbling a traceback up into the page chrome. A bad
file (empty bytes, unreadable encoding, pandas parse failure on
one of several uploaded files) should yield a clean red banner for
that file, not kill the whole multi-file analysis run.
"""
import hashlib
from src.audit import log_event, log_exception
from src.core.analyze import Finding, analyze
from src.core.errors import format_for_user
from src.core.io import repair_bytes
name = uploaded.name
data = uploaded.getvalue()
suffix = name.rsplit(".", 1)[-1].lower() if "." in name else ""
digest = hashlib.sha1(
data, usedforsecurity=False,
).hexdigest()[:12] if data else "empty"
log_event(
"analyze",
f"Analyzing {name}",
filename=name,
bytes=len(data),
sha1_12=digest,
suffix=suffix,
)
def _error_finding(description: str, fid: str = "analysis_failed") -> list[Finding]:
return [Finding(
id=fid,
severity="error",
tool="",
count=1,
description=description,
confidence="high",
fix_action="",
)]
if not data:
log_event(
"analyze",
f"Skipping {name} — 0 bytes",
level="warn",
filename=name,
outcome="empty_upload",
)
return _error_finding(
f"`{name}` is empty (0 bytes). Please re-upload — the bytes "
f"may not have transferred correctly from your browser.",
fid="empty_upload",
)
try:
if suffix in ("xlsx", "xls"):
df = pd.read_excel(io.BytesIO(data), dtype=str, keep_default_na=False)
findings = analyze(df)
log_event(
"analyze",
f"Analyzed {name} ({len(findings)} findings)",
filename=name,
bytes=len(data),
sha1_12=digest,
findings=len(findings),
rows=len(df), cols=len(df.columns),
)
return findings
# CSV / TSV: run repair_bytes so the user sees csv_* findings.
text_head = data[:4096].decode("utf-8", errors="replace")
delim = "\t" if suffix == "tsv" else ","
if delim == ",":
for cand in ("\t", ";", "|"):
if text_head.count(cand) > text_head.count(",") * 1.5:
delim = cand
break
repair = repair_bytes(data, encoding="utf-8", delimiter=delim)
if not repair.repaired_bytes:
log_event(
"analyze",
f"Skipping {name} — empty after repair",
level="warn",
filename=name,
outcome="empty_after_repair",
)
return _error_finding(
f"`{name}` is empty after pre-parse repair "
f"(original was {len(data)} bytes — likely all NUL "
f"bytes or stripped during a BOM/line-ending pass). "
f"Open the file in a text editor to confirm it has "
f"content.",
fid="empty_after_repair",
)
df = pd.read_csv(
io.BytesIO(repair.repaired_bytes),
encoding="utf-8", delimiter=delim,
dtype=str, keep_default_na=False, on_bad_lines="warn",
)
findings = analyze(df, repair_result=repair)
log_event(
"analyze",
f"Analyzed {name} ({len(findings)} findings)",
filename=name,
bytes=len(data),
sha1_12=digest,
findings=len(findings),
rows=len(df), cols=len(df.columns),
delimiter=repr(delim),
)
return findings
except pd.errors.EmptyDataError as e:
log_exception(
f"analyze({name})",
e,
filename=name,
outcome="empty_after_repair",
)
return _error_finding(
f"`{name}` could not be parsed — pandas reports no columns "
f"in the file. Original size was {len(data)} bytes. Open "
f"the file in a text editor to confirm the header row is "
f"present and uses the same delimiter as the data rows.",
fid="empty_after_repair",
)
except Exception as e:
log_exception(
f"analyze({name})",
e,
filename=name,
outcome="analysis_failed",
)
return _error_finding(
f"`{name}` could not be analyzed: {format_for_user(e)}",
)
def findings_count_for_tool(tool_id: str) -> int:
"""How many findings in session state target *tool_id*; 0 when none.
Used by the home-page tool grid to badge cards that have actionable
findings without re-running the analyzer.
"""
findings = st.session_state.get("home_findings") or []
return sum(1 for f in findings if f.tool == tool_id)
# ---------------------------------------------------------------------------
# Cross-page upload pickup
# ---------------------------------------------------------------------------
class _StashedUpload:
"""Duck-types ``st.runtime.uploaded_file_manager.UploadedFile`` enough
for the tool pages: ``.name``, ``.size``, ``.getvalue()``.
Tool pages that previously consumed a Streamlit ``UploadedFile`` can
accept this in its place without changes.
"""
__slots__ = ("name", "size", "_data")
def __init__(self, name: str, data: bytes) -> None:
self.name = name
self.size = len(data)
self._data = data
def getvalue(self) -> bytes:
return self._data
def read(self) -> bytes:
return self._data
def pickup_or_upload(
*,
label: str,
key: str,
types: list[str],
help: str | None = None,
):
"""Return an upload object, preferring the home-page upload when present.
Behavior:
- If ``st.session_state['home_uploaded_bytes']`` is set and the user
hasn't asked for a different file on this page, render a banner
("Using *<name>* from upload screen") plus a "Use a different file"
button, and return a :class:`_StashedUpload` shim.
- Otherwise render the standard ``st.file_uploader`` with the supplied
*label*, *key*, and *types*. Returns the Streamlit ``UploadedFile``
directly (or ``None`` if nothing uploaded).
The ``_StashedUpload`` shim exposes ``.name``, ``.size``, and
``.getvalue()`` so existing tool-page code that consumes a Streamlit
upload object works without changes.
"""
override_key = f"{key}__override"
has_session_upload = st.session_state.get("home_uploaded_bytes") is not None
use_session = has_session_upload and not st.session_state.get(override_key, False)
if use_session:
name = st.session_state.get("home_uploaded_name") or _t("gate.default_name")
st.info(_t("upload.using_session_file", name=name))
if st.button(_t("upload.use_different_file"), key=f"{key}__pick_diff"):
st.session_state[override_key] = True
st.rerun()
return _StashedUpload(name, st.session_state["home_uploaded_bytes"])
if {"csv", "tsv", "xlsx", "xls"} & set(types):
st.caption(_t("upload.pickup_caption"))
uploaded = st.file_uploader(label, type=types, key=key, help=help)
if uploaded is not None and st.session_state.get(override_key):
# User has uploaded their own file on this page; clear the override
# so the next visit to a tool page starts fresh.
pass
if uploaded is None and st.session_state.get(override_key) and has_session_upload:
if st.button(_t("upload.switch_back"), key=f"{key}__switch_back"):
st.session_state[override_key] = False
st.rerun()
return uploaded