feat(cli): src.cli_analyze — Typer CLI for the analyzer
    python -m src.cli_analyze input.csv            # rich table per tool
    python -m src.cli_analyze input.csv --json     # array of finding dicts
    python -m src.cli_analyze input.csv --strict   # exit 1 on warn/error
    python -m src.cli_analyze input.csv -n 50000   # cap rows scanned

Findings are grouped by destination tool so the user can see at a glance which tool to open next. Read-only; a successful scan exits with code 0 unless --strict is set.

The CLI keeps its own tool-id -> display-name map so it doesn't depend on the GUI module.

7 tests cover: clean-file passthrough, dirty-file table, --json round-trip, missing-file (exit 2), --strict exit code (dirty and clean input), --sample-rows cap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
158
src/cli_analyze.py
Normal file
158
src/cli_analyze.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""CLI for the DataTools upload-time analyzer.
|
||||
|
||||
Usage:
|
||||
python -m src.cli_analyze input.csv # human-readable report
|
||||
python -m src.cli_analyze input.csv --json # JSON to stdout
|
||||
python -m src.cli_analyze input.csv --sample-rows 5000
|
||||
|
||||
The analyzer is purely advisory; exit code is always 0 on a successful scan
|
||||
even when findings are present. Use --strict to exit non-zero on warnings.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from src.core.analyze import analyze, findings_by_tool, to_dict
|
||||
|
||||
# Typer application object for the analyzer CLI. Completion installation
# options are disabled, and a bare invocation is asked to print help.
# NOTE(review): when an app registers exactly one command, Typer appears to
# build the CLI from that command alone, so this app-level ``help`` text and
# ``no_args_is_help`` may not be what ``--help`` shows — confirm against the
# Typer version in use.
app = typer.Typer(
    name="analyze",
    help=(
        "Scan a CSV or Excel file and report data quality issues with the "
        "tools that can fix each one. Read-only and advisory.\n\n"
        "Examples:\n\n"
        " # Default scan (first 1000 rows, human-readable)\n"
        " python -m src.cli_analyze customers.csv\n\n"
        " # Machine-readable output for piping\n"
        " python -m src.cli_analyze customers.csv --json\n\n"
        " # Scan more rows on a large file\n"
        " python -m src.cli_analyze big.csv --sample-rows 50000\n\n"
        " # Exit non-zero when warnings exist (CI gate)\n"
        " python -m src.cli_analyze customers.csv --strict\n"
    ),
    add_completion=False,
    no_args_is_help=True,
)
|
||||
|
||||
|
||||
# Tool id -> friendly display name. Kept in the CLI module since the GUI has
# its own version; both stay in lockstep with the actual script lineup.
_TOOL_DISPLAY = {
    "01_deduplicator": "Deduplicator",
    "02_text_cleaner": "Text Cleaner",
    "03_format_standardizer": "Format Standardizer",
    "04_missing_handler": "Missing Value Handler",
    "05_column_mapper": "Column Mapper",
    "06_outlier_detector": "Outlier Detector",
    "07_multi_file_merger": "Multi-File Merger",
    "08_validator_reporter": "Validator & Reporter",
    "09_pipeline_runner": "Pipeline Runner",
}


def _tool_label(tool_id: str) -> str:
    """Return the human-friendly name for *tool_id*.

    Args:
        tool_id: Tool identifier such as ``"02_text_cleaner"``; may be empty.

    Returns:
        The mapped display name, the id itself when it is not in
        ``_TOOL_DISPLAY``, or an em dash when the id is empty.
    """
    if not tool_id:
        return "—"
    return _TOOL_DISPLAY.get(tool_id, tool_id)
