various enhancements for new tabs and bug fixes

This commit is contained in:
2026-03-02 17:35:00 +00:00
parent 9dedf6dbf2
commit cf84d6d2dd
32 changed files with 4501 additions and 296 deletions

View File

@@ -201,6 +201,24 @@ def all_seeds(sync_conn):
ON CONFLICT (id) DO NOTHING
""", (d["focus"], d["task"]))
# Junction table seeds for contact tabs
cur.execute("""
INSERT INTO contact_tasks (contact_id, task_id, role)
VALUES (%s, %s, 'assignee') ON CONFLICT DO NOTHING
""", (d["contact"], d["task"]))
cur.execute("""
INSERT INTO contact_projects (contact_id, project_id, role)
VALUES (%s, %s, 'stakeholder') ON CONFLICT DO NOTHING
""", (d["contact"], d["project"]))
cur.execute("""
INSERT INTO contact_meetings (contact_id, meeting_id, role)
VALUES (%s, %s, 'attendee') ON CONFLICT DO NOTHING
""", (d["contact"], d["meeting"]))
cur.execute("""
INSERT INTO contact_lists (contact_id, list_id, role)
VALUES (%s, %s, 'contributor') ON CONFLICT DO NOTHING
""", (d["contact"], d["list"]))
# Process
cur.execute("""
INSERT INTO processes (id, name, process_type, status, category, is_deleted, created_at, updated_at)
@@ -253,6 +271,11 @@ def all_seeds(sync_conn):
# Cleanup: delete all seed data (reverse dependency order)
try:
# Junction tables first
cur.execute("DELETE FROM contact_tasks WHERE task_id = %s", (d["task"],))
cur.execute("DELETE FROM contact_projects WHERE project_id = %s", (d["project"],))
cur.execute("DELETE FROM contact_meetings WHERE meeting_id = %s", (d["meeting"],))
cur.execute("DELETE FROM contact_lists WHERE list_id = %s", (d["list"],))
cur.execute("DELETE FROM files WHERE id = %s", (d["file"],))
if os.path.exists(dummy_file_path):
os.remove(dummy_file_path)

View File

@@ -46,6 +46,9 @@ PREFIX_TO_SEED = {
"/time-budgets": "time_budget",
"/files": "file",
"/admin/trash": None,
"/history": None,
"/eisenhower": None,
"/calendar": None,
}
def resolve_path(path_template, seeds):

View File

