"""DataTools Clean Text — Streamlit page.""" from __future__ import annotations import io import json import sys from pathlib import Path import pandas as pd import streamlit as st _project_root = Path(__file__).resolve().parent.parent.parent.parent if str(_project_root) not in sys.path: sys.path.insert(0, str(_project_root)) from src.gui.components import ( hide_streamlit_chrome, pickup_or_upload, render_hidden_aware_preview, require_feature_or_render_upgrade, require_normalization_gate, ) from src.license import FeatureFlag from src.core.text_clean import ( PRESETS, CleanOptions, clean_dataframe, hidden_char_css, visualize_hidden_html, ) hide_streamlit_chrome() require_feature_or_render_upgrade(FeatureFlag.TEXT_CLEANER) require_normalization_gate() # --------------------------------------------------------------------------- # Header # --------------------------------------------------------------------------- st.title("✂️ Clean Text") st.caption( "Trim whitespace, fold smart quotes, strip invisible characters, and " "normalize line endings. Runs locally — your data never leaves this computer." 
) # --------------------------------------------------------------------------- # File upload # --------------------------------------------------------------------------- uploaded = pickup_or_upload( label="Upload CSV or Excel file", key="textclean_file_upload", types=["csv", "tsv", "xlsx", "xls"], ) if uploaded is None: st.info("Upload a CSV, TSV, or Excel file to begin.") st.stop() @st.cache_data(show_spinner=False) def _read_uploaded(name: str, data: bytes) -> pd.DataFrame: """Read the uploaded bytes into a DataFrame, treating all cells as strings.""" suffix = Path(name).suffix.lower() bio = io.BytesIO(data) if suffix in (".xlsx", ".xls"): return pd.read_excel(bio, dtype=str, keep_default_na=False) # CSV / TSV — try utf-8 then utf-8-sig then latin-1 as a fallback for enc in ("utf-8", "utf-8-sig", "latin-1"): try: bio.seek(0) sep = "\t" if suffix == ".tsv" else "," return pd.read_csv( bio, dtype=str, keep_default_na=False, encoding=enc, sep=sep, on_bad_lines="warn", ) except UnicodeDecodeError: continue bio.seek(0) return pd.read_csv(bio, dtype=str, keep_default_na=False, encoding="latin-1") try: df = _read_uploaded(uploaded.name, uploaded.getvalue()) except UnicodeDecodeError as e: st.error( f"**Could not decode `{uploaded.name}`**\n\n" f"The file isn't UTF-8, UTF-8-with-BOM, or Latin-1.\n\n" f"_Underlying error: {e}_\n\n" f"Try re-saving the file as UTF-8 from the source application, " f"or convert it with `iconv -f -t utf-8`." 
) st.stop() except Exception as e: from src.core.errors import format_for_user st.error( f"**Could not read `{uploaded.name}`**\n\n" f"```\n{format_for_user(e)}\n```" ) st.stop() st.subheader(f"Preview: {uploaded.name}") st.caption(f"{len(df)} rows, {len(df.columns)} columns") preview_show_hidden = st.toggle( "Show hidden characters in preview", value=True, help="Highlights NBSP, zero-width chars, smart quotes, and leading/trailing whitespace.", key="textclean_preview_show_hidden", ) if preview_show_hidden: render_hidden_aware_preview(df, n_rows=10) else: st.dataframe(df.head(10), use_container_width=True) st.divider() # --------------------------------------------------------------------------- # Options # --------------------------------------------------------------------------- st.subheader("Options") preset_label = st.radio( "Preset", ["excel-hygiene (recommended)", "minimal", "paranoid"], index=0, horizontal=True, help=( "excel-hygiene: trim, collapse whitespace, fold smart quotes, strip " "invisible chars, normalize line endings, NFC. " "minimal: only trim and collapse. " "paranoid: everything including NFKC compat fold (lossy)." 
), ) preset_key = preset_label.split(" ", 1)[0] options = CleanOptions.from_preset(preset_key) with st.expander("Advanced options"): col_a, col_b = st.columns(2) with col_a: options.trim = st.checkbox("Trim leading/trailing whitespace", value=options.trim) options.collapse_whitespace = st.checkbox( "Collapse internal whitespace", value=options.collapse_whitespace, ) options.normalize_line_endings = st.checkbox( "Normalize line endings (\\r\\n → \\n)", value=options.normalize_line_endings, ) options.strip_control = st.checkbox( "Strip control characters", value=options.strip_control, ) options.strip_bom = st.checkbox("Strip BOM", value=options.strip_bom) with col_b: options.fold_smart_chars = st.checkbox( "Fold smart characters (curly quotes, em-dash, NBSP)", value=options.fold_smart_chars, ) options.strip_zero_width = st.checkbox( "Strip zero-width / invisible characters", value=options.strip_zero_width, ) options.nfc = st.checkbox("Unicode NFC normalization", value=options.nfc) options.nfkc = st.checkbox( "Unicode NFKC compat fold (lossy: ① → 1, fi → fi)", value=options.nfkc, ) st.markdown("**Scope**") string_cols = [ c for c in df.columns if pd.api.types.is_object_dtype(df[c]) or pd.api.types.is_string_dtype(df[c]) ] selected_cols = st.multiselect( "Columns to clean (default: all string columns)", options=list(df.columns), default=string_cols, ) skip_cols = st.multiselect( "Columns to skip even if they look like text", options=list(df.columns), default=[], ) options.columns = selected_cols if selected_cols else None options.skip_columns = list(skip_cols) st.markdown("**Case conversion**") case_global = st.selectbox( "Apply case conversion to selected columns", ["None", "UPPER", "lower", "Title", "Sentence"], index=0, ) case_map = { "UPPER": "upper", "lower": "lower", "Title": "title", "Sentence": "sentence", } if case_global != "None": options.case = case_map[case_global] # type: ignore[assignment] # 
# ---------------------------------------------------------------------------
# Run
# ---------------------------------------------------------------------------
st.divider()

