feat(license): registration + 1-year licenses + tier scaffolding

A complete offline licensing layer (no internet at any step):

Core
- src/license/ — schema (License, Tier, FeatureFlag), HMAC crypto,
  JSON storage, LicenseManager singleton with activate/renew/
  deactivate/issue_trial. Tier-scaffolded so future SKUs can carve
  per-tool feature sets without consumer-code edits.
- scripts/generate_license.py — creator-only key generator. Mints a
  DTLIC1: blob the buyer pastes into the activation page.

GUI
- New activation form component (src/gui/components/activation.py).
- hide_streamlit_chrome() now inline-renders the activation form when
  no valid license is present (every page short-circuits to the form
  until activated).
- Sidebar shows tier + days remaining; renewal warning under 30 days.
- New pages/_Activate.py for revisiting the form after activation.

CLI
- src/license_cli.py — activate / renew / status / trial / deactivate
  commands. Exempt from the guard.
- src/cli_license_guard.py — drop-in guard call added to every tool
  CLI's main(). Lets --help through; respects DATATOOLS_DEV_MODE.

i18n
- New activation.* and license.* keys in en.json + es.json
  (page title, form labels, status badges, renewal warnings, error
  messages). Pack parity test stays green.

Test infrastructure
- tests/conftest.py autouse fixture sets DATATOOLS_DEV_MODE=1 so the
  existing 1916 tests continue to pass.
- isolated_license_path / activated_license_manager /
  unactivated_license_manager fixtures for tests that want to drive
  the real check.

Tests (+79)
- tests/test_license.py (40): schema, crypto roundtrip, blob
  encode/decode, tier→feature mapping, activation flow, name/email
  mismatch rejection, tamper detection, expiration, renewal,
  dev-mode bypass.
- tests/test_license_cli.py (26): every license_cli command +
  subprocess tests confirming every tool CLI refuses to run without
  a license, --help always works, DEV_MODE bypasses.
- tests/gui/test_activation.py (13): gate blocks without license,
  passes with trial, activation form submission unlocks the gate,
  sidebar status, renewal warning, i18n.

Total: 1916 → 1995 tests. All pass under the strict warning filter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-13 16:54:23 +00:00
parent b2c7b94fe9
commit e435103113
27 changed files with 2798 additions and 6 deletions

126
scripts/generate_license.py Normal file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""Mint a signed license blob for a buyer.
Creator-only tool. Reads the active HMAC secret from the environment
(``$DATATOOLS_LICENSE_SECRET``) — point it at the same secret baked
into the shipped binary or the result will fail to verify.
Examples
--------
Mint a 1-year CORE license for Jane Doe::
python scripts/generate_license.py \\
--name "Jane Doe" --email jane@example.com --tier core
Mint a 2-year PRO license and write the blob to a file::
python scripts/generate_license.py \\
--name "Acme Corp" --email ops@acme.com --tier pro \\
--years 2 --output acme.dtlic
Re-sign with a custom secret (useful for staged rollouts)::
DATATOOLS_LICENSE_SECRET=shipping-secret-2026 \\
python scripts/generate_license.py --name ... --email ...
The output is a single base64-encoded token starting with ``DTLIC1:``
— paste this whole string into the buyer's delivery email or
deliver as an attached ``.dtlic`` file.
"""
from __future__ import annotations
import argparse
import sys
import uuid
from pathlib import Path
# Make ``src.license`` importable when run from the repo root.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
from src.license import Tier # noqa: E402
from src.license.crypto import encode_blob, sign # noqa: E402
from src.license.features import all_features_for_tier # noqa: E402
from src.license.schema import ( # noqa: E402
License,
_utcnow_iso,
default_expiry_iso,
)
def build_args() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description="Mint a signed DataTools license blob.",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
p.add_argument("--name", required=True, help="Buyer's full name.")
p.add_argument("--email", required=True, help="Buyer's email.")
p.add_argument(
"--tier",
default=Tier.CORE.value,
choices=[t.value for t in Tier],
help="License tier (default: %(default)s).",
)
p.add_argument(
"--years",
type=int,
default=1,
help="License lifetime in years (default: %(default)s).",
)
p.add_argument(
"--key",
default=None,
help="Override the auto-generated license key (default: random).",
)
p.add_argument(
"--output",
"-o",
type=Path,
default=None,
help="Write the blob to this file (default: print to stdout).",
)
return p
def main(argv: list[str] | None = None) -> int:
args = build_args().parse_args(argv)
tier = Tier(args.tier)
rid = uuid.uuid4().hex
key = args.key or f"DT1-{tier.value.upper()}-{rid[:8]}-{rid[8:16]}"
lic = License(
name=args.name,
email=args.email,
license_key=key,
tier=tier,
features=all_features_for_tier(tier),
issued_at=_utcnow_iso(),
expires_at=default_expiry_iso(years=args.years),
signature="",
)
signature = sign(lic.to_canonical_dict())
payload = lic.to_canonical_dict()
payload["signature"] = signature
blob = encode_blob(payload)
if args.output:
args.output.write_text(blob + "\n", encoding="utf-8")
print(f"Wrote license to {args.output}", file=sys.stderr)
else:
print(blob)
print(
f" name: {lic.name}\n"
f" email: {lic.email}\n"
f" tier: {lic.tier.value}\n"
f" key: {lic.license_key}\n"
f" expires: {lic.expires_at}",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -499,6 +499,8 @@ def _write_match_groups(result, original_df, path: Path) -> None:
# ---------------------------------------------------------------------------
def main():
from src.cli_license_guard import guard
guard()
app()

View File

@@ -151,8 +151,14 @@ def _maybe_strict_exit(findings, strict: bool) -> None:
raise typer.Exit(code=1)
def main() -> None:
from src.cli_license_guard import guard
guard()
app()
# Entrypoint when run via `python -m src.cli_analyze`. Typer's no_args_is_help
# kicks in when the user invokes without args; we expose the single command at
# the top level for convenience: ``python -m src.cli_analyze input.csv``.
if __name__ == "__main__":
app()
main()

View File

@@ -348,6 +348,8 @@ def _print_results(result, input_path: Path, options) -> None:
# ---------------------------------------------------------------------------
def main():
from src.cli_license_guard import guard
guard()
app()

View File

@@ -357,6 +357,8 @@ def standardize(
def main():
from src.cli_license_guard import guard
guard()
app()

86
src/cli_license_guard.py Normal file
View File

@@ -0,0 +1,86 @@
"""License guard for the tool CLIs.
Every tool CLI (``cli.py``, ``cli_text_clean.py``, ``cli_format.py``,
``cli_missing.py``, ``cli_column_map.py``, ``cli_pipeline.py``,
``cli_analyze.py``) calls :func:`guard` from its ``main()`` before
delegating to Typer. The guard:
1. Lets ``--help`` / ``-h`` through unconditionally so users can
always see what a command does.
2. Lets the ``DATATOOLS_DEV_MODE=1`` env var bypass the check.
3. Otherwise, verifies a valid license is on disk. If not, prints a
one-line user-facing message naming the exact ``datatools-license``
subcommand to run, then exits with status 2.
The exit code (2) is the same Typer uses for argument errors — fits
into ``run_tests.py`` / CI pipelines without special casing.
"""
from __future__ import annotations
import sys
from typing import NoReturn
_HELP_FLAGS = {"--help", "-h", "--version"}
def _is_help_invocation() -> bool:
"""True when the user is asking for help, not running work."""
return any(arg in _HELP_FLAGS for arg in sys.argv[1:])
def guard() -> None:
"""Block startup if no valid license. No-op when license is valid,
when called with ``--help``, or under ``DATATOOLS_DEV_MODE``."""
if _is_help_invocation():
return
# Lazy import so a broken license module doesn't fail ``--help``.
from src.license import (
ExpiredLicenseError,
InvalidLicenseError,
LicenseError,
get_manager,
)
mgr = get_manager()
if mgr.dev_mode:
return
try:
if mgr.is_valid():
return
except LicenseError:
# ``is_valid()`` swallows errors and returns False, but be
# paranoid: fall through to the state-based diagnostic.
pass
state = mgr.current_state()
_exit_with_message(state)
def _exit_with_message(state) -> NoReturn:
"""Print the right one-liner for the current state and exit."""
if state.error_kind == "not_activated":
msg = (
"DataTools is not activated.\n"
"Run: python -m src.license_cli activate <license-blob>\n"
"Or start a 1-year trial: "
"python -m src.license_cli trial --name 'Your Name' --email you@example.com"
)
elif state.error_kind == "expired":
msg = (
f"License expired on {state.expires_at[:10]}.\n"
"Renew with: python -m src.license_cli renew <license-blob>"
)
elif state.error_kind == "invalid":
msg = (
"License file is present but invalid.\n"
f"Detail: {state.error_message}\n"
"Re-paste the blob from your delivery email: "
"python -m src.license_cli activate <license-blob>"
)
else:
msg = "License is not valid. Run: python -m src.license_cli status"
print(f"Error: {msg}", file=sys.stderr)
raise SystemExit(2)

View File

@@ -373,6 +373,8 @@ def _print_results(result, input_path: Path, options) -> None:
# ---------------------------------------------------------------------------
def main():
from src.cli_license_guard import guard
guard()
app()

View File

@@ -300,6 +300,8 @@ def run(
def main() -> None:
from src.cli_license_guard import guard
guard()
app()

View File

@@ -370,6 +370,8 @@ def _print_results(result, input_path: Path, options) -> None:
# ---------------------------------------------------------------------------
def main():
from src.cli_license_guard import guard
guard()
app()

View File

@@ -38,12 +38,22 @@ from . import _legacy as _legacy # noqa: F401 (keep for direct access)
# Names exported from _legacy.py that pages currently use. Kept here as
# the canonical public list so a removal from _legacy is a visible
# breaking change instead of a silent drop.
from .activation import ( # noqa: F401 re-exported
render_activation_form,
render_license_status_sidebar,
require_license_or_render_activation,
)
__all__ = [
# Shared chrome / pickup / gate
"hide_streamlit_chrome",
"quit_button",
"pickup_or_upload",
"require_normalization_gate",
# License gate + activation form
"render_activation_form",
"render_license_status_sidebar",
"require_license_or_render_activation",
# Dedup widgets
"config_panel",
"match_group_card",

View File

@@ -72,13 +72,21 @@ footer {
"""
def hide_streamlit_chrome() -> None:
def hide_streamlit_chrome(*, gate_license: bool = True) -> None:
"""Inject CSS to hide Streamlit's default header, menu, and footer.
Also renders the sidebar language selector, since every entrypoint
that hides the default chrome wants the picker visible in the
same place. Pages that want a clean chrome without the selector can
inject ``_HIDE_CHROME_CSS`` themselves instead of calling this.
Also renders the sidebar language selector + license status badge,
since every entrypoint that hides the default chrome wants those
visible in the same place. Pages that want a clean chrome without
them can inject ``_HIDE_CHROME_CSS`` themselves instead of calling
this.
When *gate_license* is True (the default) the function calls
:func:`require_license_or_render_activation` after the sidebar
widgets render. If no valid license is present, the activation
form replaces the page body and the page short-circuits via
``st.stop()``. The Activate page itself passes ``False`` so it
can render its own form without recursion.
"""
st.markdown(_HIDE_CHROME_CSS, unsafe_allow_html=True)
# Imported lazily so this module stays importable in environments
@@ -86,6 +94,14 @@ def hide_streamlit_chrome() -> None:
# individual legacy helpers).
from src.i18n import render_language_selector
render_language_selector()
# License chrome: sidebar status badge + inline gate.
from .activation import (
render_license_status_sidebar,
require_license_or_render_activation,
)
render_license_status_sidebar()
if gate_license:
require_license_or_render_activation()
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,218 @@
"""Activation page rendering + license gate inline-renderer.
Two public callables:
- :func:`render_activation_form` — the activation page body. Used by
``pages/_Activate.py`` for the explicit nav entry and by
:func:`require_license_or_render_activation` for the gate-injected
inline form.
- :func:`require_license_or_render_activation` — call after
``hide_streamlit_chrome`` from every page that needs a valid
license. When the license is missing, expired, or invalid it
renders the activation form in place of the page body and calls
``st.stop()`` so the rest of the page doesn't execute.
- :func:`render_license_status_sidebar` — small chrome widget the
sidebar uses to show "Core · 327 days left" or "🛑 Expired".
The component is kept dependency-free of the page modules so any
page can import these helpers without circular imports through
``components.__init__``.
"""
from __future__ import annotations
from typing import Optional
import streamlit as st
from src.i18n import t as _t
from src.license import (
ExpiredLicenseError,
InvalidLicenseError,
LicenseError,
NotActivatedError,
get_manager,
)
# ---------------------------------------------------------------------------
# Sidebar status widget
# ---------------------------------------------------------------------------
def render_license_status_sidebar() -> None:
"""Render the per-page sidebar license badge.
Cheap to call — the manager caches the parsed license, so multiple
pages mounting the same chrome don't pay multiple disk reads.
"""
state = get_manager().current_state()
target = st.sidebar
if not state.activated:
target.caption(f"🔒 {_t('license.status_not_activated')}")
return
if state.error_kind == "invalid":
target.caption(f"⚠️ {_t('license.status_invalid')}")
return
if not state.valid:
# Activated but expired.
target.caption(f"🛑 {_t('license.status_expired')}")
return
tier_label = _t(f"license.tier_{state.tier}") or state.tier.title()
if state.tier == "trial":
label = _t("license.status_trial", days=state.days_remaining)
else:
label = _t(
"license.status_active",
tier=tier_label,
days=state.days_remaining,
)
target.caption(label)
if 0 < state.days_remaining <= 30:
target.warning(
_t("license.renewal_warning_30", days=state.days_remaining)
)
# ---------------------------------------------------------------------------
# Activation page body
# ---------------------------------------------------------------------------
def render_activation_form(*, key_prefix: str = "act") -> None:
"""Render the full activation page body.
*key_prefix* is appended to every Streamlit widget key so the
form can be embedded inline by ``require_license_or_render_activation``
without colliding with an instance that's mounted by the
explicit ``_Activate.py`` page.
"""
st.title(_t("activation.title"))
st.caption(_t("activation.intro"))
mgr = get_manager()
state = mgr.current_state()
# Surface the current state explicitly so a user with an expired /
# invalid license sees what's wrong before they paste anything.
if state.activated and state.error_kind == "expired":
st.warning(
_t("license.renewal_warning_expired", date=state.expires_at[:10])
)
elif state.activated and state.error_kind == "invalid":
st.error(f"⚠️ {state.error_message}")
st.divider()
with st.form(key=f"{key_prefix}_form"):
name = st.text_input(
_t("activation.name_label"),
value=state.name,
help=_t("activation.name_help"),
key=f"{key_prefix}_name",
)
email = st.text_input(
_t("activation.email_label"),
value=state.email,
help=_t("activation.email_help"),
key=f"{key_prefix}_email",
)
blob = st.text_area(
_t("activation.blob_label"),
help=_t("activation.blob_help"),
key=f"{key_prefix}_blob",
height=120,
)
col_activate, col_trial = st.columns(2)
with col_activate:
is_renewal = state.activated
label = (
_t("activation.renew_button") if is_renewal
else _t("activation.activate_button")
)
submit = st.form_submit_button(label, type="primary")
with col_trial:
trial = st.form_submit_button(
_t("activation.trial_button"),
help=_t("activation.trial_help"),
)
# Process the submission.
if submit:
if not blob.strip():
st.error(_t("activation.errors_heading") + ": license blob is empty.")
return
try:
if is_renewal:
lic = mgr.renew(blob)
st.success(_t("activation.renewed", expires=lic.expires_at[:10]))
else:
lic = mgr.activate_from_blob(blob, name=name, email=email)
st.success(_t(
"activation.success",
name=lic.name, expires=lic.expires_at[:10],
))
# Force a clean re-render so the gate sees the new state.
st.rerun()
except LicenseError as e:
st.error(f"{_t('activation.errors_heading')}: {e}")
if trial:
try:
lic = mgr.issue_trial(name=name, email=email)
st.success(_t(
"activation.success",
name=lic.name, expires=lic.expires_at[:10],
))
st.rerun()
except LicenseError as e:
st.error(f"{_t('activation.errors_heading')}: {e}")
# Deactivate (only useful when already activated).
if state.activated:
st.divider()
with st.expander(_t("activation.deactivate_button")):
st.caption(_t("activation.deactivate_help"))
if st.button(
_t("activation.deactivate_button"),
key=f"{key_prefix}_deactivate",
type="secondary",
):
mgr.deactivate()
st.rerun()
# ---------------------------------------------------------------------------
# Gate
# ---------------------------------------------------------------------------
def require_license_or_render_activation(*, feature: Optional[str] = None) -> None:
"""Page-level guard. Call after ``hide_streamlit_chrome``.
If a valid license exists (and *feature*, when supplied, is
unlocked), this is a no-op. Otherwise it renders the activation
form in place of the page body and calls ``st.stop()``.
The chrome helper invokes this automatically so individual pages
don't have to call it; the explicit *feature* form is provided
for tools that want to bypass the global gate but enforce a tier
check (future SKU work).
"""
mgr = get_manager()
if mgr.dev_mode:
return
if mgr.is_valid():
# When feature gating gets enabled per-tool, this is the hook.
if feature is not None:
try:
mgr.require_feature(feature)
except LicenseError as e:
st.error(f"⚠️ {e}")
st.stop()
return
# Otherwise: render the activation form inline and stop the page.
render_activation_form(key_prefix="gate")
st.stop()