|
||||
|
||||
|
||||
# Finding severity -> Rich style name used when colouring severity labels.
_SEVERITY_STYLE = {
    "info": "cyan",
    "warn": "yellow",
    "error": "red",
}
|
||||
|
||||
|
||||
# no_args_is_help is repeated on the command because this is the app's only
# command; the app-level setting is declared on the Typer object as well.
@app.command(no_args_is_help=True)
def scan(
    input_file: str = typer.Argument(
        ..., help="Path to the CSV or Excel file to scan.",
    ),
    sample_rows: int = typer.Option(
        1000, "--sample-rows", "-n",
        help="Cap on rows scanned. Default 1000.",
    ),
    json_out: bool = typer.Option(
        False, "--json",
        help="Print findings as a JSON array on stdout.",
    ),
    strict: bool = typer.Option(
        False, "--strict",
        help="Exit non-zero when any 'warn' or 'error' finding is reported.",
    ),
) -> None:
    """Scan a CSV or Excel file and report data quality issues.

    Findings are grouped by the tool that can fix them. The scan is read-only
    and advisory: the exit code is 0 unless --strict is given (1 when any
    'warn' or 'error' finding exists) or the input path is unusable (2).
    """
    # NOTE: Typer uses a command's docstring as its --help text, so the
    # docstring above is written for end users.
    # Function-scope import keeps this block self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    path = Path(input_file)
    if not path.exists():
        typer.echo(f"File not found: {path}", err=True)
        raise typer.Exit(code=2)
    if path.is_dir():
        # A directory passes exists() but cannot be scanned; fail cleanly
        # instead of letting the analyzer raise.
        typer.echo(f"Not a file: {path}", err=True)
        raise typer.Exit(code=2)

    findings = analyze(path, sample_rows=sample_rows)

    if json_out:
        # Machine-readable mode: JSON only on stdout, then the strict gate.
        typer.echo(json.dumps([to_dict(f) for f in findings], indent=2))
        _maybe_strict_exit(findings, strict)
        return

    console = Console()
    # File names may contain square brackets, which Rich would otherwise
    # interpret as markup tags.
    safe_name = escape(path.name)
    if not findings:
        console.print(f"[green]✓[/green] No issues detected in {safe_name}.")
        return

    # presumably findings_by_tool only returns findings that name a tool;
    # tool-less findings are collected separately below — verify against
    # src.core.analyze.
    grouped = findings_by_tool(findings)
    untargeted = [f for f in findings if not f.tool]

    # Top-line summary
    by_sev: dict[str, int] = {}
    for f in findings:
        by_sev[f.severity] = by_sev.get(f.severity, 0) + 1
    summary_parts = [
        f"[{_SEVERITY_STYLE[s]}]{by_sev[s]} {s}[/{_SEVERITY_STYLE[s]}]"
        for s in ("error", "warn", "info") if by_sev.get(s)
    ]
    console.print(
        f"[bold]Scanned[/bold] {safe_name}: "
        f"{len(findings)} finding(s) ({', '.join(summary_parts)})."
    )
    console.print()

    # Per-tool tables — surface what each downstream tool would need to do.
    for tool_id in sorted(grouped):
        _render_tool_table(console, tool_id, grouped[tool_id])

    if untargeted:
        _render_tool_table(console, "", untargeted, header="Informational / file-level")

    _maybe_strict_exit(findings, strict)
|
||||
|
||||
|
||||
def _render_tool_table(console: Console, tool_id: str, items, header: str | None = None) -> None:
    """Print one Rich table of findings for a single destination tool.

    Args:
        console: Rich console the table (and a trailing blank line) is
            printed to.
        tool_id: Tool identifier used to derive the title when *header* is
            not given.
        items: Iterable of finding objects exposing ``severity``, ``id``,
            ``count`` and ``description`` attributes.
        header: Optional title that replaces the tool-derived one.
    """
    # Function-scope import keeps this block self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    def _plain(value):
        # Finding text can contain square brackets; escape strings so Rich
        # shows them literally instead of parsing them as markup. Non-string
        # values are passed through untouched.
        return escape(value) if isinstance(value, str) else value

    label = header or f"→ {_tool_label(tool_id)}"
    table = Table(title=label, title_style="bold", show_lines=False, expand=True)
    table.add_column("Severity", width=8)
    table.add_column("Finding", width=32)
    table.add_column("Count", justify="right", width=7)
    table.add_column("Description")
    for f in items:
        style = _SEVERITY_STYLE.get(f.severity)
        if style:
            sev = f"[{style}]{f.severity}[/{style}]"
        else:
            # Unknown severity: show it unstyled rather than raising KeyError.
            sev = escape(str(f.severity))
        table.add_row(sev, _plain(f.id), str(f.count), _plain(f.description))
    console.print(table)
    console.print()
|
||||
|
||||
|
||||
def _maybe_strict_exit(findings, strict: bool) -> None:
    """Exit with code 1 in strict mode when a blocking finding exists.

    Args:
        findings: Iterable of finding objects exposing a ``severity``
            attribute.
        strict: Whether ``--strict`` was requested; when false this is a
            no-op.

    Raises:
        typer.Exit: With ``code=1`` when *strict* is set and any finding has
            severity ``"warn"`` or ``"error"``.
    """
    if not strict:
        return
    if any(f.severity in ("warn", "error") for f in findings):
        raise typer.Exit(code=1)
|
||||
|
||||
|
||||
# Entrypoint when run via `python -m src.cli_analyze`. The app requests
# no_args_is_help so a bare invocation is meant to print help
# (NOTE(review): confirm this applies to a single-command Typer app). The
# single command is exposed at the top level for convenience:
# ``python -m src.cli_analyze input.csv``.
if __name__ == "__main__":
    app()