if st.button("Clean Text", type="primary", use_container_width=True):
    with st.spinner("Cleaning..."):
        try:
            result = clean_dataframe(df, options)
        except ValueError as e:
            st.error(str(e))
            st.stop()
    # Keep the result in session state so it survives the reruns Streamlit
    # triggers on every widget interaction below.
    st.session_state["textclean_result"] = result
    st.session_state["textclean_input_name"] = uploaded.name

result = st.session_state.get("textclean_result")
if result is None:
    st.stop()

# ---------------------------------------------------------------------------
# Results
# ---------------------------------------------------------------------------
st.subheader("Results")

# Guard the percentage against an empty scan (zero cells).
pct = (result.cells_changed / result.cells_total * 100.0) if result.cells_total else 0.0
m1, m2, m3, m4 = st.columns(4)
m1.metric("Cells scanned", result.cells_total)
m2.metric("Cells changed", result.cells_changed)
m3.metric("% changed", f"{pct:.1f}%")
m4.metric("Columns processed", len(result.columns_processed))

# Matches the toggle's initial value, so the cleaned preview further down has
# a setting to read even when no cell changed and the toggle is never drawn.
show_hidden = True

if result.cells_changed:
    counts = result.changes["column"].value_counts()
    st.markdown("**Changes by column**")
    st.dataframe(
        counts.rename("cells_changed").to_frame(),
        use_container_width=True,
    )

    st.markdown("**Examples (first 25 changes)**")
    show_hidden = st.toggle(
        "Show hidden characters (NBSP, ZWSP, smart quotes, control chars…)",
        value=True,
        help=(
            "Highlights characters the cleaner is removing or replacing. "
            "Hover any badge to see the codepoint and label."
        ),
        key="textclean_show_hidden",
    )
    examples = result.changes.head(25).copy()
    # Shift the row numbers by one for display.
    # NOTE(review): presumably converts 0-based row indices to 1-based —
    # confirm against clean_dataframe.
    examples["row"] = examples["row"] + 1

    if show_hidden:
        # Inject the badge CSS once, then render an HTML table so the
        # invisibles in old/new are actually visible to the user.
        st.markdown(hidden_char_css(), unsafe_allow_html=True)
        rows_html = [
            "<tr>"
            f"<td>{row['row']}</td>"
            f"<td>{visualize_hidden_html(str(row['column']))}</td>"
            f"<td>{visualize_hidden_html(str(row['old']))}</td>"
            f"<td>{visualize_hidden_html(str(row['new']))}</td>"
            f"<td>{row['ops_applied']}</td>"
            "</tr>"
            for _, row in examples.iterrows()
        ]
        st.markdown(
            "<table>"
            "<thead><tr>"
            "<th>Row</th>"
            "<th>Column</th>"
            "<th>Before</th>"
            "<th>After</th>"
            "<th>Ops applied</th>"
            "</tr></thead>"
            f"<tbody>{''.join(rows_html)}</tbody>"
            "</table>",
            unsafe_allow_html=True,
        )
    else:
        st.dataframe(examples, use_container_width=True, hide_index=True)

st.markdown("**Cleaned preview (first 10 rows)**")
# Reuse the same toggle the Examples table uses so the user controls both
# the changes audit and the cleaned preview with one switch.
if show_hidden:
    render_hidden_aware_preview(result.cleaned_df, n_rows=10)
else:
    st.dataframe(result.cleaned_df.head(10), use_container_width=True)

# ---------------------------------------------------------------------------
# Downloads
# ---------------------------------------------------------------------------
st.divider()

# File names derive from the input that produced the stored result, not from
# whatever file is currently in the uploader.
stem = Path(st.session_state.get("textclean_input_name", "input")).stem
dl_a, dl_b, dl_c = st.columns(3)

with dl_a:
    # The "utf-8-sig" codec prepends a BOM to the encoded bytes.
    cleaned_bytes = result.cleaned_df.to_csv(index=False).encode("utf-8-sig")
    st.download_button(
        "Download cleaned CSV",
        data=cleaned_bytes,
        file_name=f"{stem}_cleaned.csv",
        mime="text/csv",
    )

with dl_b:
    # The audit download is only offered when there is something to audit.
    if not result.changes.empty:
        changes_bytes = result.changes.to_csv(index=False).encode("utf-8-sig")
        st.download_button(
            "Download changes audit",
            data=changes_bytes,
            file_name=f"{stem}_changes.csv",
            mime="text/csv",
        )

with dl_c:
    # NOTE(review): this serializes the options currently selected in the UI,
    # which can differ from the ones used for the stored result if the user
    # changed a widget after clicking "Clean Text".
    config_bytes = json.dumps(options.to_dict(), indent=2).encode("utf-8")
    st.download_button(
        "Download config JSON",
        data=config_bytes,
        file_name="text_clean_config.json",
        mime="application/json",
    )

st.divider()
st.caption("Runs locally. Your data never leaves this computer. | DataTools v3.0")