# File: lifeos-dev/tests/test_business_logic.py (1896 lines, 76 KiB, Python)

"""
Business Logic Tests
====================
Hand-written tests for specific behavioral contracts.
These test LOGIC, not routes, so they stay manual.
When to add tests here:
- New constraint (e.g. "only one timer running at a time")
- State transitions (e.g. "completing a task sets completed_at")
- Cross-entity effects (e.g. "deleting a project hides its tasks")
- Search behavior
- Sidebar data integrity
"""
from __future__ import annotations
import os
import uuid
from datetime import date, datetime, timezone, timedelta
import pytest
from httpx import AsyncClient
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
# ===========================================================================
# Time Tracking
# ===========================================================================
class TestTimerConstraints:
    """Only one timer can run at a time. Starting a new one auto-stops the old.

    Also exercises stopping a timer, the ``/time/running`` JSON endpoint,
    manual time entries, the time log page, and soft-deleting an entry.
    """

    @pytest.mark.asyncio
    async def test_single_timer_constraint(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Starting a second timer leaves exactly one open time entry."""
        t1 = await _create_task(db_session, seed_domain["id"], seed_project["id"], "Timer1")
        t2 = await _create_task(db_session, seed_domain["id"], seed_project["id"], "Timer2")
        await client.post("/time/start", data={"task_id": t1}, follow_redirects=False)
        await client.post("/time/start", data={"task_id": t2}, follow_redirects=False)
        result = await db_session.execute(
            text("SELECT count(*) FROM time_entries WHERE end_at IS NULL")
        )
        # Exactly one, not "<= 1": a count of zero would mean neither start
        # request created an entry, which must not pass as "constraint held".
        assert result.scalar() == 1

    @pytest.mark.asyncio
    async def test_stop_sets_end_at(
        self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
    ):
        """After start + stop, the seed task has no entry with a NULL end_at."""
        await client.post("/time/start", data={"task_id": seed_task["id"]}, follow_redirects=False)
        await client.post("/time/stop", follow_redirects=False)
        result = await db_session.execute(
            text("SELECT count(*) FROM time_entries WHERE end_at IS NULL AND task_id = :tid"),
            {"tid": seed_task["id"]},
        )
        assert result.scalar() == 0

    @pytest.mark.asyncio
    async def test_running_endpoint_returns_json(
        self, client: AsyncClient, seed_task: dict,
    ):
        """``/time/running`` answers 200 with a decodable, non-null JSON body."""
        await client.post("/time/start", data={"task_id": seed_task["id"]}, follow_redirects=False)
        r = await client.get("/time/running")
        assert r.status_code == 200
        data = r.json()
        assert data is not None

    @pytest.mark.asyncio
    async def test_running_json_fields(
        self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
    ):
        """Running timer JSON includes task_title and elapsed_seconds."""
        # Stop any running timer first
        await client.post("/time/stop", follow_redirects=False)
        await client.post("/time/start", data={"task_id": seed_task["id"]}, follow_redirects=False)
        r = await client.get("/time/running")
        data = r.json()
        assert data["running"] is True
        assert data["task_title"] == "Test Task"
        assert "elapsed_seconds" in data
        assert data["elapsed_seconds"] >= 0
        # Cleanup
        await client.post("/time/stop", follow_redirects=False)

    @pytest.mark.asyncio
    async def test_no_running_timer_json(self, client: AsyncClient):
        """When no timer running, JSON returns running=False."""
        await client.post("/time/stop", follow_redirects=False)
        r = await client.get("/time/running")
        data = r.json()
        assert data["running"] is False

    @pytest.mark.asyncio
    async def test_manual_time_entry(
        self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
    ):
        """Manual time entry creates a record with correct duration."""
        today = str(date.today())
        r = await client.post("/time/manual", data={
            "task_id": seed_task["id"],
            "date": today,
            "duration_minutes": "45",
            "notes": "Manual test entry",
        }, follow_redirects=False)
        assert r.status_code == 303
        result = await db_session.execute(
            text("SELECT duration_minutes, notes FROM time_entries "
                 "WHERE task_id = :tid AND duration_minutes = 45 AND is_deleted = false"),
            {"tid": seed_task["id"]},
        )
        row = result.first()
        assert row is not None, "Manual entry not created"
        assert row.duration_minutes == 45

    @pytest.mark.asyncio
    async def test_time_log_page_loads(self, client: AsyncClient):
        """Time log page loads with default 7-day view."""
        r = await client.get("/time/")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_time_log_with_days_param(self, client: AsyncClient):
        """Time log page respects days parameter."""
        r = await client.get("/time/?days=30")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_time_entry_delete_uses_direct_sql(
        self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
    ):
        """time_entries has no updated_at - delete uses direct SQL."""
        # Create a manual entry
        await client.post("/time/manual", data={
            "task_id": seed_task["id"],
            "date": str(date.today()),
            "duration_minutes": "5",
        }, follow_redirects=False)
        result = await db_session.execute(
            text("SELECT id FROM time_entries WHERE task_id = :tid AND duration_minutes = 5 AND is_deleted = false"),
            {"tid": seed_task["id"]},
        )
        row = result.first()
        assert row is not None
        entry_id = str(row.id)
        r = await client.post(f"/time/{entry_id}/delete", follow_redirects=False)
        assert r.status_code == 303
        # The row must still exist, flagged as deleted (soft delete).
        result = await db_session.execute(
            text("SELECT is_deleted FROM time_entries WHERE id = :id"), {"id": entry_id},
        )
        assert result.first().is_deleted is True
# ===========================================================================
# Task Filtering & Status Transitions
# ===========================================================================
class TestTaskFiltering:
    """Test task list filtering by domain, status, priority, context, and sort."""

    @pytest.mark.asyncio
    async def test_filter_by_status(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Filter tasks by status shows only matching tasks."""
        # One shared tag, and assertions on the full tagged titles, so rows
        # left behind by other runs ("FilterDone-<other>") cannot affect the result.
        tag = _uid()
        t_open = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FilterOpen-{tag}")
        t_done = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FilterDone-{tag}", status="done")
        r = await client.get("/tasks/?status=open")
        assert f"FilterOpen-{tag}" in r.text
        assert f"FilterDone-{tag}" not in r.text
        r = await client.get("/tasks/?status=done")
        assert f"FilterDone-{tag}" in r.text
        # Cleanup
        await _delete_tasks(db_session, [t_open, t_done])

    @pytest.mark.asyncio
    async def test_filter_by_priority(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """``?priority=1`` lists the priority-1 task and omits the priority-4 one."""
        tag = _uid()
        t_high = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"PriHigh-{tag}", priority=1)
        t_low = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"PriLow-{tag}", priority=4)
        r = await client.get("/tasks/?priority=1")
        assert f"PriHigh-{tag}" in r.text
        assert f"PriLow-{tag}" not in r.text
        await _delete_tasks(db_session, [t_high, t_low])

    @pytest.mark.asyncio
    async def test_filter_by_domain(
        self, client: AsyncClient, seed_domain: dict, seed_task: dict,
    ):
        """Filtering by ``domain_id`` returns 200 (page content is not inspected)."""
        r = await client.get(f"/tasks/?domain_id={seed_domain['id']}")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_combined_filters(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Multiple filters applied simultaneously."""
        tag = _uid()
        t1 = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Combo-{tag}", priority=1, status="open")
        r = await client.get(f"/tasks/?domain_id={seed_domain['id']}&status=open&priority=1")
        assert f"Combo-{tag}" in r.text
        await _delete_tasks(db_session, [t1])

    @pytest.mark.asyncio
    async def test_sort_by_priority(self, client: AsyncClient):
        """``?sort=priority`` is accepted (status check only)."""
        r = await client.get("/tasks/?sort=priority")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_sort_by_due_date(self, client: AsyncClient):
        """``?sort=due_date`` is accepted (status check only)."""
        r = await client.get("/tasks/?sort=due_date")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_sort_by_title(self, client: AsyncClient):
        """``?sort=title`` is accepted (status check only)."""
        r = await client.get("/tasks/?sort=title")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_sort_by_created_at(self, client: AsyncClient):
        """``?sort=created_at`` is accepted (status check only)."""
        r = await client.get("/tasks/?sort=created_at")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_done_tasks_sort_to_bottom(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Done and cancelled tasks sort after open/in_progress tasks."""
        tag = _uid()
        t_open = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"SortOpen-{tag}", status="open")
        t_done = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"SortDone-{tag}", status="done")
        r = await client.get("/tasks/")
        # Open task should appear before done task in the HTML
        open_pos = r.text.find(f"SortOpen-{tag}")
        done_pos = r.text.find(f"SortDone-{tag}")
        # NOTE(review): the ordering is only checked when both titles are
        # rendered; if either is absent from the page the test passes without
        # asserting anything. Kept as-is because whether the default view
        # shows done tasks cannot be determined from this file.
        if open_pos != -1 and done_pos != -1:
            assert open_pos < done_pos, "Done task should sort after open task"
        await _delete_tasks(db_session, [t_open, t_done])
