Files
datatools-dev/src/cli.py
Michael d32b58e61a feat(license): add Lite SKU; remove user-facing free trial
Two coupled changes:

1. Lite tier
   - New Tier.LITE in src/license/schema.py.
   - FEATURES_BY_TIER[Tier.LITE] = {Deduplicator, Text Cleaner,
     Format Standardizer}. The three universally-useful tools that
     cover the most common bookkeeping / RevOps / Klaviyo prep
     workflows. Other six tools require Core.
   - i18n: license.tier_lite, license.feature_locked_title,
     license.feature_locked_body, license.upgrade_link,
     license.status_locked (en + es).
   - Per-tool feature gate at every GUI tool page
     (require_feature_or_render_upgrade) and every tool CLI
     (guard(feature=...)). A locked tool renders an upgrade
     prompt + Manage-license button (GUI) or exits with code 2
     (CLI).
   - Home grid: tool cards the user's tier doesn't unlock get a
     red 🔒 Locked badge in place of green Ready.

2. Trial removed
   - Activation form's "Start 1-year trial" button removed.
   - license_cli's `trial` subcommand removed.
   - activation.trial_button / activation.trial_help i18n keys
     dropped (pack parity test stays green).
   - Tier.TRIAL stays in the enum (back-compat with any field-
     tested trial licenses); LicenseManager._mint stays internal
     for tests and the seller's key generator.
   - Decision logged in DECISIONS §9b: a 1-year all-features
     trial undercuts paid Lite; paid-only keeps tier economics
     clean.

Tests (+29 net): +17 Lite-tier unit/guard tests + 13 Lite-tier
GUI tests + 1 trial-absent assertion - 2 trial CLI tests - 1
trial GUI button test. Total: 1995 → 2024.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:19:30 +00:00

510 lines
18 KiB
Python

"""CLI for the DataTools deduplicator.
Usage:
python -m src.cli input.csv # dry-run preview
python -m src.cli input.csv --apply # write deduplicated output
python -m src.cli input.csv --fuzzy name --merge # fuzzy match + merge
python -m src.cli --help # full help
"""
from __future__ import annotations
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import typer
from loguru import logger
from rapidfuzz import process as rf_process
# Typer application object for the deduplicator CLI. Shell-completion
# options are switched off and invoking the program with no arguments
# prints the help text instead of an error.
# NOTE(review): only one command is registered on this app in the code
# visible here; Typer may then expose that command directly, in which case
# `--help` would show the command's docstring rather than the text below —
# confirm against the real `--help` output.
app = typer.Typer(
    name="dedup",
    help=(
        "Find and remove duplicate rows in CSV and Excel files.\n\n"
        "By default, runs in preview mode — shows what would change without "
        "modifying anything. Add --apply to write the output.\n\n"
        "Examples:\n\n"
        " # Preview duplicates in a CSV file\n"
        " python -m src.cli customers.csv\n\n"
        " # Remove duplicates and save the result\n"
        " python -m src.cli customers.csv --apply\n\n"
        " # Fuzzy-match on the 'name' column with 80% threshold\n"
        " python -m src.cli customers.csv --fuzzy name --threshold 80 --apply\n\n"
        " # Match on specific columns only\n"
        " python -m src.cli customers.csv --subset email,phone --apply\n\n"
        " # Keep the most complete row and merge missing fields\n"
        " python -m src.cli customers.csv --survivor most-complete --merge --apply\n"
    ),
    add_completion=False,
    no_args_is_help=True,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _setup_logging(log_dir: Path) -> Path:
    """Point loguru at stderr and at a fresh timestamped log file.

    Creates *log_dir* (including parents) when it does not exist, removes
    the handlers loguru currently has, then installs two sinks: stderr at
    WARNING level with bare messages, and ``dedup_<YYYYmmdd_HHMMSS>.log``
    inside *log_dir* at DEBUG level with timestamp and level columns.

    Returns the path of the log file.
    """
    log_dir.mkdir(parents=True, exist_ok=True)
    stamp = f"{datetime.now():%Y%m%d_%H%M%S}"
    log_path = log_dir / ("dedup_" + stamp + ".log")

    # Drop whatever handlers are registered (loguru's default stderr handler
    # on a fresh process) so only the two sinks below remain.
    logger.remove()
    logger.add(sys.stderr, level="WARNING", format="{message}")
    logger.add(
        str(log_path),
        level="DEBUG",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}",
    )
    return log_path
