Files
lifeos-dev/routers/capture.py
M Dombaugh a21e00d0e0 feat: enhanced capture queue with full conversion, batching, and filtering
- Convert to 7 entity types: task, note, project, list item, contact, decision, weblink
- Each conversion has a dedicated form page with pre-filled fields and context selectors
- Multi-line paste creates batch with shared import_batch_id and undo button
- 3-tab filtering: Inbox (unprocessed), Processed (with conversion links), All
- Context pre-fill: optional area/project selectors on capture form
- Processed items show type badge and link to converted entity
- Sidebar badge count for unprocessed items already working

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 21:47:23 +00:00

352 lines
12 KiB
Python

"""Capture: quick text capture queue with conversion to any entity type."""
import re
from uuid import uuid4
from typing import Optional
from fastapi import APIRouter, Request, Form, Depends
from fastapi.templating import Jinja2Templates
from fastapi.responses import RedirectResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from core.database import get_db
from core.base_repository import BaseRepository
from core.sidebar import get_sidebar_data
router = APIRouter(prefix="/capture", tags=["capture"])
templates = Jinja2Templates(directory="templates")
CONVERT_TYPES = {
"task": "Task",
"note": "Note",
"project": "Project",
"list_item": "List Item",
"contact": "Contact",
"decision": "Decision",
"weblink": "Weblink",
}
@router.get("/")
async def list_capture(request: Request, show: str = "inbox", db: AsyncSession = Depends(get_db)):
sidebar = await get_sidebar_data(db)
if show == "processed":
where = "is_deleted = false AND processed = true"
elif show == "all":
where = "is_deleted = false"
else: # inbox
where = "is_deleted = false AND processed = false"
result = await db.execute(text(f"""
SELECT * FROM capture WHERE {where} ORDER BY created_at DESC
"""))
items = [dict(r._mapping) for r in result]
# Mark first item per batch for batch-undo display
batches = {}
for item in items:
bid = item.get("import_batch_id")
if bid:
bid_str = str(bid)
if bid_str not in batches:
batches[bid_str] = 0
item["_batch_first"] = True
else:
item["_batch_first"] = False
batches[bid_str] += 1
else:
item["_batch_first"] = False
# Get lists for list_item conversion
result = await db.execute(text(
"SELECT id, name FROM lists WHERE is_deleted = false ORDER BY name"
))
all_lists = [dict(r._mapping) for r in result]
return templates.TemplateResponse("capture.html", {
"request": request, "sidebar": sidebar, "items": items,
"show": show, "batches": batches, "all_lists": all_lists,
"convert_types": CONVERT_TYPES,
"page_title": "Capture", "active_nav": "capture",
})
@router.post("/add")
async def add_capture(
request: Request,
raw_text: str = Form(...),
area_id: Optional[str] = Form(None),
project_id: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
repo = BaseRepository("capture", db)
lines = [l.strip() for l in raw_text.strip().split("\n") if l.strip()]
batch_id = str(uuid4()) if len(lines) > 1 else None
for line in lines:
data = {"raw_text": line, "processed": False}
if batch_id:
data["import_batch_id"] = batch_id
if area_id and area_id.strip():
data["area_id"] = area_id
if project_id and project_id.strip():
data["project_id"] = project_id
await repo.create(data)
return RedirectResponse(url="/capture", status_code=303)
# ---- Batch undo (must be before /{capture_id} routes) ----
@router.post("/batch/{batch_id}/undo")
async def batch_undo(batch_id: str, db: AsyncSession = Depends(get_db)):
"""Delete all items from a batch."""
await db.execute(text("""
UPDATE capture SET is_deleted = true, deleted_at = now(), updated_at = now()
WHERE import_batch_id = :bid AND is_deleted = false
"""), {"bid": batch_id})
await db.commit()
return RedirectResponse(url="/capture", status_code=303)
# ---- Conversion form page ----
@router.get("/{capture_id}/convert/{convert_type}")
async def convert_form(
capture_id: str, convert_type: str,
request: Request, db: AsyncSession = Depends(get_db),
):
"""Show conversion form for a specific capture item."""
repo = BaseRepository("capture", db)
item = await repo.get(capture_id)
if not item or convert_type not in CONVERT_TYPES:
return RedirectResponse(url="/capture", status_code=303)
sidebar = await get_sidebar_data(db)
result = await db.execute(text(
"SELECT id, name FROM lists WHERE is_deleted = false ORDER BY name"
))
all_lists = [dict(r._mapping) for r in result]
# Parse name for contact pre-fill
parts = item["raw_text"].strip().split(None, 1)
first_name = parts[0] if parts else item["raw_text"]
last_name = parts[1] if len(parts) > 1 else ""
# Extract URL for weblink pre-fill
url_match = re.search(r'https?://\S+', item["raw_text"])
prefill_url = url_match.group(0) if url_match else ""
prefill_label = item["raw_text"].replace(prefill_url, "").strip() if url_match else item["raw_text"]
return templates.TemplateResponse("capture_convert.html", {
"request": request, "sidebar": sidebar,
"item": item, "convert_type": convert_type,
"type_label": CONVERT_TYPES[convert_type],
"all_lists": all_lists,
"first_name": first_name, "last_name": last_name,
"prefill_url": prefill_url, "prefill_label": prefill_label or item["raw_text"],
"page_title": f"Convert to {CONVERT_TYPES[convert_type]}",
"active_nav": "capture",
})
# ---- Conversion handlers ----
@router.post("/{capture_id}/to-task")
async def convert_to_task(
capture_id: str, request: Request,
domain_id: str = Form(...),
project_id: Optional[str] = Form(None),
priority: int = Form(3),
title: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
capture_repo = BaseRepository("capture", db)
item = await capture_repo.get(capture_id)
if not item:
return RedirectResponse(url="/capture", status_code=303)
task_repo = BaseRepository("tasks", db)
data = {
"title": (title or item["raw_text"]).strip(),
"domain_id": domain_id, "status": "open", "priority": priority,
}
if project_id and project_id.strip():
data["project_id"] = project_id
task = await task_repo.create(data)
await capture_repo.update(capture_id, {
"processed": True, "converted_to_type": "task",
"converted_to_id": str(task["id"]),
})
return RedirectResponse(url=f"/tasks/{task['id']}", status_code=303)
@router.post("/{capture_id}/to-note")
async def convert_to_note(
capture_id: str, request: Request,
title: Optional[str] = Form(None),
domain_id: Optional[str] = Form(None),
project_id: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
capture_repo = BaseRepository("capture", db)
item = await capture_repo.get(capture_id)
if not item:
return RedirectResponse(url="/capture", status_code=303)
note_repo = BaseRepository("notes", db)
raw = item["raw_text"]
data = {"title": (title or raw[:100]).strip(), "body": raw}
if domain_id and domain_id.strip():
data["domain_id"] = domain_id
if project_id and project_id.strip():
data["project_id"] = project_id
note = await note_repo.create(data)
await capture_repo.update(capture_id, {
"processed": True, "converted_to_type": "note",
"converted_to_id": str(note["id"]),
})
return RedirectResponse(url=f"/notes/{note['id']}", status_code=303)
@router.post("/{capture_id}/to-project")
async def convert_to_project(
capture_id: str, request: Request,
domain_id: str = Form(...),
name: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
capture_repo = BaseRepository("capture", db)
item = await capture_repo.get(capture_id)
if not item:
return RedirectResponse(url="/capture", status_code=303)
project_repo = BaseRepository("projects", db)
data = {"name": (name or item["raw_text"]).strip(), "domain_id": domain_id, "status": "active"}
project = await project_repo.create(data)
await capture_repo.update(capture_id, {
"processed": True, "converted_to_type": "project",
"converted_to_id": str(project["id"]),
})
return RedirectResponse(url=f"/projects/{project['id']}", status_code=303)
@router.post("/{capture_id}/to-list_item")
async def convert_to_list_item(
capture_id: str, request: Request,
list_id: str = Form(...),
content: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
capture_repo = BaseRepository("capture", db)
item = await capture_repo.get(capture_id)
if not item:
return RedirectResponse(url="/capture", status_code=303)
li_repo = BaseRepository("list_items", db)
data = {"list_id": list_id, "content": (content or item["raw_text"]).strip()}
li = await li_repo.create(data)
await capture_repo.update(capture_id, {
"processed": True, "converted_to_type": "list_item",
"converted_to_id": str(li["id"]), "list_id": list_id,
})
return RedirectResponse(url=f"/lists/{list_id}", status_code=303)
@router.post("/{capture_id}/to-contact")
async def convert_to_contact(
capture_id: str, request: Request,
first_name: str = Form(...),
last_name: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
capture_repo = BaseRepository("capture", db)
item = await capture_repo.get(capture_id)
if not item:
return RedirectResponse(url="/capture", status_code=303)
contact_repo = BaseRepository("contacts", db)
data = {"first_name": first_name.strip()}
if last_name and last_name.strip():
data["last_name"] = last_name.strip()
contact = await contact_repo.create(data)
await capture_repo.update(capture_id, {
"processed": True, "converted_to_type": "contact",
"converted_to_id": str(contact["id"]),
})
return RedirectResponse(url=f"/contacts/{contact['id']}", status_code=303)
@router.post("/{capture_id}/to-decision")
async def convert_to_decision(
capture_id: str, request: Request,
title: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
capture_repo = BaseRepository("capture", db)
item = await capture_repo.get(capture_id)
if not item:
return RedirectResponse(url="/capture", status_code=303)
decision_repo = BaseRepository("decisions", db)
data = {"title": (title or item["raw_text"]).strip(), "status": "proposed", "impact": "medium"}
decision = await decision_repo.create(data)
await capture_repo.update(capture_id, {
"processed": True, "converted_to_type": "decision",
"converted_to_id": str(decision["id"]),
})
return RedirectResponse(url=f"/decisions/{decision['id']}", status_code=303)
@router.post("/{capture_id}/to-weblink")
async def convert_to_weblink(
capture_id: str, request: Request,
label: Optional[str] = Form(None),
url: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
capture_repo = BaseRepository("capture", db)
item = await capture_repo.get(capture_id)
if not item:
return RedirectResponse(url="/capture", status_code=303)
weblink_repo = BaseRepository("weblinks", db)
raw = item["raw_text"]
url_match = re.search(r'https?://\S+', raw)
link_url = (url.strip() if url and url.strip() else None) or (url_match.group(0) if url_match else raw)
link_label = (label.strip() if label and label.strip() else None) or (raw.replace(link_url, "").strip() if url_match else raw[:100])
if not link_label:
link_label = link_url
data = {"label": link_label, "url": link_url}
weblink = await weblink_repo.create(data)
await capture_repo.update(capture_id, {
"processed": True, "converted_to_type": "weblink",
"converted_to_id": str(weblink["id"]),
})
return RedirectResponse(url="/weblinks", status_code=303)
@router.post("/{capture_id}/dismiss")
async def dismiss_capture(capture_id: str, db: AsyncSession = Depends(get_db)):
repo = BaseRepository("capture", db)
await repo.update(capture_id, {"processed": True, "converted_to_type": "dismissed"})
return RedirectResponse(url="/capture", status_code=303)
@router.post("/{capture_id}/delete")
async def delete_capture(capture_id: str, db: AsyncSession = Depends(get_db)):
repo = BaseRepository("capture", db)
await repo.soft_delete(capture_id)
return RedirectResponse(url="/capture", status_code=303)