class TestTaskStatusTransitions:
    """Test all task status transitions and completed_at handling."""

    @pytest.mark.asyncio
    async def test_toggle_open_to_done(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Toggling an open task marks it done and stamps completed_at."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Toggle-{_uid()}")
        r = await client.post(f"/tasks/{tid}/toggle", follow_redirects=False)
        assert r.status_code == 303
        result = await db_session.execute(
            text("SELECT status, completed_at FROM tasks WHERE id = :id"), {"id": tid},
        )
        row = result.first()
        assert row.status == "done"
        assert row.completed_at is not None
        await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_toggle_done_to_open(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Toggling a done task reopens it and leaves completed_at NULL."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"ToggleBack-{_uid()}", status="done")
        r = await client.post(f"/tasks/{tid}/toggle", follow_redirects=False)
        assert r.status_code == 303
        result = await db_session.execute(
            text("SELECT status, completed_at FROM tasks WHERE id = :id"), {"id": tid},
        )
        row = result.first()
        assert row.status == "open"
        assert row.completed_at is None
        await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_complete_action_sets_done_and_completed_at(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """The ``/complete`` action marks the task done and stamps completed_at."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Complete-{_uid()}")
        r = await client.post(f"/tasks/{tid}/complete", follow_redirects=False)
        assert r.status_code == 303
        result = await db_session.execute(
            text("SELECT status, completed_at FROM tasks WHERE id = :id"), {"id": tid},
        )
        row = result.first()
        assert row.status == "done"
        assert row.completed_at is not None
        await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_edit_status_to_done_sets_completed_at(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Editing a task status from open to done sets completed_at."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"EditDone-{_uid()}")
        r = await client.post(f"/tasks/{tid}/edit", data={
            "title": "EditDone", "domain_id": seed_domain["id"],
            "priority": "3", "status": "done",
        }, follow_redirects=False)
        assert r.status_code == 303
        result = await db_session.execute(
            text("SELECT completed_at FROM tasks WHERE id = :id"), {"id": tid},
        )
        assert result.first().completed_at is not None
        await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_edit_status_from_done_clears_completed_at(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Editing a task from done to in_progress clears completed_at."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"EditReopen-{_uid()}", status="done")
        # Set completed_at so there is a value for the edit to clear.
        await db_session.execute(
            text("UPDATE tasks SET completed_at = now() WHERE id = :id"), {"id": tid}
        )
        await db_session.commit()
        r = await client.post(f"/tasks/{tid}/edit", data={
            "title": "EditReopen", "domain_id": seed_domain["id"],
            "priority": "3", "status": "in_progress",
        }, follow_redirects=False)
        assert r.status_code == 303
        result = await db_session.execute(
            text("SELECT status, completed_at FROM tasks WHERE id = :id"), {"id": tid},
        )
        row = result.first()
        assert row.status == "in_progress"
        # completed_at should be cleared (nullable_fields includes completed_at).
        # This is the contract the test is named for, so assert it explicitly.
        assert row.completed_at is None, "completed_at not cleared when leaving done"
        await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_all_status_values_accepted(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """All valid status values (open, in_progress, blocked, done, cancelled) work."""
        for status in ("open", "in_progress", "blocked", "done", "cancelled"):
            # A fresh task per status keeps each edit independent of the others.
            tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Status-{status}-{_uid()}")
            r = await client.post(f"/tasks/{tid}/edit", data={
                "title": f"Status-{status}", "domain_id": seed_domain["id"],
                "priority": "3", "status": status,
            }, follow_redirects=False)
            assert r.status_code == 303, f"Status {status} rejected"
            await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_quick_add_task(
        self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
    ):
        """Quick-add creates a task with status 'open' and priority 3."""
        tag = _uid()
        r = await client.post("/tasks/quick-add", data={
            "title": f"QuickAdd-{tag}",
            "domain_id": seed_domain["id"],
        }, follow_redirects=False)
        assert r.status_code == 303
        result = await db_session.execute(
            text("SELECT id, status, priority FROM tasks WHERE title = :t AND is_deleted = false"),
            {"t": f"QuickAdd-{tag}"},
        )
        row = result.first()
        assert row is not None, "Quick-add task not created"
        assert row.status == "open"
        assert row.priority == 3
        await _delete_tasks(db_session, [str(row.id)])

    @pytest.mark.asyncio
    async def test_subtask_appears_on_parent_detail(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Subtasks (parent_id) show on parent task detail page."""
        parent_id = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Parent-{_uid()}")
        tag = _uid()
        child_id = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Child-{tag}", parent_id=parent_id)
        r = await client.get(f"/tasks/{parent_id}")
        assert r.status_code == 200
        assert f"Child-{tag}" in r.text, "Subtask missing from parent detail"
        # Child is listed first in the cleanup call, ahead of its parent.
        await _delete_tasks(db_session, [child_id, parent_id])
# ===========================================================================
# Capture Conversions
# ===========================================================================
class TestCaptureConversions:
    """Capture-to-entity conversions, dismissal, batch capture and inbox views."""

    @pytest.mark.asyncio
    async def test_convert_to_task(
        self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
    ):
        """Converting to a task flags the capture processed and creates the task row."""
        capture_id = await _create_capture(db_session, f"ConvTask-{_uid()}")
        form = {
            "domain_id": seed_domain["id"],
            "priority": "2",
        }
        resp = await client.post(
            f"/capture/{capture_id}/to-task", data=form, follow_redirects=False,
        )
        assert resp.status_code == 303

        # The capture row records what it was converted into.
        capture_q = await db_session.execute(
            text("SELECT processed, converted_to_type, converted_to_id FROM capture WHERE id = :id"),
            {"id": capture_id},
        )
        capture_row = capture_q.first()
        assert capture_row.processed is True
        assert capture_row.converted_to_type == "task"
        assert capture_row.converted_to_id is not None

        # The referenced task exists and is not soft-deleted.
        task_q = await db_session.execute(
            text("SELECT id FROM tasks WHERE id = :id AND is_deleted = false"),
            {"id": capture_row.converted_to_id},
        )
        assert task_q.first() is not None, "Converted task not found"

    @pytest.mark.asyncio
    async def test_convert_to_note(
        self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
    ):
        """Converting to a note records converted_to_type 'note'."""
        suffix = _uid()
        capture_id = await _create_capture(db_session, f"ConvNote-{suffix}")
        resp = await client.post(
            f"/capture/{capture_id}/to-note",
            data={"domain_id": seed_domain["id"]},
            follow_redirects=False,
        )
        assert resp.status_code == 303
        capture_q = await db_session.execute(
            text("SELECT converted_to_type FROM capture WHERE id = :id"), {"id": capture_id},
        )
        assert capture_q.first().converted_to_type == "note"

    @pytest.mark.asyncio
    async def test_convert_to_project(
        self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
    ):
        """Converting to a project records converted_to_type 'project'."""
        capture_id = await _create_capture(db_session, f"ConvProj-{_uid()}")
        resp = await client.post(
            f"/capture/{capture_id}/to-project",
            data={"domain_id": seed_domain["id"]},
            follow_redirects=False,
        )
        assert resp.status_code == 303
        capture_q = await db_session.execute(
            text("SELECT converted_to_type FROM capture WHERE id = :id"), {"id": capture_id},
        )
        assert capture_q.first().converted_to_type == "project"

    @pytest.mark.asyncio
    async def test_convert_to_contact(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Converting to a contact stores the submitted first name."""
        capture_id = await _create_capture(db_session, "John Smith")
        form = {
            "first_name": "John",
            "last_name": "Smith",
        }
        resp = await client.post(
            f"/capture/{capture_id}/to-contact", data=form, follow_redirects=False,
        )
        assert resp.status_code == 303
        capture_q = await db_session.execute(
            text("SELECT converted_to_type, converted_to_id FROM capture WHERE id = :id"),
            {"id": capture_id},
        )
        capture_row = capture_q.first()
        assert capture_row.converted_to_type == "contact"

        # Follow the stored id to the contact row.
        contact_q = await db_session.execute(
            text("SELECT first_name FROM contacts WHERE id = :id"),
            {"id": capture_row.converted_to_id},
        )
        assert contact_q.first().first_name == "John"

    @pytest.mark.asyncio
    async def test_convert_to_decision(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """An empty-form decision conversion yields status 'proposed', impact 'medium'."""
        capture_id = await _create_capture(db_session, f"DecisionCapture-{_uid()}")
        resp = await client.post(
            f"/capture/{capture_id}/to-decision", data={}, follow_redirects=False,
        )
        assert resp.status_code == 303
        capture_q = await db_session.execute(
            text("SELECT converted_to_type, converted_to_id FROM capture WHERE id = :id"),
            {"id": capture_id},
        )
        capture_row = capture_q.first()
        assert capture_row.converted_to_type == "decision"

        # No fields were posted, so these are the values applied by default.
        decision_q = await db_session.execute(
            text("SELECT status, impact FROM decisions WHERE id = :id"), {"id": capture_row.converted_to_id},
        )
        decision = decision_q.first()
        assert decision.status == "proposed"
        assert decision.impact == "medium"

    @pytest.mark.asyncio
    async def test_convert_to_weblink(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """The URL embedded in the capture text ends up in the weblink row."""
        capture_id = await _create_capture(db_session, "Check https://example.com/test for details")
        resp = await client.post(
            f"/capture/{capture_id}/to-weblink", data={}, follow_redirects=False,
        )
        assert resp.status_code == 303
        capture_q = await db_session.execute(
            text("SELECT converted_to_type, converted_to_id FROM capture WHERE id = :id"),
            {"id": capture_id},
        )
        capture_row = capture_q.first()
        assert capture_row.converted_to_type == "weblink"

        weblink_q = await db_session.execute(
            text("SELECT url FROM weblinks WHERE id = :id"), {"id": capture_row.converted_to_id},
        )
        assert "https://example.com/test" in weblink_q.first().url

    @pytest.mark.asyncio
    async def test_convert_to_list_item(
        self, client: AsyncClient, db_session: AsyncSession, seed_list: dict,
    ):
        """Converting to a list item records converted_to_type 'list_item'."""
        capture_id = await _create_capture(db_session, f"ListItem-{_uid()}")
        resp = await client.post(
            f"/capture/{capture_id}/to-list_item",
            data={"list_id": seed_list["id"]},
            follow_redirects=False,
        )
        assert resp.status_code == 303
        capture_q = await db_session.execute(
            text("SELECT converted_to_type FROM capture WHERE id = :id"), {"id": capture_id},
        )
        assert capture_q.first().converted_to_type == "list_item"

    @pytest.mark.asyncio
    async def test_dismiss_capture(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Dismissing marks the capture processed with type 'dismissed'."""
        capture_id = await _create_capture(db_session, f"Dismiss-{_uid()}")
        resp = await client.post(f"/capture/{capture_id}/dismiss", follow_redirects=False)
        assert resp.status_code == 303
        capture_q = await db_session.execute(
            text("SELECT processed, converted_to_type FROM capture WHERE id = :id"),
            {"id": capture_id},
        )
        capture_row = capture_q.first()
        assert capture_row.processed is True
        assert capture_row.converted_to_type == "dismissed"

    @pytest.mark.asyncio
    async def test_dismissed_hidden_from_inbox(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Dismissed items don't show in inbox."""
        suffix = _uid()
        capture_id = await _create_capture(db_session, f"DismissHide-{suffix}")
        await client.post(f"/capture/{capture_id}/dismiss", follow_redirects=False)
        inbox = await client.get("/capture/")
        assert f"DismissHide-{suffix}" not in inbox.text

    @pytest.mark.asyncio
    async def test_line_splitting_skips_empty_lines(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Empty lines are skipped in multi-line capture."""
        suffix = _uid()
        # Two real lines surrounded by blank and whitespace-only lines.
        payload = {
            "raw_text": f"Line1-{suffix}\n\n\nLine2-{suffix}\n \n",
        }
        resp = await client.post("/capture/add", data=payload, follow_redirects=False)
        assert resp.status_code == 303
        count_q = await db_session.execute(
            text("SELECT count(*) FROM capture WHERE raw_text LIKE :pat AND is_deleted = false"),
            {"pat": f"%-{suffix}"},
        )
        assert count_q.scalar() == 2, "Empty lines should be skipped"

    @pytest.mark.asyncio
    async def test_batch_capture_gets_batch_id(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Multi-line capture assigns same import_batch_id to all items."""
        suffix = _uid()
        payload = {
            "raw_text": f"Batch1-{suffix}\nBatch2-{suffix}\nBatch3-{suffix}",
        }
        await client.post("/capture/add", data=payload, follow_redirects=False)
        batch_q = await db_session.execute(
            text("SELECT DISTINCT import_batch_id FROM capture "
                 "WHERE raw_text LIKE :pat AND is_deleted = false"),
            {"pat": f"Batch%-{suffix}"},
        )
        distinct_ids = [rec[0] for rec in batch_q]
        assert len(distinct_ids) == 1, "All batch items should share one batch_id"
        assert distinct_ids[0] is not None, "Batch ID should not be null"

    @pytest.mark.asyncio
    async def test_single_line_no_batch_id(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Single-line capture has no batch_id."""
        suffix = _uid()
        payload = {
            "raw_text": f"Single-{suffix}",
        }
        await client.post("/capture/add", data=payload, follow_redirects=False)
        capture_q = await db_session.execute(
            text("SELECT import_batch_id FROM capture WHERE raw_text = :t AND is_deleted = false"),
            {"t": f"Single-{suffix}"},
        )
        capture_row = capture_q.first()
        assert capture_row is not None
        assert capture_row.import_batch_id is None

    @pytest.mark.asyncio
    async def test_batch_undo(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Batch undo soft-deletes all items in the batch."""
        suffix = _uid()
        payload = {
            "raw_text": f"Undo1-{suffix}\nUndo2-{suffix}",
        }
        await client.post("/capture/add", data=payload, follow_redirects=False)

        # Look up the batch id from any one of the captured rows.
        lookup = await db_session.execute(
            text("SELECT import_batch_id FROM capture WHERE raw_text LIKE :pat AND is_deleted = false LIMIT 1"),
            {"pat": f"Undo%-{suffix}"},
        )
        batch_id = str(lookup.first()[0])

        resp = await client.post(f"/capture/batch/{batch_id}/undo", follow_redirects=False)
        assert resp.status_code == 303
        remaining = await db_session.execute(
            text("SELECT count(*) FROM capture WHERE import_batch_id = :bid AND is_deleted = false"),
            {"bid": batch_id},
        )
        assert remaining.scalar() == 0, "Batch undo should delete all items"

    @pytest.mark.asyncio
    async def test_convert_nonexistent_capture_redirects(
        self, client: AsyncClient, seed_domain: dict,
    ):
        """Converting a nonexistent capture redirects to /capture."""
        missing_id = str(uuid.uuid4())
        resp = await client.post(
            f"/capture/{missing_id}/to-task",
            data={"domain_id": seed_domain["id"]},
            follow_redirects=False,
        )
        # Only the redirect status is checked, not the Location target.
        assert resp.status_code == 303

    @pytest.mark.asyncio
    async def test_capture_inbox_vs_processed_view(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Inbox shows unprocessed, processed view shows processed."""
        suffix = _uid()
        marker = f"ViewTest-{suffix}"
        capture_id = await _create_capture(db_session, marker)

        inbox = await client.get("/capture/?show=inbox")
        assert marker in inbox.text

        # Dismissing moves the item out of the inbox and into the processed view.
        await client.post(f"/capture/{capture_id}/dismiss", follow_redirects=False)
        inbox = await client.get("/capture/?show=inbox")
        assert marker not in inbox.text
        processed = await client.get("/capture/?show=processed")
        assert marker in processed.text
# ===========================================================================
# Meeting Action Items & Relationships
# ===========================================================================
class TestMeetingRelationships:
    """Test meeting action items, attendees, and linked entities."""

    @pytest.mark.asyncio
    async def test_create_action_item(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_meeting: dict, seed_domain: dict,
    ):
        """Creating an action item creates a task and links it to the meeting."""
        suffix = _uid()
        form = {
            "title": f"ActionItem-{suffix}",
            "domain_id": seed_domain["id"],
        }
        resp = await client.post(
            f"/meetings/{seed_meeting['id']}/action-item",
            data=form,
            follow_redirects=False,
        )
        assert resp.status_code == 303

        # A task row with the posted title must exist.
        task_q = await db_session.execute(
            text("SELECT id FROM tasks WHERE title = :t AND is_deleted = false"),
            {"t": f"ActionItem-{suffix}"},
        )
        created_task = task_q.first()
        assert created_task is not None, "Action item task not created"

        # The junction row ties that task to the meeting and tags its origin.
        link_q = await db_session.execute(
            text("SELECT source FROM meeting_tasks WHERE meeting_id = :mid AND task_id = :tid"),
            {"mid": seed_meeting["id"], "tid": str(created_task.id)},
        )
        link = link_q.first()
        assert link is not None, "meeting_tasks junction not created"
        assert link.source == "action_item"

    @pytest.mark.asyncio
    async def test_action_item_appears_on_meeting_detail(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_meeting: dict, seed_domain: dict,
    ):
        """An action item's title is rendered on the meeting detail page."""
        suffix = _uid()
        form = {
            "title": f"DetailItem-{suffix}",
            "domain_id": seed_domain["id"],
        }
        await client.post(
            f"/meetings/{seed_meeting['id']}/action-item",
            data=form,
            follow_redirects=False,
        )
        detail = await client.get(f"/meetings/{seed_meeting['id']}")
        assert f"DetailItem-{suffix}" in detail.text

    @pytest.mark.asyncio
    async def test_meeting_status_filter(
        self, client: AsyncClient,
    ):
        """Meeting list can be filtered by status."""
        listing = await client.get("/meetings/?status=scheduled")
        assert listing.status_code == 200

    @pytest.mark.asyncio
    async def test_meeting_detail_loads_all_sections(
        self, client: AsyncClient, seed_meeting: dict,
    ):
        """Meeting detail page responds 200 (section content is not inspected)."""
        detail = await client.get(f"/meetings/{seed_meeting['id']}")
        assert detail.status_code == 200
# ===========================================================================
# Admin / Trash Full Lifecycle
# ===========================================================================
class TestAdminTrashLifecycle:
    """Full lifecycle: create item -> soft delete -> verify in trash -> permanent delete -> confirm gone."""

    @pytest.mark.asyncio
    async def test_note_trash_lifecycle(
        self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
    ):
        """Walk a note through create, soft delete, trash listing and permanent delete."""
        note_title = f"TrashTest-{uuid.uuid4().hex[:8]}"
        form = {
            "title": note_title,
            "domain_id": seed_domain["id"],
            "body": "Trash lifecycle test body",
            "content_format": "markdown",
        }
        resp = await client.post("/notes/create", data=form, follow_redirects=False)
        assert resp.status_code == 303

        # The new note's id is the last path segment of the redirect target.
        note_id = resp.headers.get("location", "").rstrip("/").split("/")[-1]
        assert note_id and note_id != "notes"

        listing = await client.get("/notes/")
        assert note_title in listing.text

        # Soft delete: gone from the list, flagged and timestamped in the table.
        resp = await client.post(f"/notes/{note_id}/delete", follow_redirects=False)
        assert resp.status_code in (303, 302)
        listing = await client.get("/notes/")
        assert note_title not in listing.text
        flags_q = await db_session.execute(
            text("SELECT is_deleted, deleted_at FROM notes WHERE id = :id"), {"id": note_id},
        )
        flags = flags_q.first()
        assert flags.is_deleted is True
        assert flags.deleted_at is not None

        trash = await client.get("/admin/trash/?entity_type=notes")
        assert note_title in trash.text

        # Permanent delete: gone from the trash page and from the table.
        resp = await client.post(
            f"/admin/trash/notes/{note_id}/permanent-delete", follow_redirects=False,
        )
        assert resp.status_code == 303
        trash = await client.get("/admin/trash/?entity_type=notes")
        assert note_title not in trash.text
        count_q = await db_session.execute(
            text("SELECT count(*) FROM notes WHERE id = :id"), {"id": note_id},
        )
        assert count_q.scalar() == 0

    @pytest.mark.asyncio
    async def test_task_trash_lifecycle(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Walk a task through soft delete, trash listing and restore."""
        task_title = f"TrashTask-{uuid.uuid4().hex[:8]}"
        task_id = str(uuid.uuid4())
        insert_params = {
            "id": task_id,
            "title": task_title,
            "did": seed_domain["id"],
            "pid": seed_project["id"],
        }
        await db_session.execute(
            text("INSERT INTO tasks (id, title, domain_id, project_id, status, priority, sort_order, is_deleted, created_at, updated_at) "
                 "VALUES (:id, :title, :did, :pid, 'open', 3, 0, false, now(), now())"),
            insert_params,
        )
        await db_session.commit()

        listing = await client.get("/tasks/")
        assert task_title in listing.text

        resp = await client.post(f"/tasks/{task_id}/delete", follow_redirects=False)
        assert resp.status_code in (303, 302)
        trash = await client.get("/admin/trash/?entity_type=tasks")
        assert task_title in trash.text

        # Restore: back in the task list, no longer in the trash.
        resp = await client.post(f"/admin/trash/tasks/{task_id}/restore", follow_redirects=False)
        assert resp.status_code == 303
        listing = await client.get("/tasks/")
        assert task_title in listing.text
        trash = await client.get("/admin/trash/?entity_type=tasks")
        assert task_title not in trash.text

        # Cleanup: soft delete again, then purge the row.
        await client.post(f"/tasks/{task_id}/delete", follow_redirects=False)
        await client.post(f"/admin/trash/tasks/{task_id}/permanent-delete", follow_redirects=False)

    @pytest.mark.asyncio
    async def test_trash_page_loads_with_counts(self, client: AsyncClient):
        """Trash page responds 200 and contains the word 'Trash'."""
        trash = await client.get("/admin/trash/")
        assert trash.status_code == 200
        assert "Trash" in trash.text

    @pytest.mark.asyncio
    async def test_permanent_delete_invalid_table_redirects(self, client: AsyncClient):
        """Permanent delete against an unknown table name answers with a 303."""
        random_id = str(uuid.uuid4())
        resp = await client.post(
            f"/admin/trash/nonexistent_table/{random_id}/permanent-delete",
            follow_redirects=False,
        )
        assert resp.status_code == 303

    @pytest.mark.asyncio
    async def test_restore_invalid_table_redirects(self, client: AsyncClient):
        """Restore against an unknown table name answers with a 303."""
        random_id = str(uuid.uuid4())
        resp = await client.post(
            f"/admin/trash/nonexistent_table/{random_id}/restore",
            follow_redirects=False,
        )
        assert resp.status_code == 303

    @pytest.mark.asyncio
    async def test_empty_trash(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict,
    ):
        """Empty trash permanently deletes all soft-deleted items."""
        suffix = _uid()
        note_id = str(uuid.uuid4())
        # Insert a note that is already soft-deleted (is_deleted = true).
        insert_params = {
            "id": note_id,
            "title": f"EmptyTrash-{suffix}",
            "did": seed_domain["id"],
        }
        await db_session.execute(
            text("INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, deleted_at, created_at, updated_at) "
                 "VALUES (:id, :title, :did, 'body', 'markdown', true, now(), now(), now())"),
            insert_params,
        )
        await db_session.commit()

        resp = await client.post("/admin/trash/empty", follow_redirects=False)
        assert resp.status_code == 303
        count_q = await db_session.execute(
            text("SELECT count(*) FROM notes WHERE id = :id"), {"id": note_id},
        )
        assert count_q.scalar() == 0, "Empty trash should permanently delete items"

    @pytest.mark.asyncio
    async def test_trash_filter_by_entity_type(self, client: AsyncClient):
        """Trash page can filter by entity type."""
        for kind in ("tasks", "notes", "projects", "contacts"):
            trash = await client.get(f"/admin/trash/?entity_type={kind}")
            assert trash.status_code == 200
# ===========================================================================
# Soft Delete & Restore
# ===========================================================================
class TestSoftDeleteBehavior:
    """Soft-deleted items should vanish from lists and reappear after restore."""

    @pytest.mark.asyncio
    async def test_deleted_task_hidden_from_list(
        self, client: AsyncClient, seed_task: dict,
    ):
        """After deleting the seed task its title is absent from the task list."""
        task_id = seed_task["id"]
        await client.post(f"/tasks/{task_id}/delete", follow_redirects=False)
        listing = await client.get("/tasks/")
        assert seed_task["title"] not in listing.text

    @pytest.mark.asyncio
    async def test_restore_task_reappears(
        self, client: AsyncClient, seed_task: dict,
    ):
        """Delete followed by a trash restore puts the title back in the list."""
        task_id = seed_task["id"]
        await client.post(f"/tasks/{task_id}/delete", follow_redirects=False)
        await client.post(f"/admin/trash/tasks/{task_id}/restore", follow_redirects=False)
        listing = await client.get("/tasks/")
        assert seed_task["title"] in listing.text

    @pytest.mark.asyncio
    async def test_deleted_project_hidden(
        self, client: AsyncClient, seed_project: dict,
    ):
        """After deleting the seed project its name is absent from the project list."""
        project_id = seed_project["id"]
        await client.post(f"/projects/{project_id}/delete", follow_redirects=False)
        listing = await client.get("/projects/")
        assert seed_project["name"] not in listing.text
# ===========================================================================
# Search
# ===========================================================================
class TestSearchBehavior:
    """Behavioral checks for the HTML search page and the JSON search API."""

    @pytest.mark.asyncio
    async def test_search_does_not_crash_on_sql_injection(self, client: AsyncClient):
        """A query containing SQL metacharacters still yields a 200 response."""
        r = await client.get("/search/?q='; DROP TABLE tasks; --")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_search_empty_query(self, client: AsyncClient):
        """An empty query string is accepted."""
        r = await client.get("/search/?q=")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_search_special_unicode(self, client: AsyncClient):
        """A non-ASCII query is accepted."""
        r = await client.get("/search/?q=日本語テスト")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_search_api_returns_json(self, client: AsyncClient):
        """Search API endpoint returns valid JSON with results array."""
        r = await client.get("/search/api?q=Test")
        assert r.status_code == 200
        data = r.json()
        assert "results" in data
        assert "query" in data
        assert isinstance(data["results"], list)

    @pytest.mark.asyncio
    async def test_search_api_finds_seed_task(
        self, client: AsyncClient, seed_task: dict,
    ):
        """Searching for the seed task's title must not crash the API."""
        r = await client.get("/search/api?q=Test+Task")
        # Check the status first so an error page fails with a clear
        # assertion rather than a JSON decode error.
        assert r.status_code == 200
        data = r.json()
        # May or may not find the task depending on the search_vector trigger,
        # so only the response shape is pinned, not the presence of a hit.
        assert isinstance(data["results"], list)

    @pytest.mark.asyncio
    async def test_search_api_entity_type_filter(self, client: AsyncClient):
        """Search API can filter by entity_type."""
        r = await client.get("/search/api?q=Test&entity_type=tasks")
        assert r.status_code == 200
        data = r.json()
        for result in data["results"]:
            assert result["type"] == "tasks"

    @pytest.mark.asyncio
    async def test_search_api_limit_param(self, client: AsyncClient):
        """Search API accepts a limit parameter."""
        # NOTE(review): only the status is checked; whether `limit` caps the
        # total or the per-type result count is not visible here - confirm
        # before asserting on len(results).
        r = await client.get("/search/api?q=Test&limit=2")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_search_deleted_items_excluded(
        self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
    ):
        """Soft-deleted items should not appear in search results."""
        tag = _uid()
        # Create a note with searchable title, then soft-delete it
        note_id = str(uuid.uuid4())
        await db_session.execute(
            text("INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, created_at, updated_at) "
                 "VALUES (:id, :title, :did, 'body', 'markdown', false, now(), now())"),
            {"id": note_id, "title": f"SearchDelete-{tag}", "did": seed_domain["id"]},
        )
        await db_session.commit()
        try:
            # Now soft-delete it
            await db_session.execute(
                text("UPDATE notes SET is_deleted = true, deleted_at = now() WHERE id = :id"),
                {"id": note_id},
            )
            await db_session.commit()
            r = await client.get(f"/search/api?q=SearchDelete-{tag}")
            assert r.status_code == 200
            data = r.json()
            ids = [item["id"] for item in data["results"]]
            assert note_id not in ids, "Deleted note should not appear in search"
        finally:
            # Cleanup runs even when an assertion fails so the row never leaks
            # into later tests.
            await db_session.execute(text("DELETE FROM notes WHERE id = :id"), {"id": note_id})
            await db_session.commit()
# ===========================================================================
# Sidebar
# ===========================================================================
class TestSidebarIntegrity:
    """Checks that sidebar content (domains, projects) is rendered into pages."""

    @pytest.mark.asyncio
    async def test_sidebar_shows_domain_on_every_page(
        self, client: AsyncClient, seed_domain: dict,
    ):
        """The seed domain name appears in the HTML of each checked page."""
        for path in ("/", "/tasks/", "/notes/", "/projects/"):
            r = await client.get(path)
            assert seed_domain["name"] in r.text, f"Domain missing from sidebar on {path}"

    @pytest.mark.asyncio
    async def test_sidebar_shows_project_hierarchy(
        self, client: AsyncClient, seed_domain: dict, seed_area: dict, seed_project: dict,
    ):
        """The seed project name appears on the home page."""
        r = await client.get("/")
        assert seed_project["name"] in r.text

    @pytest.mark.asyncio
    async def test_archived_project_hidden_from_sidebar(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict,
    ):
        """Archived projects should not appear in the sidebar."""
        tag = _uid()
        proj_id = str(uuid.uuid4())
        await db_session.execute(
            text("INSERT INTO projects (id, name, domain_id, status, priority, sort_order, is_deleted, created_at, updated_at) "
                 "VALUES (:id, :name, :did, 'archived', 3, 0, false, now(), now())"),
            {"id": proj_id, "name": f"Archived-{tag}", "did": seed_domain["id"]},
        )
        await db_session.commit()
        try:
            r = await client.get("/")
            assert f"Archived-{tag}" not in r.text, "Archived project should be hidden from sidebar"
        finally:
            # Remove the row even when the assertion fails.
            await db_session.execute(text("DELETE FROM projects WHERE id = :id"), {"id": proj_id})
            await db_session.commit()

    @pytest.mark.asyncio
    async def test_sidebar_capture_badge_count(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Home page renders with its navigation markup present."""
        # The sidebar data includes capture_count - we verify the page loads
        # and the sidebar renders (capture_count is passed to template)
        r = await client.get("/")
        assert r.status_code == 200
        # The sidebar should render without error
        assert "sidebar" in r.text.lower() or "nav" in r.text.lower()
# ===========================================================================
# Focus & Capture Workflows
# ===========================================================================
class TestFocusWorkflow:
    """Daily-focus add/toggle/remove routes and multi-line capture."""

    @pytest.mark.asyncio
    async def test_add_and_remove_from_focus(
        self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
    ):
        """Adding the seed task to today's focus redirects."""
        # NOTE(review): despite the name, only the add step is exercised here;
        # removal is covered by test_focus_remove_soft_deletes.
        r = await client.post("/focus/add", data={
            "task_id": seed_task["id"],
            "focus_date": str(date.today()),
        }, follow_redirects=False)
        assert r.status_code in (303, 302)

    @pytest.mark.asyncio
    async def test_capture_multi_line_creates_multiple(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Posting multi-line raw_text adds more than one capture row."""
        count_sql = text("SELECT count(*) FROM capture WHERE is_deleted = false")
        # Compare against the count before the request: an absolute
        # "count >= 2" check passes trivially once the table already has rows.
        before = (await db_session.execute(count_sql)).scalar()
        await client.post(
            "/capture/add",
            data={"raw_text": "Line one\nLine two\nLine three"},
            follow_redirects=False,
        )
        after = (await db_session.execute(count_sql)).scalar()
        created = after - before
        assert created >= 2, f"Expected multiple capture items, got {created}"

    @pytest.mark.asyncio
    async def test_focus_toggle_syncs_task_status(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Toggling focus completion also toggles task status."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FocusSync-{_uid()}")
        try:
            # Compute the date once so the form value and the query parameter
            # cannot disagree if the test straddles midnight.
            today = date.today()
            # Add to focus
            r = await client.post("/focus/add", data={
                "task_id": tid, "focus_date": str(today),
            }, follow_redirects=False)
            assert r.status_code == 303
            # Get focus item id
            result = await db_session.execute(
                text("SELECT id FROM daily_focus WHERE task_id = :tid AND focus_date = :fd AND is_deleted = false"),
                {"tid": tid, "fd": today},
            )
            focus_row = result.first()
            assert focus_row is not None, "Focus entry was not created"
            focus_id = str(focus_row.id)
            status_sql = text("SELECT status FROM tasks WHERE id = :id")
            # Toggle complete
            await client.post(f"/focus/{focus_id}/toggle", follow_redirects=False)
            result = await db_session.execute(status_sql, {"id": tid})
            assert result.first().status == "done", "Task should be marked done when focus toggled"
            # Toggle back
            await client.post(f"/focus/{focus_id}/toggle", follow_redirects=False)
            result = await db_session.execute(status_sql, {"id": tid})
            assert result.first().status == "open", "Task should be reopened when focus un-toggled"
        finally:
            # Always remove the task, even when an assertion above fails.
            await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_focus_remove_soft_deletes(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """Removing from focus soft-deletes the focus entry."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FocusRm-{_uid()}")
        try:
            today = date.today()
            await client.post(
                "/focus/add", data={"task_id": tid, "focus_date": str(today)}, follow_redirects=False,
            )
            result = await db_session.execute(
                text("SELECT id FROM daily_focus WHERE task_id = :tid AND focus_date = :fd AND is_deleted = false"),
                {"tid": tid, "fd": today},
            )
            focus_row = result.first()
            assert focus_row is not None, "Focus entry was not created"
            focus_id = str(focus_row.id)
            await client.post(f"/focus/{focus_id}/remove", follow_redirects=False)
            result = await db_session.execute(
                text("SELECT is_deleted FROM daily_focus WHERE id = :id"), {"id": focus_id},
            )
            assert result.first().is_deleted is True
        finally:
            # Always remove the task, even when an assertion above fails.
            await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_focus_page_loads_with_date(self, client: AsyncClient):
        """Focus page loads for specific date."""
        r = await client.get(f"/focus/?focus_date={date.today()}")
        assert r.status_code == 200
# ===========================================================================
# List Items & Hierarchy
# ===========================================================================
class TestListItems:
    """List item routes: add, toggle, nest, edit, delete, plus list page load."""

    @pytest.mark.asyncio
    async def test_add_item_to_list(
        self, client: AsyncClient, db_session: AsyncSession, seed_list: dict,
    ):
        """Adding an item stores it on the list in the not-completed state."""
        content = f"Item-{_uid()}"
        add_url = f"/lists/{seed_list['id']}/items/add"
        response = await client.post(add_url, data={"content": content}, follow_redirects=False)
        assert response.status_code == 303
        lookup = await db_session.execute(
            text("SELECT content, completed FROM list_items WHERE list_id = :lid AND content = :c AND is_deleted = false"),
            {"lid": seed_list["id"], "c": content},
        )
        stored = lookup.first()
        assert stored is not None
        assert stored.completed is False

    @pytest.mark.asyncio
    async def test_toggle_item_completion(
        self, client: AsyncClient, db_session: AsyncSession, seed_list: dict,
    ):
        """First toggle marks the item completed with a timestamp; second clears both."""
        item_id = await _create_list_item(db_session, seed_list["id"], f"Toggle-{_uid()}")
        toggle_url = f"/lists/{seed_list['id']}/items/{item_id}/toggle"
        state_sql = text("SELECT completed, completed_at FROM list_items WHERE id = :id")

        # First toggle: completed
        response = await client.post(toggle_url, follow_redirects=False)
        assert response.status_code == 303
        state = (await db_session.execute(state_sql, {"id": item_id})).first()
        assert state.completed is True
        assert state.completed_at is not None

        # Second toggle: back to open
        await client.post(toggle_url, follow_redirects=False)
        state = (await db_session.execute(state_sql, {"id": item_id})).first()
        assert state.completed is False
        assert state.completed_at is None

    @pytest.mark.asyncio
    async def test_nested_item(
        self, client: AsyncClient, db_session: AsyncSession, seed_list: dict,
    ):
        """An item added with parent_item_id is stored under that parent."""
        parent_id = await _create_list_item(db_session, seed_list["id"], f"Parent-{_uid()}")
        child_content = f"Child-{_uid()}"
        form = {"content": child_content, "parent_item_id": parent_id}
        response = await client.post(
            f"/lists/{seed_list['id']}/items/add", data=form, follow_redirects=False,
        )
        assert response.status_code == 303
        lookup = await db_session.execute(
            text("SELECT parent_item_id FROM list_items WHERE content = :c AND is_deleted = false"),
            {"c": child_content},
        )
        stored_parent = lookup.first().parent_item_id
        assert str(stored_parent) == parent_id

    @pytest.mark.asyncio
    async def test_edit_item_content(
        self, client: AsyncClient, db_session: AsyncSession, seed_list: dict,
    ):
        """Editing an item replaces its stored content."""
        item_id = await _create_list_item(db_session, seed_list["id"], "OldContent")
        edit_url = f"/lists/{seed_list['id']}/items/{item_id}/edit"
        response = await client.post(
            edit_url, data={"content": "NewContent"}, follow_redirects=False,
        )
        assert response.status_code == 303
        lookup = await db_session.execute(
            text("SELECT content FROM list_items WHERE id = :id"), {"id": item_id},
        )
        assert lookup.first().content == "NewContent"

    @pytest.mark.asyncio
    async def test_delete_item(
        self, client: AsyncClient, db_session: AsyncSession, seed_list: dict,
    ):
        """The delete route flags the item as is_deleted rather than removing the row."""
        item_id = await _create_list_item(db_session, seed_list["id"], f"DeleteMe-{_uid()}")
        delete_url = f"/lists/{seed_list['id']}/items/{item_id}/delete"
        response = await client.post(delete_url, follow_redirects=False)
        assert response.status_code == 303
        lookup = await db_session.execute(
            text("SELECT is_deleted FROM list_items WHERE id = :id"), {"id": item_id},
        )
        assert lookup.first().is_deleted is True

    @pytest.mark.asyncio
    async def test_list_item_count_accuracy(
        self, client: AsyncClient, db_session: AsyncSession, seed_list: dict,
    ):
        """Lists page loads when a list holds one completed and one open item."""
        completed_item = await _create_list_item(db_session, seed_list["id"], f"Count1-{_uid()}")
        # Second item stays open; its id is not needed afterwards.
        await _create_list_item(db_session, seed_list["id"], f"Count2-{_uid()}")
        # Mark the first item as completed directly in the database
        await db_session.execute(
            text("UPDATE list_items SET completed = true, completed_at = now() WHERE id = :id"),
            {"id": completed_item},
        )
        await db_session.commit()
        response = await client.get("/lists/")
        # Only the page load is checked (counts are subqueries)
        assert response.status_code == 200
# ===========================================================================
# Decisions & Supersession
# ===========================================================================
class TestDecisions:
    """Decision list filters, supersession links and meeting links."""

    @pytest.mark.asyncio
    async def test_decision_filter_by_status(self, client: AsyncClient):
        """Decision list loads for each status filter value."""
        for status in ("proposed", "decided", "archived", "rejected"):
            r = await client.get(f"/decisions/?status={status}")
            assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_decision_filter_by_impact(self, client: AsyncClient):
        """Decision list loads for each impact filter value."""
        for impact in ("high", "medium", "low"):
            r = await client.get(f"/decisions/?impact={impact}")
            assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_decision_supersession(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Decision A can be superseded by Decision B."""
        a_id = str(uuid.uuid4())
        b_id = str(uuid.uuid4())
        await db_session.execute(
            text("INSERT INTO decisions (id, title, status, impact, is_deleted, created_at, updated_at) "
                 "VALUES (:id, 'DecA', 'decided', 'high', false, now(), now())"),
            {"id": a_id},
        )
        await db_session.execute(
            text("INSERT INTO decisions (id, title, status, impact, is_deleted, created_at, updated_at) "
                 "VALUES (:id, 'DecB', 'decided', 'high', false, now(), now())"),
            {"id": b_id},
        )
        await db_session.commit()
        try:
            # Set A superseded by B
            r = await client.post(f"/decisions/{a_id}/edit", data={
                "title": "DecA", "status": "decided", "impact": "high",
                "superseded_by_id": b_id,
            }, follow_redirects=False)
            assert r.status_code == 303
            # Check detail page shows supersession
            r = await client.get(f"/decisions/{a_id}")
            assert "DecB" in r.text, "Superseding decision should appear on detail"
            # B's detail should show it supersedes A
            r = await client.get(f"/decisions/{b_id}")
            assert "DecA" in r.text, "Superseded decision should appear"
        finally:
            # Cleanup runs even when an assertion fails so the fixed-title
            # rows never leak into later tests.
            await db_session.execute(
                text("DELETE FROM decisions WHERE id IN (:a, :b)"), {"a": a_id, "b": b_id},
            )
            await db_session.commit()

    @pytest.mark.asyncio
    async def test_decision_meeting_link(
        self, client: AsyncClient, db_session: AsyncSession, seed_meeting: dict,
    ):
        """Decision can be linked to a meeting."""
        tag = _uid()
        r = await client.post("/decisions/create", data={
            "title": f"MeetDec-{tag}",
            "status": "proposed",
            "impact": "medium",
            "meeting_id": seed_meeting["id"],
        }, follow_redirects=False)
        assert r.status_code == 303
        result = await db_session.execute(
            text("SELECT meeting_id FROM decisions WHERE title = :t AND is_deleted = false"),
            {"t": f"MeetDec-{tag}"},
        )
        assert str(result.first().meeting_id) == seed_meeting["id"]
# ===========================================================================
# Appointments & Contact Associations
# ===========================================================================
class TestAppointments:
    """Appointment list timeframes, creation variants and contact junction rows."""

    @pytest.mark.asyncio
    async def test_timeframe_upcoming(self, client: AsyncClient):
        """List loads with timeframe=upcoming."""
        response = await client.get("/appointments/?timeframe=upcoming")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_timeframe_past(self, client: AsyncClient):
        """List loads with timeframe=past."""
        response = await client.get("/appointments/?timeframe=past")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_timeframe_all(self, client: AsyncClient):
        """List loads with timeframe=all."""
        response = await client.get("/appointments/?timeframe=all")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_create_appointment_with_contacts(
        self, client: AsyncClient, db_session: AsyncSession, seed_contact: dict,
    ):
        """Creating with contact_ids leaves at least one contact_appointments row."""
        title = f"Appt-{_uid()}"
        next_day = str(date.today() + timedelta(days=1))
        form = {
            "title": title,
            "start_date": next_day,
            "start_time": "10:00",
            "end_date": next_day,
            "end_time": "11:00",
            "contact_ids": seed_contact["id"],
        }
        response = await client.post("/appointments/create", data=form, follow_redirects=False)
        assert response.status_code == 303
        # Look the new appointment up by its unique title
        lookup = await db_session.execute(
            text("SELECT id FROM appointments WHERE title = :t AND is_deleted = false"),
            {"t": title},
        )
        appt_id = str(lookup.first().id)
        # Junction table should reference it
        junction = await db_session.execute(
            text("SELECT count(*) FROM contact_appointments WHERE appointment_id = :aid"),
            {"aid": appt_id},
        )
        assert junction.scalar() >= 1, "Contact junction not created"

    @pytest.mark.asyncio
    async def test_all_day_appointment(
        self, client: AsyncClient, db_session: AsyncSession,
    ):
        """Creating with all_day=on stores all_day as true."""
        title = f"AllDay-{_uid()}"
        next_day = str(date.today() + timedelta(days=1))
        form = {
            "title": title,
            "start_date": next_day,
            "end_date": next_day,
            "all_day": "on",
        }
        response = await client.post("/appointments/create", data=form, follow_redirects=False)
        assert response.status_code == 303
        lookup = await db_session.execute(
            text("SELECT all_day, start_at FROM appointments WHERE title = :t AND is_deleted = false"),
            {"t": title},
        )
        stored = lookup.first()
        assert stored.all_day is True

    @pytest.mark.asyncio
    async def test_edit_appointment_rebuilds_contacts(
        self, client: AsyncClient, db_session: AsyncSession, seed_contact: dict,
    ):
        """Editing in a contact leaves at least one contact_appointments row."""
        title = f"EditAppt-{_uid()}"
        next_day = str(date.today() + timedelta(days=1))
        # Initial create carries no contact
        response = await client.post(
            "/appointments/create",
            data={"title": title, "start_date": next_day},
            follow_redirects=False,
        )
        assert response.status_code == 303
        lookup = await db_session.execute(
            text("SELECT id FROM appointments WHERE title = :t AND is_deleted = false"),
            {"t": title},
        )
        appt_id = str(lookup.first().id)
        # The edit adds the seed contact
        edit_form = {
            "title": title,
            "start_date": next_day,
            "contact_ids": seed_contact["id"],
        }
        await client.post(f"/appointments/{appt_id}/edit", data=edit_form, follow_redirects=False)
        junction = await db_session.execute(
            text("SELECT count(*) FROM contact_appointments WHERE appointment_id = :aid"),
            {"aid": appt_id},
        )
        assert junction.scalar() >= 1

    @pytest.mark.asyncio
    async def test_appointment_detail_shows_contacts(
        self, client: AsyncClient, seed_appointment: dict,
    ):
        """Detail page of the seed appointment loads."""
        detail_url = f"/appointments/{seed_appointment['id']}"
        response = await client.get(detail_url)
        assert response.status_code == 200
# ===========================================================================
# Project Progress & Hierarchy
# ===========================================================================
class TestProjectHierarchy:
    """Project list rendering with progress data, filters and detail tabs."""

    @pytest.mark.asyncio
    async def test_project_progress_calculation(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict,
    ):
        """Project list renders a project that has one open and one done task."""
        tag = _uid()
        proj_id = str(uuid.uuid4())
        await db_session.execute(
            text("INSERT INTO projects (id, name, domain_id, status, priority, sort_order, is_deleted, created_at, updated_at) "
                 "VALUES (:id, :name, :did, 'active', 3, 0, false, now(), now())"),
            {"id": proj_id, "name": f"Prog-{tag}", "did": seed_domain["id"]},
        )
        # Commit explicitly instead of relying on _create_task's commit.
        await db_session.commit()
        task_ids: list[str] = []
        try:
            # 2 tasks: 1 done, 1 open = 50%
            task_ids.append(
                await _create_task(db_session, seed_domain["id"], proj_id, f"ProgOpen-{tag}")
            )
            task_ids.append(
                await _create_task(db_session, seed_domain["id"], proj_id, f"ProgDone-{tag}", status="done")
            )
            r = await client.get("/projects/")
            assert r.status_code == 200
            # Page should show project with progress (won't test exact %, just that it loads)
            assert f"Prog-{tag}" in r.text
        finally:
            # Cleanup runs even when an assertion fails; tasks go first, as in
            # the other tests that remove a project with tasks.
            await _delete_tasks(db_session, task_ids)
            await db_session.execute(text("DELETE FROM projects WHERE id = :id"), {"id": proj_id})
            await db_session.commit()

    @pytest.mark.asyncio
    async def test_project_progress_zero_tasks(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict,
    ):
        """Project with zero tasks shows 0% progress (no division by zero)."""
        tag = _uid()
        proj_id = str(uuid.uuid4())
        await db_session.execute(
            text("INSERT INTO projects (id, name, domain_id, status, priority, sort_order, is_deleted, created_at, updated_at) "
                 "VALUES (:id, :name, :did, 'active', 3, 0, false, now(), now())"),
            {"id": proj_id, "name": f"ZeroProg-{tag}", "did": seed_domain["id"]},
        )
        await db_session.commit()
        try:
            r = await client.get("/projects/")
            assert r.status_code == 200  # Should not crash from division by zero
        finally:
            await db_session.execute(text("DELETE FROM projects WHERE id = :id"), {"id": proj_id})
            await db_session.commit()

    @pytest.mark.asyncio
    async def test_project_filter_by_status(self, client: AsyncClient):
        """Project list loads for each status filter value."""
        for status in ("active", "on_hold", "completed", "archived"):
            r = await client.get(f"/projects/?status={status}")
            assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_project_filter_by_domain(
        self, client: AsyncClient, seed_domain: dict,
    ):
        """Project list loads when filtered by the seed domain."""
        r = await client.get(f"/projects/?domain_id={seed_domain['id']}")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_project_detail_tabs(
        self, client: AsyncClient, seed_project: dict,
    ):
        """Project detail loads with different tab parameters."""
        for tab in ("tasks", "notes", "links"):
            r = await client.get(f"/projects/{seed_project['id']}?tab={tab}")
            assert r.status_code == 200
# ===========================================================================
# Cascade / Orphan Behavior
# ===========================================================================
class TestCascadeBehavior:
    """Test cross-entity effects when parent entities are deleted."""

    @pytest.mark.asyncio
    async def test_deleted_task_hidden_from_focus(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """When a task is deleted, focus entries referencing it should not crash focus page."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FocusCascade-{_uid()}")
        try:
            today = str(date.today())
            await client.post("/focus/add", data={"task_id": tid, "focus_date": today}, follow_redirects=False)
            # Delete the task
            await client.post(f"/tasks/{tid}/delete", follow_redirects=False)
            # Focus page should still load (JOIN filters deleted tasks)
            r = await client.get(f"/focus/?focus_date={today}")
            assert r.status_code == 200
        finally:
            # The delete route is presumably a soft delete, so without this
            # the row would stay behind; the other focus tests in this file
            # hard-delete their tasks the same way.
            await _delete_tasks(db_session, [tid])

    @pytest.mark.asyncio
    async def test_deleted_project_tasks_still_accessible(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict,
    ):
        """Tasks survive when their project is soft-deleted."""
        tag = _uid()
        proj_id = str(uuid.uuid4())
        await db_session.execute(
            text("INSERT INTO projects (id, name, domain_id, status, priority, sort_order, is_deleted, created_at, updated_at) "
                 "VALUES (:id, :name, :did, 'active', 3, 0, false, now(), now())"),
            {"id": proj_id, "name": f"CascProj-{tag}", "did": seed_domain["id"]},
        )
        # Commit explicitly instead of relying on _create_task's commit.
        await db_session.commit()
        task_ids: list[str] = []
        try:
            tid = await _create_task(db_session, seed_domain["id"], proj_id, f"OrphanTask-{tag}")
            task_ids.append(tid)
            # Delete the project
            await client.post(f"/projects/{proj_id}/delete", follow_redirects=False)
            # Task should still appear on tasks list
            r = await client.get("/tasks/")
            assert f"OrphanTask-{tag}" in r.text, "Task should survive project deletion"
            # Task detail should still load
            r = await client.get(f"/tasks/{tid}")
            assert r.status_code == 200
        finally:
            # Cleanup runs even when an assertion fails.
            await _delete_tasks(db_session, task_ids)
            await db_session.execute(text("DELETE FROM projects WHERE id = :id"), {"id": proj_id})
            await db_session.commit()

    @pytest.mark.asyncio
    async def test_time_entries_cascade_on_task_delete(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict, seed_project: dict,
    ):
        """time_entries FK has ON DELETE CASCADE - entries deleted with task."""
        tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"TimeCascade-{_uid()}")
        # Create a time entry for this task
        await db_session.execute(
            text("INSERT INTO time_entries (task_id, start_at, end_at, duration_minutes, is_deleted, created_at) "
                 "VALUES (:tid, now(), now(), 10, false, now())"),
            {"tid": tid},
        )
        await db_session.commit()
        # Hard delete the task (CASCADE should remove time_entries)
        await db_session.execute(text("DELETE FROM tasks WHERE id = :id"), {"id": tid})
        await db_session.commit()
        result = await db_session.execute(
            text("SELECT count(*) FROM time_entries WHERE task_id = :tid"), {"tid": tid},
        )
        assert result.scalar() == 0, "time_entries should cascade delete with task"

    @pytest.mark.asyncio
    async def test_list_items_cascade_on_list_delete(
        self, client: AsyncClient, db_session: AsyncSession,
        seed_domain: dict,
    ):
        """list_items FK has ON DELETE CASCADE - items deleted with list."""
        tag = _uid()
        list_id = str(uuid.uuid4())
        await db_session.execute(
            text("INSERT INTO lists (id, name, domain_id, list_type, sort_order, is_deleted, created_at, updated_at) "
                 "VALUES (:id, :name, :did, 'checklist', 0, false, now(), now())"),
            {"id": list_id, "name": f"CascList-{tag}", "did": seed_domain["id"]},
        )
        # _create_list_item commits, which also persists the list row above.
        item_id = await _create_list_item(db_session, list_id, f"CascItem-{tag}")
        # Hard delete the list
        await db_session.execute(text("DELETE FROM lists WHERE id = :id"), {"id": list_id})
        await db_session.commit()
        result = await db_session.execute(
            text("SELECT count(*) FROM list_items WHERE id = :id"), {"id": item_id},
        )
        assert result.scalar() == 0, "list_items should cascade delete with list"
# ===========================================================================
# File Operations
# ===========================================================================
class TestFileOperations:
    """File list page and the redirect behavior of per-file routes for unknown IDs."""

    @pytest.mark.asyncio
    async def test_file_list_loads(self, client: AsyncClient):
        """File list page answers 200."""
        response = await client.get("/files/")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_file_download_nonexistent_redirects(self, client: AsyncClient):
        """Download route answers 303 for an ID that does not exist."""
        missing_id = str(uuid.uuid4())
        download_url = f"/files/{missing_id}/download"
        response = await client.get(download_url, follow_redirects=False)
        assert response.status_code == 303

    @pytest.mark.asyncio
    async def test_file_preview_nonexistent_redirects(self, client: AsyncClient):
        """Preview route answers 303 for an ID that does not exist."""
        missing_id = str(uuid.uuid4())
        preview_url = f"/files/{missing_id}/preview"
        response = await client.get(preview_url, follow_redirects=False)
        assert response.status_code == 303

    @pytest.mark.asyncio
    async def test_file_context_filter(self, client: AsyncClient):
        """File list answers 200 when context_type and context_id are supplied."""
        context_id = uuid.uuid4()
        response = await client.get(f"/files/?context_type=tasks&context_id={context_id}")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_file_serve_nonexistent_redirects(self, client: AsyncClient):
        """Serve route answers 303 for an ID that does not exist."""
        missing_id = str(uuid.uuid4())
        serve_url = f"/files/{missing_id}/serve"
        response = await client.get(serve_url, follow_redirects=False)
        assert response.status_code == 303
# ===========================================================================
# Form Validation & Edge Cases
# ===========================================================================
class TestFormValidation:
    """Required-field validation on create forms and tag string handling."""

    @pytest.mark.asyncio
    async def test_task_create_missing_title_returns_422(self, client: AsyncClient, seed_domain: dict):
        """Task create without 'title' is rejected with 422."""
        form = {"domain_id": seed_domain["id"], "priority": "3"}
        response = await client.post("/tasks/create", data=form, follow_redirects=False)
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_task_create_missing_domain_returns_422(self, client: AsyncClient):
        """Task create without 'domain_id' is rejected with 422."""
        form = {"title": "NoDomain", "priority": "3"}
        response = await client.post("/tasks/create", data=form, follow_redirects=False)
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_note_create_missing_title_returns_422(self, client: AsyncClient):
        """Note create without 'title' is rejected with 422."""
        form = {"body": "No title"}
        response = await client.post("/notes/create", data=form, follow_redirects=False)
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_tags_split_correctly(
        self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
    ):
        """A comma-separated tags string is stored so each tag is individually present."""
        title = f"TagTest-{_uid()}"
        form = {
            "title": title,
            "domain_id": seed_domain["id"],
            "content_format": "markdown",
            "tags": "alpha, beta, gamma",
        }
        response = await client.post("/notes/create", data=form, follow_redirects=False)
        assert response.status_code == 303
        lookup = await db_session.execute(
            text("SELECT tags FROM notes WHERE title = :t AND is_deleted = false"),
            {"t": title},
        )
        stored = lookup.first()
        assert stored is not None
        stored_tags = stored.tags
        for expected in ("alpha", "beta", "gamma"):
            assert expected in stored_tags

    @pytest.mark.asyncio
    async def test_empty_tags_stored_as_null(
        self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
    ):
        """An empty tags string is stored as NULL or an empty array."""
        title = f"NoTag-{_uid()}"
        form = {
            "title": title,
            "domain_id": seed_domain["id"],
            "content_format": "markdown",
            "tags": "",
        }
        response = await client.post("/notes/create", data=form, follow_redirects=False)
        assert response.status_code == 303
        lookup = await db_session.execute(
            text("SELECT tags FROM notes WHERE title = :t AND is_deleted = false"),
            {"t": title},
        )
        stored = lookup.first()
        # Anything but [""] is fine: either no value at all or an empty array
        assert stored.tags is None or stored.tags == []
# ===========================================================================
# Edge Cases
# ===========================================================================
class TestEdgeCases:
    """Malformed input, repeated actions and unknown IDs must be handled gracefully."""

    @pytest.mark.asyncio
    async def test_invalid_uuid_in_path(self, client: AsyncClient):
        """A non-UUID task path yields one of the accepted error/redirect codes."""
        response = await client.get("/tasks/not-a-valid-uuid")
        acceptable = (404, 422, 400, 303)
        assert response.status_code in acceptable

    @pytest.mark.asyncio
    async def test_timer_start_without_task_id(self, client: AsyncClient):
        """Starting a timer with an empty form does not answer 200."""
        response = await client.post("/time/start", data={}, follow_redirects=False)
        assert response.status_code != 200

    @pytest.mark.asyncio
    async def test_double_delete_doesnt_crash(
        self, client: AsyncClient, seed_task: dict,
    ):
        """Deleting the same task twice still yields a redirect or 404."""
        delete_url = f"/tasks/{seed_task['id']}/delete"
        await client.post(delete_url, follow_redirects=False)
        second = await client.post(delete_url, follow_redirects=False)
        assert second.status_code in (303, 302, 404)

    @pytest.mark.asyncio
    async def test_toggle_nonexistent_task_redirects(self, client: AsyncClient):
        """Toggling an unknown task ID answers 303."""
        missing_id = str(uuid.uuid4())
        response = await client.post(f"/tasks/{missing_id}/toggle", follow_redirects=False)
        assert response.status_code == 303

    @pytest.mark.asyncio
    async def test_nonexistent_detail_pages_redirect(self, client: AsyncClient):
        """Detail pages answer 303 for an ID that does not exist."""
        fake_id = str(uuid.uuid4())
        sections = (
            "tasks", "notes", "projects",
            "meetings", "decisions", "appointments",
            "lists",
        )
        for section in sections:
            path = f"/{section}/{fake_id}"
            response = await client.get(path, follow_redirects=False)
            assert response.status_code == 303, f"{path} did not redirect for nonexistent ID"

    @pytest.mark.asyncio
    async def test_stop_timer_when_none_running(self, client: AsyncClient):
        """A stop request issued right after another stop still answers 303."""
        # The first stop clears any timer that might be running
        await client.post("/time/stop", follow_redirects=False)
        response = await client.post("/time/stop", follow_redirects=False)
        assert response.status_code == 303

    @pytest.mark.asyncio
    async def test_capture_add_redirect_to(
        self, client: AsyncClient,
    ):
        """A redirect_to of '/tasks/' is used as the Location of the 303."""
        form = {"raw_text": "redirect test", "redirect_to": "/tasks/"}
        response = await client.post("/capture/add", data=form, follow_redirects=False)
        assert response.status_code == 303
        assert response.headers.get("location") == "/tasks/"

    @pytest.mark.asyncio
    async def test_capture_add_ignores_external_redirect(
        self, client: AsyncClient,
    ):
        """An absolute external redirect_to is ignored; Location is '/capture'."""
        form = {"raw_text": "external redirect test", "redirect_to": "https://evil.com"}
        response = await client.post("/capture/add", data=form, follow_redirects=False)
        assert response.status_code == 303
        assert response.headers.get("location") == "/capture"
# ===========================================================================
# Helpers
# ===========================================================================
def _uid() -> str:
"""Short unique ID for test data."""
return uuid.uuid4().hex[:8]
async def _create_task(
    db: AsyncSession, domain_id: str, project_id: str, title: str,
    status: str = "open", priority: int = 3, parent_id: str | None = None,
) -> str:
    """Insert a task row directly via SQL, commit, and return its new ID.

    Args:
        db: Session used to execute the INSERT and commit.
        domain_id: Value for the task's domain_id column.
        project_id: Value for the task's project_id column.
        title: Task title.
        status: Task status; defaults to "open".
        priority: Task priority; defaults to 3.
        parent_id: Optional parent task ID; None stores NULL.

    Returns:
        The generated task ID as a string UUID.
    """
    _id = str(uuid.uuid4())
    await db.execute(
        text("INSERT INTO tasks (id, domain_id, project_id, parent_id, title, status, priority, sort_order, is_deleted, created_at, updated_at) "
             "VALUES (:id, :did, :pid, :parent_id, :title, :status, :priority, 0, false, now(), now())"),
        {
            "id": _id,
            "did": domain_id,
            "pid": project_id,
            "parent_id": parent_id,
            "title": title,
            "status": status,
            "priority": priority,
        },
    )
    # Commit so the row is visible to requests made through the test client.
    await db.commit()
    return _id
async def _delete_tasks(db: AsyncSession, task_ids: list[str]) -> None:
    """Hard-delete each given task row, one statement per ID, then commit once."""
    delete_stmt = text("DELETE FROM tasks WHERE id = :id")
    for task_id in task_ids:
        await db.execute(delete_stmt, {"id": task_id})
    await db.commit()
async def _create_capture(db: AsyncSession, raw_text: str) -> str:
    """Insert an unprocessed, non-deleted capture row, commit, and return its ID."""
    capture_id = str(uuid.uuid4())
    insert_stmt = text(
        "INSERT INTO capture (id, raw_text, processed, is_deleted, created_at, updated_at) "
        "VALUES (:id, :text, false, false, now(), now())"
    )
    params = {"id": capture_id, "text": raw_text}
    await db.execute(insert_stmt, params)
    await db.commit()
    return capture_id
async def _create_list_item(db: AsyncSession, list_id: str, content: str) -> str:
    """Insert an open, non-deleted list item on the given list, commit, and return its ID."""
    item_id = str(uuid.uuid4())
    insert_stmt = text(
        "INSERT INTO list_items (id, list_id, content, completed, sort_order, is_deleted, created_at, updated_at) "
        "VALUES (:id, :lid, :content, false, 0, false, now(), now())"
    )
    params = {"id": item_id, "lid": list_id, "content": content}
    await db.execute(insert_stmt, params)
    await db.commit()
    return item_id