Files
datatools-dev/src/cli_text_clean.py
Michael d32b58e61a feat(license): add Lite SKU; remove user-facing free trial
Two coupled changes:

1. Lite tier
   - New Tier.LITE in src/license/schema.py.
   - FEATURES_BY_TIER[Tier.LITE] = {Deduplicator, Text Cleaner,
     Format Standardizer}. The three universally-useful tools that
     cover the most common bookkeeping / RevOps / Klaviyo prep
     workflows. Other six tools require Core.
   - i18n: license.tier_lite, license.feature_locked_title,
     license.feature_locked_body, license.upgrade_link,
     license.status_locked (en + es).
   - Per-tool feature gate at every GUI tool page
     (require_feature_or_render_upgrade) and every tool CLI
     (guard(feature=...)). A locked tool renders an upgrade
     prompt + Manage-license button (GUI) or exits with code 2
     (CLI).
   - Home grid: tool cards the user's tier doesn't unlock get a
     red 🔒 Locked badge in place of green Ready.

2. Trial removed
   - Activation form's "Start 1-year trial" button removed.
   - license_cli's `trial` subcommand removed.
   - activation.trial_button / activation.trial_help i18n keys
     dropped (pack parity test stays green).
   - Tier.TRIAL stays in the enum (back-compat with any field-
     tested trial licenses); LicenseManager._mint stays internal
     for tests and the seller's key generator.
   - Decision logged in DECISIONS §9b: a 1-year all-features
     trial undercuts paid Lite; paid-only keeps tier economics
     clean.

Tests (+29 net): +17 Lite-tier unit/guard tests + 13 Lite-tier
GUI tests + 1 trial-absent assertion - 2 trial CLI tests - 1
trial GUI button test. Total: 1995 → 2024.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:19:30 +00:00

381 lines
14 KiB
Python

"""CLI for the DataTools text cleaner (script 02).
Usage:
python -m src.cli_text_clean input.csv # dry-run preview
python -m src.cli_text_clean input.csv --apply # write cleaned file
python -m src.cli_text_clean input.csv --preset minimal --apply
python -m src.cli_text_clean input.csv --case upper:name --apply
python -m src.cli_text_clean --help # full help
"""
from __future__ import annotations
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import typer
from loguru import logger
# Long-form help text for the Typer application, kept in its own constant so
# the constructor call below stays readable.
_APP_HELP = (
    "Clean and normalize text content in CSV and Excel files.\n\n"
    "By default, runs in preview mode — shows what would change without "
    "modifying anything. Add --apply to write the output.\n\n"
    "Examples:\n\n"
    " # Preview what would change\n"
    " python -m src.cli_text_clean messy.csv\n\n"
    " # Apply the safe defaults (excel-hygiene preset)\n"
    " python -m src.cli_text_clean messy.csv --apply\n\n"
    " # Minimal: only trim and collapse whitespace\n"
    " python -m src.cli_text_clean messy.csv --preset minimal --apply\n\n"
    " # Title-case the 'name' column, leave others alone for case\n"
    " python -m src.cli_text_clean people.csv --case title:name --apply\n\n"
    " # Clean only specific columns\n"
    " python -m src.cli_text_clean orders.csv --columns vendor,product --apply\n\n"
    " # Skip a free-text column from cleaning\n"
    " python -m src.cli_text_clean tickets.csv --skip notes --apply\n"
)

