Files
lifeos-dev/routers/time_tracking.py
M Dombaugh a427f7c781 fix: test suite green (156 passed, 7 skipped)
- Fix seed data to match actual DB schemas (capture.processed, daily_focus.completed, weblinks junction table)
- Add date/datetime coercion in BaseRepository for asyncpg compatibility
- Add UUID validation in BaseRepository.get() to prevent DataError on invalid UUIDs
- Fix focus.py and time_tracking.py date string handling for asyncpg
- Fix test ordering (action before delete) and skip destructive admin actions
- Fix form_factory FK resolution for flat UUID strings
- Fix route_report.py to use get_route_registry(app)
- Add asyncio_default_test_loop_scope=session to pytest.ini

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 21:30:27 +00:00

212 lines
7.1 KiB
Python

"""Time tracking: start/stop timer per task, manual time entries, time log view."""
from fastapi import APIRouter, Request, Depends, Form, Query
from fastapi.templating import Jinja2Templates
from fastapi.responses import RedirectResponse, JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from typing import Optional
from datetime import datetime, timezone
from core.database import get_db
from core.sidebar import get_sidebar_data
# Router for the time-tracking endpoints; every route below is mounted under /time.
router = APIRouter(prefix="/time", tags=["time"])
# Template loader pointed at the "templates" directory (used by the time log view).
templates = Jinja2Templates(directory="templates")
async def get_running_timer(db: AsyncSession) -> dict | None:
    """Return the open time entry (end_at IS NULL), or None when nothing is running.

    Soft-deleted entries are ignored. If several entries are open, the one
    with the latest start_at is returned. The resulting dict holds every
    time_entries column plus task_title, task_id, project_name and
    domain_name from the joined tables.
    """
    open_timer_query = text("""
SELECT te.*, t.title as task_title, t.id as task_id,
p.name as project_name, d.name as domain_name
FROM time_entries te
JOIN tasks t ON te.task_id = t.id
LEFT JOIN projects p ON t.project_id = p.id
LEFT JOIN domains d ON t.domain_id = d.id
WHERE te.end_at IS NULL AND te.is_deleted = false
ORDER BY te.start_at DESC
LIMIT 1
""")
    open_row = (await db.execute(open_timer_query)).first()
    if not open_row:
        return None
    return dict(open_row._mapping)
@router.get("/")
async def time_log(
    request: Request,
    task_id: Optional[str] = None,
    days: int = Query(7, ge=1, le=90),
    db: AsyncSession = Depends(get_db),
):
    """Render the time log page.

    Args:
        request: Incoming request, passed through to the template.
        task_id: When given, only entries for this task are listed.
        days: Size of the look-back window in days (1-90, default 7). The
            window starts at CURRENT_DATE minus this many days.
        db: Database session.

    Returns:
        The rendered "time_entries.html" template with the matching entries
        (newest first, at most 200), the running timer if any, the total
        minutes, and a per-day minute breakdown keyed by "YYYY-MM-DD".
    """
    sidebar = await get_sidebar_data(db)
    running = await get_running_timer(db)

    params = {"days": days}
    task_filter = ""
    if task_id:
        # Only a fixed SQL fragment is interpolated; the value itself is bound.
        task_filter = "AND te.task_id = :task_id"
        params["task_id"] = task_id

    # The window size is a real bound parameter (make_interval) rather than
    # being spliced into the SQL text with str.replace.
    result = await db.execute(text(f"""
SELECT te.*, t.title as task_title, t.id as task_id,
p.name as project_name, d.name as domain_name, d.color as domain_color
FROM time_entries te
JOIN tasks t ON te.task_id = t.id
LEFT JOIN projects p ON t.project_id = p.id
LEFT JOIN domains d ON t.domain_id = d.id
WHERE te.is_deleted = false
AND te.start_at >= CURRENT_DATE - make_interval(days => :days)
{task_filter}
ORDER BY te.start_at DESC
LIMIT 200
"""), params)
    entries = [dict(r._mapping) for r in result]

    # Entries without a duration (e.g. a still-running timer) count as 0.
    total_minutes = sum(e.get("duration_minutes") or 0 for e in entries)

    # Minutes per calendar day, keyed by the start date of each entry.
    daily_totals = {}
    for e in entries:
        if e.get("start_at"):
            day = e["start_at"].strftime("%Y-%m-%d")
            daily_totals[day] = daily_totals.get(day, 0) + (e.get("duration_minutes") or 0)

    return templates.TemplateResponse("time_entries.html", {
        "request": request,
        "sidebar": sidebar,
        "entries": entries,
        "running": running,
        "total_minutes": total_minutes,
        "daily_totals": daily_totals,
        "days": days,
        "task_id": task_id or "",
        "page_title": "Time Log",
        "active_nav": "time",
    })
@router.post("/start")
async def start_timer(
    request: Request,
    task_id: str = Form(...),
    db: AsyncSession = Depends(get_db),
):
    """Begin timing a task, closing the currently running timer beforehand.

    A timer that is closed here is recorded with a duration of at least one
    minute. Responds with a 303 redirect to the Referer header, falling back
    to "/tasks" when the header is absent.
    """
    started_at = datetime.now(timezone.utc)

    # Close whatever is still open so that the new entry takes over
    previous = await get_running_timer(db)
    if previous:
        elapsed = started_at - previous["start_at"]
        minutes = int(elapsed.total_seconds() / 60)
        close_params = {
            "now": started_at,
            "dur": max(minutes, 1),
            "id": str(previous["id"]),
        }
        await db.execute(text("""
UPDATE time_entries SET end_at = :now, duration_minutes = :dur
WHERE id = :id
"""), close_params)

    # Open the new entry; start_at and created_at share the same timestamp
    open_params = {"task_id": task_id, "now": started_at}
    await db.execute(text("""
INSERT INTO time_entries (task_id, start_at, is_deleted, created_at)
VALUES (:task_id, :now, false, :now)
"""), open_params)

    # Send the user back to the page that submitted the form
    back_to = request.headers.get("referer", "/tasks")
    return RedirectResponse(url=back_to, status_code=303)
