"""CLI for the DataTools Automated Workflows tool (script 09). Usage: # Run the recommended default pipeline (text → format → missing → dedup): python -m src.cli_pipeline input.csv --apply # Quick custom order via --steps: python -m src.cli_pipeline input.csv \\ --steps text_clean,format_standardize,missing --apply # Save the recommended pipeline to a JSON for editing: python -m src.cli_pipeline --recommend --output pipeline.json # Run a saved pipeline: python -m src.cli_pipeline weekly_export.csv --pipeline pipeline.json --apply # Strict mode: fail if the pipeline contains soft-dependency violations python -m src.cli_pipeline data.csv --steps dedup,text_clean \\ --strict --apply """ from __future__ import annotations import json import sys from datetime import datetime from pathlib import Path from typing import Optional import typer from loguru import logger app = typer.Typer( name="pipeline", help=( "Chain DataTools cleaning steps into one orchestrated workflow.\n\n" "Default behaviour: preview the plan + run the pipeline (no file " "written). Add --apply to write the cleaned output and audit log.\n\n" "The pipeline RECOMMENDS an order based on tool dependencies " "(text-clean before format-standardize, format before dedup, etc.) " "and WARNS on out-of-order configs but does not block them. Use " "--strict to escalate warnings to errors.\n\n" "Tools available: text_clean, format_standardize, missing, " "column_map, dedup." ), add_completion=False, no_args_is_help=False, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _setup_logging(log_dir: Path) -> Path: log_dir.mkdir(parents=True, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") log_path = log_dir / f"pipeline_{ts}.log" logger.remove() logger.add(sys.stderr, level="WARNING", format="{message}") logger.add( str(log_path), level="DEBUG", format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}", ) return log_path def _split_csv_arg(raw: Optional[str]) -> Optional[list[str]]: if raw is None: return None return [c.strip() for c in raw.split(",") if c.strip()] # --------------------------------------------------------------------------- # Main command # --------------------------------------------------------------------------- @app.command() def run( input_file: Optional[str] = typer.Argument( None, help="CSV / TSV / Excel file. Optional with --recommend.", ), pipeline_path: Optional[str] = typer.Option( None, "--pipeline", "-p", help="Path to a pipeline JSON file (Pipeline.from_file format).", ), steps: Optional[str] = typer.Option( None, "--steps", help=( "Quick pipeline: comma-separated tool names in execution order. " "Each step uses defaults. Example: 'text_clean,format_standardize,dedup'." ), ), recommend: bool = typer.Option( False, "--recommend", help="Print (or save) the recommended default pipeline and exit.", ), output: Optional[str] = typer.Option( None, "--output", "-o", help=( "When --recommend is set, save the pipeline JSON here. " "Otherwise, write the pipeline output to this CSV path " "(default: {input}_pipeline.csv)." ), ), apply: bool = typer.Option( False, "--apply", help="Write the output. Without this flag, only the plan is shown.", ), strict: bool = typer.Option( False, "--strict", help="Treat soft-dependency warnings as errors (refuse to run).", ), continue_on_error: bool = typer.Option( False, "--continue-on-error", help="Don't abort if a step fails; carry the previous step's df forward.", ), encoding_override: Optional[str] = typer.Option( None, "--encoding", help="Override auto-detected file encoding.", ), delimiter: Optional[str] = typer.Option( None, "--delimiter", help="Override auto-detected delimiter.", ), ): """Run a DataTools cleaning pipeline.""" from src.core.pipeline import ( Pipeline, recommended_pipeline, run_pipeline, validate_pipeline, ) # ------------------------------------------------------------------ # --recommend: print or save the default pipeline and exit # ------------------------------------------------------------------ if recommend: pipe = recommended_pipeline() body = json.dumps(pipe.to_dict(), indent=2) if output: Path(output).write_text(body) typer.echo(f"Recommended pipeline saved to {output}") else: typer.echo(body) return if not input_file: typer.echo( "Error: input file is required (or use --recommend to " "emit the default pipeline).", err=True, ) raise typer.Exit(2) inp = Path(input_file) if not inp.exists(): typer.echo(f"Error: File not found: {inp}", err=True) raise typer.Exit(1) log_path = _setup_logging(Path("logs")) # ------------------------------------------------------------------ # Resolve pipeline source: --pipeline file, --steps list, or default # ------------------------------------------------------------------ if pipeline_path and steps: typer.echo( "Error: pass either --pipeline or --steps, not both.", err=True, ) raise typer.Exit(1) if pipeline_path: pp = Path(pipeline_path) if not pp.exists(): typer.echo(f"Error: pipeline file not found: {pp}", err=True) raise typer.Exit(1) try: pipe = Pipeline.from_file(pp) except Exception as e: from src.core.errors import format_for_user typer.echo(f"Error reading pipeline: {format_for_user(e)}", err=True) raise typer.Exit(1) elif steps: names = _split_csv_arg(steps) or [] try: pipe = recommended_pipeline(include=names) except Exception as e: from src.core.errors import format_for_user typer.echo(f"Error: {format_for_user(e)}", err=True) raise typer.Exit(1) else: pipe = recommended_pipeline() # ------------------------------------------------------------------ # Plan + warnings # ------------------------------------------------------------------ warnings = validate_pipeline(pipe) typer.echo(f"\n{'─'*60}") typer.echo(" Pipeline plan:") for i, step in enumerate(pipe.steps, 1): flag = " " if step.enabled else "✗ " typer.echo(f" {i}. {flag}{step.display_name():<22} options={step.options or {}}") typer.echo(f"{'─'*60}") if warnings: typer.echo("\nSoft-dependency warnings (recommended order violated):") for w in warnings: typer.echo(f" ! {w}") if strict: typer.echo( "\nAborting: --strict was set. Reorder the steps or drop --strict.", err=True, ) raise typer.Exit(2) if not apply: typer.echo( "\nThis was a plan-only run. Add --apply to execute the pipeline." ) typer.echo(f"Log: {log_path}") return # ------------------------------------------------------------------ # Read input + execute # ------------------------------------------------------------------ from src.core.io import read_file, write_file import pandas as pd typer.echo(f"\nReading {inp.name}...") try: df = read_file( inp, encoding=encoding_override, delimiter=delimiter, repair=False, ) if not isinstance(df, pd.DataFrame): df = pd.concat(list(df), ignore_index=True) except Exception as e: typer.echo(f"Error reading file: {e}", err=True) raise typer.Exit(1) typer.echo(f" {len(df):,} rows, {len(df.columns)} columns") typer.echo("\nExecuting pipeline:") def _on_step(sr) -> None: if sr.skipped: typer.echo(f" - {sr.step.display_name()} (skipped)") elif sr.error: typer.echo(f" ✗ {sr.step.display_name()} ({sr.elapsed_seconds*1000:.0f} ms) — ERROR: {sr.error.splitlines()[0]}") else: typer.echo(f" ✓ {sr.step.display_name()} ({sr.elapsed_seconds*1000:.0f} ms) {sr.summary}") try: result = run_pipeline( df, pipe, on_step_complete=_on_step, stop_on_error=not continue_on_error, ) except Exception as e: from src.core.errors import format_for_user typer.echo(f"\nPipeline halted: {format_for_user(e)}", err=True) raise typer.Exit(1) typer.echo(f"\n{'─'*60}") typer.echo(f" Initial rows: {result.initial_rows:,}") typer.echo(f" Final rows: {result.final_rows:,}") typer.echo(f" Steps run: {sum(1 for s in result.step_results if not s.skipped)}") typer.echo(f" Total elapsed: {result.total_elapsed:.2f} s") typer.echo(f"{'─'*60}") # ------------------------------------------------------------------ # Write output + audit # ------------------------------------------------------------------ out_path = Path(output) if output else inp.parent / f"{inp.stem}_pipeline.csv" write_file(result.final_df, out_path) typer.echo(f"\nPipeline output: {out_path}") audit_path = inp.parent / f"{inp.stem}_pipeline.json" audit_path.write_text(json.dumps({ "pipeline": pipe.to_dict(), "warnings": result.warnings, "initial_rows": result.initial_rows, "final_rows": result.final_rows, "total_elapsed_seconds": result.total_elapsed, "steps": [ { "tool": sr.step.tool, "name": sr.step.display_name(), "enabled": sr.step.enabled, "skipped": sr.skipped, "elapsed_seconds": sr.elapsed_seconds, "summary": sr.summary, "error": sr.error, } for sr in result.step_results ], }, indent=2, default=str)) typer.echo(f"Pipeline audit: {audit_path}") typer.echo(f"Log: {log_path}") def main() -> None: from src.cli_license_guard import guard from src.license import FeatureFlag guard(feature=FeatureFlag.PIPELINE_RUNNER.value) app() if __name__ == "__main__": main()