|
||||
97
tests/test_cli_analyze.py
Normal file
97
tests/test_cli_analyze.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""Tests for src.cli_analyze — Typer CLI."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from typer.testing import CliRunner
|
||||
|
||||
from src.cli_analyze import app
|
||||
|
||||
|
||||
# Shared test runner; every test below invokes the CLI app through it.
runner = CliRunner()
|
||||
|
||||
|
||||
def _make_dirty(tmp_path: Path) -> Path:
    """Write a small CSV with a mix of detectable issues.

    Args:
        tmp_path: Directory the fixture file ``dirty.csv`` is created in.

    Returns:
        Path of the written file.
    """
    f = tmp_path / "dirty.csv"
    # Written as raw bytes so the BOM and the exact padding are preserved.
    f.write_bytes(
        b"\xef\xbb\xbf"  # BOM
        b" id ,Name,Email\n"  # padded header
        b"1,Alice,Alice@Example.COM\n"
        b"2, Bob ,bob@example.com\n"
        b"3,N/A,carol@example.com\n"
    )
    return f
|
||||
|
||||
|
||||
class TestAnalyzeCli:
    """End-to-end tests that drive the Typer app through ``CliRunner``."""

    def test_clean_file_says_so(self, tmp_path):
        """A file without issues exits 0 and prints the all-clear line."""
        f = tmp_path / "clean.csv"
        f.write_text("id,name\n1,Alice\n2,Bob\n", encoding="utf-8")
        result = runner.invoke(app, [str(f)])
        assert result.exit_code == 0
        assert "No issues detected" in result.stdout

    def test_dirty_file_lists_findings(self, tmp_path):
        """The human-readable report names the tools and shows severities."""
        f = _make_dirty(tmp_path)
        result = runner.invoke(app, [str(f)])
        assert result.exit_code == 0
        # The Rich table breaks lines; assert on stable substrings instead of
        # full finding ids.
        assert "Text Cleaner" in result.stdout
        assert "Missing Value" in result.stdout
        # Severity column is rendered.
        assert "warn" in result.stdout

    def test_json_output_round_trips(self, tmp_path):
        """``--json`` prints a parseable array of finding dicts."""
        f = _make_dirty(tmp_path)
        result = runner.invoke(app, [str(f), "--json"])
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert isinstance(data, list)
        assert len(data) > 0
        ids = {item["id"] for item in data}
        assert "dirty_column_headers" in ids or "whitespace_padding" in ids
        # Each finding has the documented shape.
        for finding in data:
            assert {"id", "severity", "tool", "count", "description", "samples"} <= set(finding)

    def test_missing_file_exits_2(self, tmp_path):
        """A nonexistent path exits 2 with a 'not found' message."""
        result = runner.invoke(app, [str(tmp_path / "nope.csv")])
        assert result.exit_code == 2
        # The message goes to stderr; ``output`` includes it whether or not
        # the runner captures stderr separately.
        assert "not found" in result.output.lower()

    def test_strict_exits_1_on_warnings(self, tmp_path):
        """``--strict`` turns warn/error findings into exit code 1."""
        f = _make_dirty(tmp_path)
        result = runner.invoke(app, [str(f), "--strict", "--json"])
        # JSON output is still printed, but exit code is 1 because warns exist.
        assert result.exit_code == 1
        data = json.loads(result.stdout)
        assert any(item["severity"] in ("warn", "error") for item in data)

    def test_strict_exits_0_on_clean(self, tmp_path):
        """``--strict`` leaves the exit code at 0 when nothing is found."""
        f = tmp_path / "clean.csv"
        f.write_text("id,name\n1,Alice\n2,Bob\n", encoding="utf-8")
        result = runner.invoke(app, [str(f), "--strict"])
        assert result.exit_code == 0

    def test_sample_rows_caps_scan(self, tmp_path):
        """``--sample-rows`` limits how far into the file the scan looks."""
        # Build a file where ONLY rows past 100 have NBSP padding; with
        # --sample-rows 50 we should miss it.
        rows = ["id,name"]
        rows.extend(f"{i},Alice" for i in range(1, 101))
        # The NBSP is spelled as an escape so editors and copy/paste cannot
        # silently turn it into an ordinary space.
        rows.extend(f"{i},Alice\u00a0" for i in range(101, 200))
        f = tmp_path / "big.csv"
        f.write_text("\n".join(rows) + "\n", encoding="utf-8")

        capped = runner.invoke(app, [str(f), "--sample-rows", "50", "--json"])
        full = runner.invoke(app, [str(f), "--sample-rows", "200", "--json"])
        # Fail with a clear exit-code mismatch rather than a JSON parse error
        # if either run crashed.
        assert capped.exit_code == 0
        assert full.exit_code == 0
        capped_ids = {x["id"] for x in json.loads(capped.stdout)}
        full_ids = {x["id"] for x in json.loads(full.stdout)}
        assert "nbsp_or_unicode_whitespace" not in capped_ids
        assert "nbsp_or_unicode_whitespace" in full_ids
|
||||
Reference in New Issue
Block a user