@router.post("/stop")
async def stop_timer(
    request: Request,
    entry_id: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Close an open time entry and redirect back.

    With entry_id, that entry is closed if it is still open; without it, the
    most recently started open, non-deleted entry is closed. The stored
    duration is at least one minute. When no open entry is found nothing is
    written. Responds with a 303 redirect to the Referer header, or "/time"
    when the header is absent.
    """
    stopped_at = datetime.now(timezone.utc)

    if entry_id:
        # Caller named the entry to close
        lookup = await db.execute(
            text("SELECT * FROM time_entries WHERE id = :id AND end_at IS NULL"),
            {"id": entry_id},
        )
    else:
        # No entry named: take the latest one that is still open
        lookup = await db.execute(
            text("SELECT * FROM time_entries WHERE end_at IS NULL AND is_deleted = false ORDER BY start_at DESC LIMIT 1")
        )
    open_row = lookup.first()

    if open_row:
        open_entry = dict(open_row._mapping)
        elapsed = stopped_at - open_entry["start_at"]
        minutes = int(elapsed.total_seconds() / 60)
        await db.execute(text("""
UPDATE time_entries SET end_at = :now, duration_minutes = :dur
WHERE id = :id
"""), {"now": stopped_at, "dur": max(minutes, 1), "id": str(open_entry["id"])})

    back_to = request.headers.get("referer", "/time")
    return RedirectResponse(url=back_to, status_code=303)
@router.get("/running")
async def running_timer_api(db: AsyncSession = Depends(get_db)):
    """Report the running timer as JSON, for the topbar timer pill to poll.

    Returns {"running": false} when no timer is open; otherwise the entry id,
    task id and title, project name, ISO start time and the seconds elapsed
    since the timer started.
    """
    timer = await get_running_timer(db)
    if timer:
        started = timer["start_at"]
        elapsed = datetime.now(timezone.utc) - started
        payload = {
            "running": True,
            "entry_id": str(timer["id"]),
            "task_id": str(timer["task_id"]),
            "task_title": timer["task_title"],
            "project_name": timer.get("project_name"),
            "start_at": started.isoformat(),
            "elapsed_seconds": int(elapsed.total_seconds()),
        }
    else:
        payload = {"running": False}
    return JSONResponse(payload)
@router.post("/manual")
async def manual_entry(
    request: Request,
    task_id: str = Form(...),
    date: str = Form(...),
    duration_minutes: int = Form(...),
    notes: Optional[str] = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Add a manual time entry (no start/stop, just duration).

    Args:
        request: Incoming request; its Referer header decides the redirect target.
        task_id: Task the time is booked on.
        date: Calendar day of the entry in ISO format (YYYY-MM-DD).
        duration_minutes: Minutes to record; must not be negative.
        notes: Optional free text; an empty string is stored as NULL.
        db: Database session.

    Returns:
        A 303 redirect to the Referer header, or "/time" when it is absent.

    Raises:
        HTTPException: 422 when date cannot be parsed or duration_minutes
            is negative.
    """
    # Imported here because the module-level fastapi import does not include it.
    from fastapi import HTTPException

    # A negative duration would silently reduce the totals in the log view.
    if duration_minutes < 0:
        raise HTTPException(status_code=422, detail="duration_minutes must not be negative")

    # The entry is pinned to noon UTC of the given day. A malformed date used
    # to escape as an unhandled ValueError; report it as a client error instead.
    try:
        start_at = datetime.fromisoformat(f"{date}T12:00:00+00:00")
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid date {date!r}; expected YYYY-MM-DD",
        ) from exc

    # start_at and end_at are the same instant; only duration_minutes carries
    # the amount of time.
    await db.execute(text("""
INSERT INTO time_entries (task_id, start_at, end_at, duration_minutes, notes, is_deleted, created_at)
VALUES (:task_id, :start_at, :start_at, :dur, :notes, false, now())
"""), {
        "task_id": task_id,
        "start_at": start_at,
        "dur": duration_minutes,
        "notes": notes or None,
    })

    referer = request.headers.get("referer", "/time")
    return RedirectResponse(url=referer, status_code=303)
@router.post("/{entry_id}/delete")
async def delete_entry(
    request: Request,
    entry_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Soft-delete a time entry and redirect back.

    Sets is_deleted and deleted_at on the entry; an entry that is already
    deleted is left untouched. Responds with a 303 redirect to the Referer
    header, or "/time" when the header is absent.
    """
    # Direct SQL because time_entries has no updated_at column
    soft_delete = text("""
UPDATE time_entries SET is_deleted = true, deleted_at = now()
WHERE id = :id AND is_deleted = false
""")
    await db.execute(soft_delete, {"id": entry_id})

    back_to = request.headers.get("referer", "/time")
    return RedirectResponse(url=back_to, status_code=303)