"""CLI for the DataTools text cleaner (script 02). Usage: python -m src.cli_text_clean input.csv # dry-run preview python -m src.cli_text_clean input.csv --apply # write cleaned file python -m src.cli_text_clean input.csv --preset minimal --apply python -m src.cli_text_clean input.csv --case upper:name --apply python -m src.cli_text_clean --help # full help """ from __future__ import annotations import sys from datetime import datetime from pathlib import Path from typing import Optional import typer from loguru import logger app = typer.Typer( name="text-clean", help=( "Clean and normalize text content in CSV and Excel files.\n\n" "By default, runs in preview mode — shows what would change without " "modifying anything. Add --apply to write the output.\n\n" "Examples:\n\n" " # Preview what would change\n" " python -m src.cli_text_clean messy.csv\n\n" " # Apply the safe defaults (excel-hygiene preset)\n" " python -m src.cli_text_clean messy.csv --apply\n\n" " # Minimal: only trim and collapse whitespace\n" " python -m src.cli_text_clean messy.csv --preset minimal --apply\n\n" " # Title-case the 'name' column, leave others alone for case\n" " python -m src.cli_text_clean people.csv --case title:name --apply\n\n" " # Clean only specific columns\n" " python -m src.cli_text_clean orders.csv --columns vendor,product --apply\n\n" " # Skip a free-text column from cleaning\n" " python -m src.cli_text_clean tickets.csv --skip notes --apply\n" ), add_completion=False, no_args_is_help=True, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _setup_logging(log_dir: Path) -> Path: """Configure loguru to write a timestamped log file. Returns the log path.""" log_dir.mkdir(parents=True, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") log_path = log_dir / f"text_clean_{ts}.log" logger.remove() logger.add(sys.stderr, level="WARNING", format="{message}") logger.add( str(log_path), level="DEBUG", format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}", ) return log_path def _parse_case(raw: Optional[str]) -> tuple[Optional[str], dict[str, str]]: """Parse --case argument. Forms: --case upper -> ("upper", {}) (apply to all selected) --case title:name -> (None, {"name": "title"}) --case upper:code,title:name -> (None, {...}) """ if not raw: return None, {} if ":" not in raw: # Bare mode applies to all selected columns return raw.strip(), {} per_col: dict[str, str] = {} for piece in raw.split(","): piece = piece.strip() if not piece: continue if ":" not in piece: raise typer.BadParameter( f"Invalid --case piece: '{piece}'. " f"Expected 'mode' or 'mode:col[,mode:col...]' " f"(e.g., 'upper' or 'title:name,upper:code')." ) mode, col = piece.split(":", 1) per_col[col.strip()] = mode.strip() return None, per_col def _split_csv_arg(raw: Optional[str]) -> Optional[list[str]]: if raw is None: return None return [c.strip() for c in raw.split(",") if c.strip()] # --------------------------------------------------------------------------- # Main command # --------------------------------------------------------------------------- @app.command() def clean( input_file: str = typer.Argument( ..., help="Path to the CSV or Excel file to clean.", ), output: Optional[str] = typer.Option( None, "--output", "-o", help="Output file path. Default: {input}_cleaned.csv", ), apply: bool = typer.Option( False, "--apply", help="Write the output files. Without this flag, only a preview is shown.", ), preset: str = typer.Option( "excel-hygiene", "--preset", help="Preset: minimal, excel-hygiene, or paranoid.", ), columns: Optional[str] = typer.Option( None, "--columns", help="Comma-separated columns to clean (default: all string columns).", ), skip: Optional[str] = typer.Option( None, "--skip", help="Comma-separated columns to skip even if they look like text.", ), case: Optional[str] = typer.Option( None, "--case", help=( "Case conversion. Bare mode 'upper'|'lower'|'title'|'sentence' applies to " "all selected columns. Per-column form: 'mode:col[,mode:col]' " "(e.g., 'title:name,upper:code')." ), ), no_trim: bool = typer.Option(False, "--no-trim", help="Disable whitespace trim."), no_collapse: bool = typer.Option( False, "--no-collapse", help="Disable internal whitespace collapse.", ), no_nfc: bool = typer.Option(False, "--no-nfc", help="Disable Unicode NFC normalization."), nfkc: bool = typer.Option( False, "--nfkc", help="Enable NFKC compat fold (lossy: ① → 1, fi → fi). Default off.", ), no_smart_chars: bool = typer.Option( False, "--no-smart-chars", help="Disable smart-character folding (curly quotes, em/en-dash, NBSP).", ), no_zero_width: bool = typer.Option( False, "--no-zero-width", help="Disable zero-width / invisible char strip.", ), no_bom: bool = typer.Option(False, "--no-bom", help="Disable BOM strip."), no_control: bool = typer.Option( False, "--no-control", help="Disable control-character strip.", ), no_line_endings: bool = typer.Option( False, "--no-line-endings", help="Disable line-ending normalization.", ), full_changelog: bool = typer.Option( False, "--full-changelog", help="Write every cell change to the audit CSV (default caps to first 1000).", ), config: Optional[str] = typer.Option( None, "--config", help="Load options from a saved JSON config file.", ), save_config: Optional[str] = typer.Option( None, "--save-config", help="Save current options to a JSON config file.", ), sheet: Optional[str] = typer.Option( None, "--sheet", help="Excel sheet name or index (default: first sheet).", ), encoding_override: Optional[str] = typer.Option( None, "--encoding", help="Override auto-detected file encoding.", ), header_row: Optional[int] = typer.Option( None, "--header-row", help="0-based row index for the header (default: auto-detect).", ), ): """Clean and normalize text in a CSV or Excel file.""" from src.core.io import read_file, write_file from src.core.text_clean import ( CleanOptions, PRESETS, clean_dataframe, ) import pandas as pd # ------------------------------------------------------------------ # Validate inputs # ------------------------------------------------------------------ input_path = Path(input_file) if not input_path.exists(): typer.echo(f"Error: File not found: {input_path}", err=True) raise typer.Exit(1) if preset not in PRESETS: typer.echo( f"Error: Unknown preset '{preset}'. " f"Choose from: {', '.join(sorted(PRESETS))}.", err=True, ) raise typer.Exit(1) log_path = _setup_logging(Path("logs")) # ------------------------------------------------------------------ # Build CleanOptions # ------------------------------------------------------------------ if config: cfg_path = Path(config) if not cfg_path.exists(): typer.echo(f"Error: Config file not found: {cfg_path}", err=True) raise typer.Exit(1) options = CleanOptions.from_file(cfg_path) logger.info("Loaded config from {}", cfg_path) else: options = CleanOptions.from_preset(preset) # CLI overrides on top of preset/config if no_trim: options.trim = False if no_collapse: options.collapse_whitespace = False if no_nfc: options.nfc = False if nfkc: options.nfkc = True if no_smart_chars: options.fold_smart_chars = False if no_zero_width: options.strip_zero_width = False if no_bom: options.strip_bom = False if no_control: options.strip_control = False if no_line_endings: options.normalize_line_endings = False cols_list = _split_csv_arg(columns) if cols_list is not None: options.columns = cols_list skip_list = _split_csv_arg(skip) if skip_list: options.skip_columns = skip_list bare_case, per_col_case = _parse_case(case) if bare_case: options.case = bare_case # type: ignore[assignment] if per_col_case: options.case_columns = {**options.case_columns, **per_col_case} # type: ignore[dict-item] # ------------------------------------------------------------------ # Save config if requested (after CLI merge so the file reflects intent) # ------------------------------------------------------------------ if save_config: saved = options.to_file(save_config) typer.echo(f"Config saved to {saved}") # ------------------------------------------------------------------ # Read input # ------------------------------------------------------------------ typer.echo(f"Reading {input_path.name}...") try: sheet_arg: str | int | None = None if sheet is not None: try: sheet_arg = int(sheet) except ValueError: sheet_arg = sheet df = read_file( input_path, encoding=encoding_override, header_row=header_row, sheet_name=sheet_arg if sheet_arg is not None else 0, # Bypass byte-level repair so the user's preset/flag choices # remain authoritative. The cell-level cleaner does the # smart-quote / NUL / BOM work itself. repair=False, ) if not isinstance(df, pd.DataFrame): df = pd.concat(list(df), ignore_index=True) except Exception as e: typer.echo(f"Error reading file: {e}", err=True) raise typer.Exit(1) typer.echo(f" {len(df)} rows, {len(df.columns)} columns") # ------------------------------------------------------------------ # Run pipeline # ------------------------------------------------------------------ typer.echo("Cleaning text...") try: result = clean_dataframe(df, options) except ValueError as e: typer.echo(f"Error: {e}", err=True) raise typer.Exit(1) _print_results(result, input_path, options) # ------------------------------------------------------------------ # Write output # ------------------------------------------------------------------ if apply: stem = input_path.stem out_path = Path(output) if output else input_path.parent / f"{stem}_cleaned.csv" write_file(result.cleaned_df, out_path) typer.echo(f"\nCleaned file: {out_path}") if not result.changes.empty: changes_path = input_path.parent / f"{stem}_changes.csv" audit_df = result.changes cap = 1000 if not full_changelog and len(audit_df) > cap: typer.echo( f"Note: changelog capped at {cap} rows. " f"Use --full-changelog to write all {len(audit_df)} changes." ) audit_df = audit_df.head(cap) write_file(audit_df, changes_path) typer.echo(f"Changes audit: {changes_path}") else: typer.echo("\nThis was a preview. Add --apply to write the output files.") typer.echo(f"Log: {log_path}") # --------------------------------------------------------------------------- # Output formatting # --------------------------------------------------------------------------- def _print_results(result, input_path: Path, options) -> None: pct = (result.cells_changed / result.cells_total * 100.0) if result.cells_total else 0.0 typer.echo(f"\n{'─'*50}") typer.echo(f" File: {input_path.name}") typer.echo(f" Columns processed: {len(result.columns_processed)}") typer.echo(f" Cells scanned: {result.cells_total}") typer.echo(f" Cells changed: {result.cells_changed} ({pct:.1f}%)") typer.echo(f"{'─'*50}") if result.cells_changed and not result.changes.empty: # Per-column change counts counts = result.changes["column"].value_counts() typer.echo("\nChanges by column:") for col, n in counts.head(10).items(): typer.echo(f" {col}: {n} cell(s)") if len(counts) > 10: typer.echo(f" ... and {len(counts) - 10} more columns") # Show first few examples typer.echo("\nFirst examples:") for _, row in result.changes.head(5).iterrows(): old = repr(row["old"])[:40] new = repr(row["new"])[:40] typer.echo( f" Row {row['row'] + 1}, {row['column']}: {old} → {new} " f"[{row['ops_applied']}]" ) # --------------------------------------------------------------------------- # __main__ # --------------------------------------------------------------------------- def main(): app() if __name__ == "__main__": main()