"""Core mint + revoke logic. Bridges the source-adapter layer (:mod:`app.adapters`) to the DB layer (:mod:`app.models`), reusing the desktop app's signing / encoding primitives from ``datatools_license.crypto`` so blobs minted here verify against the same embedded pubkey on the buyer's machine. """ from __future__ import annotations import os import uuid from datetime import datetime, timezone from typing import Optional from sqlalchemy import select from sqlalchemy.orm import Session from app.adapters.base import SaleEvent from app.config import get_settings from app.models import License def _init_key_env() -> None: """Resolve secret-file pointers into env vars before importing crypto. ``datatools_license.crypto`` looks for ``DATATOOLS_LICENSE_PRIVKEY`` / ``DATATOOLS_LICENSE_PUBKEY`` in ``os.environ``. When those come from secret files (``*_FILE`` env vars), we read them once at module import and stash so crypto can pick them up without changes. """ settings = get_settings() priv = settings.resolve_license_privkey() if priv: os.environ.setdefault("DATATOOLS_LICENSE_PRIVKEY", priv) pub = settings.license_pubkey_hex if pub: os.environ.setdefault("DATATOOLS_LICENSE_PUBKEY", pub) _init_key_env() # Imported after env init so the crypto module reads the correct key. from datatools_license.crypto import encode_blob, sign # noqa: E402 from datatools_license.features import all_features_for_tier # noqa: E402 from datatools_license.schema import ( # noqa: E402 License as LicenseDataclass, Tier, _utcnow_iso, default_expiry_iso, ) def _generate_license_key(tier: str) -> str: rid = uuid.uuid4().hex return f"DT1-{tier.upper()}-{rid[:8]}-{rid[8:16]}" def _iso_to_dt(iso: str) -> datetime: return datetime.fromisoformat(iso.replace("Z", "+00:00")) def mint_from_sale(session: Session, sale: SaleEvent) -> License: """Idempotently mint a license for *sale*. If a row with the same ``(source, source_order_id)`` already exists, return it untouched — Gumroad retrying a webhook does not produce a second blob with a different signature. Manual mints (``source_order_id is None``) skip the dedup check and always produce a new row. """ if sale.source_order_id is not None: existing = session.execute( select(License).where( License.source == sale.source, License.source_order_id == sale.source_order_id, ) ).scalar_one_or_none() if existing is not None: return existing tier_enum = Tier(sale.tier) license_key = _generate_license_key(sale.tier) issued_iso = _utcnow_iso() expires_iso = default_expiry_iso(years=sale.years) unsigned = LicenseDataclass( name=sale.buyer_name, email=sale.buyer_email, license_key=license_key, tier=tier_enum, features=all_features_for_tier(tier_enum), issued_at=issued_iso, expires_at=expires_iso, signature="", ) signature = sign(unsigned.to_canonical_dict()) payload = unsigned.to_canonical_dict() payload["signature"] = signature blob = encode_blob(payload) row = License( license_key=license_key, name=sale.buyer_name, email=sale.buyer_email, tier=sale.tier, issued_at=_iso_to_dt(issued_iso), expires_at=_iso_to_dt(expires_iso), blob=blob, source=sale.source, source_order_id=sale.source_order_id, promotion=sale.promotion, amount_paid=sale.amount_paid, currency=sale.currency, notes=sale.notes, ) session.add(row) session.flush() return row def revoke_license( session: Session, *, license_key: str, reason: Optional[str] = None, ) -> Optional[License]: row = session.get(License, license_key) if row is None: return None row.revoked_at = datetime.now(timezone.utc) if reason: suffix = f"\nRevoked: {reason}" row.notes = ((row.notes or "") + suffix).strip() return row