# The Typer application object; shell completion is not installed and running
# with no arguments prints help.
app = typer.Typer(
    name="text-clean",
    help=_APP_HELP,
    add_completion=False,
    no_args_is_help=True,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _setup_logging(log_dir: Path) -> Path:
    """Point loguru at stderr and at a fresh timestamped log file.

    Creates ``log_dir`` (including parents) when it does not exist, removes the
    sinks loguru currently has, then installs two sinks: WARNING and above to
    stderr as the bare message, and DEBUG and above to the new file with a
    timestamp and level prefix.

    Returns:
        The path of the log file that was registered.
    """
    log_dir.mkdir(parents=True, exist_ok=True)

    # One file per invocation, named with second resolution.
    stamp = f"{datetime.now():%Y%m%d_%H%M%S}"
    destination = log_dir / f"text_clean_{stamp}.log"

    logger.remove()
    sinks = (
        (sys.stderr, "WARNING", "{message}"),
        (
            str(destination),
            "DEBUG",
            "{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}",
        ),
    )
    for sink, level, fmt in sinks:
        logger.add(sink, level=level, format=fmt)

    return destination
def _parse_case(raw: Optional[str]) -> tuple[Optional[str], dict[str, str]]:
"""Parse --case argument.
Forms:
--case upper -> ("upper", {}) (apply to all selected)
--case title:name -> (None, {"name": "title"})
--case upper:code,title:name -> (None, {...})
"""
if not raw:
return None, {}
if ":" not in raw:
# Bare mode applies to all selected columns
return raw.strip(), {}
per_col: dict[str, str] = {}
for piece in raw.split(","):
piece = piece.strip()
if not piece:
continue
if ":" not in piece:
raise typer.BadParameter(
f"Invalid --case piece: '{piece}'. "
f"Expected 'mode' or 'mode:col[,mode:col...]' "
f"(e.g., 'upper' or 'title:name,upper:code')."
)
mode, col = piece.split(":", 1)
per_col[col.strip()] = mode.strip()
return None, per_col
def _split_csv_arg(raw: Optional[str]) -> Optional[list[str]]:
if raw is None:
return None
return [c.strip() for c in raw.split(",") if c.strip()]
# ---------------------------------------------------------------------------
# Main command
# ---------------------------------------------------------------------------
# The module's single Typer command.
#
# Flow: validate inputs -> build CleanOptions (config file or preset, then CLI
# flags layered on top) -> optionally save the merged options -> read the
# input -> run clean_dataframe -> print a summary -> with --apply, write the
# cleaned file plus a change-audit CSV.
#
# Exits with status 1 on: missing input or config file, unknown preset, any
# failure while reading, a ValueError from the cleaner, or an OSError while
# writing the output files.
#
# NOTE: the function docstring is intentionally left as a one-liner and
# unchanged, because Typer uses command docstrings as help text.
@app.command()
def clean(
    input_file: str = typer.Argument(
        ...,
        help="Path to the CSV or Excel file to clean.",
    ),
    output: Optional[str] = typer.Option(
        None, "--output", "-o",
        help="Output file path. Default: {input}_cleaned.csv",
    ),
    apply: bool = typer.Option(
        False, "--apply",
        help="Write the output files. Without this flag, only a preview is shown.",
    ),
    preset: str = typer.Option(
        "excel-hygiene", "--preset",
        help="Preset: minimal, excel-hygiene, or paranoid.",
    ),
    columns: Optional[str] = typer.Option(
        None, "--columns",
        help="Comma-separated columns to clean (default: all string columns).",
    ),
    skip: Optional[str] = typer.Option(
        None, "--skip",
        help="Comma-separated columns to skip even if they look like text.",
    ),
    case: Optional[str] = typer.Option(
        None, "--case",
        help=(
            "Case conversion. Bare mode 'upper'|'lower'|'title'|'sentence' applies to "
            "all selected columns. Per-column form: 'mode:col[,mode:col]' "
            "(e.g., 'title:name,upper:code')."
        ),
    ),
    no_trim: bool = typer.Option(False, "--no-trim", help="Disable whitespace trim."),
    no_collapse: bool = typer.Option(
        False, "--no-collapse", help="Disable internal whitespace collapse.",
    ),
    no_nfc: bool = typer.Option(False, "--no-nfc", help="Disable Unicode NFC normalization."),
    nfkc: bool = typer.Option(
        False, "--nfkc",
        # The ligature is written as an escape so the example survives tools
        # that normalize source text (it previously read "fi → fi").
        help="Enable NFKC compat fold (lossy: ① → 1, \ufb01 → fi). Default off.",
    ),
    no_smart_chars: bool = typer.Option(
        False, "--no-smart-chars",
        help="Disable smart-character folding (curly quotes, em/en-dash, NBSP).",
    ),
    no_zero_width: bool = typer.Option(
        False, "--no-zero-width", help="Disable zero-width / invisible char strip.",
    ),
    no_bom: bool = typer.Option(False, "--no-bom", help="Disable BOM strip."),
    no_control: bool = typer.Option(
        False, "--no-control", help="Disable control-character strip.",
    ),
    no_line_endings: bool = typer.Option(
        False, "--no-line-endings", help="Disable line-ending normalization.",
    ),
    full_changelog: bool = typer.Option(
        False, "--full-changelog",
        help="Write every cell change to the audit CSV (default caps to first 1000).",
    ),
    config: Optional[str] = typer.Option(
        None, "--config",
        help="Load options from a saved JSON config file.",
    ),
    save_config: Optional[str] = typer.Option(
        None, "--save-config",
        help="Save current options to a JSON config file.",
    ),
    sheet: Optional[str] = typer.Option(
        None, "--sheet",
        help="Excel sheet name or index (default: first sheet).",
    ),
    encoding_override: Optional[str] = typer.Option(
        None, "--encoding",
        help="Override auto-detected file encoding.",
    ),
    header_row: Optional[int] = typer.Option(
        None, "--header-row",
        help="0-based row index for the header (default: auto-detect).",
    ),
):
    """Clean and normalize text in a CSV or Excel file."""
    # Project and pandas imports happen at call time, not at module import.
    from src.core.io import read_file, write_file
    from src.core.text_clean import (
        CleanOptions,
        PRESETS,
        clean_dataframe,
    )
    import pandas as pd

    # ------------------------------------------------------------------
    # Validate inputs
    # ------------------------------------------------------------------
    input_path = Path(input_file)
    if not input_path.exists():
        typer.echo(f"Error: File not found: {input_path}", err=True)
        raise typer.Exit(1)

    # The preset name is validated even when --config is given, although in
    # that case the preset itself is not used (see below).
    if preset not in PRESETS:
        typer.echo(
            f"Error: Unknown preset '{preset}'. "
            f"Choose from: {', '.join(sorted(PRESETS))}.",
            err=True,
        )
        raise typer.Exit(1)

    log_path = _setup_logging(Path("logs"))

    # ------------------------------------------------------------------
    # Build CleanOptions
    # ------------------------------------------------------------------
    if config:
        cfg_path = Path(config)
        if not cfg_path.exists():
            typer.echo(f"Error: Config file not found: {cfg_path}", err=True)
            raise typer.Exit(1)
        # A config file replaces the preset as the starting point.
        options = CleanOptions.from_file(cfg_path)
        logger.info("Loaded config from {}", cfg_path)
    else:
        options = CleanOptions.from_preset(preset)

    # CLI overrides on top of preset/config. Each flag only ever pushes its
    # option one way, so an unset flag leaves the preset/config value intact.
    if no_trim:
        options.trim = False
    if no_collapse:
        options.collapse_whitespace = False
    if no_nfc:
        options.nfc = False
    if nfkc:
        options.nfkc = True
    if no_smart_chars:
        options.fold_smart_chars = False
    if no_zero_width:
        options.strip_zero_width = False
    if no_bom:
        options.strip_bom = False
    if no_control:
        options.strip_control = False
    if no_line_endings:
        options.normalize_line_endings = False

    # --columns replaces the selection whenever it was supplied (even if it
    # parses to an empty list); --skip only applies when non-empty.
    cols_list = _split_csv_arg(columns)
    if cols_list is not None:
        options.columns = cols_list
    skip_list = _split_csv_arg(skip)
    if skip_list:
        options.skip_columns = skip_list

    bare_case, per_col_case = _parse_case(case)
    if bare_case:
        options.case = bare_case  # type: ignore[assignment]
    if per_col_case:
        # Per-column modes from the CLI win over ones already in the options.
        options.case_columns = {**options.case_columns, **per_col_case}  # type: ignore[dict-item]

    # ------------------------------------------------------------------
    # Save config if requested (after CLI merge so the file reflects intent)
    # ------------------------------------------------------------------
    if save_config:
        saved = options.to_file(save_config)
        typer.echo(f"Config saved to {saved}")

    # ------------------------------------------------------------------
    # Read input
    # ------------------------------------------------------------------
    typer.echo(f"Reading {input_path.name}...")

    # A purely numeric --sheet value is treated as a sheet index, anything
    # else as a sheet name.
    sheet_arg: str | int | None = None
    if sheet is not None:
        try:
            sheet_arg = int(sheet)
        except ValueError:
            sheet_arg = sheet

    try:
        df = read_file(
            input_path,
            encoding=encoding_override,
            header_row=header_row,
            sheet_name=sheet_arg if sheet_arg is not None else 0,
            # Bypass byte-level repair so the user's preset/flag choices
            # remain authoritative. The cell-level cleaner does the
            # smart-quote / NUL / BOM work itself.
            repair=False,
        )
        if not isinstance(df, pd.DataFrame):
            # NOTE(review): presumably read_file can yield an iterable of
            # DataFrame chunks; they are stitched into one frame here.
            df = pd.concat(list(df), ignore_index=True)
    except Exception as e:
        # CLI boundary: report any read failure as a one-line error.
        typer.echo(f"Error reading file: {e}", err=True)
        raise typer.Exit(1) from e
    typer.echo(f" {len(df)} rows, {len(df.columns)} columns")

    # ------------------------------------------------------------------
    # Run pipeline
    # ------------------------------------------------------------------
    typer.echo("Cleaning text...")
    try:
        result = clean_dataframe(df, options)
    except ValueError as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
    _print_results(result, input_path, options)

    # ------------------------------------------------------------------
    # Write output
    # ------------------------------------------------------------------
    if apply:
        stem = input_path.stem
        out_path = Path(output) if output else input_path.parent / f"{stem}_cleaned.csv"
        try:
            write_file(result.cleaned_df, out_path)
        except OSError as e:
            # Same reporting style as read failures instead of a traceback.
            typer.echo(f"Error writing file: {e}", err=True)
            raise typer.Exit(1) from e
        typer.echo(f"\nCleaned file: {out_path}")

        if not result.changes.empty:
            # The audit CSV is always placed next to the input file, even
            # when --output points elsewhere.
            changes_path = input_path.parent / f"{stem}_changes.csv"
            audit_df = result.changes
            cap = 1000
            if not full_changelog and len(audit_df) > cap:
                typer.echo(
                    f"Note: changelog capped at {cap} rows. "
                    f"Use --full-changelog to write all {len(audit_df)} changes."
                )
                audit_df = audit_df.head(cap)
            try:
                write_file(audit_df, changes_path)
            except OSError as e:
                typer.echo(f"Error writing file: {e}", err=True)
                raise typer.Exit(1) from e
            typer.echo(f"Changes audit: {changes_path}")
    else:
        typer.echo("\nThis was a preview. Add --apply to write the output files.")
    typer.echo(f"Log: {log_path}")
# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------
def _print_results(result, input_path: Path, options) -> None:
    """Print a human-readable summary of a cleaning run.

    Args:
        result: The cleaning result. Read here: ``cells_changed``,
            ``cells_total``, ``columns_processed`` and ``changes`` (a
            DataFrame with ``column``, ``row``, ``old``, ``new`` and
            ``ops_applied`` columns).
        input_path: Path of the file that was cleaned; only its name is shown.
        options: Accepted for call-site compatibility; not used here.
    """
    rule = "-" * 50
    # Guard against division by zero when nothing was scanned.
    pct = (result.cells_changed / result.cells_total * 100.0) if result.cells_total else 0.0

    typer.echo(f"\n{rule}")
    typer.echo(f" File: {input_path.name}")
    typer.echo(f" Columns processed: {len(result.columns_processed)}")
    typer.echo(f" Cells scanned: {result.cells_total}")
    typer.echo(f" Cells changed: {result.cells_changed} ({pct:.1f}%)")
    typer.echo(rule)

    if result.cells_changed and not result.changes.empty:
        # Per-column change counts, most-changed first, limited to ten lines.
        counts = result.changes["column"].value_counts()
        typer.echo("\nChanges by column:")
        for col, n in counts.head(10).items():
            typer.echo(f" {col}: {n} cell(s)")
        if len(counts) > 10:
            typer.echo(f" ... and {len(counts) - 10} more columns")

        # Show first few examples; reprs make invisible characters visible
        # and are truncated to keep each line short.
        typer.echo("\nFirst examples:")
        for _, row in result.changes.head(5).iterrows():
            old = repr(row["old"])[:40]
            new = repr(row["new"])[:40]
            # NOTE(review): the +1 presumably turns a 0-based row index into
            # a 1-based number for display — confirm against the cleaner.
            typer.echo(
                f" Row {row['row'] + 1}, {row['column']}: {old} -> {new} "
                f"[{row['ops_applied']}]"
            )
# ---------------------------------------------------------------------------
# __main__
# ---------------------------------------------------------------------------
def main():
    """Script entry point: run the license check for this tool, then the CLI."""
    from src.cli_license_guard import guard
    from src.license import FeatureFlag

    # The guard is called before the Typer app, so it runs ahead of any
    # argument parsing.
    required_feature = FeatureFlag.TEXT_CLEANER.value
    guard(feature=required_feature)
    app()
# Run the CLI only when this module is executed directly (e.g. via
# `python -m src.cli_text_clean`), not when it is imported.
if __name__ == "__main__":
    main()