""" Route Introspection Engine ========================== Imports the live FastAPI app and extracts a complete route registry: - All paths, methods, endpoint names - Path parameters (e.g. {id}) - Form fields with types and defaults (from function signatures) - Route classification (list, detail, create, edit, delete, etc.) This is the single source of truth for all dynamic tests. No hardcoded routes anywhere in the test suite. """ from __future__ import annotations import inspect import json import re from dataclasses import dataclass, field, asdict from enum import Enum from typing import Any, Optional from fastapi import Form, UploadFile from fastapi.params import Depends from fastapi.routing import APIRoute class RouteKind(str, Enum): """Classification of route purpose based on path pattern + method.""" LIST = "list" # GET /entities/ DETAIL = "detail" # GET /entities/{id} CREATE_FORM = "create_form" # GET /entities/create EDIT_FORM = "edit_form" # GET /entities/{id}/edit CREATE = "create" # POST /entities/create EDIT = "edit" # POST /entities/{id}/edit DELETE = "delete" # POST /entities/{id}/delete TOGGLE = "toggle" # POST /entities/{id}/toggle ACTION = "action" # POST with other patterns JSON_ENDPOINT = "json" # GET returning JSON (e.g. /time/running) PAGE = "page" # GET catch-all (dashboard, search, etc.) OTHER = "other" @dataclass class FormField: """A form field extracted from a route endpoint's signature.""" name: str annotation: str # str, int, Optional[str], etc. required: bool default: Any = None # Default value if not required is_file: bool = False # True for UploadFile params @dataclass class RouteInfo: """Complete metadata for a single route.""" path: str methods: list[str] endpoint_name: str kind: RouteKind path_params: list[str] # e.g. ["id"] from /tasks/{id} form_fields: list[FormField] # Form() params from signature query_params: list[str] # Non-Form, non-Depends, non-Request params has_file_upload: bool = False prefix: str = "" # Router prefix, e.g. "/tasks" @property def needs_seed_data(self) -> bool: """True if this route needs an entity ID to test.""" return bool(self.path_params) @property def entity_prefix(self) -> str: """Extract the entity prefix, e.g. '/tasks' from '/tasks/{id}/edit'.""" if self.prefix: return self.prefix parts = self.path.strip("/").split("/") return f"/{parts[0]}" if parts else "" def _classify_route(path: str, methods: set[str]) -> RouteKind: """Determine route purpose from path pattern and HTTP method.""" is_get = "GET" in methods is_post = "POST" in methods if is_get: if path.endswith("/create"): return RouteKind.CREATE_FORM if path.endswith("/edit"): return RouteKind.EDIT_FORM if "{" in path: return RouteKind.DETAIL # Heuristic: paths ending in known JSON endpoints if any(path.endswith(p) for p in ["/running"]): return RouteKind.JSON_ENDPOINT if re.match(r"^/[^/]+/?$", path) or path == "/": return RouteKind.LIST if path != "/" else RouteKind.PAGE return RouteKind.PAGE if is_post: if path.endswith("/create"): return RouteKind.CREATE if path.endswith("/edit"): return RouteKind.EDIT if path.endswith("/delete"): return RouteKind.DELETE if path.endswith("/toggle"): return RouteKind.TOGGLE return RouteKind.ACTION return RouteKind.OTHER def _extract_path_params(path: str) -> list[str]: """Pull {param_name} from path template.""" return re.findall(r"\{(\w+)\}", path) def _extract_form_fields(endpoint) -> list[FormField]: """ Inspect the endpoint function signature to find Form() parameters. Returns list of FormField with name, type, required status, and defaults. """ fields = [] try: sig = inspect.signature(endpoint) except (ValueError, TypeError): return fields for name, param in sig.parameters.items(): # Skip non-form params if name in ("request", "self", "cls"): continue default = param.default # Check if it's a Depends() - skip those if isinstance(default, Depends): continue # Check for UploadFile annotation annotation = param.annotation ann_str = _annotation_to_str(annotation) if annotation is UploadFile or (hasattr(annotation, "__origin__") and UploadFile in getattr(annotation, "__args__", ())): fields.append(FormField( name=name, annotation=ann_str, required=True, is_file=True, )) continue # Check if default is a Form() instance is_form = False form_default = inspect.Parameter.empty if hasattr(default, "__class__") and default.__class__.__name__ in ("Form", "FieldInfo"): is_form = True # Extract the actual default from the Form() wrapper if hasattr(default, "default"): form_default = default.default if not is_form: # Could also be Form via Annotated types (FastAPI 0.95+) if hasattr(annotation, "__metadata__"): for meta in annotation.__metadata__: if hasattr(meta, "__class__") and meta.__class__.__name__ in ("Form", "FieldInfo"): is_form = True if hasattr(meta, "default"): form_default = meta.default break if is_form: # Determine if required required = form_default is inspect.Parameter.empty or form_default is Ellipsis or form_default is None and "Optional" not in ann_str actual_default = None if form_default in (inspect.Parameter.empty, Ellipsis) else form_default fields.append(FormField( name=name, annotation=ann_str, required=required, default=actual_default, )) return fields def _extract_query_params(endpoint, path_params: list[str]) -> list[str]: """Extract query parameters (non-Form, non-Depends, non-special params).""" params = [] skip = {"request", "self", "cls", "db"} | set(path_params) try: sig = inspect.signature(endpoint) except (ValueError, TypeError): return params for name, param in sig.parameters.items(): if name in skip: continue default = param.default if isinstance(default, Depends): continue # If it has a Form() default, skip (handled by form_fields) if hasattr(default, "__class__") and default.__class__.__name__ in ("Form", "FieldInfo"): continue # Check Annotated metadata annotation = param.annotation if hasattr(annotation, "__metadata__"): has_form = any( hasattr(m, "__class__") and m.__class__.__name__ in ("Form", "FieldInfo") for m in annotation.__metadata__ ) if has_form: continue # Remaining non-special params are likely query params if param.annotation is not UploadFile: params.append(name) return params def _annotation_to_str(annotation) -> str: """Convert a type annotation to readable string.""" if annotation is inspect.Parameter.empty: return "Any" if hasattr(annotation, "__name__"): return annotation.__name__ return str(annotation).replace("typing.", "") def _extract_prefix(path: str) -> str: """Get the router prefix from a path.""" # /tasks/{id}/edit -> /tasks # /admin/trash/ -> /admin/trash # /weblinks/folders/create -> /weblinks parts = path.strip("/").split("/") if not parts: return "/" # Walk until we hit a {param} or known action word prefix_parts = [] for part in parts: if part.startswith("{"): break if part in ("create", "edit", "delete", "toggle"): break prefix_parts.append(part) return "/" + "/".join(prefix_parts) if prefix_parts else "/" def introspect_app(app) -> list[RouteInfo]: """ Walk all registered routes on the FastAPI app. Returns a complete RouteInfo list - the single source of truth for tests. """ routes = [] for route in app.routes: if not isinstance(route, APIRoute): continue path = route.path methods = route.methods or set() endpoint = route.endpoint endpoint_name = endpoint.__name__ if hasattr(endpoint, "__name__") else str(endpoint) path_params = _extract_path_params(path) form_fields = _extract_form_fields(endpoint) query_params = _extract_query_params(endpoint, path_params) has_file = any(f.is_file for f in form_fields) # Classify each method separately if route has both GET and POST for method in methods: kind = _classify_route(path, {method}) prefix = _extract_prefix(path) routes.append(RouteInfo( path=path, methods=[method], endpoint_name=endpoint_name, kind=kind, path_params=path_params, form_fields=form_fields if method == "POST" else [], query_params=query_params if method == "GET" else [], has_file_upload=has_file, prefix=prefix, )) return routes def get_route_registry(app) -> dict: """ Build a structured registry keyed by route kind. Convenience wrapper for test parametrization. """ all_routes = introspect_app(app) registry = { "all": all_routes, "get_no_params": [r for r in all_routes if "GET" in r.methods and not r.path_params], "get_with_params": [r for r in all_routes if "GET" in r.methods and r.path_params], "post_create": [r for r in all_routes if r.kind == RouteKind.CREATE], "post_edit": [r for r in all_routes if r.kind == RouteKind.EDIT], "post_delete": [r for r in all_routes if r.kind == RouteKind.DELETE], "post_action": [r for r in all_routes if r.kind in (RouteKind.ACTION, RouteKind.TOGGLE)], "lists": [r for r in all_routes if r.kind == RouteKind.LIST], "details": [r for r in all_routes if r.kind == RouteKind.DETAIL], "create_forms": [r for r in all_routes if r.kind == RouteKind.CREATE_FORM], "edit_forms": [r for r in all_routes if r.kind == RouteKind.EDIT_FORM], } # Group by prefix for entity-level operations by_prefix: dict[str, list[RouteInfo]] = {} for r in all_routes: by_prefix.setdefault(r.prefix, []).append(r) registry["by_prefix"] = by_prefix return registry def dump_registry_report(app) -> str: """ Generate a human-readable report of all discovered routes. Useful for debugging / verifying introspection output. """ routes = introspect_app(app) lines = [ "=" * 70, "LIFE OS ROUTE REGISTRY", f"Total routes discovered: {len(routes)}", "=" * 70, "", ] # Group by prefix by_prefix: dict[str, list[RouteInfo]] = {} for r in routes: by_prefix.setdefault(r.prefix, []).append(r) for prefix in sorted(by_prefix.keys()): prefix_routes = by_prefix[prefix] lines.append(f" {prefix}") lines.append(f" {'─' * 60}") for r in sorted(prefix_routes, key=lambda x: (x.path, x.methods)): method = r.methods[0] if r.methods else "?" form_str = "" if r.form_fields: field_names = [f.name + ("*" if f.required else "") for f in r.form_fields] form_str = f" fields=[{', '.join(field_names)}]" query_str = "" if r.query_params: query_str = f" query=[{', '.join(r.query_params)}]" lines.append(f" {method:6s} {r.path:40s} {r.kind.value:15s}{form_str}{query_str}") lines.append("") return "\n".join(lines)