Files
datatools-dev/src/cli_column_map.py
Michael d32b58e61a feat(license): add Lite SKU; remove user-facing free trial
Two coupled changes:

1. Lite tier
   - New Tier.LITE in src/license/schema.py.
   - FEATURES_BY_TIER[Tier.LITE] = {Deduplicator, Text Cleaner,
     Format Standardizer}. The three universally-useful tools that
     cover the most common bookkeeping / RevOps / Klaviyo prep
     workflows. Other six tools require Core.
   - i18n: license.tier_lite, license.feature_locked_title,
     license.feature_locked_body, license.upgrade_link,
     license.status_locked (en + es).
   - Per-tool feature gate at every GUI tool page
     (require_feature_or_render_upgrade) and every tool CLI
     (guard(feature=...)). A locked tool renders an upgrade
     prompt + Manage-license button (GUI) or exits with code 2
     (CLI).
   - Home grid: tool cards the user's tier doesn't unlock get a
     red 🔒 Locked badge in place of green Ready.

2. Trial removed
   - Activation form's "Start 1-year trial" button removed.
   - license_cli's `trial` subcommand removed.
   - activation.trial_button / activation.trial_help i18n keys
     dropped (pack parity test stays green).
   - Tier.TRIAL stays in the enum (back-compat with any field-
     tested trial licenses); LicenseManager._mint stays internal
     for tests and the seller's key generator.
   - Decision logged in DECISIONS §9b: a 1-year all-features
     trial undercuts paid Lite; paid-only keeps tier economics
     clean.

Tests (+29 net): +17 Lite-tier unit/guard tests + 13 Lite-tier
GUI tests + 1 trial-absent assertion - 2 trial CLI tests - 1
trial GUI button test. Total: 1995 → 2024.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:19:30 +00:00

359 lines
12 KiB
Python

"""CLI for the DataTools Column Mapper (script 05).
Usage:
python -m src.cli_column_map input.csv # auto-mapping preview
python -m src.cli_column_map input.csv --schema target.json --apply
python -m src.cli_column_map input.csv --rename "First Name=first_name,Email=email" --apply
python -m src.cli_column_map input.csv --schema target.json --preset strict-schema --apply
python -m src.cli_column_map input.csv --coerce-col "age:integer,joined:date" --apply
python -m src.cli_column_map --help
"""
from __future__ import annotations
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import typer
from loguru import logger
# Help text handed to the Typer application below.  It is assembled from
# adjacent string literals so every example command keeps its own source line.
_APP_HELP = (
    "Rename columns, enforce a target schema, and coerce types in CSV / Excel files.\n\n"
    "Default behaviour: preview the mapping (no file written). Add --apply "
    "to write the mapped output and audit log.\n\n"
    "Examples:\n\n"
    " # Show what auto-mapping would do (no schema → identity)\n"
    " python -m src.cli_column_map vendor.csv\n\n"
    " # Map against a target JSON schema with strict drop / coerce / reorder\n"
    " python -m src.cli_column_map vendor.csv --schema target.json "
    "--preset strict-schema --apply\n\n"
    " # Hand-rolled rename without a schema\n"
    " python -m src.cli_column_map data.csv "
    "--rename 'First Name=first_name,Last Name=last_name' --apply\n\n"
    " # Coerce specific columns inline\n"
    " python -m src.cli_column_map data.csv "
    "--coerce-col 'age:integer,joined:date' --apply\n"
)

