Files
lifeos-prod/routers/capture.py
2026-03-03 00:44:33 +00:00

362 lines
12 KiB
Python

"""Capture: quick text capture queue with conversion to any entity type."""
import re
from uuid import uuid4
from typing import Optional
from fastapi import APIRouter, Request, Form, Depends
from fastapi.templating import Jinja2Templates
from fastapi.responses import RedirectResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from core.database import get_db
from core.base_repository import BaseRepository
from core.sidebar import get_sidebar_data
from routers.weblinks import get_default_folder_id
# All endpoints in this module are mounted under the /capture prefix.
router = APIRouter(prefix="/capture", tags=["capture"])

# Template loader rooted at the "templates" directory.
templates = Jinja2Templates(directory="templates")

# Entity types a capture item can be converted into.
# Key: the type slug accepted by the conversion form route.
# Value: the human-readable label passed to the templates.
CONVERT_TYPES = {
    "task": "Task",
    "note": "Note",
    "project": "Project",
    "list_item": "List Item",
    "contact": "Contact",
    "decision": "Decision",
    "link": "Link",
}
@router.get("/")
async def list_capture(request: Request, show: str = "inbox", db: AsyncSession = Depends(get_db)):
    """Render the capture queue.

    ``show`` selects the filter: ``"processed"`` lists processed items,
    ``"all"`` lists every non-deleted item, and any other value (including
    the default ``"inbox"``) lists unprocessed items.
    """
    sidebar = await get_sidebar_data(db)

    # Fixed WHERE fragments only; user input is never interpolated into SQL.
    filters = {
        "processed": "is_deleted = false AND processed = true",
        "all": "is_deleted = false",
    }
    where_clause = filters.get(show, "is_deleted = false AND processed = false")

    capture_rows = await db.execute(text(f"""
SELECT * FROM capture WHERE {where_clause} ORDER BY created_at DESC
"""))
    items = [dict(row._mapping) for row in capture_rows]

    # Flag the first row seen for each import batch (used for the batch-undo
    # display) and count how many listed rows each batch contains.
    batches = {}
    for entry in items:
        batch_ref = entry.get("import_batch_id")
        if not batch_ref:
            entry["_batch_first"] = False
            continue
        batch_key = str(batch_ref)
        entry["_batch_first"] = batch_key not in batches
        batches[batch_key] = batches.get(batch_key, 0) + 1

    # Lists are needed by the template for list_item conversion.
    list_rows = await db.execute(text(
        "SELECT id, name FROM lists WHERE is_deleted = false ORDER BY name"
    ))
    all_lists = [dict(row._mapping) for row in list_rows]

    context = {
        "request": request, "sidebar": sidebar, "items": items,
        "show": show, "batches": batches, "all_lists": all_lists,
        "convert_types": CONVERT_TYPES,
        "page_title": "Capture", "active_nav": "capture",
    }
    return templates.TemplateResponse("capture.html", context)
def _local_redirect_target(target: Optional[str], default: str = "/capture") -> str:
    """Return ``target`` if it is a same-site absolute path, else ``default``."""
    if not target or not target.startswith("/"):
        return default
    # "//host" and "/\host" are resolved by browsers as protocol-relative
    # URLs, i.e. a redirect to a different origin.
    if target[1:2] in ("/", "\\"):
        return default
    # Browsers drop tab/CR/LF while parsing URLs, so "/\t/host" would
    # collapse into "//host"; refuse such values outright.
    if any(ch in target for ch in "\r\n\t"):
        return default
    return target


