"""DataTools Standardize Formats — Streamlit page.""" from __future__ import annotations import io import json import sys from pathlib import Path import pandas as pd import streamlit as st _project_root = Path(__file__).resolve().parent.parent.parent.parent if str(_project_root) not in sys.path: sys.path.insert(0, str(_project_root)) from src.gui.components import ( back_to_home_link, render_sticky_footer, render_tool_header, hide_streamlit_chrome, html_download_button, pickup_or_upload, require_feature_or_render_upgrade, ) from src.i18n import t from src.core.format_standardize import ( PRESETS, FieldType, StandardizeOptions, standardize_dataframe, ) from src.license import FeatureFlag hide_streamlit_chrome() render_sticky_footer() back_to_home_link() from src.audit import log_page_open log_page_open("3_Format_Standardizer") require_feature_or_render_upgrade(FeatureFlag.FORMAT_STANDARDIZER) # --------------------------------------------------------------------------- # Header # --------------------------------------------------------------------------- render_tool_header("03_format_standardizer") # --------------------------------------------------------------------------- # File upload # --------------------------------------------------------------------------- uploaded = pickup_or_upload( label="Import CSV or Excel file", key="fmtstd_file_upload", types=["csv", "tsv", "xlsx", "xls"], ) if uploaded is None: st.info("Import a CSV, TSV, or Excel file to begin.") st.stop() @st.cache_data(show_spinner=False) def _read_uploaded(name: str, data: bytes) -> pd.DataFrame: """Read the uploaded bytes into a DataFrame, treating all cells as strings.""" suffix = Path(name).suffix.lower() bio = io.BytesIO(data) if suffix in (".xlsx", ".xls"): return pd.read_excel(bio, dtype=str, keep_default_na=False) for enc in ("utf-8", "utf-8-sig", "latin-1"): try: bio.seek(0) sep = "\t" if suffix == ".tsv" else "," return pd.read_csv( bio, dtype=str, keep_default_na=False, encoding=enc, sep=sep, on_bad_lines="warn", ) except UnicodeDecodeError: continue bio.seek(0) return pd.read_csv(bio, dtype=str, keep_default_na=False, encoding="latin-1") try: df = _read_uploaded(uploaded.name, uploaded.getvalue()) except UnicodeDecodeError as e: st.error( f"**Could not decode `{uploaded.name}`**\n\n" f"The file isn't UTF-8, UTF-8-with-BOM, or Latin-1.\n\n" f"_Underlying error: {e}_\n\n" f"Try re-saving the file as UTF-8 from the source application." ) st.stop() except Exception as e: from src.core.errors import format_for_user st.error( f"**Could not read `{uploaded.name}`**\n\n" f"```\n{format_for_user(e)}\n```" ) st.stop() # Collapse the input preview once the user has clicked Standardize Formats # so the Results section below is the primary visual focus. The user can # re-expand the expander to re-inspect the source rows. _has_result = st.session_state.get("fmtstd_result") is not None with st.expander(f"Preview: {uploaded.name}", expanded=not _has_result): st.caption(f"{len(df)} rows, {len(df.columns)} columns") st.dataframe(df.head(10), width="stretch") st.divider() # --------------------------------------------------------------------------- # Auto-detect column types # --------------------------------------------------------------------------- # # A first pass over a 200-row sample picks a likely field type per column. # It's a hint, not a commitment — every column shows a selectbox the user # can override. Heuristics deliberately err toward "(skip)" rather than # guessing wrong, since wrong guesses produce misleading change audits. import re as _re _DATE_HINT_RE = _re.compile( r"^\s*\d{1,4}[-/.]\d{1,2}[-/.]\d{1,4}\s*$" r"|^\s*[A-Za-z]{3,9}\s+\d{1,2}[, ]+\d{2,4}\s*$" r"|^\s*\d{1,2}\s+[A-Za-z]{3,9}\s+\d{2,4}\s*$" ) _PHONE_HINT_RE = _re.compile(r"^[\s\d().+\-]+$") _CURRENCY_HINT_RE = _re.compile(r"^[\s$€£¥]?\s*-?\d[\d,. ]*\d?\s*$|^\s*\(\s*[$€£¥]?\d.*\)\s*$") _BOOL_TOKENS = {"yes", "no", "y", "n", "true", "false", "t", "f", "0", "1"} def _detect_field_type(col: str, samples: list[str]) -> FieldType | None: """Return a likely :class:`FieldType` for *col*, or None when unsure. Strategy: drop empties, then require ≥80% of remaining sample cells to fit the type's hint regex. Boolean check runs first because ``0/1`` also matches the currency regex; date/phone/currency next; address/name fall back to header-name keywords because their cell shapes overlap with plain free text. """ cells = [s.strip() for s in samples if isinstance(s, str) and s.strip()] if not cells: return None n = len(cells) threshold = max(1, int(n * 0.8)) bool_hits = sum(1 for c in cells if c.casefold() in _BOOL_TOKENS) if bool_hits >= threshold: return FieldType.BOOLEAN date_hits = sum(1 for c in cells if _DATE_HINT_RE.match(c)) if date_hits >= threshold: return FieldType.DATE # Phone: digit-heavy, 7+ digits, no letters. phone_hits = 0 for c in cells: if _PHONE_HINT_RE.match(c) and sum(1 for ch in c if ch.isdigit()) >= 7: phone_hits += 1 if phone_hits >= threshold: return FieldType.PHONE currency_hits = sum(1 for c in cells if _CURRENCY_HINT_RE.match(c)) if currency_hits >= threshold: return FieldType.CURRENCY header = col.lower() if any(tok in header for tok in ("address", "addr", "street")): return FieldType.ADDRESS if any(tok in header for tok in ("name", "customer", "contact")): return FieldType.NAME if any(tok in header for tok in ("date", "dob", "birth", "joined", "created")): return FieldType.DATE if any(tok in header for tok in ("phone", "mobile", "tel")): return FieldType.PHONE if any(tok in header for tok in ("price", "amount", "cost", "total", "fee")): return FieldType.CURRENCY if any(tok in header for tok in ("active", "enabled", "is_", "has_", "flag")): return FieldType.BOOLEAN return None # --------------------------------------------------------------------------- # Options # --------------------------------------------------------------------------- # # Wrapped in an outer expander whose default state mirrors the preview # expander above: open before a result exists, folded once the user has # clicked Standardize Formats. Together they push the Results section to # the top of the visible area after a run. column_types: dict[str, FieldType] = {} extra_abbreviations: dict[str, str] = {} with st.expander("Options", expanded=not _has_result): st.subheader("Column types") st.caption( "Assign each column to a field type. Auto-detected suggestions are " "pre-filled; pick **(skip)** to leave a column untouched." ) _FIELD_LABELS = { "(skip)": None, "Date": FieldType.DATE, "Phone": FieldType.PHONE, "Currency": FieldType.CURRENCY, "Name": FieldType.NAME, "Address": FieldType.ADDRESS, "Boolean": FieldType.BOOLEAN, } _LABEL_BY_TYPE = {v: k for k, v in _FIELD_LABELS.items()} _LABELS = list(_FIELD_LABELS.keys()) sample_size = min(len(df), 200) sample_df = df.head(sample_size) cols_per_row = 3 columns_iter = list(df.columns) for i in range(0, len(columns_iter), cols_per_row): cols_block = st.columns(cols_per_row) for j, col_name in enumerate(columns_iter[i:i + cols_per_row]): with cols_block[j]: detected = _detect_field_type(col_name, sample_df[col_name].tolist()) default_label = _LABEL_BY_TYPE.get(detected, "(skip)") chosen = st.selectbox( col_name, _LABELS, index=_LABELS.index(default_label), key=f"fmtstd_type__{col_name}", ) ft = _FIELD_LABELS[chosen] if ft is not None: column_types[col_name] = ft st.divider() st.subheader("Format options") # --------------------------------------------------------------------------- # Preset bundle picker # --------------------------------------------------------------------------- # # Picking a preset rewrites every option below to that preset's defaults. # It does NOT touch column-type assignments — those are user-driven and # orthogonal. To make the rewrite stick across the rerun, we stash the # preset values into the per-option session keys; the widgets below read # those keys via their ``index``/``value`` arguments. _PRESET_LABELS = { "us-default": "US (default) — ISO 8601 dates · E.164 phones · USD", "european": "European — DMY input · INTL phones · EUR comma decimal", "uk": "UK — DD/MM/YYYY · GB phones · Yes/No booleans", "iso-strict": "ISO Strict — ISO 8601 · bare-number currency · true/false", "legacy-us": "Legacy US — MM/DD/YYYY · National phones · Yes/No", "custom": "Custom — keep current settings", } preset_choice = st.radio( "Standards preset", list(_PRESET_LABELS.keys()), format_func=lambda k: _PRESET_LABELS[k], index=0, horizontal=False, key="fmtstd_preset", help=( "Pick a published standard or regional convention as the baseline. " "Every option below is still individually overridable; choose " "**Custom** to keep whatever you've manually adjusted." ), ) # Detect a preset switch since the last rerun; when it changes (and the # new choice isn't ``custom``), purge the dependent widget keys so # Streamlit lets their ``index=``/``value=`` defaults take effect on the # new render. Without this clear, prior session_state pins the widget to # the previous preset's choice and the apparent picker becomes a no-op. _DEPENDENT_KEYS = [ "fmtstd_date_format", "fmtstd_date_order", "fmtstd_phone_format", "fmtstd_phone_region", "fmtstd_currency_decimal", "fmtstd_currency_decimals", "fmtstd_currency_preserve", "fmtstd_currency_preserve_code", "fmtstd_name_case", "fmtstd_bool_style", ] _last = st.session_state.get("fmtstd_preset_last") if _last != preset_choice: st.session_state["fmtstd_preset_last"] = preset_choice if preset_choice != "custom": for k in _DEPENDENT_KEYS: st.session_state.pop(k, None) st.rerun() # Map preset → widget-state defaults. Done as labels so the radios/selects # below pick up the right index without us re-implementing each map twice. _PRESET_TO_WIDGETS: dict[str, dict[str, str]] = { "us-default": { "date_format": "YYYY-MM-DD (ISO)", "date_order": "MDY (US)", "phone_format": "E.164 (+15551234567)", "phone_region": "US", "currency_decimal": "dot (1,234.56)", "currency_decimals": 2, "currency_preserve_code": False, "name_case": "Title Case", "boolean_style": "True/False", }, "european": { "date_format": "YYYY-MM-DD (ISO)", "date_order": "DMY (EU)", "phone_format": "International (+1 555-123-4567)", "phone_region": "DE", "currency_decimal": "comma (1.234,56)", "currency_decimals": 2, "currency_preserve_code": True, "name_case": "Title Case", "boolean_style": "True/False", }, "uk": { "date_format": "DD/MM/YYYY", "date_order": "DMY (EU)", "phone_format": "International (+1 555-123-4567)", "phone_region": "GB", "currency_decimal": "dot (1,234.56)", "currency_decimals": 2, "currency_preserve_code": False, "name_case": "Title Case", "boolean_style": "Yes/No", }, "iso-strict": { "date_format": "YYYY-MM-DD (ISO)", "date_order": "MDY (US)", "phone_format": "E.164 (+15551234567)", "phone_region": "US", "currency_decimal": "dot (1,234.56)", "currency_decimals": 0, "currency_preserve_code": True, "name_case": "Title Case", "boolean_style": "true/false", }, "legacy-us": { "date_format": "MM/DD/YYYY", "date_order": "MDY (US)", "phone_format": "National ((555) 123-4567)", "phone_region": "US", "currency_decimal": "dot (1,234.56)", "currency_decimals": 2, "currency_preserve_code": False, "name_case": "Title Case", "boolean_style": "Yes/No", }, } # ``iso-strict`` wants currency with no rounding; the GUI exposes that via # the "preserve original precision" checkbox rather than a sentinel value # in the number-input. Map that here. _PRESET_PRESERVE_DECIMALS: dict[str, bool] = { "iso-strict": True, } def _preset_default(key: str, fallback): """Pull the preset-driven default for *key*, or *fallback* on Custom.""" if preset_choice == "custom": return fallback return _PRESET_TO_WIDGETS[preset_choice].get(key, fallback) opt_cols = st.columns(2) with opt_cols[0]: st.markdown("**Dates**") _DATE_LABELS = ["YYYY-MM-DD (ISO)", "MM/DD/YYYY", "DD/MM/YYYY", "DD-Mon-YYYY", "Mon DD, YYYY"] date_format_label = st.selectbox( "Output format", _DATE_LABELS, index=_DATE_LABELS.index(_preset_default("date_format", "YYYY-MM-DD (ISO)")), key="fmtstd_date_format", ) date_format_map = { "YYYY-MM-DD (ISO)": "%Y-%m-%d", "MM/DD/YYYY": "%m/%d/%Y", "DD/MM/YYYY": "%d/%m/%Y", "DD-Mon-YYYY": "%d-%b-%Y", "Mon DD, YYYY": "%b %d, %Y", } _DATE_ORDER_LABELS = ["MDY (US)", "DMY (EU)"] date_order = st.radio( "Ambiguous input order (e.g. 01/02/2024)", _DATE_ORDER_LABELS, index=_DATE_ORDER_LABELS.index(_preset_default("date_order", "MDY (US)")), horizontal=True, key="fmtstd_date_order", ) st.markdown("**Phones**") _PHONE_LABELS = [ "E.164 (+15551234567)", "International (+1 555-123-4567)", "National ((555) 123-4567)", "Digits only", ] phone_format_label = st.selectbox( "Output format", _PHONE_LABELS, index=_PHONE_LABELS.index(_preset_default("phone_format", "E.164 (+15551234567)")), key="fmtstd_phone_format", ) phone_format_map = { "E.164 (+15551234567)": "E164", "International (+1 555-123-4567)": "INTERNATIONAL", "National ((555) 123-4567)": "NATIONAL", "Digits only": "DIGITS", } phone_region = st.text_input( "Default region (ISO-2)", value=_preset_default("phone_region", "US"), max_chars=2, help="Region used when the input has no country code. ``US``, ``GB``, ``DE``, etc.", key="fmtstd_phone_region", ).upper() or "US" with opt_cols[1]: st.markdown("**Currency**") _CURR_DECIMAL_LABELS = ["dot (1,234.56)", "comma (1.234,56)"] currency_decimal = st.radio( "Decimal separator in input", _CURR_DECIMAL_LABELS, index=_CURR_DECIMAL_LABELS.index(_preset_default("currency_decimal", "dot (1,234.56)")), horizontal=True, key="fmtstd_currency_decimal", ) currency_decimals = st.number_input( "Round to decimals", min_value=0, max_value=8, value=int(_preset_default("currency_decimals", 2)), step=1, key="fmtstd_currency_decimals", ) preserve_decimals = st.checkbox( "Preserve original precision (don't round)", value=_PRESET_PRESERVE_DECIMALS.get(preset_choice, False), key="fmtstd_currency_preserve", ) currency_preserve_code = st.checkbox( "Preserve currency code (emit `USD 1234.56`, `EUR 99.00`, etc.)", value=bool(_preset_default("currency_preserve_code", False)), help=( "Detects an ISO 4217 code or symbol in the input ($/€/£/¥/USD/" "EUR/...) and re-emits it as a space-separated prefix on the " "standardized number. Cells without a currency marker emit " "just the number." ), key="fmtstd_currency_preserve_code", ) st.markdown("**Names**") _NAME_CASE_LABELS = ["Title Case", "UPPER", "lower"] name_case_label = st.selectbox( "Casing", _NAME_CASE_LABELS, index=_NAME_CASE_LABELS.index(_preset_default("name_case", "Title Case")), key="fmtstd_name_case", ) name_case_map = {"Title Case": "title", "UPPER": "upper", "lower": "lower"} st.markdown("**Booleans**") _BOOL_LABELS = ["True/False", "true/false", "Yes/No", "Y/N", "1/0"] boolean_style = st.selectbox( "Output style", _BOOL_LABELS, index=_BOOL_LABELS.index(_preset_default("boolean_style", "True/False")), key="fmtstd_bool_style", ) # --------------------------------------------------------------------------- # Address abbreviations — built-in USPS table is editable # --------------------------------------------------------------------------- # # Users with international addresses (German Strasse, Spanish-language # Avenida, French Boulevard variants) need to override the built-in # table. Show it in a data_editor so the override is visible — the table # is small, this is the right surface. if any(ft == FieldType.ADDRESS for ft in column_types.values()): with st.expander("Custom address abbreviations (advanced)", expanded=False): st.caption( "Add or override entries in the address abbreviation table. " "Each row maps a short form (case-insensitive, periods OK) to " "the long form the standardizer should emit. Built-in USPS " "Pub. 28 entries (`St` → `Street`, `Ave` → `Avenue`, …) apply " "automatically; rows here merge on top and can override them." ) starter = pd.DataFrame( [ {"abbreviation": "", "expansion": ""}, {"abbreviation": "", "expansion": ""}, {"abbreviation": "", "expansion": ""}, ] ) edited = st.data_editor( starter, num_rows="dynamic", width="stretch", column_config={ "abbreviation": st.column_config.TextColumn( "Short form", help="Case-insensitive, trailing period optional. e.g. ``Strasse``", ), "expansion": st.column_config.TextColumn( "Long form", help="What the standardizer emits. e.g. ``Straße``", ), }, key="fmtstd_extra_abbrev", ) for _, row in edited.iterrows(): k = str(row.get("abbreviation") or "").strip() v = str(row.get("expansion") or "").strip() if k and v: extra_abbreviations[k] = v if extra_abbreviations: st.success( f"{len(extra_abbreviations)} custom mapping(s) will merge " "with the built-in table." ) options = StandardizeOptions( column_types=column_types, date_output_format=date_format_map[date_format_label], date_order="MDY" if date_order.startswith("MDY") else "DMY", phone_format=phone_format_map[phone_format_label], # type: ignore[arg-type] phone_region=phone_region, currency_decimal="dot" if currency_decimal.startswith("dot") else "comma", currency_decimals=None if preserve_decimals else int(currency_decimals), currency_preserve_code=currency_preserve_code, name_case=name_case_map[name_case_label], # type: ignore[arg-type] boolean_style=boolean_style, # type: ignore[arg-type] extra_abbreviations=extra_abbreviations, ) # --------------------------------------------------------------------------- # Run # --------------------------------------------------------------------------- st.divider() if not column_types: st.warning("Pick a field type for at least one column to enable standardization.") run_disabled = not column_types if st.button( "Standardize Formats", type="primary", width="stretch", disabled=run_disabled, ): with st.spinner("Standardizing..."): try: result = standardize_dataframe(df, options) except ValueError as e: st.error(str(e)) st.stop() st.session_state["fmtstd_result"] = result from src.audit import log_event log_event("tool_run", "Standardize Formats run", page="3_Format_Standardizer") st.session_state["fmtstd_input_name"] = uploaded.name # One-shot flag picked up on the next pass to scroll the parent # document to the Results anchor (see scroll snippet below). st.session_state["_fmtstd_scroll_to_results"] = True # Force a second rerun so the preview and options expanders see # the new result on the NEXT script pass and collapse themselves. # Without this they stay expanded until the user touches any # other widget. st.rerun() result = st.session_state.get("fmtstd_result") if result is None: st.stop() # --------------------------------------------------------------------------- # Results # --------------------------------------------------------------------------- # Anchor target for the auto-scroll snippet at the end of this block. # A bare ``
`` survives Streamlit's HTML sanitizer (only # `` """, height=1, )