From 5c62fb611704ba24700572a8dfaa48c1de37d438 Mon Sep 17 00:00:00 2001
From: Michael
Date: Wed, 29 Apr 2026 15:53:11 +0000
Subject: [PATCH] =?UTF-8?q?feat(cli):=20src.cli=5Fanalyze=20=E2=80=94=20Ty?=
 =?UTF-8?q?per=20CLI=20for=20the=20analyzer?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

    python -m src.cli_analyze input.csv             # rich table per tool
    python -m src.cli_analyze input.csv --json      # array of finding dicts
    python -m src.cli_analyze input.csv --strict    # exit 1 on warn/error
    python -m src.cli_analyze input.csv -n 50000    # cap rows scanned

Findings are grouped by destination tool so the user can see at a glance
which tool to open next. Read-only; the exit code is 0 unless --strict is
set (exit 1 on any warn/error finding) or the input file is missing
(exit 2).

The CLI keeps its own tool-id -> display-name map so it doesn't depend on
the GUI module.

7 tests cover: clean-file passthrough, dirty-file table, --json
round-trip, missing-file (exit 2), --strict exit code on dirty and on
clean input, --sample-rows cap.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 src/cli_analyze.py        | 188 ++++++++++++++++++++++++++++++++++++++
 tests/test_cli_analyze.py |  97 ++++++++++++++++++++
 2 files changed, 285 insertions(+)
 create mode 100644 src/cli_analyze.py
 create mode 100644 tests/test_cli_analyze.py

