Sweep follow-up to 93e43fc. Display labels now consistent across docs,
landing pages, CLI output, code comments, docstrings, and test prose.
Five parallel surfaces touched:
- docs (EN + ES): README, USER-GUIDE, CLI-REFERENCE, and 11 internal
design/planning docs
- landing pages: index + bookkeeper/revops/shopify-pet
- src: CLI module docstrings, _TOOL_DISPLAY dicts in cli_analyze.py
and gui/components/_legacy.py, core module headers, every tool
page's module docstring
- tests: class/method/module docstrings and section-header comments
- test-cases READMEs
Page slugs (1_Deduplicator etc.), tool_id strings (01_deduplicator
etc.), Python class names (TestDeduplicatorWorkflow, FeatureFlag.*),
URL paths, anchor IDs, CSS classes, and asset filenames were left
intact since they're code identifiers / structural references.
All 2033 tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
368 lines
12 KiB
Python
368 lines
12 KiB
Python
"""CLI for the DataTools Standardize Formats tool (script 03).
|
|
|
|
Usage:
|
|
python -m src.cli_format input.csv \\
|
|
--types 'phone:phone,price:currency,name:name' \\
|
|
--apply
|
|
|
|
# 1 GB international file with per-row country column:
|
|
python -m src.cli_format huge.csv \\
|
|
--types 'phone:phone,address:address,price:currency' \\
|
|
--phone-country country --address-country country \\
|
|
--preserve-code --audit-max 50000 --apply
|
|
|
|
The CLI auto-streams (chunked read/write, bounded RAM) when the input
|
|
exceeds ~100 MB. Force or disable with ``--stream`` / ``--no-stream``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import typer
|
|
from loguru import logger
|
|
|
|
app = typer.Typer(
|
|
name="format",
|
|
help=(
|
|
"Standardize dates, phones, currencies, names, and addresses "
|
|
"in CSV / Excel files.\n\n"
|
|
"Default behaviour: preview the changes (no file written). "
|
|
"Add --apply to write output.\n\n"
|
|
"For 1 GB+ international files, the CLI auto-streams in 50,000-row "
|
|
"chunks so memory stays bounded. Use --phone-country / "
|
|
"--address-country to point at a per-row ISO-3166 column for "
|
|
"country-aware parsing.\n\n"
|
|
"Examples:\n\n"
|
|
" # Preview\n"
|
|
" python -m src.cli_format data.csv --types 'phone:phone,price:currency'\n\n"
|
|
" # International file with per-row country\n"
|
|
" python -m src.cli_format leads.csv --types 'phone:phone' "
|
|
"--phone-country country --apply\n\n"
|
|
" # Force streaming with smaller chunks for tight memory\n"
|
|
" python -m src.cli_format huge.csv --types 'phone:phone' "
|
|
"--stream --chunk-size 10000 --apply\n"
|
|
),
|
|
add_completion=False,
|
|
no_args_is_help=True,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _setup_logging(log_dir: Path) -> Path:
|
|
log_dir.mkdir(parents=True, exist_ok=True)
|
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
log_path = log_dir / f"format_{ts}.log"
|
|
logger.remove()
|
|
logger.add(sys.stderr, level="WARNING", format="{message}")
|
|
logger.add(
|
|
str(log_path), level="DEBUG",
|
|
format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}",
|
|
)
|
|
return log_path
|
|
|
|
|
|
def _parse_types(raw: Optional[str]) -> dict[str, str]:
|
|
"""Parse ``col:phone,col:date`` into a dict."""
|
|
if not raw:
|
|
return {}
|
|
out: dict[str, str] = {}
|
|
for piece in raw.split(","):
|
|
piece = piece.strip()
|
|
if not piece:
|
|
continue
|
|
if ":" not in piece:
|
|
raise typer.BadParameter(
|
|
f"Invalid --types piece: {piece!r}. "
|
|
f"Expected 'col:type[,col:type...]' "
|
|
f"where type is one of: date, phone, currency, name, address, email, boolean."
|
|
)
|
|
col, ft = piece.split(":", 1)
|
|
out[col.strip()] = ft.strip()
|
|
return out
|
|
|
|
|
|
_AUTO_STREAM_THRESHOLD = 100 * 1024 * 1024 # 100 MB
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main command
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.command()
|
|
def standardize(
|
|
input_file: str = typer.Argument(..., help="CSV or TSV file path."),
|
|
output: Optional[str] = typer.Option(
|
|
None, "--output", "-o",
|
|
help="Output file path. Default: {input}_standardized.csv",
|
|
),
|
|
apply: bool = typer.Option(
|
|
False, "--apply",
|
|
help="Write the output. Without this flag, only a preview is shown.",
|
|
),
|
|
types: Optional[str] = typer.Option(
|
|
None, "--types",
|
|
help="Per-column types: 'col:type[,col:type...]'. "
|
|
"Types: date, phone, currency, name, address, email, boolean.",
|
|
),
|
|
preset: Optional[str] = typer.Option(
|
|
None, "--preset",
|
|
help="Named preset (e.g. 'us', 'uk', 'eu', 'jp'). Layered before --types.",
|
|
),
|
|
phone_country: Optional[str] = typer.Option(
|
|
None, "--phone-country",
|
|
help="Column name carrying the per-row ISO-3166 country code for phones.",
|
|
),
|
|
address_country: Optional[str] = typer.Option(
|
|
None, "--address-country",
|
|
help="Column name carrying the per-row country code for addresses.",
|
|
),
|
|
phone_region: str = typer.Option(
|
|
"US", "--phone-region",
|
|
help="Default phone region when no per-row column is set. ISO-3166 alpha-2.",
|
|
),
|
|
phone_format: str = typer.Option(
|
|
"E164", "--phone-format",
|
|
help="Phone output format: E164 | INTERNATIONAL | NATIONAL | RFC3966 | DIGITS.",
|
|
),
|
|
preserve_code: bool = typer.Option(
|
|
False, "--preserve-code",
|
|
help="Currency: emit ISO-4217 prefix (e.g. 'USD 1500.00').",
|
|
),
|
|
decimals: int = typer.Option(
|
|
2, "--decimals",
|
|
help="Currency decimal precision.",
|
|
),
|
|
audit_max: int = typer.Option(
|
|
10_000, "--audit-max",
|
|
help="Cap the change-audit at N rows (0 = no audit, -1 = unbounded).",
|
|
),
|
|
stream: Optional[bool] = typer.Option(
|
|
None, "--stream/--no-stream",
|
|
help="Force streaming (chunked, bounded RAM). Auto-on for inputs > 100 MB.",
|
|
),
|
|
chunk_size: int = typer.Option(
|
|
50_000, "--chunk-size",
|
|
help="Rows per chunk in streaming mode.",
|
|
),
|
|
cache_size: int = typer.Option(
|
|
262_144, "--cache-size",
|
|
help="Per-column LRU-cache size (set 0 to disable).",
|
|
),
|
|
encoding_override: Optional[str] = typer.Option(
|
|
None, "--encoding",
|
|
help="Override auto-detected file encoding.",
|
|
),
|
|
delimiter: Optional[str] = typer.Option(
|
|
None, "--delimiter",
|
|
help="Override auto-detected delimiter.",
|
|
),
|
|
config: Optional[str] = typer.Option(
|
|
None, "--config",
|
|
help="Load options from a saved JSON config.",
|
|
),
|
|
save_config: Optional[str] = typer.Option(
|
|
None, "--save-config",
|
|
help="Save current options to a JSON config.",
|
|
),
|
|
):
|
|
"""Standardize formats across a CSV / TSV. Auto-streams for large inputs."""
|
|
from src.core.format_standardize import (
|
|
FieldType,
|
|
StandardizeOptions,
|
|
standardize_dataframe,
|
|
standardize_file,
|
|
)
|
|
from src.core.io import read_file, detect_encoding, detect_delimiter
|
|
import pandas as pd
|
|
|
|
inp = Path(input_file)
|
|
if not inp.exists():
|
|
typer.echo(f"Error: File not found: {inp}", err=True)
|
|
raise typer.Exit(1)
|
|
|
|
log_path = _setup_logging(Path("logs"))
|
|
|
|
# Build options
|
|
if config:
|
|
cp = Path(config)
|
|
if not cp.exists():
|
|
typer.echo(f"Error: Config file not found: {cp}", err=True)
|
|
raise typer.Exit(1)
|
|
options = StandardizeOptions.from_file(cp)
|
|
elif preset:
|
|
try:
|
|
options = StandardizeOptions.from_preset(preset)
|
|
except ValueError as e:
|
|
typer.echo(f"Error: {e}", err=True)
|
|
raise typer.Exit(1)
|
|
else:
|
|
options = StandardizeOptions()
|
|
|
|
parsed_types = _parse_types(types)
|
|
if parsed_types:
|
|
try:
|
|
options.column_types = {
|
|
col: FieldType(t) for col, t in parsed_types.items()
|
|
}
|
|
except ValueError as e:
|
|
typer.echo(
|
|
f"Error: {e}. Valid types: "
|
|
+ ", ".join(sorted(t.value for t in FieldType)),
|
|
err=True,
|
|
)
|
|
raise typer.Exit(1)
|
|
|
|
if not options.column_types:
|
|
typer.echo(
|
|
"Error: no column types declared. Pass --types 'col:type,...' "
|
|
"or --preset / --config with a column_types map.",
|
|
err=True,
|
|
)
|
|
raise typer.Exit(1)
|
|
|
|
if phone_country:
|
|
options.phone_country_column = phone_country
|
|
if address_country:
|
|
options.address_country_column = address_country
|
|
options.phone_region = phone_region
|
|
options.phone_format = phone_format # type: ignore[assignment]
|
|
options.currency_preserve_code = preserve_code
|
|
options.currency_decimals = decimals
|
|
options.audit_max_rows = (
|
|
None if audit_max < 0 else audit_max
|
|
)
|
|
options.cache_size = cache_size
|
|
|
|
if save_config:
|
|
saved = options.to_file(save_config)
|
|
typer.echo(f"Config saved to {saved}")
|
|
|
|
# Decide streaming mode
|
|
file_size = inp.stat().st_size
|
|
use_stream = stream if stream is not None else file_size > _AUTO_STREAM_THRESHOLD
|
|
|
|
enc = encoding_override or detect_encoding(inp)
|
|
delim = delimiter or detect_delimiter(inp, enc)
|
|
|
|
out_path = Path(output) if output else inp.parent / f"{inp.stem}_standardized.csv"
|
|
|
|
typer.echo(
|
|
f"Reading {inp.name} ({file_size/1024/1024:.1f} MB; "
|
|
f"{'streaming' if use_stream else 'in-memory'} mode)..."
|
|
)
|
|
|
|
if use_stream:
|
|
if not apply:
|
|
typer.echo(
|
|
"\nStreaming mode does not produce a preview. "
|
|
"Re-run with --apply to write output, or remove --stream to preview a sample."
|
|
)
|
|
raise typer.Exit(0)
|
|
|
|
last_log = [0.0]
|
|
import time as _time
|
|
|
|
def _progress(rows, chunks):
|
|
now = _time.perf_counter()
|
|
if now - last_log[0] < 1.0:
|
|
return
|
|
last_log[0] = now
|
|
typer.echo(f" ... {rows:,} rows ({chunks} chunks)")
|
|
|
|
t0 = _time.perf_counter()
|
|
res = standardize_file(
|
|
inp, out_path, options,
|
|
chunk_size=chunk_size,
|
|
progress_callback=_progress,
|
|
encoding=enc,
|
|
delimiter=delim,
|
|
)
|
|
elapsed = _time.perf_counter() - t0
|
|
typer.echo(f"\n{'─'*60}")
|
|
typer.echo(f" File: {inp.name}")
|
|
typer.echo(f" Rows: {res.rows_processed:,}")
|
|
typer.echo(f" Chunks: {res.chunks_processed}")
|
|
typer.echo(f" Cells changed: {res.cells_changed:,}")
|
|
typer.echo(
|
|
f" Cells unparseable: {res.cells_unparseable:,} / {res.cells_total:,}"
|
|
)
|
|
typer.echo(
|
|
f" Throughput: {res.rows_processed / max(elapsed, 1e-9):,.0f} rows/sec"
|
|
)
|
|
typer.echo(f" Elapsed: {elapsed:.2f}s")
|
|
typer.echo(f"{'─'*60}")
|
|
typer.echo(f"\nStandardized: {res.output_path}")
|
|
if res.audit_path:
|
|
typer.echo(f"Changes audit: {res.audit_path}")
|
|
typer.echo(f"Log: {log_path}")
|
|
return
|
|
|
|
# In-memory path
|
|
try:
|
|
df = read_file(
|
|
inp, encoding=enc, delimiter=delim, repair=False,
|
|
)
|
|
if not isinstance(df, pd.DataFrame):
|
|
df = pd.concat(list(df), ignore_index=True)
|
|
except Exception as e:
|
|
typer.echo(f"Error reading file: {e}", err=True)
|
|
raise typer.Exit(1)
|
|
|
|
typer.echo(f" {len(df):,} rows, {len(df.columns)} columns")
|
|
|
|
typer.echo("Standardizing...")
|
|
try:
|
|
result = standardize_dataframe(df, options)
|
|
except (ValueError, OSError) as e:
|
|
typer.echo(f"Error: {e}", err=True)
|
|
raise typer.Exit(1)
|
|
|
|
pct = (result.cells_changed / result.cells_total * 100) if result.cells_total else 0
|
|
typer.echo(f"\n{'─'*60}")
|
|
typer.echo(f" File: {inp.name}")
|
|
typer.echo(f" Columns processed: {len(result.columns_processed)}")
|
|
typer.echo(f" Cells scanned: {result.cells_total:,}")
|
|
typer.echo(f" Cells changed: {result.cells_changed:,} ({pct:.1f}%)")
|
|
typer.echo(f" Cells unparseable: {result.cells_unparseable:,}")
|
|
typer.echo(f"{'─'*60}")
|
|
if result.cells_changed and not result.changes.empty:
|
|
typer.echo("\nFirst examples:")
|
|
for _, row in result.changes.head(5).iterrows():
|
|
old = repr(row["old"])[:40]
|
|
new = repr(row["new"])[:40]
|
|
typer.echo(
|
|
f" Row {row['row'] + 1}, {row['column']} "
|
|
f"({row['field_type']}): {old} → {new}"
|
|
)
|
|
|
|
if apply:
|
|
from src.core.io import write_file
|
|
write_file(result.standardized_df, out_path)
|
|
typer.echo(f"\nStandardized: {out_path}")
|
|
if not result.changes.empty:
|
|
audit_path = inp.parent / f"{inp.stem}_changes.csv"
|
|
write_file(result.changes, audit_path)
|
|
typer.echo(f"Changes audit: {audit_path}")
|
|
else:
|
|
typer.echo("\nThis was a preview. Add --apply to write the output.")
|
|
|
|
typer.echo(f"Log: {log_path}")
|
|
|
|
|
|
def main():
|
|
from src.cli_license_guard import guard
|
|
from src.license import FeatureFlag
|
|
guard(feature=FeatureFlag.FORMAT_STANDARDIZER.value)
|
|
app()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|