Commit prior to Claude Code implementation on VM

This commit is contained in:
2026-03-01 14:45:15 +00:00
parent a1d24354a0
commit f7c5ac2d89
14 changed files with 21711 additions and 0 deletions

0
tests/__init__.py Normal file
View File

202
tests/conftest.py Normal file
View File

@@ -0,0 +1,202 @@
"""
Test fixtures - uses psycopg2 (sync) for seed data to avoid event loop conflicts.
Seeds are session-scoped and committed so the app can see them via its own engine.
"""
import asyncio
import uuid
import pytest
import psycopg2
from httpx import AsyncClient, ASGITransport
from tests.registry import app
TEST_DB_DSN = "host=lifeos-db port=5432 dbname=lifeos_test user=postgres password=UCTOQDZiUhN8U"
# ── Fixed seed UUIDs (stable across test runs) ──────────────
SEED_IDS = {
"domain": "a0000000-0000-0000-0000-000000000001",
"area": "a0000000-0000-0000-0000-000000000002",
"project": "a0000000-0000-0000-0000-000000000003",
"task": "a0000000-0000-0000-0000-000000000004",
"contact": "a0000000-0000-0000-0000-000000000005",
"note": "a0000000-0000-0000-0000-000000000006",
"meeting": "a0000000-0000-0000-0000-000000000007",
"decision": "a0000000-0000-0000-0000-000000000008",
"appointment": "a0000000-0000-0000-0000-000000000009",
"weblink_folder": "a0000000-0000-0000-0000-00000000000a",
"list": "a0000000-0000-0000-0000-00000000000b",
"link": "a0000000-0000-0000-0000-00000000000c",
"weblink": "a0000000-0000-0000-0000-00000000000d",
"capture": "a0000000-0000-0000-0000-00000000000e",
"focus": "a0000000-0000-0000-0000-00000000000f",
}
# ── Session-scoped event loop ───────────────────────────────
# All async tests share one loop so the app's engine pool stays valid.
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.new_event_loop()
yield loop
loop.close()
# ── Sync DB connection for seed management ──────────────────
@pytest.fixture(scope="session")
def sync_conn():
conn = psycopg2.connect(TEST_DB_DSN)
conn.autocommit = False
yield conn
conn.close()
# ── Seed data (session-scoped, committed) ───────────────────
@pytest.fixture(scope="session")
def all_seeds(sync_conn):
"""Insert all seed data once. Committed so the app's engine can see it."""
cur = sync_conn.cursor()
d = SEED_IDS
try:
# Domain
cur.execute("""
INSERT INTO domains (id, name, color, description, sort_order, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Domain', '#FF5733', 'Auto test domain', 0, false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["domain"],))
# Area
cur.execute("""
INSERT INTO areas (id, name, domain_id, description, status, sort_order, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Area', %s, 'Auto test area', 'active', 0, false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["area"], d["domain"]))
# Project
cur.execute("""
INSERT INTO projects (id, name, domain_id, area_id, description, status, priority, sort_order, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Project', %s, %s, 'Auto test project', 'active', 2, 0, false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["project"], d["domain"], d["area"]))
# Task
cur.execute("""
INSERT INTO tasks (id, title, domain_id, project_id, description, priority, status, sort_order, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Task', %s, %s, 'Auto test task', 2, 'todo', 0, false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["task"], d["domain"], d["project"]))
# Contact
cur.execute("""
INSERT INTO contacts (id, first_name, last_name, company, email, is_deleted, created_at, updated_at)
VALUES (%s, 'Test', 'Contact', 'TestCorp', 'test@example.com', false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["contact"],))
# Note
cur.execute("""
INSERT INTO notes (id, title, domain_id, body, content_format, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Note', %s, 'Test body content', 'markdown', false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["note"], d["domain"]))
# Meeting
cur.execute("""
INSERT INTO meetings (id, title, meeting_date, status, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Meeting', '2025-06-15', 'scheduled', false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["meeting"],))
# Decision
cur.execute("""
INSERT INTO decisions (id, title, status, impact, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Decision', 'decided', 'high', false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["decision"],))
# Appointment
cur.execute("""
INSERT INTO appointments (id, title, start_at, end_at, all_day, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Appointment', '2025-06-15 10:00:00', '2025-06-15 11:00:00', false, false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["appointment"],))
# Weblink folder
cur.execute("""
INSERT INTO weblink_folders (id, name, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Folder', false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["weblink_folder"],))
# List
cur.execute("""
INSERT INTO lists (id, name, domain_id, project_id, list_type, is_deleted, created_at, updated_at)
VALUES (%s, 'Test List', %s, %s, 'checklist', false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["list"], d["domain"], d["project"]))
# Link
cur.execute("""
INSERT INTO links (id, label, url, domain_id, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Link', 'https://example.com', %s, false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["link"], d["domain"]))
# Weblink
cur.execute("""
INSERT INTO weblinks (id, label, url, folder_id, is_deleted, created_at, updated_at)
VALUES (%s, 'Test Weblink', 'https://example.com/wl', %s, false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["weblink"], d["weblink_folder"]))
# Capture
cur.execute("""
INSERT INTO capture (id, raw_text, status, is_deleted, created_at, updated_at)
VALUES (%s, 'Test capture item', 'pending', false, now(), now())
ON CONFLICT (id) DO NOTHING
""", (d["capture"],))
# Daily focus
cur.execute("""
INSERT INTO daily_focus (id, task_id, focus_date, is_completed, created_at)
VALUES (%s, %s, CURRENT_DATE, false, now())
ON CONFLICT (id) DO NOTHING
""", (d["focus"], d["task"]))
sync_conn.commit()
except Exception as e:
sync_conn.rollback()
raise RuntimeError(f"Seed data insertion failed: {e}") from e
yield d
# Cleanup: delete all seed data (reverse dependency order)
try:
cur.execute("DELETE FROM daily_focus WHERE id = %s", (d["focus"],))
cur.execute("DELETE FROM capture WHERE id = %s", (d["capture"],))
cur.execute("DELETE FROM weblinks WHERE id = %s", (d["weblink"],))
cur.execute("DELETE FROM links WHERE id = %s", (d["link"],))
cur.execute("DELETE FROM lists WHERE id = %s", (d["list"],))
cur.execute("DELETE FROM weblink_folders WHERE id = %s", (d["weblink_folder"],))
cur.execute("DELETE FROM appointments WHERE id = %s", (d["appointment"],))
cur.execute("DELETE FROM decisions WHERE id = %s", (d["decision"],))
cur.execute("DELETE FROM meetings WHERE id = %s", (d["meeting"],))
cur.execute("DELETE FROM notes WHERE id = %s", (d["note"],))
cur.execute("DELETE FROM contacts WHERE id = %s", (d["contact"],))
cur.execute("DELETE FROM tasks WHERE id = %s", (d["task"],))
cur.execute("DELETE FROM projects WHERE id = %s", (d["project"],))
cur.execute("DELETE FROM areas WHERE id = %s", (d["area"],))
cur.execute("DELETE FROM domains WHERE id = %s", (d["domain"],))
sync_conn.commit()
except Exception:
sync_conn.rollback()
finally:
cur.close()
# ── HTTP client (function-scoped) ───────────────────────────
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as c:
yield c

195
tests/form_factory.py Normal file
View File

@@ -0,0 +1,195 @@
"""
Form Data Factory
=================
Generates valid POST form data for any route, using:
1. Introspected Form field names/types from the route
2. Seed data UUIDs for FK references (domain_id, project_id, etc.)
3. Heuristic value generation based on field name patterns
This eliminates hardcoded form data in tests. When a route's
form fields change, tests automatically adapt.
"""
from __future__ import annotations
from datetime import date, datetime, timezone
from typing import Any
from tests.introspect import FormField
# ---------------------------------------------------------------------------
# Field name -> value resolution rules
# ---------------------------------------------------------------------------
# FK fields map to seed fixture keys
FK_FIELD_MAP = {
"domain_id": "domain",
"area_id": "area",
"project_id": "project",
"task_id": "task",
"folder_id": "weblink_folder",
"parent_id": None, # Usually optional, skip
"meeting_id": None,
"contact_id": "contact",
"release_id": None,
"note_id": "note",
"list_id": "list",
}
# Field name pattern -> static test value
NAME_PATTERNS: list[tuple[str, Any]] = [
# Exact matches first
("title", "Test Title Auto"),
("name", "Test Name Auto"),
("first_name", "TestFirst"),
("last_name", "TestLast"),
("description", "Auto-generated test description"),
("body", "Auto-generated test body content"),
("raw_text", "Auto capture line 1\nAuto capture line 2"),
("url", "https://test.example.com"),
("email", "autotest@example.com"),
("phone", "555-0199"),
("company", "Test Corp"),
("role", "Tester"),
("color", "#AA55CC"),
("icon", "star"),
("status", "active"),
("priority", "3"),
("content_format", "rich"),
("meeting_date", None), # Resolved dynamically
("start_date", None),
("end_date", None),
("start_time", None),
("end_time", None),
("due_date", None),
("start_at", None),
("end_at", None),
("focus_date", None),
("sort_order", "0"),
("estimated_minutes", "30"),
("energy_required", "medium"),
("context", ""),
("recurrence", ""),
("version_label", "v1.0"),
("agenda", "Test agenda"),
("notes_body", "Test notes"),
("location", "Test Location"),
("tags", ""),
("notes", "Test notes field"),
]
def _resolve_date_field(field_name: str) -> str:
"""Generate appropriate date/time string based on field name."""
now = datetime.now(timezone.utc)
if "time" in field_name and "date" not in field_name:
return now.strftime("%H:%M")
if "date" in field_name:
return date.today().isoformat()
if field_name in ("start_at", "end_at"):
return now.isoformat()
return date.today().isoformat()
def build_form_data(
form_fields: list[FormField],
seed_data: dict[str, dict] | None = None,
) -> dict[str, str]:
"""
Build a valid form data dict for a POST request.
Args:
form_fields: List of FormField from route introspection.
seed_data: Dict mapping entity type to seed fixture dict.
e.g. {"domain": {"id": "abc-123", ...}, "project": {"id": "def-456", ...}}
Returns:
Dict of field_name -> string value, ready for httpx POST data.
"""
seed_data = seed_data or {}
data: dict[str, str] = {}
for field in form_fields:
if field.is_file:
continue # Skip file uploads in form data
value = _resolve_field_value(field, seed_data)
if value is not None:
data[field.name] = str(value)
return data
def _resolve_field_value(
field: FormField,
seed_data: dict[str, dict],
) -> Any | None:
"""Resolve a single field's test value."""
name = field.name
# 1. FK fields -> look up seed data UUID
if name in FK_FIELD_MAP:
entity_type = FK_FIELD_MAP[name]
if entity_type is None:
# Optional FK with no mapping, return None (skip)
return "" if not field.required else None
if entity_type in seed_data and "id" in seed_data[entity_type]:
return seed_data[entity_type]["id"]
# Required FK but no seed data available
return None if not field.required else ""
# 2. Date/time fields
if any(kw in name for kw in ("date", "time", "_at")):
return _resolve_date_field(name)
# 3. Pattern matching on field name
for pattern_name, pattern_value in NAME_PATTERNS:
if name == pattern_name:
if pattern_value is None:
return _resolve_date_field(name)
return pattern_value
# 4. Partial name matching for common patterns
if "title" in name:
return "Test Title Auto"
if "name" in name:
return "Test Name Auto"
if "description" in name or "desc" in name:
return "Auto test description"
if "url" in name:
return "https://test.example.com"
if "email" in name:
return "auto@test.com"
# 5. Type-based fallback
if "int" in field.annotation.lower():
return "0"
if "bool" in field.annotation.lower():
return "false"
# 6. If field has a default, use it
if field.default is not None:
return str(field.default)
# 7. Last resort for required string fields
if field.required:
return f"test_{name}"
return ""
def build_edit_data(
form_fields: list[FormField],
seed_data: dict[str, dict] | None = None,
) -> dict[str, str]:
"""
Build form data for an edit/update request.
Same as build_form_data but prefixes string values with "Updated "
so tests can verify the edit took effect.
"""
data = build_form_data(form_fields, seed_data)
for key, value in data.items():
# Only modify display-name fields, not IDs/dates/status
if key in ("title", "name", "first_name", "last_name", "description", "body"):
data[key] = f"Updated {value}"
return data

357
tests/introspect.py Normal file
View File

@@ -0,0 +1,357 @@
"""
Route Introspection Engine
==========================
Imports the live FastAPI app and extracts a complete route registry:
- All paths, methods, endpoint names
- Path parameters (e.g. {id})
- Form fields with types and defaults (from function signatures)
- Route classification (list, detail, create, edit, delete, etc.)
This is the single source of truth for all dynamic tests.
No hardcoded routes anywhere in the test suite.
"""
from __future__ import annotations
import inspect
import json
import re
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Optional
from fastapi import Form, UploadFile
from fastapi.params import Depends
from fastapi.routing import APIRoute
class RouteKind(str, Enum):
"""Classification of route purpose based on path pattern + method."""
LIST = "list" # GET /entities/
DETAIL = "detail" # GET /entities/{id}
CREATE_FORM = "create_form" # GET /entities/create
EDIT_FORM = "edit_form" # GET /entities/{id}/edit
CREATE = "create" # POST /entities/create
EDIT = "edit" # POST /entities/{id}/edit
DELETE = "delete" # POST /entities/{id}/delete
TOGGLE = "toggle" # POST /entities/{id}/toggle
ACTION = "action" # POST with other patterns
JSON_ENDPOINT = "json" # GET returning JSON (e.g. /time/running)
PAGE = "page" # GET catch-all (dashboard, search, etc.)
OTHER = "other"
@dataclass
class FormField:
"""A form field extracted from a route endpoint's signature."""
name: str
annotation: str # str, int, Optional[str], etc.
required: bool
default: Any = None # Default value if not required
is_file: bool = False # True for UploadFile params
@dataclass
class RouteInfo:
"""Complete metadata for a single route."""
path: str
methods: list[str]
endpoint_name: str
kind: RouteKind
path_params: list[str] # e.g. ["id"] from /tasks/{id}
form_fields: list[FormField] # Form() params from signature
query_params: list[str] # Non-Form, non-Depends, non-Request params
has_file_upload: bool = False
prefix: str = "" # Router prefix, e.g. "/tasks"
@property
def needs_seed_data(self) -> bool:
"""True if this route needs an entity ID to test."""
return bool(self.path_params)
@property
def entity_prefix(self) -> str:
"""Extract the entity prefix, e.g. '/tasks' from '/tasks/{id}/edit'."""
if self.prefix:
return self.prefix
parts = self.path.strip("/").split("/")
return f"/{parts[0]}" if parts else ""
def _classify_route(path: str, methods: set[str]) -> RouteKind:
"""Determine route purpose from path pattern and HTTP method."""
is_get = "GET" in methods
is_post = "POST" in methods
if is_get:
if path.endswith("/create"):
return RouteKind.CREATE_FORM
if path.endswith("/edit"):
return RouteKind.EDIT_FORM
if "{" in path:
return RouteKind.DETAIL
# Heuristic: paths ending in known JSON endpoints
if any(path.endswith(p) for p in ["/running"]):
return RouteKind.JSON_ENDPOINT
if re.match(r"^/[^/]+/?$", path) or path == "/":
return RouteKind.LIST if path != "/" else RouteKind.PAGE
return RouteKind.PAGE
if is_post:
if path.endswith("/create"):
return RouteKind.CREATE
if path.endswith("/edit"):
return RouteKind.EDIT
if path.endswith("/delete"):
return RouteKind.DELETE
if path.endswith("/toggle"):
return RouteKind.TOGGLE
return RouteKind.ACTION
return RouteKind.OTHER
def _extract_path_params(path: str) -> list[str]:
"""Pull {param_name} from path template."""
return re.findall(r"\{(\w+)\}", path)
def _extract_form_fields(endpoint) -> list[FormField]:
"""
Inspect the endpoint function signature to find Form() parameters.
Returns list of FormField with name, type, required status, and defaults.
"""
fields = []
try:
sig = inspect.signature(endpoint)
except (ValueError, TypeError):
return fields
for name, param in sig.parameters.items():
# Skip non-form params
if name in ("request", "self", "cls"):
continue
default = param.default
# Check if it's a Depends() - skip those
if isinstance(default, Depends):
continue
# Check for UploadFile annotation
annotation = param.annotation
ann_str = _annotation_to_str(annotation)
if annotation is UploadFile or (hasattr(annotation, "__origin__") and UploadFile in getattr(annotation, "__args__", ())):
fields.append(FormField(
name=name,
annotation=ann_str,
required=True,
is_file=True,
))
continue
# Check if default is a Form() instance
is_form = False
form_default = inspect.Parameter.empty
if hasattr(default, "__class__") and default.__class__.__name__ in ("Form", "FieldInfo"):
is_form = True
# Extract the actual default from the Form() wrapper
if hasattr(default, "default"):
form_default = default.default
if not is_form:
# Could also be Form via Annotated types (FastAPI 0.95+)
if hasattr(annotation, "__metadata__"):
for meta in annotation.__metadata__:
if hasattr(meta, "__class__") and meta.__class__.__name__ in ("Form", "FieldInfo"):
is_form = True
if hasattr(meta, "default"):
form_default = meta.default
break
if is_form:
# Determine if required
required = form_default is inspect.Parameter.empty or form_default is Ellipsis or form_default is None and "Optional" not in ann_str
actual_default = None if form_default in (inspect.Parameter.empty, Ellipsis) else form_default
fields.append(FormField(
name=name,
annotation=ann_str,
required=required,
default=actual_default,
))
return fields
def _extract_query_params(endpoint, path_params: list[str]) -> list[str]:
"""Extract query parameters (non-Form, non-Depends, non-special params)."""
params = []
skip = {"request", "self", "cls", "db"} | set(path_params)
try:
sig = inspect.signature(endpoint)
except (ValueError, TypeError):
return params
for name, param in sig.parameters.items():
if name in skip:
continue
default = param.default
if isinstance(default, Depends):
continue
# If it has a Form() default, skip (handled by form_fields)
if hasattr(default, "__class__") and default.__class__.__name__ in ("Form", "FieldInfo"):
continue
# Check Annotated metadata
annotation = param.annotation
if hasattr(annotation, "__metadata__"):
has_form = any(
hasattr(m, "__class__") and m.__class__.__name__ in ("Form", "FieldInfo")
for m in annotation.__metadata__
)
if has_form:
continue
# Remaining non-special params are likely query params
if param.annotation is not UploadFile:
params.append(name)
return params
def _annotation_to_str(annotation) -> str:
"""Convert a type annotation to readable string."""
if annotation is inspect.Parameter.empty:
return "Any"
if hasattr(annotation, "__name__"):
return annotation.__name__
return str(annotation).replace("typing.", "")
def _extract_prefix(path: str) -> str:
"""Get the router prefix from a path."""
# /tasks/{id}/edit -> /tasks
# /admin/trash/ -> /admin/trash
# /weblinks/folders/create -> /weblinks
parts = path.strip("/").split("/")
if not parts:
return "/"
# Walk until we hit a {param} or known action word
prefix_parts = []
for part in parts:
if part.startswith("{"):
break
if part in ("create", "edit", "delete", "toggle"):
break
prefix_parts.append(part)
return "/" + "/".join(prefix_parts) if prefix_parts else "/"
def introspect_app(app) -> list[RouteInfo]:
"""
Walk all registered routes on the FastAPI app.
Returns a complete RouteInfo list - the single source of truth for tests.
"""
routes = []
for route in app.routes:
if not isinstance(route, APIRoute):
continue
path = route.path
methods = route.methods or set()
endpoint = route.endpoint
endpoint_name = endpoint.__name__ if hasattr(endpoint, "__name__") else str(endpoint)
path_params = _extract_path_params(path)
form_fields = _extract_form_fields(endpoint)
query_params = _extract_query_params(endpoint, path_params)
has_file = any(f.is_file for f in form_fields)
# Classify each method separately if route has both GET and POST
for method in methods:
kind = _classify_route(path, {method})
prefix = _extract_prefix(path)
routes.append(RouteInfo(
path=path,
methods=[method],
endpoint_name=endpoint_name,
kind=kind,
path_params=path_params,
form_fields=form_fields if method == "POST" else [],
query_params=query_params if method == "GET" else [],
has_file_upload=has_file,
prefix=prefix,
))
return routes
def get_route_registry(app) -> dict:
"""
Build a structured registry keyed by route kind.
Convenience wrapper for test parametrization.
"""
all_routes = introspect_app(app)
registry = {
"all": all_routes,
"get_no_params": [r for r in all_routes if "GET" in r.methods and not r.path_params],
"get_with_params": [r for r in all_routes if "GET" in r.methods and r.path_params],
"post_create": [r for r in all_routes if r.kind == RouteKind.CREATE],
"post_edit": [r for r in all_routes if r.kind == RouteKind.EDIT],
"post_delete": [r for r in all_routes if r.kind == RouteKind.DELETE],
"post_action": [r for r in all_routes if r.kind in (RouteKind.ACTION, RouteKind.TOGGLE)],
"lists": [r for r in all_routes if r.kind == RouteKind.LIST],
"details": [r for r in all_routes if r.kind == RouteKind.DETAIL],
"create_forms": [r for r in all_routes if r.kind == RouteKind.CREATE_FORM],
"edit_forms": [r for r in all_routes if r.kind == RouteKind.EDIT_FORM],
}
# Group by prefix for entity-level operations
by_prefix: dict[str, list[RouteInfo]] = {}
for r in all_routes:
by_prefix.setdefault(r.prefix, []).append(r)
registry["by_prefix"] = by_prefix
return registry
def dump_registry_report(app) -> str:
"""
Generate a human-readable report of all discovered routes.
Useful for debugging / verifying introspection output.
"""
routes = introspect_app(app)
lines = [
"=" * 70,
"LIFE OS ROUTE REGISTRY",
f"Total routes discovered: {len(routes)}",
"=" * 70,
"",
]
# Group by prefix
by_prefix: dict[str, list[RouteInfo]] = {}
for r in routes:
by_prefix.setdefault(r.prefix, []).append(r)
for prefix in sorted(by_prefix.keys()):
prefix_routes = by_prefix[prefix]
lines.append(f" {prefix}")
lines.append(f" {'' * 60}")
for r in sorted(prefix_routes, key=lambda x: (x.path, x.methods)):
method = r.methods[0] if r.methods else "?"
form_str = ""
if r.form_fields:
field_names = [f.name + ("*" if f.required else "") for f in r.form_fields]
form_str = f" fields=[{', '.join(field_names)}]"
query_str = ""
if r.query_params:
query_str = f" query=[{', '.join(r.query_params)}]"
lines.append(f" {method:6s} {r.path:40s} {r.kind.value:15s}{form_str}{query_str}")
lines.append("")
return "\n".join(lines)

69
tests/registry.py Normal file
View File

@@ -0,0 +1,69 @@
"""
Route registry - imports app, runs introspection once, exposes route data.
Disposes the async engine after introspection to avoid event loop conflicts.
"""
import os
import asyncio
# Point the app at the test database BEFORE importing
os.environ["DATABASE_URL"] = "postgresql+asyncpg://postgres:UCTOQDZiUhN8U@lifeos-db:5432/lifeos_test"
from main import app
from tests.introspect import introspect_app
# Build route registry from live app
ROUTE_REGISTRY = introspect_app(app)
# Classify routes into buckets for parametrized tests
GET_NO_PARAMS = [r for r in ROUTE_REGISTRY if "GET" in r.methods and not r.path_params]
GET_WITH_PARAMS = [r for r in ROUTE_REGISTRY if "GET" in r.methods and r.path_params]
POST_CREATE = [r for r in ROUTE_REGISTRY if "POST" in r.methods and r.kind == "create"]
POST_EDIT = [r for r in ROUTE_REGISTRY if "POST" in r.methods and r.kind == "edit"]
POST_DELETE = [r for r in ROUTE_REGISTRY if "POST" in r.methods and r.kind == "delete"]
POST_ACTION = [r for r in ROUTE_REGISTRY if "POST" in r.methods and r.kind in ("action", "toggle")]
# Map route prefixes to seed fixture keys
PREFIX_TO_SEED = {
"/domains": "domain",
"/areas": "area",
"/projects": "project",
"/tasks": "task",
"/notes": "note",
"/links": "link",
"/contacts": "contact",
"/lists": "list",
"/meetings": "meeting",
"/decisions": "decision",
"/weblinks": "weblink",
"/weblinks/folders": "weblink_folder",
"/appointments": "appointment",
"/focus": "focus",
"/capture": "capture",
"/time": "task",
"/files": None,
"/admin/trash": None,
}
def resolve_path(path_template, seeds):
"""Replace {id} placeholders with real seed UUIDs."""
import re
result = path_template
for param in re.findall(r"\{(\w+)\}", path_template):
# Find prefix for this route
for prefix, seed_key in sorted(PREFIX_TO_SEED.items(), key=lambda x: -len(x[0])):
if path_template.startswith(prefix) and seed_key and seed_key in seeds:
result = result.replace(f"{{{param}}}", str(seeds[seed_key]))
break
return result
# CRITICAL: Dispose the async engine created at import time.
# It was bound to whatever event loop existed during collection.
# When tests run, pytest-asyncio creates a NEW event loop.
# The engine will lazily recreate its connection pool on that new loop.
try:
from core.database import engine
loop = asyncio.new_event_loop()
loop.run_until_complete(engine.dispose())
loop.close()
except Exception:
pass # If disposal fails, tests will still try to proceed

65
tests/route_report.py Normal file
View File

@@ -0,0 +1,65 @@
"""
Route Registry Report
=====================
Run inside the container to see exactly what the introspection engine
discovers from the live app. Use this to verify before running tests.
Usage:
docker exec lifeos-dev python -m tests.route_report
"""
from __future__ import annotations
import sys
sys.path.insert(0, "/app")
from tests.registry import ALL_ROUTES, ROUTE_REGISTRY, PREFIX_TO_SEED # noqa: E402
from tests.introspect import dump_registry_report, RouteKind # noqa: E402
from main import app # noqa: E402
def main():
print(dump_registry_report(app))
reg = ROUTE_REGISTRY
print("\n" + "=" * 70)
print("SUMMARY")
print("=" * 70)
print(f" Total routes: {len(reg['all'])}")
print(f" GET (no params): {len(reg['get_no_params'])}")
print(f" GET (with params): {len(reg['get_with_params'])}")
print(f" POST create: {len(reg['post_create'])}")
print(f" POST edit: {len(reg['post_edit'])}")
print(f" POST delete: {len(reg['post_delete'])}")
print(f" POST action/toggle: {len(reg['post_action'])}")
print(f" Entity prefixes: {len(reg['by_prefix'])}")
print()
# Warn about POST routes with no discovered form fields
for r in reg["post_create"]:
if not r.form_fields:
print(f" WARNING: {r.path} has no discovered Form() fields")
for r in reg["post_edit"]:
if not r.form_fields:
print(f" WARNING: {r.path} has no discovered Form() fields")
# Show seed mapping coverage
print()
print("PREFIX_TO_SEED coverage:")
print("-" * 70)
for prefix in sorted(reg["by_prefix"].keys()):
has_seed = prefix in PREFIX_TO_SEED and PREFIX_TO_SEED[prefix] is not None
marker = "OK" if has_seed else "SKIP (no seed)"
print(f" {prefix:30s} {marker}")
print()
print("Form field details for create routes:")
print("-" * 70)
for r in reg["post_create"]:
if r.form_fields:
fields = [f" {f.name}: {f.annotation}{'*' if f.required else ''}" for f in r.form_fields]
print(f"\n {r.path}")
print("\n".join(fields))
if __name__ == "__main__":
main()

22
tests/run_tests.sh Executable file
View File

@@ -0,0 +1,22 @@
#!/bin/bash
set -e
cd /app
pip install pytest pytest-asyncio httpx --break-system-packages -q 2>/dev/null
echo "============================================="
echo " Life OS Dynamic Test Suite"
echo "============================================="
echo ""
case "${1}" in
smoke) echo ">> Smoke tests"; python -m pytest tests/test_smoke_dynamic.py -v --tb=short ;;
crud) echo ">> CRUD tests"; python -m pytest tests/test_crud_dynamic.py -v --tb=short ;;
logic) echo ">> Business logic"; python -m pytest tests/test_business_logic.py -v --tb=short ;;
report) echo ">> Route report"; python -m tests.route_report ;;
fast) echo ">> Smoke, stop on fail"; python -m pytest tests/test_smoke_dynamic.py -v --tb=short -x ;;
"") echo ">> Full suite"; python -m pytest tests/ -v --tb=short ;;
*) echo ">> Custom: $@"; python -m pytest tests/ "$@" ;;
esac
echo ""
echo "Done"

View File

@@ -0,0 +1,212 @@
"""
Business Logic Tests
====================
Hand-written tests for specific behavioral contracts.
These test LOGIC, not routes, so they stay manual.
When to add tests here:
- New constraint (e.g. "only one timer running at a time")
- State transitions (e.g. "completing a task sets completed_at")
- Cross-entity effects (e.g. "deleting a project hides its tasks")
- Search behavior
- Sidebar data integrity
"""
from __future__ import annotations
import uuid
from datetime import date, datetime, timezone
import pytest
from httpx import AsyncClient
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
# ===========================================================================
# Time Tracking
# ===========================================================================
class TestTimerConstraints:
"""Only one timer can run at a time. Starting a new one auto-stops the old."""
@pytest.mark.asyncio
async def test_single_timer_constraint(
self, client: AsyncClient, db_session: AsyncSession,
seed_domain: dict, seed_project: dict,
):
t1 = await _create_task(db_session, seed_domain["id"], seed_project["id"], "Timer1")
t2 = await _create_task(db_session, seed_domain["id"], seed_project["id"], "Timer2")
await client.post("/time/start", data={"task_id": t1}, follow_redirects=False)
await client.post("/time/start", data={"task_id": t2}, follow_redirects=False)
result = await db_session.execute(
text("SELECT count(*) FROM time_entries WHERE end_at IS NULL")
)
assert result.scalar() <= 1
@pytest.mark.asyncio
async def test_stop_sets_end_at(
self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
):
await client.post("/time/start", data={"task_id": seed_task["id"]}, follow_redirects=False)
await client.post("/time/stop", follow_redirects=False)
result = await db_session.execute(
text("SELECT count(*) FROM time_entries WHERE end_at IS NULL AND task_id = :tid"),
{"tid": seed_task["id"]},
)
assert result.scalar() == 0
@pytest.mark.asyncio
async def test_running_endpoint_returns_json(
self, client: AsyncClient, seed_task: dict,
):
await client.post("/time/start", data={"task_id": seed_task["id"]}, follow_redirects=False)
r = await client.get("/time/running")
assert r.status_code == 200
# Should be valid JSON
data = r.json()
assert data is not None
# ===========================================================================
# Soft Delete & Restore
# ===========================================================================
class TestSoftDeleteBehavior:
"""Soft-deleted items should vanish from lists and reappear after restore."""
@pytest.mark.asyncio
async def test_deleted_task_hidden_from_list(
self, client: AsyncClient, seed_task: dict,
):
await client.post(f"/tasks/{seed_task['id']}/delete", follow_redirects=False)
r = await client.get("/tasks/")
assert seed_task["title"] not in r.text
@pytest.mark.asyncio
async def test_restore_task_reappears(
self, client: AsyncClient, seed_task: dict,
):
await client.post(f"/tasks/{seed_task['id']}/delete", follow_redirects=False)
await client.post(
f"/admin/trash/restore/tasks/{seed_task['id']}",
follow_redirects=False,
)
r = await client.get("/tasks/")
assert seed_task["title"] in r.text
@pytest.mark.asyncio
async def test_deleted_project_hidden(
self, client: AsyncClient, seed_project: dict,
):
await client.post(f"/projects/{seed_project['id']}/delete", follow_redirects=False)
r = await client.get("/projects/")
assert seed_project["name"] not in r.text
# ===========================================================================
# Search
# ===========================================================================
class TestSearchBehavior:
@pytest.mark.asyncio
async def test_search_does_not_crash_on_sql_injection(self, client: AsyncClient):
r = await client.get("/search/?q='; DROP TABLE tasks; --")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_search_empty_query(self, client: AsyncClient):
r = await client.get("/search/?q=")
assert r.status_code == 200
@pytest.mark.asyncio
async def test_search_special_unicode(self, client: AsyncClient):
r = await client.get("/search/?q=日本語テスト")
assert r.status_code == 200
# ===========================================================================
# Sidebar
# ===========================================================================
class TestSidebarIntegrity:
@pytest.mark.asyncio
async def test_sidebar_shows_domain_on_every_page(
self, client: AsyncClient, seed_domain: dict,
):
"""Domain should appear in sidebar across all pages."""
# Sample a few different page types
for path in ("/", "/tasks/", "/notes/", "/projects/"):
r = await client.get(path)
assert seed_domain["name"] in r.text, f"Domain missing from sidebar on {path}"
@pytest.mark.asyncio
async def test_sidebar_shows_project_hierarchy(
self, client: AsyncClient, seed_domain: dict, seed_area: dict, seed_project: dict,
):
r = await client.get("/")
assert seed_project["name"] in r.text
# ===========================================================================
# Focus & Capture Workflows
# ===========================================================================
class TestFocusWorkflow:
@pytest.mark.asyncio
async def test_add_and_remove_from_focus(
self, client: AsyncClient, db_session: AsyncSession, seed_task: dict,
):
# Add to focus
r = await client.post("/focus/add", data={"task_id": seed_task["id"]}, follow_redirects=False)
assert r.status_code in (303, 302)
@pytest.mark.asyncio
async def test_capture_multi_line_creates_multiple(
self, client: AsyncClient, db_session: AsyncSession,
):
await client.post(
"/capture/add",
data={"raw_text": "Line one\nLine two\nLine three"},
follow_redirects=False,
)
result = await db_session.execute(
text("SELECT count(*) FROM capture WHERE is_deleted = false")
)
count = result.scalar()
# Should have created at least 2 items (3 lines)
assert count >= 2, f"Expected multiple capture items, got {count}"
# ===========================================================================
# Edge Cases
# ===========================================================================
class TestEdgeCases:
@pytest.mark.asyncio
async def test_invalid_uuid_in_path(self, client: AsyncClient):
r = await client.get("/tasks/not-a-valid-uuid")
assert r.status_code in (404, 422, 400)
@pytest.mark.asyncio
async def test_timer_start_without_task_id(self, client: AsyncClient):
r = await client.post("/time/start", data={}, follow_redirects=False)
assert r.status_code != 200 # Should error, not silently succeed
@pytest.mark.asyncio
async def test_double_delete_doesnt_crash(
self, client: AsyncClient, seed_task: dict,
):
await client.post(f"/tasks/{seed_task['id']}/delete", follow_redirects=False)
r = await client.post(f"/tasks/{seed_task['id']}/delete", follow_redirects=False)
assert r.status_code in (303, 302, 404)
# ===========================================================================
# Helpers
# ===========================================================================
async def _create_task(db: AsyncSession, domain_id: str, project_id: str, title: str) -> str:
_id = str(uuid.uuid4())
await db.execute(
text("INSERT INTO tasks (id, domain_id, project_id, title, status, priority, sort_order, is_deleted, created_at, updated_at) "
"VALUES (:id, :did, :pid, :title, 'open', 3, 0, false, now(), now())"),
{"id": _id, "did": domain_id, "pid": project_id, "title": title},
)
await db.flush()
return _id

161
tests/test_crud_dynamic.py Normal file
View File

@@ -0,0 +1,161 @@
"""
Dynamic CRUD Tests
==================
Auto-discovers all POST routes and generates valid form data from
introspected Form() field signatures. No hardcoded form payloads.
When you add a new entity router with standard CRUD, these tests
automatically cover create/edit/delete on next run.
Tests:
- All POST /create routes accept valid form data and redirect 303
- All POST /{id}/edit routes accept valid form data and redirect 303
- All POST /{id}/delete routes redirect 303
- All POST action routes don't crash (303 or other non-500)
"""
from __future__ import annotations
import re
import uuid
import pytest
from httpx import AsyncClient
from tests.registry import ALL_ROUTES, resolve_path
from tests.introspect import RouteKind
from tests.form_factory import build_form_data, build_edit_data
# ---------------------------------------------------------------------------
# Collect POST routes by kind
# ---------------------------------------------------------------------------
_CREATE_ROUTES = [r for r in ALL_ROUTES if r.kind == RouteKind.CREATE and not r.has_file_upload]
_EDIT_ROUTES = [r for r in ALL_ROUTES if r.kind == RouteKind.EDIT and not r.has_file_upload]
_DELETE_ROUTES = [r for r in ALL_ROUTES if r.kind == RouteKind.DELETE]
_ACTION_ROUTES = [r for r in ALL_ROUTES if r.kind in (RouteKind.ACTION, RouteKind.TOGGLE)]
# ---------------------------------------------------------------------------
# Create: POST /entity/create with auto-generated form data -> 303
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
"route",
_CREATE_ROUTES,
ids=[f"CREATE {r.path}" for r in _CREATE_ROUTES],
)
async def test_create_redirects(client: AsyncClient, all_seeds: dict, route):
"""POST to create routes with valid form data should redirect 303."""
form_data = build_form_data(route.form_fields, all_seeds)
if not form_data:
pytest.skip(f"No form fields discovered for {route.path}")
r = await client.post(route.path, data=form_data, follow_redirects=False)
assert r.status_code in (303, 302, 307), (
f"POST {route.path} returned {r.status_code} with data {form_data}"
)
# ---------------------------------------------------------------------------
# Edit: POST /entity/{id}/edit with auto-generated form data -> 303
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
"route",
_EDIT_ROUTES,
ids=[f"EDIT {r.path}" for r in _EDIT_ROUTES],
)
async def test_edit_redirects(client: AsyncClient, all_seeds: dict, route):
"""POST to edit routes with valid form data should redirect 303."""
resolved = resolve_path(route.path, all_seeds)
if resolved is None:
pytest.skip(f"No seed data mapping for {route.path}")
form_data = build_edit_data(route.form_fields, all_seeds)
if not form_data:
pytest.skip(f"No form fields discovered for {route.path}")
r = await client.post(resolved, data=form_data, follow_redirects=False)
assert r.status_code in (303, 302, 307), (
f"POST {resolved} returned {r.status_code} with data {form_data}"
)
# ---------------------------------------------------------------------------
# Delete: POST /entity/{id}/delete -> 303
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
"route",
_DELETE_ROUTES,
ids=[f"DELETE {r.path}" for r in _DELETE_ROUTES],
)
async def test_delete_redirects(client: AsyncClient, all_seeds: dict, route):
"""POST to delete routes should redirect 303."""
resolved = resolve_path(route.path, all_seeds)
if resolved is None:
pytest.skip(f"No seed data mapping for {route.path}")
r = await client.post(resolved, follow_redirects=False)
assert r.status_code in (303, 302, 307, 404), (
f"POST {resolved} returned {r.status_code}"
)
# ---------------------------------------------------------------------------
# Action routes: POST /entity/{id}/toggle, etc. -> non-500
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
"route",
_ACTION_ROUTES,
ids=[f"ACTION {r.path}" for r in _ACTION_ROUTES],
)
async def test_action_does_not_crash(client: AsyncClient, all_seeds: dict, route):
"""POST action routes should not return 500."""
resolved = resolve_path(route.path, all_seeds)
if resolved is None:
# Try building form data for actions that need it (e.g. /focus/add)
form_data = build_form_data(route.form_fields, all_seeds) if route.form_fields else {}
r = await client.post(route.path, data=form_data, follow_redirects=False)
else:
form_data = build_form_data(route.form_fields, all_seeds) if route.form_fields else {}
r = await client.post(resolved, data=form_data, follow_redirects=False)
assert r.status_code != 500, (
f"POST {resolved or route.path} returned 500 (server error)"
)
# ---------------------------------------------------------------------------
# Verify create actually persists: create then check list page
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
"route",
[r for r in _CREATE_ROUTES if r.prefix in ("/domains", "/contacts", "/meetings")],
ids=[f"PERSIST {r.path}" for r in _CREATE_ROUTES if r.prefix in ("/domains", "/contacts", "/meetings")],
)
async def test_create_persists_in_list(client: AsyncClient, all_seeds: dict, route):
"""Items created via POST should appear on the list page."""
form_data = build_form_data(route.form_fields, all_seeds)
if not form_data:
pytest.skip(f"No form fields for {route.path}")
# Use a unique name to search for
marker = f"AutoTest_{uuid.uuid4().hex[:8]}"
for key in ("name", "title", "first_name"):
if key in form_data:
form_data[key] = marker
break
else:
pytest.skip(f"No name/title field found for {route.path}")
await client.post(route.path, data=form_data, follow_redirects=False)
# Check the list page
list_path = route.prefix + "/"
r = await client.get(list_path)
assert marker in r.text, (
f"Created item '{marker}' not found on {list_path}"
)

View File

@@ -0,0 +1,76 @@
"""
Dynamic smoke tests - auto-parametrized from introspected routes.
Tests that all GET endpoints return 200 (or acceptable alternatives).
"""
import pytest
from tests.registry import (
GET_NO_PARAMS, GET_WITH_PARAMS,
resolve_path, PREFIX_TO_SEED, ROUTE_REGISTRY,
)
# Routes that require query params to avoid 422
ROUTES_NEEDING_QUERY = {
"/search/api": "?q=test",
}
# Routes that may legitimately return non-200 without full context
ACCEPTABLE_CODES = {200, 307, 308}
# ── Test 1: All GET endpoints without path params ───────────
@pytest.mark.parametrize(
"path",
[r.path for r in GET_NO_PARAMS],
ids=[f"GET {r.path}" for r in GET_NO_PARAMS],
)
async def test_get_no_params_returns_200(client, path):
"""Every GET endpoint without path params should return 200."""
url = path
# Append required query params if needed
for route_prefix, query_string in ROUTES_NEEDING_QUERY.items():
if path == route_prefix or path.rstrip("/") == route_prefix.rstrip("/"):
url = path + query_string
break
r = await client.get(url, follow_redirects=True)
assert r.status_code in ACCEPTABLE_CODES or r.status_code == 200, \
f"GET {url} returned {r.status_code}"
# ── Test 2: All GET endpoints with valid seed IDs ───────────
@pytest.mark.parametrize(
"path_template",
[r.path for r in GET_WITH_PARAMS],
ids=[f"GET {r.path}" for r in GET_WITH_PARAMS],
)
async def test_get_with_valid_id_returns_200(client, all_seeds, path_template):
"""GET endpoints with a valid seed UUID should return 200."""
path = resolve_path(path_template, all_seeds)
# Skip if we couldn't resolve (no seed for this prefix)
if "{" in path:
pytest.skip(f"No seed mapping for {path_template}")
r = await client.get(path, follow_redirects=True)
assert r.status_code == 200, f"GET {path} returned {r.status_code}"
# ── Test 3: GET with fake UUID returns 404 ──────────────────
FAKE_UUID = "00000000-0000-0000-0000-000000000000"
# Build fake-ID test cases from routes that have path params
_fake_id_cases = []
for r in GET_WITH_PARAMS:
import re
fake_path = re.sub(r"\{[^}]+\}", FAKE_UUID, r.path)
_fake_id_cases.append((fake_path, r.path))
@pytest.mark.parametrize(
"path,template",
_fake_id_cases if _fake_id_cases else [pytest.param("", "", marks=pytest.mark.skip)],
ids=[f"404 {c[1]}" for c in _fake_id_cases] if _fake_id_cases else ["NOTSET"],
)
async def test_get_with_fake_id_returns_404(client, path, template):
"""GET endpoints with a nonexistent UUID should return 404."""
r = await client.get(path, follow_redirects=True)
assert r.status_code in (404, 302, 303), \
f"GET {path} returned {r.status_code}, expected 404 or redirect"