From 673b9023777a0f31ae2d6619c639b7f5e967b7b8 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 14 May 2026 00:47:01 +0000 Subject: [PATCH] feat(license): datatools-admin CLI for the mint API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New operator CLI at src/admin_cli.py: mint, list, revoke, ping — talks to the server's /internal/* endpoints over a local SSH tunnel. Stdlib-only on the desktop side (urllib + typer), no new top-level deps. Auth via $DATATOOLS_ADMIN_TOKEN. scripts/generate_license.py is now annotated as a break-glass tool for when the server is unreachable — routine work goes through the new CLI so the authoritative `licenses` row is created. Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/generate_license.py | 14 ++- src/admin_cli.py | 193 ++++++++++++++++++++++++++++++++++++ 2 files changed, 206 insertions(+), 1 deletion(-) create mode 100644 src/admin_cli.py diff --git a/scripts/generate_license.py b/scripts/generate_license.py index 057bf14..6728dfa 100644 --- a/scripts/generate_license.py +++ b/scripts/generate_license.py @@ -1,5 +1,17 @@ #!/usr/bin/env python3 -"""Mint a signed license blob for a buyer. +"""Mint a signed license blob for a buyer (LOCAL, break-glass). + +.. warning:: + + This script mints **locally**, without going through the license + server. Prefer :mod:`src.admin_cli` (``datatools-admin mint``) + for routine work — it writes to the authoritative ``licenses`` + Postgres table and emits the same blob. + + Reach for this script only when the server is unreachable and a + buyer needs a license *right now*. Mints from here land in the + local issuance JSONL log; you'll need to reconcile them into the + server's DB afterwards. Creator-only tool. Signs with the Ed25519 private key from ``$DATATOOLS_LICENSE_PRIVKEY`` (production) or the in-tree dev key diff --git a/src/admin_cli.py b/src/admin_cli.py new file mode 100644 index 0000000..8ebe3ec --- /dev/null +++ b/src/admin_cli.py @@ -0,0 +1,193 @@ +"""Operator CLI for the DataTools License Server. + +Talks to the server's ``/internal/*`` endpoints over a local SSH +tunnel. Default base URL is ``http://127.0.0.1:8090`` — open the +tunnel with:: + + ssh -L 8090:127.0.0.1:8090 michael@46.225.166.142 -N + +Auth: ``$DATATOOLS_ADMIN_TOKEN`` (matching the token in +``/srv/datatools-license/secrets/admin_token`` on the server). + +Commands:: + + datatools-admin ping + datatools-admin mint --name "Jane Doe" --email jane@example.com --tier core + datatools-admin list [--email STR] [--tier STR] [--source STR] [--include-revoked] + datatools-admin revoke DT1-CORE-xxxx-yyyy --reason "refund" + +This supersedes ``scripts/generate_license.py``, which now exists +only as a break-glass tool for when the server is unreachable. +""" + +from __future__ import annotations + +import json +import os +import sys +from typing import Any, Optional + +import typer +from urllib import request as urlrequest +from urllib.error import HTTPError, URLError +from urllib.parse import urlencode + + +app = typer.Typer(no_args_is_help=True, add_completion=False) + + +# --------------------------------------------------------------------------- +# Transport +# --------------------------------------------------------------------------- + +def _base_url() -> str: + return os.environ.get("DATATOOLS_ADMIN_URL", "http://127.0.0.1:8090").rstrip("/") + + +def _token() -> str: + tok = os.environ.get("DATATOOLS_ADMIN_TOKEN") + if not tok: + typer.echo( + "ERROR: DATATOOLS_ADMIN_TOKEN is not set. Read the token from\n" + " /srv/datatools-license/secrets/admin_token on the server\n" + " and export it locally.", + err=True, + ) + raise typer.Exit(code=2) + return tok + + +def _request(method: str, path: str, *, body: Optional[dict] = None, query: Optional[dict] = None) -> Any: + url = _base_url() + path + if query: + url += "?" + urlencode({k: v for k, v in query.items() if v is not None}) + data = json.dumps(body).encode("utf-8") if body is not None else None + req = urlrequest.Request( + url, + data=data, + method=method, + headers={ + "Authorization": f"Bearer {_token()}", + "Content-Type": "application/json" if data else "application/octet-stream", + "Accept": "application/json", + }, + ) + try: + with urlrequest.urlopen(req, timeout=30) as resp: + raw = resp.read() + return json.loads(raw) if raw else None + except HTTPError as e: + try: + err_body = json.loads(e.read()) + except Exception: + err_body = {"detail": e.reason} + typer.echo(f"ERROR {e.code}: {err_body.get('detail', err_body)}", err=True) + raise typer.Exit(code=1) + except URLError as e: + typer.echo( + f"ERROR: could not reach {url}: {e.reason}\n" + " Is the SSH tunnel open? See the docstring.", + err=True, + ) + raise typer.Exit(code=1) + + +# --------------------------------------------------------------------------- +# Commands +# --------------------------------------------------------------------------- + +@app.command() +def ping() -> None: + """Sanity-check the tunnel + token.""" + result = _request("GET", "/internal/ping") + typer.echo(json.dumps(result, indent=2)) + + +@app.command() +def mint( + name: str = typer.Option(..., help="Buyer's full name."), + email: str = typer.Option(..., help="Buyer's email."), + tier: str = typer.Option(..., help="lite | core | pro | enterprise."), + years: int = typer.Option(1, help="License lifetime in years."), + source: str = typer.Option("manual", help="Origin: manual / gumroad / ..."), + promotion: Optional[str] = typer.Option(None), + amount_paid: Optional[str] = typer.Option(None, "--amount-paid", help="e.g. 79.00"), + currency: str = typer.Option("USD"), + notes: Optional[str] = typer.Option(None), +) -> None: + """Mint a license + persist a DB row + return the blob. + + PR 1 only accepts ``source=manual`` here. Storefront sales arrive + via webhook in PR 2. + """ + body = { + "name": name, + "email": email, + "tier": tier, + "years": years, + "source": source, + "promotion": promotion, + "amount_paid": amount_paid, + "currency": currency, + "notes": notes, + } + result = _request("POST", "/internal/mint", body=body) + typer.echo(result["blob"]) + typer.echo( + f" key: {result['license_key']}\n" + f" tier: {result['tier']}\n" + f" expires: {result['expires_at']}", + err=True, + ) + + +@app.command(name="list") +def list_cmd( + email: Optional[str] = typer.Option(None), + tier: Optional[str] = typer.Option(None), + source: Optional[str] = typer.Option(None), + include_revoked: bool = typer.Option(False, "--include-revoked"), + limit: int = typer.Option(50), + offset: int = typer.Option(0), +) -> None: + """List licenses. Filters AND together; default excludes revoked.""" + rows = _request( + "GET", + "/internal/licenses", + query={ + "email": email, + "tier": tier, + "source": source, + "include_revoked": "true" if include_revoked else "false", + "limit": limit, + "offset": offset, + }, + ) + if not rows: + typer.echo("(no rows)") + return + for r in rows: + revoked = " [REVOKED]" if r.get("revoked_at") else "" + typer.echo( + f"{r['license_key']:36} {r['tier']:10} {r['email']:40} " + f"exp {r['expires_at'][:10]} src={r['source']}{revoked}" + ) + + +@app.command() +def revoke( + license_key: str = typer.Argument(..., help="DT1-... key from `list`."), + reason: Optional[str] = typer.Option(None, help="Stored in notes for audit."), +) -> None: + """Mark a license revoked. Desktop activations stay working until + expiry (no online check); revocation gates future renewals only.""" + result = _request( + "POST", + "/internal/revoke", + body={"license_key": license_key, "reason": reason}, + ) + typer.echo(f"Revoked {result['license_key']} at {result['revoked_at']}") + + +if __name__ == "__main__": + app()