feat(license): datatools-admin CLI for the mint API

New operator CLI at src/admin_cli.py: mint, list, revoke, ping —
talks to the server's /internal/* endpoints over a local SSH tunnel.
Stdlib-only on the desktop side (urllib + typer), no new top-level
deps. Auth via $DATATOOLS_ADMIN_TOKEN.

scripts/generate_license.py is now annotated as a break-glass tool
for when the server is unreachable — routine work goes through the
new CLI so the authoritative `licenses` row is created.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 00:47:01 +00:00
parent bab2c9468c
commit 673b902377
2 changed files with 206 additions and 1 deletions

View File

@@ -1,5 +1,17 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Mint a signed license blob for a buyer. """Mint a signed license blob for a buyer (LOCAL, break-glass).
.. warning::
This script mints **locally**, without going through the license
server. Prefer :mod:`src.admin_cli` (``datatools-admin mint``)
for routine work — it writes to the authoritative ``licenses``
Postgres table and emits the same blob.
Reach for this script only when the server is unreachable and a
buyer needs a license *right now*. Mints from here land in the
local issuance JSONL log; you'll need to reconcile them into the
server's DB afterwards.
Creator-only tool. Signs with the Ed25519 private key from Creator-only tool. Signs with the Ed25519 private key from
``$DATATOOLS_LICENSE_PRIVKEY`` (production) or the in-tree dev key ``$DATATOOLS_LICENSE_PRIVKEY`` (production) or the in-tree dev key

193
src/admin_cli.py Normal file
View File

@@ -0,0 +1,193 @@
"""Operator CLI for the DataTools License Server.
Talks to the server's ``/internal/*`` endpoints over a local SSH
tunnel. Default base URL is ``http://127.0.0.1:8090`` — open the
tunnel with::
ssh -L 8090:127.0.0.1:8090 michael@46.225.166.142 -N
Auth: ``$DATATOOLS_ADMIN_TOKEN`` (matching the token in
``/srv/datatools-license/secrets/admin_token`` on the server).
Commands::
datatools-admin ping
datatools-admin mint --name "Jane Doe" --email jane@example.com --tier core
datatools-admin list [--email STR] [--tier STR] [--source STR] [--include-revoked]
datatools-admin revoke DT1-CORE-xxxx-yyyy --reason "refund"
This supersedes ``scripts/generate_license.py``, which now exists
only as a break-glass tool for when the server is unreachable.
"""
from __future__ import annotations
import json
import os
import sys
from typing import Any, Optional
import typer
from urllib import request as urlrequest
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
app = typer.Typer(no_args_is_help=True, add_completion=False)
# ---------------------------------------------------------------------------
# Transport
# ---------------------------------------------------------------------------
def _base_url() -> str:
return os.environ.get("DATATOOLS_ADMIN_URL", "http://127.0.0.1:8090").rstrip("/")
def _token() -> str:
tok = os.environ.get("DATATOOLS_ADMIN_TOKEN")
if not tok:
typer.echo(
"ERROR: DATATOOLS_ADMIN_TOKEN is not set. Read the token from\n"
" /srv/datatools-license/secrets/admin_token on the server\n"
" and export it locally.",
err=True,
)
raise typer.Exit(code=2)
return tok
def _request(method: str, path: str, *, body: Optional[dict] = None, query: Optional[dict] = None) -> Any:
url = _base_url() + path
if query:
url += "?" + urlencode({k: v for k, v in query.items() if v is not None})
data = json.dumps(body).encode("utf-8") if body is not None else None
req = urlrequest.Request(
url,
data=data,
method=method,
headers={
"Authorization": f"Bearer {_token()}",
"Content-Type": "application/json" if data else "application/octet-stream",
"Accept": "application/json",
},
)
try:
with urlrequest.urlopen(req, timeout=30) as resp:
raw = resp.read()
return json.loads(raw) if raw else None
except HTTPError as e:
try:
err_body = json.loads(e.read())
except Exception:
err_body = {"detail": e.reason}
typer.echo(f"ERROR {e.code}: {err_body.get('detail', err_body)}", err=True)
raise typer.Exit(code=1)
except URLError as e:
typer.echo(
f"ERROR: could not reach {url}: {e.reason}\n"
" Is the SSH tunnel open? See the docstring.",
err=True,
)
raise typer.Exit(code=1)
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
@app.command()
def ping() -> None:
"""Sanity-check the tunnel + token."""
result = _request("GET", "/internal/ping")
typer.echo(json.dumps(result, indent=2))
@app.command()
def mint(
name: str = typer.Option(..., help="Buyer's full name."),
email: str = typer.Option(..., help="Buyer's email."),
tier: str = typer.Option(..., help="lite | core | pro | enterprise."),
years: int = typer.Option(1, help="License lifetime in years."),
source: str = typer.Option("manual", help="Origin: manual / gumroad / ..."),
promotion: Optional[str] = typer.Option(None),
amount_paid: Optional[str] = typer.Option(None, "--amount-paid", help="e.g. 79.00"),
currency: str = typer.Option("USD"),
notes: Optional[str] = typer.Option(None),
) -> None:
"""Mint a license + persist a DB row + return the blob.
PR 1 only accepts ``source=manual`` here. Storefront sales arrive
via webhook in PR 2.
"""
body = {
"name": name,
"email": email,
"tier": tier,
"years": years,
"source": source,
"promotion": promotion,
"amount_paid": amount_paid,
"currency": currency,
"notes": notes,
}
result = _request("POST", "/internal/mint", body=body)
typer.echo(result["blob"])
typer.echo(
f" key: {result['license_key']}\n"
f" tier: {result['tier']}\n"
f" expires: {result['expires_at']}",
err=True,
)
@app.command(name="list")
def list_cmd(
email: Optional[str] = typer.Option(None),
tier: Optional[str] = typer.Option(None),
source: Optional[str] = typer.Option(None),
include_revoked: bool = typer.Option(False, "--include-revoked"),
limit: int = typer.Option(50),
offset: int = typer.Option(0),
) -> None:
"""List licenses. Filters AND together; default excludes revoked."""
rows = _request(
"GET",
"/internal/licenses",
query={
"email": email,
"tier": tier,
"source": source,
"include_revoked": "true" if include_revoked else "false",
"limit": limit,
"offset": offset,
},
)
if not rows:
typer.echo("(no rows)")
return
for r in rows:
revoked = " [REVOKED]" if r.get("revoked_at") else ""
typer.echo(
f"{r['license_key']:36} {r['tier']:10} {r['email']:40} "
f"exp {r['expires_at'][:10]} src={r['source']}{revoked}"
)
@app.command()
def revoke(
license_key: str = typer.Argument(..., help="DT1-... key from `list`."),
reason: Optional[str] = typer.Option(None, help="Stored in notes for audit."),
) -> None:
"""Mark a license revoked. Desktop activations stay working until
expiry (no online check); revocation gates future renewals only."""
result = _request(
"POST",
"/internal/revoke",
body={"license_key": license_key, "reason": reason},
)
typer.echo(f"Revoked {result['license_key']} at {result['revoked_at']}")
if __name__ == "__main__":
app()