def _suggest_column(name: str, available: list[str]) -> str:
    """Build the "column not found" message for *name*.

    The message always lists the available columns. When rapidfuzz returns
    a candidate for *name* (score cutoff 50), a "Did you mean" hint naming
    that candidate is appended.
    """
    message = f"Column '{name}' not found. Available columns: {', '.join(available)}."
    # extract() yields (choice, score, index) tuples; only the top one is requested.
    best = rf_process.extract(name, available, limit=1, score_cutoff=50)
    if not best:
        return message
    candidate = best[0][0]
    return f"{message} Did you mean '{candidate}'?"
def _validate_columns(requested: list[str], available: list[str]) -> None:
"""Raise typer.BadParameter if any requested column doesn't exist."""
for col in requested:
if col not in available:
raise typer.BadParameter(_suggest_column(col, available))
def _parse_normalize_map(raw: Optional[str]) -> dict[str, str]:
"""Parse 'col:type,col:type' into a dict."""
if not raw:
return {}
result = {}
for pair in raw.split(","):
pair = pair.strip()
if ":" not in pair:
raise typer.BadParameter(
f"Invalid normalize format: '{pair}'. "
f"Expected 'column:type' (e.g., 'email:email,phone:phone')."
)
col, ntype = pair.split(":", 1)
result[col.strip()] = ntype.strip()
return result
def _interactive_review(group, df) -> Optional[bool]:
    """Show one match group on the terminal and ask what to do with it.

    Prints the group's number, confidence and matched columns, then each
    member row with its non-blank values (columns whose name starts with
    ``_norm_`` are hidden). Prompts until the answer is ``y``, ``n`` or ``s``.

    Returns:
        True for "y" (merge), False for "n" (keep both), None for "s"
        (skip remaining).
    """
    from src.core.dedup import MatchResult
    group: MatchResult

    rule = "=" * 60
    print(f"\n{rule}")
    print(f"Match Group {group.group_id + 1} — Confidence: {group.confidence:.1f}%")
    print(f"Matched on: {', '.join(group.matched_on)}")
    print(rule)

    visible = [c for c in df.columns if not str(c).startswith("_norm_")]
    for idx in group.row_indices:
        print(f"\n Row {idx + 1}:")
        record = df.iloc[idx]
        for col in visible:
            val = record.get(col, "")
            # Blank cells are omitted to keep the comparison compact.
            if str(val).strip():
                print(f" {col}: {val}")

    answers = {"y": True, "n": False, "s": None}
    while True:
        choice = input("\n [y] Merge [n] Keep both [s] Skip remaining: ").strip().lower()
        if choice in answers:
            return answers[choice]
        print(" Please enter y, n, or s.")