@@ -1893,3 +1893,743 @@ async def _create_list_item(db: AsyncSession, list_id: str, content: str) -> str
)
await db.commit()
return _id
# ===========================================================================
# Task Detail Tabs
# ===========================================================================
class TestTaskDetailTabs:
"""Test tabbed task detail page - all tabs load, correct content per tab."""
@pytest.mark.asyncio
async def test_overview_tab_default(self, client: AsyncClient, seed_task: dict):
"""Default tab is overview."""
r = await client.get(f"/tasks/{seed_task['id']}")
assert r.status_code == 200
# The active tab has class "tab-item active"
assert 'tab-item active' in r.text
@pytest.mark.asyncio
async def test_all_tabs_return_200(self, client: AsyncClient, seed_task: dict):
"""Every tab on task detail returns 200."""
for tab in ("overview", "notes", "weblinks", "files", "lists", "decisions", "processes", "contacts"):
r = await client.get(f"/tasks/{seed_task['id']}?tab={tab}")
assert r.status_code == 200, f"Tab '{tab}' returned {r.status_code}"
@pytest.mark.asyncio
async def test_contacts_tab_shows_seed_contact(
self, client: AsyncClient, seed_task: dict, seed_contact: dict,
):
"""Contacts tab shows the linked contact from seed data."""
r = await client.get(f"/tasks/{seed_task['id']}?tab=contacts")
assert r.status_code == 200
assert seed_contact["first_name"] in r.text
@pytest.mark.asyncio
async def test_notes_tab_shows_linked_note(
self, client: AsyncClient, db_session: AsyncSession,
seed_task: dict, seed_domain: dict,
):
"""A note linked to a task appears on the notes tab."""
tag = _uid()
note_id = str(uuid.uuid4())
await db_session.execute(
text("INSERT INTO notes (id, title, domain_id, task_id, body, content_format, is_deleted, created_at, updated_at) "
"VALUES (:id, :title, :did, :tid, 'body', 'markdown', false, now(), now())"),
{"id": note_id, "title": f"TaskNote-{tag}", "did": seed_domain["id"], "tid": seed_task["id"]},
)
await db_session.commit()
r = await client.get(f"/tasks/{seed_task['id']}?tab=notes")
assert f"TaskNote-{tag}" in r.text
await db_session.execute(text("DELETE FROM notes WHERE id = :id"), {"id": note_id})
await db_session.commit()
@pytest.mark.asyncio
async def test_invalid_tab_defaults_to_overview(self, client: AsyncClient, seed_task: dict):
"""Invalid tab param still loads the page (defaults to overview)."""
r = await client.get(f"/tasks/{seed_task['id']}?tab=nonexistent")
assert r.status_code == 200
# ===========================================================================
# Task Contact Management
# ===========================================================================
class TestTaskContactManagement:
"""Test adding and removing contacts on task detail."""
@pytest.mark.asyncio
async def test_add_contact_to_task(
self, client: AsyncClient, db_session: AsyncSession,
seed_domain: dict, seed_project: dict, seed_contact: dict,
):
"""Adding a contact to a task creates a junction entry."""
tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"ContactTask-{_uid()}")
r = await client.post(f"/tasks/{tid}/contacts/add", data={
"contact_id": seed_contact["id"],
"role": "reviewer",
}, follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT role FROM contact_tasks WHERE task_id = :tid AND contact_id = :cid"),
{"tid": tid, "cid": seed_contact["id"]},
)
row = result.first()
assert row is not None, "Contact-task junction not created"
assert row.role == "reviewer"
# Cleanup
await db_session.execute(text("DELETE FROM contact_tasks WHERE task_id = :tid"), {"tid": tid})
await db_session.commit()
await _delete_tasks(db_session, [tid])
@pytest.mark.asyncio
async def test_remove_contact_from_task(
self, client: AsyncClient, db_session: AsyncSession,
seed_domain: dict, seed_project: dict, seed_contact: dict,
):
"""Removing a contact from a task deletes the junction entry."""
tid = await _create_task(db_session, seed_domain["id"], seed_project["id"], f"RmContact-{_uid()}")
await db_session.execute(
text("INSERT INTO contact_tasks (contact_id, task_id, role) VALUES (:cid, :tid, 'test') ON CONFLICT DO NOTHING"),
{"cid": seed_contact["id"], "tid": tid},
)
await db_session.commit()
r = await client.post(f"/tasks/{tid}/contacts/{seed_contact['id']}/remove", follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT count(*) FROM contact_tasks WHERE task_id = :tid AND contact_id = :cid"),
{"tid": tid, "cid": seed_contact["id"]},
)
assert result.scalar() == 0
await _delete_tasks(db_session, [tid])
@pytest.mark.asyncio
async def test_add_duplicate_contact_no_error(
self, client: AsyncClient, db_session: AsyncSession,
seed_task: dict, seed_contact: dict,
):
"""Adding same contact twice doesn't crash (ON CONFLICT DO NOTHING)."""
r = await client.post(f"/tasks/{seed_task['id']}/contacts/add", data={
"contact_id": seed_contact["id"],
}, follow_redirects=False)
assert r.status_code == 303
r = await client.post(f"/tasks/{seed_task['id']}/contacts/add", data={
"contact_id": seed_contact["id"],
}, follow_redirects=False)
assert r.status_code == 303
# ===========================================================================
# Project Detail Tabs
# ===========================================================================
class TestProjectDetailTabs:
"""Test tabbed project detail page."""
@pytest.mark.asyncio
async def test_all_project_tabs_return_200(self, client: AsyncClient, seed_project: dict):
"""Every tab on project detail returns 200."""
for tab in ("tasks", "notes", "links", "files", "lists", "decisions", "processes", "contacts"):
r = await client.get(f"/projects/{seed_project['id']}?tab={tab}")
assert r.status_code == 200, f"Project tab '{tab}' returned {r.status_code}"
@pytest.mark.asyncio
async def test_project_contacts_tab_shows_seed_contact(
self, client: AsyncClient, seed_project: dict, seed_contact: dict,
):
"""Contacts tab shows the linked contact."""
r = await client.get(f"/projects/{seed_project['id']}?tab=contacts")
assert seed_contact["first_name"] in r.text
@pytest.mark.asyncio
async def test_project_tasks_tab_shows_seed_task(
self, client: AsyncClient, db_session: AsyncSession,
seed_project: dict, seed_task: dict,
):
"""Tasks tab shows the seed task linked to this project."""
# Ensure seed project and task are not soft-deleted (earlier tests may delete them)
await db_session.execute(
text("UPDATE projects SET is_deleted = false, deleted_at = NULL WHERE id = :id"),
{"id": seed_project["id"]},
)
await db_session.execute(
text("UPDATE tasks SET is_deleted = false, deleted_at = NULL WHERE id = :id"),
{"id": seed_task["id"]},
)
await db_session.commit()
r = await client.get(f"/projects/{seed_project['id']}?tab=tasks")
assert seed_task["title"] in r.text
# ===========================================================================
# Project Contact Management
# ===========================================================================
class TestProjectContactManagement:
"""Test adding and removing contacts on project detail."""
@pytest.mark.asyncio
async def test_add_contact_to_project(
self, client: AsyncClient, db_session: AsyncSession,
seed_project: dict, seed_contact: dict,
):
"""Adding a contact to a project creates a junction entry."""
# First remove existing seed junction to test fresh add
await db_session.execute(
text("DELETE FROM contact_projects WHERE project_id = :pid AND contact_id = :cid"),
{"pid": seed_project["id"], "cid": seed_contact["id"]},
)
await db_session.commit()
r = await client.post(f"/projects/{seed_project['id']}/contacts/add", data={
"contact_id": seed_contact["id"],
"role": "lead",
}, follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT role FROM contact_projects WHERE project_id = :pid AND contact_id = :cid"),
{"pid": seed_project["id"], "cid": seed_contact["id"]},
)
row = result.first()
assert row is not None
assert row.role == "lead"
@pytest.mark.asyncio
async def test_remove_contact_from_project(
self, client: AsyncClient, db_session: AsyncSession,
seed_project: dict, seed_contact: dict,
):
"""Removing a contact from a project deletes the junction entry."""
# Ensure junction exists
await db_session.execute(
text("INSERT INTO contact_projects (contact_id, project_id, role) VALUES (:cid, :pid, 'test') ON CONFLICT DO NOTHING"),
{"cid": seed_contact["id"], "pid": seed_project["id"]},
)
await db_session.commit()
r = await client.post(f"/projects/{seed_project['id']}/contacts/{seed_contact['id']}/remove", follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT count(*) FROM contact_projects WHERE project_id = :pid AND contact_id = :cid"),
{"pid": seed_project["id"], "cid": seed_contact["id"]},
)
assert result.scalar() == 0
# Re-insert for other tests
await db_session.execute(
text("INSERT INTO contact_projects (contact_id, project_id, role) VALUES (:cid, :pid, 'stakeholder') ON CONFLICT DO NOTHING"),
{"cid": seed_contact["id"], "pid": seed_project["id"]},
)
await db_session.commit()
# ===========================================================================
# Meeting Detail Tabs
# ===========================================================================
class TestMeetingDetailTabs:
"""Test tabbed meeting detail page."""
@pytest.mark.asyncio
async def test_all_meeting_tabs_return_200(self, client: AsyncClient, seed_meeting: dict):
"""Every tab on meeting detail returns 200."""
for tab in ("overview", "notes", "weblinks", "files", "lists", "processes", "contacts"):
r = await client.get(f"/meetings/{seed_meeting['id']}?tab={tab}")
assert r.status_code == 200, f"Meeting tab '{tab}' returned {r.status_code}"
@pytest.mark.asyncio
async def test_meeting_contacts_tab_shows_seed_contact(
self, client: AsyncClient, seed_meeting: dict, seed_contact: dict,
):
"""Contacts tab shows the linked contact."""
r = await client.get(f"/meetings/{seed_meeting['id']}?tab=contacts")
assert seed_contact["first_name"] in r.text
# ===========================================================================
# Meeting Contact Management
# ===========================================================================
class TestMeetingContactManagement:
"""Test adding and removing contacts on meeting detail."""
@pytest.mark.asyncio
async def test_add_contact_to_meeting(
self, client: AsyncClient, db_session: AsyncSession,
seed_meeting: dict, seed_contact: dict,
):
"""Adding a contact to a meeting creates a junction entry."""
await db_session.execute(
text("DELETE FROM contact_meetings WHERE meeting_id = :mid AND contact_id = :cid"),
{"mid": seed_meeting["id"], "cid": seed_contact["id"]},
)
await db_session.commit()
r = await client.post(f"/meetings/{seed_meeting['id']}/contacts/add", data={
"contact_id": seed_contact["id"],
"role": "organizer",
}, follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT role FROM contact_meetings WHERE meeting_id = :mid AND contact_id = :cid"),
{"mid": seed_meeting["id"], "cid": seed_contact["id"]},
)
row = result.first()
assert row is not None
assert row.role == "organizer"
@pytest.mark.asyncio
async def test_remove_contact_from_meeting(
self, client: AsyncClient, db_session: AsyncSession,
seed_meeting: dict, seed_contact: dict,
):
"""Removing a contact from a meeting deletes the junction entry."""
await db_session.execute(
text("INSERT INTO contact_meetings (contact_id, meeting_id, role) VALUES (:cid, :mid, 'attendee') ON CONFLICT DO NOTHING"),
{"cid": seed_contact["id"], "mid": seed_meeting["id"]},
)
await db_session.commit()
r = await client.post(f"/meetings/{seed_meeting['id']}/contacts/{seed_contact['id']}/remove", follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT count(*) FROM contact_meetings WHERE meeting_id = :mid AND contact_id = :cid"),
{"mid": seed_meeting["id"], "cid": seed_contact["id"]},
)
assert result.scalar() == 0
# Re-insert for other tests
await db_session.execute(
text("INSERT INTO contact_meetings (contact_id, meeting_id, role) VALUES (:cid, :mid, 'attendee') ON CONFLICT DO NOTHING"),
{"cid": seed_contact["id"], "mid": seed_meeting["id"]},
)
await db_session.commit()
# ===========================================================================
# List Contact Management
# ===========================================================================
class TestListContactManagement:
"""Test adding and removing contacts on list detail."""
@pytest.mark.asyncio
async def test_add_contact_to_list(
self, client: AsyncClient, db_session: AsyncSession,
seed_list: dict, seed_contact: dict,
):
"""Adding a contact to a list creates a junction entry."""
await db_session.execute(
text("DELETE FROM contact_lists WHERE list_id = :lid AND contact_id = :cid"),
{"lid": seed_list["id"], "cid": seed_contact["id"]},
)
await db_session.commit()
r = await client.post(f"/lists/{seed_list['id']}/contacts/add", data={
"contact_id": seed_contact["id"],
"role": "owner",
}, follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT role FROM contact_lists WHERE list_id = :lid AND contact_id = :cid"),
{"lid": seed_list["id"], "cid": seed_contact["id"]},
)
row = result.first()
assert row is not None
assert row.role == "owner"
@pytest.mark.asyncio
async def test_remove_contact_from_list(
self, client: AsyncClient, db_session: AsyncSession,
seed_list: dict, seed_contact: dict,
):
"""Removing a contact from a list deletes the junction entry."""
await db_session.execute(
text("INSERT INTO contact_lists (contact_id, list_id, role) VALUES (:cid, :lid, 'test') ON CONFLICT DO NOTHING"),
{"cid": seed_contact["id"], "lid": seed_list["id"]},
)
await db_session.commit()
r = await client.post(f"/lists/{seed_list['id']}/contacts/{seed_contact['id']}/remove", follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT count(*) FROM contact_lists WHERE list_id = :lid AND contact_id = :cid"),
{"lid": seed_list["id"], "cid": seed_contact["id"]},
)
assert result.scalar() == 0
# Re-insert for other tests
await db_session.execute(
text("INSERT INTO contact_lists (contact_id, list_id, role) VALUES (:cid, :lid, 'contributor') ON CONFLICT DO NOTHING"),
{"cid": seed_contact["id"], "lid": seed_list["id"]},
)
await db_session.commit()
# ===========================================================================
# History Page
# ===========================================================================
class TestHistoryPage:
"""Test the change history page."""
@pytest.mark.asyncio
async def test_history_page_loads(self, client: AsyncClient):
"""History page returns 200."""
r = await client.get("/history/")
assert r.status_code == 200
assert "Change History" in r.text
@pytest.mark.asyncio
async def test_history_shows_seed_entities(
self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
):
"""History page shows recently modified seed entities."""
# Ensure seed task is not soft-deleted (earlier tests may delete it)
await db_session.execute(
text("UPDATE tasks SET is_deleted = false, deleted_at = NULL WHERE id = :id"),
{"id": seed_task["id"]},
)
await db_session.commit()
r = await client.get("/history/")
assert r.status_code == 200
assert seed_task["title"] in r.text
@pytest.mark.asyncio
async def test_history_entity_type_filter(self, client: AsyncClient):
"""History page can filter by entity type."""
for entity_type in ("tasks", "notes", "projects", "contacts"):
r = await client.get(f"/history/?entity_type={entity_type}")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_history_shows_modified_vs_created(
self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
):
"""Items with updated_at different from created_at show as 'modified'."""
tag = _uid()
note_id = str(uuid.uuid4())
await db_session.execute(
text("INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, created_at, updated_at) "
"VALUES (:id, :title, :did, 'body', 'markdown', false, now() - interval '1 hour', now())"),
{"id": note_id, "title": f"HistMod-{tag}", "did": seed_domain["id"]},
)
await db_session.commit()
r = await client.get("/history/?entity_type=notes")
assert f"HistMod-{tag}" in r.text
assert "modified" in r.text
await db_session.execute(text("DELETE FROM notes WHERE id = :id"), {"id": note_id})
await db_session.commit()
# ===========================================================================
# Search Prefix Matching
# ===========================================================================
class TestSearchPrefixMatching:
"""Test the improved search with prefix tsquery and ILIKE fallback."""
@pytest.mark.asyncio
async def test_search_api_short_query_rejected(self, client: AsyncClient):
"""Query shorter than 2 chars returns empty results."""
r = await client.get("/search/api?q=a")
assert r.status_code == 200
data = r.json()
assert len(data["results"]) == 0
@pytest.mark.asyncio
async def test_search_api_result_structure(self, client: AsyncClient):
"""Search results have expected fields."""
r = await client.get("/search/api?q=Test")
assert r.status_code == 200
data = r.json()
if data["results"]:
result = data["results"][0]
assert "type" in result
assert "id" in result
assert "name" in result
assert "url" in result
assert "icon" in result
assert "rank" in result
@pytest.mark.asyncio
async def test_search_page_returns_200(self, client: AsyncClient):
"""Full search page works."""
r = await client.get("/search/?q=Test")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_search_ilike_fallback(
self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
):
"""ILIKE fallback finds items that tsvector might miss."""
tag = _uid()
# Create a note with a very specific title
note_id = str(uuid.uuid4())
await db_session.execute(
text("INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, created_at, updated_at) "
"VALUES (:id, :title, :did, 'body', 'markdown', false, now(), now())"),
{"id": note_id, "title": f"XqzNote{tag}", "did": seed_domain["id"]},
)
await db_session.commit()
r = await client.get(f"/search/api?q=XqzNote{tag}&entity_type=notes")
data = r.json()
ids = [res["id"] for res in data["results"]]
assert note_id in ids, "ILIKE fallback should find exact substring match"
await db_session.execute(text("DELETE FROM notes WHERE id = :id"), {"id": note_id})
await db_session.commit()
# ===========================================================================
# Projects API (Dynamic Filtering)
# ===========================================================================
class TestProjectsAPI:
"""Test the /projects/api/by-domain JSON endpoint."""
@pytest.mark.asyncio
async def test_projects_by_domain_returns_json(
self, client: AsyncClient, db_session: AsyncSession, seed_domain: dict,
):
"""API returns JSON list of projects for a domain."""
# Ensure seed project is not soft-deleted (earlier tests may delete it)
await db_session.execute(
text("UPDATE projects SET is_deleted = false, deleted_at = NULL WHERE id = :id AND is_deleted = true"),
{"id": "a0000000-0000-0000-0000-000000000003"},
)
await db_session.commit()
r = await client.get(f"/projects/api/by-domain?domain_id={seed_domain['id']}")
assert r.status_code == 200
data = r.json()
assert isinstance(data, list)
assert any(p["name"] == "Test Project" for p in data)
@pytest.mark.asyncio
async def test_projects_by_domain_empty(self, client: AsyncClient):
"""API returns empty list for nonexistent domain."""
fake_id = str(uuid.uuid4())
r = await client.get(f"/projects/api/by-domain?domain_id={fake_id}")
assert r.status_code == 200
data = r.json()
assert data == []
@pytest.mark.asyncio
async def test_projects_by_domain_no_param(self, client: AsyncClient):
"""API without domain_id returns all active projects."""
r = await client.get("/projects/api/by-domain")
assert r.status_code == 200
data = r.json()
assert isinstance(data, list)
# ===========================================================================
# Eisenhower Matrix Filters
# ===========================================================================
class TestEisenhowerFilters:
"""Test Eisenhower matrix page with filter parameters."""
@pytest.mark.asyncio
async def test_eisenhower_page_loads(self, client: AsyncClient):
"""Eisenhower page returns 200."""
r = await client.get("/eisenhower/")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_eisenhower_filter_by_domain(
self, client: AsyncClient, seed_domain: dict,
):
"""Eisenhower page accepts domain_id filter."""
r = await client.get(f"/eisenhower/?domain_id={seed_domain['id']}")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_eisenhower_filter_by_status(self, client: AsyncClient):
"""Eisenhower page accepts status filter."""
r = await client.get("/eisenhower/?status=open")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_eisenhower_combined_filters(
self, client: AsyncClient, seed_domain: dict, seed_project: dict,
):
"""Eisenhower page accepts multiple filters."""
r = await client.get(
f"/eisenhower/?domain_id={seed_domain['id']}&project_id={seed_project['id']}&status=open"
)
assert r.status_code == 200
# ===========================================================================
# Focus Filters
# ===========================================================================
class TestFocusFilters:
"""Test focus page filter parameters."""
@pytest.mark.asyncio
async def test_focus_filter_by_domain(
self, client: AsyncClient, seed_domain: dict,
):
"""Focus page accepts domain_id filter for available tasks."""
r = await client.get(f"/focus/?domain_id={seed_domain['id']}")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_focus_filter_by_project(
self, client: AsyncClient, seed_project: dict,
):
"""Focus page accepts project_id filter."""
r = await client.get(f"/focus/?project_id={seed_project['id']}")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_focus_combined_filters(
self, client: AsyncClient, seed_domain: dict, seed_project: dict,
):
"""Focus page accepts multiple filters."""
r = await client.get(
f"/focus/?domain_id={seed_domain['id']}&project_id={seed_project['id']}"
)
assert r.status_code == 200
# ===========================================================================
# Entity Create with task_id/meeting_id
# ===========================================================================
class TestEntityCreateWithParentContext:
"""Test creating entities with task_id or meeting_id context."""
@pytest.mark.asyncio
async def test_create_note_with_task_id(
self, client: AsyncClient, db_session: AsyncSession,
seed_task: dict, seed_domain: dict,
):
"""Creating a note with task_id sets the FK and redirects to task tab."""
tag = _uid()
r = await client.post("/notes/create", data={
"title": f"TaskNote-{tag}",
"domain_id": seed_domain["id"],
"body": "Test body",
"content_format": "markdown",
"task_id": seed_task["id"],
}, follow_redirects=False)
assert r.status_code == 303
# Should redirect back to task's notes tab
location = r.headers.get("location", "")
assert "tab=notes" in location
result = await db_session.execute(
text("SELECT task_id FROM notes WHERE title = :t AND is_deleted = false"),
{"t": f"TaskNote-{tag}"},
)
row = result.first()
assert row is not None
assert str(row.task_id) == seed_task["id"]
# Cleanup
await db_session.execute(
text("DELETE FROM notes WHERE title = :t"), {"t": f"TaskNote-{tag}"}
)
await db_session.commit()
@pytest.mark.asyncio
async def test_create_note_form_with_task_prefill(
self, client: AsyncClient, seed_task: dict,
):
"""Note create form with task_id query param loads correctly."""
r = await client.get(f"/notes/create?task_id={seed_task['id']}")
assert r.status_code == 200
assert seed_task["id"] in r.text
@pytest.mark.asyncio
async def test_create_weblink_with_task_id(
self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
):
"""Creating a weblink with task_id sets the FK."""
tag = _uid()
r = await client.post("/weblinks/create", data={
"label": f"TaskWeblink-{tag}",
"url": "https://example.com/test",
"task_id": seed_task["id"],
}, follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT task_id FROM weblinks WHERE label = :l AND is_deleted = false"),
{"l": f"TaskWeblink-{tag}"},
)
row = result.first()
assert row is not None
assert str(row.task_id) == seed_task["id"]
await db_session.execute(
text("DELETE FROM weblinks WHERE label = :l"), {"l": f"TaskWeblink-{tag}"}
)
await db_session.commit()
@pytest.mark.asyncio
async def test_create_list_with_task_id(
self, client: AsyncClient, db_session: AsyncSession,
seed_task: dict, seed_domain: dict,
):
"""Creating a list with task_id sets the FK."""
tag = _uid()
r = await client.post("/lists/create", data={
"name": f"TaskList-{tag}",
"domain_id": seed_domain["id"],
"list_type": "checklist",
"task_id": seed_task["id"],
}, follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT task_id FROM lists WHERE name = :n AND is_deleted = false"),
{"n": f"TaskList-{tag}"},
)
row = result.first()
assert row is not None
assert str(row.task_id) == seed_task["id"]
await db_session.execute(
text("DELETE FROM lists WHERE name = :n"), {"n": f"TaskList-{tag}"}
)
await db_session.commit()
@pytest.mark.asyncio
async def test_create_decision_with_task_id(
self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
):
"""Creating a decision with task_id sets the FK."""
tag = _uid()
r = await client.post("/decisions/create", data={
"title": f"TaskDecision-{tag}",
"status": "proposed",
"impact": "medium",
"task_id": seed_task["id"],
}, follow_redirects=False)
assert r.status_code == 303
result = await db_session.execute(
text("SELECT task_id FROM decisions WHERE title = :t AND is_deleted = false"),
{"t": f"TaskDecision-{tag}"},
)
row = result.first()
assert row is not None
assert str(row.task_id) == seed_task["id"]
await db_session.execute(
text("DELETE FROM decisions WHERE title = :t"), {"t": f"TaskDecision-{tag}"}
)
await db_session.commit()