Two coupled changes:
1. Lite tier
- New Tier.LITE in src/license/schema.py.
- FEATURES_BY_TIER[Tier.LITE] = {Deduplicator, Text Cleaner,
Format Standardizer}. The three universally-useful tools that
cover the most common bookkeeping / RevOps / Klaviyo prep
workflows. Other six tools require Core.
- i18n: license.tier_lite, license.feature_locked_title,
license.feature_locked_body, license.upgrade_link,
license.status_locked (en + es).
- Per-tool feature gate at every GUI tool page
(require_feature_or_render_upgrade) and every tool CLI
(guard(feature=...)). A locked tool renders an upgrade
prompt + Manage-license button (GUI) or exits with code 2
(CLI).
- Home grid: tool cards the user's tier doesn't unlock get a
red 🔒 Locked badge in place of green Ready.
2. Trial removed
- Activation form's "Start 1-year trial" button removed.
- license_cli's `trial` subcommand removed.
- activation.trial_button / activation.trial_help i18n keys
dropped (pack parity test stays green).
- Tier.TRIAL stays in the enum (back-compat with any field-
tested trial licenses); LicenseManager._mint stays internal
for tests and the seller's key generator.
- Decision logged in DECISIONS §9b: a 1-year all-features
trial undercuts paid Lite; paid-only keeps tier economics
clean.
Tests (+29 net): +17 Lite-tier unit/guard tests + 13 Lite-tier
GUI tests + 1 trial-absent assertion - 2 trial CLI tests - 1
trial GUI button test. Total: 1995 → 2024.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
510 lines
18 KiB
Python
510 lines
18 KiB
Python
"""CLI for the DataTools deduplicator.
|
|
|
|
Usage:
|
|
python -m src.cli input.csv # dry-run preview
|
|
python -m src.cli input.csv --apply # write deduplicated output
|
|
python -m src.cli input.csv --fuzzy name --merge # fuzzy match + merge
|
|
python -m src.cli --help # full help
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import typer
|
|
from loguru import logger
|
|
from rapidfuzz import process as rf_process
|
|
|
|
app = typer.Typer(
|
|
name="dedup",
|
|
help=(
|
|
"Find and remove duplicate rows in CSV and Excel files.\n\n"
|
|
"By default, runs in preview mode — shows what would change without "
|
|
"modifying anything. Add --apply to write the output.\n\n"
|
|
"Examples:\n\n"
|
|
" # Preview duplicates in a CSV file\n"
|
|
" python -m src.cli customers.csv\n\n"
|
|
" # Remove duplicates and save the result\n"
|
|
" python -m src.cli customers.csv --apply\n\n"
|
|
" # Fuzzy-match on the 'name' column with 80% threshold\n"
|
|
" python -m src.cli customers.csv --fuzzy name --threshold 80 --apply\n\n"
|
|
" # Match on specific columns only\n"
|
|
" python -m src.cli customers.csv --subset email,phone --apply\n\n"
|
|
" # Keep the most complete row and merge missing fields\n"
|
|
" python -m src.cli customers.csv --survivor most-complete --merge --apply\n"
|
|
),
|
|
add_completion=False,
|
|
no_args_is_help=True,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _setup_logging(log_dir: Path) -> Path:
|
|
"""Configure loguru to write a timestamped log file. Returns the log path."""
|
|
log_dir.mkdir(parents=True, exist_ok=True)
|
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
log_path = log_dir / f"dedup_{ts}.log"
|
|
logger.remove() # remove default stderr handler
|
|
logger.add(sys.stderr, level="WARNING", format="{message}")
|
|
logger.add(str(log_path), level="DEBUG",
|
|
format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}")
|
|
return log_path
|
|
|
|
|
|
def _suggest_column(name: str, available: list[str]) -> str:
|
|
"""Return a helpful error message when a column is not found."""
|
|
cols_str = ", ".join(available)
|
|
matches = rf_process.extract(name, available, limit=1, score_cutoff=50)
|
|
if matches:
|
|
suggestion = matches[0][0]
|
|
return (
|
|
f"Column '{name}' not found. "
|
|
f"Available columns: {cols_str}. "
|
|
f"Did you mean '{suggestion}'?"
|
|
)
|
|
return f"Column '{name}' not found. Available columns: {cols_str}."
|
|
|
|
|
|
def _validate_columns(requested: list[str], available: list[str]) -> None:
|
|
"""Raise typer.BadParameter if any requested column doesn't exist."""
|
|
for col in requested:
|
|
if col not in available:
|
|
raise typer.BadParameter(_suggest_column(col, available))
|
|
|
|
|
|
def _parse_normalize_map(raw: Optional[str]) -> dict[str, str]:
|
|
"""Parse 'col:type,col:type' into a dict."""
|
|
if not raw:
|
|
return {}
|
|
result = {}
|
|
for pair in raw.split(","):
|
|
pair = pair.strip()
|
|
if ":" not in pair:
|
|
raise typer.BadParameter(
|
|
f"Invalid normalize format: '{pair}'. "
|
|
f"Expected 'column:type' (e.g., 'email:email,phone:phone')."
|
|
)
|
|
col, ntype = pair.split(":", 1)
|
|
result[col.strip()] = ntype.strip()
|
|
return result
|
|
|
|
|
|
def _interactive_review(group, df) -> Optional[bool]:
|
|
"""Side-by-side CLI review for a match group. Returns True/False/None."""
|
|
from src.core.dedup import MatchResult
|
|
group: MatchResult
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"Match Group {group.group_id + 1} — Confidence: {group.confidence:.1f}%")
|
|
print(f"Matched on: {', '.join(group.matched_on)}")
|
|
print(f"{'='*60}")
|
|
|
|
display_cols = [c for c in df.columns if not str(c).startswith("_norm_")]
|
|
for idx in group.row_indices:
|
|
print(f"\n Row {idx + 1}:")
|
|
for col in display_cols:
|
|
val = df.iloc[idx].get(col, "")
|
|
if str(val).strip():
|
|
print(f" {col}: {val}")
|
|
|
|
while True:
|
|
choice = input("\n [y] Merge [n] Keep both [s] Skip remaining: ").strip().lower()
|
|
if choice == "y":
|
|
return True
|
|
if choice == "n":
|
|
return False
|
|
if choice == "s":
|
|
return None
|
|
print(" Please enter y, n, or s.")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main command
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.command()
|
|
def dedup(
|
|
input_file: str = typer.Argument(
|
|
...,
|
|
help="Path to the CSV or Excel file to deduplicate.",
|
|
),
|
|
output: Optional[str] = typer.Option(
|
|
None, "--output", "-o",
|
|
help="Output file path. Default: {input}_deduplicated.csv",
|
|
),
|
|
apply: bool = typer.Option(
|
|
False, "--apply",
|
|
help="Write the output file. Without this flag, only a preview is shown.",
|
|
),
|
|
key: Optional[str] = typer.Option(
|
|
None, "--key", "-k",
|
|
help="Comma-separated strong-key columns (e.g., 'fb_id,ein'). Each is an independent exact-match dedup key.",
|
|
),
|
|
subset: Optional[str] = typer.Option(
|
|
None, "--subset", "-s",
|
|
help="Comma-separated columns to match on (default: auto-detect).",
|
|
),
|
|
fuzzy: Optional[str] = typer.Option(
|
|
None, "--fuzzy",
|
|
help="Comma-separated columns to fuzzy-match (others use exact match).",
|
|
),
|
|
algorithm: str = typer.Option(
|
|
"jaro_winkler", "--algorithm", "-a",
|
|
help="Fuzzy algorithm: levenshtein, jaro_winkler, or token_set_ratio.",
|
|
),
|
|
threshold: int = typer.Option(
|
|
85, "--threshold", "-t",
|
|
help="Similarity threshold 0-100 for fuzzy matching.",
|
|
),
|
|
normalize: Optional[str] = typer.Option(
|
|
None, "--normalize",
|
|
help="Column normalizers as 'col:type' pairs (e.g., 'email:email,phone:phone').",
|
|
),
|
|
survivor: str = typer.Option(
|
|
"first", "--survivor",
|
|
help="Survivor rule: first, last, most-complete, or most-recent.",
|
|
),
|
|
date_column: Optional[str] = typer.Option(
|
|
None, "--date-column",
|
|
help="Date column for most-recent survivor rule.",
|
|
),
|
|
merge: bool = typer.Option(
|
|
False, "--merge",
|
|
help="Fill missing fields in the surviving row from removed duplicates.",
|
|
),
|
|
review: bool = typer.Option(
|
|
False, "--review",
|
|
help="Interactively review each match group before merging.",
|
|
),
|
|
config: Optional[str] = typer.Option(
|
|
None, "--config",
|
|
help="Load settings from a saved JSON config file.",
|
|
),
|
|
save_config: Optional[str] = typer.Option(
|
|
None, "--save-config",
|
|
help="Save current settings to a JSON config file.",
|
|
),
|
|
sheet: Optional[str] = typer.Option(
|
|
None, "--sheet",
|
|
help="Excel sheet name or index (default: first sheet).",
|
|
),
|
|
encoding_override: Optional[str] = typer.Option(
|
|
None, "--encoding",
|
|
help="Override auto-detected file encoding.",
|
|
),
|
|
header_row: Optional[int] = typer.Option(
|
|
None, "--header-row",
|
|
help="0-based row index for the header (default: auto-detect).",
|
|
),
|
|
):
|
|
"""Find and remove duplicate rows in CSV and Excel files."""
|
|
from src.core.io import read_file, write_file, list_sheets
|
|
from src.core.dedup import (
|
|
Algorithm, ColumnMatchStrategy, MatchStrategy, SurvivorRule,
|
|
build_default_strategies, deduplicate,
|
|
)
|
|
from src.core.normalizers import NormalizerType
|
|
from src.core.config import DeduplicationConfig
|
|
|
|
# Setup
|
|
input_path = Path(input_file)
|
|
if not input_path.exists():
|
|
typer.echo(f"Error: File not found: {input_path}", err=True)
|
|
raise typer.Exit(1)
|
|
|
|
log_path = _setup_logging(Path("logs"))
|
|
|
|
# Load config if provided
|
|
cfg: Optional[DeduplicationConfig] = None
|
|
if config:
|
|
config_path = Path(config)
|
|
if not config_path.exists():
|
|
typer.echo(f"Error: Config file not found: {config_path}", err=True)
|
|
raise typer.Exit(1)
|
|
cfg = DeduplicationConfig.from_file(config_path)
|
|
logger.info("Loaded config from {}", config_path)
|
|
|
|
# Read input
|
|
typer.echo(f"Reading {input_path.name}...")
|
|
try:
|
|
sheet_arg: str | int | None = None
|
|
if sheet is not None:
|
|
try:
|
|
sheet_arg = int(sheet)
|
|
except ValueError:
|
|
sheet_arg = sheet
|
|
|
|
df = read_file(
|
|
input_path,
|
|
encoding=encoding_override,
|
|
header_row=header_row,
|
|
sheet_name=sheet_arg if sheet_arg is not None else 0,
|
|
)
|
|
if not isinstance(df, __import__("pandas").DataFrame):
|
|
# chunked reading returns generator — materialise for v1
|
|
import pandas as pd
|
|
df = pd.concat(list(df), ignore_index=True)
|
|
except Exception as e:
|
|
from src.core.errors import format_for_user
|
|
typer.echo(
|
|
f"Error reading {input_path}:\n{format_for_user(e)}",
|
|
err=True,
|
|
)
|
|
raise typer.Exit(1)
|
|
|
|
typer.echo(f" {len(df)} rows, {len(df.columns)} columns")
|
|
available_columns = list(df.columns)
|
|
|
|
# Build strategies
|
|
strategies: Optional[list[MatchStrategy]] = None
|
|
|
|
if cfg and cfg.strategies:
|
|
strategies = cfg.to_strategies()
|
|
elif subset or fuzzy:
|
|
# Build from CLI flags
|
|
normalize_map = _parse_normalize_map(normalize)
|
|
strategies = []
|
|
|
|
fuzzy_cols = set(c.strip() for c in fuzzy.split(",")) if fuzzy else set()
|
|
if subset:
|
|
subset_cols = [c.strip() for c in subset.split(",")]
|
|
elif fuzzy_cols:
|
|
# When only --fuzzy is given, match on just those columns
|
|
subset_cols = list(fuzzy_cols)
|
|
else:
|
|
subset_cols = available_columns
|
|
|
|
_validate_columns(subset_cols, available_columns)
|
|
if fuzzy_cols:
|
|
_validate_columns(list(fuzzy_cols), available_columns)
|
|
|
|
col_strats: list[ColumnMatchStrategy] = []
|
|
for col in subset_cols:
|
|
norm = None
|
|
if col in normalize_map:
|
|
norm = NormalizerType(normalize_map[col])
|
|
|
|
if col in fuzzy_cols:
|
|
algo = Algorithm(algorithm)
|
|
thresh = float(threshold)
|
|
else:
|
|
algo = Algorithm.EXACT
|
|
thresh = 100.0
|
|
|
|
col_strats.append(ColumnMatchStrategy(
|
|
column=col, algorithm=algo, threshold=thresh, normalizer=norm,
|
|
))
|
|
|
|
strategies = [MatchStrategy(column_strategies=col_strats)]
|
|
|
|
# Apply normalizer overrides even with auto-detect
|
|
if normalize and strategies is None:
|
|
normalize_map = _parse_normalize_map(normalize)
|
|
auto_strats = build_default_strategies(df)
|
|
# Inject normalize_map into auto strategies
|
|
for strat in auto_strats:
|
|
for cs in strat.column_strategies:
|
|
if cs.column in normalize_map:
|
|
cs.normalizer = NormalizerType(normalize_map[cs.column])
|
|
strategies = auto_strats
|
|
|
|
# --key: add user-declared strong keys as standalone exact-match strategies
|
|
if key:
|
|
key_cols = [c.strip() for c in key.split(",")]
|
|
_validate_columns(key_cols, available_columns)
|
|
key_strats = [
|
|
MatchStrategy(column_strategies=[
|
|
ColumnMatchStrategy(column=col, algorithm=Algorithm.EXACT, threshold=100.0)
|
|
])
|
|
for col in key_cols
|
|
]
|
|
if strategies is None:
|
|
# Combine with auto-detect so user gets both
|
|
strategies = build_default_strategies(df) + key_strats
|
|
else:
|
|
strategies.extend(key_strats)
|
|
|
|
# Survivor rule
|
|
survivor_map = {
|
|
"first": SurvivorRule.KEEP_FIRST,
|
|
"last": SurvivorRule.KEEP_LAST,
|
|
"most-complete": SurvivorRule.KEEP_MOST_COMPLETE,
|
|
"most_complete": SurvivorRule.KEEP_MOST_COMPLETE,
|
|
"most-recent": SurvivorRule.KEEP_MOST_RECENT,
|
|
"most_recent": SurvivorRule.KEEP_MOST_RECENT,
|
|
}
|
|
if cfg:
|
|
surv_rule = cfg.to_survivor_rule()
|
|
do_merge = cfg.merge
|
|
dc = cfg.date_column
|
|
else:
|
|
surv_key = survivor.lower().replace("-", "_")
|
|
if surv_key not in {r.value for r in SurvivorRule} and surv_key not in survivor_map:
|
|
typer.echo(
|
|
f"Error: Unknown survivor rule '{survivor}'. "
|
|
f"Choose from: first, last, most-complete, most-recent.",
|
|
err=True,
|
|
)
|
|
raise typer.Exit(1)
|
|
surv_rule = survivor_map.get(survivor.lower(), SurvivorRule(surv_key))
|
|
do_merge = merge
|
|
dc = date_column
|
|
|
|
# Save config if requested
|
|
if save_config:
|
|
from src.core.config import DeduplicationConfig, StrategyConfig, ColumnStrategyConfig
|
|
save_cfg = DeduplicationConfig(
|
|
survivor_rule=surv_rule.value,
|
|
date_column=dc,
|
|
merge=do_merge,
|
|
subset_columns=[c.strip() for c in subset.split(",")] if subset else None,
|
|
fuzzy_columns=[c.strip() for c in fuzzy.split(",")] if fuzzy else None,
|
|
default_algorithm=algorithm,
|
|
default_threshold=float(threshold),
|
|
normalize_map=_parse_normalize_map(normalize),
|
|
)
|
|
if strategies:
|
|
save_cfg.strategies = [
|
|
StrategyConfig(columns=[
|
|
ColumnStrategyConfig(
|
|
column=cs.column,
|
|
algorithm=cs.algorithm.value,
|
|
threshold=cs.threshold,
|
|
normalizer=cs.normalizer.value if cs.normalizer else None,
|
|
)
|
|
for cs in s.column_strategies
|
|
])
|
|
for s in strategies
|
|
]
|
|
saved = save_cfg.to_file(save_config)
|
|
typer.echo(f"Config saved to {saved}")
|
|
|
|
# Progress bar
|
|
progress_cb = None
|
|
if len(df) > 10_000:
|
|
from tqdm import tqdm
|
|
pbar = tqdm(total=len(df) * (len(df) - 1) // 2, desc="Comparing rows",
|
|
unit="pairs", leave=False)
|
|
|
|
def _progress(current: int, total: int):
|
|
pbar.update(current - pbar.n)
|
|
if current >= total:
|
|
pbar.close()
|
|
|
|
progress_cb = _progress
|
|
|
|
# Review callback
|
|
review_cb = _interactive_review if review else None
|
|
|
|
# Run dedup
|
|
typer.echo("Finding duplicates...")
|
|
result = deduplicate(
|
|
df,
|
|
strategies=strategies,
|
|
survivor_rule=surv_rule,
|
|
date_column=dc,
|
|
merge=do_merge,
|
|
preview=not apply,
|
|
review_callback=review_cb,
|
|
progress_callback=progress_cb,
|
|
)
|
|
|
|
# Print results
|
|
_print_results(result, input_path)
|
|
|
|
# Write output files
|
|
if apply:
|
|
stem = input_path.stem
|
|
suffix = input_path.suffix
|
|
|
|
out_path = Path(output) if output else input_path.parent / f"{stem}_deduplicated.csv"
|
|
write_file(result.deduplicated_df, out_path)
|
|
typer.echo(f"\nDeduplicated file: {out_path}")
|
|
|
|
if not result.removed_df.empty:
|
|
removed_path = input_path.parent / f"{stem}_removed.csv"
|
|
write_file(result.removed_df, removed_path)
|
|
typer.echo(f"Removed rows: {removed_path}")
|
|
|
|
if result.match_groups:
|
|
groups_path = input_path.parent / f"{stem}_match_groups.csv"
|
|
_write_match_groups(result, df, groups_path)
|
|
typer.echo(f"Match groups: {groups_path}")
|
|
else:
|
|
typer.echo("\nThis was a preview. Add --apply to write the output files.")
|
|
|
|
typer.echo(f"Log: {log_path}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Output formatting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _print_results(result, input_path: Path) -> None:
|
|
"""Print a human-readable summary."""
|
|
removed = result.original_row_count - len(result.deduplicated_df)
|
|
typer.echo(f"\n{'─'*50}")
|
|
typer.echo(f" File: {input_path.name}")
|
|
typer.echo(f" Rows in: {result.original_row_count}")
|
|
typer.echo(f" Rows out: {len(result.deduplicated_df)}")
|
|
typer.echo(f" Removed: {removed}")
|
|
typer.echo(f" Groups: {len(result.match_groups)}")
|
|
typer.echo(f"{'─'*50}")
|
|
|
|
if result.match_groups:
|
|
typer.echo("\nMatch groups:")
|
|
for g in result.match_groups[:20]: # cap display
|
|
rows_str = ", ".join(str(i + 1) for i in g.row_indices)
|
|
surv = g.survivor_index + 1
|
|
typer.echo(
|
|
f" Group {g.group_id + 1}: rows [{rows_str}] "
|
|
f"→ keep row {surv} "
|
|
f"(confidence: {g.confidence:.1f}%, "
|
|
f"matched on: {', '.join(g.matched_on)})"
|
|
)
|
|
if len(result.match_groups) > 20:
|
|
typer.echo(f" ... and {len(result.match_groups) - 20} more groups")
|
|
|
|
|
|
def _write_match_groups(result, original_df, path: Path) -> None:
|
|
"""Write match groups to a CSV for audit."""
|
|
import pandas as pd
|
|
from src.core.io import write_file
|
|
|
|
rows = []
|
|
for g in result.match_groups:
|
|
for idx in g.row_indices:
|
|
row_data = {"_group_id": g.group_id + 1}
|
|
row_data["_is_survivor"] = idx == g.survivor_index
|
|
row_data["_confidence"] = g.confidence
|
|
row_data["_matched_on"] = ", ".join(g.matched_on)
|
|
row_data["_original_row"] = idx + 1
|
|
# Include original data
|
|
for col in original_df.columns:
|
|
row_data[col] = original_df.iloc[idx].get(col, "")
|
|
rows.append(row_data)
|
|
|
|
groups_df = pd.DataFrame(rows)
|
|
write_file(groups_df, path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# __main__ support
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
|
|
from src.cli_license_guard import guard
|
|
from src.license import FeatureFlag
|
|
guard(feature=FeatureFlag.DEDUPLICATOR.value)
|
|
app()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|