feat(server): Gumroad webhook receiver + Postmark email (PR 2)

Wires the second source-adapter (Gumroad) plus the email delivery
that lets the server fulfill a sale end-to-end without operator
intervention.

Auth model: Gumroad doesn't HMAC the body, so we use their
recommended URL-secret pattern (?secret=...). Wrong/missing secret
returns 404 — no signal to a prober that the endpoint exists.
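
A minimal sketch of the URL-secret check described above, assuming the secret arrives as a plain query-string value. `secret_matches` is a hypothetical name for illustration, not the adapter's method. Encoding both sides to bytes is a choice made here so that non-ASCII input cannot make `hmac.compare_digest` raise.

```python
import hmac
from typing import Optional


def secret_matches(presented: Optional[str], configured: Optional[str]) -> bool:
    """Constant-time comparison; empty or missing values never match."""
    if not presented or not configured:
        return False
    # compare_digest accepts str only when both values are ASCII,
    # so compare the UTF-8 bytes instead.
    return hmac.compare_digest(
        presented.encode("utf-8"), configured.encode("utf-8")
    )
```

A route handler would map a False result to 404 rather than 401/403, so a prober sees the same response as for a nonexistent path.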

Webhook flow (server/app/routes/webhooks.py):
  1. audit-log the raw payload (gumroad_events row) BEFORE anything
     else, so a later failure leaves us replayable
  2. parse via GumroadAdapter (server/app/adapters/gumroad.py)
  3. mint_from_sale — UNIQUE(source, source_order_id) dedups
     duplicate webhook retries
  4. send the license email
  5. mark gumroad_events.processed = true

Always returns 200 once auth passes. Non-2xx would trigger Gumroad's
3-day retry storm; we'd rather record the failure on the audit row
and replay manually after fixing whatever surfaced.
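
The dedup in step 3 can be sketched with the standard-library sqlite3 module. This is an illustration only: mint_from_sale is not part of this diff, and the table and column names other than the UNIQUE(source, source_order_id) pair are assumptions.

```python
import sqlite3


def mint_once(conn: sqlite3.Connection, source: str, order_id: str, key: str) -> bool:
    """Insert a license row; return False when the order was already minted."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO licenses (source, source_order_id, license_key) "
                "VALUES (?, ?, ?)",
                (source, order_id, key),
            )
        return True
    except sqlite3.IntegrityError:
        # The UNIQUE constraint rejected a duplicate webhook delivery.
        return False


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE licenses ("
    " id INTEGER PRIMARY KEY,"
    " source TEXT NOT NULL,"
    " source_order_id TEXT NOT NULL,"
    " license_key TEXT NOT NULL,"
    " UNIQUE (source, source_order_id))"
)
first = mint_once(conn, "gumroad", "sale-1", "KEY-A")
retry = mint_once(conn, "gumroad", "sale-1", "KEY-B")
rows = conn.execute("SELECT COUNT(*) FROM licenses").fetchone()[0]
```

Because the constraint lives in the database, a retried webhook cannot create a second license even if two deliveries race.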

Product → tier mapping is per-source YAML at
server/config/products.yaml (lru_cached). Adding a SKU = edit yaml,
restart api. Unmapped product_id is an error on the audit row, not
a crash.
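
For orientation, a sketch of the catalog shape and the cached lookup. RAW_CATALOG stands in for what yaml.safe_load would return from products.yaml; the product ids and tier names are made up for illustration.

```python
from dataclasses import dataclass
from functools import lru_cache
from typing import Optional

# Hypothetical catalog: source name -> list of product entries.
RAW_CATALOG = {
    "gumroad": [
        {"product_id": "abc123", "tier": "pro", "years": 1},
        {"product_id": "xyz789", "tier": "team"},  # years defaults to 1
    ]
}


@dataclass(frozen=True)
class ProductMapping:
    tier: str
    years: int


@lru_cache(maxsize=1)
def build_table() -> dict:
    """Flatten the catalog into {(source, product_id): ProductMapping}."""
    table = {}
    for source, entries in RAW_CATALOG.items():
        for entry in entries or []:
            table[(source, str(entry["product_id"]))] = ProductMapping(
                tier=entry["tier"], years=int(entry.get("years", 1))
            )
    return table


def lookup(source: str, product_id: str) -> Optional[ProductMapping]:
    """Return the mapping, or None when the product is unmapped."""
    return build_table().get((source, product_id))
```

The table is built once per process, which is why a catalog edit only takes effect after a restart.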

EmailService (server/app/email.py): provider-agnostic interface with
Postmark as the first implementation. When POSTMARK_TOKEN is unset
the factory returns LoggingEmailService instead, so the webhook
exercises end-to-end before Postmark is provisioned.

48 unit tests (was 21) including:
- Gumroad secret verify with constant-time compare
- Sale parsing: amount-in-cents, name fallback from email,
  test=true tagging, missing-required fields, offer codes
- Product mapping lookups
- Email rendering text + HTML, HTML-escapes user input
- Postmark client via httpx.MockTransport (success and 4xx)
- Webhook end-to-end: secret check, audit log, idempotency on
  retry, unmapped product, email failure keeps license
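
The two sale-parsing rules the tests cover (amount in cents, name fallback from email) can be sketched as below. The function names are hypothetical and the sample values are invented for illustration.

```python
from decimal import Decimal
from typing import Optional


def display_name(full_name: Optional[str], email: str) -> str:
    """Prefer the buyer's name; otherwise derive one from the email local part."""
    name = (full_name or "").strip()
    if name:
        return name
    local = email.split("@", 1)[0]
    return local.replace(".", " ").title()


def cents_to_amount(price: Optional[str]) -> Optional[Decimal]:
    """Convert a price in cents (as sent in the form body) to a Decimal amount."""
    if price is None or price == "":
        return None
    try:
        return Decimal(int(price)) / Decimal(100)
    except (TypeError, ValueError):
        return None
```

With these rules, a payload with an empty full_name and email jane.doe@example.com yields the name "Jane Doe", and a price of "1999" yields Decimal("19.99").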

Smoke test (server/scripts/smoke.sh) extended to POST a synthetic
Ping payload, verify the row + audit log, prove wrong-secret is
rejected, prove duplicate sale_id stays one row.

SQLite-test compatibility:
- BigInteger primary key uses with_variant(Integer, "sqlite") since
  SQLite only autoincrements INTEGER PRIMARY KEY.
- python-multipart pulled in for FastAPI Form parsing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 01:33:43 +00:00
parent b5cd74d474
commit 2bbaba954b
15 changed files with 1066 additions and 2 deletions

server/app/adapters/gumroad.py

@@ -0,0 +1,173 @@
"""Gumroad adapter.

Receives "Ping" notifications from Gumroad: form-encoded POSTs sent
when a sale occurs. Gumroad's Ping URL is configured in the seller
dashboard (Settings → Advanced → Ping URL).

Authentication
--------------
Gumroad does not HMAC-sign the body. Their recommended pattern is
to put a secret in the URL itself::

    https://licenses.datatools.unalogix.com/webhooks/gumroad?secret=...

The webhook receiver pulls the secret from the query string and
:meth:`GumroadAdapter.verify_secret` constant-time-compares it
against the configured value. If they don't match, the request is
dropped with 404 (so a probing attacker can't tell whether the
endpoint exists, much less that it's the wrong secret).

The "test" field
----------------
Gumroad sends ``test=true`` on test pings fired from the dashboard.
We treat test pings as real sales (they create licenses just like
production sales), but tag them with ``notes='gumroad test ping'``
so the operator can filter / delete them later. Refusing test pings
would block the standard "Send Test Ping" verification flow.

Refunds, disputes, cancellations
--------------------------------
Stubbed for now (``parse_refund`` returns None). Gumroad doesn't
include refund signals in the standard sale Ping; refunds arrive
via the separate "Resource subscriptions" mechanism. Wiring that
in is PR 2.1; until then, refunds are handled by the operator
running ``datatools-admin revoke``.
"""
from __future__ import annotations

import hmac
from decimal import Decimal
from typing import Any, Optional

from app.adapters.base import RefundEvent, SaleEvent
from app.products import lookup as product_lookup


class GumroadAdapter:
    source_name = "gumroad"

    def __init__(self, secret: Optional[str]) -> None:
        self._secret = secret

    # --- Auth ----------------------------------------------------------------

    def verify_webhook(self, *, body: bytes, headers: dict[str, str]) -> bool:
        """Not used: Gumroad authentication is via URL query param,
        which only the route handler has direct access to. Call
        :meth:`verify_secret` instead."""
        return False

    def verify_secret(self, presented: Optional[str]) -> bool:
        """Constant-time compare against the configured secret.

        Returns False (not an exception) so the route handler can
        decide the response code. We return 404 to avoid signaling
        endpoint existence to an unauthenticated prober.
        """
        if not self._secret or not presented:
            return False
        # Compare as bytes: compare_digest raises TypeError on non-ASCII
        # str, which would turn a bad secret into a 500 instead of a 404.
        return hmac.compare_digest(
            presented.encode("utf-8"), self._secret.encode("utf-8")
        )
    # --- Parsing -------------------------------------------------------------

    def parse_sale(self, payload: dict[str, Any]) -> Optional[SaleEvent]:
        """Parse a Gumroad Ping form-encoded payload into a SaleEvent.

        Returns None if the payload isn't a sale (e.g. some future
        event type we don't yet handle, or a required field is
        missing). Raises :class:`UnmappedProductError` if the
        product_id is unmapped; the caller should treat that as an
        error and record it in the audit row, not silently drop.
        """
        # Sale pings always include sale_id (the order ID) and email.
        sale_id = payload.get("sale_id")
        email = payload.get("email")
        product_id = (
            payload.get("product_id")
            or payload.get("product_permalink")
            or payload.get("permalink")
        )
        if not (sale_id and email and product_id):
            return None

        mapping = product_lookup(self.source_name, str(product_id))
        if mapping is None:
            # Unmapped: raise a typed error (instead of returning a
            # SaleEvent with no tier) so the caller can log it to
            # gumroad_events with error info and still return 200
            # (no Gumroad retry storm).
            raise UnmappedProductError(
                f"Gumroad product_id {product_id!r} has no entry in "
                "config/products.yaml. Add a mapping and replay this "
                f"sale (sale_id={sale_id})."
            )

        name = (payload.get("full_name") or "").strip() or _email_local(email)
        price_cents = _to_int(payload.get("price"))
        amount_paid = Decimal(price_cents) / Decimal(100) if price_cents is not None else None
        currency = (payload.get("currency") or "USD").upper()
        promotion = (payload.get("offer_code") or "").strip() or None

        notes = None
        if _is_truthy(payload.get("test")):
            notes = "gumroad test ping"

        return SaleEvent(
            source=self.source_name,
            source_order_id=str(sale_id),
            buyer_name=name,
            buyer_email=email.strip(),
            tier=mapping.tier,
            years=mapping.years,
            promotion=promotion,
            amount_paid=amount_paid,
            currency=currency,
            notes=notes,
            raw_payload=dict(payload),
        )

    def parse_refund(self, payload: dict[str, Any]) -> Optional[RefundEvent]:
        # PR 2.1.
        return None


class UnmappedProductError(ValueError):
    """Raised when a sale arrives for a product not in products.yaml.

    Caller catches and logs into ``gumroad_events.error`` so the
    operator can fix the mapping and replay.
    """


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _email_local(email: str) -> str:
    """Fallback display name when ``full_name`` is missing: the part
    of the email before the ``@``, capitalized. Better than 'Unknown'
    for support tickets and the buyer's own delivery email."""
    local = email.split("@", 1)[0]
    return local.replace(".", " ").title()


def _to_int(v: Any) -> Optional[int]:
    if v is None or v == "":
        return None
    try:
        return int(v)
    except (TypeError, ValueError):
        return None


def _is_truthy(v: Any) -> bool:
    if isinstance(v, bool):
        return v
    if v is None:
        return False
    return str(v).strip().lower() in {"1", "true", "yes", "on"}

server/app/email.py

@@ -0,0 +1,214 @@
"""Transactional email delivery.

Provider: Postmark. Picked for its transactional-deliverability
reputation and a tiny, no-SDK-needed HTTP API.

Configuration
-------------
- ``POSTMARK_TOKEN`` / ``POSTMARK_TOKEN_FILE``: server API token.
- ``EMAIL_FROM``: verified sender address (default
  ``licenses@datatools.unalogix.com``).
- ``EMAIL_REPLY_TO``: optional Reply-To (default same as From).

When ``POSTMARK_TOKEN`` is unset the service falls back to
:class:`LoggingEmailService`, which logs the email at INFO
instead of sending. Lets the webhook handler exercise the full
flow before the Postmark account is provisioned.
"""
from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from typing import Optional, Protocol

import httpx

from app.config import get_settings

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class LicenseEmail:
    """Inputs the renderer needs from the caller."""

    to_name: str
    to_email: str
    tier: str
    license_key: str
    expires_at_iso: str
    blob: str


class EmailService(Protocol):
    """Provider-agnostic email surface; keeps Postmark out of the
    callers' import graph."""

    def send_license(self, msg: LicenseEmail) -> str:
        """Deliver the license-delivery email. Returns a provider
        message id (or ``"logged"`` for the dev fallback) so the
        caller can record it on the licenses row for audit."""
        ...


class LoggingEmailService:
    """Stand-in when no real provider is configured. Logs the
    rendered message body at INFO so it shows up in ``docker compose
    logs api``; useful during local dev and during the deploy
    window before Postmark is wired up."""

    def send_license(self, msg: LicenseEmail) -> str:
        body = _render_text(msg)
        log.info(
            "[email-stub] would send to=%s subject=%r\n%s",
            msg.to_email,
            _subject(msg),
            body,
        )
        return "logged"


class PostmarkEmailService:
    """Postmark transactional API client.

    Single endpoint, ~3 fields, no SDK needed. We use a per-call
    httpx Client with a tight timeout: webhook handlers run on
    the request thread and we never want to block them on a flaky
    upstream.
    """

    API_URL = "https://api.postmarkapp.com/email"
    TIMEOUT_S = 8.0

    def __init__(
        self,
        token: str,
        *,
        sender: str,
        reply_to: Optional[str] = None,
        message_stream: str = "outbound",
    ) -> None:
        self._token = token
        self._sender = sender
        self._reply_to = reply_to or sender
        self._stream = message_stream

    def send_license(self, msg: LicenseEmail) -> str:
        body_text = _render_text(msg)
        body_html = _render_html(msg)
        payload = {
            "From": self._sender,
            "To": _rfc_addr(msg.to_name, msg.to_email),
            "ReplyTo": self._reply_to,
            "Subject": _subject(msg),
            "TextBody": body_text,
            "HtmlBody": body_html,
            "MessageStream": self._stream,
        }
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "X-Postmark-Server-Token": self._token,
        }
        try:
            with httpx.Client(timeout=self.TIMEOUT_S) as c:
                r = c.post(self.API_URL, json=payload, headers=headers)
        except httpx.HTTPError as e:
            # Timeouts and connection failures must surface as
            # EmailDeliveryError too; otherwise the webhook handler
            # would not catch them and would answer 500.
            raise EmailDeliveryError(f"Postmark request failed: {e!r}") from e
        if r.status_code >= 400:
            raise EmailDeliveryError(
                f"Postmark rejected the request: HTTP {r.status_code} "
                f"body={r.text[:300]!r}"
            )
        return str(r.json().get("MessageID", ""))


class EmailDeliveryError(RuntimeError):
    """Provider returned an error status (4xx/5xx) or could not be
    reached. Caller should record this on the audit row so the
    operator can replay after fixing the provider config (verified
    sender domain, paid plan, etc.)."""


# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------

def get_email_service() -> EmailService:
    """Choose the real provider if a token is configured, else the
    logger. Reads settings fresh, so tests can flip env vars between
    sends without restarting."""
    settings = get_settings()
    token = settings.resolve_postmark_token()
    if not token:
        return LoggingEmailService()
    sender = os.environ.get("EMAIL_FROM", "licenses@datatools.unalogix.com")
    reply_to = os.environ.get("EMAIL_REPLY_TO")
    return PostmarkEmailService(token, sender=sender, reply_to=reply_to)


# ---------------------------------------------------------------------------
# Rendering
# ---------------------------------------------------------------------------

def _subject(msg: LicenseEmail) -> str:
    return f"Your DataTools license ({msg.tier})"


def _render_text(msg: LicenseEmail) -> str:
    return (
        f"Hi {msg.to_name},\n\n"
        f"Thanks for your DataTools purchase. Your license is below.\n\n"
        f"License key: {msg.license_key}\n"
        f"Tier: {msg.tier}\n"
        f"Expires: {msg.expires_at_iso[:10]}\n\n"
        f"To activate, paste the full blob (starting with DTLIC2:) into\n"
        f"the Activate screen, or run:\n\n"
        f" python -m src.license_cli activate \"{msg.blob}\" \\\n"
        f" --name \"{msg.to_name}\" --email {msg.to_email}\n\n"
        f"Your blob:\n\n"
        f"{msg.blob}\n\n"
        f"Keep this email — you'll need the blob if you move to a new\n"
        f"computer. Questions: reply to this email.\n\n"
        f"— DataTools\n"
    )


def _render_html(msg: LicenseEmail) -> str:
    return (
        "<!doctype html><html><body style=\"font-family:system-ui,sans-serif;"
        "max-width:560px;margin:auto;padding:24px;color:#222;\">"
        f"<p>Hi {_html_escape(msg.to_name)},</p>"
        "<p>Thanks for your DataTools purchase. Your license is below.</p>"
        "<table cellpadding=\"4\" style=\"border-collapse:collapse;\">"
        f"<tr><td><b>License key</b></td><td><code>{_html_escape(msg.license_key)}</code></td></tr>"
        f"<tr><td><b>Tier</b></td><td>{_html_escape(msg.tier)}</td></tr>"
        f"<tr><td><b>Expires</b></td><td>{_html_escape(msg.expires_at_iso[:10])}</td></tr>"
        "</table>"
        "<p>To activate, paste the blob below into the <em>Activate</em> "
        "screen on first launch.</p>"
        "<pre style=\"background:#f4f4f4;padding:12px;border-radius:6px;"
        "white-space:pre-wrap;word-break:break-all;font-size:11px;\">"
        f"{_html_escape(msg.blob)}</pre>"
        "<p style=\"color:#666;font-size:13px;\">Keep this email — you'll "
        "need the blob if you move to a new computer. Questions: just reply.</p>"
        "<p>— DataTools</p></body></html>"
    )


def _rfc_addr(name: str, email: str) -> str:
    # Postmark accepts "Name <addr>" or just "addr". Rather than
    # quoting, strip the characters that would break the address
    # syntax (including angle brackets); otherwise keep the name
    # readable in the inbox.
    if not name or "@" in name:
        return email
    for c in ',<>"':
        name = name.replace(c, "")
    name = name.strip()
    return f"{name} <{email}>" if name else email


def _html_escape(s: str) -> str:
    return (
        s.replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )


@@ -4,7 +4,7 @@ from __future__ import annotations
 from fastapi import FastAPI
-from app.routes import internal, public
+from app.routes import internal, public, webhooks
 app = FastAPI(
     title="DataTools License Server",
@@ -16,3 +16,4 @@ app = FastAPI(
 app.include_router(public.router)
 app.include_router(internal.router)
+app.include_router(webhooks.router)


@@ -16,6 +16,7 @@ from sqlalchemy import (
     BigInteger,
     DateTime,
     Index,
+    Integer,
     Numeric,
     String,
     UniqueConstraint,
@@ -29,6 +30,11 @@ from sqlalchemy.orm import Mapped, mapped_column
 # (SQLite for tests). Same Python interface either way.
 _JSON_TYPE = JSON().with_variant(JSONB(), "postgresql")
+# SQLite only auto-increments INTEGER PRIMARY KEY (not BIGINT).
+# Postgres can autoincrement either, so the variant keeps the
+# production migration on BigInteger while tests use Integer.
+_PK_TYPE = BigInteger().with_variant(Integer(), "sqlite")
 from app.db import Base
@@ -77,7 +83,7 @@ class GumroadEvent(Base):
     __tablename__ = "gumroad_events"
-    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
+    id: Mapped[int] = mapped_column(_PK_TYPE, primary_key=True, autoincrement=True)
     received_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=func.now())
     event_type: Mapped[str] = mapped_column(String, nullable=False)
     order_id: Mapped[Optional[str]] = mapped_column(String, nullable=True)

server/app/products.py

@@ -0,0 +1,71 @@
"""Storefront product → license tier mapping.

The mapping lives in ``server/config/products.yaml`` (not
gitignored: it holds no secrets, just a routine catalog) so adding
a new SKU is one yaml edit plus a container restart. The lookup is
``(source, product_id) -> (tier, years)``.

Cached on first lookup for the life of the process. The runtime
cost of reloading on every webhook would be trivial, but caching
keeps the hot path allocation-free and makes the "edit yaml,
restart api" idiom explicit: operators always know exactly when
their changes go live.
"""
from __future__ import annotations

from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Optional

import yaml


@dataclass(frozen=True)
class ProductMapping:
    tier: str
    years: int


def _config_path() -> Path:
    """Resolve the products config.

    Container layout puts the config at ``/app/config/products.yaml``
    (the Dockerfile COPYs ``server/config`` to ``/app/config``).
    For local pytest runs we walk up from this file to ``server/``.
    """
    in_container = Path("/app/config/products.yaml")
    if in_container.exists():
        return in_container
    return Path(__file__).resolve().parent.parent / "config" / "products.yaml"


@lru_cache(maxsize=1)
def _table() -> dict[tuple[str, str], ProductMapping]:
    raw = yaml.safe_load(_config_path().read_text(encoding="utf-8")) or {}
    table: dict[tuple[str, str], ProductMapping] = {}
    for source, entries in raw.items():
        for entry in entries or []:
            key = (source, str(entry["product_id"]))
            table[key] = ProductMapping(
                tier=entry["tier"],
                years=int(entry.get("years", 1)),
            )
    return table


def lookup(source: str, product_id: str) -> Optional[ProductMapping]:
    """Return the mapping for *(source, product_id)*, or None if unmapped.

    Returning None (rather than raising) lets the webhook layer
    decide whether to surface the failure as an audit row vs a
    user-visible error. We want unmapped sales to be logged, not
    to crash the handler and trigger Gumroad retry storms.
    """
    return _table().get((source, product_id))


def reload_for_tests() -> None:
    """Drop the cache. Tests that mutate the yaml call this."""
    _table.cache_clear()

server/app/routes/webhooks.py

@@ -0,0 +1,121 @@
"""Storefront webhook receivers.

PR 2 wires Gumroad. Future storefronts each get their own route
(``/webhooks/lemonsqueezy``, ``/webhooks/stripe``, ...). All share
the same downstream flow: audit-log the raw payload, parse via
adapter, mint, send email, mark processed.

Handler contract
----------------
We **always** return 200 once a request authenticates, even on
downstream failures. Gumroad retries non-2xx for ~3 days, which
would turn a single broken sale into days of duplicate webhooks.
Because we always answer 200, the storefront retries on network
errors only, and our idempotency key
(``UNIQUE(source, source_order_id)``) makes that at-least-once
delivery safe.

When something downstream fails (unmapped product, DB error, email
failure), we record the cause in ``gumroad_events.error`` so the
operator can fix and replay.

Unauthenticated requests return 404: we don't want to signal
endpoint existence or "wrong secret" to a prober.
"""
from __future__ import annotations

import logging
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy.orm import Session

from app.adapters.gumroad import GumroadAdapter, UnmappedProductError
from app.config import get_settings
from app.db import get_session
from app.email import EmailDeliveryError, LicenseEmail, get_email_service
from app.mint import mint_from_sale
from app.models import GumroadEvent

router = APIRouter(prefix="/webhooks")
log = logging.getLogger(__name__)


def _gumroad_adapter() -> GumroadAdapter:
    settings = get_settings()
    return GumroadAdapter(secret=settings.resolve_gumroad_secret())


@router.post("/gumroad", status_code=200)
async def gumroad(
    request: Request,
    secret: Optional[str] = Query(default=None),
    session: Session = Depends(get_session),
) -> dict:
    adapter = _gumroad_adapter()
    if not adapter.verify_secret(secret):
        # 404: no information leak about endpoint existence.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not found")

    # Gumroad's Ping is form-encoded; FastAPI doesn't auto-parse
    # without a Form() dependency, and we want the raw map for the
    # audit log regardless of schema.
    raw_form = await request.form()
    payload = {k: str(v) for k, v in raw_form.items()}

    # Audit row FIRST: any later failure leaves us a replayable record.
    event = GumroadEvent(
        event_type="sale",
        order_id=payload.get("sale_id"),
        raw_payload=payload,
    )
    session.add(event)
    session.flush()

    try:
        sale = adapter.parse_sale(payload)
    except UnmappedProductError as e:
        event.error = str(e)
        log.warning("Gumroad sale with unmapped product: %s", e)
        return {"status": "logged-no-mint", "reason": "unmapped_product"}
    except Exception as e:  # pragma: no cover (defensive)
        event.error = f"parse error: {e!r}"
        log.exception("Gumroad parse failure")
        return {"status": "logged-no-mint", "reason": "parse_error"}

    if sale is None:
        event.error = "payload did not parse as a sale"
        return {"status": "logged-no-mint", "reason": "not_a_sale"}

    try:
        row = mint_from_sale(session, sale)
        session.flush()
    except Exception as e:  # pragma: no cover (defensive)
        event.error = f"mint error: {e!r}"
        log.exception("mint_from_sale failed")
        return {"status": "logged-no-mint", "reason": "mint_error"}

    try:
        get_email_service().send_license(
            LicenseEmail(
                to_name=row.name,
                to_email=row.email,
                tier=row.tier,
                license_key=row.license_key,
                expires_at_iso=row.expires_at.isoformat(),
                blob=row.blob,
            )
        )
    except EmailDeliveryError as e:
        event.error = f"email error: {e}"
        log.warning("Email delivery failed (license already minted): %s", e)
        # The buyer can still be served from the DB via the renewal
        # portal in PR 3 / a manual resend, so we don't fail the
        # webhook over an email hiccup.
        event.processed = True
        return {"status": "minted-email-failed", "license_key": row.license_key}

    event.processed = True
    return {"status": "ok", "license_key": row.license_key}