# ---------------------------------------------------------------------------
# Main command
# ---------------------------------------------------------------------------
# CLI entry point. Reads the input file, builds match strategies from a saved
# config or from the CLI flags, runs the deduplication, prints a summary and,
# with --apply, writes the deduplicated / removed / match-group files next to
# the input.
#
# NOTE: the one-line docstring inside the function is deliberately left
# untouched: Typer uses a command's docstring as its help text, so editing it
# would change what `--help` prints. Longer notes live in comments instead.
@app.command()
def dedup(
    input_file: str = typer.Argument(
        ...,
        help="Path to the CSV or Excel file to deduplicate.",
    ),
    output: Optional[str] = typer.Option(
        None, "--output", "-o",
        help="Output file path. Default: {input}_deduplicated.csv",
    ),
    apply: bool = typer.Option(
        False, "--apply",
        help="Write the output file. Without this flag, only a preview is shown.",
    ),
    key: Optional[str] = typer.Option(
        None, "--key", "-k",
        help="Comma-separated strong-key columns (e.g., 'fb_id,ein'). Each is an independent exact-match dedup key.",
    ),
    subset: Optional[str] = typer.Option(
        None, "--subset", "-s",
        help="Comma-separated columns to match on (default: auto-detect).",
    ),
    fuzzy: Optional[str] = typer.Option(
        None, "--fuzzy",
        help="Comma-separated columns to fuzzy-match (others use exact match).",
    ),
    algorithm: str = typer.Option(
        "jaro_winkler", "--algorithm", "-a",
        help="Fuzzy algorithm: levenshtein, jaro_winkler, or token_set_ratio.",
    ),
    threshold: int = typer.Option(
        85, "--threshold", "-t",
        help="Similarity threshold 0-100 for fuzzy matching.",
    ),
    normalize: Optional[str] = typer.Option(
        None, "--normalize",
        help="Column normalizers as 'col:type' pairs (e.g., 'email:email,phone:phone').",
    ),
    survivor: str = typer.Option(
        "first", "--survivor",
        help="Survivor rule: first, last, most-complete, or most-recent.",
    ),
    date_column: Optional[str] = typer.Option(
        None, "--date-column",
        help="Date column for most-recent survivor rule.",
    ),
    merge: bool = typer.Option(
        False, "--merge",
        help="Fill missing fields in the surviving row from removed duplicates.",
    ),
    review: bool = typer.Option(
        False, "--review",
        help="Interactively review each match group before merging.",
    ),
    config: Optional[str] = typer.Option(
        None, "--config",
        help="Load settings from a saved JSON config file.",
    ),
    save_config: Optional[str] = typer.Option(
        None, "--save-config",
        help="Save current settings to a JSON config file.",
    ),
    sheet: Optional[str] = typer.Option(
        None, "--sheet",
        help="Excel sheet name or index (default: first sheet).",
    ),
    encoding_override: Optional[str] = typer.Option(
        None, "--encoding",
        help="Override auto-detected file encoding.",
    ),
    header_row: Optional[int] = typer.Option(
        None, "--header-row",
        help="0-based row index for the header (default: auto-detect).",
    ),
):
    """Find and remove duplicate rows in CSV and Excel files."""
    # Project and pandas imports are done lazily inside the command.
    import pandas as pd

    from src.core.io import read_file, write_file, list_sheets
    from src.core.dedup import (
        Algorithm, ColumnMatchStrategy, MatchStrategy, SurvivorRule,
        build_default_strategies, deduplicate,
    )
    from src.core.normalizers import NormalizerType
    from src.core.config import DeduplicationConfig

    # ---- Setup ----------------------------------------------------------
    input_path = Path(input_file)
    if not input_path.exists():
        typer.echo(f"Error: File not found: {input_path}", err=True)
        raise typer.Exit(1)
    log_path = _setup_logging(Path("logs"))

    # ---- Load config if provided -----------------------------------------
    cfg: Optional[DeduplicationConfig] = None
    if config:
        config_path = Path(config)
        if not config_path.exists():
            typer.echo(f"Error: Config file not found: {config_path}", err=True)
            raise typer.Exit(1)
        cfg = DeduplicationConfig.from_file(config_path)
        logger.info("Loaded config from {}", config_path)

    # ---- Read input --------------------------------------------------------
    typer.echo(f"Reading {input_path.name}...")
    try:
        # --sheet accepts either an index ("2") or a sheet name ("Customers").
        sheet_arg: str | int | None = None
        if sheet is not None:
            try:
                sheet_arg = int(sheet)
            except ValueError:
                sheet_arg = sheet
        df = read_file(
            input_path,
            encoding=encoding_override,
            header_row=header_row,
            sheet_name=sheet_arg if sheet_arg is not None else 0,
        )
        if not isinstance(df, pd.DataFrame):
            # chunked reading returns generator — materialise for v1
            df = pd.concat(list(df), ignore_index=True)
    except Exception as e:
        # Top-level boundary: any read failure becomes a friendly message
        # and exit code 1 instead of a traceback.
        from src.core.errors import format_for_user
        typer.echo(
            f"Error reading {input_path}:\n{format_for_user(e)}",
            err=True,
        )
        raise typer.Exit(1) from e
    typer.echo(f" {len(df)} rows, {len(df.columns)} columns")
    available_columns = list(df.columns)

    # ---- Build strategies --------------------------------------------------
    # Precedence: strategies stored in the config, then --subset/--fuzzy,
    # then auto-detection (strategies stays None unless --normalize or --key
    # force the defaults to be materialised below).
    strategies: Optional[list[MatchStrategy]] = None
    if cfg and cfg.strategies:
        strategies = cfg.to_strategies()
    elif subset or fuzzy:
        # Build from CLI flags
        normalize_map = _parse_normalize_map(normalize)
        # Keep the user's column order (de-duplicated) so the resulting
        # strategy, its matched-on labels and any saved config are the same
        # on every run; iterating a set would make the order vary.
        fuzzy_list = (
            list(dict.fromkeys(c.strip() for c in fuzzy.split(",")))
            if fuzzy else []
        )
        fuzzy_cols = set(fuzzy_list)
        if subset:
            subset_cols = [c.strip() for c in subset.split(",")]
        else:
            # Only --fuzzy was given (guaranteed by the enclosing branch):
            # match on just those columns.
            subset_cols = list(fuzzy_list)
        _validate_columns(subset_cols, available_columns)
        if fuzzy_list:
            _validate_columns(fuzzy_list, available_columns)
        col_strats: list[ColumnMatchStrategy] = []
        # Only columns in subset_cols get a strategy; a --fuzzy column that
        # is not also in --subset is validated above but otherwise unused.
        for col in subset_cols:
            norm = None
            if col in normalize_map:
                norm = NormalizerType(normalize_map[col])
            if col in fuzzy_cols:
                algo = Algorithm(algorithm)
                thresh = float(threshold)
            else:
                algo = Algorithm.EXACT
                thresh = 100.0
            col_strats.append(ColumnMatchStrategy(
                column=col, algorithm=algo, threshold=thresh, normalizer=norm,
            ))
        strategies = [MatchStrategy(column_strategies=col_strats)]

    # Apply normalizer overrides even with auto-detect
    if normalize and strategies is None:
        normalize_map = _parse_normalize_map(normalize)
        auto_strats = build_default_strategies(df)
        # Inject normalize_map into auto strategies
        for strat in auto_strats:
            for cs in strat.column_strategies:
                if cs.column in normalize_map:
                    cs.normalizer = NormalizerType(normalize_map[cs.column])
        strategies = auto_strats

    # --key: add user-declared strong keys as standalone exact-match strategies
    if key:
        key_cols = [c.strip() for c in key.split(",")]
        _validate_columns(key_cols, available_columns)
        key_strats = [
            MatchStrategy(column_strategies=[
                ColumnMatchStrategy(column=col, algorithm=Algorithm.EXACT, threshold=100.0)
            ])
            for col in key_cols
        ]
        if strategies is None:
            # Combine with auto-detect so user gets both
            strategies = build_default_strategies(df) + key_strats
        else:
            strategies.extend(key_strats)

    # ---- Survivor rule -----------------------------------------------------
    survivor_map = {
        "first": SurvivorRule.KEEP_FIRST,
        "last": SurvivorRule.KEEP_LAST,
        "most-complete": SurvivorRule.KEEP_MOST_COMPLETE,
        "most_complete": SurvivorRule.KEEP_MOST_COMPLETE,
        "most-recent": SurvivorRule.KEEP_MOST_RECENT,
        "most_recent": SurvivorRule.KEEP_MOST_RECENT,
    }
    if cfg:
        # A loaded config takes over survivor rule, merge and date column;
        # the corresponding CLI flags are not consulted in this branch.
        surv_rule = cfg.to_survivor_rule()
        do_merge = cfg.merge
        dc = cfg.date_column
    else:
        surv_lower = survivor.lower()
        surv_key = surv_lower.replace("-", "_")
        # Resolve aliases first and only fall back to the enum constructor
        # when no alias matches. Passing SurvivorRule(surv_key) as the
        # default of dict.get() would evaluate it eagerly and could raise
        # ValueError even for a valid alias.
        if surv_lower in survivor_map:
            surv_rule = survivor_map[surv_lower]
        elif surv_key in survivor_map:
            surv_rule = survivor_map[surv_key]
        elif surv_key in {r.value for r in SurvivorRule}:
            surv_rule = SurvivorRule(surv_key)
        else:
            typer.echo(
                f"Error: Unknown survivor rule '{survivor}'. "
                f"Choose from: first, last, most-complete, most-recent.",
                err=True,
            )
            raise typer.Exit(1)
        do_merge = merge
        dc = date_column

    # ---- Save config if requested ------------------------------------------
    if save_config:
        from src.core.config import StrategyConfig, ColumnStrategyConfig
        save_cfg = DeduplicationConfig(
            survivor_rule=surv_rule.value,
            date_column=dc,
            merge=do_merge,
            subset_columns=[c.strip() for c in subset.split(",")] if subset else None,
            fuzzy_columns=[c.strip() for c in fuzzy.split(",")] if fuzzy else None,
            default_algorithm=algorithm,
            default_threshold=float(threshold),
            normalize_map=_parse_normalize_map(normalize),
        )
        if strategies:
            save_cfg.strategies = [
                StrategyConfig(columns=[
                    ColumnStrategyConfig(
                        column=cs.column,
                        algorithm=cs.algorithm.value,
                        threshold=cs.threshold,
                        normalizer=cs.normalizer.value if cs.normalizer else None,
                    )
                    for cs in s.column_strategies
                ])
                for s in strategies
            ]
        saved = save_cfg.to_file(save_config)
        typer.echo(f"Config saved to {saved}")

    # ---- Progress bar (only for inputs above 10,000 rows) -------------------
    pbar = None
    progress_cb = None
    if len(df) > 10_000:
        from tqdm import tqdm
        n_rows = len(df)
        # Total is the number of unordered row pairs, n * (n - 1) / 2.
        pbar = tqdm(total=n_rows * (n_rows - 1) // 2, desc="Comparing rows",
                    unit="pairs", leave=False)

        def _progress(current: int, total: int):
            # The callback reports an absolute position; tqdm wants a delta.
            pbar.update(current - pbar.n)
            if current >= total:
                pbar.close()

        progress_cb = _progress

    # Review callback
    review_cb = _interactive_review if review else None

    # ---- Run dedup ------------------------------------------------------------
    typer.echo("Finding duplicates...")
    try:
        result = deduplicate(
            df,
            strategies=strategies,
            survivor_rule=surv_rule,
            date_column=dc,
            merge=do_merge,
            preview=not apply,
            review_callback=review_cb,
            progress_callback=progress_cb,
        )
    finally:
        # Make sure the bar is cleared even if deduplicate() raises or never
        # reports completion; closing an already-closed bar is a no-op.
        if pbar is not None:
            pbar.close()

    # Print results
    _print_results(result, input_path)

    # ---- Write output files -------------------------------------------------
    if apply:
        stem = input_path.stem
        out_path = Path(output) if output else input_path.parent / f"{stem}_deduplicated.csv"
        write_file(result.deduplicated_df, out_path)
        typer.echo(f"\nDeduplicated file: {out_path}")
        # The audit files always go next to the input, whatever --output says.
        if not result.removed_df.empty:
            removed_path = input_path.parent / f"{stem}_removed.csv"
            write_file(result.removed_df, removed_path)
            typer.echo(f"Removed rows: {removed_path}")
        if result.match_groups:
            groups_path = input_path.parent / f"{stem}_match_groups.csv"
            _write_match_groups(result, df, groups_path)
            typer.echo(f"Match groups: {groups_path}")
    else:
        typer.echo("\nThis was a preview. Add --apply to write the output files.")
    typer.echo(f"Log: {log_path}")
# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------
def _print_results(result, input_path: Path) -> None:
    """Print a human-readable summary of a deduplication run.

    Shows the file name, row counts in/out, the number of removed rows and
    match groups between two horizontal rules, followed by at most 20 match
    groups (row numbers are shown 1-based) and a count of any not shown.

    Args:
        result: Deduplication result; this function reads
            ``original_row_count``, ``deduplicated_df`` and ``match_groups``.
        input_path: Path of the input file; only its name is printed.
    """
    max_shown = 20
    kept = len(result.deduplicated_df)
    removed = result.original_row_count - kept
    groups = result.match_groups

    # The rule glyph had been lost, leaving `'' * 50`, which renders nothing;
    # restore a visible 50-character separator.
    rule = "─" * 50
    typer.echo(f"\n{rule}")
    typer.echo(f" File: {input_path.name}")
    typer.echo(f" Rows in: {result.original_row_count}")
    typer.echo(f" Rows out: {kept}")
    typer.echo(f" Removed: {removed}")
    typer.echo(f" Groups: {len(groups)}")
    typer.echo(rule)

    if groups:
        typer.echo("\nMatch groups:")
        for g in groups[:max_shown]:  # cap display
            rows_str = ", ".join(str(i + 1) for i in g.row_indices)
            surv = g.survivor_index + 1
            typer.echo(
                f" Group {g.group_id + 1}: rows [{rows_str}] "
                f"→ keep row {surv} "
                f"(confidence: {g.confidence:.1f}%, "
                f"matched on: {', '.join(g.matched_on)})"
            )
        if len(groups) > max_shown:
            typer.echo(f" ... and {len(groups) - max_shown} more groups")