@router.post("/add")
async def add_capture(
    request: Request,
    raw_text: str = Form(...),
    redirect_to: Optional[str] = Form(None),
    area_id: Optional[str] = Form(None),
    project_id: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Create one capture row per non-empty line of ``raw_text``.

    When more than one line is submitted, all created rows share a freshly
    generated ``import_batch_id``. ``area_id`` and ``project_id`` are
    attached to every row when they are non-blank.

    Responds with a 303 redirect to ``redirect_to`` when it is a local
    path, otherwise to ``/capture``.
    """
    repo = BaseRepository("capture", db)
    lines = [line.strip() for line in raw_text.strip().split("\n") if line.strip()]
    batch_id = str(uuid4()) if len(lines) > 1 else None

    for line in lines:
        data = {"raw_text": line, "processed": False}
        if batch_id:
            data["import_batch_id"] = batch_id
        if area_id and area_id.strip():
            data["area_id"] = area_id
        if project_id and project_id.strip():
            data["project_id"] = project_id
        await repo.create(data)

    # Only follow caller-supplied targets that stay on this site.
    url = _local_redirect_target(redirect_to)
    return RedirectResponse(url=url, status_code=303)
# ---- Batch undo (must be before /{capture_id} routes) ----
@router.post("/batch/{batch_id}/undo")
async def batch_undo(batch_id: str, db: AsyncSession = Depends(get_db)):
    """Soft-delete every not-yet-deleted capture row of one import batch."""
    soft_delete_batch = text("""
UPDATE capture SET is_deleted = true, deleted_at = now(), updated_at = now()
WHERE import_batch_id = :bid AND is_deleted = false
""")
    params = {"bid": batch_id}
    await db.execute(soft_delete_batch, params)
    await db.commit()
    return RedirectResponse(url="/capture", status_code=303)
# ---- Conversion form page ----
@router.get("/{capture_id}/convert/{convert_type}")
async def convert_form(
    capture_id: str, convert_type: str,
    request: Request, db: AsyncSession = Depends(get_db),
):
    """Render the conversion form for one capture item.

    Redirects back to ``/capture`` when the item cannot be loaded or when
    ``convert_type`` is not a key of ``CONVERT_TYPES``.
    """
    repo = BaseRepository("capture", db)
    item = await repo.get(capture_id)
    if not item or convert_type not in CONVERT_TYPES:
        return RedirectResponse(url="/capture", status_code=303)

    sidebar = await get_sidebar_data(db)
    list_rows = await db.execute(text(
        "SELECT id, name FROM lists WHERE is_deleted = false ORDER BY name"
    ))
    all_lists = [dict(row._mapping) for row in list_rows]

    raw_text = item["raw_text"]

    # Contact pre-fill: first whitespace-separated token is the first name,
    # everything after it is the last name.
    tokens = raw_text.strip().split(None, 1)
    if tokens:
        first_name = tokens[0]
        last_name = tokens[1] if len(tokens) == 2 else ""
    else:
        first_name, last_name = raw_text, ""

    # Weblink pre-fill: first URL found in the text; the label is the text
    # with that URL removed.
    found = re.search(r'https?://\S+', raw_text)
    if found:
        prefill_url = found.group(0)
        prefill_label = raw_text.replace(prefill_url, "").strip()
    else:
        prefill_url = ""
        prefill_label = raw_text

    type_label = CONVERT_TYPES[convert_type]
    context = {
        "request": request, "sidebar": sidebar,
        "item": item, "convert_type": convert_type,
        "type_label": type_label,
        "all_lists": all_lists,
        "first_name": first_name, "last_name": last_name,
        "prefill_url": prefill_url, "prefill_label": prefill_label or raw_text,
        "page_title": f"Convert to {type_label}",
        "active_nav": "capture",
    }
    return templates.TemplateResponse("capture_convert.html", context)
# ---- Conversion handlers ----
@router.post("/{capture_id}/to-task")
async def convert_to_task(
    capture_id: str, request: Request,
    domain_id: str = Form(...),
    project_id: Optional[str] = Form(None),
    priority: int = Form(3),
    title: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Create an open task from a capture item and mark the capture processed.

    The task title is the submitted ``title`` or, when that is missing or
    blank, the captured text. Redirects to the new task, or to ``/capture``
    when the capture item cannot be loaded.
    """
    capture_repo = BaseRepository("capture", db)
    item = await capture_repo.get(capture_id)
    if not item:
        return RedirectResponse(url="/capture", status_code=303)

    # Strip before falling back: a whitespace-only override is truthy and
    # would otherwise produce an empty title.
    task_title = (title or "").strip() or item["raw_text"].strip()

    task_repo = BaseRepository("tasks", db)
    data = {
        "title": task_title,
        "domain_id": domain_id, "status": "open", "priority": priority,
    }
    if project_id and project_id.strip():
        data["project_id"] = project_id
    task = await task_repo.create(data)

    # Record what the capture became so it leaves the inbox.
    await capture_repo.update(capture_id, {
        "processed": True, "converted_to_type": "task",
        "converted_to_id": str(task["id"]),
    })
    return RedirectResponse(url=f"/tasks/{task['id']}", status_code=303)
@router.post("/{capture_id}/to-note")
async def convert_to_note(
    capture_id: str, request: Request,
    title: Optional[str] = Form(None),
    domain_id: Optional[str] = Form(None),
    project_id: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Create a note from a capture item and mark the capture processed.

    The note body is the full captured text. The title is the submitted
    ``title`` or, when that is missing or blank, the first 100 characters
    of the captured text. Redirects to the new note, or to ``/capture``
    when the capture item cannot be loaded.
    """
    capture_repo = BaseRepository("capture", db)
    item = await capture_repo.get(capture_id)
    if not item:
        return RedirectResponse(url="/capture", status_code=303)

    raw = item["raw_text"]
    # Strip before falling back: a whitespace-only override is truthy and
    # would otherwise produce an empty title.
    note_title = (title or "").strip() or raw[:100].strip()

    note_repo = BaseRepository("notes", db)
    data = {"title": note_title, "body": raw}
    if domain_id and domain_id.strip():
        data["domain_id"] = domain_id
    if project_id and project_id.strip():
        data["project_id"] = project_id
    note = await note_repo.create(data)

    # Record what the capture became so it leaves the inbox.
    await capture_repo.update(capture_id, {
        "processed": True, "converted_to_type": "note",
        "converted_to_id": str(note["id"]),
    })
    return RedirectResponse(url=f"/notes/{note['id']}", status_code=303)
@router.post("/{capture_id}/to-project")
async def convert_to_project(
    capture_id: str, request: Request,
    domain_id: str = Form(...),
    name: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Create an active project from a capture item and mark it processed.

    The project name is the submitted ``name`` or, when that is missing or
    blank, the captured text. Redirects to the new project, or to
    ``/capture`` when the capture item cannot be loaded.
    """
    capture_repo = BaseRepository("capture", db)
    item = await capture_repo.get(capture_id)
    if not item:
        return RedirectResponse(url="/capture", status_code=303)

    # Strip before falling back: a whitespace-only override is truthy and
    # would otherwise produce an empty name.
    project_name = (name or "").strip() or item["raw_text"].strip()

    project_repo = BaseRepository("projects", db)
    data = {"name": project_name, "domain_id": domain_id, "status": "active"}
    project = await project_repo.create(data)

    # Record what the capture became so it leaves the inbox.
    await capture_repo.update(capture_id, {
        "processed": True, "converted_to_type": "project",
        "converted_to_id": str(project["id"]),
    })
    return RedirectResponse(url=f"/projects/{project['id']}", status_code=303)
@router.post("/{capture_id}/to-list_item")
async def convert_to_list_item(
    capture_id: str, request: Request,
    list_id: str = Form(...),
    content: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Append a capture item to a list and mark the capture processed.

    The list item content is the submitted ``content`` or, when that is
    missing or blank, the captured text. Redirects to the target list, or
    to ``/capture`` when the capture item cannot be loaded.
    """
    capture_repo = BaseRepository("capture", db)
    item = await capture_repo.get(capture_id)
    if not item:
        return RedirectResponse(url="/capture", status_code=303)

    # Strip before falling back: a whitespace-only override is truthy and
    # would otherwise produce an empty list item.
    item_content = (content or "").strip() or item["raw_text"].strip()

    li_repo = BaseRepository("list_items", db)
    data = {"list_id": list_id, "content": item_content}
    li = await li_repo.create(data)

    # The capture row also stores the list it was filed under.
    await capture_repo.update(capture_id, {
        "processed": True, "converted_to_type": "list_item",
        "converted_to_id": str(li["id"]), "list_id": list_id,
    })
    return RedirectResponse(url=f"/lists/{list_id}", status_code=303)
@router.post("/{capture_id}/to-contact")
async def convert_to_contact(
    capture_id: str, request: Request,
    first_name: str = Form(...),
    last_name: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Create a contact from the submitted names and mark the capture processed.

    Redirects to the new contact, or to ``/capture`` when the capture item
    cannot be loaded.
    """
    capture_repo = BaseRepository("capture", db)
    if not await capture_repo.get(capture_id):
        return RedirectResponse(url="/capture", status_code=303)

    contact_repo = BaseRepository("contacts", db)
    fields = {"first_name": first_name.strip()}
    # The last name is optional; a blank value is left out entirely.
    surname = last_name.strip() if last_name else ""
    if surname:
        fields["last_name"] = surname
    contact = await contact_repo.create(fields)

    outcome = {
        "processed": True, "converted_to_type": "contact",
        "converted_to_id": str(contact["id"]),
    }
    await capture_repo.update(capture_id, outcome)
    return RedirectResponse(url=f"/contacts/{contact['id']}", status_code=303)
@router.post("/{capture_id}/to-decision")
async def convert_to_decision(
    capture_id: str, request: Request,
    title: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Create a proposed decision from a capture item and mark it processed.

    The decision title is the submitted ``title`` or, when that is missing
    or blank, the captured text. New decisions are created with status
    ``proposed`` and impact ``medium``. Redirects to the new decision, or
    to ``/capture`` when the capture item cannot be loaded.
    """
    capture_repo = BaseRepository("capture", db)
    item = await capture_repo.get(capture_id)
    if not item:
        return RedirectResponse(url="/capture", status_code=303)

    # Strip before falling back: a whitespace-only override is truthy and
    # would otherwise produce an empty title.
    decision_title = (title or "").strip() or item["raw_text"].strip()

    decision_repo = BaseRepository("decisions", db)
    data = {"title": decision_title, "status": "proposed", "impact": "medium"}
    decision = await decision_repo.create(data)

    # Record what the capture became so it leaves the inbox.
    await capture_repo.update(capture_id, {
        "processed": True, "converted_to_type": "decision",
        "converted_to_id": str(decision["id"]),
    })
    return RedirectResponse(url=f"/decisions/{decision['id']}", status_code=303)
@router.post("/{capture_id}/to-link")
async def convert_to_link(
    capture_id: str, request: Request,
    label: Optional[str] = Form(None),
    url: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Create a link from a capture item and mark the capture processed.

    URL: the submitted ``url`` if non-blank, else the first ``http(s)://``
    URL found in the captured text, else the captured text itself.

    Label: the submitted ``label`` if non-blank; else the captured text
    with the detected URL removed (or its first 100 characters when no URL
    was detected); else the URL.

    The new link is attached to the default folder. Redirects to
    ``/links``, or to ``/capture`` when the capture item cannot be loaded.
    """
    capture_repo = BaseRepository("capture", db)
    item = await capture_repo.get(capture_id)
    if not item:
        return RedirectResponse(url="/capture", status_code=303)

    link_repo = BaseRepository("links", db)
    raw = item["raw_text"]
    url_match = re.search(r'https?://\S+', raw)
    detected_url = url_match.group(0) if url_match else ""

    link_url = (url or "").strip() or detected_url or raw

    link_label = (label or "").strip()
    if not link_label:
        # Remove the URL that actually occurs in the captured text, not the
        # (possibly user-edited) form URL, so the original URL does not
        # leak into the label.
        link_label = raw.replace(detected_url, "").strip() if url_match else raw[:100]
    if not link_label:
        link_label = link_url

    data = {"label": link_label, "url": link_url}
    link = await link_repo.create(data)

    # Assign to Default folder
    # NOTE(review): no explicit commit follows this INSERT; presumably the
    # repository update below (or the session dependency) commits it - confirm.
    default_fid = await get_default_folder_id(db)
    await db.execute(text("""
INSERT INTO folder_links (folder_id, link_id)
VALUES (:fid, :lid) ON CONFLICT DO NOTHING
"""), {"fid": default_fid, "lid": link["id"]})

    await capture_repo.update(capture_id, {
        "processed": True, "converted_to_type": "link",
        "converted_to_id": str(link["id"]),
    })
    return RedirectResponse(url="/links", status_code=303)
@router.post("/{capture_id}/dismiss")
async def dismiss_capture(capture_id: str, db: AsyncSession = Depends(get_db)):
    """Mark a capture item processed with the ``dismissed`` outcome."""
    changes = {"processed": True, "converted_to_type": "dismissed"}
    await BaseRepository("capture", db).update(capture_id, changes)
    return RedirectResponse(url="/capture", status_code=303)
@router.post("/{capture_id}/delete")
async def delete_capture(capture_id: str, db: AsyncSession = Depends(get_db)):
    """Soft-delete one capture item, then return to the capture queue."""
    capture_repo = BaseRepository("capture", db)
    await capture_repo.soft_delete(capture_id)
    back_to_queue = RedirectResponse(url="/capture", status_code=303)
    return back_to_queue