Initial commit
This commit is contained in:
569
routers/processes.py
Normal file
569
routers/processes.py
Normal file
@@ -0,0 +1,569 @@
|
||||
"""Processes: reusable workflows/checklists with runs and step tracking."""
|
||||
|
||||
from fastapi import APIRouter, Request, Form, Depends
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from fastapi.responses import RedirectResponse
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import text
|
||||
from typing import Optional
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from core.database import get_db
|
||||
from core.base_repository import BaseRepository
|
||||
from core.sidebar import get_sidebar_data
|
||||
|
||||
router = APIRouter(prefix="/processes", tags=["processes"])
|
||||
templates = Jinja2Templates(directory="templates")
|
||||
|
||||
|
||||
# ── Process Template CRUD ─────────────────────────────────────
|
||||
|
||||
@router.get("/")
|
||||
async def list_processes(
|
||||
request: Request,
|
||||
status: Optional[str] = None,
|
||||
process_type: Optional[str] = None,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
sidebar = await get_sidebar_data(db)
|
||||
filters = {}
|
||||
if status:
|
||||
filters["status"] = status
|
||||
if process_type:
|
||||
filters["process_type"] = process_type
|
||||
|
||||
repo = BaseRepository("processes", db)
|
||||
items = await repo.list(filters=filters, sort="sort_order")
|
||||
|
||||
# Get step counts per process
|
||||
result = await db.execute(text("""
|
||||
SELECT process_id, count(*) as step_count
|
||||
FROM process_steps WHERE is_deleted = false
|
||||
GROUP BY process_id
|
||||
"""))
|
||||
step_counts = {str(r.process_id): r.step_count for r in result}
|
||||
|
||||
for item in items:
|
||||
item["step_count"] = step_counts.get(str(item["id"]), 0)
|
||||
|
||||
return templates.TemplateResponse("processes.html", {
|
||||
"request": request, "sidebar": sidebar, "items": items,
|
||||
"current_status": status or "",
|
||||
"current_type": process_type or "",
|
||||
"page_title": "Processes", "active_nav": "processes",
|
||||
})
|
||||
|
||||
|
||||
@router.get("/create")
|
||||
async def create_form(request: Request, db: AsyncSession = Depends(get_db)):
|
||||
sidebar = await get_sidebar_data(db)
|
||||
return templates.TemplateResponse("processes_form.html", {
|
||||
"request": request, "sidebar": sidebar, "item": None,
|
||||
"page_title": "New Process", "active_nav": "processes",
|
||||
})
|
||||
|
||||
|
||||
@router.post("/create")
|
||||
async def create_process(
|
||||
request: Request,
|
||||
name: str = Form(...),
|
||||
description: Optional[str] = Form(None),
|
||||
process_type: str = Form("checklist"),
|
||||
status: str = Form("draft"),
|
||||
category: Optional[str] = Form(None),
|
||||
tags: Optional[str] = Form(None),
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
repo = BaseRepository("processes", db)
|
||||
data = {
|
||||
"name": name,
|
||||
"description": description,
|
||||
"process_type": process_type,
|
||||
"status": status,
|
||||
}
|
||||
if category and category.strip():
|
||||
data["category"] = category
|
||||
if tags and tags.strip():
|
||||
data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
|
||||
|
||||
item = await repo.create(data)
|
||||
return RedirectResponse(url=f"/processes/{item['id']}", status_code=303)
|
||||
|
||||
|
||||
@router.get("/runs")
|
||||
async def list_all_runs(
|
||||
request: Request,
|
||||
status: Optional[str] = None,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
"""List all runs across all processes."""
|
||||
sidebar = await get_sidebar_data(db)
|
||||
|
||||
where = "pr.is_deleted = false"
|
||||
params = {}
|
||||
if status:
|
||||
where += " AND pr.status = :status"
|
||||
params["status"] = status
|
||||
|
||||
result = await db.execute(text(f"""
|
||||
SELECT pr.*, p.name as process_name,
|
||||
proj.name as project_name,
|
||||
(SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false) as total_steps,
|
||||
(SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false AND status = 'completed') as completed_steps
|
||||
FROM process_runs pr
|
||||
JOIN processes p ON pr.process_id = p.id
|
||||
LEFT JOIN projects proj ON pr.project_id = proj.id
|
||||
WHERE {where}
|
||||
ORDER BY pr.created_at DESC
|
||||
"""), params)
|
||||
items = [dict(r._mapping) for r in result]
|
||||
|
||||
return templates.TemplateResponse("process_runs.html", {
|
||||
"request": request, "sidebar": sidebar, "items": items,
|
||||
"current_status": status or "",
|
||||
"page_title": "All Process Runs", "active_nav": "processes",
|
||||
})
|
||||
|
||||
|
||||
@router.get("/runs/{run_id}")
|
||||
async def run_detail(run_id: str, request: Request, db: AsyncSession = Depends(get_db)):
|
||||
"""View a specific process run with step checklist."""
|
||||
sidebar = await get_sidebar_data(db)
|
||||
|
||||
# Get the run with process info
|
||||
result = await db.execute(text("""
|
||||
SELECT pr.*, p.name as process_name, p.id as process_id_ref,
|
||||
proj.name as project_name,
|
||||
c.first_name as contact_first, c.last_name as contact_last
|
||||
FROM process_runs pr
|
||||
JOIN processes p ON pr.process_id = p.id
|
||||
LEFT JOIN projects proj ON pr.project_id = proj.id
|
||||
LEFT JOIN contacts c ON pr.contact_id = c.id
|
||||
WHERE pr.id = :id
|
||||
"""), {"id": run_id})
|
||||
run = result.first()
|
||||
if not run:
|
||||
return RedirectResponse(url="/processes/runs", status_code=303)
|
||||
run = dict(run._mapping)
|
||||
|
||||
# Get run steps
|
||||
result = await db.execute(text("""
|
||||
SELECT * FROM process_run_steps
|
||||
WHERE run_id = :run_id AND is_deleted = false
|
||||
ORDER BY sort_order, created_at
|
||||
"""), {"run_id": run_id})
|
||||
steps = [dict(r._mapping) for r in result]
|
||||
|
||||
total = len(steps)
|
||||
completed = sum(1 for s in steps if s["status"] == "completed")
|
||||
|
||||
# Get linked tasks via junction table
|
||||
result = await db.execute(text("""
|
||||
SELECT t.id, t.title, t.status, t.priority,
|
||||
prt.run_step_id,
|
||||
p.name as project_name
|
||||
FROM process_run_tasks prt
|
||||
JOIN tasks t ON prt.task_id = t.id
|
||||
LEFT JOIN projects p ON t.project_id = p.id
|
||||
WHERE prt.run_step_id IN (
|
||||
SELECT id FROM process_run_steps WHERE run_id = :run_id
|
||||
)
|
||||
ORDER BY t.created_at
|
||||
"""), {"run_id": run_id})
|
||||
tasks = [dict(r._mapping) for r in result]
|
||||
|
||||
# Map tasks to their steps
|
||||
step_tasks = {}
|
||||
for task in tasks:
|
||||
sid = str(task["run_step_id"])
|
||||
step_tasks.setdefault(sid, []).append(task)
|
||||
|
||||
return templates.TemplateResponse("process_run_detail.html", {
|
||||
"request": request, "sidebar": sidebar,
|
||||
"run": run, "steps": steps, "tasks": tasks,
|
||||
"step_tasks": step_tasks,
|
||||
"total_steps": total, "completed_steps": completed,
|
||||
"page_title": run["title"], "active_nav": "processes",
|
||||
})
|
||||
|
||||
|
||||
@router.get("/{process_id}")
|
||||
async def process_detail(process_id: str, request: Request, db: AsyncSession = Depends(get_db)):
|
||||
repo = BaseRepository("processes", db)
|
||||
sidebar = await get_sidebar_data(db)
|
||||
item = await repo.get(process_id)
|
||||
if not item:
|
||||
return RedirectResponse(url="/processes", status_code=303)
|
||||
|
||||
# Get steps
|
||||
result = await db.execute(text("""
|
||||
SELECT * FROM process_steps
|
||||
WHERE process_id = :pid AND is_deleted = false
|
||||
ORDER BY sort_order, created_at
|
||||
"""), {"pid": process_id})
|
||||
steps = [dict(r._mapping) for r in result]
|
||||
|
||||
# Get runs
|
||||
result = await db.execute(text("""
|
||||
SELECT pr.*,
|
||||
proj.name as project_name,
|
||||
(SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false) as total_steps,
|
||||
(SELECT count(*) FROM process_run_steps WHERE run_id = pr.id AND is_deleted = false AND status = 'completed') as completed_steps
|
||||
FROM process_runs pr
|
||||
LEFT JOIN projects proj ON pr.project_id = proj.id
|
||||
WHERE pr.process_id = :pid AND pr.is_deleted = false
|
||||
ORDER BY pr.created_at DESC
|
||||
"""), {"pid": process_id})
|
||||
runs = [dict(r._mapping) for r in result]
|
||||
|
||||
# Load projects and contacts for "Start Run" form
|
||||
projects_repo = BaseRepository("projects", db)
|
||||
projects = await projects_repo.list()
|
||||
contacts_repo = BaseRepository("contacts", db)
|
||||
contacts = await contacts_repo.list()
|
||||
|
||||
return templates.TemplateResponse("processes_detail.html", {
|
||||
"request": request, "sidebar": sidebar,
|
||||
"item": item, "steps": steps, "runs": runs,
|
||||
"projects": projects, "contacts": contacts,
|
||||
"page_title": item["name"], "active_nav": "processes",
|
||||
})
|
||||
|
||||
|
||||
@router.get("/{process_id}/edit")
|
||||
async def edit_form(process_id: str, request: Request, db: AsyncSession = Depends(get_db)):
|
||||
repo = BaseRepository("processes", db)
|
||||
sidebar = await get_sidebar_data(db)
|
||||
item = await repo.get(process_id)
|
||||
if not item:
|
||||
return RedirectResponse(url="/processes", status_code=303)
|
||||
|
||||
return templates.TemplateResponse("processes_form.html", {
|
||||
"request": request, "sidebar": sidebar, "item": item,
|
||||
"page_title": "Edit Process", "active_nav": "processes",
|
||||
})
|
||||
|
||||
|
||||
@router.post("/{process_id}/edit")
|
||||
async def update_process(
|
||||
process_id: str,
|
||||
name: str = Form(...),
|
||||
description: Optional[str] = Form(None),
|
||||
process_type: str = Form("checklist"),
|
||||
status: str = Form("draft"),
|
||||
category: Optional[str] = Form(None),
|
||||
tags: Optional[str] = Form(None),
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
repo = BaseRepository("processes", db)
|
||||
data = {
|
||||
"name": name,
|
||||
"description": description,
|
||||
"process_type": process_type,
|
||||
"status": status,
|
||||
"category": category if category and category.strip() else None,
|
||||
}
|
||||
if tags and tags.strip():
|
||||
data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
|
||||
else:
|
||||
data["tags"] = None
|
||||
|
||||
await repo.update(process_id, data)
|
||||
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
||||
|
||||
|
||||
@router.post("/{process_id}/delete")
|
||||
async def delete_process(process_id: str, request: Request, db: AsyncSession = Depends(get_db)):
|
||||
repo = BaseRepository("processes", db)
|
||||
await repo.soft_delete(process_id)
|
||||
return RedirectResponse(url="/processes", status_code=303)
|
||||
|
||||
|
||||
# ── Process Steps ─────────────────────────────────────────────
|
||||
|
||||
@router.post("/{process_id}/steps/add")
|
||||
async def add_step(
|
||||
process_id: str,
|
||||
title: str = Form(...),
|
||||
instructions: Optional[str] = Form(None),
|
||||
expected_output: Optional[str] = Form(None),
|
||||
estimated_days: Optional[str] = Form(None),
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
# Get current max sort_order
|
||||
result = await db.execute(text("""
|
||||
SELECT coalesce(max(sort_order), -1) + 1 as next_order
|
||||
FROM process_steps WHERE process_id = :pid AND is_deleted = false
|
||||
"""), {"pid": process_id})
|
||||
next_order = result.scalar()
|
||||
|
||||
repo = BaseRepository("process_steps", db)
|
||||
data = {
|
||||
"process_id": process_id,
|
||||
"title": title,
|
||||
"sort_order": next_order,
|
||||
}
|
||||
if instructions and instructions.strip():
|
||||
data["instructions"] = instructions
|
||||
if expected_output and expected_output.strip():
|
||||
data["expected_output"] = expected_output
|
||||
if estimated_days and estimated_days.strip():
|
||||
data["estimated_days"] = int(estimated_days)
|
||||
|
||||
await repo.create(data)
|
||||
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
||||
|
||||
|
||||
@router.post("/{process_id}/steps/{step_id}/edit")
|
||||
async def edit_step(
|
||||
process_id: str,
|
||||
step_id: str,
|
||||
title: str = Form(...),
|
||||
instructions: Optional[str] = Form(None),
|
||||
expected_output: Optional[str] = Form(None),
|
||||
estimated_days: Optional[str] = Form(None),
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
repo = BaseRepository("process_steps", db)
|
||||
data = {
|
||||
"title": title,
|
||||
"instructions": instructions if instructions and instructions.strip() else None,
|
||||
"expected_output": expected_output if expected_output and expected_output.strip() else None,
|
||||
"estimated_days": int(estimated_days) if estimated_days and estimated_days.strip() else None,
|
||||
}
|
||||
await repo.update(step_id, data)
|
||||
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
||||
|
||||
|
||||
@router.post("/{process_id}/steps/{step_id}/delete")
|
||||
async def delete_step(process_id: str, step_id: str, db: AsyncSession = Depends(get_db)):
|
||||
repo = BaseRepository("process_steps", db)
|
||||
await repo.soft_delete(step_id)
|
||||
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
||||
|
||||
|
||||
@router.post("/{process_id}/steps/reorder")
|
||||
async def reorder_steps(
|
||||
process_id: str,
|
||||
request: Request,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
form = await request.form()
|
||||
ids = form.getlist("step_ids")
|
||||
if ids:
|
||||
repo = BaseRepository("process_steps", db)
|
||||
await repo.reorder(ids)
|
||||
return RedirectResponse(url=f"/processes/{process_id}", status_code=303)
|
||||
|
||||
|
||||
# ── Process Runs ──────────────────────────────────────────────
|
||||
|
||||
@router.post("/{process_id}/runs/start")
|
||||
async def start_run(
|
||||
process_id: str,
|
||||
title: str = Form(...),
|
||||
task_generation: str = Form("all_at_once"),
|
||||
project_id: Optional[str] = Form(None),
|
||||
contact_id: Optional[str] = Form(None),
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
"""Start a new process run: snapshot steps, optionally generate tasks."""
|
||||
# Get process
|
||||
proc_repo = BaseRepository("processes", db)
|
||||
process = await proc_repo.get(process_id)
|
||||
if not process:
|
||||
return RedirectResponse(url="/processes", status_code=303)
|
||||
|
||||
# Create the run
|
||||
run_repo = BaseRepository("process_runs", db)
|
||||
run_data = {
|
||||
"process_id": process_id,
|
||||
"title": title,
|
||||
"status": "in_progress",
|
||||
"process_type": process["process_type"],
|
||||
"task_generation": task_generation,
|
||||
"started_at": datetime.now(timezone.utc),
|
||||
}
|
||||
if project_id and project_id.strip():
|
||||
run_data["project_id"] = project_id
|
||||
if contact_id and contact_id.strip():
|
||||
run_data["contact_id"] = contact_id
|
||||
|
||||
run = await run_repo.create(run_data)
|
||||
|
||||
# Snapshot steps from the process template
|
||||
result = await db.execute(text("""
|
||||
SELECT * FROM process_steps
|
||||
WHERE process_id = :pid AND is_deleted = false
|
||||
ORDER BY sort_order, created_at
|
||||
"""), {"pid": process_id})
|
||||
template_steps = [dict(r._mapping) for r in result]
|
||||
|
||||
step_repo = BaseRepository("process_run_steps", db)
|
||||
run_steps = []
|
||||
for step in template_steps:
|
||||
rs = await step_repo.create({
|
||||
"run_id": str(run["id"]),
|
||||
"title": step["title"],
|
||||
"instructions": step.get("instructions"),
|
||||
"status": "pending",
|
||||
"sort_order": step["sort_order"],
|
||||
})
|
||||
run_steps.append(rs)
|
||||
|
||||
# Task generation
|
||||
if run_steps:
|
||||
await _generate_tasks(db, run, run_steps, task_generation)
|
||||
|
||||
return RedirectResponse(url=f"/processes/runs/{run['id']}", status_code=303)
|
||||
|
||||
|
||||
async def _generate_tasks(db, run, run_steps, mode):
|
||||
"""Generate tasks for run steps based on mode."""
|
||||
task_repo = BaseRepository("tasks", db)
|
||||
|
||||
# Get a default domain for tasks
|
||||
result = await db.execute(text(
|
||||
"SELECT id FROM domains WHERE is_deleted = false ORDER BY sort_order LIMIT 1"
|
||||
))
|
||||
row = result.first()
|
||||
default_domain_id = str(row[0]) if row else None
|
||||
|
||||
if not default_domain_id:
|
||||
return
|
||||
|
||||
if mode == "all_at_once":
|
||||
steps_to_generate = run_steps
|
||||
else: # step_by_step
|
||||
steps_to_generate = [run_steps[0]]
|
||||
|
||||
for step in steps_to_generate:
|
||||
task_data = {
|
||||
"title": step["title"],
|
||||
"description": step.get("instructions") or "",
|
||||
"status": "open",
|
||||
"priority": 3,
|
||||
"domain_id": default_domain_id,
|
||||
}
|
||||
if run.get("project_id"):
|
||||
task_data["project_id"] = str(run["project_id"])
|
||||
|
||||
task = await task_repo.create(task_data)
|
||||
|
||||
# Link via junction table
|
||||
await db.execute(text("""
|
||||
INSERT INTO process_run_tasks (run_step_id, task_id)
|
||||
VALUES (:rsid, :tid)
|
||||
ON CONFLICT DO NOTHING
|
||||
"""), {"rsid": str(step["id"]), "tid": str(task["id"])})
|
||||
|
||||
|
||||
@router.post("/runs/{run_id}/steps/{step_id}/complete")
|
||||
async def complete_step(
|
||||
run_id: str,
|
||||
step_id: str,
|
||||
notes: Optional[str] = Form(None),
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
"""Mark a run step as completed."""
|
||||
now = datetime.now(timezone.utc)
|
||||
step_repo = BaseRepository("process_run_steps", db)
|
||||
await step_repo.update(step_id, {
|
||||
"status": "completed",
|
||||
"completed_at": now,
|
||||
"notes": notes if notes and notes.strip() else None,
|
||||
})
|
||||
|
||||
# If step_by_step mode, generate task for next pending step
|
||||
result = await db.execute(text("""
|
||||
SELECT pr.task_generation FROM process_runs pr WHERE pr.id = :rid
|
||||
"""), {"rid": run_id})
|
||||
run_row = result.first()
|
||||
|
||||
if run_row and run_row.task_generation == "step_by_step":
|
||||
# Find next pending step
|
||||
result = await db.execute(text("""
|
||||
SELECT * FROM process_run_steps
|
||||
WHERE run_id = :rid AND is_deleted = false AND status = 'pending'
|
||||
ORDER BY sort_order LIMIT 1
|
||||
"""), {"rid": run_id})
|
||||
next_step = result.first()
|
||||
|
||||
if next_step:
|
||||
next_step = dict(next_step._mapping)
|
||||
# Get the full run for project_id
|
||||
run_repo = BaseRepository("process_runs", db)
|
||||
run = await run_repo.get(run_id)
|
||||
await _generate_tasks(db, run, [next_step], "all_at_once")
|
||||
|
||||
# Auto-complete run if all steps done
|
||||
result = await db.execute(text("""
|
||||
SELECT count(*) FILTER (WHERE status != 'completed') as pending
|
||||
FROM process_run_steps
|
||||
WHERE run_id = :rid AND is_deleted = false
|
||||
"""), {"rid": run_id})
|
||||
pending = result.scalar()
|
||||
if pending == 0:
|
||||
run_repo = BaseRepository("process_runs", db)
|
||||
await run_repo.update(run_id, {
|
||||
"status": "completed",
|
||||
"completed_at": now,
|
||||
})
|
||||
|
||||
return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)
|
||||
|
||||
|
||||
@router.post("/runs/{run_id}/steps/{step_id}/uncomplete")
|
||||
async def uncomplete_step(
|
||||
run_id: str,
|
||||
step_id: str,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
"""Undo step completion."""
|
||||
step_repo = BaseRepository("process_run_steps", db)
|
||||
await step_repo.update(step_id, {
|
||||
"status": "pending",
|
||||
"completed_at": None,
|
||||
})
|
||||
|
||||
# If run was completed, reopen it
|
||||
run_repo = BaseRepository("process_runs", db)
|
||||
run = await run_repo.get(run_id)
|
||||
if run and run["status"] == "completed":
|
||||
await run_repo.update(run_id, {
|
||||
"status": "in_progress",
|
||||
"completed_at": None,
|
||||
})
|
||||
|
||||
return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)
|
||||
|
||||
|
||||
@router.post("/runs/{run_id}/complete")
|
||||
async def complete_run(run_id: str, db: AsyncSession = Depends(get_db)):
|
||||
"""Mark entire run as complete."""
|
||||
now = datetime.now(timezone.utc)
|
||||
run_repo = BaseRepository("process_runs", db)
|
||||
await run_repo.update(run_id, {
|
||||
"status": "completed",
|
||||
"completed_at": now,
|
||||
})
|
||||
|
||||
# Mark all pending steps as completed too
|
||||
await db.execute(text("""
|
||||
UPDATE process_run_steps
|
||||
SET status = 'completed', completed_at = :now, updated_at = :now
|
||||
WHERE run_id = :rid AND status != 'completed' AND is_deleted = false
|
||||
"""), {"rid": run_id, "now": now})
|
||||
|
||||
return RedirectResponse(url=f"/processes/runs/{run_id}", status_code=303)
|
||||
|
||||
|
||||
@router.post("/runs/{run_id}/delete")
|
||||
async def delete_run(run_id: str, db: AsyncSession = Depends(get_db)):
|
||||
# Get process_id before deleting for redirect
|
||||
run_repo = BaseRepository("process_runs", db)
|
||||
run = await run_repo.get(run_id)
|
||||
await run_repo.soft_delete(run_id)
|
||||
if run:
|
||||
return RedirectResponse(url=f"/processes/{run['process_id']}", status_code=303)
|
||||
return RedirectResponse(url="/processes/runs", status_code=303)
|
||||
Reference in New Issue
Block a user