""" Business Logic Tests ==================== Hand-written tests for specific behavioral contracts. These test LOGIC, not routes, so they stay manual. When to add tests here: - New constraint (e.g. "only one timer running at a time") - State transitions (e.g. "completing a task sets completed_at") - Cross-entity effects (e.g. "deleting a project hides its tasks") - Search behavior - Sidebar data integrity """ from __future__ import annotations import os import uuid from datetime import date, datetime, timezone, timedelta import pytest from httpx import AsyncClient from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession # =========================================================================== # Time Tracking # =========================================================================== class TestTimerConstraints: """Only one timer can run at a time. Starting a new one auto-stops the old.""" @pytest.mark.asyncio async def test_single_timer_constraint( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): t1 = await _create_task(db_session, seed_domain["id"], seed_project["id"], "Timer1") t2 = await _create_task(db_session, seed_domain["id"], seed_project["id"], "Timer2") await client.post("/time/start", data={"task_id": t1}, follow_redirects=False) await client.post("/time/start", data={"task_id": t2}, follow_redirects=False) result = await db_session.execute( text("SELECT count(*) FROM time_entries WHERE end_at IS NULL") ) assert result.scalar() <= 1 @pytest.mark.asyncio async def test_stop_sets_end_at( self, client: AsyncClient, db_session: AsyncSession, seed_task: dict, ): await client.post("/time/start", data={"task_id": seed_task["id"]}, follow_redirects=False) await client.post("/time/stop", follow_redirects=False) result = await db_session.execute( text("SELECT count(*) FROM time_entries WHERE end_at IS NULL AND task_id = :tid"), {"tid": seed_task["id"]}, ) assert result.scalar() == 0 @pytest.mark.asyncio async def test_running_endpoint_returns_json( self, client: AsyncClient, seed_task: dict, ): await client.post("/time/start", data={"task_id": seed_task["id"]}, follow_redirects=False) r = await client.get("/time/running") assert r.status_code == 200 data = r.json() assert data is not None @pytest.mark.asyncio async def test_running_json_fields( self, client: AsyncClient, db_session: AsyncSession, seed_task: dict, ): """Running timer JSON includes task_title and elapsed_seconds.""" # Stop any running timer first await client.post("/time/stop", follow_redirects=False) await client.post("/time/start", data={"task_id": seed_task["id"]}, follow_redirects=False) r = await client.get("/time/running") data = r.json() assert data["running"] is True assert data["task_title"] == "Test Task" assert "elapsed_seconds" in data assert data["elapsed_seconds"] >= 0 # Cleanup await client.post("/time/stop", follow_redirects=False) @pytest.mark.asyncio async def test_no_running_timer_json(self, client: AsyncClient): """When no timer running, JSON returns running=False.""" await client.post("/time/stop", follow_redirects=False) r = await client.get("/time/running") data = r.json() assert data["running"] is False @pytest.mark.asyncio async def test_manual_time_entry( self, client: AsyncClient, db_session: AsyncSession, seed_task: dict, ): """Manual time entry creates a record with correct duration.""" today = str(date.today()) r = await client.post("/time/manual", data={ "task_id": seed_task["id"], "date": today, "duration_minutes": "45", "notes": "Manual test entry", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT duration_minutes, notes FROM time_entries " "WHERE task_id = :tid AND duration_minutes = 45 AND is_deleted = false"), {"tid": seed_task["id"]}, ) row = result.first() assert row is not None, "Manual entry not created" assert row.duration_minutes == 45 @pytest.mark.asyncio async def test_time_log_page_loads(self, client: AsyncClient): """Time log page loads with default 7-day view.""" r = await client.get("/time/") assert r.status_code == 200 @pytest.mark.asyncio async def test_time_log_with_days_param(self, client: AsyncClient): """Time log page respects days parameter.""" r = await client.get("/time/?days=30") assert r.status_code == 200 @pytest.mark.asyncio async def test_time_entry_delete_uses_direct_sql( self, client: AsyncClient, db_session: AsyncSession, seed_task: dict, ): """time_entries has no updated_at - delete uses direct SQL.""" # Create a manual entry await client.post("/time/manual", data={ "task_id": seed_task["id"], "date": str(date.today()), "duration_minutes": "5", }, follow_redirects=False) result = await db_session.execute( text("SELECT id FROM time_entries WHERE task_id = :tid AND duration_minutes = 5 AND is_deleted = false"), {"tid": seed_task["id"]}, ) row = result.first() assert row is not None entry_id = str(row.id) r = await client.post(f"/time/{entry_id}/delete", follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT is_deleted FROM time_entries WHERE id = :id"), {"id": entry_id}, ) assert result.first().is_deleted is True # =========================================================================== # Task Filtering & Status Transitions # =========================================================================== class TestTaskFiltering: """Test task list filtering by domain, status, priority, context, and sort.""" @pytest.mark.asyncio async def test_filter_by_status( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """Filter tasks by status shows only matching tasks.""" t_open = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FilterOpen-{_uid()}") t_done = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FilterDone-{_uid()}", status="done") r = await client.get("/tasks/?status=open") assert "FilterOpen" in r.text assert "FilterDone" not in r.text r = await client.get("/tasks/?status=done") assert "FilterDone" in r.text # Cleanup await _delete_tasks(db_session, [t_open, t_done]) @pytest.mark.asyncio async def test_filter_by_priority( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): tag = _uid() t_high = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"PriHigh-{tag}", priority=1) t_low = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"PriLow-{tag}", priority=4) r = await client.get("/tasks/?priority=1") assert f"PriHigh-{tag}" in r.text assert f"PriLow-{tag}" not in r.text await _delete_tasks(db_session, [t_high, t_low]) @pytest.mark.asyncio async def test_filter_by_domain( self, client: AsyncClient, seed_domain: dict, seed_task: dict, ): r = await client.get(f"/tasks/?domain_id={seed_domain['id']}") assert r.status_code == 200 @pytest.mark.asyncio async def test_combined_filters( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """Multiple filters applied simultaneously.""" tag = _uid() t1 = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Combo-{tag}", priority=1, status="open") r = await client.get(f"/tasks/?domain_id={seed_domain['id']}&status=open&priority=1") assert f"Combo-{tag}" in r.text await _delete_tasks(db_session, [t1]) @pytest.mark.asyncio async def test_sort_by_priority(self, client: AsyncClient): r = await client.get("/tasks/?sort=priority") assert r.status_code == 200 @pytest.mark.asyncio async def test_sort_by_due_date(self, client: AsyncClient): r = await client.get("/tasks/?sort=due_date") assert r.status_code == 200 @pytest.mark.asyncio async def test_sort_by_title(self, client: AsyncClient): r = await client.get("/tasks/?sort=title") assert r.status_code == 200 @pytest.mark.asyncio async def test_sort_by_created_at(self, client: AsyncClient): r = await client.get("/tasks/?sort=created_at") assert r.status_code == 200 @pytest.mark.asyncio async def test_done_tasks_sort_to_bottom( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """Done and cancelled tasks sort after open/in_progress tasks.""" tag = _uid() t_open = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"SortOpen-{tag}", status="open") t_done = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"SortDone-{tag}", status="done") r = await client.get("/tasks/") # Open task should appear before done task in the HTML open_pos = r.text.find(f"SortOpen-{tag}") done_pos = r.text.find(f"SortDone-{tag}") if open_pos != -1 and done_pos != -1: assert open_pos < done_pos, "Done task should sort after open task" await _delete_tasks(db_session, [t_open, t_done]) class TestTaskStatusTransitions: """Test all task status transitions and completed_at handling.""" @pytest.mark.asyncio async def test_toggle_open_to_done( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Toggle-{_uid()}") r = await client.post(f"/tasks/{tid}/toggle", follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT status, completed_at FROM tasks WHERE id = :id"), {"id": tid}, ) row = result.first() assert row.status == "done" assert row.completed_at is not None await _delete_tasks(db_session, [tid]) @pytest.mark.asyncio async def test_toggle_done_to_open( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"ToggleBack-{_uid()}", status="done") r = await client.post(f"/tasks/{tid}/toggle", follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT status, completed_at FROM tasks WHERE id = :id"), {"id": tid}, ) row = result.first() assert row.status == "open" assert row.completed_at is None await _delete_tasks(db_session, [tid]) @pytest.mark.asyncio async def test_complete_action_sets_done_and_completed_at( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Complete-{_uid()}") r = await client.post(f"/tasks/{tid}/complete", follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT status, completed_at FROM tasks WHERE id = :id"), {"id": tid}, ) row = result.first() assert row.status == "done" assert row.completed_at is not None await _delete_tasks(db_session, [tid]) @pytest.mark.asyncio async def test_edit_status_to_done_sets_completed_at( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """Editing a task status from open to done sets completed_at.""" tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"EditDone-{_uid()}") r = await client.post(f"/tasks/{tid}/edit", data={ "title": "EditDone", "domain_id": seed_domain["id"], "priority": "3", "status": "done", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT completed_at FROM tasks WHERE id = :id"), {"id": tid}, ) assert result.first().completed_at is not None await _delete_tasks(db_session, [tid]) @pytest.mark.asyncio async def test_edit_status_from_done_clears_completed_at( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """Editing a task from done to in_progress clears completed_at.""" tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"EditReopen-{_uid()}", status="done") # Set completed_at await db_session.execute( text("UPDATE tasks SET completed_at = now() WHERE id = :id"), {"id": tid} ) await db_session.commit() r = await client.post(f"/tasks/{tid}/edit", data={ "title": "EditReopen", "domain_id": seed_domain["id"], "priority": "3", "status": "in_progress", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT status, completed_at FROM tasks WHERE id = :id"), {"id": tid}, ) row = result.first() assert row.status == "in_progress" # completed_at should be cleared (nullable_fields includes completed_at) await _delete_tasks(db_session, [tid]) @pytest.mark.asyncio async def test_all_status_values_accepted( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """All valid status values (open, in_progress, blocked, done, cancelled) work.""" for status in ("open", "in_progress", "blocked", "done", "cancelled"): tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Status-{status}-{_uid()}") r = await client.post(f"/tasks/{tid}/edit", data={ "title": f"Status-{status}", "domain_id": seed_domain["id"], "priority": "3", "status": status, }, follow_redirects=False) assert r.status_code == 303, f"Status {status} rejected" await _delete_tasks(db_session, [tid]) @pytest.mark.asyncio async def test_quick_add_task( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): tag = _uid() r = await client.post("/tasks/quick-add", data={ "title": f"QuickAdd-{tag}", "domain_id": seed_domain["id"], }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT id, status, priority FROM tasks WHERE title = :t AND is_deleted = false"), {"t": f"QuickAdd-{tag}"}, ) row = result.first() assert row is not None, "Quick-add task not created" assert row.status == "open" assert row.priority == 3 await _delete_tasks(db_session, [str(row.id)]) @pytest.mark.asyncio async def test_subtask_appears_on_parent_detail( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """Subtasks (parent_id) show on parent task detail page.""" parent_id = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Parent-{_uid()}") tag = _uid() child_id = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"Child-{tag}", parent_id=parent_id) r = await client.get(f"/tasks/{parent_id}") assert r.status_code == 200 assert f"Child-{tag}" in r.text, "Subtask missing from parent detail" await _delete_tasks(db_session, [child_id, parent_id]) # =========================================================================== # Capture Conversions # =========================================================================== class TestCaptureConversions: """Test all capture-to-entity conversion types and edge cases.""" @pytest.mark.asyncio async def test_convert_to_task( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): cap_id = await _create_capture(db_session, f"ConvTask-{_uid()}") r = await client.post(f"/capture/{cap_id}/to-task", data={ "domain_id": seed_domain["id"], "priority": "2", }, follow_redirects=False) assert r.status_code == 303 # Capture should be marked processed result = await db_session.execute( text("SELECT processed, converted_to_type, converted_to_id FROM capture WHERE id = :id"), {"id": cap_id}, ) row = result.first() assert row.processed is True assert row.converted_to_type == "task" assert row.converted_to_id is not None # Task should exist result = await db_session.execute( text("SELECT id FROM tasks WHERE id = :id AND is_deleted = false"), {"id": row.converted_to_id}, ) assert result.first() is not None, "Converted task not found" @pytest.mark.asyncio async def test_convert_to_note( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): tag = _uid() cap_id = await _create_capture(db_session, f"ConvNote-{tag}") r = await client.post(f"/capture/{cap_id}/to-note", data={ "domain_id": seed_domain["id"], }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT converted_to_type FROM capture WHERE id = :id"), {"id": cap_id}, ) assert result.first().converted_to_type == "note" @pytest.mark.asyncio async def test_convert_to_project( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): cap_id = await _create_capture(db_session, f"ConvProj-{_uid()}") r = await client.post(f"/capture/{cap_id}/to-project", data={ "domain_id": seed_domain["id"], }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT converted_to_type FROM capture WHERE id = :id"), {"id": cap_id}, ) assert result.first().converted_to_type == "project" @pytest.mark.asyncio async def test_convert_to_contact( self, client: AsyncClient, db_session: AsyncSession, ): cap_id = await _create_capture(db_session, "John Smith") r = await client.post(f"/capture/{cap_id}/to-contact", data={ "first_name": "John", "last_name": "Smith", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT converted_to_type, converted_to_id FROM capture WHERE id = :id"), {"id": cap_id}, ) row = result.first() assert row.converted_to_type == "contact" # Verify contact exists result = await db_session.execute( text("SELECT first_name FROM contacts WHERE id = :id"), {"id": row.converted_to_id}, ) assert result.first().first_name == "John" @pytest.mark.asyncio async def test_convert_to_decision( self, client: AsyncClient, db_session: AsyncSession, ): cap_id = await _create_capture(db_session, f"DecisionCapture-{_uid()}") r = await client.post(f"/capture/{cap_id}/to-decision", data={}, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT converted_to_type, converted_to_id FROM capture WHERE id = :id"), {"id": cap_id}, ) row = result.first() assert row.converted_to_type == "decision" # Verify decision defaults result = await db_session.execute( text("SELECT status, impact FROM decisions WHERE id = :id"), {"id": row.converted_to_id}, ) dec = result.first() assert dec.status == "proposed" assert dec.impact == "medium" @pytest.mark.asyncio async def test_convert_to_weblink( self, client: AsyncClient, db_session: AsyncSession, ): cap_id = await _create_capture(db_session, "Check https://example.com/test for details") r = await client.post(f"/capture/{cap_id}/to-weblink", data={}, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT converted_to_type, converted_to_id FROM capture WHERE id = :id"), {"id": cap_id}, ) row = result.first() assert row.converted_to_type == "weblink" # URL should be extracted result = await db_session.execute( text("SELECT url FROM weblinks WHERE id = :id"), {"id": row.converted_to_id}, ) assert "https://example.com/test" in result.first().url @pytest.mark.asyncio async def test_convert_to_list_item( self, client: AsyncClient, db_session: AsyncSession, seed_list: dict, ): cap_id = await _create_capture(db_session, f"ListItem-{_uid()}") r = await client.post(f"/capture/{cap_id}/to-list_item", data={ "list_id": seed_list["id"], }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT converted_to_type FROM capture WHERE id = :id"), {"id": cap_id}, ) assert result.first().converted_to_type == "list_item" @pytest.mark.asyncio async def test_dismiss_capture( self, client: AsyncClient, db_session: AsyncSession, ): cap_id = await _create_capture(db_session, f"Dismiss-{_uid()}") r = await client.post(f"/capture/{cap_id}/dismiss", follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT processed, converted_to_type FROM capture WHERE id = :id"), {"id": cap_id}, ) row = result.first() assert row.processed is True assert row.converted_to_type == "dismissed" @pytest.mark.asyncio async def test_dismissed_hidden_from_inbox( self, client: AsyncClient, db_session: AsyncSession, ): """Dismissed items don't show in inbox.""" tag = _uid() cap_id = await _create_capture(db_session, f"DismissHide-{tag}") await client.post(f"/capture/{cap_id}/dismiss", follow_redirects=False) r = await client.get("/capture/") assert f"DismissHide-{tag}" not in r.text @pytest.mark.asyncio async def test_line_splitting_skips_empty_lines( self, client: AsyncClient, db_session: AsyncSession, ): """Empty lines are skipped in multi-line capture.""" tag = _uid() r = await client.post("/capture/add", data={ "raw_text": f"Line1-{tag}\n\n\nLine2-{tag}\n \n", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT count(*) FROM capture WHERE raw_text LIKE :pat AND is_deleted = false"), {"pat": f"%-{tag}"}, ) assert result.scalar() == 2, "Empty lines should be skipped" @pytest.mark.asyncio async def test_batch_capture_gets_batch_id( self, client: AsyncClient, db_session: AsyncSession, ): """Multi-line capture assigns same import_batch_id to all items.""" tag = _uid() await client.post("/capture/add", data={ "raw_text": f"Batch1-{tag}\nBatch2-{tag}\nBatch3-{tag}", }, follow_redirects=False) result = await db_session.execute( text("SELECT DISTINCT import_batch_id FROM capture " "WHERE raw_text LIKE :pat AND is_deleted = false"), {"pat": f"Batch%-{tag}"}, ) batch_ids = [r[0] for r in result] assert len(batch_ids) == 1, "All batch items should share one batch_id" assert batch_ids[0] is not None, "Batch ID should not be null" @pytest.mark.asyncio async def test_single_line_no_batch_id( self, client: AsyncClient, db_session: AsyncSession, ): """Single-line capture has no batch_id.""" tag = _uid() await client.post("/capture/add", data={ "raw_text": f"Single-{tag}", }, follow_redirects=False) result = await db_session.execute( text("SELECT import_batch_id FROM capture WHERE raw_text = :t AND is_deleted = false"), {"t": f"Single-{tag}"}, ) row = result.first() assert row is not None assert row.import_batch_id is None @pytest.mark.asyncio async def test_batch_undo( self, client: AsyncClient, db_session: AsyncSession, ): """Batch undo soft-deletes all items in the batch.""" tag = _uid() await client.post("/capture/add", data={ "raw_text": f"Undo1-{tag}\nUndo2-{tag}", }, follow_redirects=False) result = await db_session.execute( text("SELECT import_batch_id FROM capture WHERE raw_text LIKE :pat AND is_deleted = false LIMIT 1"), {"pat": f"Undo%-{tag}"}, ) batch_id = str(result.first()[0]) r = await client.post(f"/capture/batch/{batch_id}/undo", follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT count(*) FROM capture WHERE import_batch_id = :bid AND is_deleted = false"), {"bid": batch_id}, ) assert result.scalar() == 0, "Batch undo should delete all items" @pytest.mark.asyncio async def test_convert_nonexistent_capture_redirects( self, client: AsyncClient, seed_domain: dict, ): """Converting a nonexistent capture redirects to /capture.""" fake_id = str(uuid.uuid4()) r = await client.post(f"/capture/{fake_id}/to-task", data={ "domain_id": seed_domain["id"], }, follow_redirects=False) assert r.status_code == 303 @pytest.mark.asyncio async def test_capture_inbox_vs_processed_view( self, client: AsyncClient, db_session: AsyncSession, ): """Inbox shows unprocessed, processed view shows processed.""" tag = _uid() cap_id = await _create_capture(db_session, f"ViewTest-{tag}") r = await client.get("/capture/?show=inbox") assert f"ViewTest-{tag}" in r.text # Dismiss it await client.post(f"/capture/{cap_id}/dismiss", follow_redirects=False) r = await client.get("/capture/?show=inbox") assert f"ViewTest-{tag}" not in r.text r = await client.get("/capture/?show=processed") assert f"ViewTest-{tag}" in r.text # =========================================================================== # Meeting Action Items & Relationships # =========================================================================== class TestMeetingRelationships: """Test meeting action items, attendees, and linked entities.""" @pytest.mark.asyncio async def test_create_action_item( self, client: AsyncClient, db_session: AsyncSession, seed_meeting: dict, seed_domain: dict, ): """Creating an action item creates a task and links it to the meeting.""" tag = _uid() r = await client.post(f"/meetings/{seed_meeting['id']}/action-item", data={ "title": f"ActionItem-{tag}", "domain_id": seed_domain["id"], }, follow_redirects=False) assert r.status_code == 303 # Task should exist result = await db_session.execute( text("SELECT id FROM tasks WHERE title = :t AND is_deleted = false"), {"t": f"ActionItem-{tag}"}, ) task_row = result.first() assert task_row is not None, "Action item task not created" # Junction should exist result = await db_session.execute( text("SELECT source FROM meeting_tasks WHERE meeting_id = :mid AND task_id = :tid"), {"mid": seed_meeting["id"], "tid": str(task_row.id)}, ) junc = result.first() assert junc is not None, "meeting_tasks junction not created" assert junc.source == "action_item" @pytest.mark.asyncio async def test_action_item_appears_on_meeting_detail( self, client: AsyncClient, db_session: AsyncSession, seed_meeting: dict, seed_domain: dict, ): tag = _uid() await client.post(f"/meetings/{seed_meeting['id']}/action-item", data={ "title": f"DetailItem-{tag}", "domain_id": seed_domain["id"], }, follow_redirects=False) r = await client.get(f"/meetings/{seed_meeting['id']}") assert f"DetailItem-{tag}" in r.text @pytest.mark.asyncio async def test_meeting_status_filter( self, client: AsyncClient, ): """Meeting list can be filtered by status.""" r = await client.get("/meetings/?status=scheduled") assert r.status_code == 200 @pytest.mark.asyncio async def test_meeting_detail_loads_all_sections( self, client: AsyncClient, seed_meeting: dict, ): """Meeting detail page loads action items, notes, decisions, attendees sections.""" r = await client.get(f"/meetings/{seed_meeting['id']}") assert r.status_code == 200 # =========================================================================== # Admin / Trash Full Lifecycle # =========================================================================== class TestAdminTrashLifecycle: """Full lifecycle: create item -> soft delete -> verify in trash -> permanent delete -> confirm gone.""" @pytest.mark.asyncio async def test_note_trash_lifecycle( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): unique_title = f"TrashTest-{uuid.uuid4().hex[:8]}" r = await client.post("/notes/create", data={ "title": unique_title, "domain_id": seed_domain["id"], "body": "Trash lifecycle test body", "content_format": "markdown", }, follow_redirects=False) assert r.status_code == 303 location = r.headers.get("location", "") note_id = location.rstrip("/").split("/")[-1] assert note_id and note_id != "notes" r = await client.get("/notes/") assert unique_title in r.text r = await client.post(f"/notes/{note_id}/delete", follow_redirects=False) assert r.status_code in (303, 302) r = await client.get("/notes/") assert unique_title not in r.text result = await db_session.execute( text("SELECT is_deleted, deleted_at FROM notes WHERE id = :id"), {"id": note_id}, ) row = result.first() assert row.is_deleted is True assert row.deleted_at is not None r = await client.get("/admin/trash/?entity_type=notes") assert unique_title in r.text r = await client.post(f"/admin/trash/notes/{note_id}/permanent-delete", follow_redirects=False) assert r.status_code == 303 r = await client.get("/admin/trash/?entity_type=notes") assert unique_title not in r.text result = await db_session.execute( text("SELECT count(*) FROM notes WHERE id = :id"), {"id": note_id}, ) assert result.scalar() == 0 @pytest.mark.asyncio async def test_task_trash_lifecycle( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): unique_title = f"TrashTask-{uuid.uuid4().hex[:8]}" task_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO tasks (id, title, domain_id, project_id, status, priority, sort_order, is_deleted, created_at, updated_at) " "VALUES (:id, :title, :did, :pid, 'open', 3, 0, false, now(), now())"), {"id": task_id, "title": unique_title, "did": seed_domain["id"], "pid": seed_project["id"]}, ) await db_session.commit() r = await client.get("/tasks/") assert unique_title in r.text r = await client.post(f"/tasks/{task_id}/delete", follow_redirects=False) assert r.status_code in (303, 302) r = await client.get("/admin/trash/?entity_type=tasks") assert unique_title in r.text r = await client.post(f"/admin/trash/tasks/{task_id}/restore", follow_redirects=False) assert r.status_code == 303 r = await client.get("/tasks/") assert unique_title in r.text r = await client.get("/admin/trash/?entity_type=tasks") assert unique_title not in r.text await client.post(f"/tasks/{task_id}/delete", follow_redirects=False) await client.post(f"/admin/trash/tasks/{task_id}/permanent-delete", follow_redirects=False) @pytest.mark.asyncio async def test_trash_page_loads_with_counts(self, client: AsyncClient): r = await client.get("/admin/trash/") assert r.status_code == 200 assert "Trash" in r.text @pytest.mark.asyncio async def test_permanent_delete_invalid_table_redirects(self, client: AsyncClient): fake_id = str(uuid.uuid4()) r = await client.post(f"/admin/trash/nonexistent_table/{fake_id}/permanent-delete", follow_redirects=False) assert r.status_code == 303 @pytest.mark.asyncio async def test_restore_invalid_table_redirects(self, client: AsyncClient): fake_id = str(uuid.uuid4()) r = await client.post(f"/admin/trash/nonexistent_table/{fake_id}/restore", follow_redirects=False) assert r.status_code == 303 @pytest.mark.asyncio async def test_empty_trash( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """Empty trash permanently deletes all soft-deleted items.""" # Create and soft-delete a note tag = _uid() note_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, deleted_at, created_at, updated_at) " "VALUES (:id, :title, :did, 'body', 'markdown', true, now(), now(), now())"), {"id": note_id, "title": f"EmptyTrash-{tag}", "did": seed_domain["id"]}, ) await db_session.commit() r = await client.post("/admin/trash/empty", follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT count(*) FROM notes WHERE id = :id"), {"id": note_id}, ) assert result.scalar() == 0, "Empty trash should permanently delete items" @pytest.mark.asyncio async def test_trash_filter_by_entity_type(self, client: AsyncClient): """Trash page can filter by entity type.""" for entity_type in ("tasks", "notes", "projects", "contacts"): r = await client.get(f"/admin/trash/?entity_type={entity_type}") assert r.status_code == 200 # =========================================================================== # Soft Delete & Restore # =========================================================================== class TestSoftDeleteBehavior: """Soft-deleted items should vanish from lists and reappear after restore.""" @pytest.mark.asyncio async def test_deleted_task_hidden_from_list( self, client: AsyncClient, seed_task: dict, ): await client.post(f"/tasks/{seed_task['id']}/delete", follow_redirects=False) r = await client.get("/tasks/") assert seed_task["title"] not in r.text @pytest.mark.asyncio async def test_restore_task_reappears( self, client: AsyncClient, seed_task: dict, ): await client.post(f"/tasks/{seed_task['id']}/delete", follow_redirects=False) await client.post( f"/admin/trash/tasks/{seed_task['id']}/restore", follow_redirects=False, ) r = await client.get("/tasks/") assert seed_task["title"] in r.text @pytest.mark.asyncio async def test_deleted_project_hidden( self, client: AsyncClient, seed_project: dict, ): await client.post(f"/projects/{seed_project['id']}/delete", follow_redirects=False) r = await client.get("/projects/") assert seed_project["name"] not in r.text # =========================================================================== # Search # =========================================================================== class TestSearchBehavior: @pytest.mark.asyncio async def test_search_does_not_crash_on_sql_injection(self, client: AsyncClient): r = await client.get("/search/?q='; DROP TABLE tasks; --") assert r.status_code == 200 @pytest.mark.asyncio async def test_search_empty_query(self, client: AsyncClient): r = await client.get("/search/?q=") assert r.status_code == 200 @pytest.mark.asyncio async def test_search_special_unicode(self, client: AsyncClient): r = await client.get("/search/?q=日本語テスト") assert r.status_code == 200 @pytest.mark.asyncio async def test_search_api_returns_json(self, client: AsyncClient): """Search API endpoint returns valid JSON with results array.""" r = await client.get("/search/api?q=Test") assert r.status_code == 200 data = r.json() assert "results" in data assert "query" in data assert isinstance(data["results"], list) @pytest.mark.asyncio async def test_search_api_finds_seed_task( self, client: AsyncClient, seed_task: dict, ): """Search finds existing seed task by title.""" r = await client.get("/search/api?q=Test+Task") data = r.json() task_results = [r for r in data["results"] if r["type"] == "tasks"] # May or may not find it depending on search_vector trigger, but shouldn't crash assert r.status_code == 200 @pytest.mark.asyncio async def test_search_api_entity_type_filter(self, client: AsyncClient): """Search API can filter by entity_type.""" r = await client.get("/search/api?q=Test&entity_type=tasks") assert r.status_code == 200 data = r.json() for result in data["results"]: assert result["type"] == "tasks" @pytest.mark.asyncio async def test_search_api_limit_param(self, client: AsyncClient): """Search API respects limit parameter.""" r = await client.get("/search/api?q=Test&limit=2") assert r.status_code == 200 @pytest.mark.asyncio async def test_search_deleted_items_excluded( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """Soft-deleted items should not appear in search results.""" tag = _uid() # Create a note with searchable title, then soft-delete it note_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, created_at, updated_at) " "VALUES (:id, :title, :did, 'body', 'markdown', false, now(), now())"), {"id": note_id, "title": f"SearchDelete-{tag}", "did": seed_domain["id"]}, ) await db_session.commit() # Now soft-delete it await db_session.execute( text("UPDATE notes SET is_deleted = true, deleted_at = now() WHERE id = :id"), {"id": note_id}, ) await db_session.commit() r = await client.get(f"/search/api?q=SearchDelete-{tag}") data = r.json() ids = [r["id"] for r in data["results"]] assert note_id not in ids, "Deleted note should not appear in search" # Cleanup await db_session.execute(text("DELETE FROM notes WHERE id = :id"), {"id": note_id}) await db_session.commit() # =========================================================================== # Sidebar # =========================================================================== class TestSidebarIntegrity: @pytest.mark.asyncio async def test_sidebar_shows_domain_on_every_page( self, client: AsyncClient, seed_domain: dict, ): for path in ("/", "/tasks/", "/notes/", "/projects/"): r = await client.get(path) assert seed_domain["name"] in r.text, f"Domain missing from sidebar on {path}" @pytest.mark.asyncio async def test_sidebar_shows_project_hierarchy( self, client: AsyncClient, seed_domain: dict, seed_area: dict, seed_project: dict, ): r = await client.get("/") assert seed_project["name"] in r.text @pytest.mark.asyncio async def test_archived_project_hidden_from_sidebar( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """Archived projects should not appear in the sidebar.""" tag = _uid() proj_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO projects (id, name, domain_id, status, priority, sort_order, is_deleted, created_at, updated_at) " "VALUES (:id, :name, :did, 'archived', 3, 0, false, now(), now())"), {"id": proj_id, "name": f"Archived-{tag}", "did": seed_domain["id"]}, ) await db_session.commit() r = await client.get("/") assert f"Archived-{tag}" not in r.text, "Archived project should be hidden from sidebar" await db_session.execute(text("DELETE FROM projects WHERE id = :id"), {"id": proj_id}) await db_session.commit() @pytest.mark.asyncio async def test_sidebar_capture_badge_count( self, client: AsyncClient, db_session: AsyncSession, ): """Sidebar includes capture count for unprocessed items.""" # The sidebar data includes capture_count - we verify the page loads # and the sidebar renders (capture_count is passed to template) r = await client.get("/") assert r.status_code == 200 # The sidebar should render without error assert "sidebar" in r.text.lower() or "nav" in r.text.lower() # =========================================================================== # Focus & Capture Workflows # =========================================================================== class TestFocusWorkflow: @pytest.mark.asyncio async def test_add_and_remove_from_focus( self, client: AsyncClient, db_session: AsyncSession, seed_task: dict, ): r = await client.post("/focus/add", data={ "task_id": seed_task["id"], "focus_date": str(date.today()), }, follow_redirects=False) assert r.status_code in (303, 302) @pytest.mark.asyncio async def test_capture_multi_line_creates_multiple( self, client: AsyncClient, db_session: AsyncSession, ): await client.post( "/capture/add", data={"raw_text": "Line one\nLine two\nLine three"}, follow_redirects=False, ) result = await db_session.execute( text("SELECT count(*) FROM capture WHERE is_deleted = false") ) count = result.scalar() assert count >= 2, f"Expected multiple capture items, got {count}" @pytest.mark.asyncio async def test_focus_toggle_syncs_task_status( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """Toggling focus completion also toggles task status.""" tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FocusSync-{_uid()}") today = str(date.today()) # Add to focus r = await client.post("/focus/add", data={ "task_id": tid, "focus_date": today, }, follow_redirects=False) assert r.status_code == 303 # Get focus item id result = await db_session.execute( text("SELECT id FROM daily_focus WHERE task_id = :tid AND focus_date = :fd AND is_deleted = false"), {"tid": tid, "fd": date.today()}, ) focus_id = str(result.first().id) # Toggle complete await client.post(f"/focus/{focus_id}/toggle", follow_redirects=False) result = await db_session.execute( text("SELECT status FROM tasks WHERE id = :id"), {"id": tid}, ) assert result.first().status == "done", "Task should be marked done when focus toggled" # Toggle back await client.post(f"/focus/{focus_id}/toggle", follow_redirects=False) result = await db_session.execute( text("SELECT status FROM tasks WHERE id = :id"), {"id": tid}, ) assert result.first().status == "open", "Task should be reopened when focus un-toggled" await _delete_tasks(db_session, [tid]) @pytest.mark.asyncio async def test_focus_remove_soft_deletes( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """Removing from focus soft-deletes the focus entry.""" tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FocusRm-{_uid()}") today = str(date.today()) await client.post("/focus/add", data={"task_id": tid, "focus_date": today}, follow_redirects=False) result = await db_session.execute( text("SELECT id FROM daily_focus WHERE task_id = :tid AND focus_date = :fd AND is_deleted = false"), {"tid": tid, "fd": date.today()}, ) focus_id = str(result.first().id) await client.post(f"/focus/{focus_id}/remove", follow_redirects=False) result = await db_session.execute( text("SELECT is_deleted FROM daily_focus WHERE id = :id"), {"id": focus_id}, ) assert result.first().is_deleted is True await _delete_tasks(db_session, [tid]) @pytest.mark.asyncio async def test_focus_page_loads_with_date(self, client: AsyncClient): """Focus page loads for specific date.""" r = await client.get(f"/focus/?focus_date={date.today()}") assert r.status_code == 200 # =========================================================================== # List Items & Hierarchy # =========================================================================== class TestListItems: """Test list item CRUD, toggle, nesting.""" @pytest.mark.asyncio async def test_add_item_to_list( self, client: AsyncClient, db_session: AsyncSession, seed_list: dict, ): tag = _uid() r = await client.post(f"/lists/{seed_list['id']}/items/add", data={ "content": f"Item-{tag}", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT content, completed FROM list_items WHERE list_id = :lid AND content = :c AND is_deleted = false"), {"lid": seed_list["id"], "c": f"Item-{tag}"}, ) row = result.first() assert row is not None assert row.completed is False @pytest.mark.asyncio async def test_toggle_item_completion( self, client: AsyncClient, db_session: AsyncSession, seed_list: dict, ): """Toggle sets completed=true and completed_at, toggle again reverts.""" item_id = await _create_list_item(db_session, seed_list["id"], f"Toggle-{_uid()}") # Toggle to completed r = await client.post( f"/lists/{seed_list['id']}/items/{item_id}/toggle", follow_redirects=False, ) assert r.status_code == 303 result = await db_session.execute( text("SELECT completed, completed_at FROM list_items WHERE id = :id"), {"id": item_id}, ) row = result.first() assert row.completed is True assert row.completed_at is not None # Toggle back await client.post( f"/lists/{seed_list['id']}/items/{item_id}/toggle", follow_redirects=False, ) result = await db_session.execute( text("SELECT completed, completed_at FROM list_items WHERE id = :id"), {"id": item_id}, ) row = result.first() assert row.completed is False assert row.completed_at is None @pytest.mark.asyncio async def test_nested_item( self, client: AsyncClient, db_session: AsyncSession, seed_list: dict, ): """Items can be nested via parent_item_id.""" parent_id = await _create_list_item(db_session, seed_list["id"], f"Parent-{_uid()}") tag = _uid() r = await client.post(f"/lists/{seed_list['id']}/items/add", data={ "content": f"Child-{tag}", "parent_item_id": parent_id, }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT parent_item_id FROM list_items WHERE content = :c AND is_deleted = false"), {"c": f"Child-{tag}"}, ) assert str(result.first().parent_item_id) == parent_id @pytest.mark.asyncio async def test_edit_item_content( self, client: AsyncClient, db_session: AsyncSession, seed_list: dict, ): item_id = await _create_list_item(db_session, seed_list["id"], "OldContent") r = await client.post( f"/lists/{seed_list['id']}/items/{item_id}/edit", data={"content": "NewContent"}, follow_redirects=False, ) assert r.status_code == 303 result = await db_session.execute( text("SELECT content FROM list_items WHERE id = :id"), {"id": item_id}, ) assert result.first().content == "NewContent" @pytest.mark.asyncio async def test_delete_item( self, client: AsyncClient, db_session: AsyncSession, seed_list: dict, ): item_id = await _create_list_item(db_session, seed_list["id"], f"DeleteMe-{_uid()}") r = await client.post( f"/lists/{seed_list['id']}/items/{item_id}/delete", follow_redirects=False, ) assert r.status_code == 303 result = await db_session.execute( text("SELECT is_deleted FROM list_items WHERE id = :id"), {"id": item_id}, ) assert result.first().is_deleted is True @pytest.mark.asyncio async def test_list_item_count_accuracy( self, client: AsyncClient, db_session: AsyncSession, seed_list: dict, ): """List view shows accurate item_count and completed_count.""" i1 = await _create_list_item(db_session, seed_list["id"], f"Count1-{_uid()}") i2 = await _create_list_item(db_session, seed_list["id"], f"Count2-{_uid()}") # Complete one await db_session.execute( text("UPDATE list_items SET completed = true, completed_at = now() WHERE id = :id"), {"id": i1}, ) await db_session.commit() r = await client.get("/lists/") assert r.status_code == 200 # Page should load without error (counts are subqueries) # =========================================================================== # Decisions & Supersession # =========================================================================== class TestDecisions: @pytest.mark.asyncio async def test_decision_filter_by_status(self, client: AsyncClient): for status in ("proposed", "decided", "archived", "rejected"): r = await client.get(f"/decisions/?status={status}") assert r.status_code == 200 @pytest.mark.asyncio async def test_decision_filter_by_impact(self, client: AsyncClient): for impact in ("high", "medium", "low"): r = await client.get(f"/decisions/?impact={impact}") assert r.status_code == 200 @pytest.mark.asyncio async def test_decision_supersession( self, client: AsyncClient, db_session: AsyncSession, ): """Decision A can be superseded by Decision B.""" a_id = str(uuid.uuid4()) b_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO decisions (id, title, status, impact, is_deleted, created_at, updated_at) " "VALUES (:id, 'DecA', 'decided', 'high', false, now(), now())"), {"id": a_id}, ) await db_session.execute( text("INSERT INTO decisions (id, title, status, impact, is_deleted, created_at, updated_at) " "VALUES (:id, 'DecB', 'decided', 'high', false, now(), now())"), {"id": b_id}, ) await db_session.commit() # Set A superseded by B r = await client.post(f"/decisions/{a_id}/edit", data={ "title": "DecA", "status": "decided", "impact": "high", "superseded_by_id": b_id, }, follow_redirects=False) assert r.status_code == 303 # Check detail page shows supersession r = await client.get(f"/decisions/{a_id}") assert "DecB" in r.text, "Superseding decision should appear on detail" # B's detail should show it supersedes A r = await client.get(f"/decisions/{b_id}") assert "DecA" in r.text, "Superseded decision should appear" # Cleanup await db_session.execute(text("DELETE FROM decisions WHERE id IN (:a, :b)"), {"a": a_id, "b": b_id}) await db_session.commit() @pytest.mark.asyncio async def test_decision_meeting_link( self, client: AsyncClient, db_session: AsyncSession, seed_meeting: dict, ): """Decision can be linked to a meeting.""" tag = _uid() r = await client.post("/decisions/create", data={ "title": f"MeetDec-{tag}", "status": "proposed", "impact": "medium", "meeting_id": seed_meeting["id"], }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT meeting_id FROM decisions WHERE title = :t AND is_deleted = false"), {"t": f"MeetDec-{tag}"}, ) assert str(result.first().meeting_id) == seed_meeting["id"] # =========================================================================== # Appointments & Contact Associations # =========================================================================== class TestAppointments: @pytest.mark.asyncio async def test_timeframe_upcoming(self, client: AsyncClient): r = await client.get("/appointments/?timeframe=upcoming") assert r.status_code == 200 @pytest.mark.asyncio async def test_timeframe_past(self, client: AsyncClient): r = await client.get("/appointments/?timeframe=past") assert r.status_code == 200 @pytest.mark.asyncio async def test_timeframe_all(self, client: AsyncClient): r = await client.get("/appointments/?timeframe=all") assert r.status_code == 200 @pytest.mark.asyncio async def test_create_appointment_with_contacts( self, client: AsyncClient, db_session: AsyncSession, seed_contact: dict, ): """Creating an appointment with contacts creates junction records.""" tag = _uid() tomorrow = str(date.today() + timedelta(days=1)) r = await client.post("/appointments/create", data={ "title": f"Appt-{tag}", "start_date": tomorrow, "start_time": "10:00", "end_date": tomorrow, "end_time": "11:00", "contact_ids": seed_contact["id"], }, follow_redirects=False) assert r.status_code == 303 # Find the appointment result = await db_session.execute( text("SELECT id FROM appointments WHERE title = :t AND is_deleted = false"), {"t": f"Appt-{tag}"}, ) appt_id = str(result.first().id) # Check junction result = await db_session.execute( text("SELECT count(*) FROM contact_appointments WHERE appointment_id = :aid"), {"aid": appt_id}, ) assert result.scalar() >= 1, "Contact junction not created" @pytest.mark.asyncio async def test_all_day_appointment( self, client: AsyncClient, db_session: AsyncSession, ): """All-day appointment sets start_at to T00:00:00.""" tag = _uid() tomorrow = str(date.today() + timedelta(days=1)) r = await client.post("/appointments/create", data={ "title": f"AllDay-{tag}", "start_date": tomorrow, "end_date": tomorrow, "all_day": "on", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT all_day, start_at FROM appointments WHERE title = :t AND is_deleted = false"), {"t": f"AllDay-{tag}"}, ) row = result.first() assert row.all_day is True @pytest.mark.asyncio async def test_edit_appointment_rebuilds_contacts( self, client: AsyncClient, db_session: AsyncSession, seed_contact: dict, ): """Editing an appointment rebuilds contact associations.""" tag = _uid() tomorrow = str(date.today() + timedelta(days=1)) # Create without contact r = await client.post("/appointments/create", data={ "title": f"EditAppt-{tag}", "start_date": tomorrow, }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT id FROM appointments WHERE title = :t AND is_deleted = false"), {"t": f"EditAppt-{tag}"}, ) appt_id = str(result.first().id) # Edit with contact await client.post(f"/appointments/{appt_id}/edit", data={ "title": f"EditAppt-{tag}", "start_date": tomorrow, "contact_ids": seed_contact["id"], }, follow_redirects=False) result = await db_session.execute( text("SELECT count(*) FROM contact_appointments WHERE appointment_id = :aid"), {"aid": appt_id}, ) assert result.scalar() >= 1 @pytest.mark.asyncio async def test_appointment_detail_shows_contacts( self, client: AsyncClient, seed_appointment: dict, ): r = await client.get(f"/appointments/{seed_appointment['id']}") assert r.status_code == 200 # =========================================================================== # Project Progress & Hierarchy # =========================================================================== class TestProjectHierarchy: @pytest.mark.asyncio async def test_project_progress_calculation( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """Project progress shows correct done/total percentage.""" tag = _uid() proj_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO projects (id, name, domain_id, status, priority, sort_order, is_deleted, created_at, updated_at) " "VALUES (:id, :name, :did, 'active', 3, 0, false, now(), now())"), {"id": proj_id, "name": f"Prog-{tag}", "did": seed_domain["id"]}, ) # 2 tasks: 1 done, 1 open = 50% t1 = await _create_task(db_session, seed_domain["id"], proj_id, f"ProgOpen-{tag}") t2 = await _create_task(db_session, seed_domain["id"], proj_id, f"ProgDone-{tag}", status="done") r = await client.get("/projects/") assert r.status_code == 200 # Page should show project with progress (won't test exact %, just that it loads) assert f"Prog-{tag}" in r.text await _delete_tasks(db_session, [t1, t2]) await db_session.execute(text("DELETE FROM projects WHERE id = :id"), {"id": proj_id}) await db_session.commit() @pytest.mark.asyncio async def test_project_progress_zero_tasks( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """Project with zero tasks shows 0% progress (no division by zero).""" tag = _uid() proj_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO projects (id, name, domain_id, status, priority, sort_order, is_deleted, created_at, updated_at) " "VALUES (:id, :name, :did, 'active', 3, 0, false, now(), now())"), {"id": proj_id, "name": f"ZeroProg-{tag}", "did": seed_domain["id"]}, ) await db_session.commit() r = await client.get("/projects/") assert r.status_code == 200 # Should not crash from division by zero await db_session.execute(text("DELETE FROM projects WHERE id = :id"), {"id": proj_id}) await db_session.commit() @pytest.mark.asyncio async def test_project_filter_by_status(self, client: AsyncClient): for status in ("active", "on_hold", "completed", "archived"): r = await client.get(f"/projects/?status={status}") assert r.status_code == 200 @pytest.mark.asyncio async def test_project_filter_by_domain( self, client: AsyncClient, seed_domain: dict, ): r = await client.get(f"/projects/?domain_id={seed_domain['id']}") assert r.status_code == 200 @pytest.mark.asyncio async def test_project_detail_tabs( self, client: AsyncClient, seed_project: dict, ): """Project detail loads with different tab parameters.""" for tab in ("tasks", "notes", "links"): r = await client.get(f"/projects/{seed_project['id']}?tab={tab}") assert r.status_code == 200 # =========================================================================== # Cascade / Orphan Behavior # =========================================================================== class TestCascadeBehavior: """Test cross-entity effects when parent entities are deleted.""" @pytest.mark.asyncio async def test_deleted_task_hidden_from_focus( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """When a task is deleted, focus entries referencing it should not crash focus page.""" tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"FocusCascade-{_uid()}") today = str(date.today()) await client.post("/focus/add", data={"task_id": tid, "focus_date": today}, follow_redirects=False) # Delete the task await client.post(f"/tasks/{tid}/delete", follow_redirects=False) # Focus page should still load (JOIN filters deleted tasks) r = await client.get(f"/focus/?focus_date={today}") assert r.status_code == 200 @pytest.mark.asyncio async def test_deleted_project_tasks_still_accessible( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """Tasks survive when their project is soft-deleted.""" tag = _uid() proj_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO projects (id, name, domain_id, status, priority, sort_order, is_deleted, created_at, updated_at) " "VALUES (:id, :name, :did, 'active', 3, 0, false, now(), now())"), {"id": proj_id, "name": f"CascProj-{tag}", "did": seed_domain["id"]}, ) tid = await _create_task(db_session, seed_domain["id"], proj_id, f"OrphanTask-{tag}") # Delete the project await client.post(f"/projects/{proj_id}/delete", follow_redirects=False) # Task should still appear on tasks list r = await client.get("/tasks/") assert f"OrphanTask-{tag}" in r.text, "Task should survive project deletion" # Task detail should still load r = await client.get(f"/tasks/{tid}") assert r.status_code == 200 await _delete_tasks(db_session, [tid]) await db_session.execute(text("DELETE FROM projects WHERE id = :id"), {"id": proj_id}) await db_session.commit() @pytest.mark.asyncio async def test_time_entries_cascade_on_task_delete( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, seed_project: dict, ): """time_entries FK has ON DELETE CASCADE - entries deleted with task.""" tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"TimeCascade-{_uid()}") # Create a time entry for this task await db_session.execute( text("INSERT INTO time_entries (task_id, start_at, end_at, duration_minutes, is_deleted, created_at) " "VALUES (:tid, now(), now(), 10, false, now())"), {"tid": tid}, ) await db_session.commit() # Hard delete the task (CASCADE should remove time_entries) await db_session.execute(text("DELETE FROM tasks WHERE id = :id"), {"id": tid}) await db_session.commit() result = await db_session.execute( text("SELECT count(*) FROM time_entries WHERE task_id = :tid"), {"tid": tid}, ) assert result.scalar() == 0, "time_entries should cascade delete with task" @pytest.mark.asyncio async def test_list_items_cascade_on_list_delete( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """list_items FK has ON DELETE CASCADE - items deleted with list.""" tag = _uid() list_id = str(uuid.uuid4()) await db_session.execute( text("INSERT INTO lists (id, name, domain_id, list_type, sort_order, is_deleted, created_at, updated_at) " "VALUES (:id, :name, :did, 'checklist', 0, false, now(), now())"), {"id": list_id, "name": f"CascList-{tag}", "did": seed_domain["id"]}, ) item_id = await _create_list_item(db_session, list_id, f"CascItem-{tag}") # Hard delete the list await db_session.execute(text("DELETE FROM lists WHERE id = :id"), {"id": list_id}) await db_session.commit() result = await db_session.execute( text("SELECT count(*) FROM list_items WHERE id = :id"), {"id": item_id}, ) assert result.scalar() == 0, "list_items should cascade delete with list" # =========================================================================== # File Operations # =========================================================================== class TestFileOperations: @pytest.mark.asyncio async def test_file_list_loads(self, client: AsyncClient): r = await client.get("/files/") assert r.status_code == 200 @pytest.mark.asyncio async def test_file_download_nonexistent_redirects(self, client: AsyncClient): """Download of nonexistent file redirects to file list.""" fake_id = str(uuid.uuid4()) r = await client.get(f"/files/{fake_id}/download", follow_redirects=False) assert r.status_code == 303 @pytest.mark.asyncio async def test_file_preview_nonexistent_redirects(self, client: AsyncClient): fake_id = str(uuid.uuid4()) r = await client.get(f"/files/{fake_id}/preview", follow_redirects=False) assert r.status_code == 303 @pytest.mark.asyncio async def test_file_context_filter(self, client: AsyncClient): """Files page loads with context filter params.""" r = await client.get(f"/files/?context_type=tasks&context_id={uuid.uuid4()}") assert r.status_code == 200 @pytest.mark.asyncio async def test_file_serve_nonexistent_redirects(self, client: AsyncClient): fake_id = str(uuid.uuid4()) r = await client.get(f"/files/{fake_id}/serve", follow_redirects=False) assert r.status_code == 303 # =========================================================================== # Form Validation & Edge Cases # =========================================================================== class TestFormValidation: @pytest.mark.asyncio async def test_task_create_missing_title_returns_422(self, client: AsyncClient, seed_domain: dict): """Missing required 'title' field returns 422.""" r = await client.post("/tasks/create", data={ "domain_id": seed_domain["id"], "priority": "3", }, follow_redirects=False) assert r.status_code == 422 @pytest.mark.asyncio async def test_task_create_missing_domain_returns_422(self, client: AsyncClient): """Missing required 'domain_id' field returns 422.""" r = await client.post("/tasks/create", data={ "title": "NoDomain", "priority": "3", }, follow_redirects=False) assert r.status_code == 422 @pytest.mark.asyncio async def test_note_create_missing_title_returns_422(self, client: AsyncClient): r = await client.post("/notes/create", data={ "body": "No title", }, follow_redirects=False) assert r.status_code == 422 @pytest.mark.asyncio async def test_tags_split_correctly( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """Tags string is split by comma into array.""" tag = _uid() r = await client.post("/notes/create", data={ "title": f"TagTest-{tag}", "domain_id": seed_domain["id"], "content_format": "markdown", "tags": "alpha, beta, gamma", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT tags FROM notes WHERE title = :t AND is_deleted = false"), {"t": f"TagTest-{tag}"}, ) row = result.first() assert row is not None tags = row.tags assert "alpha" in tags assert "beta" in tags assert "gamma" in tags @pytest.mark.asyncio async def test_empty_tags_stored_as_null( self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict, ): """Empty tags string doesn't create an array with empty strings.""" tag = _uid() r = await client.post("/notes/create", data={ "title": f"NoTag-{tag}", "domain_id": seed_domain["id"], "content_format": "markdown", "tags": "", }, follow_redirects=False) assert r.status_code == 303 result = await db_session.execute( text("SELECT tags FROM notes WHERE title = :t AND is_deleted = false"), {"t": f"NoTag-{tag}"}, ) row = result.first() # Tags should be null or empty, not [""] assert row.tags is None or row.tags == [] # =========================================================================== # Edge Cases # =========================================================================== class TestEdgeCases: @pytest.mark.asyncio async def test_invalid_uuid_in_path(self, client: AsyncClient): r = await client.get("/tasks/not-a-valid-uuid") assert r.status_code in (404, 422, 400, 303) @pytest.mark.asyncio async def test_timer_start_without_task_id(self, client: AsyncClient): r = await client.post("/time/start", data={}, follow_redirects=False) assert r.status_code != 200 @pytest.mark.asyncio async def test_double_delete_doesnt_crash( self, client: AsyncClient, seed_task: dict, ): await client.post(f"/tasks/{seed_task['id']}/delete", follow_redirects=False) r = await client.post(f"/tasks/{seed_task['id']}/delete", follow_redirects=False) assert r.status_code in (303, 302, 404) @pytest.mark.asyncio async def test_toggle_nonexistent_task_redirects(self, client: AsyncClient): fake_id = str(uuid.uuid4()) r = await client.post(f"/tasks/{fake_id}/toggle", follow_redirects=False) assert r.status_code == 303 @pytest.mark.asyncio async def test_nonexistent_detail_pages_redirect(self, client: AsyncClient): """Nonexistent entity detail pages redirect gracefully.""" fake_id = str(uuid.uuid4()) for path in ( f"/tasks/{fake_id}", f"/notes/{fake_id}", f"/projects/{fake_id}", f"/meetings/{fake_id}", f"/decisions/{fake_id}", f"/appointments/{fake_id}", f"/lists/{fake_id}", ): r = await client.get(path, follow_redirects=False) assert r.status_code == 303, f"{path} did not redirect for nonexistent ID" @pytest.mark.asyncio async def test_stop_timer_when_none_running(self, client: AsyncClient): """Stopping timer when none running should not crash.""" await client.post("/time/stop", follow_redirects=False) r = await client.post("/time/stop", follow_redirects=False) assert r.status_code == 303 @pytest.mark.asyncio async def test_capture_add_redirect_to( self, client: AsyncClient, ): """Capture add with redirect_to redirects to specified URL.""" r = await client.post("/capture/add", data={ "raw_text": "redirect test", "redirect_to": "/tasks/", }, follow_redirects=False) assert r.status_code == 303 assert r.headers.get("location") == "/tasks/" @pytest.mark.asyncio async def test_capture_add_ignores_external_redirect( self, client: AsyncClient, ): """Capture add ignores redirect_to that doesn't start with /.""" r = await client.post("/capture/add", data={ "raw_text": "external redirect test", "redirect_to": "https://evil.com", }, follow_redirects=False) assert r.status_code == 303 assert r.headers.get("location") == "/capture" # =========================================================================== # Helpers # =========================================================================== def _uid() -> str: """Short unique ID for test data.""" return uuid.uuid4().hex[:8] async def _create_task( db: AsyncSession, domain_id: str, project_id: str, title: str, status: str = "open", priority: int = 3, parent_id: str = None, ) -> str: _id = str(uuid.uuid4()) await db.execute( text("INSERT INTO tasks (id, domain_id, project_id, parent_id, title, status, priority, sort_order, is_deleted, created_at, updated_at) " "VALUES (:id, :did, :pid, :parent_id, :title, :status, :priority, 0, false, now(), now())"), {"id": _id, "did": domain_id, "pid": project_id, "parent_id": parent_id, "title": title, "status": status, "priority": priority}, ) await db.commit() return _id async def _delete_tasks(db: AsyncSession, task_ids: list[str]) -> None: for tid in task_ids: await db.execute(text("DELETE FROM tasks WHERE id = :id"), {"id": tid}) await db.commit() async def _create_capture(db: AsyncSession, raw_text: str) -> str: _id = str(uuid.uuid4()) await db.execute( text("INSERT INTO capture (id, raw_text, processed, is_deleted, created_at, updated_at) " "VALUES (:id, :text, false, false, now(), now())"), {"id": _id, "text": raw_text}, ) await db.commit() return _id async def _create_list_item(db: AsyncSession, list_id: str, content: str) -> str: _id = str(uuid.uuid4()) await db.execute( text("INSERT INTO list_items (id, list_id, content, completed, sort_order, is_deleted, created_at, updated_at) " "VALUES (:id, :lid, :content, false, 0, false, now(), now())"), {"id": _id, "lid": list_id, "content": content}, ) await db.commit() return _id