@@ -1,711 +0,0 @@
""" Review & normalize gate page.
Sits between the home-page upload and every tool page. Walks the user
through every analyzer finding, lets them auto-fix, preview, customize,
or skip each one, and produces a :class:`NormalizationResult` stashed in
session state. Tool pages refuse to load until this gate has passed.
State contract
--------------
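Session state read:
* ``home_uploaded_bytes`` / ``home_uploaded_name`` — current upload.
* ``home_uploaded_size`` — size of the current upload; compared against a newly picked file.
* ``home_findings`` — list of :class:`Finding` from the home-page scan.
* ``review_decisions`` — dict[finding_id, Decision]; user's choices so far.
* ``encoding_override`` — encoding chosen in the picker, or ``None`` to use the default.
Session state written:
* ``home_uploaded_bytes`` / ``home_uploaded_name`` / ``home_uploaded_size`` — set when a file is uploaded directly on this page.
* ``home_findings`` — replaced whenever this page runs the analyzer.
* ``home_skipped`` — reset to ``False`` after an on-page upload is analyzed.
* ``encoding_override`` — set by the encoding picker's Re-analyze button.
* ``review_decisions`` — updated as the user flips controls.
* ``normalization_result`` — :class:`NormalizationResult` after Apply.
* ``normalization_for`` — content hash of the upload the result is for.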
"""
from __future__ import annotations
import hashlib
import io
import sys
from pathlib import Path
from typing import Optional
import pandas as pd
import streamlit as st
# Project root on sys.path (mirrors app.py).
_project_root = Path ( __file__ ) . resolve ( ) . parent . parent . parent . parent
if str ( _project_root ) not in sys . path :
sys . path . insert ( 0 , str ( _project_root ) )
from src . core . analyze import Finding , analyze
from src . core . fixes import get_fix
from src . core . io import detect_encoding , repair_bytes
from src . core . normalize import (
Decision ,
NormalizationResult ,
apply_decisions ,
auto_fix ,
gate_summary ,
is_normalized ,
)
from src . gui . components import hide_streamlit_chrome
# Common single-byte and multi-byte encodings the user might pick to
# correct a misdetection. Ordered by frequency in real-world Western /
# multilingual data; keep the list short — too many options just adds
# noise. The user can type a custom encoding via the "Other" entry.
# Options for the encoding picker's selectbox, in display order.
# The first entry is the "no override" sentinel and the last entry switches
# the picker to a free-text field. The Re-analyze handler compares the
# selected value against both of those labels, and the picker falls back to
# the last entry when the stored override is not in this list, so the two
# sentinel labels must stay in sync with that code.
_OVERRIDE_ENCODINGS = [
" (detected) " ,
" utf-8 " ,
" utf-8-sig " ,
" cp1252 " ,
" iso-8859-1 " ,
" iso-8859-15 " ,
" cp1250 " ,
" iso-8859-2 " ,
" cp1251 " ,
" koi8-r " ,
" mac-roman " ,
" shift_jis " ,
" cp932 " ,
" gb18030 " ,
" big5 " ,
" euc-kr " ,
" cp949 " ,
" utf-16 " ,
" utf-16-le " ,
" utf-16-be " ,
" Other… " ,
]
# Page setup: browser-tab title and icon plus the wide layout, then the
# shared GUI helper imported from src.gui.components (presumably hides
# Streamlit's default menu/footer — confirm against its definition).
st . set_page_config ( page_title = " Review & Normalize " , page_icon = " 🛡️ " , layout = " wide " )
hide_streamlit_chrome ( )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _upload_hash() -> Optional[str]:
    """Return the SHA-256 hex digest of the upload held in session state.

    Reads ``home_uploaded_bytes``; returns ``None`` when that entry is
    missing or empty.
    """
    payload = st.session_state.get("home_uploaded_bytes")
    return hashlib.sha256(payload).hexdigest() if payload else None
def _detected_encoding_for_session() -> Optional[str]:
    """Run charset detection on the session upload via a temporary file.

    ``detect_encoding`` is called with a path, so the uploaded bytes are
    written to a temporary file that keeps the upload's extension (``.csv``
    when the name has none). The file is deleted once detection returns or
    raises. Returns ``None`` when there is no upload in session state.
    """
    import tempfile

    payload = st.session_state.get("home_uploaded_bytes")
    filename = st.session_state.get("home_uploaded_name") or "tmp.csv"
    if not payload:
        return None

    # Everything after the last dot is the extension; no dot means ".csv".
    _, dot, ext = filename.rpartition(".")
    extension = "." + ext if dot else ".csv"

    with tempfile.NamedTemporaryFile(suffix=extension, delete=False) as handle:
        handle.write(payload)
        scratch = Path(handle.name)
    try:
        return detect_encoding(scratch)
    finally:
        scratch.unlink(missing_ok=True)
def _load_df_from_session(encoding_override: Optional[str] = None) -> Optional[pd.DataFrame]:
    """Re-parse the session upload into an all-string DataFrame.

    Excel uploads (``xlsx`` / ``xls``) are read directly. Everything else is
    treated as delimited text: ``.tsv`` uses a tab, otherwise a comma is
    assumed unless a tab, semicolon or pipe occurs more than 1.5x as often
    as commas in the first 4096 bytes. The bytes are passed through
    ``repair_bytes`` with *encoding_override* (UTF-8 when unset), and the
    repaired bytes are then always parsed as UTF-8.

    Returns ``None`` when there is no upload in session state.
    """
    payload = st.session_state.get("home_uploaded_bytes")
    filename = st.session_state.get("home_uploaded_name") or ""
    if not payload:
        return None

    _, dot, tail = filename.rpartition(".")
    ext = tail.lower() if dot else ""

    if ext in ("xlsx", "xls"):
        return pd.read_excel(io.BytesIO(payload), dtype=str, keep_default_na=False)

    if ext == "tsv":
        sep = "\t"
    else:
        # Cheap delimiter sniff on the head of the file; the first candidate
        # that clearly outnumbers commas wins, otherwise stay with a comma.
        sample = payload[:4096].decode("utf-8", errors="replace")
        sep = next(
            (
                cand
                for cand in ("\t", ";", "|")
                if sample.count(cand) > sample.count(",") * 1.5
            ),
            ",",
        )

    source_encoding = encoding_override or "utf-8"
    repaired = repair_bytes(payload, encoding=source_encoding, delimiter=sep)
    return pd.read_csv(
        io.BytesIO(repaired.repaired_bytes),
        encoding="utf-8",
        delimiter=sep,
        dtype=str,
        keep_default_na=False,
        on_bad_lines="warn",
    )
def _run_analysis_with_override(encoding_override: Optional[str]) -> list[Finding]:
    """Re-run ``analyze`` on the session upload with an encoding override.

    The uploaded bytes are written to a temporary file (keeping the upload's
    extension, ``.csv`` when the name has none) and ``analyze`` is called on
    that path with *encoding_override* passed through. Per the original
    author's note, the path-based loader is where the override hook lives.
    The temporary file is removed afterwards.

    Returns an empty list when there is no upload in session state.
    """
    import tempfile

    payload = st.session_state.get("home_uploaded_bytes")
    filename = st.session_state.get("home_uploaded_name") or "tmp.csv"
    if not payload:
        return []

    _, dot, ext = filename.rpartition(".")
    extension = "." + ext if dot else ".csv"

    with tempfile.NamedTemporaryFile(suffix=extension, delete=False) as handle:
        handle.write(payload)
        scratch = Path(handle.name)
    try:
        return analyze(scratch, encoding_override=encoding_override)
    finally:
        scratch.unlink(missing_ok=True)
def _confidence_pill ( c : str ) - > str :
""" Streamlit-markdown pill for the confidence tier. """
palette = { " high " : " green " , " medium " : " orange " , " low " : " red " }
return f " : { palette . get ( c , ' gray ' ) } -background[** { c . upper ( ) } **] "
def _severity_pill ( s : str ) - > str :
palette = { " info " : " blue " , " warn " : " orange " , " error " : " red " }
return f " : { palette . get ( s , ' gray ' ) } -background[** { s } **] "
# ---------------------------------------------------------------------------
# Output options (Advanced — re-encode the cleaned DataFrame for download)
# ---------------------------------------------------------------------------
# (label_shown_to_user, codec_passed_to_str.encode)
# Download encodings offered under "Advanced output options".
# Each entry is (label shown in the selectbox, codec name). The first entry
# is the default because the selectbox is created with index 0.
# NOTE(review): the last entry is labelled "LE with BOM" but uses Python's
# generic "utf-16" codec, which writes a BOM in the platform's native byte
# order — little-endian only on little-endian hosts. Confirm that is intended.
_OUTPUT_ENCODINGS = [
( " UTF-8 (recommended) " , " utf-8 " ) ,
( " UTF-8 with BOM (Excel) " , " utf-8-sig " ) ,
( " Windows-1252 (Western Europe) " , " cp1252 " ) ,
( " ISO-8859-1 / Latin-1 " , " iso-8859-1 " ) ,
( " ISO-8859-15 / Latin-9 " , " iso-8859-15 " ) ,
( " Windows-1250 (Central Europe) " , " cp1250 " ) ,
( " ISO-8859-2 / Latin-2 " , " iso-8859-2 " ) ,
( " Windows-1251 (Cyrillic) " , " cp1251 " ) ,
( " Shift_JIS (Japanese) " , " shift_jis " ) ,
( " GB18030 (Chinese) " , " gb18030 " ) ,
( " Big5 (Traditional Chinese) " , " big5 " ) ,
( " EUC-KR (Korean) " , " euc-kr " ) ,
( " UTF-16 LE with BOM " , " utf-16 " ) ,
]
# Field separators for the download: (label, separator character).
# A tab separator also switches the download's extension and MIME type to TSV.
_OUTPUT_DELIMITERS = [
( " Comma , " , " , " ) ,
( " Tab \\ t " , " \t " ) ,
( " Semicolon ; " , " ; " ) ,
( " Pipe | " , " | " ) ,
]
# Row terminators for the download: (label, terminator string).
_OUTPUT_LINE_TERMINATORS = [
( " LF — \\ n (Unix / web / git default) " , " \n " ) ,
( " CRLF — \\ r \\ n (Windows / classic Excel) " , " \r \n " ) ,
( " CR — \\ r (classic Mac, very rare) " , " \r " ) ,
]
def _build_output_bytes (
df : pd . DataFrame ,
* ,
encoding : str ,
delimiter : str ,
line_terminator : str ,
) - > tuple [ bytes , Optional [ str ] ] :
""" Serialize *df* with the user ' s output options.
Returns ``(bytes, error_message)``. ``error_message`` is non-None when
the chosen encoding cannot represent at least one cell — characters
that don ' t exist in the target codepage are replaced with ``?`` so
the user still gets a download, plus a warning telling them which
target was lossy.
"""
buf = io . StringIO ( )
df . to_csv ( buf , index = False , sep = delimiter , lineterminator = line_terminator )
text = buf . getvalue ( )
try :
return text . encode ( encoding ) , None
except UnicodeEncodeError :
# Find the first character that fails so the message is useful.
bad : Optional [ str ] = None
for ch in text :
try :
ch . encode ( encoding )
except UnicodeEncodeError :
bad = ch
break
msg = (
f " Some characters cannot be represented in { encoding } "
+ ( f " (first offender: { bad !r} ) " if bad else " " )
+ " . Falling back to ' ? ' replacement; non-Latin content will be lost. "
)
return text . encode ( encoding , errors = " replace " ) , msg
def _preview_table ( f : Finding , decision_action : str , payload : Optional [ dict ] ) - > Optional [ pd . DataFrame ] :
""" Build a before/after preview from finding samples.
Runs the registered fix function on each sample value individually so
the user sees exactly what would change. Returns None when no preview
is meaningful (no samples, or no fix registered).
"""
if not f . samples :
return None
fix_fn = get_fix ( f . fix_action )
if fix_fn is None :
# No fix to preview; show samples as-is.
return pd . DataFrame (
[ { " row " : r , " column " : c , " value " : v } for r , c , v in f . samples ]
)
rows = [ ]
for r , col , val in f . samples :
# Run the fix on a tiny single-cell DataFrame so payload semantics
# (e.g. lowercase_email's column targeting) are honored.
mini = pd . DataFrame ( { col : [ val ] } )
try :
new_df , _ = fix_fn ( mini , payload )
new_val = new_df [ col ] . iloc [ 0 ]
except Exception as e :
new_val = f " <preview error: { e } > "
rows . append ( { " row " : r , " column " : col , " before " : val , " after " : new_val } )
return pd . DataFrame ( rows )
# ---------------------------------------------------------------------------
# Page body
# ---------------------------------------------------------------------------
# Page header.
st . title ( " 🛡️ Review & Normalize " )
st . caption (
" Every finding is shown below with the algorithm that would fix it. "
" Auto-fix the high-confidence ones in one click; preview or customize "
" the rest before applying. "
)
# Pre-flight: if nothing has been uploaded yet, let the user upload
# directly from this page instead of bouncing them back to the home
# screen. Once a file is picked, we auto-run the analyzer (the user is
# already on the Review page — they've implicitly committed to a scan),
# stash the result, and rerun so the rest of the page picks it up.
# Snapshot of what session state already holds for this upload.
findings : list [ Finding ] = st . session_state . get ( " home_findings " ) or [ ]
upload_name = st . session_state . get ( " home_uploaded_name " )
# No upload recorded yet: show an uploader and halt the script until a file
# has been picked.
if not upload_name :
st . info (
" Upload a CSV or Excel file to begin reviewing. The analyzer runs "
" locally and your data never leaves this computer. "
)
review_upload = st . file_uploader (
" Choose a file " ,
type = [ " csv " , " tsv " , " xlsx " , " xls " ] ,
key = " review_upload " ,
help = " Drag-and-drop or browse for a CSV, TSV, or Excel file. " ,
)
if review_upload is None :
st . stop ( )
# New file → stash bytes + size + name, drop any stale state, then
# run the analyzer. The rerun at the bottom lets the rest of this
# page render with the upload in place.
# NOTE(review): this code only runs while home_uploaded_name is falsy, so
# same_file can only be true when the picked file's name is itself empty;
# in practice the reset below always runs. Confirm the guard is still needed.
same_file = (
st . session_state . get ( " home_uploaded_name " ) == review_upload . name
and st . session_state . get ( " home_uploaded_size " ) == review_upload . size
)
if not same_file :
st . session_state [ " home_uploaded_name " ] = review_upload . name
st . session_state [ " home_uploaded_size " ] = review_upload . size
st . session_state [ " home_uploaded_bytes " ] = review_upload . getvalue ( )
st . session_state . pop ( " home_findings " , None )
st . session_state . pop ( " home_skipped " , None )
st . session_state . pop ( " review_decisions " , None )
st . session_state . pop ( " normalization_result " , None )
st . session_state . pop ( " normalization_for " , None )
st . session_state . pop ( " encoding_override " , None )
# Analyze only when no findings are stored; the analyzer is run with no
# encoding override.
if st . session_state . get ( " home_findings " ) is None :
with st . spinner ( " Analyzing… " ) :
st . session_state [ " home_findings " ] = _run_analysis_with_override ( None )
st . session_state [ " home_skipped " ] = False
# Restart the script so everything below sees the stored upload.
st . rerun ( )
# ---- Encoding picker --------------------------------------------------------
#
# Charset detection misfires on small files, byte-equivalent codepages
# (cp1252 vs Latin-1 vs cp1250), and content where every byte happens to
# decode under the wrong encoding (KOI8-R bytes that look like Shift_JIS).
# When the user spots mojibake or U+FFFD chars in the findings list, this
# picker is the escape hatch — pick the right encoding, re-run the analyzer.
# Encoding picker card. Detection is re-run on every script run (it writes the
# upload to a temporary file each time), including for Excel uploads where the
# result is not shown.
with st . container ( border = True ) :
detected_enc = _detected_encoding_for_session ( )
current_override = st . session_state . get ( " encoding_override " )
# Lower-cased extension of the upload name; empty when the name has no dot.
suffix = ( st . session_state . get ( " home_uploaded_name " ) or " " )
suffix = suffix . rsplit ( " . " , 1 ) [ - 1 ] . lower ( ) if " . " in suffix else " "
is_excel = suffix in ( " xlsx " , " xls " )
st . markdown ( " **File encoding** " )
if is_excel :
st . caption (
" Excel files store text as Unicode internally — encoding override "
" doesn ' t apply. Skip this section. "
)
else :
cap_parts = [ f " Detected: ` { detected_enc or ' unknown ' } ` " ]
if current_override :
cap_parts . append ( f " Currently using: ` { current_override } ` " )
# NOTE(review): the mojibake example in the caption below shows the same
# character on both sides, and the U+FFFD sample is not a replacement
# character — both literals look damaged by a re-encoding of this source
# file. Restore the intended text.
st . caption (
" · " . join ( cap_parts )
+ " · Override only if you see mojibake (e.g. `é` for `é`) or U+FFFD "
" (`<60> `) in the findings below. "
)
col_pick , col_custom , col_apply = st . columns ( [ 2 , 2 , 1 ] )
with col_pick :
# Preselect the stored override; an override that is not one of the
# listed options selects the free-text entry instead.
current_label = current_override or " (detected) "
try :
idx = _OVERRIDE_ENCODINGS . index ( current_label )
except ValueError :
idx = _OVERRIDE_ENCODINGS . index ( " Other… " )
chosen = st . selectbox (
" Encoding " ,
options = _OVERRIDE_ENCODINGS ,
index = idx ,
key = " encoding_override_select " ,
label_visibility = " collapsed " ,
)
custom_value : Optional [ str ] = None
with col_custom :
# The free-text field is only rendered for the free-text entry and is
# prefilled with a stored override that is not in the option list.
if chosen == " Other… " :
custom_value = st . text_input (
" Custom encoding (e.g. `cp1257`, `iso-8859-9`) " ,
value = current_override if current_override and current_override not in _OVERRIDE_ENCODINGS else " " ,
key = " encoding_override_custom " ,
label_visibility = " collapsed " ,
placeholder = " cp1257 " ,
)
with col_apply :
if st . button ( " Re-analyze " , use_container_width = True ) :
# Map the selection to an override: the sentinel entry and a blank
# custom value both mean "no override" (None).
if chosen == " (detected) " :
new_override = None
elif chosen == " Other… " :
new_override = ( custom_value or " " ) . strip ( ) or None
else :
new_override = chosen
# Sanity-check the override actually decodes the bytes.
# The whole upload is decoded strictly; LookupError covers a codec name
# Python does not know.
data = st . session_state . get ( " home_uploaded_bytes " ) or b " "
if new_override is not None :
try :
data . decode ( new_override , errors = " strict " )
decode_ok = True
decode_err = None
except ( UnicodeDecodeError , LookupError ) as e :
decode_ok = False
decode_err = str ( e )
else :
decode_ok = True
decode_err = None
# NOTE(review): a failed check does not stop the re-analysis, and the
# rerun below restarts the script right after this warning is emitted, so
# the user may never see it — confirm. An unknown codec name is also
# passed on to the analyzer unchanged.
if not decode_ok :
st . warning (
f " ` { new_override } ` cannot decode this file: { decode_err } . "
f " Re-running anyway with replacement-character fallback so "
f " you can see where the failures are. "
)
# Re-run analysis with the override and refresh session state.
st . session_state [ " encoding_override " ] = new_override
st . session_state [ " home_findings " ] = _run_analysis_with_override ( new_override )
# Drop any prior gate result; the user must re-apply.
st . session_state . pop ( " normalization_result " , None )
st . session_state . pop ( " normalization_for " , None )
st . session_state . pop ( " review_decisions " , None )
st . rerun ( )
# Reload findings — the picker above may have just rewritten them.
# Re-read the findings from session state; with none, report a clean file and
# halt the script so nothing below renders.
findings = st . session_state . get ( " home_findings " ) or [ ]
if not findings :
st . success ( " ✓ No findings to review. The file is already clean — open any tool to begin. " )
st . stop ( )
# ---- Top-line counters -------------------------------------------------------
# Findings already applied during the read pass are counted only under
# "Already applied". The high-confidence count additionally requires a
# fix_action; the medium and low counts do not. "Blocking" counts every
# error-severity finding, whether or not it was pre-applied.
n_high = sum ( 1 for f in findings if f . confidence == " high " and not f . pre_applied and f . fix_action )
n_medium = sum ( 1 for f in findings if f . confidence == " medium " and not f . pre_applied )
n_low = sum ( 1 for f in findings if f . confidence == " low " and not f . pre_applied )
n_pre = sum ( 1 for f in findings if f . pre_applied )
n_block = sum ( 1 for f in findings if f . severity == " error " )
c1 , c2 , c3 , c4 , c5 = st . columns ( 5 )
c1 . metric ( " High confidence " , n_high , help = " Round-trip safe — eligible for auto-fix. " )
c2 . metric ( " Medium " , n_medium , help = " Right call in the common case; preview before applying. " )
c3 . metric ( " Low " , n_low , help = " Heuristic — opt in only. " )
c4 . metric ( " Already applied " , n_pre , help = " Fixed during the read pass (BOM, NUL, line endings). " )
c5 . metric ( " Blocking " , n_block , help = " Severity = error; must be resolved or waived. " )
st . divider ( )
# ---- Top-level controls ------------------------------------------------------
# The decisions dict lives in session state; setdefault returns the stored
# object, so the writes below persist across reruns.
decisions_state : dict = st . session_state . setdefault ( " review_decisions " , { } )
# bar_right is unpacked but never used below; it only takes up the third column.
bar_left , bar_mid , bar_right = st . columns ( [ 1.2 , 1.2 , 3 ] )
with bar_left :
# Bulk "auto": only findings that are not pre-applied, are high confidence,
# name a fix_action, and have a fix function registered for it.
if st . button ( " ✨ Auto-fix high-confidence " , type = " primary " , use_container_width = True ) :
for f in findings :
if (
not f . pre_applied
and f . confidence == " high "
and f . fix_action
and get_fix ( f . fix_action ) is not None
) :
decisions_state [ f . id ] = Decision ( finding_id = f . id , action = " auto " )
st . rerun ( )
with bar_mid :
# Bulk "skip": every finding that was not already applied during the read.
if st . button ( " Skip everything (not recommended) " , use_container_width = True ) :
for f in findings :
if not f . pre_applied :
decisions_state [ f . id ] = Decision ( finding_id = f . id , action = " skip " )
st . rerun ( )
# ---- Per-finding cards -------------------------------------------------------
# Sort: findings not yet applied come first; within each group order by severity (error, warn, info), then confidence (high, medium, low), then id.
def _sort_key ( f : Finding ) - > tuple :
severity_rank = { " error " : 0 , " warn " : 1 , " info " : 2 } [ f . severity ]
confidence_rank = { " high " : 0 , " medium " : 1 , " low " : 2 } [ f . confidence ]
return ( int ( f . pre_applied ) , severity_rank , confidence_rank , f . id )
# One expander card per finding, in _sort_key order.
for f in sorted ( findings , key = _sort_key ) :
decision = decisions_state . get ( f . id )
# With no stored decision the default is "auto" for pre-applied findings and
# for high-confidence findings that name a fix_action; everything else
# defaults to "skip".
decision_action = decision . action if decision else (
" auto " if ( f . pre_applied or ( f . confidence == " high " and f . fix_action ) ) else " skip "
)
title_bits = [
_severity_pill ( f . severity ) ,
_confidence_pill ( f . confidence ) ,
f " ** { f . id } ** " ,
f " ( { f . count } ) " ,
]
if f . pre_applied :
title_bits . append ( " :gray-background[applied during read] " )
# Error-severity cards start expanded.
with st . expander ( " " . join ( title_bits ) , expanded = ( f . severity == " error " ) ) :
st . caption ( f . description )
if f . tool :
st . caption ( f " Owned by: ` { f . tool } ` " )
# Pre-applied findings get no controls and no stored decision.
if f . pre_applied :
st . info ( " This was already applied during the file read pass — no decision needed. " )
continue
if not f . fix_action :
if f . severity == " error " :
st . error (
" Blocking finding with no auto-fix. Choose **Skip / waive** to "
" acknowledge and proceed (not recommended), or fix the file outside "
" DataTools and re-upload. "
)
else :
st . info ( " Informational only — no fix to apply. " )
# Decision radio
# The "auto" option is offered even when the finding has no fix_action.
choice_labels = {
" auto " : " Auto-fix with our algorithm " ,
" skip " : " Skip / waive (no change) " ,
}
# Customize is offered for fixes that take a meaningful payload.
if f . fix_action in ( " replace_null_sentinels " , ) :
choice_labels [ " modified " ] = " Customize "
# NOTE(review): the radio is keyed, so Streamlit may keep its previously
# stored selection and ignore `index` on later runs; if so, decisions set by
# the bulk buttons above would be overwritten by the Persist step below —
# confirm.
chosen = st . radio (
" Decision " ,
options = list ( choice_labels . keys ( ) ) ,
index = list ( choice_labels . keys ( ) ) . index ( decision_action )
if decision_action in choice_labels else 0 ,
format_func = lambda k : choice_labels [ k ] ,
key = f " decision_ { f . id } " ,
horizontal = True ,
)
# Customize payload editor (only for the modified action)
payload : Optional [ dict ] = None
if chosen == " modified " and f . fix_action == " replace_null_sentinels " :
default_sentinels = " , " . join ( sorted ( [
" n/a " , " na " , " nan " , " null " , " none " , " - " , " -- " , " tbd " , " unknown " ,
] ) )
# The raw text is stored alongside the parsed list so the editor can be
# refilled with exactly what the user typed.
text = st . text_area (
" Sentinels (comma-separated, case-insensitive): " ,
value = ( decision . payload or { } ) . get (
" sentinels_raw " , default_sentinels ,
) if decision else default_sentinels ,
key = f " sentinels_ { f . id } " ,
)
sentinels = [ s . strip ( ) for s in text . split ( " , " ) if s . strip ( ) ]
payload = { " sentinels " : sentinels , " sentinels_raw " : text }
# Persist
# Written on every run for every finding that reaches this point, so the
# decisions dict is populated as soon as the cards have rendered once.
decisions_state [ f . id ] = Decision (
finding_id = f . id , action = chosen , payload = payload ,
)
# Preview
# NOTE(review): the "up to 5" in the label is not enforced here; it assumes
# the analyzer caps f.samples — confirm.
if chosen != " skip " and f . samples :
preview = _preview_table ( f , chosen , payload )
if preview is not None and not preview . empty :
st . markdown ( " **Preview** (showing up to 5 affected cells) " )
st . dataframe ( preview , use_container_width = True , hide_index = True )
st . divider ( )
# ---- Apply ------------------------------------------------------------------
# Apply / Reset bar. bottom_right is unpacked but never used below.
bottom_left , bottom_mid , bottom_right = st . columns ( [ 1 , 1 , 3 ] )
with bottom_left :
# Disabled only while no decision has been stored.
apply_clicked = st . button (
" ✅ Apply & enter tools " , type = " primary " , use_container_width = True ,
disabled = not decisions_state ,
)
with bottom_mid :
reset_clicked = st . button ( " Reset all decisions " , use_container_width = True )
# Reset drops the stored decisions and any gate result, then restarts the
# script. It does not touch the encoding override or the findings.
if reset_clicked :
st . session_state . pop ( " review_decisions " , None )
st . session_state . pop ( " normalization_result " , None )
st . session_state . pop ( " normalization_for " , None )
st . rerun ( )
if apply_clicked :
# Re-parse the upload with the active encoding override so the fixes run on
# a DataFrame built from the stored bytes.
df = _load_df_from_session (
encoding_override = st . session_state . get ( " encoding_override " )
)
if df is None :
st . error ( " Could not re-read the uploaded file. Try re-uploading. " )
st . stop ( )
# Ignore anything in the dict that is not a Decision instance.
decisions_list = [ d for d in decisions_state . values ( ) if isinstance ( d , Decision ) ]
result = apply_decisions ( df , findings , decisions_list )
# Store the result together with the hash of the bytes it was computed from;
# the summary section only shows a result whose hash matches the current upload.
st . session_state [ " normalization_result " ] = result
st . session_state [ " normalization_for " ] = _upload_hash ( )
summary = gate_summary ( result )
# Outcome message: passed, blocked by error-level findings, or decisions
# still pending. No message is shown when none of the three branches match.
if result . passed and is_normalized ( findings , result ) :
st . success (
f " ✓ Gate passed — { summary [ ' fixes_applied ' ] } fix(es) applied, "
f " { summary [ ' cells_changed ' ] } cell(s) changed. You can now open any tool. "
)
elif result . blocking_findings :
st . error (
f " Gate blocked by error-level findings: "
f " { ' , ' . join ( b . id for b in result . blocking_findings ) } . "
f " Resolve or waive them above before continuing. "
)
elif result . pending_findings :
st . warning (
f " Pending decisions remain on: "
f " { ' , ' . join ( f . id for f in result . pending_findings ) } . "
f " Choose Auto-fix or Skip for each before continuing. "
)
# Persisted summary (re-render on reload)
# Show the stored gate result only when it was computed for the bytes
# currently in session state (matching content hash).
result : Optional [ NormalizationResult ] = st . session_state . get ( " normalization_result " )
if result is not None and st . session_state . get ( " normalization_for " ) == _upload_hash ( ) :
with st . expander ( " Audit log " ) :
# One row per applied fix, followed by the ids of the waived findings.
if result . applied :
st . markdown ( " **Applied fixes** " )
st . dataframe (
pd . DataFrame ( [
{
" finding " : a . finding_id ,
" fix_action " : a . fix_action ,
" decision " : a . decision ,
" cells_changed " : a . cells_changed ,
}
for a in result . applied
] ) ,
use_container_width = True , hide_index = True ,
)
if result . skipped_findings :
st . markdown ( " **Skipped (waived by user)** " )
st . write ( [ f . id for f in result . skipped_findings ] )
# The download is offered only for a result that passed the gate.
if result . passed :
st . markdown ( " --- " )
st . markdown ( " **Download normalized file** " )
with st . expander ( " ⚙️ Advanced output options " ) :
st . caption (
" Defaults match what the analyzer normalized to: UTF-8, "
" comma-separated, LF line endings. Override only if your "
" destination tool requires a specific format. "
)
# Each selectbox shows the labels of an options table and defaults to its
# first entry; the chosen label is mapped back to the table's second element.
col_enc , col_delim , col_le = st . columns ( 3 )
with col_enc :
enc_choice = st . selectbox (
" Encoding (code page) " ,
options = [ label for label , _ in _OUTPUT_ENCODINGS ] ,
index = 0 ,
key = " output_encoding_select " ,
)
out_encoding = next (
codec for label , codec in _OUTPUT_ENCODINGS if label == enc_choice
)
with col_delim :
delim_choice = st . selectbox (
" Delimiter " ,
options = [ label for label , _ in _OUTPUT_DELIMITERS ] ,
index = 0 ,
key = " output_delim_select " ,
)
out_delim = next (
ch for label , ch in _OUTPUT_DELIMITERS if label == delim_choice
)
with col_le :
le_choice = st . selectbox (
" Line terminator " ,
options = [ label for label , _ in _OUTPUT_LINE_TERMINATORS ] ,
index = 0 ,
key = " output_le_select " ,
)
out_le = next (
ch for label , ch in _OUTPUT_LINE_TERMINATORS if label == le_choice
)
# The cleaned frame is re-serialized on every script run with the options
# currently selected; a lossy target encoding produces a warning, not a failure.
data , encode_warn = _build_output_bytes (
result . cleaned_df ,
encoding = out_encoding ,
delimiter = out_delim ,
line_terminator = out_le ,
)
if encode_warn :
st . warning ( encode_warn )
# A tab delimiter switches the download to the TSV extension and MIME type.
ext = " tsv " if out_delim == " \t " else " csv "
mime = " text/tab-separated-values " if out_delim == " \t " else " text/csv "
# upload_name is the value read from session state near the top of the page body.
file_name = f " { Path ( upload_name ) . stem } .normalized. { ext } "
st . download_button (
f " ⬇️ Download { file_name } " ,
data = data ,
file_name = file_name ,
mime = mime ,
type = " primary " ,
)