# The CLI application object; the command below registers itself on it.
# Shell-completion options are switched off, and running with no arguments
# prints the help instead of an error.
app = typer.Typer(
    name="column-map",
    help=_APP_HELP,
    add_completion=False,
    no_args_is_help=True,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _setup_logging(log_dir: Path) -> Path:
    """Point loguru at stderr plus a timestamped log file under ``log_dir``.

    ``log_dir`` is created (parents included) when it does not exist.  Every
    sink currently registered on the loguru logger is removed, then two sinks
    are installed:

    * stderr, WARNING and above, message text only;
    * ``column_map_<YYYYmmdd_HHMMSS>.log`` inside ``log_dir``, DEBUG and
      above, with a timestamp / level / message layout.

    Returns:
        The path of the log file registered as the second sink.
    """
    log_dir.mkdir(parents=True, exist_ok=True)

    # Second-resolution local timestamp keeps one log file per invocation.
    stamp = f"{datetime.now():%Y%m%d_%H%M%S}"
    destination = log_dir / f"column_map_{stamp}.log"
    file_layout = "{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}"

    logger.remove()
    logger.add(sys.stderr, format="{message}", level="WARNING")
    logger.add(str(destination), format=file_layout, level="DEBUG")
    return destination
def _parse_pairs(raw: Optional[str], separator: str = ",") -> dict[str, str]:
"""Parse ``a=1,b=2`` into a dict."""
if not raw:
return {}
out: dict[str, str] = {}
for piece in raw.split(separator):
piece = piece.strip()
if not piece:
continue
if "=" not in piece:
raise typer.BadParameter(
f"Invalid pair: {piece!r}. Expected 'key=value[,key=value...]'."
)
k, v = piece.split("=", 1)
out[k.strip()] = v.strip()
return out
def _parse_coerce(raw: Optional[str]) -> dict[str, str]:
"""Parse ``age:integer,joined:date`` into a dict."""
if not raw:
return {}
out: dict[str, str] = {}
for piece in raw.split(","):
piece = piece.strip()
if not piece:
continue
if ":" not in piece:
raise typer.BadParameter(
f"Invalid --coerce-col piece: {piece!r}. "
f"Expected 'col:dtype[,col:dtype...]'."
)
col, dtype = piece.split(":", 1)
out[col.strip()] = dtype.strip()
return out
# ---------------------------------------------------------------------------
# Main command
# ---------------------------------------------------------------------------
@app.command()
def map_(
    input_file: str = typer.Argument(
        ...,
        help="Path to the CSV or Excel file.",
    ),
    output: Optional[str] = typer.Option(
        None, "--output", "-o",
        help="Output file path. Default: {input}_mapped.csv",
    ),
    apply: bool = typer.Option(
        False, "--apply",
        help="Write the output. Without this flag, only the mapping plan is shown.",
    ),
    preset: str = typer.Option(
        "rename-only", "--preset",
        help="Preset: rename-only, strict-schema, or lenient-schema.",
    ),
    schema: Optional[str] = typer.Option(
        None, "--schema",
        help="Path to a target schema JSON file (TargetSchema format).",
    ),
    rename: Optional[str] = typer.Option(
        None, "--rename",
        help="Explicit rename pairs: 'src=tgt[,src=tgt...]' (overrides auto-inference).",
    ),
    coerce_col: Optional[str] = typer.Option(
        None, "--coerce-col",
        help=(
            "Inline type coercion (no schema needed): 'col:dtype[,col:dtype...]'. "
            "Valid dtypes: string, integer, float, boolean, date, datetime, category, auto."
        ),
    ),
    unmapped: Optional[str] = typer.Option(
        None, "--unmapped",
        help="Strategy for unmapped source columns: keep | drop | error.",
    ),
    threshold: Optional[float] = typer.Option(
        None, "--threshold",
        help="Fuzzy-match threshold for auto-inference (0.0..1.0). Default 0.6.",
    ),
    no_auto: bool = typer.Option(
        False, "--no-auto",
        help="Disable auto-inference; honour only explicit --rename pairs.",
    ),
    no_coerce: bool = typer.Option(
        False, "--no-coerce",
        help="Disable type coercion (overrides preset).",
    ),
    no_reorder: bool = typer.Option(
        False, "--no-reorder",
        help="Disable schema-order reorder (overrides preset).",
    ),
    no_required: bool = typer.Option(
        False, "--no-required",
        help="Don't enforce required-target presence (overrides preset).",
    ),
    config: Optional[str] = typer.Option(
        None, "--config",
        help="Load options from a saved JSON config file.",
    ),
    save_config: Optional[str] = typer.Option(
        None, "--save-config",
        help="Save current options to a JSON config file.",
    ),
    sheet: Optional[str] = typer.Option(
        None, "--sheet",
        help="Excel sheet name or index (default: first sheet).",
    ),
    encoding_override: Optional[str] = typer.Option(
        None, "--encoding",
        help="Override auto-detected file encoding.",
    ),
    header_row: Optional[int] = typer.Option(
        None, "--header-row",
        help="0-based row index for the header (default: auto-detect).",
    ),
):
    """Map source columns to a target schema; rename, coerce, drop, reorder."""
    # NOTE: the docstring above doubles as CLI help text (Typer reads command
    # docstrings), so maintainer notes live in comments instead.
    #
    # Flow: validate paths/preset -> build MapOptions (config file or preset,
    # then per-flag overrides) -> read the input -> map_columns -> print the
    # summary -> with --apply, write the mapped file plus a JSON audit.
    # Every handled failure prints "Error: ..." to stderr and exits with 1.

    # Heavy / project imports are deferred to call time.
    from src.core.io import read_file, write_file
    from src.core.column_mapper import (
        MapOptions,
        PRESETS,
        TargetField,
        TargetSchema,
        map_columns,
    )
    import pandas as pd

    input_path = Path(input_file)
    if not input_path.exists():
        typer.echo(f"Error: File not found: {input_path}", err=True)
        raise typer.Exit(1)

    if preset not in PRESETS:
        typer.echo(
            f"Error: Unknown preset '{preset}'. "
            f"Choose from: {', '.join(sorted(PRESETS))}.",
            err=True,
        )
        raise typer.Exit(1)

    log_path = _setup_logging(Path("logs"))

    # Build options: a config file, when given, replaces the preset as the
    # starting point; the individual flags below then override either one.
    if config:
        cfg_path = Path(config)
        if not cfg_path.exists():
            typer.echo(f"Error: Config file not found: {cfg_path}", err=True)
            raise typer.Exit(1)
        options = MapOptions.from_file(cfg_path)
    else:
        options = MapOptions.from_preset(preset)

    if schema:
        sp = Path(schema)
        if not sp.exists():
            typer.echo(f"Error: Schema file not found: {sp}", err=True)
            raise typer.Exit(1)
        options.schema = TargetSchema.from_file(sp)

    if rename:
        # Explicit pairs win over anything already present in the mapping.
        options.mapping = {**options.mapping, **_parse_pairs(rename)}
    if unmapped:
        options.unmapped = unmapped  # type: ignore[assignment]
    if threshold is not None:
        options.fuzzy_threshold = threshold
    if no_auto:
        options.auto_infer = False
    if no_coerce:
        options.coerce_types = False
    if no_reorder:
        options.reorder_to_schema = False
    if no_required:
        options.enforce_required = False

    # Inline coercion (no schema): build a tiny one-field-per-column schema.
    inline_coerce = _parse_coerce(coerce_col)
    if inline_coerce:
        if options.schema is None:
            options.schema = TargetSchema(fields=[
                TargetField(name=col, dtype=dt)  # type: ignore[arg-type]
                for col, dt in inline_coerce.items()
            ])
            options.coerce_types = True
        else:
            # Inline coercion only applies when no schema exists; say so
            # rather than dropping the user's request without a trace.
            typer.echo(
                "Warning: --coerce-col is ignored because a target schema "
                "is already set.",
                err=True,
            )

    if save_config:
        saved = options.to_file(save_config)
        typer.echo(f"Config saved to {saved}")

    # Read input
    typer.echo(f"Reading {input_path.name}...")
    try:
        # A purely numeric --sheet value is treated as a sheet index.
        sheet_arg: str | int | None = None
        if sheet is not None:
            try:
                sheet_arg = int(sheet)
            except ValueError:
                sheet_arg = sheet
        df = read_file(
            input_path,
            encoding=encoding_override,
            header_row=header_row,
            sheet_name=sheet_arg if sheet_arg is not None else 0,
            repair=False,
        )
        # Anything that is not a single DataFrame is treated as an iterable
        # of frames and stitched together.
        if not isinstance(df, pd.DataFrame):
            df = pd.concat(list(df), ignore_index=True)
    except Exception as e:
        typer.echo(f"Error reading file: {e}", err=True)
        raise typer.Exit(1)

    typer.echo(f" {len(df)} rows, {len(df.columns)} columns")
    typer.echo("Mapping columns...")
    try:
        result = map_columns(df, options)
    except (ValueError, OSError) as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1)

    _print_results(result, input_path, options)

    if apply:
        stem = input_path.stem
        out_path = Path(output) if output else input_path.parent / f"{stem}_mapped.csv"
        # Audit: write the resolved mapping as JSON next to the output.  With
        # the default output location this is the input's own directory.
        audit_path = out_path.parent / f"{stem}_mapping.json"
        audit = {
            "mapping": result.mapping,
            "inferred_pairs": result.inferred_pairs,
            "columns_renamed": result.columns_renamed,
            "columns_dropped": result.columns_dropped,
            "columns_added": result.columns_added,
            "coercion_failures": result.coercion_failures,
            "unmapped_kept": result.unmapped_kept,
            "missing_required_targets": result.missing_required_targets,
        }
        try:
            write_file(result.mapped_df, out_path)
            typer.echo(f"\nMapped file: {out_path}")
            audit_path.write_text(
                json.dumps(audit, indent=2, default=str),
                encoding="utf-8",
            )
        except OSError as e:
            typer.echo(f"Error writing output: {e}", err=True)
            raise typer.Exit(1)
        typer.echo(f"Mapping audit: {audit_path}")
    else:
        typer.echo("\nThis was a preview. Add --apply to write the mapped output.")

    typer.echo(f"Log: {log_path}")
# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------
def _print_results(result, input_path: Path, options) -> None:
    """Print a human-readable summary of a mapping result to stdout.

    Emits a ruled header with counts, then the per-column mapping and any
    dropped / added columns, coercion failures and missing required targets
    (each section only when non-empty).

    Args:
        result: The object returned by ``map_columns``.  This function reads
            ``columns_renamed``, ``columns_dropped``, ``columns_added``,
            ``unmapped_kept``, ``coercion_failures`` (used as a mapping of
            column -> failure count), ``mapping`` (source -> target),
            ``inferred_pairs`` and ``missing_required_targets``.
        input_path: Path of the input file; only its name is shown.
        options: Accepted for interface compatibility; not used here.
    """
    # ASCII ruler and markers so the summary renders on any console encoding.
    rule = "-" * 60

    typer.echo(f"\n{rule}")
    typer.echo(f" File: {input_path.name}")
    typer.echo(f" Columns renamed: {result.columns_renamed}")
    typer.echo(f" Columns dropped: {len(result.columns_dropped)}")
    typer.echo(f" Columns added: {len(result.columns_added)}")
    typer.echo(f" Unmapped kept: {len(result.unmapped_kept)}")
    typer.echo(f" Coercion failures: "
               f"{sum(result.coercion_failures.values())} cells across "
               f"{len(result.coercion_failures)} column(s)")
    typer.echo(rule)

    if result.mapping:
        typer.echo("\nMapping:")
        for src, tgt in result.mapping.items():
            # "(auto)" marks pairs that came from inference rather than the user.
            tag = " (auto)" if src in result.inferred_pairs else ""
            # "->" for an actual rename, "=" when the name is kept as-is.
            arrow = "->" if src != tgt else "="
            typer.echo(f" {src!r} {arrow} {tgt!r}{tag}")

    if result.columns_dropped:
        typer.echo(f"\nDropped: {result.columns_dropped}")
    if result.columns_added:
        typer.echo(f"\nAdded (defaults): {result.columns_added}")
    if result.coercion_failures:
        typer.echo("\nCoercion failures:")
        for col, n in result.coercion_failures.items():
            typer.echo(f" {col}: {n} row(s) could not be coerced")
    if result.missing_required_targets:
        typer.echo(f"\nMissing required targets: {result.missing_required_targets}")
# ---------------------------------------------------------------------------
# __main__
# ---------------------------------------------------------------------------
def main():
    """Entry point: run the licence guard for this tool, then the Typer app.

    The licence modules are imported inside the function rather than at module
    import time.  What ``guard`` does when the feature is not licensed is
    defined in ``src.cli_license_guard`` and is not visible from this file.
    """
    from src.cli_license_guard import guard
    from src.license import FeatureFlag

    required_feature = FeatureFlag.COLUMN_MAPPER.value
    guard(feature=required_feature)
    app()


if __name__ == "__main__":
    main()