"""Processes: reusable workflows/checklists with runs and step tracking.""" from fastapi import APIRouter, Request, Form, Depends from fastapi.templating import Jinja2Templates from fastapi.responses import RedirectResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import text from typing import Optional from datetime import datetime, timezone from core.database import get_db from core.base_repository import BaseRepository from core.sidebar import get_sidebar_data router = APIRouter(prefix="/processes", tags=["processes"]) templates = Jinja2Templates(directory="templates") # ── Process Template CRUD ───────────────────────────────────── @router.get("/") async def list_processes( request: Request, status: Optional[str] = None, process_type: Optional[str] = None, db: AsyncSession = Depends(get_db), ): sidebar = await get_sidebar_data(db) filters = {} if status: filters["status"] = status if process_type: filters["process_type"] = process_type repo = BaseRepository("processes", db) items = await repo.list(filters=filters, sort="sort_order") # Get step counts per process result = await db.execute(text(""" SELECT process_id, count(*) as step_count FROM process_steps WHERE is_deleted = false GROUP BY process_id """)) step_counts = {str(r.process_id): r.step_count for r in result} for item in items: item["step_count"] = step_counts.get(str(item["id"]), 0) return templates.TemplateResponse("processes.html", { "request": request, "sidebar": sidebar, "items": items, "current_status": status or "", "current_type": process_type or "", "page_title": "Processes", "active_nav": "processes", }) @router.get("/create") async def create_form(request: Request, db: AsyncSession = Depends(get_db)): sidebar = await get_sidebar_data(db) return templates.TemplateResponse("processes_form.html", { "request": request, "sidebar": sidebar, "item": None, "page_title": "New Process", "active_nav": "processes", }) @router.post("/create") async def create_process( request: 
Request, name: str = Form(...), description: Optional[str] = Form(None), process_type: str = Form("checklist"), status: str = Form("draft"), category: Optional[str] = Form(None), tags: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): repo = BaseRepository("processes", db) data = { "name": name, "description": description, "process_type": process_type, "status": status, } if category and category.strip(): data["category"] = category if tags and tags.strip(): data["tags"] = [t.strip() for t in tags.split(",") if t.strip()] item = await repo.create(data) return RedirectResponse(url=f"/processes/{item['id']}", status_code=303) @router.get("/runs") async def list_all_runs( request: Request, status: Optional[str] = None, db: AsyncSession = Depends(get_db), ): """List all runs across all processes.""" sidebar = await get_sidebar_data(db) where = "pr.is_deleted = false" params = {} if status: where += " AND pr.status = :status" params["status"] = status result = await db.execute(text(f""" SELECT pr.*, p.name as process_name, proj.name as project_name, (SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false) as total_steps, (SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false AND status = 'completed') as completed_steps FROM process_runs pr JOIN processes p ON pr.process_id = p.id LEFT JOIN projects proj ON pr.project_id = proj.id WHERE {where} ORDER BY pr.created_at DESC """), params) items = [dict(r._mapping) for r in result] return templates.TemplateResponse("process_runs.html", { "request": request, "sidebar": sidebar, "items": items, "current_status": status or "", "page_title": "All Process Runs", "active_nav": "processes", }) @router.get("/runs/{run_id}") async def run_detail(run_id: str, request: Request, db: AsyncSession = Depends(get_db)): """View a specific process run with step checklist.""" sidebar = await get_sidebar_data(db) # Get the run with process info result = await 
db.execute(text(""" SELECT pr.*, p.name as process_name, p.id as process_id_ref, proj.name as project_name, c.first_name as contact_first, c.last_name as contact_last FROM process_runs pr JOIN processes p ON pr.process_id = p.id LEFT JOIN projects proj ON pr.project_id = proj.id LEFT JOIN contacts c ON pr.contact_id = c.id WHERE pr.id = :id """), {"id": run_id}) run = result.first() if not run: return RedirectResponse(url="/processes/runs", status_code=303) run = dict(run._mapping) # Get run steps result = await db.execute(text(""" SELECT * FROM process_run_steps WHERE run_id = :run_id AND is_deleted = false ORDER BY sort_order, created_at """), {"run_id": run_id}) steps = [dict(r._mapping) for r in result] total = len(steps) completed = sum(1 for s in steps if s["status"] == "completed") # Get linked tasks via junction table result = await db.execute(text(""" SELECT t.id, t.title, t.status, t.priority, prt.run_step_id, p.name as project_name FROM process_run_tasks prt JOIN tasks t ON prt.task_id = t.id LEFT JOIN projects p ON t.project_id = p.id WHERE prt.run_step_id IN ( SELECT id FROM process_run_steps WHERE run_id = :run_id ) ORDER BY t.created_at """), {"run_id": run_id}) tasks = [dict(r._mapping) for r in result] # Map tasks to their steps step_tasks = {} for task in tasks: sid = str(task["run_step_id"]) step_tasks.setdefault(sid, []).append(task) return templates.TemplateResponse("process_run_detail.html", { "request": request, "sidebar": sidebar, "run": run, "steps": steps, "tasks": tasks, "step_tasks": step_tasks, "total_steps": total, "completed_steps": completed, "page_title": run["title"], "active_nav": "processes", }) @router.get("/{process_id}") async def process_detail(process_id: str, request: Request, db: AsyncSession = Depends(get_db)): repo = BaseRepository("processes", db) sidebar = await get_sidebar_data(db) item = await repo.get(process_id) if not item: return RedirectResponse(url="/processes", status_code=303) # Get steps result = await 
db.execute(text(""" SELECT * FROM process_steps WHERE process_id = :pid AND is_deleted = false ORDER BY sort_order, created_at """), {"pid": process_id}) steps = [dict(r._mapping) for r in result] # Get runs result = await db.execute(text(""" SELECT pr.*, proj.name as project_name, (SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false) as total_steps, (SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false AND status = 'completed') as completed_steps FROM process_runs pr LEFT JOIN projects proj ON pr.project_id = proj.id WHERE pr.process_id = :pid AND pr.is_deleted = false ORDER BY pr.created_at DESC """), {"pid": process_id}) runs = [dict(r._mapping) for r in result] # Load projects and contacts for "Start Run" form projects_repo = BaseRepository("projects", db) projects = await projects_repo.list() contacts_repo = BaseRepository("contacts", db) contacts = await contacts_repo.list() return templates.TemplateResponse("processes_detail.html", { "request": request, "sidebar": sidebar, "item": item, "steps": steps, "runs": runs, "projects": projects, "contacts": contacts, "page_title": item["name"], "active_nav": "processes", }) @router.get("/{process_id}/edit") async def edit_form(process_id: str, request: Request, db: AsyncSession = Depends(get_db)): repo = BaseRepository("processes", db) sidebar = await get_sidebar_data(db) item = await repo.get(process_id) if not item: return RedirectResponse(url="/processes", status_code=303) return templates.TemplateResponse("processes_form.html", { "request": request, "sidebar": sidebar, "item": item, "page_title": "Edit Process", "active_nav": "processes", }) @router.post("/{process_id}/edit") async def update_process( process_id: str, name: str = Form(...), description: Optional[str] = Form(None), process_type: str = Form("checklist"), status: str = Form("draft"), category: Optional[str] = Form(None), tags: Optional[str] = Form(None), db: AsyncSession = 
Depends(get_db), ): repo = BaseRepository("processes", db) data = { "name": name, "description": description, "process_type": process_type, "status": status, "category": category if category and category.strip() else None, } if tags and tags.strip(): data["tags"] = [t.strip() for t in tags.split(",") if t.strip()] else: data["tags"] = None await repo.update(process_id, data) return RedirectResponse(url=f"/processes/{process_id}", status_code=303) @router.post("/{process_id}/delete") async def delete_process(process_id: str, request: Request, db: AsyncSession = Depends(get_db)): repo = BaseRepository("processes", db) await repo.soft_delete(process_id) return RedirectResponse(url="/processes", status_code=303) # ── Process Steps ───────────────────────────────────────────── @router.post("/{process_id}/steps/add") async def add_step( process_id: str, title: str = Form(...), instructions: Optional[str] = Form(None), expected_output: Optional[str] = Form(None), estimated_days: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): # Get current max sort_order result = await db.execute(text(""" SELECT coalesce(max(sort_order), -1) + 1 as next_order FROM process_steps WHERE process_id = :pid AND is_deleted = false """), {"pid": process_id}) next_order = result.scalar() repo = BaseRepository("process_steps", db) data = { "process_id": process_id, "title": title, "sort_order": next_order, } if instructions and instructions.strip(): data["instructions"] = instructions if expected_output and expected_output.strip(): data["expected_output"] = expected_output if estimated_days and estimated_days.strip(): data["estimated_days"] = int(estimated_days) await repo.create(data) return RedirectResponse(url=f"/processes/{process_id}", status_code=303) @router.post("/{process_id}/steps/{step_id}/edit") async def edit_step( process_id: str, step_id: str, title: str = Form(...), instructions: Optional[str] = Form(None), expected_output: Optional[str] = Form(None), 
estimated_days: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): repo = BaseRepository("process_steps", db) data = { "title": title, "instructions": instructions if instructions and instructions.strip() else None, "expected_output": expected_output if expected_output and expected_output.strip() else None, "estimated_days": int(estimated_days) if estimated_days and estimated_days.strip() else None, } await repo.update(step_id, data) return RedirectResponse(url=f"/processes/{process_id}", status_code=303) @router.post("/{process_id}/steps/{step_id}/delete") async def delete_step(process_id: str, step_id: str, db: AsyncSession = Depends(get_db)): repo = BaseRepository("process_steps", db) await repo.soft_delete(step_id) return RedirectResponse(url=f"/processes/{process_id}", status_code=303) @router.post("/{process_id}/steps/reorder") async def reorder_steps( process_id: str, request: Request, db: AsyncSession = Depends(get_db), ): form = await request.form() ids = form.getlist("step_ids") if ids: repo = BaseRepository("process_steps", db) await repo.reorder(ids) return RedirectResponse(url=f"/processes/{process_id}", status_code=303) # ── Process Runs ────────────────────────────────────────────── @router.post("/{process_id}/runs/start") async def start_run( process_id: str, title: str = Form(...), task_generation: str = Form("all_at_once"), project_id: Optional[str] = Form(None), contact_id: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): """Start a new process run: snapshot steps, optionally generate tasks.""" # Get process proc_repo = BaseRepository("processes", db) process = await proc_repo.get(process_id) if not process: return RedirectResponse(url="/processes", status_code=303) # Create the run run_repo = BaseRepository("process_runs", db) run_data = { "process_id": process_id, "title": title, "status": "in_progress", "process_type": process["process_type"], "task_generation": task_generation, "started_at": 
datetime.now(timezone.utc), } if project_id and project_id.strip(): run_data["project_id"] = project_id if contact_id and contact_id.strip(): run_data["contact_id"] = contact_id run = await run_repo.create(run_data) # Snapshot steps from the process template result = await db.execute(text(""" SELECT * FROM process_steps WHERE process_id = :pid AND is_deleted = false ORDER BY sort_order, created_at """), {"pid": process_id}) template_steps = [dict(r._mapping) for r in result] step_repo = BaseRepository("process_run_steps", db) run_steps = [] for step in template_steps: rs = await step_repo.create({ "run_id": str(run["id"]), "title": step["title"], "instructions": step.get("instructions"), "status": "pending", "sort_order": step["sort_order"], }) run_steps.append(rs) # Task generation if run_steps: await _generate_tasks(db, run, run_steps, task_generation) return RedirectResponse(url=f"/processes/runs/{run['id']}", status_code=303) async def _generate_tasks(db, run, run_steps, mode): """Generate tasks for run steps based on mode.""" task_repo = BaseRepository("tasks", db) # Get a default domain for tasks result = await db.execute(text( "SELECT id FROM domains WHERE is_deleted = false ORDER BY sort_order LIMIT 1" )) row = result.first() default_domain_id = str(row[0]) if row else None if not default_domain_id: return if mode == "all_at_once": steps_to_generate = run_steps else: # step_by_step steps_to_generate = [run_steps[0]] for step in steps_to_generate: task_data = { "title": step["title"], "description": step.get("instructions") or "", "status": "open", "priority": 3, "domain_id": default_domain_id, } if run.get("project_id"): task_data["project_id"] = str(run["project_id"]) task = await task_repo.create(task_data) # Link via junction table await db.execute(text(""" INSERT INTO process_run_tasks (run_step_id, task_id) VALUES (:rsid, :tid) ON CONFLICT DO NOTHING """), {"rsid": str(step["id"]), "tid": str(task["id"])}) 
@router.post("/runs/{run_id}/steps/{step_id}/complete")
async def complete_step(
    run_id: str,
    step_id: str,
    notes: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Mark a run step as completed.

    For runs in ``step_by_step`` mode a task is then generated for the next
    pending step, unless that step already has a linked task. When no
    non-completed steps remain, the run itself is marked completed.
    """
    now = datetime.now(timezone.utc)
    step_repo = BaseRepository("process_run_steps", db)
    await step_repo.update(step_id, {
        "status": "completed",
        "completed_at": now,
        "notes": notes if notes and notes.strip() else None,
    })

    # If step_by_step mode, generate task for next pending step
    result = await db.execute(text("""
        SELECT pr.task_generation FROM process_runs pr WHERE pr.id = :rid
    """), {"rid": run_id})
    run_row = result.first()

    if run_row and run_row.task_generation == "step_by_step":
        # Find next pending step
        result = await db.execute(text("""
            SELECT * FROM process_run_steps
            WHERE run_id = :rid AND is_deleted = false AND status = 'pending'
            ORDER BY sort_order
            LIMIT 1
        """), {"rid": run_id})
        next_step = result.first()

        if next_step:
            next_step = dict(next_step._mapping)

            # The next pending step may already have a task: e.g. a step was
            # completed, un-completed and completed again, or steps were
            # completed out of order. Skip generation then, otherwise every
            # repeat creates a duplicate task for the same step.
            result = await db.execute(text("""
                SELECT count(*) FROM process_run_tasks
                WHERE run_step_id = :sid
            """), {"sid": str(next_step["id"])})
            already_linked = result.scalar()

            if not already_linked:
                # Get the full run for project_id
                run_repo = BaseRepository("process_runs", db)
                run = await run_repo.get(run_id)
                # _generate_tasks calls run.get(...), so a missing run must
                # not be passed through.
                if run:
                    await _generate_tasks(db, run, [next_step], "all_at_once")

    # Auto-complete run if all steps done
    result = await db.execute(text("""
        SELECT count(*) FILTER (WHERE status != 'completed') as pending
        FROM process_run_steps
        WHERE run_id = :rid AND is_deleted = false
    """), {"rid": run_id})
    pending = result.scalar()
    if pending == 0:
        run_repo = BaseRepository("process_runs", db)
        await run_repo.update(run_id, {
            "status": "completed",
            "completed_at": now,
        })

    return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)


@router.post("/runs/{run_id}/steps/{step_id}/uncomplete")
async def uncomplete_step(
    run_id: str,
    step_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Undo step completion.

    The step goes back to ``pending``; a run that had been marked completed
    is reopened as ``in_progress``.
    """
    step_repo = BaseRepository("process_run_steps", db)
    await step_repo.update(step_id, {
        "status": "pending",
        "completed_at": None,
    })

    # If run was completed, reopen it
    run_repo = BaseRepository("process_runs", db)
    run = await run_repo.get(run_id)
    if run and run["status"] == "completed":
        await run_repo.update(run_id, {
            "status": "in_progress",
            "completed_at": None,
        })

    return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)


@router.post("/runs/{run_id}/complete")
async def complete_run(run_id: str, db: AsyncSession = Depends(get_db)):
    """Mark entire run as complete."""
    now = datetime.now(timezone.utc)
    run_repo = BaseRepository("process_runs", db)
    await run_repo.update(run_id, {
        "status": "completed",
        "completed_at": now,
    })

    # Mark all pending steps as completed too (same timestamp as the run)
    await db.execute(text("""
        UPDATE process_run_steps
        SET status = 'completed', completed_at = :now, updated_at = :now
        WHERE run_id = :rid AND status != 'completed' AND is_deleted = false
    """), {"rid": run_id, "now": now})

    return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)


@router.post("/runs/{run_id}/delete")
async def delete_run(run_id: str, db: AsyncSession = Depends(get_db)):
    """Soft-delete a run and redirect to its parent process.

    Falls back to the global run list when the run could not be loaded.
    """
    # Get process_id before deleting for redirect
    run_repo = BaseRepository("process_runs", db)
    run = await run_repo.get(run_id)
    await run_repo.soft_delete(run_id)

    if run:
        return RedirectResponse(url=f"/processes/{run['process_id']}", status_code=303)
    return RedirectResponse(url="/processes/runs", status_code=303)