#!/bin/bash set -e echo "=== Life OS Test Suite Fix ===" echo "Fixes: event loop conflicts, seed data approach, search/api 422" echo "" # ────────────────────────────────────────────────────────────── # Step 1: Install psycopg2-binary for sync seed data # ────────────────────────────────────────────────────────────── echo "[1/6] Installing psycopg2-binary..." docker exec lifeos-dev pip install psycopg2-binary --break-system-packages -q 2>/dev/null echo " Done" # ────────────────────────────────────────────────────────────── # Step 1b: Update pytest.ini for session-scoped loop # ────────────────────────────────────────────────────────────── echo " Updating pytest.ini..." cat > /opt/lifeos/dev/pytest.ini << 'PYTESTINI_EOF' [pytest] asyncio_mode = auto asyncio_default_fixture_loop_scope = session testpaths = tests addopts = -v --tb=short PYTESTINI_EOF # ────────────────────────────────────────────────────────────── # Step 2: Probe the app to verify engine location # ────────────────────────────────────────────────────────────── echo "[2/6] Probing app structure..." docker exec lifeos-dev python3 -c " import os os.environ['DATABASE_URL'] = 'postgresql+asyncpg://postgres:UCTOQDZiUhN8U@lifeos-db:5432/lifeos_test' from core.database import engine print(f' Engine found: {engine.url}') print(f' Engine class: {type(engine).__name__}') " 2>&1 || { echo "ERROR: Could not find engine in core.database"; exit 1; } # ────────────────────────────────────────────────────────────── # Step 3: Probe actual table columns for seed data accuracy # ────────────────────────────────────────────────────────────── echo "[3/6] Probing table columns for seed data..." SEED_COLUMNS=$(docker exec lifeos-db psql -U postgres -d lifeos_dev -t -A -c " SELECT table_name, string_agg(column_name || ':' || is_nullable || ':' || data_type, ',') FROM information_schema.columns WHERE table_schema='public' AND table_name IN ('domains','areas','projects','tasks','contacts','notes','meetings','decisions','appointments','weblink_folders','lists','links','weblinks','capture_inbox','daily_focus','time_entries','files') GROUP BY table_name ORDER BY table_name; ") echo " Columns retrieved for seed tables" # Verify critical table names exist echo " Verifying table names..." docker exec lifeos-db psql -U postgres -d lifeos_test -t -A -c " SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename IN ('domains','areas','projects','tasks','contacts','notes','meetings','decisions','appointments','weblink_folders','lists','links','weblinks','capture_inbox','daily_focus','time_entries','files') ORDER BY tablename; " # Check if capture table is 'capture' or 'capture_inbox' CAPTURE_TABLE=$(docker exec lifeos-db psql -U postgres -d lifeos_test -t -A -c " SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename LIKE 'capture%' LIMIT 1; ") echo " Capture table name: $CAPTURE_TABLE" # Check daily_focus vs daily_focus FOCUS_TABLE=$(docker exec lifeos-db psql -U postgres -d lifeos_test -t -A -c " SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename LIKE '%focus%' LIMIT 1; ") echo " Focus table name: $FOCUS_TABLE" # ────────────────────────────────────────────────────────────── # Step 4: Write fixed registry.py # ────────────────────────────────────────────────────────────── echo "[4/6] Writing fixed registry.py..." cat > /opt/lifeos/dev/tests/registry.py << 'REGISTRY_EOF' """ Route registry - imports app, runs introspection once, exposes route data. Disposes the async engine after introspection to avoid event loop conflicts. """ import os import asyncio # Point the app at the test database BEFORE importing os.environ["DATABASE_URL"] = "postgresql+asyncpg://postgres:UCTOQDZiUhN8U@lifeos-db:5432/lifeos_test" from main import app from tests.introspect import introspect_routes, classify_route # Build route registry from live app ROUTE_REGISTRY = introspect_routes(app) # Classify routes into buckets for parametrized tests GET_NO_PARAMS = [r for r in ROUTE_REGISTRY if r.method == "GET" and not r.path_params] GET_WITH_PARAMS = [r for r in ROUTE_REGISTRY if r.method == "GET" and r.path_params] POST_CREATE = [r for r in ROUTE_REGISTRY if r.method == "POST" and r.kind == "create"] POST_EDIT = [r for r in ROUTE_REGISTRY if r.method == "POST" and r.kind == "edit"] POST_DELETE = [r for r in ROUTE_REGISTRY if r.method == "POST" and r.kind == "delete"] POST_ACTION = [r for r in ROUTE_REGISTRY if r.method == "POST" and r.kind in ("action", "toggle")] # Map route prefixes to seed fixture keys PREFIX_TO_SEED = { "/domains": "domain", "/areas": "area", "/projects": "project", "/tasks": "task", "/notes": "note", "/links": "link", "/contacts": "contact", "/lists": "list", "/meetings": "meeting", "/decisions": "decision", "/weblinks": "weblink", "/weblinks/folders": "weblink_folder", "/appointments": "appointment", "/focus": "focus", "/capture": "capture", "/time": "task", "/files": None, "/admin/trash": None, } def resolve_path(path_template, seeds): """Replace {id} placeholders with real seed UUIDs.""" import re result = path_template for param in re.findall(r"\{(\w+)\}", path_template): # Find prefix for this route for prefix, seed_key in sorted(PREFIX_TO_SEED.items(), key=lambda x: -len(x[0])): if path_template.startswith(prefix) and seed_key and seed_key in seeds: result = result.replace(f"{{{param}}}", str(seeds[seed_key])) break return result # CRITICAL: Dispose the async engine created at import time. # It was bound to whatever event loop existed during collection. # When tests run, pytest-asyncio creates a NEW event loop. # The engine will lazily recreate its connection pool on that new loop. try: from core.database import engine loop = asyncio.new_event_loop() loop.run_until_complete(engine.dispose()) loop.close() except Exception: pass # If disposal fails, tests will still try to proceed REGISTRY_EOF echo " registry.py written" # ────────────────────────────────────────────────────────────── # Step 5: Write fixed conftest.py # ────────────────────────────────────────────────────────────── echo "[5/6] Writing fixed conftest.py..." cat > /opt/lifeos/dev/tests/conftest.py << 'CONFTEST_EOF' """ Test fixtures - uses psycopg2 (sync) for seed data to avoid event loop conflicts. Seeds are session-scoped and committed so the app can see them via its own engine. """ import asyncio import uuid import pytest import psycopg2 from httpx import AsyncClient, ASGITransport from tests.registry import app TEST_DB_DSN = "host=lifeos-db port=5432 dbname=lifeos_test user=postgres password=UCTOQDZiUhN8U" # ── Fixed seed UUIDs (stable across test runs) ────────────── SEED_IDS = { "domain": "a0000000-0000-0000-0000-000000000001", "area": "a0000000-0000-0000-0000-000000000002", "project": "a0000000-0000-0000-0000-000000000003", "task": "a0000000-0000-0000-0000-000000000004", "contact": "a0000000-0000-0000-0000-000000000005", "note": "a0000000-0000-0000-0000-000000000006", "meeting": "a0000000-0000-0000-0000-000000000007", "decision": "a0000000-0000-0000-0000-000000000008", "appointment": "a0000000-0000-0000-0000-000000000009", "weblink_folder": "a0000000-0000-0000-0000-00000000000a", "list": "a0000000-0000-0000-0000-00000000000b", "link": "a0000000-0000-0000-0000-00000000000c", "weblink": "a0000000-0000-0000-0000-00000000000d", "capture": "a0000000-0000-0000-0000-00000000000e", "focus": "a0000000-0000-0000-0000-00000000000f", } # ── Session-scoped event loop ─────────────────────────────── # All async tests share one loop so the app's engine pool stays valid. @pytest.fixture(scope="session") def event_loop(): loop = asyncio.new_event_loop() yield loop loop.close() # ── Sync DB connection for seed management ────────────────── @pytest.fixture(scope="session") def sync_conn(): conn = psycopg2.connect(TEST_DB_DSN) conn.autocommit = False yield conn conn.close() # ── Seed data (session-scoped, committed) ─────────────────── @pytest.fixture(scope="session") def all_seeds(sync_conn): """Insert all seed data once. Committed so the app's engine can see it.""" cur = sync_conn.cursor() d = SEED_IDS try: # Domain cur.execute(""" INSERT INTO domains (id, name, color, description, sort_order, is_deleted, created_at, updated_at) VALUES (%s, 'Test Domain', '#FF5733', 'Auto test domain', 0, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["domain"],)) # Area cur.execute(""" INSERT INTO areas (id, name, domain_id, description, status, sort_order, is_deleted, created_at, updated_at) VALUES (%s, 'Test Area', %s, 'Auto test area', 'active', 0, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["area"], d["domain"])) # Project cur.execute(""" INSERT INTO projects (id, name, domain_id, area_id, description, status, priority, sort_order, is_deleted, created_at, updated_at) VALUES (%s, 'Test Project', %s, %s, 'Auto test project', 'active', 2, 0, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["project"], d["domain"], d["area"])) # Task cur.execute(""" INSERT INTO tasks (id, title, domain_id, project_id, description, priority, status, sort_order, is_deleted, created_at, updated_at) VALUES (%s, 'Test Task', %s, %s, 'Auto test task', 2, 'todo', 0, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["task"], d["domain"], d["project"])) # Contact cur.execute(""" INSERT INTO contacts (id, first_name, last_name, company, email, is_deleted, created_at, updated_at) VALUES (%s, 'Test', 'Contact', 'TestCorp', 'test@example.com', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["contact"],)) # Note cur.execute(""" INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, created_at, updated_at) VALUES (%s, 'Test Note', %s, 'Test body content', 'markdown', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["note"], d["domain"])) # Meeting cur.execute(""" INSERT INTO meetings (id, title, meeting_date, status, is_deleted, created_at, updated_at) VALUES (%s, 'Test Meeting', '2025-06-15', 'scheduled', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["meeting"],)) # Decision cur.execute(""" INSERT INTO decisions (id, title, status, impact, is_deleted, created_at, updated_at) VALUES (%s, 'Test Decision', 'decided', 'high', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["decision"],)) # Appointment cur.execute(""" INSERT INTO appointments (id, title, start_at, end_at, all_day, is_deleted, created_at, updated_at) VALUES (%s, 'Test Appointment', '2025-06-15 10:00:00', '2025-06-15 11:00:00', false, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["appointment"],)) # Weblink folder cur.execute(""" INSERT INTO weblink_folders (id, name, is_deleted, created_at, updated_at) VALUES (%s, 'Test Folder', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["weblink_folder"],)) # List cur.execute(""" INSERT INTO lists (id, name, domain_id, project_id, list_type, is_deleted, created_at, updated_at) VALUES (%s, 'Test List', %s, %s, 'checklist', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["list"], d["domain"], d["project"])) # Link cur.execute(""" INSERT INTO links (id, label, url, domain_id, is_deleted, created_at, updated_at) VALUES (%s, 'Test Link', 'https://example.com', %s, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["link"], d["domain"])) # Weblink cur.execute(""" INSERT INTO weblinks (id, label, url, folder_id, is_deleted, created_at, updated_at) VALUES (%s, 'Test Weblink', 'https://example.com/wl', %s, false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["weblink"], d["weblink_folder"])) # Capture cur.execute(""" INSERT INTO capture_inbox (id, raw_text, status, is_deleted, created_at, updated_at) VALUES (%s, 'Test capture item', 'pending', false, now(), now()) ON CONFLICT (id) DO NOTHING """, (d["capture"],)) # Daily focus cur.execute(""" INSERT INTO daily_focus (id, task_id, focus_date, is_completed, created_at) VALUES (%s, %s, CURRENT_DATE, false, now()) ON CONFLICT (id) DO NOTHING """, (d["focus"], d["task"])) sync_conn.commit() except Exception as e: sync_conn.rollback() raise RuntimeError(f"Seed data insertion failed: {e}") from e yield d # Cleanup: delete all seed data (reverse dependency order) try: cur.execute("DELETE FROM daily_focus WHERE id = %s", (d["focus"],)) cur.execute("DELETE FROM capture_inbox WHERE id = %s", (d["capture"],)) cur.execute("DELETE FROM weblinks WHERE id = %s", (d["weblink"],)) cur.execute("DELETE FROM links WHERE id = %s", (d["link"],)) cur.execute("DELETE FROM lists WHERE id = %s", (d["list"],)) cur.execute("DELETE FROM weblink_folders WHERE id = %s", (d["weblink_folder"],)) cur.execute("DELETE FROM appointments WHERE id = %s", (d["appointment"],)) cur.execute("DELETE FROM decisions WHERE id = %s", (d["decision"],)) cur.execute("DELETE FROM meetings WHERE id = %s", (d["meeting"],)) cur.execute("DELETE FROM notes WHERE id = %s", (d["note"],)) cur.execute("DELETE FROM contacts WHERE id = %s", (d["contact"],)) cur.execute("DELETE FROM tasks WHERE id = %s", (d["task"],)) cur.execute("DELETE FROM projects WHERE id = %s", (d["project"],)) cur.execute("DELETE FROM areas WHERE id = %s", (d["area"],)) cur.execute("DELETE FROM domains WHERE id = %s", (d["domain"],)) sync_conn.commit() except Exception: sync_conn.rollback() finally: cur.close() # ── HTTP client (function-scoped) ─────────────────────────── @pytest.fixture async def client(): transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as c: yield c CONFTEST_EOF echo " conftest.py written" # ────────────────────────────────────────────────────────────── # Step 6: Write fixed test_smoke_dynamic.py # ────────────────────────────────────────────────────────────── echo "[6/6] Writing fixed test_smoke_dynamic.py..." cat > /opt/lifeos/dev/tests/test_smoke_dynamic.py << 'SMOKE_EOF' """ Dynamic smoke tests - auto-parametrized from introspected routes. Tests that all GET endpoints return 200 (or acceptable alternatives). """ import pytest from tests.registry import ( GET_NO_PARAMS, GET_WITH_PARAMS, resolve_path, PREFIX_TO_SEED, ROUTE_REGISTRY, ) # Routes that require query params to avoid 422 ROUTES_NEEDING_QUERY = { "/search/api": "?q=test", } # Routes that may legitimately return non-200 without full context ACCEPTABLE_CODES = {200, 307, 308} # ── Test 1: All GET endpoints without path params ─────────── @pytest.mark.parametrize( "path", [r.path for r in GET_NO_PARAMS], ids=[f"GET {r.path}" for r in GET_NO_PARAMS], ) async def test_get_no_params_returns_200(client, path): """Every GET endpoint without path params should return 200.""" url = path # Append required query params if needed for route_prefix, query_string in ROUTES_NEEDING_QUERY.items(): if path == route_prefix or path.rstrip("/") == route_prefix.rstrip("/"): url = path + query_string break r = await client.get(url, follow_redirects=True) assert r.status_code in ACCEPTABLE_CODES or r.status_code == 200, \ f"GET {url} returned {r.status_code}" # ── Test 2: All GET endpoints with valid seed IDs ─────────── @pytest.mark.parametrize( "path_template", [r.path for r in GET_WITH_PARAMS], ids=[f"GET {r.path}" for r in GET_WITH_PARAMS], ) async def test_get_with_valid_id_returns_200(client, all_seeds, path_template): """GET endpoints with a valid seed UUID should return 200.""" path = resolve_path(path_template, all_seeds) # Skip if we couldn't resolve (no seed for this prefix) if "{" in path: pytest.skip(f"No seed mapping for {path_template}") r = await client.get(path, follow_redirects=True) assert r.status_code == 200, f"GET {path} returned {r.status_code}" # ── Test 3: GET with fake UUID returns 404 ────────────────── FAKE_UUID = "00000000-0000-0000-0000-000000000000" # Build fake-ID test cases from routes that have path params _fake_id_cases = [] for r in GET_WITH_PARAMS: import re fake_path = re.sub(r"\{[^}]+\}", FAKE_UUID, r.path) _fake_id_cases.append((fake_path, r.path)) @pytest.mark.parametrize( "path,template", _fake_id_cases if _fake_id_cases else [pytest.param("", "", marks=pytest.mark.skip)], ids=[f"404 {c[1]}" for c in _fake_id_cases] if _fake_id_cases else ["NOTSET"], ) async def test_get_with_fake_id_returns_404(client, path, template): """GET endpoints with a nonexistent UUID should return 404.""" r = await client.get(path, follow_redirects=True) assert r.status_code in (404, 302, 303), \ f"GET {path} returned {r.status_code}, expected 404 or redirect" SMOKE_EOF echo " test_smoke_dynamic.py written" # ────────────────────────────────────────────────────────────── # Verify deployment # ────────────────────────────────────────────────────────────── echo "" echo "=== Fix deployed ===" echo "" echo "Changes made:" echo " 1. Installed psycopg2-binary (sync DB driver for seed data)" echo " 2. registry.py: disposes async engine after introspection" echo " 3. conftest.py: session-scoped event loop, psycopg2 seeds, fixed UUIDs" echo " 4. test_smoke_dynamic.py: handles /search/api query param, better assertions" echo "" echo "Now run:" echo " docker exec lifeos-dev bash /app/tests/run_tests.sh smoke" echo "" echo "If smoke tests pass, run:" echo " docker exec lifeos-dev bash /app/tests/run_tests.sh crud" echo " docker exec lifeos-dev bash /app/tests/run_tests.sh logic" echo " docker exec lifeos-dev bash /app/tests/run_tests.sh # full suite"