"""Capture: quick text capture queue with conversion to any entity type.""" import re from uuid import uuid4 from typing import Optional from fastapi import APIRouter, Request, Form, Depends from fastapi.templating import Jinja2Templates from fastapi.responses import RedirectResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import text from core.database import get_db from core.base_repository import BaseRepository from core.sidebar import get_sidebar_data from routers.weblinks import get_default_folder_id router = APIRouter(prefix="/capture", tags=["capture"]) templates = Jinja2Templates(directory="templates") CONVERT_TYPES = { "task": "Task", "note": "Note", "project": "Project", "list_item": "List Item", "contact": "Contact", "decision": "Decision", "link": "Link", } @router.get("/") async def list_capture(request: Request, show: str = "inbox", db: AsyncSession = Depends(get_db)): sidebar = await get_sidebar_data(db) if show == "processed": where = "is_deleted = false AND processed = true" elif show == "all": where = "is_deleted = false" else: # inbox where = "is_deleted = false AND processed = false" result = await db.execute(text(f""" SELECT * FROM capture WHERE {where} ORDER BY created_at DESC """)) items = [dict(r._mapping) for r in result] # Mark first item per batch for batch-undo display batches = {} for item in items: bid = item.get("import_batch_id") if bid: bid_str = str(bid) if bid_str not in batches: batches[bid_str] = 0 item["_batch_first"] = True else: item["_batch_first"] = False batches[bid_str] += 1 else: item["_batch_first"] = False # Get lists for list_item conversion result = await db.execute(text( "SELECT id, name FROM lists WHERE is_deleted = false ORDER BY name" )) all_lists = [dict(r._mapping) for r in result] return templates.TemplateResponse("capture.html", { "request": request, "sidebar": sidebar, "items": items, "show": show, "batches": batches, "all_lists": all_lists, "convert_types": CONVERT_TYPES, "page_title": "Capture", "active_nav": "capture", }) @router.post("/add") async def add_capture( request: Request, raw_text: str = Form(...), redirect_to: Optional[str] = Form(None), area_id: Optional[str] = Form(None), project_id: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): repo = BaseRepository("capture", db) lines = [l.strip() for l in raw_text.strip().split("\n") if l.strip()] batch_id = str(uuid4()) if len(lines) > 1 else None for line in lines: data = {"raw_text": line, "processed": False} if batch_id: data["import_batch_id"] = batch_id if area_id and area_id.strip(): data["area_id"] = area_id if project_id and project_id.strip(): data["project_id"] = project_id await repo.create(data) url = redirect_to if redirect_to and redirect_to.startswith("/") else "/capture" return RedirectResponse(url=url, status_code=303) # ---- Batch undo (must be before /{capture_id} routes) ---- @router.post("/batch/{batch_id}/undo") async def batch_undo(batch_id: str, db: AsyncSession = Depends(get_db)): """Delete all items from a batch.""" await db.execute(text(""" UPDATE capture SET is_deleted = true, deleted_at = now(), updated_at = now() WHERE import_batch_id = :bid AND is_deleted = false """), {"bid": batch_id}) await db.commit() return RedirectResponse(url="/capture", status_code=303) # ---- Conversion form page ---- @router.get("/{capture_id}/convert/{convert_type}") async def convert_form( capture_id: str, convert_type: str, request: Request, db: AsyncSession = Depends(get_db), ): """Show conversion form for a specific capture item.""" repo = BaseRepository("capture", db) item = await repo.get(capture_id) if not item or convert_type not in CONVERT_TYPES: return RedirectResponse(url="/capture", status_code=303) sidebar = await get_sidebar_data(db) result = await db.execute(text( "SELECT id, name FROM lists WHERE is_deleted = false ORDER BY name" )) all_lists = [dict(r._mapping) for r in result] # Parse name for contact pre-fill parts = item["raw_text"].strip().split(None, 1) first_name = parts[0] if parts else item["raw_text"] last_name = parts[1] if len(parts) > 1 else "" # Extract URL for weblink pre-fill url_match = re.search(r'https?://\S+', item["raw_text"]) prefill_url = url_match.group(0) if url_match else "" prefill_label = item["raw_text"].replace(prefill_url, "").strip() if url_match else item["raw_text"] return templates.TemplateResponse("capture_convert.html", { "request": request, "sidebar": sidebar, "item": item, "convert_type": convert_type, "type_label": CONVERT_TYPES[convert_type], "all_lists": all_lists, "first_name": first_name, "last_name": last_name, "prefill_url": prefill_url, "prefill_label": prefill_label or item["raw_text"], "page_title": f"Convert to {CONVERT_TYPES[convert_type]}", "active_nav": "capture", }) # ---- Conversion handlers ---- @router.post("/{capture_id}/to-task") async def convert_to_task( capture_id: str, request: Request, domain_id: str = Form(...), project_id: Optional[str] = Form(None), priority: int = Form(3), title: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): capture_repo = BaseRepository("capture", db) item = await capture_repo.get(capture_id) if not item: return RedirectResponse(url="/capture", status_code=303) task_repo = BaseRepository("tasks", db) data = { "title": (title or item["raw_text"]).strip(), "domain_id": domain_id, "status": "open", "priority": priority, } if project_id and project_id.strip(): data["project_id"] = project_id task = await task_repo.create(data) await capture_repo.update(capture_id, { "processed": True, "converted_to_type": "task", "converted_to_id": str(task["id"]), }) return RedirectResponse(url=f"/tasks/{task['id']}", status_code=303) @router.post("/{capture_id}/to-note") async def convert_to_note( capture_id: str, request: Request, title: Optional[str] = Form(None), domain_id: Optional[str] = Form(None), project_id: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): capture_repo = BaseRepository("capture", db) item = await capture_repo.get(capture_id) if not item: return RedirectResponse(url="/capture", status_code=303) note_repo = BaseRepository("notes", db) raw = item["raw_text"] data = {"title": (title or raw[:100]).strip(), "body": raw} if domain_id and domain_id.strip(): data["domain_id"] = domain_id if project_id and project_id.strip(): data["project_id"] = project_id note = await note_repo.create(data) await capture_repo.update(capture_id, { "processed": True, "converted_to_type": "note", "converted_to_id": str(note["id"]), }) return RedirectResponse(url=f"/notes/{note['id']}", status_code=303) @router.post("/{capture_id}/to-project") async def convert_to_project( capture_id: str, request: Request, domain_id: str = Form(...), name: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): capture_repo = BaseRepository("capture", db) item = await capture_repo.get(capture_id) if not item: return RedirectResponse(url="/capture", status_code=303) project_repo = BaseRepository("projects", db) data = {"name": (name or item["raw_text"]).strip(), "domain_id": domain_id, "status": "active"} project = await project_repo.create(data) await capture_repo.update(capture_id, { "processed": True, "converted_to_type": "project", "converted_to_id": str(project["id"]), }) return RedirectResponse(url=f"/projects/{project['id']}", status_code=303) @router.post("/{capture_id}/to-list_item") async def convert_to_list_item( capture_id: str, request: Request, list_id: str = Form(...), content: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): capture_repo = BaseRepository("capture", db) item = await capture_repo.get(capture_id) if not item: return RedirectResponse(url="/capture", status_code=303) li_repo = BaseRepository("list_items", db) data = {"list_id": list_id, "content": (content or item["raw_text"]).strip()} li = await li_repo.create(data) await capture_repo.update(capture_id, { "processed": True, "converted_to_type": "list_item", "converted_to_id": str(li["id"]), "list_id": list_id, }) return RedirectResponse(url=f"/lists/{list_id}", status_code=303) @router.post("/{capture_id}/to-contact") async def convert_to_contact( capture_id: str, request: Request, first_name: str = Form(...), last_name: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): capture_repo = BaseRepository("capture", db) item = await capture_repo.get(capture_id) if not item: return RedirectResponse(url="/capture", status_code=303) contact_repo = BaseRepository("contacts", db) data = {"first_name": first_name.strip()} if last_name and last_name.strip(): data["last_name"] = last_name.strip() contact = await contact_repo.create(data) await capture_repo.update(capture_id, { "processed": True, "converted_to_type": "contact", "converted_to_id": str(contact["id"]), }) return RedirectResponse(url=f"/contacts/{contact['id']}", status_code=303) @router.post("/{capture_id}/to-decision") async def convert_to_decision( capture_id: str, request: Request, title: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): capture_repo = BaseRepository("capture", db) item = await capture_repo.get(capture_id) if not item: return RedirectResponse(url="/capture", status_code=303) decision_repo = BaseRepository("decisions", db) data = {"title": (title or item["raw_text"]).strip(), "status": "proposed", "impact": "medium"} decision = await decision_repo.create(data) await capture_repo.update(capture_id, { "processed": True, "converted_to_type": "decision", "converted_to_id": str(decision["id"]), }) return RedirectResponse(url=f"/decisions/{decision['id']}", status_code=303) @router.post("/{capture_id}/to-link") async def convert_to_link( capture_id: str, request: Request, label: Optional[str] = Form(None), url: Optional[str] = Form(None), db: AsyncSession = Depends(get_db), ): capture_repo = BaseRepository("capture", db) item = await capture_repo.get(capture_id) if not item: return RedirectResponse(url="/capture", status_code=303) link_repo = BaseRepository("links", db) raw = item["raw_text"] url_match = re.search(r'https?://\S+', raw) link_url = (url.strip() if url and url.strip() else None) or (url_match.group(0) if url_match else raw) link_label = (label.strip() if label and label.strip() else None) or (raw.replace(link_url, "").strip() if url_match else raw[:100]) if not link_label: link_label = link_url data = {"label": link_label, "url": link_url} link = await link_repo.create(data) # Assign to Default folder default_fid = await get_default_folder_id(db) await db.execute(text(""" INSERT INTO folder_links (folder_id, link_id) VALUES (:fid, :lid) ON CONFLICT DO NOTHING """), {"fid": default_fid, "lid": link["id"]}) await capture_repo.update(capture_id, { "processed": True, "converted_to_type": "link", "converted_to_id": str(link["id"]), }) return RedirectResponse(url="/links", status_code=303) @router.post("/{capture_id}/dismiss") async def dismiss_capture(capture_id: str, db: AsyncSession = Depends(get_db)): repo = BaseRepository("capture", db) await repo.update(capture_id, {"processed": True, "converted_to_type": "dismissed"}) return RedirectResponse(url="/capture", status_code=303) @router.post("/{capture_id}/delete") async def delete_capture(capture_id: str, db: AsyncSession = Depends(get_db)): repo = BaseRepository("capture", db) await repo.soft_delete(capture_id) return RedirectResponse(url="/capture", status_code=303)