def _write_match_groups(result, original_df, path: Path) -> None:
    """Write match groups to a CSV for audit.

    Emits one output row per member of every match group. Each row starts
    with the audit columns ``_group_id`` (1-based), ``_is_survivor``,
    ``_confidence``, ``_matched_on`` and ``_original_row`` (1-based),
    followed by that row's values from *original_df*.

    Args:
        result: Deduplication result; only ``match_groups`` is read.
        original_df: The DataFrame the group row indices refer to
            (positional, via ``iloc``).
        path: Destination handed to ``write_file``.
    """
    import pandas as pd
    from src.core.io import write_file

    columns = list(original_df.columns)
    rows = []
    for g in result.match_groups:
        matched_on = ", ".join(g.matched_on)
        for idx in g.row_indices:
            # Fetch the source row once per member instead of once per cell.
            source_row = original_df.iloc[idx]
            row_data = {
                "_group_id": g.group_id + 1,
                "_is_survivor": idx == g.survivor_index,
                "_confidence": g.confidence,
                "_matched_on": matched_on,
                "_original_row": idx + 1,
            }
            # Include original data
            for col in columns:
                row_data[col] = source_row.get(col, "")
            rows.append(row_data)
    write_file(pd.DataFrame(rows), path)
# ---------------------------------------------------------------------------
# __main__ support
# ---------------------------------------------------------------------------
def main():
    """Run the license guard for the Deduplicator feature, then the CLI app."""
    from src.cli_license_guard import guard
    from src.license import FeatureFlag

    # The guard is invoked before any command-line parsing happens.
    required_feature = FeatureFlag.DEDUPLICATOR.value
    guard(feature=required_feature)
    app()


if __name__ == "__main__":
    main()