""" Test fixtures - uses psycopg2 (sync) for seed data to avoid event loop conflicts. Seeds are session-scoped and committed so the app can see them via its own engine. """ import asyncio import uuid import pytest import psycopg2 from httpx import AsyncClient, ASGITransport from tests.registry import app TEST_DB_DSN = "host=lifeos-db port=5432 dbname=lifeos_test user=postgres password=UCTOQDZiUhN8U" # ── Fixed seed UUIDs (stable across test runs) ────────────── SEED_IDS = { "domain": "a0000000-0000-0000-0000-000000000001", "area": "a0000000-0000-0000-0000-000000000002", "project": "a0000000-0000-0000-0000-000000000003", "task": "a0000000-0000-0000-0000-000000000004", "contact": "a0000000-0000-0000-0000-000000000005", "note": "a0000000-0000-0000-0000-000000000006", "meeting": "a0000000-0000-0000-0000-000000000007", "decision": "a0000000-0000-0000-0000-000000000008", "appointment": "a0000000-0000-0000-0000-000000000009", "weblink_folder": "a0000000-0000-0000-0000-00000000000a", "list": "a0000000-0000-0000-0000-00000000000b", "link": "a0000000-0000-0000-0000-00000000000c", "weblink": "a0000000-0000-0000-0000-00000000000d", "capture": "a0000000-0000-0000-0000-00000000000e", "focus": "a0000000-0000-0000-0000-00000000000f", } # ── Reinitialize the async engine within the test event loop ── @pytest.fixture(scope="session", autouse=True) async def _reinit_engine(): """ Replace the engine created at import time with a fresh one created within the test event loop. This ensures all connections use the right loop. """ from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from core import database # Dispose the import-time engine (might have stale loop references) await database.engine.dispose() # Create a brand new engine on the current (test) event loop new_engine = create_async_engine( database.DATABASE_URL, echo=False, pool_size=5, max_overflow=10, pool_pre_ping=True, ) new_session_factory = async_sessionmaker( new_engine, class_=AsyncSession, expire_on_commit=False, ) # Patch the module so all app code uses the new engine database.engine = new_engine database.async_session_factory = new_session_factory yield await new_engine.dispose() # ── Sync DB connection for seed management ────────────────── @pytest.fixture(scope="session") def sync_conn(): conn = psycopg2.connect(TEST_DB_DSN) conn.autocommit = False yield conn conn.close() # ── Seed data (session-scoped, committed) ─────────────────── @pytest.fixture(scope="session") def all_seeds(sync_conn): """Insert all seed data once. Committed so the app's engine can see it.""" cur = sync_conn.cursor() d = SEED_IDS try: # Domain cur.execute(""" INSERT INTO domains (id, name, color, description, sort_order, is_deleted, created_at, updated_at) VALUES (%s, 'Test Domain', '#FF5733', 'Auto test domain', 0, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["domain"],)) # Area cur.execute(""" INSERT INTO areas (id, name, domain_id, description, status, sort_order, is_deleted, created_at, updated_at) VALUES (%s, 'Test Area', %s, 'Auto test area', 'active', 0, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["area"], d["domain"])) # Project cur.execute(""" INSERT INTO projects (id, name, domain_id, area_id, description, status, priority, sort_order, is_deleted, created_at, updated_at) VALUES (%s, 'Test Project', %s, %s, 'Auto test project', 'active', 2, 0, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["project"], d["domain"], d["area"])) # Task (status='open' matches DB default, not 'todo') cur.execute(""" INSERT INTO tasks (id, title, domain_id, project_id, description, priority, status, sort_order, is_deleted, created_at, updated_at) VALUES (%s, 'Test Task', %s, %s, 'Auto test task', 2, 'open', 0, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["task"], d["domain"], d["project"])) # Contact cur.execute(""" INSERT INTO contacts (id, first_name, last_name, company, email, is_deleted, created_at, updated_at) VALUES (%s, 'Test', 'Contact', 'TestCorp', 'test@example.com', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["contact"],)) # Note cur.execute(""" INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, created_at, updated_at) VALUES (%s, 'Test Note', %s, 'Test body content', 'markdown', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["note"], d["domain"])) # Meeting cur.execute(""" INSERT INTO meetings (id, title, meeting_date, status, is_deleted, created_at, updated_at) VALUES (%s, 'Test Meeting', '2025-06-15', 'scheduled', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["meeting"],)) # Decision cur.execute(""" INSERT INTO decisions (id, title, status, impact, is_deleted, created_at, updated_at) VALUES (%s, 'Test Decision', 'decided', 'high', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["decision"],)) # Appointment cur.execute(""" INSERT INTO appointments (id, title, start_at, end_at, all_day, is_deleted, created_at, updated_at) VALUES (%s, 'Test Appointment', '2025-06-15 10:00:00', '2025-06-15 11:00:00', false, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["appointment"],)) # Weblink folder cur.execute(""" INSERT INTO weblink_folders (id, name, is_deleted, created_at, updated_at) VALUES (%s, 'Test Folder', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["weblink_folder"],)) # List cur.execute(""" INSERT INTO lists (id, name, domain_id, project_id, list_type, is_deleted, created_at, updated_at) VALUES (%s, 'Test List', %s, %s, 'checklist', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["list"], d["domain"], d["project"])) # Link cur.execute(""" INSERT INTO links (id, label, url, domain_id, is_deleted, created_at, updated_at) VALUES (%s, 'Test Link', 'https://example.com', %s, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["link"], d["domain"])) # Weblink cur.execute(""" INSERT INTO weblinks (id, label, url, is_deleted, created_at, updated_at) VALUES (%s, 'Test Weblink', 'https://example.com/wl', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["weblink"],)) # Link weblink to folder via junction table cur.execute(""" INSERT INTO folder_weblinks (folder_id, weblink_id) VALUES (%s, %s) ON CONFLICT DO NOTHING """, (d["weblink_folder"], d["weblink"])) # Capture cur.execute(""" INSERT INTO capture (id, raw_text, processed, is_deleted, created_at, updated_at) VALUES (%s, 'Test capture item', false, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["capture"],)) # Daily focus cur.execute(""" INSERT INTO daily_focus (id, task_id, focus_date, completed, created_at, updated_at) VALUES (%s, %s, CURRENT_DATE, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["focus"], d["task"])) sync_conn.commit() except Exception as e: sync_conn.rollback() raise RuntimeError(f"Seed data insertion failed: {e}") from e yield d # Cleanup: delete all seed data (reverse dependency order) try: cur.execute("DELETE FROM daily_focus WHERE id = %s", (d["focus"],)) cur.execute("DELETE FROM capture WHERE id = %s", (d["capture"],)) cur.execute("DELETE FROM folder_weblinks WHERE weblink_id = %s", (d["weblink"],)) cur.execute("DELETE FROM weblinks WHERE id = %s", (d["weblink"],)) cur.execute("DELETE FROM links WHERE id = %s", (d["link"],)) cur.execute("DELETE FROM lists WHERE id = %s", (d["list"],)) cur.execute("DELETE FROM weblink_folders WHERE id = %s", (d["weblink_folder"],)) cur.execute("DELETE FROM appointments WHERE id = %s", (d["appointment"],)) cur.execute("DELETE FROM decisions WHERE id = %s", (d["decision"],)) cur.execute("DELETE FROM meetings WHERE id = %s", (d["meeting"],)) cur.execute("DELETE FROM notes WHERE id = %s", (d["note"],)) cur.execute("DELETE FROM contacts WHERE id = %s", (d["contact"],)) cur.execute("DELETE FROM tasks WHERE id = %s", (d["task"],)) cur.execute("DELETE FROM projects WHERE id = %s", (d["project"],)) cur.execute("DELETE FROM areas WHERE id = %s", (d["area"],)) cur.execute("DELETE FROM domains WHERE id = %s", (d["domain"],)) sync_conn.commit() except Exception: sync_conn.rollback() finally: cur.close() # ── HTTP client (function-scoped) ─────────────────────────── @pytest.fixture async def client(): transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as c: yield c # ── Async DB session for business logic tests ─────────────── @pytest.fixture async def db_session(): """Yields an async DB session for direct SQL in tests.""" from core.database import async_session_factory async with async_session_factory() as session: yield session # ── Individual seed entity fixtures (for test_business_logic.py) ── @pytest.fixture(scope="session") def seed_domain(all_seeds): return {"id": all_seeds["domain"], "name": "Test Domain", "color": "#FF5733"} @pytest.fixture(scope="session") def seed_area(all_seeds): return {"id": all_seeds["area"], "name": "Test Area"} @pytest.fixture(scope="session") def seed_project(all_seeds): return {"id": all_seeds["project"], "name": "Test Project"} @pytest.fixture(scope="session") def seed_task(all_seeds): return {"id": all_seeds["task"], "title": "Test Task"} @pytest.fixture(scope="session") def seed_contact(all_seeds): return {"id": all_seeds["contact"], "first_name": "Test", "last_name": "Contact"} @pytest.fixture(scope="session") def seed_note(all_seeds): return {"id": all_seeds["note"], "title": "Test Note"} @pytest.fixture(scope="session") def seed_meeting(all_seeds): return {"id": all_seeds["meeting"], "title": "Test Meeting"}