570 lines
20 KiB
Python
570 lines
20 KiB
Python
"""Processes: reusable workflows/checklists with runs and step tracking."""
|
|
|
|
from fastapi import APIRouter, Request, Form, Depends
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import RedirectResponse
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import text
|
|
from typing import Optional
|
|
from datetime import datetime, timezone
|
|
|
|
from core.database import get_db
|
|
from core.base_repository import BaseRepository
|
|
from core.sidebar import get_sidebar_data
|
|
|
|
router = APIRouter(prefix="/processes", tags=["processes"])
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
|
|
# ── Process Template CRUD ─────────────────────────────────────
|
|
|
|
@router.get("/")
|
|
async def list_processes(
|
|
request: Request,
|
|
status: Optional[str] = None,
|
|
process_type: Optional[str] = None,
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
sidebar = await get_sidebar_data(db)
|
|
filters = {}
|
|
if status:
|
|
filters["status"] = status
|
|
if process_type:
|
|
filters["process_type"] = process_type
|
|
|
|
repo = BaseRepository("processes", db)
|
|
items = await repo.list(filters=filters, sort="sort_order")
|
|
|
|
# Get step counts per process
|
|
result = await db.execute(text("""
|
|
SELECT process_id, count(*) as step_count
|
|
FROM process_steps WHERE is_deleted = false
|
|
GROUP BY process_id
|
|
"""))
|
|
step_counts = {str(r.process_id): r.step_count for r in result}
|
|
|
|
for item in items:
|
|
item["step_count"] = step_counts.get(str(item["id"]), 0)
|
|
|
|
return templates.TemplateResponse("processes.html", {
|
|
"request": request, "sidebar": sidebar, "items": items,
|
|
"current_status": status or "",
|
|
"current_type": process_type or "",
|
|
"page_title": "Processes", "active_nav": "processes",
|
|
})
|
|
|
|
|
|
@router.get("/create")
|
|
async def create_form(request: Request, db: AsyncSession = Depends(get_db)):
|
|
sidebar = await get_sidebar_data(db)
|
|
return templates.TemplateResponse("processes_form.html", {
|
|
"request": request, "sidebar": sidebar, "item": None,
|
|
"page_title": "New Process", "active_nav": "processes",
|
|
})
|
|
|
|
|
|
@router.post("/create")
|
|
async def create_process(
|
|
request: Request,
|
|
name: str = Form(...),
|
|
description: Optional[str] = Form(None),
|
|
process_type: str = Form("checklist"),
|
|
status: str = Form("draft"),
|
|
category: Optional[str] = Form(None),
|
|
tags: Optional[str] = Form(None),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
repo = BaseRepository("processes", db)
|
|
data = {
|
|
"name": name,
|
|
"description": description,
|
|
"process_type": process_type,
|
|
"status": status,
|
|
}
|
|
if category and category.strip():
|
|
data["category"] = category
|
|
if tags and tags.strip():
|
|
data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
|
|
|
|
item = await repo.create(data)
|
|
return RedirectResponse(url=f"/processes/{item['id']}", status_code=303)
|
|
|
|
|
|
@router.get("/runs")
|
|
async def list_all_runs(
|
|
request: Request,
|
|
status: Optional[str] = None,
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""List all runs across all processes."""
|
|
sidebar = await get_sidebar_data(db)
|
|
|
|
where = "pr.is_deleted = false"
|
|
params = {}
|
|
if status:
|
|
where += " AND pr.status = :status"
|
|
params["status"] = status
|
|
|
|
result = await db.execute(text(f"""
|
|
SELECT pr.*, p.name as process_name,
|
|
proj.name as project_name,
|
|
(SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false) as total_steps,
|
|
(SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false AND status = 'completed') as completed_steps
|
|
FROM process_runs pr
|
|
JOIN processes p ON pr.process_id = p.id
|
|
LEFT JOIN projects proj ON pr.project_id = proj.id
|
|
WHERE {where}
|
|
ORDER BY pr.created_at DESC
|
|
"""), params)
|
|
items = [dict(r._mapping) for r in result]
|
|
|
|
return templates.TemplateResponse("process_runs.html", {
|
|
"request": request, "sidebar": sidebar, "items": items,
|
|
"current_status": status or "",
|
|
"page_title": "All Process Runs", "active_nav": "processes",
|
|
})
|
|
|
|
|
|
@router.get("/runs/{run_id}")
|
|
async def run_detail(run_id: str, request: Request, db: AsyncSession = Depends(get_db)):
|
|
"""View a specific process run with step checklist."""
|
|
sidebar = await get_sidebar_data(db)
|
|
|
|
# Get the run with process info
|
|
result = await db.execute(text("""
|
|
SELECT pr.*, p.name as process_name, p.id as process_id_ref,
|
|
proj.name as project_name,
|
|
c.first_name as contact_first, c.last_name as contact_last
|
|
FROM process_runs pr
|
|
JOIN processes p ON pr.process_id = p.id
|
|
LEFT JOIN projects proj ON pr.project_id = proj.id
|
|
LEFT JOIN contacts c ON pr.contact_id = c.id
|
|
WHERE pr.id = :id
|
|
"""), {"id": run_id})
|
|
run = result.first()
|
|
if not run:
|
|
return RedirectResponse(url="/processes/runs", status_code=303)
|
|
run = dict(run._mapping)
|
|
|
|
# Get run steps
|
|
result = await db.execute(text("""
|
|
SELECT * FROM process_run_steps
|
|
WHERE run_id = :run_id AND is_deleted = false
|
|
ORDER BY sort_order, created_at
|
|
"""), {"run_id": run_id})
|
|
steps = [dict(r._mapping) for r in result]
|
|
|
|
total = len(steps)
|
|
completed = sum(1 for s in steps if s["status"] == "completed")
|
|
|
|
# Get linked tasks via junction table
|
|
result = await db.execute(text("""
|
|
SELECT t.id, t.title, t.status, t.priority,
|
|
prt.run_step_id,
|
|
p.name as project_name
|
|
FROM process_run_tasks prt
|
|
JOIN tasks t ON prt.task_id = t.id
|
|
LEFT JOIN projects p ON t.project_id = p.id
|
|
WHERE prt.run_step_id IN (
|
|
SELECT id FROM process_run_steps WHERE run_id = :run_id
|
|
)
|
|
ORDER BY t.created_at
|
|
"""), {"run_id": run_id})
|
|
tasks = [dict(r._mapping) for r in result]
|
|
|
|
# Map tasks to their steps
|
|
step_tasks = {}
|
|
for task in tasks:
|
|
sid = str(task["run_step_id"])
|
|
step_tasks.setdefault(sid, []).append(task)
|
|
|
|
return templates.TemplateResponse("process_run_detail.html", {
|
|
"request": request, "sidebar": sidebar,
|
|
"run": run, "steps": steps, "tasks": tasks,
|
|
"step_tasks": step_tasks,
|
|
"total_steps": total, "completed_steps": completed,
|
|
"page_title": run["title"], "active_nav": "processes",
|
|
})
|
|
|
|
|
|
@router.get("/{process_id}")
|
|
async def process_detail(process_id: str, request: Request, db: AsyncSession = Depends(get_db)):
|
|
repo = BaseRepository("processes", db)
|
|
sidebar = await get_sidebar_data(db)
|
|
item = await repo.get(process_id)
|
|
if not item:
|
|
return RedirectResponse(url="/processes", status_code=303)
|
|
|
|
# Get steps
|
|
result = await db.execute(text("""
|
|
SELECT * FROM process_steps
|
|
WHERE process_id = :pid AND is_deleted = false
|
|
ORDER BY sort_order, created_at
|
|
"""), {"pid": process_id})
|
|
steps = [dict(r._mapping) for r in result]
|
|
|
|
# Get runs
|
|
result = await db.execute(text("""
|
|
SELECT pr.*,
|
|
proj.name as project_name,
|
|
(SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false) as total_steps,
|
|
(SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false AND status = 'completed') as completed_steps
|
|
FROM process_runs pr
|
|
LEFT JOIN projects proj ON pr.project_id = proj.id
|
|
WHERE pr.process_id = :pid AND pr.is_deleted = false
|
|
ORDER BY pr.created_at DESC
|
|
"""), {"pid": process_id})
|
|
runs = [dict(r._mapping) for r in result]
|
|
|
|
# Load projects and contacts for "Start Run" form
|
|
projects_repo = BaseRepository("projects", db)
|
|
projects = await projects_repo.list()
|
|
contacts_repo = BaseRepository("contacts", db)
|
|
contacts = await contacts_repo.list()
|
|
|
|
return templates.TemplateResponse("processes_detail.html", {
|
|
"request": request, "sidebar": sidebar,
|
|
"item": item, "steps": steps, "runs": runs,
|
|
"projects": projects, "contacts": contacts,
|
|
"page_title": item["name"], "active_nav": "processes",
|
|
})
|
|
|
|
|
|
@router.get("/{process_id}/edit")
|
|
async def edit_form(process_id: str, request: Request, db: AsyncSession = Depends(get_db)):
|
|
repo = BaseRepository("processes", db)
|
|
sidebar = await get_sidebar_data(db)
|
|
item = await repo.get(process_id)
|
|
if not item:
|
|
return RedirectResponse(url="/processes", status_code=303)
|
|
|
|
return templates.TemplateResponse("processes_form.html", {
|
|
"request": request, "sidebar": sidebar, "item": item,
|
|
"page_title": "Edit Process", "active_nav": "processes",
|
|
})
|
|
|
|
|
|
@router.post("/{process_id}/edit")
|
|
async def update_process(
|
|
process_id: str,
|
|
name: str = Form(...),
|
|
description: Optional[str] = Form(None),
|
|
process_type: str = Form("checklist"),
|
|
status: str = Form("draft"),
|
|
category: Optional[str] = Form(None),
|
|
tags: Optional[str] = Form(None),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
repo = BaseRepository("processes", db)
|
|
data = {
|
|
"name": name,
|
|
"description": description,
|
|
"process_type": process_type,
|
|
"status": status,
|
|
"category": category if category and category.strip() else None,
|
|
}
|
|
if tags and tags.strip():
|
|
data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
|
|
else:
|
|
data["tags"] = None
|
|
|
|
await repo.update(process_id, data)
|
|
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
|
|
|
|
|
@router.post("/{process_id}/delete")
|
|
async def delete_process(process_id: str, request: Request, db: AsyncSession = Depends(get_db)):
|
|
repo = BaseRepository("processes", db)
|
|
await repo.soft_delete(process_id)
|
|
return RedirectResponse(url="/processes", status_code=303)
|
|
|
|
|
|
# ── Process Steps ─────────────────────────────────────────────
|
|
|
|
@router.post("/{process_id}/steps/add")
|
|
async def add_step(
|
|
process_id: str,
|
|
title: str = Form(...),
|
|
instructions: Optional[str] = Form(None),
|
|
expected_output: Optional[str] = Form(None),
|
|
estimated_days: Optional[str] = Form(None),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
# Get current max sort_order
|
|
result = await db.execute(text("""
|
|
SELECT coalesce(max(sort_order), -1) + 1 as next_order
|
|
FROM process_steps WHERE process_id = :pid AND is_deleted = false
|
|
"""), {"pid": process_id})
|
|
next_order = result.scalar()
|
|
|
|
repo = BaseRepository("process_steps", db)
|
|
data = {
|
|
"process_id": process_id,
|
|
"title": title,
|
|
"sort_order": next_order,
|
|
}
|
|
if instructions and instructions.strip():
|
|
data["instructions"] = instructions
|
|
if expected_output and expected_output.strip():
|
|
data["expected_output"] = expected_output
|
|
if estimated_days and estimated_days.strip():
|
|
data["estimated_days"] = int(estimated_days)
|
|
|
|
await repo.create(data)
|
|
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
|
|
|
|
|
@router.post("/{process_id}/steps/{step_id}/edit")
|
|
async def edit_step(
|
|
process_id: str,
|
|
step_id: str,
|
|
title: str = Form(...),
|
|
instructions: Optional[str] = Form(None),
|
|
expected_output: Optional[str] = Form(None),
|
|
estimated_days: Optional[str] = Form(None),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
repo = BaseRepository("process_steps", db)
|
|
data = {
|
|
"title": title,
|
|
"instructions": instructions if instructions and instructions.strip() else None,
|
|
"expected_output": expected_output if expected_output and expected_output.strip() else None,
|
|
"estimated_days": int(estimated_days) if estimated_days and estimated_days.strip() else None,
|
|
}
|
|
await repo.update(step_id, data)
|
|
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
|
|
|
|
|
@router.post("/{process_id}/steps/{step_id}/delete")
|
|
async def delete_step(process_id: str, step_id: str, db: AsyncSession = Depends(get_db)):
|
|
repo = BaseRepository("process_steps", db)
|
|
await repo.soft_delete(step_id)
|
|
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
|
|
|
|
|
@router.post("/{process_id}/steps/reorder")
|
|
async def reorder_steps(
|
|
process_id: str,
|
|
request: Request,
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
form = await request.form()
|
|
ids = form.getlist("step_ids")
|
|
if ids:
|
|
repo = BaseRepository("process_steps", db)
|
|
await repo.reorder(ids)
|
|
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
|
|
|
|
|
# ── Process Runs ──────────────────────────────────────────────
|
|
|
|
@router.post("/{process_id}/runs/start")
|
|
async def start_run(
|
|
process_id: str,
|
|
title: str = Form(...),
|
|
task_generation: str = Form("all_at_once"),
|
|
project_id: Optional[str] = Form(None),
|
|
contact_id: Optional[str] = Form(None),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Start a new process run: snapshot steps, optionally generate tasks."""
|
|
# Get process
|
|
proc_repo = BaseRepository("processes", db)
|
|
process = await proc_repo.get(process_id)
|
|
if not process:
|
|
return RedirectResponse(url="/processes", status_code=303)
|
|
|
|
# Create the run
|
|
run_repo = BaseRepository("process_runs", db)
|
|
run_data = {
|
|
"process_id": process_id,
|
|
"title": title,
|
|
"status": "in_progress",
|
|
"process_type": process["process_type"],
|
|
"task_generation": task_generation,
|
|
"started_at": datetime.now(timezone.utc),
|
|
}
|
|
if project_id and project_id.strip():
|
|
run_data["project_id"] = project_id
|
|
if contact_id and contact_id.strip():
|
|
run_data["contact_id"] = contact_id
|
|
|
|
run = await run_repo.create(run_data)
|
|
|
|
# Snapshot steps from the process template
|
|
result = await db.execute(text("""
|
|
SELECT * FROM process_steps
|
|
WHERE process_id = :pid AND is_deleted = false
|
|
ORDER BY sort_order, created_at
|
|
"""), {"pid": process_id})
|
|
template_steps = [dict(r._mapping) for r in result]
|
|
|
|
step_repo = BaseRepository("process_run_steps", db)
|
|
run_steps = []
|
|
for step in template_steps:
|
|
rs = await step_repo.create({
|
|
"run_id": str(run["id"]),
|
|
"title": step["title"],
|
|
"instructions": step.get("instructions"),
|
|
"status": "pending",
|
|
"sort_order": step["sort_order"],
|
|
})
|
|
run_steps.append(rs)
|
|
|
|
# Task generation
|
|
if run_steps:
|
|
await _generate_tasks(db, run, run_steps, task_generation)
|
|
|
|
return RedirectResponse(url=f"/processes/runs/{run['id']}", status_code=303)
|
|
|
|
|
|
async def _generate_tasks(db, run, run_steps, mode):
|
|
"""Generate tasks for run steps based on mode."""
|
|
task_repo = BaseRepository("tasks", db)
|
|
|
|
# Get a default domain for tasks
|
|
result = await db.execute(text(
|
|
"SELECT id FROM domains WHERE is_deleted = false ORDER BY sort_order LIMIT 1"
|
|
))
|
|
row = result.first()
|
|
default_domain_id = str(row[0]) if row else None
|
|
|
|
if not default_domain_id:
|
|
return
|
|
|
|
if mode == "all_at_once":
|
|
steps_to_generate = run_steps
|
|
else: # step_by_step
|
|
steps_to_generate = [run_steps[0]]
|
|
|
|
for step in steps_to_generate:
|
|
task_data = {
|
|
"title": step["title"],
|
|
"description": step.get("instructions") or "",
|
|
"status": "open",
|
|
"priority": 3,
|
|
"domain_id": default_domain_id,
|
|
}
|
|
if run.get("project_id"):
|
|
task_data["project_id"] = str(run["project_id"])
|
|
|
|
task = await task_repo.create(task_data)
|
|
|
|
# Link via junction table
|
|
await db.execute(text("""
|
|
INSERT INTO process_run_tasks (run_step_id, task_id)
|
|
VALUES (:rsid, :tid)
|
|
ON CONFLICT DO NOTHING
|
|
"""), {"rsid": str(step["id"]), "tid": str(task["id"])})
|
|
|
|
|
|
@router.post("/runs/{run_id}/steps/{step_id}/complete")
|
|
async def complete_step(
|
|
run_id: str,
|
|
step_id: str,
|
|
notes: Optional[str] = Form(None),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Mark a run step as completed."""
|
|
now = datetime.now(timezone.utc)
|
|
step_repo = BaseRepository("process_run_steps", db)
|
|
await step_repo.update(step_id, {
|
|
"status": "completed",
|
|
"completed_at": now,
|
|
"notes": notes if notes and notes.strip() else None,
|
|
})
|
|
|
|
# If step_by_step mode, generate task for next pending step
|
|
result = await db.execute(text("""
|
|
SELECT pr.task_generation FROM process_runs pr WHERE pr.id = :rid
|
|
"""), {"rid": run_id})
|
|
run_row = result.first()
|
|
|
|
if run_row and run_row.task_generation == "step_by_step":
|
|
# Find next pending step
|
|
result = await db.execute(text("""
|
|
SELECT * FROM process_run_steps
|
|
WHERE run_id = :rid AND is_deleted = false AND status = 'pending'
|
|
ORDER BY sort_order LIMIT 1
|
|
"""), {"rid": run_id})
|
|
next_step = result.first()
|
|
|
|
if next_step:
|
|
next_step = dict(next_step._mapping)
|
|
# Get the full run for project_id
|
|
run_repo = BaseRepository("process_runs", db)
|
|
run = await run_repo.get(run_id)
|
|
await _generate_tasks(db, run, [next_step], "all_at_once")
|
|
|
|
# Auto-complete run if all steps done
|
|
result = await db.execute(text("""
|
|
SELECT count(*) FILTER (WHERE status != 'completed') as pending
|
|
FROM process_run_steps
|
|
WHERE run_id = :rid AND is_deleted = false
|
|
"""), {"rid": run_id})
|
|
pending = result.scalar()
|
|
if pending == 0:
|
|
run_repo = BaseRepository("process_runs", db)
|
|
await run_repo.update(run_id, {
|
|
"status": "completed",
|
|
"completed_at": now,
|
|
})
|
|
|
|
return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)
|
|
|
|
|
|
@router.post("/runs/{run_id}/steps/{step_id}/uncomplete")
|
|
async def uncomplete_step(
|
|
run_id: str,
|
|
step_id: str,
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Undo step completion."""
|
|
step_repo = BaseRepository("process_run_steps", db)
|
|
await step_repo.update(step_id, {
|
|
"status": "pending",
|
|
"completed_at": None,
|
|
})
|
|
|
|
# If run was completed, reopen it
|
|
run_repo = BaseRepository("process_runs", db)
|
|
run = await run_repo.get(run_id)
|
|
if run and run["status"] == "completed":
|
|
await run_repo.update(run_id, {
|
|
"status": "in_progress",
|
|
"completed_at": None,
|
|
})
|
|
|
|
return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)
|
|
|
|
|
|
@router.post("/runs/{run_id}/complete")
|
|
async def complete_run(run_id: str, db: AsyncSession = Depends(get_db)):
|
|
"""Mark entire run as complete."""
|
|
now = datetime.now(timezone.utc)
|
|
run_repo = BaseRepository("process_runs", db)
|
|
await run_repo.update(run_id, {
|
|
"status": "completed",
|
|
"completed_at": now,
|
|
})
|
|
|
|
# Mark all pending steps as completed too
|
|
await db.execute(text("""
|
|
UPDATE process_run_steps
|
|
SET status = 'completed', completed_at = :now, updated_at = :now
|
|
WHERE run_id = :rid AND status != 'completed' AND is_deleted = false
|
|
"""), {"rid": run_id, "now": now})
|
|
|
|
return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)
|
|
|
|
|
|
@router.post("/runs/{run_id}/delete")
|
|
async def delete_run(run_id: str, db: AsyncSession = Depends(get_db)):
|
|
# Get process_id before deleting for redirect
|
|
run_repo = BaseRepository("process_runs", db)
|
|
run = await run_repo.get(run_id)
|
|
await run_repo.soft_delete(run_id)
|
|
if run:
|
|
return RedirectResponse(url=f"/processes/{run['process_id']}", status_code=303)
|
|
return RedirectResponse(url="/processes/runs", status_code=303)
|