feat(server): mint API + Postgres schema + manual adapter (PR 1)

Source-agnostic license issuance service. FastAPI app fronts a
Postgres `licenses` table; the only currently-wired source is
`manual` (operator mints via /internal/mint). Gumroad webhook
adapter lands in PR 2.

Key design points:

- Signing reuses src/license/crypto.py via a COPY into the image
  (single source of truth — blobs minted server-side verify against
  the same embedded pubkey on the buyer's machine).
- Source adapter Protocol (app/adapters/base.py) is the seam for
  Gumroad / Lemon Squeezy / Stripe in later PRs; Mint API speaks
  only SaleEvent / RefundEvent.
- (source, source_order_id) UNIQUE composite gives idempotent
  webhook retries without double-mint.
- JSONB type uses with_variant(JSON, 'sqlite') so the same models
  drive both Postgres prod and SQLite tests (no testcontainers dep).
- Bearer-token auth on /internal/*; the IP-loopback guard was
  removed after the docker bridge made it fight legitimate prod
  traffic (nginx defense + Bearer remain).
- Secrets resolved via *_FILE env vars pointing at
  /run/secrets/<name>, so passwords never appear in `docker inspect`.

21 unit tests (SQLite in-memory, StaticPool) plus a real-Postgres
docker-compose smoke test in server/scripts/smoke.sh that builds the
image, runs the alembic migration, mints a license, verifies the
signature against the host dev pubkey, and checks the DB row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 00:46:54 +00:00
parent 4179cb5156
commit bab2c9468c
29 changed files with 1519 additions and 0 deletions

0
server/app/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,71 @@
"""Source-adapter interface.
The Mint API speaks only the normalized event types defined here.
Each storefront has its own adapter that:
- Verifies the storefront's webhook signature in its native format.
- Parses the storefront's payload into a :class:`SaleEvent` or
:class:`RefundEvent`.
- Maps the storefront's product/variant IDs to a license tier via
the per-source config in :mod:`app.adapters.config`.
Adding a new source (Lemon Squeezy, Stripe, Paddle) is one new
module that implements :class:`SourceAdapter`. The Mint API and DB
do not change.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Any, Optional, Protocol
@dataclass(frozen=True)
class SaleEvent:
"""A storefront sale, normalized.
The Mint API consumes this directly — it never reaches into the
raw storefront payload. Anything storefront-specific that's worth
keeping is preserved in :attr:`raw_payload` for audit.
"""
source: str # e.g. "gumroad", "manual"
source_order_id: Optional[str] # storefront's order ID; None for manual mints
buyer_name: str
buyer_email: str
tier: str # mapped from product/variant
years: int = 1
promotion: Optional[str] = None
amount_paid: Optional[Decimal] = None
currency: Optional[str] = "USD"
notes: Optional[str] = None
raw_payload: dict = field(default_factory=dict)
@dataclass(frozen=True)
class RefundEvent:
"""A storefront refund — marks an existing license revoked."""
source: str
source_order_id: str
reason: Optional[str] = None
raw_payload: dict = field(default_factory=dict)
class SourceAdapter(Protocol):
"""Interface every storefront adapter implements."""
source_name: str
def verify_webhook(self, *, body: bytes, headers: dict[str, str]) -> bool:
"""Return True iff the request came from the legitimate storefront."""
...
def parse_sale(self, payload: dict[str, Any]) -> Optional[SaleEvent]:
"""Return a :class:`SaleEvent` if *payload* is a sale, else None."""
...
def parse_refund(self, payload: dict[str, Any]) -> Optional[RefundEvent]:
"""Return a :class:`RefundEvent` if *payload* is a refund, else None."""
...

View File

@@ -0,0 +1,52 @@
"""Manual adapter — operator-initiated mints (comps, support replacements).
There is no webhook to verify and no payload to parse: the operator
hands us the buyer details directly via the CLI, and we construct a
:class:`SaleEvent` from them. ``source='manual'`` separates these
rows from storefront-driven mints in the DB.
"""
from __future__ import annotations
from decimal import Decimal
from typing import Any, Optional
from app.adapters.base import RefundEvent, SaleEvent
class ManualAdapter:
source_name = "manual"
def verify_webhook(self, *, body: bytes, headers: dict[str, str]) -> bool:
return False # manual flows never come through webhooks
def parse_sale(self, payload: dict[str, Any]) -> Optional[SaleEvent]:
return self.build_sale(**payload)
def parse_refund(self, payload: dict[str, Any]) -> Optional[RefundEvent]:
return None
def build_sale(
self,
*,
name: str,
email: str,
tier: str,
years: int = 1,
promotion: Optional[str] = None,
amount_paid: Optional[Decimal] = None,
currency: Optional[str] = "USD",
notes: Optional[str] = None,
) -> SaleEvent:
return SaleEvent(
source=self.source_name,
source_order_id=None,
buyer_name=name,
buyer_email=email,
tier=tier,
years=years,
promotion=promotion,
amount_paid=amount_paid,
currency=currency,
notes=notes,
)

65
server/app/auth.py Normal file
View File

@@ -0,0 +1,65 @@
"""Auth guards for ``/internal/*``.
Active layer: Bearer token, presented by the operator's CLI and
matched against the value in the secrets dir. Token rotation =
update the file, restart the container.
:func:`require_localhost` is preserved but unused by default — it
fights the Docker bridge network model (the container sees the
gateway IP, not 127.0.0.1, regardless of where traffic originated).
Re-enable it only if the API runs in ``network_mode: host``.
"""
from __future__ import annotations
import hmac
from typing import Optional
from fastapi import HTTPException, Request, status
from app.config import get_settings
def require_localhost(request: Request) -> None:
"""Reject the request unless the connecting peer is loopback.
``request.client.host`` reflects the actual TCP peer (the nginx
upstream connecting from 127.0.0.1) when ``proxy_set_header`` is
used appropriately. We deliberately do NOT trust
``X-Forwarded-For`` here — we want the raw peer.
"""
peer = request.client.host if request.client else None
if peer not in {"127.0.0.1", "::1"}:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Not found",
)
def require_bearer_token(request: Request) -> None:
"""Verify ``Authorization: Bearer <admin_token>``.
Uses constant-time comparison so timing leaks don't reveal token
prefixes. The 401 deliberately doesn't echo the supplied token or
leak whether a token is configured at all — clients should treat
"no token configured" the same as "wrong token".
"""
settings = get_settings()
expected: Optional[str] = settings.resolve_admin_token()
if not expected:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Server not configured for internal access.",
)
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Bearer token required.",
)
presented = auth.removeprefix("Bearer ").strip()
if not hmac.compare_digest(presented, expected):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token.",
)

64
server/app/config.py Normal file
View File

@@ -0,0 +1,64 @@
"""Runtime configuration loaded from environment + secret files.
Secrets are read from files (``*_FILE`` env vars pointing at
``/run/secrets/<name>``) so they never appear in ``docker inspect``
or process environment dumps. Plain ``*`` vars are the fallback for
local development where mounting secret files is overkill.
"""
from __future__ import annotations
from functools import lru_cache
from pathlib import Path
from typing import Optional
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
database_url: str = Field(
default="postgresql+psycopg://datatools_api@localhost:5432/datatools_licenses",
validation_alias="DATABASE_URL",
)
admin_token: Optional[str] = Field(default=None, validation_alias="DATATOOLS_ADMIN_TOKEN")
admin_token_file: Optional[Path] = Field(default=None, validation_alias="DATATOOLS_ADMIN_TOKEN_FILE")
license_privkey_hex: Optional[str] = Field(default=None, validation_alias="DATATOOLS_LICENSE_PRIVKEY")
license_privkey_file: Optional[Path] = Field(default=None, validation_alias="DATATOOLS_LICENSE_PRIVKEY_FILE")
license_pubkey_hex: Optional[str] = Field(default=None, validation_alias="DATATOOLS_LICENSE_PUBKEY")
postmark_token: Optional[str] = Field(default=None, validation_alias="POSTMARK_TOKEN")
postmark_token_file: Optional[Path] = Field(default=None, validation_alias="POSTMARK_TOKEN_FILE")
gumroad_secret: Optional[str] = Field(default=None, validation_alias="GUMROAD_WEBHOOK_SECRET")
gumroad_secret_file: Optional[Path] = Field(default=None, validation_alias="GUMROAD_WEBHOOK_SECRET_FILE")
def resolve_admin_token(self) -> Optional[str]:
return _resolve(self.admin_token, self.admin_token_file)
def resolve_license_privkey(self) -> Optional[str]:
return _resolve(self.license_privkey_hex, self.license_privkey_file)
def resolve_postmark_token(self) -> Optional[str]:
return _resolve(self.postmark_token, self.postmark_token_file)
def resolve_gumroad_secret(self) -> Optional[str]:
return _resolve(self.gumroad_secret, self.gumroad_secret_file)
def _resolve(inline: Optional[str], path: Optional[Path]) -> Optional[str]:
if inline:
return inline.strip()
if path and path.exists():
return path.read_text(encoding="utf-8").strip()
return None
@lru_cache(maxsize=1)
def get_settings() -> Settings:
return Settings()

65
server/app/db.py Normal file
View File

@@ -0,0 +1,65 @@
"""SQLAlchemy engine + session factory.
The DB password lives in ``/run/secrets/pg_password``; we read it
from there (or ``$PG_PASSWORD`` for local dev) and splice it into
``DATABASE_URL`` so the password never has to be in plaintext in
``compose.yml`` or process environment listings.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Generator
from urllib.parse import quote_plus, urlparse, urlunparse
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from app.config import get_settings
def _resolve_password() -> str | None:
inline = os.environ.get("PG_PASSWORD")
if inline:
return inline.strip()
path = os.environ.get("PG_PASSWORD_FILE")
if path and Path(path).exists():
return Path(path).read_text(encoding="utf-8").strip()
return None
def _build_url(base_url: str) -> str:
"""Inject the resolved password into ``base_url`` if absent."""
parsed = urlparse(base_url)
if parsed.password:
return base_url
pw = _resolve_password()
if pw is None:
return base_url
netloc = f"{parsed.username or ''}:{quote_plus(pw)}@{parsed.hostname}"
if parsed.port:
netloc += f":{parsed.port}"
return urlunparse(parsed._replace(netloc=netloc))
_settings = get_settings()
engine = create_engine(_build_url(_settings.database_url), pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=False)
class Base(DeclarativeBase):
"""Declarative base for ORM models."""
def get_session() -> Generator[Session, None, None]:
"""FastAPI dependency. Commits on success, rolls back on exception."""
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()

18
server/app/main.py Normal file
View File

@@ -0,0 +1,18 @@
"""FastAPI entry point for the DataTools license server."""
from __future__ import annotations
from fastapi import FastAPI
from app.routes import internal, public
app = FastAPI(
title="DataTools License Server",
version="0.1.0",
docs_url=None,
redoc_url=None,
openapi_url=None,
)
app.include_router(public.router)
app.include_router(internal.router)

136
server/app/mint.py Normal file
View File

@@ -0,0 +1,136 @@
"""Core mint + revoke logic.
Bridges the source-adapter layer (:mod:`app.adapters`) to the DB
layer (:mod:`app.models`), reusing the desktop app's signing /
encoding primitives from ``datatools_license.crypto`` so blobs minted
here verify against the same embedded pubkey on the buyer's machine.
"""
from __future__ import annotations
import os
import uuid
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.adapters.base import SaleEvent
from app.config import get_settings
from app.models import License
def _init_key_env() -> None:
"""Resolve secret-file pointers into env vars before importing crypto.
``datatools_license.crypto`` looks for ``DATATOOLS_LICENSE_PRIVKEY``
/ ``DATATOOLS_LICENSE_PUBKEY`` in ``os.environ``. When those come
from secret files (``*_FILE`` env vars), we read them once at
module import and stash so crypto can pick them up without
changes.
"""
settings = get_settings()
priv = settings.resolve_license_privkey()
if priv:
os.environ.setdefault("DATATOOLS_LICENSE_PRIVKEY", priv)
pub = settings.license_pubkey_hex
if pub:
os.environ.setdefault("DATATOOLS_LICENSE_PUBKEY", pub)
_init_key_env()
# Imported after env init so the crypto module reads the correct key.
from datatools_license.crypto import encode_blob, sign # noqa: E402
from datatools_license.features import all_features_for_tier # noqa: E402
from datatools_license.schema import ( # noqa: E402
License as LicenseDataclass,
Tier,
_utcnow_iso,
default_expiry_iso,
)
def _generate_license_key(tier: str) -> str:
rid = uuid.uuid4().hex
return f"DT1-{tier.upper()}-{rid[:8]}-{rid[8:16]}"
def _iso_to_dt(iso: str) -> datetime:
return datetime.fromisoformat(iso.replace("Z", "+00:00"))
def mint_from_sale(session: Session, sale: SaleEvent) -> License:
"""Idempotently mint a license for *sale*.
If a row with the same ``(source, source_order_id)`` already
exists, return it untouched — Gumroad retrying a webhook does not
produce a second blob with a different signature. Manual mints
(``source_order_id is None``) skip the dedup check and always
produce a new row.
"""
if sale.source_order_id is not None:
existing = session.execute(
select(License).where(
License.source == sale.source,
License.source_order_id == sale.source_order_id,
)
).scalar_one_or_none()
if existing is not None:
return existing
tier_enum = Tier(sale.tier)
license_key = _generate_license_key(sale.tier)
issued_iso = _utcnow_iso()
expires_iso = default_expiry_iso(years=sale.years)
unsigned = LicenseDataclass(
name=sale.buyer_name,
email=sale.buyer_email,
license_key=license_key,
tier=tier_enum,
features=all_features_for_tier(tier_enum),
issued_at=issued_iso,
expires_at=expires_iso,
signature="",
)
signature = sign(unsigned.to_canonical_dict())
payload = unsigned.to_canonical_dict()
payload["signature"] = signature
blob = encode_blob(payload)
row = License(
license_key=license_key,
name=sale.buyer_name,
email=sale.buyer_email,
tier=sale.tier,
issued_at=_iso_to_dt(issued_iso),
expires_at=_iso_to_dt(expires_iso),
blob=blob,
source=sale.source,
source_order_id=sale.source_order_id,
promotion=sale.promotion,
amount_paid=sale.amount_paid,
currency=sale.currency,
notes=sale.notes,
)
session.add(row)
session.flush()
return row
def revoke_license(
session: Session,
*,
license_key: str,
reason: Optional[str] = None,
) -> Optional[License]:
row = session.get(License, license_key)
if row is None:
return None
row.revoked_at = datetime.now(timezone.utc)
if reason:
suffix = f"\nRevoked: {reason}"
row.notes = ((row.notes or "") + suffix).strip()
return row

91
server/app/models.py Normal file
View File

@@ -0,0 +1,91 @@
"""ORM models for the licenses + gumroad_events tables.
Schema mirrors ``docs/LICENSE-SERVER.md``, generalized so any
``source`` can populate it. The ``(source, source_order_id)``
composite uniqueness key gives idempotent webhook retries — a
storefront firing the same sale twice maps to the same row.
"""
from __future__ import annotations
from datetime import datetime
from typing import Optional
from sqlalchemy import (
JSON,
BigInteger,
DateTime,
Index,
Numeric,
String,
UniqueConstraint,
func,
text,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
# JSONB on Postgres (indexable, queryable), plain JSON elsewhere
# (SQLite for tests). Same Python interface either way.
_JSON_TYPE = JSON().with_variant(JSONB(), "postgresql")
from app.db import Base
class License(Base):
__tablename__ = "licenses"
license_key: Mapped[str] = mapped_column(String, primary_key=True)
name: Mapped[str] = mapped_column(String, nullable=False)
email: Mapped[str] = mapped_column(String, nullable=False)
tier: Mapped[str] = mapped_column(String, nullable=False)
issued_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
blob: Mapped[str] = mapped_column(String, nullable=False)
source: Mapped[str] = mapped_column(String, nullable=False)
source_order_id: Mapped[Optional[str]] = mapped_column(String, nullable=True)
promotion: Mapped[Optional[str]] = mapped_column(String, nullable=True)
amount_paid: Mapped[Optional[float]] = mapped_column(Numeric(10, 2), nullable=True)
currency: Mapped[Optional[str]] = mapped_column(String(3), nullable=True, server_default=text("'USD'"))
revoked_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
notes: Mapped[Optional[str]] = mapped_column(String, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
onupdate=func.now(),
)
__table_args__ = (
UniqueConstraint("source", "source_order_id", name="uq_licenses_source_order"),
Index("ix_licenses_email_lower", func.lower(text("email"))),
Index("ix_licenses_expires_active", "expires_at", postgresql_where=text("revoked_at IS NULL")),
)
class GumroadEvent(Base):
"""Append-only audit log of every webhook delivery.
Stored regardless of processing outcome so we can replay failed
events, investigate disputes, and reconstruct the customer
record if the ``licenses`` table is ever corrupted.
"""
__tablename__ = "gumroad_events"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
received_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=func.now())
event_type: Mapped[str] = mapped_column(String, nullable=False)
order_id: Mapped[Optional[str]] = mapped_column(String, nullable=True)
raw_payload: Mapped[dict] = mapped_column(_JSON_TYPE, nullable=False)
processed: Mapped[bool] = mapped_column(server_default=text("false"), nullable=False)
error: Mapped[Optional[str]] = mapped_column(String, nullable=True)
__table_args__ = (
Index("ix_gumroad_events_order_id", "order_id"),
Index("ix_gumroad_events_unprocessed", "received_at", postgresql_where=text("processed = false")),
)

View File

View File

@@ -0,0 +1,103 @@
"""Internal (operator-only) routes.
Two defense layers protect this path:
1. **nginx** blocks ``/internal/*`` at the public server-block level
(``location /internal/ { return 404; }`` in
``docs/SETUP-LICENSE-SERVER.md``).
2. **Bearer token** authenticates the operator's CLI.
An earlier draft also enforced a peer-IP loopback check here, but
that fights the Docker bridge network model: the container always
sees the gateway IP (172.x.0.1) regardless of whether traffic
originated from nginx on the host or from outside. The check is
preserved as :func:`app.auth.require_localhost` for future use
(e.g. if the API ever runs in ``network_mode: host``).
"""
from __future__ import annotations
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.adapters.manual import ManualAdapter
from app.auth import require_bearer_token
from app.db import get_session
from app.mint import mint_from_sale, revoke_license
from app.models import License
from app.schemas import LicenseResponse, MintRequest, RevokeRequest
router = APIRouter(
prefix="/internal",
dependencies=[Depends(require_bearer_token)],
)
_MANUAL = ManualAdapter()
@router.post("/mint", response_model=LicenseResponse, status_code=status.HTTP_201_CREATED)
def mint(req: MintRequest, session: Session = Depends(get_session)) -> License:
"""Mint a license blob and persist the row.
PR 1 only wires the ``manual`` source through this endpoint. Real
storefront sales (Gumroad et al.) arrive via per-source webhook
handlers in PR 2 and bypass this route entirely.
"""
if req.source != "manual":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Source {req.source!r} is not wired for direct mints. "
"Storefront sales arrive via /webhooks/* (PR 2)."
),
)
sale = _MANUAL.build_sale(
name=req.name,
email=req.email,
tier=req.tier.value,
years=req.years,
promotion=req.promotion,
amount_paid=req.amount_paid,
currency=req.currency,
notes=req.notes,
)
return mint_from_sale(session, sale)
@router.post("/revoke", response_model=LicenseResponse)
def revoke(req: RevokeRequest, session: Session = Depends(get_session)) -> License:
row = revoke_license(session, license_key=req.license_key, reason=req.reason)
if row is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="License not found")
return row
@router.get("/licenses", response_model=list[LicenseResponse])
def list_licenses(
email: Optional[str] = Query(default=None, description="Case-insensitive email substring."),
tier: Optional[str] = Query(default=None),
source: Optional[str] = Query(default=None),
include_revoked: bool = Query(default=False),
limit: int = Query(default=50, ge=1, le=500),
offset: int = Query(default=0, ge=0),
session: Session = Depends(get_session),
) -> list[License]:
stmt = select(License).order_by(License.created_at.desc()).limit(limit).offset(offset)
if email:
stmt = stmt.where(func.lower(License.email).contains(email.lower()))
if tier:
stmt = stmt.where(License.tier == tier)
if source:
stmt = stmt.where(License.source == source)
if not include_revoked:
stmt = stmt.where(License.revoked_at.is_(None))
return list(session.execute(stmt).scalars().all())
@router.get("/ping")
def ping() -> dict:
"""Sanity-check both guards from inside an SSH tunnel."""
return {"ok": True}

View File

@@ -0,0 +1,27 @@
"""Public (internet-facing) routes.
For PR 1: only ``/health``. The webhook receiver and renewal portal
land in PR 2 and PR 3 respectively.
"""
from __future__ import annotations
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.db import get_session
router = APIRouter()
@router.get("/health")
def health(session: Session = Depends(get_session)) -> dict:
"""Liveness + DB reachability. Cheap; safe to hit on a tight cadence."""
db_ok = True
try:
session.execute(text("SELECT 1"))
except SQLAlchemyError:
db_ok = False
return {"status": "ok" if db_ok else "degraded", "db": "ok" if db_ok else "error"}

54
server/app/schemas.py Normal file
View File

@@ -0,0 +1,54 @@
"""Pydantic request/response models for the HTTP layer."""
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Optional
from pydantic import BaseModel, ConfigDict, EmailStr, Field
class TierName(str, Enum):
lite = "lite"
core = "core"
pro = "pro"
enterprise = "enterprise"
class MintRequest(BaseModel):
name: str = Field(min_length=1, max_length=200)
email: EmailStr
tier: TierName
years: int = Field(default=1, ge=1, le=10)
source: str = Field(default="manual", min_length=1, max_length=40)
source_order_id: Optional[str] = Field(default=None, max_length=120)
promotion: Optional[str] = Field(default=None, max_length=60)
amount_paid: Optional[Decimal] = Field(default=None, ge=0, decimal_places=2)
currency: Optional[str] = Field(default="USD", min_length=3, max_length=3)
notes: Optional[str] = Field(default=None, max_length=2000)
class RevokeRequest(BaseModel):
license_key: str = Field(min_length=1, max_length=120)
reason: Optional[str] = Field(default=None, max_length=500)
class LicenseResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)
license_key: str
name: str
email: str
tier: str
issued_at: datetime
expires_at: datetime
blob: str
source: str
source_order_id: Optional[str]
promotion: Optional[str]
amount_paid: Optional[Decimal]
currency: Optional[str]
revoked_at: Optional[datetime]
notes: Optional[str]