diff --git a/src/cli_analyze.py b/src/cli_analyze.py
new file mode 100644
index 0000000..f21d421
--- /dev/null
+++ b/src/cli_analyze.py
@@ -0,0 +1,188 @@
+"""CLI for the DataTools upload-time analyzer.
+
+Usage:
+    python -m src.cli_analyze input.csv                  # human-readable report
+    python -m src.cli_analyze input.csv --json           # JSON to stdout
+    python -m src.cli_analyze input.csv --sample-rows 5000
+
+The analyzer is purely advisory: a successful scan exits 0 even when findings
+are present. With --strict, any 'warn' or 'error' finding exits 1 instead. A
+missing input file exits 2.
+"""
+
+from __future__ import annotations
+
+import json
+import sys
+from collections import Counter
+from pathlib import Path
+from typing import Optional
+
+import typer
+from rich.console import Console
+from rich.markup import escape
+from rich.table import Table
+
+from src.core.analyze import analyze, findings_by_tool, to_dict
+
+app = typer.Typer(
+    name="analyze",
+    help=(
+        "Scan a CSV or Excel file and report data quality issues with the "
+        "tools that can fix each one. Read-only and advisory.\n\n"
+        "Examples:\n\n"
+        " # Default scan (first 1000 rows, human-readable)\n"
+        " python -m src.cli_analyze customers.csv\n\n"
+        " # Machine-readable output for piping\n"
+        " python -m src.cli_analyze customers.csv --json\n\n"
+        " # Scan more rows on a large file\n"
+        " python -m src.cli_analyze big.csv --sample-rows 50000\n\n"
+        " # Exit non-zero when warnings exist (CI gate)\n"
+        " python -m src.cli_analyze customers.csv --strict\n"
+    ),
+    add_completion=False,
+    no_args_is_help=True,
+)
+
+
+# Tool id -> friendly display name. Kept in the CLI module since the GUI has
+# its own version; both stay in lockstep with the actual script lineup.
+_TOOL_DISPLAY = {
+    "01_deduplicator": "Deduplicator",
+    "02_text_cleaner": "Text Cleaner",
+    "03_format_standardizer": "Format Standardizer",
+    "04_missing_handler": "Missing Value Handler",
+    "05_column_mapper": "Column Mapper",
+    "06_outlier_detector": "Outlier Detector",
+    "07_multi_file_merger": "Multi-File Merger",
+    "08_validator_reporter": "Validator & Reporter",
+    "09_pipeline_runner": "Pipeline Runner",
+}
+
+
+def _tool_label(tool_id: str) -> str:
+    """Return the display name for *tool_id*: the raw id if unmapped, "—" if empty."""
+    return _TOOL_DISPLAY.get(tool_id, tool_id) if tool_id else "—"
+
+
+_SEVERITY_STYLE = {
+    "info": "cyan",
+    "warn": "yellow",
+    "error": "red",
+}
+
+
+def _styled(severity: str, text: str) -> str:
+    """Return *text* escaped and wrapped in the Rich style for *severity*.
+
+    Unknown severities are rendered unstyled instead of raising KeyError.
+    """
+    style = _SEVERITY_STYLE.get(severity)
+    safe = escape(text)
+    return f"[{style}]{safe}[/{style}]" if style else safe
+
+
+@app.command()
+def scan(
+    input_file: str = typer.Argument(
+        ..., help="Path to the CSV or Excel file to scan.",
+    ),
+    sample_rows: int = typer.Option(
+        1000, "--sample-rows", "-n",
+        help="Cap on rows scanned. Default 1000.",
+    ),
+    json_out: bool = typer.Option(
+        False, "--json",
+        help="Print findings as a JSON array on stdout.",
+    ),
+    strict: bool = typer.Option(
+        False, "--strict",
+        help="Exit non-zero when any 'warn' or 'error' finding is reported.",
+    ),
+) -> None:
+    """Scan INPUT_FILE and report data quality findings per fixing tool."""
+    # Typer shows the docstring above in --help, so it stays short and user-facing.
+    # Exit codes: 0 on a completed scan, 1 under --strict when any warn/error
+    # finding exists, 2 when the input file is missing.
+    path = Path(input_file)
+    if not path.exists():
+        typer.echo(f"File not found: {path}", err=True)
+        raise typer.Exit(code=2)
+
+    findings = analyze(path, sample_rows=sample_rows)
+
+    if json_out:
+        # Plain typer.echo keeps stdout machine-readable (no Rich styling).
+        typer.echo(json.dumps([to_dict(f) for f in findings], indent=2))
+        _maybe_strict_exit(findings, strict)
+        return
+
+    # File names and finding text can contain square brackets; escape them so
+    # Rich does not interpret a stray "[...]" sequence as a markup tag.
+    file_name = escape(path.name)
+    console = Console()
+    if not findings:
+        console.print(f"[green]✓[/green] No issues detected in {file_name}.")
+        return
+
+    grouped = findings_by_tool(findings)
+    untargeted = [f for f in findings if not f.tool]
+
+    # Top-line summary: counts per severity, most severe first.
+    by_sev = Counter(f.severity for f in findings)
+    summary_parts = [
+        _styled(s, f"{by_sev[s]} {s}")
+        for s in ("error", "warn", "info") if by_sev[s]
+    ]
+    console.print(
+        f"[bold]Scanned[/bold] {file_name}: "
+        f"{len(findings)} finding(s) ({', '.join(summary_parts)})."
+    )
+    console.print()
+
+    # Per-tool tables — surface what each downstream tool would need to do.
+    for tool_id in sorted(grouped):
+        _render_tool_table(console, tool_id, grouped[tool_id])
+
+    if untargeted:
+        _render_tool_table(console, "", untargeted, header="Informational / file-level")
+
+    _maybe_strict_exit(findings, strict)
+
+
+def _render_tool_table(console: Console, tool_id: str, items, header: str | None = None) -> None:
+    """Print one Rich table of findings under a tool (or custom) heading.
+
+    *items* are finding objects exposing severity, id, count and description;
+    *header*, when given, replaces the default "→ <tool name>" title.
+    """
+    label = header or f"→ {escape(_tool_label(tool_id))}"
+    table = Table(title=label, title_style="bold", show_lines=False, expand=True)
+    table.add_column("Severity", width=8)
+    table.add_column("Finding", width=32)
+    table.add_column("Count", justify="right", width=7)
+    table.add_column("Description")
+    for f in items:
+        table.add_row(
+            _styled(f.severity, f.severity),
+            escape(str(f.id)),
+            str(f.count),
+            escape(f.description or ""),
+        )
+    console.print(table)
+    console.print()
+
+
+def _maybe_strict_exit(findings, strict: bool) -> None:
+    """Raise typer.Exit(1) when *strict* is set and any finding is warn/error."""
+    if not strict:
+        return
+    if any(f.severity in ("warn", "error") for f in findings):
+        raise typer.Exit(code=1)
+
+
+# Entrypoint when run via `python -m src.cli_analyze`. Typer's no_args_is_help
+# kicks in when the user invokes without args; we expose the single command at
+# the top level for convenience: ``python -m src.cli_analyze input.csv``.
+if __name__ == "__main__":
+    app()
diff --git a/tests/test_cli_analyze.py b/tests/test_cli_analyze.py
new file mode 100644
index 0000000..6ca24b3
--- /dev/null
+++ b/tests/test_cli_analyze.py
@@ -0,0 +1,97 @@
+"""Tests for src.cli_analyze — Typer CLI."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+import pytest
+from typer.testing import CliRunner
+
+from src.cli_analyze import app
+
+
+runner = CliRunner()
+
+
+def _make_dirty(tmp_path: Path) -> Path:
+    """Write a small CSV with a mix of detectable issues."""
+    f = tmp_path / "dirty.csv"
+    f.write_bytes(
+        b"\xef\xbb\xbf"  # BOM
+        b" id ,Name,Email\n"  # padded header
+        b"1,Alice,Alice@Example.COM\n"
+        b"2, Bob ,bob@example.com\n"
+        b"3,N/A,carol@example.com\n"
+    )
+    return f
+
+
+class TestAnalyzeCli:
+    def test_clean_file_says_so(self, tmp_path):
+        f = tmp_path / "clean.csv"
+        f.write_text("id,name\n1,Alice\n2,Bob\n")
+        result = runner.invoke(app, [str(f)])
+        assert result.exit_code == 0
+        assert "No issues detected" in result.stdout
+
+    def test_dirty_file_lists_findings(self, tmp_path):
+        f = _make_dirty(tmp_path)
+        result = runner.invoke(app, [str(f)])
+        assert result.exit_code == 0
+        # The Rich table breaks lines; assert on stable substrings instead of
+        # full finding ids.
+        assert "Text Cleaner" in result.stdout
+        assert "Missing Value" in result.stdout
+        # Severity column is rendered.
+        assert "warn" in result.stdout
+
+    def test_json_output_round_trips(self, tmp_path):
+        f = _make_dirty(tmp_path)
+        result = runner.invoke(app, [str(f), "--json"])
+        assert result.exit_code == 0
+        data = json.loads(result.stdout)
+        assert isinstance(data, list)
+        assert len(data) > 0
+        ids = {item["id"] for item in data}
+        assert "dirty_column_headers" in ids or "whitespace_padding" in ids
+        # Each finding has the documented shape.
+        for finding in data:
+            assert {"id", "severity", "tool", "count", "description", "samples"} <= set(finding)
+
+    def test_missing_file_exits_2(self, tmp_path):
+        result = runner.invoke(app, [str(tmp_path / "nope.csv")])
+        assert result.exit_code == 2
+        assert "not found" in result.stdout.lower() or "not found" in (result.stderr or "")
+
+    def test_strict_exits_1_on_warnings(self, tmp_path):
+        f = _make_dirty(tmp_path)
+        result = runner.invoke(app, [str(f), "--strict", "--json"])
+        # JSON output is still printed, but exit code is 1 because warns exist.
+        assert result.exit_code == 1
+        data = json.loads(result.stdout)
+        assert any(item["severity"] in ("warn", "error") for item in data)
+
+    def test_strict_exits_0_on_clean(self, tmp_path):
+        f = tmp_path / "clean.csv"
+        f.write_text("id,name\n1,Alice\n2,Bob\n")
+        result = runner.invoke(app, [str(f), "--strict"])
+        assert result.exit_code == 0
+
+    def test_sample_rows_caps_scan(self, tmp_path):
+        # Build a file where ONLY rows past 100 have NBSP padding; with
+        # --sample-rows 50 we should miss it.
+        rows = ["id,name"]
+        for i in range(1, 101):
+            rows.append(f"{i},Alice")
+        for i in range(101, 200):
+            rows.append(f"{i},Alice\u00a0")  # NBSP padding, escaped so it stays visible
+        f = tmp_path / "big.csv"
+        f.write_text("\n".join(rows) + "\n", encoding="utf-8")
+
+        capped = runner.invoke(app, [str(f), "--sample-rows", "50", "--json"])
+        full = runner.invoke(app, [str(f), "--sample-rows", "200", "--json"])
+        capped_ids = {x["id"] for x in json.loads(capped.stdout)}
+        full_ids = {x["id"] for x in json.loads(full.stdout)}
+        assert "nbsp_or_unicode_whitespace" not in capped_ids
+        assert "nbsp_or_unicode_whitespace" in full_ids