View File

@@ -0,0 +1,34 @@
"""Activate — license registration + renewal.
Lives in the sidebar nav under an underscore-prefixed filename so
Streamlit sorts it above the numbered tool pages. The chrome's
license gate also injects the activation form inline on any other
page when no valid license is present; this page exists so a user
can revisit the form without hitting an expiration first (e.g., to
review renewal status or deactivate the device).
"""
from __future__ import annotations
import sys
from pathlib import Path
import streamlit as st
_project_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
from src.gui.components import hide_streamlit_chrome, render_activation_form
from src.i18n import t
st.set_page_config(
page_title=t("activation.page_title"),
page_icon="🔑",
layout="wide",
)
# ``gate_license=False`` keeps the chrome from re-rendering the
# activation form on top of the form we're about to render below.
hide_streamlit_chrome(gate_license=False)
render_activation_form(key_prefix="page")

View File

@@ -56,6 +56,44 @@
"body": "Clicking the button below will terminate the DataTools server. Any unsaved work in other tools will be lost. Once the app shuts down you can close this window.",
"button": "Close the app"
},
"activation": {
"page_title": "DataTools — Activate",
"title": "🔑 Activate DataTools",
"intro": "DataTools needs to be activated before any tools unlock. Enter the name and email tied to your purchase, then paste the license blob from your delivery email.",
"name_label": "Full name",
"name_help": "Must match the name on your purchase receipt.",
"email_label": "Email",
"email_help": "Must match the email on your purchase receipt.",
"blob_label": "License blob",
"blob_help": "Begins with `DTLIC1:` — paste the entire string.",
"activate_button": "Activate",
"renew_button": "Apply renewal",
"trial_button": "Start 1-year trial",
"trial_help": "Skips the paid blob and self-issues a 1-year license tied to your name and email. Useful for evaluating before purchase.",
"or_separator": "— or —",
"success": "Activated! Welcome, {name}. Your license is valid until {expires}.",
"renewed": "License renewed. New expiry: {expires}.",
"errors_heading": "Activation problem",
"deactivate_button": "Deactivate this device",
"deactivate_help": "Removes the local license file. You'll need to re-paste your blob to reactivate."
},
"license": {
"status_active": "{tier} · {days} days left",
"status_trial": "Trial · {days} days left",
"status_expired": "Expired",
"status_not_activated": "Not activated",
"status_invalid": "License invalid",
"renewal_warning_30": "⚠️ License expires in {days} days. Renew soon to avoid interruption.",
"renewal_warning_expired": "🛑 License expired on {date}. Renew to continue using DataTools.",
"tier_trial": "Trial",
"tier_core": "Core",
"tier_pro": "Pro",
"tier_enterprise": "Enterprise",
"registered_to": "Registered to {name} · {email}",
"expires_on": "Expires on {date}",
"issued_on": "Issued on {date}",
"view_details": "License details"
},
"tools": {
"01_deduplicator": {
"name": "Deduplicator",

View File

@@ -56,6 +56,44 @@
"body": "Al pulsar el botón de abajo se cerrará el servidor de DataTools. Cualquier trabajo sin guardar en otras herramientas se perderá. Una vez cerrada la app, puedes cerrar esta ventana.",
"button": "Cerrar la app"
},
"activation": {
"page_title": "DataTools — Activar",
"title": "🔑 Activar DataTools",
"intro": "DataTools debe activarse antes de desbloquear cualquier herramienta. Introduce el nombre y correo asociados a tu compra, y luego pega el código de licencia del correo de entrega.",
"name_label": "Nombre completo",
"name_help": "Debe coincidir con el nombre en el recibo de compra.",
"email_label": "Correo electrónico",
"email_help": "Debe coincidir con el correo del recibo de compra.",
"blob_label": "Código de licencia",
"blob_help": "Empieza con `DTLIC1:` — pega la cadena completa.",
"activate_button": "Activar",
"renew_button": "Aplicar renovación",
"trial_button": "Iniciar prueba de 1 año",
"trial_help": "Omite el código de pago y emite una licencia local de 1 año vinculada a tu nombre y correo. Útil para evaluar antes de comprar.",
"or_separator": "— o —",
"success": "¡Activado! Bienvenido, {name}. Tu licencia es válida hasta el {expires}.",
"renewed": "Licencia renovada. Nueva fecha de caducidad: {expires}.",
"errors_heading": "Problema al activar",
"deactivate_button": "Desactivar este dispositivo",
"deactivate_help": "Elimina el archivo de licencia local. Tendrás que volver a pegar tu código para reactivarla."
},
"license": {
"status_active": "{tier} · {days} días restantes",
"status_trial": "Prueba · {days} días restantes",
"status_expired": "Caducada",
"status_not_activated": "Sin activar",
"status_invalid": "Licencia inválida",
"renewal_warning_30": "⚠️ La licencia caduca en {days} días. Renueva pronto para evitar interrupciones.",
"renewal_warning_expired": "🛑 La licencia caducó el {date}. Renuévala para seguir usando DataTools.",
"tier_trial": "Prueba",
"tier_core": "Core",
"tier_pro": "Pro",
"tier_enterprise": "Enterprise",
"registered_to": "Registrado a nombre de {name} · {email}",
"expires_on": "Caduca el {date}",
"issued_on": "Emitida el {date}",
"view_details": "Detalles de la licencia"
},
"tools": {
"01_deduplicator": {
"name": "Eliminador de duplicados",

59
src/license/__init__.py Normal file
View File

@@ -0,0 +1,59 @@
"""License module — registration, activation, expiration, feature gating.
Public API the rest of the app uses:
- :func:`get_manager` — singleton :class:`LicenseManager` instance.
- :func:`current_state` — quick snapshot for status badges / tests.
- :func:`require_feature` — raise :class:`LicenseError` if a feature
isn't unlocked by the active license.
- :class:`License`, :class:`Tier`, :class:`FeatureFlag` — schema.
- :class:`LicenseError` and subclasses — typed failures the UI can
branch on (not yet activated vs. expired vs. tampered).
The license model is:
1. The seller (creator) runs ``scripts/generate_license.py`` to mint a
signed **license blob** keyed to a buyer's name + email.
2. The buyer pastes the blob into the activation page on first launch.
3. The app verifies the HMAC signature locally (no internet), then
writes a canonical ``~/.datatools/license.json`` and the app
unlocks.
The signature is HMAC-SHA256 with a build-time secret. Combined with
the 30-day refund policy, this is honor-system DRM — see
``docs/DECISIONS.md`` for the trade-off discussion.
"""
from __future__ import annotations
from .errors import (
ExpiredLicenseError,
InvalidLicenseError,
LicenseError,
NotActivatedError,
UnsupportedFeatureError,
)
from .features import FEATURES_BY_TIER, all_features_for_tier
from .manager import LicenseManager, current_state, get_manager, require_feature
from .schema import FeatureFlag, License, Tier
__all__ = [
# Manager
"LicenseManager",
"current_state",
"get_manager",
"require_feature",
# Schema
"FeatureFlag",
"License",
"Tier",
# Feature registry
"FEATURES_BY_TIER",
"all_features_for_tier",
# Errors
"LicenseError",
"NotActivatedError",
"ExpiredLicenseError",
"InvalidLicenseError",
"UnsupportedFeatureError",
]

112
src/license/crypto.py Normal file
View File

@@ -0,0 +1,112 @@
"""HMAC sign/verify for license blobs.
The signing secret is read from ``$DATATOOLS_LICENSE_SECRET`` if
present, otherwise from the build-time constant below. Replace the
constant at build time (via PyInstaller hook or a sed step in the
build pipeline) so the shipped binary has a different secret from
this repo's source tree.
Threat model: honor-system DRM. A motivated reverse engineer can pull
the secret out of the binary, sign their own licenses, and bypass the
check. That's expected for $49 desktop software — the goal is to
discourage casual sharing, not stop targeted piracy. The 30-day
refund policy and the personal-name embedded in every license cover
the same gap from a different angle.
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
from typing import Any
# Build-time default. Replace via env var in shipped builds; keep this
# constant non-empty so unit tests have a stable verification key.
_DEFAULT_SECRET = (
"datatools-license-v1-development-secret-"
"replace-at-build-time-via-DATATOOLS_LICENSE_SECRET"
)
def _secret_bytes() -> bytes:
"""Return the active HMAC secret as bytes."""
return os.environ.get("DATATOOLS_LICENSE_SECRET", _DEFAULT_SECRET).encode("utf-8")
def _canonical_bytes(payload: dict[str, Any]) -> bytes:
"""Canonical JSON encoding for the HMAC input.
``sort_keys=True`` + ``separators=(",", ":")`` produce a byte-for-
byte deterministic representation across Python versions and OS
locales. Without that, two structurally-identical dicts could hash
to different signatures.
"""
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
def sign(payload: dict[str, Any]) -> str:
"""Compute the HMAC-SHA256 hex digest over *payload*.
*payload* MUST NOT contain a ``signature`` key — that's the field
we're computing. The caller is responsible for stripping it.
"""
digest = hmac.new(_secret_bytes(), _canonical_bytes(payload), hashlib.sha256)
return digest.hexdigest()
def verify(payload: dict[str, Any], signature: str) -> bool:
"""Constant-time compare between the recomputed HMAC and *signature*.
Returns ``True`` on a match. Uses :func:`hmac.compare_digest` so a
timing oracle can't be used to recover the secret one byte at a
time — overkill for honor-system DRM, but free.
"""
expected = sign(payload)
return hmac.compare_digest(expected.encode("ascii"), signature.encode("ascii"))
# ---------------------------------------------------------------------------
# Blob encoding / decoding
# ---------------------------------------------------------------------------
# A "license blob" is the artifact the buyer pastes into the activation
# form. It's a base64-encoded JSON dict containing every license field
# *plus* the signature. We choose base64 over raw JSON so the blob is
# one paste-able token (no whitespace surprises) and so a typo
# truncates the blob into an obviously-invalid form rather than a
# subtly-mutated payload.
_BLOB_PREFIX = "DTLIC1:"
def encode_blob(payload_with_signature: dict[str, Any]) -> str:
"""Wrap a signed payload into the buyer-facing blob form."""
raw = json.dumps(
payload_with_signature, sort_keys=True, separators=(",", ":"),
).encode("utf-8")
return _BLOB_PREFIX + base64.urlsafe_b64encode(raw).decode("ascii")
def decode_blob(blob: str) -> dict[str, Any]:
"""Reverse of :func:`encode_blob`. Raises ``ValueError`` on a
blob that doesn't carry the expected prefix or doesn't decode
cleanly — both surface as :class:`InvalidLicenseError` at the
manager layer."""
s = blob.strip()
if not s.startswith(_BLOB_PREFIX):
raise ValueError(
f"License blob missing {_BLOB_PREFIX!r} prefix. "
"Did you paste the wrong text?"
)
encoded = s[len(_BLOB_PREFIX):]
try:
raw = base64.urlsafe_b64decode(encoded.encode("ascii"))
except (ValueError, TypeError) as e:
raise ValueError(f"License blob is not valid base64: {e}") from e
try:
return json.loads(raw.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
raise ValueError(f"License blob contains invalid JSON: {e}") from e

47
src/license/errors.py Normal file
View File

@@ -0,0 +1,47 @@
"""Structured error hierarchy for the license layer.
Mirrors the ``src.core.errors`` pattern — every subclass extends a
stdlib base so existing ``except OSError`` / ``except ValueError``
handlers keep working. The UI / CLI branches on the subclass to render
the right next step (activate, renew, contact support).
"""
from __future__ import annotations
class LicenseError(ValueError):
"""Base class for every licensing failure. Subclass-only — callers
should catch the specific failure mode they handle."""
class NotActivatedError(LicenseError):
"""No license file present, or file present but signature missing.
Recovery: open the activation page (GUI) or run
``datatools-license activate <blob>`` (CLI).
"""
class InvalidLicenseError(LicenseError):
"""The license file is present but failed verification.
Common causes: tampered signature, blob from a different build
(different secret), corrupted JSON. Recovery: re-paste the blob
from the original delivery email or contact support.
"""
class ExpiredLicenseError(LicenseError):
"""The license is structurally valid but past its expiration date.
Recovery: renew via ``datatools-license renew <blob>`` or paste a
new blob into the activation page.
"""
class UnsupportedFeatureError(LicenseError):
"""The active license's tier doesn't unlock a requested feature.
Raised by :func:`require_feature` when, e.g., a TRIAL user tries
to access an ENTERPRISE-only tool. Recovery: upgrade tier.
"""

49
src/license/features.py Normal file
View File

@@ -0,0 +1,49 @@
"""Tier → feature mapping.
A tier unlocks every feature listed for it. Adding a new SKU means
adding a new row here and (if the SKU introduces new functionality)
adding feature flags to :class:`~src.license.schema.FeatureFlag`. No
consumer code changes.
The v1 product ships only :data:`Tier.CORE`, which unlocks every tool.
TRIAL exists so a buyer can register without a paid key and still get
a 1-year working license; the difference between TRIAL and CORE is
semantic (and the basis for showing "TRIAL" in the sidebar), not
functional.
PRO and ENTERPRISE are scaffolded for future SKUs. They currently
unlock the same feature set as CORE so the architecture is exercised
by tests without committing to a particular pricing structure.
"""
from __future__ import annotations
from typing import FrozenSet
from .schema import FeatureFlag, Tier
def _all() -> FrozenSet[FeatureFlag]:
"""Every feature flag — used as the default for the v1 SKU."""
return frozenset(FeatureFlag)
FEATURES_BY_TIER: dict[Tier, FrozenSet[FeatureFlag]] = {
Tier.TRIAL: _all(),
Tier.CORE: _all(),
# Pre-wired for future SKUs. Today they mirror CORE so the gating
# tests exercise the lookup path without making a marketing claim.
Tier.PRO: _all(),
Tier.ENTERPRISE: _all(),
}
def all_features_for_tier(tier: Tier) -> tuple[str, ...]:
"""Return the canonical, sorted tuple of feature ids for *tier*.
Used by the license generator to fill the ``features`` field on a
new license, and by the manager to upgrade an older license whose
``features`` list omits a flag we've since added to its tier.
"""
flags = FEATURES_BY_TIER[tier]
return tuple(sorted(f.value for f in flags))

470
src/license/manager.py Normal file
View File

@@ -0,0 +1,470 @@
"""LicenseManager — the public face of the license layer.
Singleton-by-default (``get_manager()`` returns a process-wide
instance), but tests can construct standalone managers via the
constructor for full isolation.
Lifecycle::
mgr = get_manager()
if not mgr.is_activated():
mgr.activate_from_blob(blob, name, email)
mgr.require_feature(FeatureFlag.DEDUPLICATOR)
state = mgr.current_state() # snapshot for the sidebar / CLI status
"""
from __future__ import annotations
import os
import re
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from . import crypto, storage
from .errors import (
ExpiredLicenseError,
InvalidLicenseError,
LicenseError,
NotActivatedError,
UnsupportedFeatureError,
)
from .features import all_features_for_tier
from .schema import FeatureFlag, License, Tier, default_expiry_iso, _utcnow_iso
# ---------------------------------------------------------------------------
# State snapshot
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class LicenseState:
"""A read-only snapshot for status widgets / CLI ``--status`` JSON.
Always safe to render — even when no license is activated the
dataclass is populated with explanatory defaults so the GUI never
needs to None-check before formatting.
"""
activated: bool
valid: bool # activated AND not expired AND signature OK
name: str
email: str
tier: str
license_key: str
issued_at: str
expires_at: str
days_remaining: int
features: tuple[str, ...]
error_kind: str # "", "not_activated", "expired", "invalid"
error_message: str
def as_dict(self) -> dict:
from dataclasses import asdict
d = asdict(self)
d["features"] = list(self.features)
return d
_EMPTY_STATE = LicenseState(
activated=False, valid=False, name="", email="", tier="",
license_key="", issued_at="", expires_at="", days_remaining=0,
features=(),
error_kind="not_activated",
error_message="No license activated.",
)
# ---------------------------------------------------------------------------
# Manager
# ---------------------------------------------------------------------------
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
class LicenseManager:
"""Read/write license state. Cheap to construct; the singleton at
module level just avoids reload churn.
Storage path defaults to :func:`storage.default_license_path` —
pass ``path=`` to override for tests.
"""
def __init__(self, *, path: Optional[Path] = None) -> None:
self._path = path
self._cached: Optional[License] = None
self._dev_mode: Optional[bool] = None
# --- Dev bypass ---------------------------------------------------------
@property
def dev_mode(self) -> bool:
"""``DATATOOLS_DEV_MODE=1`` short-circuits every check.
Cached on the instance so a test that sets the env after
construction still picks it up (re-read on each access).
"""
return _truthy_env("DATATOOLS_DEV_MODE")
# --- Load / save --------------------------------------------------------
def load(self) -> Optional[License]:
"""Read + verify the on-disk license. Returns ``None`` when no
file exists. Raises :class:`InvalidLicenseError` on signature
mismatch / tampering."""
raw = storage.read_raw(self._path)
if raw is None:
self._cached = None
return None
lic = License.from_dict(raw)
# Verify signature against the canonical payload.
if not crypto.verify(lic.to_canonical_dict(), lic.signature):
raise InvalidLicenseError(
"License signature does not verify. The file may have "
"been tampered with, or it was issued by a different "
"build. Re-paste the original license blob to recover."
)
self._cached = lic
return lic
def save(self, lic: License) -> Path:
"""Persist *lic* to the configured path. Caller is responsible
for having signed the license already; this function does
NOT re-sign."""
path = storage.write_raw(lic.to_dict(), self._path)
self._cached = lic
return path
def deactivate(self) -> bool:
"""Remove the on-disk license. Returns whether a file was
removed (False if nothing was active)."""
self._cached = None
return storage.remove(self._path)
# --- Activation ---------------------------------------------------------
def activate_from_blob(
self,
blob: str,
*,
name: str,
email: str,
) -> License:
"""Verify *blob* and write the activated license to disk.
The buyer pastes the blob; the page collects their *name* and
*email* separately. We require both registered values to
match the values embedded in the signed blob — defends
against blob-sharing between buyers.
"""
_validate_registration(name, email)
try:
payload = crypto.decode_blob(blob)
except ValueError as e:
raise InvalidLicenseError(str(e)) from e
signature = payload.get("signature", "")
if not signature:
raise InvalidLicenseError(
"License blob is missing the ``signature`` field. "
"The blob may have been truncated when pasted."
)
canonical = {k: v for k, v in payload.items() if k != "signature"}
if not crypto.verify(canonical, signature):
raise InvalidLicenseError(
"License blob signature did not verify. The blob may "
"be corrupt, intended for a different product build, "
"or modified after issue."
)
# Reconstruct the License dataclass after verification so the
# canonical dict we hashed matches the on-disk JSON.
lic = License.from_dict(payload)
# Personal-name and email matching is a soft attestation. We
# enforce case-insensitive equality after stripping whitespace,
# so " jane@Example.com " matches the embedded canonical
# form without surprising the user about case.
if name.strip().casefold() != lic.name.casefold() or (
email.strip().casefold() != lic.email.casefold()
):
raise InvalidLicenseError(
"Registered name / email do not match the values "
"embedded in the license blob. Contact support if you "
"believe this is in error."
)
if lic.is_expired():
raise ExpiredLicenseError(
f"License expired on {lic.expires_at}. "
"Paste a renewal blob to extend access."
)
self.save(lic)
return lic
def issue_trial(self, *, name: str, email: str, years: int = 1) -> License:
"""Self-sign a 1-year trial license. The seller's
``scripts/generate_license.py`` produces these for buyers; the
same code path is reused at activation time as a fallback
when a buyer wants to evaluate without a key.
Trial licenses are functionally identical to CORE in v1; only
the tier label differs (so the sidebar can say "TRIAL" if we
ever want to nudge a conversion).
"""
_validate_registration(name, email)
return self._mint(name=name, email=email, tier=Tier.TRIAL, years=years)
def renew(self, blob: str) -> License:
"""Renew an existing license using a fresh blob.
Verification: the blob must verify, its name+email must match
the currently-active license, and its expiry must be in the
future. We allow tier changes during renewal (upgrade path).
"""
current = self._cached or self.load()
if current is None:
raise NotActivatedError(
"No active license to renew. Use ``activate`` instead "
"of ``renew`` for first-time setup."
)
try:
payload = crypto.decode_blob(blob)
except ValueError as e:
raise InvalidLicenseError(str(e)) from e
signature = payload.get("signature", "")
canonical = {k: v for k, v in payload.items() if k != "signature"}
if not crypto.verify(canonical, signature):
raise InvalidLicenseError("Renewal blob signature did not verify.")
lic = License.from_dict(payload)
if (
lic.name.casefold() != current.name.casefold()
or lic.email.casefold() != current.email.casefold()
):
raise InvalidLicenseError(
"Renewal blob is for a different name/email than the "
"currently-active license."
)
if lic.is_expired():
raise ExpiredLicenseError(
"Renewal blob is itself expired. Generate a new one."
)
self.save(lic)
return lic
# --- Inspection ---------------------------------------------------------
def is_activated(self) -> bool:
if self._cached is not None:
return True
return storage.read_raw(self._path) is not None
def is_valid(self) -> bool:
if self.dev_mode:
return True
try:
lic = self._cached or self.load()
except LicenseError:
return False
if lic is None:
return False
return not lic.is_expired()
def current_state(self) -> LicenseState:
if self.dev_mode:
return LicenseState(
activated=True, valid=True,
name="dev", email="dev@local",
tier=Tier.ENTERPRISE.value,
license_key="DEV-BYPASS",
issued_at=_utcnow_iso(),
expires_at=default_expiry_iso(years=99),
days_remaining=36500,
features=all_features_for_tier(Tier.ENTERPRISE),
error_kind="",
error_message="",
)
try:
lic = self._cached or self.load()
except InvalidLicenseError as e:
return _EMPTY_STATE.__class__(
activated=True, valid=False,
name="", email="", tier="", license_key="",
issued_at="", expires_at="", days_remaining=0,
features=(),
error_kind="invalid",
error_message=str(e),
)
if lic is None:
return _EMPTY_STATE
if lic.is_expired():
return LicenseState(
activated=True, valid=False,
name=lic.name, email=lic.email, tier=lic.tier.value,
license_key=lic.license_key,
issued_at=lic.issued_at, expires_at=lic.expires_at,
days_remaining=lic.days_remaining(),
features=lic.features,
error_kind="expired",
error_message=(
f"License expired on {lic.expires_at}. "
"Paste a renewal blob to extend access."
),
)
return LicenseState(
activated=True, valid=True,
name=lic.name, email=lic.email, tier=lic.tier.value,
license_key=lic.license_key,
issued_at=lic.issued_at, expires_at=lic.expires_at,
days_remaining=max(lic.days_remaining(), 0),
features=lic.features,
error_kind="",
error_message="",
)
def require_feature(self, feature: str | FeatureFlag) -> License:
"""Raise the right error if *feature* isn't accessible.
Returns the active :class:`License` on success so callers can
log the tier / days-remaining alongside their own work.
"""
if self.dev_mode:
# Synthesize a dev license so callers expecting a return
# value don't blow up. The dev license unlocks every flag.
return License(
name="dev", email="dev@local",
license_key="DEV-BYPASS",
tier=Tier.ENTERPRISE,
features=all_features_for_tier(Tier.ENTERPRISE),
issued_at=_utcnow_iso(),
expires_at=default_expiry_iso(years=99),
signature="",
)
try:
lic = self._cached or self.load()
except InvalidLicenseError:
raise
if lic is None:
raise NotActivatedError(
"DataTools is not activated. Run "
"``datatools-license activate <blob>`` or use the "
"Activate page in the GUI."
)
if lic.is_expired():
raise ExpiredLicenseError(
f"License expired on {lic.expires_at}. "
"Renew before continuing."
)
if not lic.has_feature(feature):
tier_name = lic.tier.value if isinstance(lic.tier, Tier) else lic.tier
raise UnsupportedFeatureError(
f"Feature {feature!r} is not enabled on the active "
f"{tier_name!r} license."
)
return lic
# --- Internals ---------------------------------------------------------
def _mint(
self,
*,
name: str,
email: str,
tier: Tier,
years: int = 1,
license_key: Optional[str] = None,
) -> License:
"""Self-sign a new license. Used by ``issue_trial`` and by
the seller-side key generation utility (which calls the
same code via the bare manager)."""
now = _utcnow_iso()
exp = default_expiry_iso(years=years)
features = all_features_for_tier(tier)
key = license_key or _generate_license_key(tier)
unsigned = License(
name=name, email=email, license_key=key, tier=tier,
features=features, issued_at=now, expires_at=exp,
signature="",
)
sig = crypto.sign(unsigned.to_canonical_dict())
signed = License(
name=unsigned.name, email=unsigned.email,
license_key=unsigned.license_key, tier=unsigned.tier,
features=unsigned.features, issued_at=unsigned.issued_at,
expires_at=unsigned.expires_at, signature=sig,
)
self.save(signed)
return signed
def _generate_license_key(tier: Tier) -> str:
"""Human-readable but unguessable key id.
Format: ``DT1-{TIER}-{8 hex}-{8 hex}``. The two random hex blocks
come from a single UUID4 so the key has 64 bits of entropy. Not
used as the cryptographic identity — that's the signature — but
it's a stable handle for support emails.
"""
rid = uuid.uuid4().hex
return f"DT1-{tier.value.upper()}-{rid[:8]}-{rid[8:16]}"
def _validate_registration(name: str, email: str) -> None:
"""Reject obviously-bad inputs before touching crypto.
The activation page should call this too so the error surfaces
immediately instead of from inside the verifier.
"""
if not name or not name.strip():
raise InvalidLicenseError("Name is required for registration.")
if not email or not _EMAIL_RE.match(email.strip()):
raise InvalidLicenseError(
f"{email!r} is not a valid email address. "
"Expected: ``local@domain.tld``."
)
def _truthy_env(name: str) -> bool:
v = os.environ.get(name, "")
return v.strip().lower() in {"1", "true", "yes", "on"}
# ---------------------------------------------------------------------------
# Singleton + module-level convenience
# ---------------------------------------------------------------------------
_singleton: Optional[LicenseManager] = None
def get_manager() -> LicenseManager:
"""Return the process-wide :class:`LicenseManager`.
Re-uses the same instance across imports so the GUI's sidebar,
the chrome gate, and the CLI guard share one cached license read.
Tests that need isolation should construct their own manager
instead.
"""
global _singleton
if _singleton is None:
_singleton = LicenseManager()
return _singleton
def reset_singleton_for_tests() -> None:
"""Drop the cached singleton. Used by the test fixture so each
test session starts with a fresh manager pointed at its tmp
license path."""
global _singleton
_singleton = None
def current_state() -> LicenseState:
return get_manager().current_state()
def require_feature(feature: str | FeatureFlag) -> License:
return get_manager().require_feature(feature)

181
src/license/schema.py Normal file
View File

@@ -0,0 +1,181 @@
"""License schema — dataclasses + enums.
Wire format (the contents of ``~/.datatools/license.json`` AND the
base64-decoded activation blob)::
{
"name": "Jane Doe",
"email": "jane@example.com",
"license_key": "DT1-CORE-1A2B3C4D-5E6F7G8H",
"tier": "core",
"features": ["01_deduplicator", "02_text_cleaner", ...],
"issued_at": "2026-05-13T00:00:00Z",
"expires_at": "2027-05-13T00:00:00Z",
"signature": "<hex hmac-sha256>"
}
The signature is the HMAC over the canonical JSON of every field
*except* ``signature`` itself (see :mod:`.crypto`). Keeping the schema
strictly additive means future builds can verify older licenses as
long as they ship the same secret.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
class Tier(str, Enum):
"""License tier. Drives the feature set the active license unlocks.
Order matters: TRIAL < CORE < PRO < ENTERPRISE. A higher tier
inherits every feature of the lower tiers — see
:data:`.features.FEATURES_BY_TIER`.
"""
TRIAL = "trial"
CORE = "core"
PRO = "pro"
ENTERPRISE = "enterprise"
class FeatureFlag(str, Enum):
"""Stable feature identifiers. Match the ``tool_id`` field in
:mod:`src.gui.tools_registry` so the GUI's per-tool gating can
share the same string keys.
Future SKUs ship by adding new flags here and adding them to a
new tier in ``FEATURES_BY_TIER`` — no consumer code changes.
"""
DEDUPLICATOR = "01_deduplicator"
TEXT_CLEANER = "02_text_cleaner"
FORMAT_STANDARDIZER = "03_format_standardizer"
MISSING_HANDLER = "04_missing_handler"
COLUMN_MAPPER = "05_column_mapper"
OUTLIER_DETECTOR = "06_outlier_detector"
MULTI_FILE_MERGER = "07_multi_file_merger"
VALIDATOR_REPORTER = "08_validator_reporter"
PIPELINE_RUNNER = "09_pipeline_runner"
def _utcnow_iso() -> str:
"""Return current UTC time in ISO-8601 with explicit ``Z`` suffix.
``datetime.utcnow`` is deprecated in CPython 3.12; using a
tz-aware UTC datetime and slicing off the ``+00:00`` keeps the
serialized form short and human-readable.
"""
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _parse_iso(s: str) -> datetime:
"""Parse one of our ISO strings into a tz-aware datetime."""
# Accept both ``...Z`` and ``...+00:00`` so future format tweaks
# don't break old files.
if s.endswith("Z"):
s = s[:-1] + "+00:00"
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
@dataclass(frozen=True)
class License:
"""One activated license. Immutable — renew/upgrade produces a new
instance, never mutates an existing one."""
name: str
email: str
license_key: str
tier: Tier
features: tuple[str, ...]
issued_at: str # ISO-8601 UTC
expires_at: str # ISO-8601 UTC
signature: str = "" # populated by ``crypto.sign``
# --- Convenience accessors ------------------------------------------------
@property
def issued_dt(self) -> datetime:
return _parse_iso(self.issued_at)
@property
def expires_dt(self) -> datetime:
return _parse_iso(self.expires_at)
def is_expired(self, *, now: datetime | None = None) -> bool:
ref = now or datetime.now(timezone.utc)
return ref >= self.expires_dt
def days_remaining(self, *, now: datetime | None = None) -> int:
ref = now or datetime.now(timezone.utc)
delta = self.expires_dt - ref
# ``int(days)`` floors towards 0 for negatives — we use
# ``max(..., 0)`` for the display path and the raw value for
# the test path. Callers wanting "expired by N days" should
# use ``is_expired`` first.
return delta.days
def has_feature(self, feature: str | FeatureFlag) -> bool:
key = feature.value if isinstance(feature, FeatureFlag) else feature
return key in self.features
# --- Serialization --------------------------------------------------------
def to_canonical_dict(self) -> dict[str, Any]:
"""Return the JSON-canonical dict the HMAC is computed over.
Excludes ``signature`` so signing and verifying both agree on
the message bytes.
"""
d = asdict(self)
d.pop("signature", None)
d["tier"] = self.tier.value if isinstance(self.tier, Tier) else self.tier
d["features"] = list(self.features)
return d
def to_dict(self) -> dict[str, Any]:
"""Return the on-disk dict, signature included."""
d = self.to_canonical_dict()
d["signature"] = self.signature
return d
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "License":
"""Inverse of :meth:`to_dict`. Tolerant of missing optional
fields (defaults), strict on required ones (raises KeyError).
"""
tier_raw = data["tier"]
tier = tier_raw if isinstance(tier_raw, Tier) else Tier(tier_raw)
return cls(
name=str(data["name"]),
email=str(data["email"]),
license_key=str(data["license_key"]),
tier=tier,
features=tuple(data.get("features", ())),
issued_at=str(data["issued_at"]),
expires_at=str(data["expires_at"]),
signature=str(data.get("signature", "")),
)
# Public helper exposed for the activation flow (1-year default).
def default_expiry_iso(years: int = 1, *, now: datetime | None = None) -> str:
"""Return an ISO timestamp *years* from *now* (default: current UTC)."""
ref = now or datetime.now(timezone.utc)
# ``replace(year=...)`` handles leap-year edge cases via the
# ``timedelta`` fallback below for Feb-29 issued dates.
try:
target = ref.replace(year=ref.year + years)
except ValueError:
# Feb 29 + N years where target year isn't a leap year — slide
# to Feb 28. Acceptable; the buyer is one day short of an
# exact year boundary on a date they almost certainly didn't
# pick on purpose.
target = ref.replace(year=ref.year + years, day=28)
return target.strftime("%Y-%m-%dT%H:%M:%SZ")

76
src/license/storage.py Normal file
View File

@@ -0,0 +1,76 @@
"""Where the activated license lives on disk.
Default path: ``~/.datatools/license.json``. Overridable via
``$DATATOOLS_LICENSE_PATH`` for tests (the conftest fixture uses this
to point each test session at a tmp file).
The directory is created lazily on first write — we don't want to
create the user's config dir just for reading.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any, Optional
def default_license_path() -> Path:
"""The resolved license file path for the current process.
Order of resolution:
1. ``$DATATOOLS_LICENSE_PATH`` (absolute path; used by tests).
2. ``~/.datatools/license.json`` (everyone else).
"""
override = os.environ.get("DATATOOLS_LICENSE_PATH")
if override:
return Path(override).expanduser().resolve()
return Path.home() / ".datatools" / "license.json"
def read_raw(path: Optional[Path] = None) -> Optional[dict[str, Any]]:
"""Return the on-disk license dict, or ``None`` if no file exists.
Anything else (truncated file, invalid JSON) raises ``ValueError``
so the caller surfaces it as :class:`InvalidLicenseError`. We
don't try to recover from a corrupt license file — a user that
sees "invalid license" can paste their blob again.
"""
p = path or default_license_path()
if not p.exists():
return None
try:
return json.loads(p.read_text(encoding="utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
raise ValueError(f"License file at {p} is corrupted: {e}") from e
def write_raw(data: dict[str, Any], path: Optional[Path] = None) -> Path:
"""Atomically write *data* to the license path.
Atomic = write-to-temp-then-rename, so a crashed write doesn't
leave a half-written license file that would fail verification on
the next launch.
"""
p = path or default_license_path()
p.parent.mkdir(parents=True, exist_ok=True)
tmp = p.with_suffix(p.suffix + ".tmp")
tmp.write_text(
json.dumps(data, indent=2, sort_keys=True), encoding="utf-8",
)
tmp.replace(p)
return p
def remove(path: Optional[Path] = None) -> bool:
"""Delete the license file. Returns ``True`` if a file was
removed, ``False`` if nothing was there. Used by the
``datatools-license deactivate`` command and by test cleanup."""
p = path or default_license_path()
try:
p.unlink()
return True
except FileNotFoundError:
return False

182
src/license_cli.py Normal file
View File

@@ -0,0 +1,182 @@
"""CLI for license management.
Five commands:
- ``activate BLOB --name NAME --email EMAIL``
First-time activation. Verifies the signed blob, ensures the
name + email match, writes ``~/.datatools/license.json``.
- ``renew BLOB``
Apply a renewal blob to the currently-active license. The blob's
embedded name + email must match the active license; tier may
differ (upgrade path).
- ``status [--json]``
Print the current license state. Human-readable by default;
``--json`` emits the same payload as a JSON document for piping
into shell scripts / monitoring.
- ``trial --name NAME --email EMAIL [--years N]``
Self-issue a trial license without a paid blob. Useful for
evaluating the product or for support to repro a buyer's issue
locally without needing a real key.
- ``deactivate``
Remove the local license file.
This CLI is exempt from the guard that protects every tool CLI —
otherwise a user with no license couldn't run ``activate``.
"""
from __future__ import annotations
import json
import sys
from typing import Optional
import typer
from src.license import (
LicenseError,
Tier,
get_manager,
)
from src.license.manager import LicenseManager
app = typer.Typer(
name="license",
help=(
"Manage your DataTools license: activate a paid blob, renew, "
"check status, or self-issue a 1-year trial.\n\n"
"All operations are local — no internet calls. The signed "
"license file lives at ~/.datatools/license.json (override "
"with $DATATOOLS_LICENSE_PATH)."
),
add_completion=False,
no_args_is_help=True,
)
@app.command()
def activate(
blob: str = typer.Argument(..., help="License blob from the delivery email (starts with DTLIC1:)."),
name: str = typer.Option(..., "--name", "-n", help="Buyer name (must match the blob)."),
email: str = typer.Option(..., "--email", "-e", help="Buyer email (must match the blob)."),
) -> None:
"""Verify and install a license blob."""
mgr = get_manager()
try:
lic = mgr.activate_from_blob(blob, name=name, email=email)
except LicenseError as e:
typer.echo(f"Activation failed: {e}", err=True)
raise typer.Exit(code=2)
typer.echo(
f"Activated. Tier: {lic.tier.value} · "
f"Key: {lic.license_key} · "
f"Expires: {lic.expires_at[:10]}"
)
@app.command()
def renew(
blob: str = typer.Argument(..., help="Renewal blob from the renewal email."),
) -> None:
"""Apply a renewal blob to the currently active license."""
mgr = get_manager()
try:
lic = mgr.renew(blob)
except LicenseError as e:
typer.echo(f"Renewal failed: {e}", err=True)
raise typer.Exit(code=2)
typer.echo(
f"Renewed. New expiry: {lic.expires_at[:10]} "
f"({lic.days_remaining()} days)"
)
@app.command()
def status(
json_output: bool = typer.Option(False, "--json", help="Emit JSON instead of human-readable text."),
) -> None:
"""Print the current license state."""
state = get_manager().current_state()
if json_output:
typer.echo(json.dumps(state.as_dict(), indent=2))
return
if not state.activated:
typer.echo("Status: not activated.")
typer.echo(
"Run: python -m src.license_cli activate <blob> "
"--name 'Your Name' --email you@example.com"
)
raise typer.Exit(code=1)
if state.error_kind == "invalid":
typer.echo(f"Status: invalid. {state.error_message}")
raise typer.Exit(code=2)
if state.error_kind == "expired":
typer.echo(
f"Status: expired on {state.expires_at[:10]}. "
f"Renew with: python -m src.license_cli renew <blob>"
)
raise typer.Exit(code=2)
typer.echo(
f"Status: active.\n"
f" Name: {state.name}\n"
f" Email: {state.email}\n"
f" Tier: {state.tier}\n"
f" Key: {state.license_key}\n"
f" Issued: {state.issued_at[:10]}\n"
f" Expires: {state.expires_at[:10]} ({state.days_remaining} days)\n"
f" Features: {', '.join(state.features)}"
)
@app.command()
def trial(
name: str = typer.Option(..., "--name", "-n"),
email: str = typer.Option(..., "--email", "-e"),
years: int = typer.Option(1, "--years", help="Trial length (default: 1 year)."),
) -> None:
"""Self-issue a trial license without a paid blob."""
mgr = get_manager()
try:
lic = mgr.issue_trial(name=name, email=email, years=years)
except LicenseError as e:
typer.echo(f"Trial issuance failed: {e}", err=True)
raise typer.Exit(code=2)
typer.echo(
f"Trial issued. Key: {lic.license_key} · "
f"Expires: {lic.expires_at[:10]} ({lic.days_remaining()} days)"
)
@app.command()
def deactivate(
confirm: bool = typer.Option(
False, "--yes", "-y", help="Skip the interactive confirmation.",
),
) -> None:
"""Remove the local license file (does NOT contact a server)."""
if not confirm:
if not typer.confirm(
"This removes the license file at ~/.datatools/license.json. Continue?",
default=False,
):
typer.echo("Aborted.")
raise typer.Exit(code=1)
removed = get_manager().deactivate()
if removed:
typer.echo("Deactivated. The license file has been removed.")
else:
typer.echo("No license was active; nothing to deactivate.")
def main() -> None:
app()
if __name__ == "__main__":
main()

View File

@@ -1,5 +1,7 @@
"""Shared test fixtures."""
import os
import pandas as pd
import pytest
from pathlib import Path
@@ -7,6 +9,84 @@ from pathlib import Path
SAMPLES_DIR = Path(__file__).parent.parent / "samples"
# ---------------------------------------------------------------------------
# License gating bypass
# ---------------------------------------------------------------------------
#
# Every CLI entry point and every GUI page now requires a valid license,
# but the test suite shouldn't be in the business of paying for or
# generating licenses on every run. The session-scoped autouse fixture
# below sets the dev-mode env var BEFORE any test code (including
# parametrize-time imports) runs, so all 1900+ existing tests continue
# to pass.
#
# Individual license tests that DO want to exercise the real activation
# flow either:
# - clear the env var themselves and point the manager at a tmp file, or
# - use the explicit ``activated_license_manager`` / ``unactivated_license_manager``
# fixtures defined below.
@pytest.fixture(scope="session", autouse=True)
def _enable_license_dev_mode():
"""Bypass license checks for every test by default.
Set in the env so subprocess-based tests (test_install, test_e2e)
inherit it without each test needing to plumb the env var.
"""
previous = os.environ.get("DATATOOLS_DEV_MODE")
os.environ["DATATOOLS_DEV_MODE"] = "1"
try:
yield
finally:
if previous is None:
os.environ.pop("DATATOOLS_DEV_MODE", None)
else:
os.environ["DATATOOLS_DEV_MODE"] = previous
@pytest.fixture
def isolated_license_path(tmp_path, monkeypatch):
"""Point the license manager at a fresh tmp file for one test.
Useful when a test wants to exercise the real activation flow:
create + sign + verify the license bytes in a controlled location
without polluting ``~/.datatools/license.json``.
Also clears the dev-mode bypass so the manager actually consults
the file.
"""
path = tmp_path / "license.json"
monkeypatch.setenv("DATATOOLS_LICENSE_PATH", str(path))
monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False)
# The manager singleton caches its handle across tests; drop it
# so the new env vars take effect.
from src.license.manager import reset_singleton_for_tests
reset_singleton_for_tests()
yield path
reset_singleton_for_tests()
@pytest.fixture
def activated_license_manager(isolated_license_path):
"""Yield a LicenseManager pointed at a tmp file, pre-activated as
a Core user. The license is freshly signed with the current
secret so verification succeeds.
"""
from src.license import LicenseManager, Tier
mgr = LicenseManager()
mgr._mint(name="Test User", email="test@example.com", tier=Tier.CORE)
return mgr
@pytest.fixture
def unactivated_license_manager(isolated_license_path):
"""Yield a LicenseManager pointed at a tmp file with NO license
file. Useful for testing the activation flow + gate behaviour.
"""
from src.license import LicenseManager
return LicenseManager()
@pytest.fixture
def sample_csv_path():
return SAMPLES_DIR / "messy_sales.csv"

View File

@@ -0,0 +1,254 @@
"""GUI activation + license-gate tests.
These exercise the chrome-level gate that ``hide_streamlit_chrome``
installs: when no valid license is on disk, every page renders the
activation form instead of the page body, and tool widgets do NOT
appear. We test against the Deduplicator page since it's the smallest
real-world tool that depends on chrome.
The autouse fixture in ``tests/conftest.py`` sets
``DATATOOLS_DEV_MODE=1``, which the GUI gate respects. Each test
below uses ``monkeypatch`` to clear that env var so the real gate
fires; ``isolated_license_path`` then redirects the manager to a
tmp file.
"""
from __future__ import annotations
import pytest
from streamlit.testing.v1 import AppTest
from .conftest import collected_text, stash_upload, with_language
@pytest.fixture
def no_license_env(monkeypatch, tmp_path):
"""Clear dev mode and point the license at a fresh empty tmp path."""
monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False)
monkeypatch.setenv("DATATOOLS_LICENSE_PATH", str(tmp_path / "license.json"))
from src.license.manager import reset_singleton_for_tests
reset_singleton_for_tests()
yield tmp_path / "license.json"
reset_singleton_for_tests()
@pytest.fixture
def trial_license(no_license_env):
"""Pre-activate a 1-year trial license for tests that need to
pass the gate."""
from src.license import LicenseManager, Tier
mgr = LicenseManager()
mgr.issue_trial(name="Test User", email="test@example.com")
yield mgr
class TestGateBlocksWithoutLicense:
"""When no license file exists, every page should render the
activation form and short-circuit the tool body."""
def test_home_renders_activation_form(self, no_license_env, home_app):
home_app.run()
text = collected_text(home_app)
assert "Activate DataTools" in text or "Activar DataTools" in text
def test_dedup_page_does_not_render_tool_widgets(
self, no_license_env, app_factory,
):
app = app_factory("1_Deduplicator")
app.run()
# Without a license, the page should NOT have the dedup-
# specific advanced-options expander or Find Duplicates button.
labels = [b.label for b in app.button]
assert not any("Find Duplicates" in lbl for lbl in labels), (
f"tool widgets leaked past the gate; got: {labels}"
)
def test_activation_form_localizes_to_spanish(
self, no_license_env, home_app,
):
with_language(home_app, "es")
home_app.run()
text = collected_text(home_app)
assert "Activar DataTools" in text
def test_sidebar_shows_not_activated(self, no_license_env, home_app):
home_app.run()
# Sidebar caption "🔒 Not activated".
captions = [c.value for c in home_app.sidebar.caption]
joined = " ".join(captions)
assert "Not activated" in joined or "Sin activar" in joined
class TestGatePassesWithTrialLicense:
def test_home_renders_full_grid(self, trial_license, home_app):
home_app.run()
text = collected_text(home_app)
# With a valid license, the activation form should NOT be the
# primary content; we should see the home title + tool cards.
assert "Data Cleaning Mastery" in text
assert "Activate DataTools" not in text # form not shown inline
def test_sidebar_shows_active_status(self, trial_license, home_app):
home_app.run()
captions = " ".join(c.value for c in home_app.sidebar.caption)
# "Trial · 364 days left" (give or take one).
assert "Trial" in captions or "Prueba" in captions
assert "days left" in captions or "días" in captions
def test_dedup_page_renders_tool_widgets(
self, trial_license, app_factory, small_csv_bytes,
):
app = app_factory("1_Deduplicator")
stash_upload(app, name="messy.csv", data=small_csv_bytes)
app.run()
labels = [b.label for b in app.button]
assert any("Find Duplicates" in lbl for lbl in labels), (
f"tool widgets blocked by gate even with valid license; "
f"got: {labels}"
)
class TestActivationFormSubmission:
"""End-to-end: paste a generated blob into the inline form and
confirm the gate releases on next render."""
def test_paste_blob_activates_and_unlocks(
self, no_license_env, home_app,
):
# Generate a blob the same way scripts/generate_license.py does.
from src.license import LicenseManager, Tier
from src.license.crypto import encode_blob
mint_mgr = LicenseManager()
# Use a separate tmp path so the trial we mint doesn't fight
# with the manager the GUI will use.
from tempfile import mkstemp
from pathlib import Path
_, p = mkstemp(suffix=".json")
mint_mgr._path = Path(p)
lic = mint_mgr._mint(
name="Buyer", email="buyer@example.com", tier=Tier.CORE,
)
blob = encode_blob(lic.to_dict())
# Wipe the temporary mint manager so its file doesn't collide.
mint_mgr.deactivate()
# Now drive the real GUI manager.
home_app.run()
# The activation form is inline in chrome — its widget keys
# are prefixed ``gate_``.
home_app.text_input(key="gate_name").set_value("Buyer").run()
home_app.text_input(key="gate_email").set_value("buyer@example.com").run()
home_app.text_area(key="gate_blob").set_value(blob).run()
# Submit primary form button.
submit = next(
b for b in home_app.button
if b.label in ("Activate", "Apply renewal")
)
submit.click().run()
# After activation the page reruns and the activation form
# should be gone — we should see the home page proper.
text = collected_text(home_app)
assert "Data Cleaning Mastery" in text
def test_trial_button_self_issues_license(
self, no_license_env, home_app,
):
home_app.run()
home_app.text_input(key="gate_name").set_value("Trial").run()
home_app.text_input(key="gate_email").set_value("trial@example.com").run()
# Click the trial button on the same form.
trial_btn = next(
b for b in home_app.button
if "trial" in b.label.lower() or "prueba" in b.label.lower()
)
trial_btn.click().run()
text = collected_text(home_app)
# Successful activation → home page renders fully.
assert "Data Cleaning Mastery" in text
class TestActivationPageDirect:
"""``pages/_Activate.py`` renders the same form regardless of
license state — buyer can revisit it to review or deactivate."""
def test_activate_page_renders_with_valid_license(
self, trial_license, app_factory,
):
app = app_factory("_Activate")
app.run()
text = collected_text(app)
# Page title localized.
assert "Activate DataTools" in text
# Deactivate option only shown after activation.
labels = [b.label for b in app.button]
assert any("Deactivate" in lbl for lbl in labels)
def test_activate_page_renders_without_license(
self, no_license_env, app_factory,
):
app = app_factory("_Activate")
app.run()
text = collected_text(app)
assert "Activate DataTools" in text
# Deactivate button should NOT appear when nothing is active.
labels = [b.label for b in app.button]
assert not any("Deactivate" in lbl for lbl in labels)
class TestSidebarRenewalWarning:
"""A license with <30 days remaining surfaces a sidebar warning."""
def test_renewal_warning_appears_under_30_days(
self, no_license_env, home_app, monkeypatch,
):
# Mint a license with 7 days left.
from datetime import datetime, timedelta, timezone
from src.license import License, LicenseManager, Tier
from src.license import crypto as _crypto
from src.license.features import all_features_for_tier
future = (datetime.now(timezone.utc) + timedelta(days=7)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
lic = License(
name="X", email="x@x.com",
license_key="DT1-CORE-EXPIRING",
tier=Tier.CORE,
features=all_features_for_tier(Tier.CORE),
issued_at="2026-05-13T00:00:00Z",
expires_at=future,
signature="",
)
sig = _crypto.sign(lic.to_canonical_dict())
signed = License(**{**lic.__dict__, "signature": sig})
LicenseManager().save(signed)
home_app.run()
sidebar_warnings = [w.body for w in home_app.sidebar.warning if w.body]
joined = " ".join(sidebar_warnings)
assert "expires in" in joined.lower() or "caduca en" in joined.lower(), (
f"expected renewal warning in sidebar; got: {sidebar_warnings}"
)
class TestLicenseStatusBadgeI18n:
"""Sidebar status badge tier name must localize."""
def test_core_tier_localizes_in_spanish(
self, no_license_env, home_app, monkeypatch,
):
from src.license import LicenseManager, Tier
LicenseManager()._mint(
name="X", email="x@x.com", tier=Tier.CORE,
)
with_language(home_app, "es")
home_app.run()
captions = " ".join(c.value for c in home_app.sidebar.caption)
# The es pack maps ``license.tier_core`` to "Core" — same word
# in Spanish — but the surrounding template (``días restantes``)
# localizes.
assert "días restantes" in captions, (
f"Spanish status label missing; sidebar captions: {captions}"
)

430
tests/test_license.py Normal file
View File

@@ -0,0 +1,430 @@
"""Unit tests for the license layer.
Covers:
- Schema: License dataclass roundtrip + expiration helpers.
- Crypto: HMAC sign/verify, tamper detection, blob encode/decode.
- Manager: activation, renewal, deactivation, feature gating,
expiration handling, dev-mode bypass, name/email mismatch
rejection.
The session-scoped autouse fixture in ``conftest.py`` sets
``DATATOOLS_DEV_MODE=1`` for the suite. Tests in this file that need
the real check explicitly use the ``isolated_license_path`` fixture
which clears it.
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from src.license import (
ExpiredLicenseError,
FeatureFlag,
InvalidLicenseError,
License,
LicenseError,
LicenseManager,
NotActivatedError,
Tier,
UnsupportedFeatureError,
)
from src.license.crypto import (
_DEFAULT_SECRET,
decode_blob,
encode_blob,
sign,
verify,
)
from src.license.features import FEATURES_BY_TIER, all_features_for_tier
from src.license.schema import default_expiry_iso
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
class TestLicenseSchema:
def _make(self, **overrides) -> License:
defaults = dict(
name="Jane Doe",
email="jane@example.com",
license_key="DT1-CORE-AAAA-BBBB",
tier=Tier.CORE,
features=("01_deduplicator",),
issued_at="2026-05-13T00:00:00Z",
expires_at="2027-05-13T00:00:00Z",
signature="deadbeef",
)
defaults.update(overrides)
return License(**defaults)
def test_to_dict_roundtrip(self):
lic = self._make()
again = License.from_dict(lic.to_dict())
assert again == lic
def test_canonical_dict_excludes_signature(self):
lic = self._make()
canon = lic.to_canonical_dict()
assert "signature" not in canon
assert canon["name"] == "Jane Doe"
def test_is_expired_false_when_future(self):
future = (datetime.now(timezone.utc) + timedelta(days=30)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
lic = self._make(expires_at=future)
assert not lic.is_expired()
assert lic.days_remaining() >= 29
def test_is_expired_true_when_past(self):
past = (datetime.now(timezone.utc) - timedelta(days=1)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
lic = self._make(expires_at=past)
assert lic.is_expired()
def test_has_feature_accepts_string_and_enum(self):
lic = self._make(features=("01_deduplicator", "02_text_cleaner"))
assert lic.has_feature("01_deduplicator")
assert lic.has_feature(FeatureFlag.TEXT_CLEANER)
assert not lic.has_feature(FeatureFlag.PIPELINE_RUNNER)
def test_default_expiry_one_year_default(self):
now = datetime(2026, 5, 13, tzinfo=timezone.utc)
exp = default_expiry_iso(now=now)
# One year from 2026-05-13 is 2027-05-13 (2027 not a leap year).
assert exp.startswith("2027-05-13")
def test_default_expiry_leap_day_fallback(self):
# Feb 29 + 1y where target year (2027) isn't a leap year — we
# slide to Feb 28. Pin that contract.
leap = datetime(2024, 2, 29, tzinfo=timezone.utc)
exp = default_expiry_iso(years=3, now=leap)
# 2024 + 3 = 2027; not a leap year.
assert exp.startswith("2027-02-28")
# ---------------------------------------------------------------------------
# Crypto
# ---------------------------------------------------------------------------
class TestSignAndVerify:
def test_sign_is_deterministic(self):
payload = {"a": 1, "b": "hello"}
assert sign(payload) == sign(payload)
def test_verify_accepts_matching_signature(self):
payload = {"a": 1, "b": "hello"}
sig = sign(payload)
assert verify(payload, sig) is True
def test_verify_rejects_modified_payload(self):
payload = {"a": 1, "b": "hello"}
sig = sign(payload)
modified = dict(payload, b="goodbye")
assert verify(modified, sig) is False
def test_verify_rejects_modified_signature(self):
payload = {"a": 1}
sig = sign(payload)
# Flip one nibble.
bad = sig[:-1] + ("0" if sig[-1] != "0" else "1")
assert verify(payload, bad) is False
def test_sign_respects_secret_env_override(self, monkeypatch):
payload = {"a": 1}
monkeypatch.setenv("DATATOOLS_LICENSE_SECRET", "alternate")
alt = sign(payload)
monkeypatch.delenv("DATATOOLS_LICENSE_SECRET", raising=False)
default = sign(payload)
assert alt != default
def test_canonical_form_is_key_order_invariant(self):
a = {"x": 1, "y": 2}
b = {"y": 2, "x": 1}
assert sign(a) == sign(b)
class TestBlobEncodeDecode:
def test_roundtrip(self):
payload = {"name": "Jane", "tier": "core", "signature": "abc"}
blob = encode_blob(payload)
again = decode_blob(blob)
assert again == payload
def test_blob_has_human_readable_prefix(self):
blob = encode_blob({"x": 1})
assert blob.startswith("DTLIC1:")
def test_decode_rejects_missing_prefix(self):
with pytest.raises(ValueError, match="DTLIC1"):
decode_blob("not-a-blob")
def test_decode_rejects_bad_base64(self):
with pytest.raises(ValueError, match="base64"):
decode_blob("DTLIC1:!!!notbase64!!!")
def test_decode_rejects_truncated_blob(self):
blob = encode_blob({"x": 1})
truncated = blob[:-5]
with pytest.raises(ValueError):
decode_blob(truncated)
# ---------------------------------------------------------------------------
# Features
# ---------------------------------------------------------------------------
class TestFeatures:
def test_every_tier_has_features(self):
for tier in Tier:
assert FEATURES_BY_TIER[tier], (
f"tier {tier!r} has an empty feature set"
)
def test_all_features_for_tier_returns_sorted_tuple(self):
flags = all_features_for_tier(Tier.CORE)
assert flags == tuple(sorted(flags))
def test_core_unlocks_every_tool(self):
"""v1 SKU contract: Core = all 9 tools."""
flags = set(all_features_for_tier(Tier.CORE))
assert {f.value for f in FeatureFlag} <= flags
# ---------------------------------------------------------------------------
# Manager: activation flow
# ---------------------------------------------------------------------------
class TestManagerActivation:
def test_first_load_returns_none_when_no_file(
self, unactivated_license_manager,
):
assert unactivated_license_manager.load() is None
assert not unactivated_license_manager.is_activated()
assert not unactivated_license_manager.is_valid()
def test_issue_trial_writes_file_and_returns_license(
self, unactivated_license_manager, isolated_license_path,
):
lic = unactivated_license_manager.issue_trial(
name="Trial User", email="trial@example.com",
)
assert lic.tier == Tier.TRIAL
assert lic.name == "Trial User"
assert isolated_license_path.exists()
def test_trial_signature_round_trips(
self, unactivated_license_manager, isolated_license_path,
):
unactivated_license_manager.issue_trial(
name="A", email="a@b.com",
)
mgr2 = LicenseManager()
lic2 = mgr2.load()
assert lic2 is not None
assert lic2.name == "A"
def test_activate_from_blob_round_trips(
self, unactivated_license_manager,
):
# Use the manager itself to mint then re-activate from blob.
mgr = unactivated_license_manager
mgr.issue_trial(name="Buyer", email="buyer@example.com")
lic = mgr.load()
# Re-encode as if shipped via Gumroad.
blob = encode_blob(lic.to_dict())
# Deactivate then re-activate from the blob.
mgr.deactivate()
mgr2 = LicenseManager()
again = mgr2.activate_from_blob(blob, name="Buyer", email="buyer@example.com")
assert again.license_key == lic.license_key
def test_activate_rejects_wrong_name(self, unactivated_license_manager):
mgr = unactivated_license_manager
mgr.issue_trial(name="Buyer", email="buyer@example.com")
lic = mgr.load()
blob = encode_blob(lic.to_dict())
mgr.deactivate()
with pytest.raises(InvalidLicenseError, match="do not match"):
mgr.activate_from_blob(
blob, name="Different Person", email="buyer@example.com",
)
def test_activate_rejects_wrong_email(self, unactivated_license_manager):
mgr = unactivated_license_manager
mgr.issue_trial(name="Buyer", email="buyer@example.com")
lic = mgr.load()
blob = encode_blob(lic.to_dict())
mgr.deactivate()
with pytest.raises(InvalidLicenseError, match="do not match"):
mgr.activate_from_blob(
blob, name="Buyer", email="someone-else@example.com",
)
def test_activate_rejects_tampered_blob(self, unactivated_license_manager):
mgr = unactivated_license_manager
mgr.issue_trial(name="Buyer", email="buyer@example.com")
lic = mgr.load()
# Tamper: bump tier to enterprise without re-signing.
raw = lic.to_dict()
raw["tier"] = "enterprise"
bad = encode_blob(raw)
mgr.deactivate()
with pytest.raises(InvalidLicenseError, match="signature"):
mgr.activate_from_blob(
bad, name="Buyer", email="buyer@example.com",
)
def test_activate_rejects_invalid_email_format(
self, unactivated_license_manager,
):
mgr = unactivated_license_manager
with pytest.raises(InvalidLicenseError, match="valid email"):
mgr.activate_from_blob("anything", name="x", email="not-an-email")
def test_deactivate_returns_false_when_no_file(
self, unactivated_license_manager,
):
assert unactivated_license_manager.deactivate() is False
def test_deactivate_returns_true_after_activation(
self, unactivated_license_manager,
):
unactivated_license_manager.issue_trial(
name="A", email="a@b.com",
)
assert unactivated_license_manager.deactivate() is True
# ---------------------------------------------------------------------------
# Manager: expiration + renewal
# ---------------------------------------------------------------------------
class TestExpirationAndRenewal:
def test_is_valid_false_when_expired(
self, unactivated_license_manager, isolated_license_path,
):
# Mint a license with an expiry in the past.
from src.license import crypto as _crypto
past = (datetime.now(timezone.utc) - timedelta(days=2)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
lic = License(
name="X", email="x@x.com",
license_key="DT1-CORE-XXXX-YYYY",
tier=Tier.CORE,
features=all_features_for_tier(Tier.CORE),
issued_at="2025-01-01T00:00:00Z",
expires_at=past,
signature="",
)
sig = _crypto.sign(lic.to_canonical_dict())
signed = License(
**{**lic.__dict__, "signature": sig},
)
unactivated_license_manager.save(signed)
mgr2 = LicenseManager()
assert mgr2.is_activated()
assert not mgr2.is_valid()
state = mgr2.current_state()
assert state.error_kind == "expired"
def test_renew_extends_expiry(
self, unactivated_license_manager,
):
mgr = unactivated_license_manager
old = mgr.issue_trial(name="A", email="a@b.com", years=1)
# Mint a fresh blob with a longer expiry.
mgr2 = LicenseManager()
new = mgr2._mint(name="A", email="a@b.com", tier=Tier.CORE, years=2)
blob = encode_blob(new.to_dict())
# Renew via the manager.
renewed = mgr.renew(blob)
assert renewed.tier == Tier.CORE
assert renewed.expires_dt > old.expires_dt
def test_renew_rejects_for_different_buyer(
self, unactivated_license_manager,
):
mgr = unactivated_license_manager
mgr.issue_trial(name="A", email="a@b.com")
# Mint a blob for a DIFFERENT buyer.
other = LicenseManager()
# Use a separate path so other doesn't overwrite a's file.
from tempfile import mkstemp
_, p = mkstemp(suffix=".json")
other._path = Path(p)
other_lic = other._mint(name="B", email="b@c.com", tier=Tier.CORE)
blob = encode_blob(other_lic.to_dict())
with pytest.raises(InvalidLicenseError, match="different name/email"):
mgr.renew(blob)
# ---------------------------------------------------------------------------
# Manager: feature gating
# ---------------------------------------------------------------------------
class TestFeatureGating:
def test_require_feature_passes_on_valid_license(
self, activated_license_manager,
):
# CORE unlocks every flag in v1.
for flag in FeatureFlag:
activated_license_manager.require_feature(flag)
def test_require_feature_raises_not_activated(
self, unactivated_license_manager,
):
with pytest.raises(NotActivatedError):
unactivated_license_manager.require_feature(
FeatureFlag.DEDUPLICATOR,
)
def test_require_feature_returns_license(
self, activated_license_manager,
):
lic = activated_license_manager.require_feature(
FeatureFlag.DEDUPLICATOR,
)
assert lic.name == "Test User"
# ---------------------------------------------------------------------------
# Manager: dev mode
# ---------------------------------------------------------------------------
class TestDevMode:
def test_dev_mode_bypasses_validity_check(
self, isolated_license_path, monkeypatch,
):
monkeypatch.setenv("DATATOOLS_DEV_MODE", "1")
mgr = LicenseManager()
assert mgr.is_valid() is True
# No license file exists.
assert not isolated_license_path.exists()
def test_dev_mode_state_reports_synthetic_license(
self, isolated_license_path, monkeypatch,
):
monkeypatch.setenv("DATATOOLS_DEV_MODE", "1")
mgr = LicenseManager()
state = mgr.current_state()
assert state.activated is True
assert state.valid is True
assert state.tier == "enterprise"
assert state.error_kind == ""
def test_dev_mode_off_in_test_default_env_via_explicit_clear(
self, isolated_license_path, monkeypatch,
):
# ``isolated_license_path`` already clears DEV_MODE; double-
# check that contract here so the broader suite can rely on it.
assert "DATATOOLS_DEV_MODE" not in os.environ

268
tests/test_license_cli.py Normal file
View File

@@ -0,0 +1,268 @@
"""Tests for the license CLI + the per-CLI guard.
Two layers:
1. ``src/license_cli.py`` commands — ``activate``, ``renew``,
``status``, ``trial``, ``deactivate``. Invoked via Typer's testing
helper so we get a clean ``CliRunner.invoke`` interface without
spawning subprocesses for every test.
2. ``src/cli_license_guard.py`` — verify that every existing tool CLI
refuses to run when no license is present, and that ``--help``
always works regardless of license state.
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
from pathlib import Path
import pytest
from typer.testing import CliRunner
from src.license_cli import app as license_app
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# ---------------------------------------------------------------------------
# license_cli commands
# ---------------------------------------------------------------------------
class TestLicenseCliStatus:
def test_status_without_activation_exits_nonzero(self, unactivated_license_manager):
runner = CliRunner()
result = runner.invoke(license_app, ["status"])
assert result.exit_code == 1
assert "not activated" in result.stdout.lower()
def test_status_with_active_license(self, activated_license_manager):
runner = CliRunner()
result = runner.invoke(license_app, ["status"])
assert result.exit_code == 0
assert "active" in result.stdout.lower()
assert "Test User" in result.stdout
def test_status_json_emits_valid_json(self, activated_license_manager):
runner = CliRunner()
result = runner.invoke(license_app, ["status", "--json"])
assert result.exit_code == 0
data = json.loads(result.stdout)
assert data["activated"] is True
assert data["name"] == "Test User"
assert data["tier"] == "core"
assert isinstance(data["features"], list)
assert data["days_remaining"] >= 0
class TestLicenseCliTrial:
def test_trial_issues_one_year_license(self, unactivated_license_manager):
runner = CliRunner()
result = runner.invoke(license_app, [
"trial", "--name", "Trial User", "--email", "trial@example.com",
])
assert result.exit_code == 0
assert "Trial issued" in result.stdout
# And the manager now sees it as active.
from src.license import LicenseManager
mgr = LicenseManager()
assert mgr.is_valid()
lic = mgr.load()
assert lic.tier.value == "trial"
def test_trial_rejects_bad_email(self, unactivated_license_manager):
runner = CliRunner()
result = runner.invoke(license_app, [
"trial", "--name", "T", "--email", "not-an-email",
])
assert result.exit_code == 2
# ``typer.echo(..., err=True)`` lands in ``result.output`` when
# ``mix_stderr`` is the default True; ``result.stdout`` only has
# the bare stdout.
assert "valid email" in result.output.lower()
class TestLicenseCliActivate:
def _make_blob(self, name="Buyer", email="buyer@example.com", tier="core"):
"""Mint a blob via the same machinery scripts/generate_license.py uses."""
from src.license import LicenseManager, Tier
from src.license.crypto import encode_blob
# Use a throwaway manager (separate path) so we don't trample
# the one the test is exercising.
from tempfile import mkstemp
_, p = mkstemp(suffix=".json")
mgr = LicenseManager()
mgr._path = Path(p)
lic = mgr._mint(name=name, email=email, tier=Tier(tier))
Path(p).unlink(missing_ok=True)
return encode_blob(lic.to_dict())
def test_activate_round_trip(self, unactivated_license_manager):
blob = self._make_blob()
runner = CliRunner()
result = runner.invoke(license_app, [
"activate", blob,
"--name", "Buyer", "--email", "buyer@example.com",
])
assert result.exit_code == 0
assert "Activated" in result.stdout
# State is now active.
from src.license import LicenseManager
assert LicenseManager().is_valid()
def test_activate_rejects_wrong_name(self, unactivated_license_manager):
blob = self._make_blob()
runner = CliRunner()
result = runner.invoke(license_app, [
"activate", blob,
"--name", "Wrong Person", "--email", "buyer@example.com",
])
assert result.exit_code == 2
assert "do not match" in result.output.lower()
class TestLicenseCliRenew:
def test_renew_extends_expiry(self, activated_license_manager):
# Mint a longer-duration blob for the same buyer.
from src.license import LicenseManager, Tier
from src.license.crypto import encode_blob
from tempfile import mkstemp
_, p = mkstemp(suffix=".json")
other = LicenseManager()
other._path = Path(p)
lic = other._mint(
name="Test User", email="test@example.com",
tier=Tier.CORE, years=2,
)
Path(p).unlink(missing_ok=True)
blob = encode_blob(lic.to_dict())
runner = CliRunner()
result = runner.invoke(license_app, ["renew", blob])
assert result.exit_code == 0
assert "Renewed" in result.stdout
class TestLicenseCliDeactivate:
def test_deactivate_with_yes(self, activated_license_manager):
runner = CliRunner()
result = runner.invoke(license_app, ["deactivate", "--yes"])
assert result.exit_code == 0
assert "Deactivated" in result.stdout
from src.license import LicenseManager
assert not LicenseManager().is_activated()
# ---------------------------------------------------------------------------
# Guard tests — every tool CLI refuses to run without a license
# ---------------------------------------------------------------------------
class TestCliLicenseGuard:
"""Run each tool CLI as a subprocess so we exercise the real
``main()`` path, including the guard call. We bypass the suite's
DEV_MODE bypass by clearing the env var in the subprocess."""
@pytest.fixture
def clean_env(self, tmp_path):
"""Subprocess env: no DEV_MODE, license path in tmp_path."""
env = dict(os.environ)
env.pop("DATATOOLS_DEV_MODE", None)
env["DATATOOLS_LICENSE_PATH"] = str(tmp_path / "license.json")
return env
def _run(self, env, *args, expect_success=False):
proc = subprocess.run(
[sys.executable, "-m", *args],
cwd=PROJECT_ROOT,
env=env,
capture_output=True,
text=True,
timeout=60,
)
if expect_success:
assert proc.returncode == 0, (
f"Expected success, got {proc.returncode}\n"
f"stdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
)
return proc
@pytest.mark.parametrize("module", [
"src.cli",
"src.cli_text_clean",
"src.cli_format",
"src.cli_missing",
"src.cli_column_map",
"src.cli_pipeline",
"src.cli_analyze",
])
def test_cli_blocked_without_license(self, clean_env, module):
# Run with a dummy filename so we'd otherwise be running the
# tool; the guard should fire BEFORE typer parses argv.
proc = self._run(clean_env, module, "/nonexistent.csv")
assert proc.returncode == 2
assert "not activated" in proc.stderr.lower() or "license" in proc.stderr.lower()
@pytest.mark.parametrize("module", [
"src.cli",
"src.cli_text_clean",
"src.cli_format",
"src.cli_missing",
"src.cli_column_map",
"src.cli_pipeline",
"src.cli_analyze",
])
def test_help_always_works(self, clean_env, module):
# ``--help`` must bypass the guard so users can see usage
# before they activate.
proc = self._run(clean_env, module, "--help", expect_success=True)
assert "usage" in (proc.stdout + proc.stderr).lower()
def test_dev_mode_bypasses_guard(self, clean_env, tmp_path):
# Set DEV_MODE; the guard should allow the CLI to run (and
# then fail on the missing input file with a non-license
# error — which is what we're asserting via stderr).
env = dict(clean_env)
env["DATATOOLS_DEV_MODE"] = "1"
proc = self._run(env, "src.cli_analyze", "/nonexistent.csv")
# We expect typer / our code to fail on the missing path,
# NOT on the license. Look for evidence the license check
# was bypassed.
assert "not activated" not in proc.stderr.lower()
# Either pandas / our io.py raises a FileNotFoundError-ish.
combined = (proc.stdout + proc.stderr).lower()
assert "no such file" in combined or "not found" in combined or proc.returncode != 0
class TestGuardBypassesHelp:
"""Sanity check: ``--help`` and friends must skip the guard so the
Typer help screen renders even when no license is on disk."""
def test_runs_under_help_flag_without_license(self, tmp_path, monkeypatch):
# In-process check via the guard helper.
monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False)
monkeypatch.setenv("DATATOOLS_LICENSE_PATH", str(tmp_path / "license.json"))
from src.license.manager import reset_singleton_for_tests
reset_singleton_for_tests()
monkeypatch.setattr("sys.argv", ["progname", "--help"])
from src.cli_license_guard import guard
# No exception expected: --help bypasses.
guard()
def test_blocks_under_real_command_without_license(
self, tmp_path, monkeypatch,
):
monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False)
monkeypatch.setenv("DATATOOLS_LICENSE_PATH", str(tmp_path / "license.json"))
from src.license.manager import reset_singleton_for_tests
reset_singleton_for_tests()
monkeypatch.setattr("sys.argv", ["progname", "input.csv", "--apply"])
from src.cli_license_guard import guard
with pytest.raises(SystemExit) as ei:
guard()
assert ei.value.code == 2