diff --git a/scripts/generate_license.py b/scripts/generate_license.py new file mode 100644 index 0000000..1e5357e --- /dev/null +++ b/scripts/generate_license.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +"""Mint a signed license blob for a buyer. + +Creator-only tool. Reads the active HMAC secret from the environment +(``$DATATOOLS_LICENSE_SECRET``) — point it at the same secret baked +into the shipped binary or the result will fail to verify. + +Examples +-------- + +Mint a 1-year CORE license for Jane Doe:: + + python scripts/generate_license.py \\ + --name "Jane Doe" --email jane@example.com --tier core + +Mint a 2-year PRO license and write the blob to a file:: + + python scripts/generate_license.py \\ + --name "Acme Corp" --email ops@acme.com --tier pro \\ + --years 2 --output acme.dtlic + +Re-sign with a custom secret (useful for staged rollouts):: + + DATATOOLS_LICENSE_SECRET=shipping-secret-2026 \\ + python scripts/generate_license.py --name ... --email ... + +The output is a single base64-encoded token starting with ``DTLIC1:`` +— paste this whole string into the buyer's delivery email or +deliver as an attached ``.dtlic`` file. +""" + +from __future__ import annotations + +import argparse +import sys +import uuid +from pathlib import Path + +# Make ``src.license`` importable when run from the repo root. +_PROJECT_ROOT = Path(__file__).resolve().parent.parent +if str(_PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(_PROJECT_ROOT)) + +from src.license import Tier # noqa: E402 +from src.license.crypto import encode_blob, sign # noqa: E402 +from src.license.features import all_features_for_tier # noqa: E402 +from src.license.schema import ( # noqa: E402 + License, + _utcnow_iso, + default_expiry_iso, +) + + +def build_args() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + description="Mint a signed DataTools license blob.", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.add_argument("--name", required=True, help="Buyer's full name.") + p.add_argument("--email", required=True, help="Buyer's email.") + p.add_argument( + "--tier", + default=Tier.CORE.value, + choices=[t.value for t in Tier], + help="License tier (default: %(default)s).", + ) + p.add_argument( + "--years", + type=int, + default=1, + help="License lifetime in years (default: %(default)s).", + ) + p.add_argument( + "--key", + default=None, + help="Override the auto-generated license key (default: random).", + ) + p.add_argument( + "--output", + "-o", + type=Path, + default=None, + help="Write the blob to this file (default: print to stdout).", + ) + return p + + +def main(argv: list[str] | None = None) -> int: + args = build_args().parse_args(argv) + tier = Tier(args.tier) + rid = uuid.uuid4().hex + key = args.key or f"DT1-{tier.value.upper()}-{rid[:8]}-{rid[8:16]}" + + lic = License( + name=args.name, + email=args.email, + license_key=key, + tier=tier, + features=all_features_for_tier(tier), + issued_at=_utcnow_iso(), + expires_at=default_expiry_iso(years=args.years), + signature="", + ) + signature = sign(lic.to_canonical_dict()) + payload = lic.to_canonical_dict() + payload["signature"] = signature + blob = encode_blob(payload) + + if args.output: + args.output.write_text(blob + "\n", encoding="utf-8") + print(f"Wrote license to {args.output}", file=sys.stderr) + else: + print(blob) + print( + f" name: {lic.name}\n" + f" email: {lic.email}\n" + f" tier: {lic.tier.value}\n" + f" key: {lic.license_key}\n" + f" expires: {lic.expires_at}", + file=sys.stderr, + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/cli.py b/src/cli.py index 204fd73..f4002e9 100644 --- a/src/cli.py +++ b/src/cli.py @@ -499,6 +499,8 @@ def _write_match_groups(result, original_df, path: Path) -> None: # --------------------------------------------------------------------------- def main(): + from src.cli_license_guard import guard + guard() app() diff --git a/src/cli_analyze.py b/src/cli_analyze.py index f21d421..c2dafb3 100644 --- a/src/cli_analyze.py +++ b/src/cli_analyze.py @@ -151,8 +151,14 @@ def _maybe_strict_exit(findings, strict: bool) -> None: raise typer.Exit(code=1) +def main() -> None: + from src.cli_license_guard import guard + guard() + app() + + # Entrypoint when run via `python -m src.cli_analyze`. Typer's no_args_is_help # kicks in when the user invokes without args; we expose the single command at # the top level for convenience: ``python -m src.cli_analyze input.csv``. if __name__ == "__main__": - app() + main() diff --git a/src/cli_column_map.py b/src/cli_column_map.py index bd5dab9..01dba55 100644 --- a/src/cli_column_map.py +++ b/src/cli_column_map.py @@ -348,6 +348,8 @@ def _print_results(result, input_path: Path, options) -> None: # --------------------------------------------------------------------------- def main(): + from src.cli_license_guard import guard + guard() app() diff --git a/src/cli_format.py b/src/cli_format.py index b7f687f..6bbb4c3 100644 --- a/src/cli_format.py +++ b/src/cli_format.py @@ -357,6 +357,8 @@ def standardize( def main(): + from src.cli_license_guard import guard + guard() app() diff --git a/src/cli_license_guard.py b/src/cli_license_guard.py new file mode 100644 index 0000000..9f49a41 --- /dev/null +++ b/src/cli_license_guard.py @@ -0,0 +1,86 @@ +"""License guard for the tool CLIs. + +Every tool CLI (``cli.py``, ``cli_text_clean.py``, ``cli_format.py``, +``cli_missing.py``, ``cli_column_map.py``, ``cli_pipeline.py``, +``cli_analyze.py``) calls :func:`guard` from its ``main()`` before +delegating to Typer. The guard: + +1. Lets ``--help`` / ``-h`` through unconditionally so users can + always see what a command does. +2. Lets the ``DATATOOLS_DEV_MODE=1`` env var bypass the check. +3. Otherwise, verifies a valid license is on disk. If not, prints a + one-line user-facing message naming the exact ``datatools-license`` + subcommand to run, then exits with status 2. + +The exit code (2) is the same Typer uses for argument errors — fits +into ``run_tests.py`` / CI pipelines without special casing. +""" + +from __future__ import annotations + +import sys +from typing import NoReturn + + +_HELP_FLAGS = {"--help", "-h", "--version"} + + +def _is_help_invocation() -> bool: + """True when the user is asking for help, not running work.""" + return any(arg in _HELP_FLAGS for arg in sys.argv[1:]) + + +def guard() -> None: + """Block startup if no valid license. No-op when license is valid, + when called with ``--help``, or under ``DATATOOLS_DEV_MODE``.""" + if _is_help_invocation(): + return + # Lazy import so a broken license module doesn't fail ``--help``. + from src.license import ( + ExpiredLicenseError, + InvalidLicenseError, + LicenseError, + get_manager, + ) + + mgr = get_manager() + if mgr.dev_mode: + return + + try: + if mgr.is_valid(): + return + except LicenseError: + # ``is_valid()`` swallows errors and returns False, but be + # paranoid: fall through to the state-based diagnostic. + pass + + state = mgr.current_state() + _exit_with_message(state) + + +def _exit_with_message(state) -> NoReturn: + """Print the right one-liner for the current state and exit.""" + if state.error_kind == "not_activated": + msg = ( + "DataTools is not activated.\n" + "Run: python -m src.license_cli activate \n" + "Or start a 1-year trial: " + "python -m src.license_cli trial --name 'Your Name' --email you@example.com" + ) + elif state.error_kind == "expired": + msg = ( + f"License expired on {state.expires_at[:10]}.\n" + "Renew with: python -m src.license_cli renew " + ) + elif state.error_kind == "invalid": + msg = ( + "License file is present but invalid.\n" + f"Detail: {state.error_message}\n" + "Re-paste the blob from your delivery email: " + "python -m src.license_cli activate " + ) + else: + msg = "License is not valid. Run: python -m src.license_cli status" + print(f"Error: {msg}", file=sys.stderr) + raise SystemExit(2) diff --git a/src/cli_missing.py b/src/cli_missing.py index fa78bc5..02ac6d7 100644 --- a/src/cli_missing.py +++ b/src/cli_missing.py @@ -373,6 +373,8 @@ def _print_results(result, input_path: Path, options) -> None: # --------------------------------------------------------------------------- def main(): + from src.cli_license_guard import guard + guard() app() diff --git a/src/cli_pipeline.py b/src/cli_pipeline.py index 69fc4c3..2769d05 100644 --- a/src/cli_pipeline.py +++ b/src/cli_pipeline.py @@ -300,6 +300,8 @@ def run( def main() -> None: + from src.cli_license_guard import guard + guard() app() diff --git a/src/cli_text_clean.py b/src/cli_text_clean.py index bc5c163..f8ccb47 100644 --- a/src/cli_text_clean.py +++ b/src/cli_text_clean.py @@ -370,6 +370,8 @@ def _print_results(result, input_path: Path, options) -> None: # --------------------------------------------------------------------------- def main(): + from src.cli_license_guard import guard + guard() app() diff --git a/src/gui/components/__init__.py b/src/gui/components/__init__.py index ede0990..b0670de 100644 --- a/src/gui/components/__init__.py +++ b/src/gui/components/__init__.py @@ -38,12 +38,22 @@ from . import _legacy as _legacy # noqa: F401 (keep for direct access) # Names exported from _legacy.py that pages currently use. Kept here as # the canonical public list so a removal from _legacy is a visible # breaking change instead of a silent drop. +from .activation import ( # noqa: F401 re-exported + render_activation_form, + render_license_status_sidebar, + require_license_or_render_activation, +) + __all__ = [ # Shared chrome / pickup / gate "hide_streamlit_chrome", "quit_button", "pickup_or_upload", "require_normalization_gate", + # License gate + activation form + "render_activation_form", + "render_license_status_sidebar", + "require_license_or_render_activation", # Dedup widgets "config_panel", "match_group_card", diff --git a/src/gui/components/_legacy.py b/src/gui/components/_legacy.py index 7defbb3..e6fdcda 100644 --- a/src/gui/components/_legacy.py +++ b/src/gui/components/_legacy.py @@ -72,13 +72,21 @@ footer { """ -def hide_streamlit_chrome() -> None: +def hide_streamlit_chrome(*, gate_license: bool = True) -> None: """Inject CSS to hide Streamlit's default header, menu, and footer. - Also renders the sidebar language selector, since every entrypoint - that hides the default chrome wants the picker visible in the - same place. Pages that want a clean chrome without the selector can - inject ``_HIDE_CHROME_CSS`` themselves instead of calling this. + Also renders the sidebar language selector + license status badge, + since every entrypoint that hides the default chrome wants those + visible in the same place. Pages that want a clean chrome without + them can inject ``_HIDE_CHROME_CSS`` themselves instead of calling + this. + + When *gate_license* is True (the default) the function calls + :func:`require_license_or_render_activation` after the sidebar + widgets render. If no valid license is present, the activation + form replaces the page body and the page short-circuits via + ``st.stop()``. The Activate page itself passes ``False`` so it + can render its own form without recursion. """ st.markdown(_HIDE_CHROME_CSS, unsafe_allow_html=True) # Imported lazily so this module stays importable in environments @@ -86,6 +94,14 @@ def hide_streamlit_chrome() -> None: # individual legacy helpers). from src.i18n import render_language_selector render_language_selector() + # License chrome: sidebar status badge + inline gate. + from .activation import ( + render_license_status_sidebar, + require_license_or_render_activation, + ) + render_license_status_sidebar() + if gate_license: + require_license_or_render_activation() # --------------------------------------------------------------------------- diff --git a/src/gui/components/activation.py b/src/gui/components/activation.py new file mode 100644 index 0000000..1b590c4 --- /dev/null +++ b/src/gui/components/activation.py @@ -0,0 +1,218 @@ +"""Activation page rendering + license gate inline-renderer. + +Two public callables: + +- :func:`render_activation_form` — the activation page body. Used by + ``pages/_Activate.py`` for the explicit nav entry and by + :func:`require_license_or_render_activation` for the gate-injected + inline form. + +- :func:`require_license_or_render_activation` — call after + ``hide_streamlit_chrome`` from every page that needs a valid + license. When the license is missing, expired, or invalid it + renders the activation form in place of the page body and calls + ``st.stop()`` so the rest of the page doesn't execute. + +- :func:`render_license_status_sidebar` — small chrome widget the + sidebar uses to show "Core · 327 days left" or "🛑 Expired". + +The component is kept dependency-free of the page modules so any +page can import these helpers without circular imports through +``components.__init__``. +""" + +from __future__ import annotations + +from typing import Optional + +import streamlit as st + +from src.i18n import t as _t +from src.license import ( + ExpiredLicenseError, + InvalidLicenseError, + LicenseError, + NotActivatedError, + get_manager, +) + + +# --------------------------------------------------------------------------- +# Sidebar status widget +# --------------------------------------------------------------------------- + +def render_license_status_sidebar() -> None: + """Render the per-page sidebar license badge. + + Cheap to call — the manager caches the parsed license, so multiple + pages mounting the same chrome don't pay multiple disk reads. + """ + state = get_manager().current_state() + target = st.sidebar + if not state.activated: + target.caption(f"🔒 {_t('license.status_not_activated')}") + return + if state.error_kind == "invalid": + target.caption(f"⚠️ {_t('license.status_invalid')}") + return + if not state.valid: + # Activated but expired. + target.caption(f"🛑 {_t('license.status_expired')}") + return + + tier_label = _t(f"license.tier_{state.tier}") or state.tier.title() + if state.tier == "trial": + label = _t("license.status_trial", days=state.days_remaining) + else: + label = _t( + "license.status_active", + tier=tier_label, + days=state.days_remaining, + ) + target.caption(label) + if 0 < state.days_remaining <= 30: + target.warning( + _t("license.renewal_warning_30", days=state.days_remaining) + ) + + +# --------------------------------------------------------------------------- +# Activation page body +# --------------------------------------------------------------------------- + +def render_activation_form(*, key_prefix: str = "act") -> None: + """Render the full activation page body. + + *key_prefix* is appended to every Streamlit widget key so the + form can be embedded inline by ``require_license_or_render_activation`` + without colliding with an instance that's mounted by the + explicit ``_Activate.py`` page. + """ + st.title(_t("activation.title")) + st.caption(_t("activation.intro")) + + mgr = get_manager() + state = mgr.current_state() + + # Surface the current state explicitly so a user with an expired / + # invalid license sees what's wrong before they paste anything. + if state.activated and state.error_kind == "expired": + st.warning( + _t("license.renewal_warning_expired", date=state.expires_at[:10]) + ) + elif state.activated and state.error_kind == "invalid": + st.error(f"⚠️ {state.error_message}") + + st.divider() + + with st.form(key=f"{key_prefix}_form"): + name = st.text_input( + _t("activation.name_label"), + value=state.name, + help=_t("activation.name_help"), + key=f"{key_prefix}_name", + ) + email = st.text_input( + _t("activation.email_label"), + value=state.email, + help=_t("activation.email_help"), + key=f"{key_prefix}_email", + ) + blob = st.text_area( + _t("activation.blob_label"), + help=_t("activation.blob_help"), + key=f"{key_prefix}_blob", + height=120, + ) + + col_activate, col_trial = st.columns(2) + with col_activate: + is_renewal = state.activated + label = ( + _t("activation.renew_button") if is_renewal + else _t("activation.activate_button") + ) + submit = st.form_submit_button(label, type="primary") + with col_trial: + trial = st.form_submit_button( + _t("activation.trial_button"), + help=_t("activation.trial_help"), + ) + + # Process the submission. + if submit: + if not blob.strip(): + st.error(_t("activation.errors_heading") + ": license blob is empty.") + return + try: + if is_renewal: + lic = mgr.renew(blob) + st.success(_t("activation.renewed", expires=lic.expires_at[:10])) + else: + lic = mgr.activate_from_blob(blob, name=name, email=email) + st.success(_t( + "activation.success", + name=lic.name, expires=lic.expires_at[:10], + )) + # Force a clean re-render so the gate sees the new state. + st.rerun() + except LicenseError as e: + st.error(f"{_t('activation.errors_heading')}: {e}") + + if trial: + try: + lic = mgr.issue_trial(name=name, email=email) + st.success(_t( + "activation.success", + name=lic.name, expires=lic.expires_at[:10], + )) + st.rerun() + except LicenseError as e: + st.error(f"{_t('activation.errors_heading')}: {e}") + + # Deactivate (only useful when already activated). + if state.activated: + st.divider() + with st.expander(_t("activation.deactivate_button")): + st.caption(_t("activation.deactivate_help")) + if st.button( + _t("activation.deactivate_button"), + key=f"{key_prefix}_deactivate", + type="secondary", + ): + mgr.deactivate() + st.rerun() + + +# --------------------------------------------------------------------------- +# Gate +# --------------------------------------------------------------------------- + +def require_license_or_render_activation(*, feature: Optional[str] = None) -> None: + """Page-level guard. Call after ``hide_streamlit_chrome``. + + If a valid license exists (and *feature*, when supplied, is + unlocked), this is a no-op. Otherwise it renders the activation + form in place of the page body and calls ``st.stop()``. + + The chrome helper invokes this automatically so individual pages + don't have to call it; the explicit *feature* form is provided + for tools that want to bypass the global gate but enforce a tier + check (future SKU work). + """ + mgr = get_manager() + if mgr.dev_mode: + return + if mgr.is_valid(): + # When feature gating gets enabled per-tool, this is the hook. + if feature is not None: + try: + mgr.require_feature(feature) + except LicenseError as e: + st.error(f"⚠️ {e}") + st.stop() + return + + # Otherwise: render the activation form inline and stop the page. + render_activation_form(key_prefix="gate") + st.stop() diff --git a/src/gui/pages/_Activate.py b/src/gui/pages/_Activate.py new file mode 100644 index 0000000..bd4c7bd --- /dev/null +++ b/src/gui/pages/_Activate.py @@ -0,0 +1,34 @@ +"""Activate — license registration + renewal. + +Lives in the sidebar nav under an underscore-prefixed filename so +Streamlit sorts it above the numbered tool pages. The chrome's +license gate also injects the activation form inline on any other +page when no valid license is present; this page exists so a user +can revisit the form without hitting an expiration first (e.g., to +review renewal status or deactivate the device). +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import streamlit as st + +_project_root = Path(__file__).resolve().parent.parent.parent.parent +if str(_project_root) not in sys.path: + sys.path.insert(0, str(_project_root)) + +from src.gui.components import hide_streamlit_chrome, render_activation_form +from src.i18n import t + +st.set_page_config( + page_title=t("activation.page_title"), + page_icon="🔑", + layout="wide", +) + +# ``gate_license=False`` keeps the chrome from re-rendering the +# activation form on top of the form we're about to render below. +hide_streamlit_chrome(gate_license=False) +render_activation_form(key_prefix="page") diff --git a/src/i18n/packs/en.json b/src/i18n/packs/en.json index 935d11a..ece5aa7 100644 --- a/src/i18n/packs/en.json +++ b/src/i18n/packs/en.json @@ -56,6 +56,44 @@ "body": "Clicking the button below will terminate the DataTools server. Any unsaved work in other tools will be lost. Once the app shuts down you can close this window.", "button": "Close the app" }, + "activation": { + "page_title": "DataTools — Activate", + "title": "🔑 Activate DataTools", + "intro": "DataTools needs to be activated before any tools unlock. Enter the name and email tied to your purchase, then paste the license blob from your delivery email.", + "name_label": "Full name", + "name_help": "Must match the name on your purchase receipt.", + "email_label": "Email", + "email_help": "Must match the email on your purchase receipt.", + "blob_label": "License blob", + "blob_help": "Begins with `DTLIC1:` — paste the entire string.", + "activate_button": "Activate", + "renew_button": "Apply renewal", + "trial_button": "Start 1-year trial", + "trial_help": "Skips the paid blob and self-issues a 1-year license tied to your name and email. Useful for evaluating before purchase.", + "or_separator": "— or —", + "success": "Activated! Welcome, {name}. Your license is valid until {expires}.", + "renewed": "License renewed. New expiry: {expires}.", + "errors_heading": "Activation problem", + "deactivate_button": "Deactivate this device", + "deactivate_help": "Removes the local license file. You'll need to re-paste your blob to reactivate." + }, + "license": { + "status_active": "{tier} · {days} days left", + "status_trial": "Trial · {days} days left", + "status_expired": "Expired", + "status_not_activated": "Not activated", + "status_invalid": "License invalid", + "renewal_warning_30": "⚠️ License expires in {days} days. Renew soon to avoid interruption.", + "renewal_warning_expired": "🛑 License expired on {date}. Renew to continue using DataTools.", + "tier_trial": "Trial", + "tier_core": "Core", + "tier_pro": "Pro", + "tier_enterprise": "Enterprise", + "registered_to": "Registered to {name} · {email}", + "expires_on": "Expires on {date}", + "issued_on": "Issued on {date}", + "view_details": "License details" + }, "tools": { "01_deduplicator": { "name": "Deduplicator", diff --git a/src/i18n/packs/es.json b/src/i18n/packs/es.json index 55f25e4..33ec26a 100644 --- a/src/i18n/packs/es.json +++ b/src/i18n/packs/es.json @@ -56,6 +56,44 @@ "body": "Al pulsar el botón de abajo se cerrará el servidor de DataTools. Cualquier trabajo sin guardar en otras herramientas se perderá. Una vez cerrada la app, puedes cerrar esta ventana.", "button": "Cerrar la app" }, + "activation": { + "page_title": "DataTools — Activar", + "title": "🔑 Activar DataTools", + "intro": "DataTools debe activarse antes de desbloquear cualquier herramienta. Introduce el nombre y correo asociados a tu compra, y luego pega el código de licencia del correo de entrega.", + "name_label": "Nombre completo", + "name_help": "Debe coincidir con el nombre en el recibo de compra.", + "email_label": "Correo electrónico", + "email_help": "Debe coincidir con el correo del recibo de compra.", + "blob_label": "Código de licencia", + "blob_help": "Empieza con `DTLIC1:` — pega la cadena completa.", + "activate_button": "Activar", + "renew_button": "Aplicar renovación", + "trial_button": "Iniciar prueba de 1 año", + "trial_help": "Omite el código de pago y emite una licencia local de 1 año vinculada a tu nombre y correo. Útil para evaluar antes de comprar.", + "or_separator": "— o —", + "success": "¡Activado! Bienvenido, {name}. Tu licencia es válida hasta el {expires}.", + "renewed": "Licencia renovada. Nueva fecha de caducidad: {expires}.", + "errors_heading": "Problema al activar", + "deactivate_button": "Desactivar este dispositivo", + "deactivate_help": "Elimina el archivo de licencia local. Tendrás que volver a pegar tu código para reactivarla." + }, + "license": { + "status_active": "{tier} · {days} días restantes", + "status_trial": "Prueba · {days} días restantes", + "status_expired": "Caducada", + "status_not_activated": "Sin activar", + "status_invalid": "Licencia inválida", + "renewal_warning_30": "⚠️ La licencia caduca en {days} días. Renueva pronto para evitar interrupciones.", + "renewal_warning_expired": "🛑 La licencia caducó el {date}. Renuévala para seguir usando DataTools.", + "tier_trial": "Prueba", + "tier_core": "Core", + "tier_pro": "Pro", + "tier_enterprise": "Enterprise", + "registered_to": "Registrado a nombre de {name} · {email}", + "expires_on": "Caduca el {date}", + "issued_on": "Emitida el {date}", + "view_details": "Detalles de la licencia" + }, "tools": { "01_deduplicator": { "name": "Eliminador de duplicados", diff --git a/src/license/__init__.py b/src/license/__init__.py new file mode 100644 index 0000000..5f7b60e --- /dev/null +++ b/src/license/__init__.py @@ -0,0 +1,59 @@ +"""License module — registration, activation, expiration, feature gating. + +Public API the rest of the app uses: + +- :func:`get_manager` — singleton :class:`LicenseManager` instance. +- :func:`current_state` — quick snapshot for status badges / tests. +- :func:`require_feature` — raise :class:`LicenseError` if a feature + isn't unlocked by the active license. +- :class:`License`, :class:`Tier`, :class:`FeatureFlag` — schema. +- :class:`LicenseError` and subclasses — typed failures the UI can + branch on (not yet activated vs. expired vs. tampered). + +The license model is: + +1. The seller (creator) runs ``scripts/generate_license.py`` to mint a + signed **license blob** keyed to a buyer's name + email. +2. The buyer pastes the blob into the activation page on first launch. +3. The app verifies the HMAC signature locally (no internet), then + writes a canonical ``~/.datatools/license.json`` and the app + unlocks. + +The signature is HMAC-SHA256 with a build-time secret. Combined with +the 30-day refund policy, this is honor-system DRM — see +``docs/DECISIONS.md`` for the trade-off discussion. +""" + +from __future__ import annotations + +from .errors import ( + ExpiredLicenseError, + InvalidLicenseError, + LicenseError, + NotActivatedError, + UnsupportedFeatureError, +) +from .features import FEATURES_BY_TIER, all_features_for_tier +from .manager import LicenseManager, current_state, get_manager, require_feature +from .schema import FeatureFlag, License, Tier + +__all__ = [ + # Manager + "LicenseManager", + "current_state", + "get_manager", + "require_feature", + # Schema + "FeatureFlag", + "License", + "Tier", + # Feature registry + "FEATURES_BY_TIER", + "all_features_for_tier", + # Errors + "LicenseError", + "NotActivatedError", + "ExpiredLicenseError", + "InvalidLicenseError", + "UnsupportedFeatureError", +] diff --git a/src/license/crypto.py b/src/license/crypto.py new file mode 100644 index 0000000..580478a --- /dev/null +++ b/src/license/crypto.py @@ -0,0 +1,112 @@ +"""HMAC sign/verify for license blobs. + +The signing secret is read from ``$DATATOOLS_LICENSE_SECRET`` if +present, otherwise from the build-time constant below. Replace the +constant at build time (via PyInstaller hook or a sed step in the +build pipeline) so the shipped binary has a different secret from +this repo's source tree. + +Threat model: honor-system DRM. A motivated reverse engineer can pull +the secret out of the binary, sign their own licenses, and bypass the +check. That's expected for $49 desktop software — the goal is to +discourage casual sharing, not stop targeted piracy. The 30-day +refund policy and the personal-name embedded in every license cover +the same gap from a different angle. +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import os +from typing import Any + +# Build-time default. Replace via env var in shipped builds; keep this +# constant non-empty so unit tests have a stable verification key. +_DEFAULT_SECRET = ( + "datatools-license-v1-development-secret-" + "replace-at-build-time-via-DATATOOLS_LICENSE_SECRET" +) + + +def _secret_bytes() -> bytes: + """Return the active HMAC secret as bytes.""" + return os.environ.get("DATATOOLS_LICENSE_SECRET", _DEFAULT_SECRET).encode("utf-8") + + +def _canonical_bytes(payload: dict[str, Any]) -> bytes: + """Canonical JSON encoding for the HMAC input. + + ``sort_keys=True`` + ``separators=(",", ":")`` produce a byte-for- + byte deterministic representation across Python versions and OS + locales. Without that, two structurally-identical dicts could hash + to different signatures. + """ + return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + + +def sign(payload: dict[str, Any]) -> str: + """Compute the HMAC-SHA256 hex digest over *payload*. + + *payload* MUST NOT contain a ``signature`` key — that's the field + we're computing. The caller is responsible for stripping it. + """ + digest = hmac.new(_secret_bytes(), _canonical_bytes(payload), hashlib.sha256) + return digest.hexdigest() + + +def verify(payload: dict[str, Any], signature: str) -> bool: + """Constant-time compare between the recomputed HMAC and *signature*. + + Returns ``True`` on a match. Uses :func:`hmac.compare_digest` so a + timing oracle can't be used to recover the secret one byte at a + time — overkill for honor-system DRM, but free. + """ + expected = sign(payload) + return hmac.compare_digest(expected.encode("ascii"), signature.encode("ascii")) + + +# --------------------------------------------------------------------------- +# Blob encoding / decoding +# --------------------------------------------------------------------------- + +# A "license blob" is the artifact the buyer pastes into the activation +# form. It's a base64-encoded JSON dict containing every license field +# *plus* the signature. We choose base64 over raw JSON so the blob is +# one paste-able token (no whitespace surprises) and so a typo +# truncates the blob into an obviously-invalid form rather than a +# subtly-mutated payload. + +_BLOB_PREFIX = "DTLIC1:" + + +def encode_blob(payload_with_signature: dict[str, Any]) -> str: + """Wrap a signed payload into the buyer-facing blob form.""" + raw = json.dumps( + payload_with_signature, sort_keys=True, separators=(",", ":"), + ).encode("utf-8") + return _BLOB_PREFIX + base64.urlsafe_b64encode(raw).decode("ascii") + + +def decode_blob(blob: str) -> dict[str, Any]: + """Reverse of :func:`encode_blob`. Raises ``ValueError`` on a + blob that doesn't carry the expected prefix or doesn't decode + cleanly — both surface as :class:`InvalidLicenseError` at the + manager layer.""" + s = blob.strip() + if not s.startswith(_BLOB_PREFIX): + raise ValueError( + f"License blob missing {_BLOB_PREFIX!r} prefix. " + "Did you paste the wrong text?" + ) + encoded = s[len(_BLOB_PREFIX):] + try: + raw = base64.urlsafe_b64decode(encoded.encode("ascii")) + except (ValueError, TypeError) as e: + raise ValueError(f"License blob is not valid base64: {e}") from e + try: + return json.loads(raw.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + raise ValueError(f"License blob contains invalid JSON: {e}") from e diff --git a/src/license/errors.py b/src/license/errors.py new file mode 100644 index 0000000..cd4f73c --- /dev/null +++ b/src/license/errors.py @@ -0,0 +1,47 @@ +"""Structured error hierarchy for the license layer. + +Mirrors the ``src.core.errors`` pattern — every subclass extends a +stdlib base so existing ``except OSError`` / ``except ValueError`` +handlers keep working. The UI / CLI branches on the subclass to render +the right next step (activate, renew, contact support). +""" + +from __future__ import annotations + + +class LicenseError(ValueError): + """Base class for every licensing failure. Subclass-only — callers + should catch the specific failure mode they handle.""" + + +class NotActivatedError(LicenseError): + """No license file present, or file present but signature missing. + + Recovery: open the activation page (GUI) or run + ``datatools-license activate `` (CLI). + """ + + +class InvalidLicenseError(LicenseError): + """The license file is present but failed verification. + + Common causes: tampered signature, blob from a different build + (different secret), corrupted JSON. Recovery: re-paste the blob + from the original delivery email or contact support. + """ + + +class ExpiredLicenseError(LicenseError): + """The license is structurally valid but past its expiration date. + + Recovery: renew via ``datatools-license renew `` or paste a + new blob into the activation page. + """ + + +class UnsupportedFeatureError(LicenseError): + """The active license's tier doesn't unlock a requested feature. + + Raised by :func:`require_feature` when, e.g., a TRIAL user tries + to access an ENTERPRISE-only tool. Recovery: upgrade tier. + """ diff --git a/src/license/features.py b/src/license/features.py new file mode 100644 index 0000000..cf6a71d --- /dev/null +++ b/src/license/features.py @@ -0,0 +1,49 @@ +"""Tier → feature mapping. + +A tier unlocks every feature listed for it. Adding a new SKU means +adding a new row here and (if the SKU introduces new functionality) +adding feature flags to :class:`~src.license.schema.FeatureFlag`. No +consumer code changes. + +The v1 product ships only :data:`Tier.CORE`, which unlocks every tool. +TRIAL exists so a buyer can register without a paid key and still get +a 1-year working license; the difference between TRIAL and CORE is +semantic (and the basis for showing "TRIAL" in the sidebar), not +functional. + +PRO and ENTERPRISE are scaffolded for future SKUs. They currently +unlock the same feature set as CORE so the architecture is exercised +by tests without committing to a particular pricing structure. +""" + +from __future__ import annotations + +from typing import FrozenSet + +from .schema import FeatureFlag, Tier + + +def _all() -> FrozenSet[FeatureFlag]: + """Every feature flag — used as the default for the v1 SKU.""" + return frozenset(FeatureFlag) + + +FEATURES_BY_TIER: dict[Tier, FrozenSet[FeatureFlag]] = { + Tier.TRIAL: _all(), + Tier.CORE: _all(), + # Pre-wired for future SKUs. Today they mirror CORE so the gating + # tests exercise the lookup path without making a marketing claim. + Tier.PRO: _all(), + Tier.ENTERPRISE: _all(), +} + + +def all_features_for_tier(tier: Tier) -> tuple[str, ...]: + """Return the canonical, sorted tuple of feature ids for *tier*. + + Used by the license generator to fill the ``features`` field on a + new license, and by the manager to upgrade an older license whose + ``features`` list omits a flag we've since added to its tier. + """ + flags = FEATURES_BY_TIER[tier] + return tuple(sorted(f.value for f in flags)) diff --git a/src/license/manager.py b/src/license/manager.py new file mode 100644 index 0000000..ee6573d --- /dev/null +++ b/src/license/manager.py @@ -0,0 +1,470 @@ +"""LicenseManager — the public face of the license layer. + +Singleton-by-default (``get_manager()`` returns a process-wide +instance), but tests can construct standalone managers via the +constructor for full isolation. + +Lifecycle:: + + mgr = get_manager() + if not mgr.is_activated(): + mgr.activate_from_blob(blob, name, email) + mgr.require_feature(FeatureFlag.DEDUPLICATOR) + state = mgr.current_state() # snapshot for the sidebar / CLI status +""" + +from __future__ import annotations + +import os +import re +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from . import crypto, storage +from .errors import ( + ExpiredLicenseError, + InvalidLicenseError, + LicenseError, + NotActivatedError, + UnsupportedFeatureError, +) +from .features import all_features_for_tier +from .schema import FeatureFlag, License, Tier, default_expiry_iso, _utcnow_iso + + +# --------------------------------------------------------------------------- +# State snapshot +# --------------------------------------------------------------------------- + +@dataclass(frozen=True) +class LicenseState: + """A read-only snapshot for status widgets / CLI ``--status`` JSON. + + Always safe to render — even when no license is activated the + dataclass is populated with explanatory defaults so the GUI never + needs to None-check before formatting. + """ + + activated: bool + valid: bool # activated AND not expired AND signature OK + name: str + email: str + tier: str + license_key: str + issued_at: str + expires_at: str + days_remaining: int + features: tuple[str, ...] + error_kind: str # "", "not_activated", "expired", "invalid" + error_message: str + + def as_dict(self) -> dict: + from dataclasses import asdict + d = asdict(self) + d["features"] = list(self.features) + return d + + +_EMPTY_STATE = LicenseState( + activated=False, valid=False, name="", email="", tier="", + license_key="", issued_at="", expires_at="", days_remaining=0, + features=(), + error_kind="not_activated", + error_message="No license activated.", +) + + +# --------------------------------------------------------------------------- +# Manager +# --------------------------------------------------------------------------- + +_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") + + +class LicenseManager: + """Read/write license state. Cheap to construct; the singleton at + module level just avoids reload churn. + + Storage path defaults to :func:`storage.default_license_path` — + pass ``path=`` to override for tests. + """ + + def __init__(self, *, path: Optional[Path] = None) -> None: + self._path = path + self._cached: Optional[License] = None + self._dev_mode: Optional[bool] = None + + # --- Dev bypass --------------------------------------------------------- + + @property + def dev_mode(self) -> bool: + """``DATATOOLS_DEV_MODE=1`` short-circuits every check. + + Cached on the instance so a test that sets the env after + construction still picks it up (re-read on each access). + """ + return _truthy_env("DATATOOLS_DEV_MODE") + + # --- Load / save -------------------------------------------------------- + + def load(self) -> Optional[License]: + """Read + verify the on-disk license. Returns ``None`` when no + file exists. Raises :class:`InvalidLicenseError` on signature + mismatch / tampering.""" + raw = storage.read_raw(self._path) + if raw is None: + self._cached = None + return None + lic = License.from_dict(raw) + # Verify signature against the canonical payload. + if not crypto.verify(lic.to_canonical_dict(), lic.signature): + raise InvalidLicenseError( + "License signature does not verify. The file may have " + "been tampered with, or it was issued by a different " + "build. Re-paste the original license blob to recover." + ) + self._cached = lic + return lic + + def save(self, lic: License) -> Path: + """Persist *lic* to the configured path. Caller is responsible + for having signed the license already; this function does + NOT re-sign.""" + path = storage.write_raw(lic.to_dict(), self._path) + self._cached = lic + return path + + def deactivate(self) -> bool: + """Remove the on-disk license. Returns whether a file was + removed (False if nothing was active).""" + self._cached = None + return storage.remove(self._path) + + # --- Activation --------------------------------------------------------- + + def activate_from_blob( + self, + blob: str, + *, + name: str, + email: str, + ) -> License: + """Verify *blob* and write the activated license to disk. + + The buyer pastes the blob; the page collects their *name* and + *email* separately. We require both registered values to + match the values embedded in the signed blob — defends + against blob-sharing between buyers. + """ + _validate_registration(name, email) + try: + payload = crypto.decode_blob(blob) + except ValueError as e: + raise InvalidLicenseError(str(e)) from e + + signature = payload.get("signature", "") + if not signature: + raise InvalidLicenseError( + "License blob is missing the ``signature`` field. " + "The blob may have been truncated when pasted." + ) + + canonical = {k: v for k, v in payload.items() if k != "signature"} + if not crypto.verify(canonical, signature): + raise InvalidLicenseError( + "License blob signature did not verify. The blob may " + "be corrupt, intended for a different product build, " + "or modified after issue." + ) + + # Reconstruct the License dataclass after verification so the + # canonical dict we hashed matches the on-disk JSON. + lic = License.from_dict(payload) + + # Personal-name and email matching is a soft attestation. We + # enforce case-insensitive equality after stripping whitespace, + # so " jane@Example.com " matches the embedded canonical + # form without surprising the user about case. + if name.strip().casefold() != lic.name.casefold() or ( + email.strip().casefold() != lic.email.casefold() + ): + raise InvalidLicenseError( + "Registered name / email do not match the values " + "embedded in the license blob. Contact support if you " + "believe this is in error." + ) + + if lic.is_expired(): + raise ExpiredLicenseError( + f"License expired on {lic.expires_at}. " + "Paste a renewal blob to extend access." + ) + + self.save(lic) + return lic + + def issue_trial(self, *, name: str, email: str, years: int = 1) -> License: + """Self-sign a 1-year trial license. The seller's + ``scripts/generate_license.py`` produces these for buyers; the + same code path is reused at activation time as a fallback + when a buyer wants to evaluate without a key. + + Trial licenses are functionally identical to CORE in v1; only + the tier label differs (so the sidebar can say "TRIAL" if we + ever want to nudge a conversion). + """ + _validate_registration(name, email) + return self._mint(name=name, email=email, tier=Tier.TRIAL, years=years) + + def renew(self, blob: str) -> License: + """Renew an existing license using a fresh blob. + + Verification: the blob must verify, its name+email must match + the currently-active license, and its expiry must be in the + future. We allow tier changes during renewal (upgrade path). + """ + current = self._cached or self.load() + if current is None: + raise NotActivatedError( + "No active license to renew. Use ``activate`` instead " + "of ``renew`` for first-time setup." + ) + try: + payload = crypto.decode_blob(blob) + except ValueError as e: + raise InvalidLicenseError(str(e)) from e + signature = payload.get("signature", "") + canonical = {k: v for k, v in payload.items() if k != "signature"} + if not crypto.verify(canonical, signature): + raise InvalidLicenseError("Renewal blob signature did not verify.") + lic = License.from_dict(payload) + if ( + lic.name.casefold() != current.name.casefold() + or lic.email.casefold() != current.email.casefold() + ): + raise InvalidLicenseError( + "Renewal blob is for a different name/email than the " + "currently-active license." + ) + if lic.is_expired(): + raise ExpiredLicenseError( + "Renewal blob is itself expired. Generate a new one." + ) + self.save(lic) + return lic + + # --- Inspection --------------------------------------------------------- + + def is_activated(self) -> bool: + if self._cached is not None: + return True + return storage.read_raw(self._path) is not None + + def is_valid(self) -> bool: + if self.dev_mode: + return True + try: + lic = self._cached or self.load() + except LicenseError: + return False + if lic is None: + return False + return not lic.is_expired() + + def current_state(self) -> LicenseState: + if self.dev_mode: + return LicenseState( + activated=True, valid=True, + name="dev", email="dev@local", + tier=Tier.ENTERPRISE.value, + license_key="DEV-BYPASS", + issued_at=_utcnow_iso(), + expires_at=default_expiry_iso(years=99), + days_remaining=36500, + features=all_features_for_tier(Tier.ENTERPRISE), + error_kind="", + error_message="", + ) + try: + lic = self._cached or self.load() + except InvalidLicenseError as e: + return _EMPTY_STATE.__class__( + activated=True, valid=False, + name="", email="", tier="", license_key="", + issued_at="", expires_at="", days_remaining=0, + features=(), + error_kind="invalid", + error_message=str(e), + ) + if lic is None: + return _EMPTY_STATE + if lic.is_expired(): + return LicenseState( + activated=True, valid=False, + name=lic.name, email=lic.email, tier=lic.tier.value, + license_key=lic.license_key, + issued_at=lic.issued_at, expires_at=lic.expires_at, + days_remaining=lic.days_remaining(), + features=lic.features, + error_kind="expired", + error_message=( + f"License expired on {lic.expires_at}. " + "Paste a renewal blob to extend access." + ), + ) + return LicenseState( + activated=True, valid=True, + name=lic.name, email=lic.email, tier=lic.tier.value, + license_key=lic.license_key, + issued_at=lic.issued_at, expires_at=lic.expires_at, + days_remaining=max(lic.days_remaining(), 0), + features=lic.features, + error_kind="", + error_message="", + ) + + def require_feature(self, feature: str | FeatureFlag) -> License: + """Raise the right error if *feature* isn't accessible. + + Returns the active :class:`License` on success so callers can + log the tier / days-remaining alongside their own work. + """ + if self.dev_mode: + # Synthesize a dev license so callers expecting a return + # value don't blow up. The dev license unlocks every flag. + return License( + name="dev", email="dev@local", + license_key="DEV-BYPASS", + tier=Tier.ENTERPRISE, + features=all_features_for_tier(Tier.ENTERPRISE), + issued_at=_utcnow_iso(), + expires_at=default_expiry_iso(years=99), + signature="", + ) + try: + lic = self._cached or self.load() + except InvalidLicenseError: + raise + if lic is None: + raise NotActivatedError( + "DataTools is not activated. Run " + "``datatools-license activate `` or use the " + "Activate page in the GUI." + ) + if lic.is_expired(): + raise ExpiredLicenseError( + f"License expired on {lic.expires_at}. " + "Renew before continuing." + ) + if not lic.has_feature(feature): + tier_name = lic.tier.value if isinstance(lic.tier, Tier) else lic.tier + raise UnsupportedFeatureError( + f"Feature {feature!r} is not enabled on the active " + f"{tier_name!r} license." + ) + return lic + + # --- Internals --------------------------------------------------------- + + def _mint( + self, + *, + name: str, + email: str, + tier: Tier, + years: int = 1, + license_key: Optional[str] = None, + ) -> License: + """Self-sign a new license. Used by ``issue_trial`` and by + the seller-side key generation utility (which calls the + same code via the bare manager).""" + now = _utcnow_iso() + exp = default_expiry_iso(years=years) + features = all_features_for_tier(tier) + key = license_key or _generate_license_key(tier) + unsigned = License( + name=name, email=email, license_key=key, tier=tier, + features=features, issued_at=now, expires_at=exp, + signature="", + ) + sig = crypto.sign(unsigned.to_canonical_dict()) + signed = License( + name=unsigned.name, email=unsigned.email, + license_key=unsigned.license_key, tier=unsigned.tier, + features=unsigned.features, issued_at=unsigned.issued_at, + expires_at=unsigned.expires_at, signature=sig, + ) + self.save(signed) + return signed + + +def _generate_license_key(tier: Tier) -> str: + """Human-readable but unguessable key id. + + Format: ``DT1-{TIER}-{8 hex}-{8 hex}``. The two random hex blocks + come from a single UUID4 so the key has 64 bits of entropy. Not + used as the cryptographic identity — that's the signature — but + it's a stable handle for support emails. + """ + rid = uuid.uuid4().hex + return f"DT1-{tier.value.upper()}-{rid[:8]}-{rid[8:16]}" + + +def _validate_registration(name: str, email: str) -> None: + """Reject obviously-bad inputs before touching crypto. + + The activation page should call this too so the error surfaces + immediately instead of from inside the verifier. + """ + if not name or not name.strip(): + raise InvalidLicenseError("Name is required for registration.") + if not email or not _EMAIL_RE.match(email.strip()): + raise InvalidLicenseError( + f"{email!r} is not a valid email address. " + "Expected: ``local@domain.tld``." + ) + + +def _truthy_env(name: str) -> bool: + v = os.environ.get(name, "") + return v.strip().lower() in {"1", "true", "yes", "on"} + + +# --------------------------------------------------------------------------- +# Singleton + module-level convenience +# --------------------------------------------------------------------------- + +_singleton: Optional[LicenseManager] = None + + +def get_manager() -> LicenseManager: + """Return the process-wide :class:`LicenseManager`. + + Re-uses the same instance across imports so the GUI's sidebar, + the chrome gate, and the CLI guard share one cached license read. + Tests that need isolation should construct their own manager + instead. + """ + global _singleton + if _singleton is None: + _singleton = LicenseManager() + return _singleton + + +def reset_singleton_for_tests() -> None: + """Drop the cached singleton. Used by the test fixture so each + test session starts with a fresh manager pointed at its tmp + license path.""" + global _singleton + _singleton = None + + +def current_state() -> LicenseState: + return get_manager().current_state() + + +def require_feature(feature: str | FeatureFlag) -> License: + return get_manager().require_feature(feature) diff --git a/src/license/schema.py b/src/license/schema.py new file mode 100644 index 0000000..297fe07 --- /dev/null +++ b/src/license/schema.py @@ -0,0 +1,181 @@ +"""License schema — dataclasses + enums. + +Wire format (the contents of ``~/.datatools/license.json`` AND the +base64-decoded activation blob):: + + { + "name": "Jane Doe", + "email": "jane@example.com", + "license_key": "DT1-CORE-1A2B3C4D-5E6F7G8H", + "tier": "core", + "features": ["01_deduplicator", "02_text_cleaner", ...], + "issued_at": "2026-05-13T00:00:00Z", + "expires_at": "2027-05-13T00:00:00Z", + "signature": "" + } + +The signature is the HMAC over the canonical JSON of every field +*except* ``signature`` itself (see :mod:`.crypto`). Keeping the schema +strictly additive means future builds can verify older licenses as +long as they ship the same secret. +""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Any + + +class Tier(str, Enum): + """License tier. Drives the feature set the active license unlocks. + + Order matters: TRIAL < CORE < PRO < ENTERPRISE. A higher tier + inherits every feature of the lower tiers — see + :data:`.features.FEATURES_BY_TIER`. + """ + + TRIAL = "trial" + CORE = "core" + PRO = "pro" + ENTERPRISE = "enterprise" + + +class FeatureFlag(str, Enum): + """Stable feature identifiers. Match the ``tool_id`` field in + :mod:`src.gui.tools_registry` so the GUI's per-tool gating can + share the same string keys. + + Future SKUs ship by adding new flags here and adding them to a + new tier in ``FEATURES_BY_TIER`` — no consumer code changes. + """ + + DEDUPLICATOR = "01_deduplicator" + TEXT_CLEANER = "02_text_cleaner" + FORMAT_STANDARDIZER = "03_format_standardizer" + MISSING_HANDLER = "04_missing_handler" + COLUMN_MAPPER = "05_column_mapper" + OUTLIER_DETECTOR = "06_outlier_detector" + MULTI_FILE_MERGER = "07_multi_file_merger" + VALIDATOR_REPORTER = "08_validator_reporter" + PIPELINE_RUNNER = "09_pipeline_runner" + + +def _utcnow_iso() -> str: + """Return current UTC time in ISO-8601 with explicit ``Z`` suffix. + + ``datetime.utcnow`` is deprecated in CPython 3.12; using a + tz-aware UTC datetime and slicing off the ``+00:00`` keeps the + serialized form short and human-readable. + """ + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _parse_iso(s: str) -> datetime: + """Parse one of our ISO strings into a tz-aware datetime.""" + # Accept both ``...Z`` and ``...+00:00`` so future format tweaks + # don't break old files. + if s.endswith("Z"): + s = s[:-1] + "+00:00" + dt = datetime.fromisoformat(s) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + + +@dataclass(frozen=True) +class License: + """One activated license. Immutable — renew/upgrade produces a new + instance, never mutates an existing one.""" + + name: str + email: str + license_key: str + tier: Tier + features: tuple[str, ...] + issued_at: str # ISO-8601 UTC + expires_at: str # ISO-8601 UTC + signature: str = "" # populated by ``crypto.sign`` + + # --- Convenience accessors ------------------------------------------------ + + @property + def issued_dt(self) -> datetime: + return _parse_iso(self.issued_at) + + @property + def expires_dt(self) -> datetime: + return _parse_iso(self.expires_at) + + def is_expired(self, *, now: datetime | None = None) -> bool: + ref = now or datetime.now(timezone.utc) + return ref >= self.expires_dt + + def days_remaining(self, *, now: datetime | None = None) -> int: + ref = now or datetime.now(timezone.utc) + delta = self.expires_dt - ref + # ``int(days)`` floors towards 0 for negatives — we use + # ``max(..., 0)`` for the display path and the raw value for + # the test path. Callers wanting "expired by N days" should + # use ``is_expired`` first. + return delta.days + + def has_feature(self, feature: str | FeatureFlag) -> bool: + key = feature.value if isinstance(feature, FeatureFlag) else feature + return key in self.features + + # --- Serialization -------------------------------------------------------- + + def to_canonical_dict(self) -> dict[str, Any]: + """Return the JSON-canonical dict the HMAC is computed over. + + Excludes ``signature`` so signing and verifying both agree on + the message bytes. + """ + d = asdict(self) + d.pop("signature", None) + d["tier"] = self.tier.value if isinstance(self.tier, Tier) else self.tier + d["features"] = list(self.features) + return d + + def to_dict(self) -> dict[str, Any]: + """Return the on-disk dict, signature included.""" + d = self.to_canonical_dict() + d["signature"] = self.signature + return d + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "License": + """Inverse of :meth:`to_dict`. Tolerant of missing optional + fields (defaults), strict on required ones (raises KeyError). + """ + tier_raw = data["tier"] + tier = tier_raw if isinstance(tier_raw, Tier) else Tier(tier_raw) + return cls( + name=str(data["name"]), + email=str(data["email"]), + license_key=str(data["license_key"]), + tier=tier, + features=tuple(data.get("features", ())), + issued_at=str(data["issued_at"]), + expires_at=str(data["expires_at"]), + signature=str(data.get("signature", "")), + ) + + +# Public helper exposed for the activation flow (1-year default). +def default_expiry_iso(years: int = 1, *, now: datetime | None = None) -> str: + """Return an ISO timestamp *years* from *now* (default: current UTC).""" + ref = now or datetime.now(timezone.utc) + # ``replace(year=...)`` handles leap-year edge cases via the + # ``timedelta`` fallback below for Feb-29 issued dates. + try: + target = ref.replace(year=ref.year + years) + except ValueError: + # Feb 29 + N years where target year isn't a leap year — slide + # to Feb 28. Acceptable; the buyer is one day short of an + # exact year boundary on a date they almost certainly didn't + # pick on purpose. + target = ref.replace(year=ref.year + years, day=28) + return target.strftime("%Y-%m-%dT%H:%M:%SZ") diff --git a/src/license/storage.py b/src/license/storage.py new file mode 100644 index 0000000..3815e95 --- /dev/null +++ b/src/license/storage.py @@ -0,0 +1,76 @@ +"""Where the activated license lives on disk. + +Default path: ``~/.datatools/license.json``. Overridable via +``$DATATOOLS_LICENSE_PATH`` for tests (the conftest fixture uses this +to point each test session at a tmp file). + +The directory is created lazily on first write — we don't want to +create the user's config dir just for reading. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any, Optional + + +def default_license_path() -> Path: + """The resolved license file path for the current process. + + Order of resolution: + + 1. ``$DATATOOLS_LICENSE_PATH`` (absolute path; used by tests). + 2. ``~/.datatools/license.json`` (everyone else). + """ + override = os.environ.get("DATATOOLS_LICENSE_PATH") + if override: + return Path(override).expanduser().resolve() + return Path.home() / ".datatools" / "license.json" + + +def read_raw(path: Optional[Path] = None) -> Optional[dict[str, Any]]: + """Return the on-disk license dict, or ``None`` if no file exists. + + Anything else (truncated file, invalid JSON) raises ``ValueError`` + so the caller surfaces it as :class:`InvalidLicenseError`. We + don't try to recover from a corrupt license file — a user that + sees "invalid license" can paste their blob again. + """ + p = path or default_license_path() + if not p.exists(): + return None + try: + return json.loads(p.read_text(encoding="utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + raise ValueError(f"License file at {p} is corrupted: {e}") from e + + +def write_raw(data: dict[str, Any], path: Optional[Path] = None) -> Path: + """Atomically write *data* to the license path. + + Atomic = write-to-temp-then-rename, so a crashed write doesn't + leave a half-written license file that would fail verification on + the next launch. + """ + p = path or default_license_path() + p.parent.mkdir(parents=True, exist_ok=True) + tmp = p.with_suffix(p.suffix + ".tmp") + tmp.write_text( + json.dumps(data, indent=2, sort_keys=True), encoding="utf-8", + ) + tmp.replace(p) + return p + + +def remove(path: Optional[Path] = None) -> bool: + """Delete the license file. Returns ``True`` if a file was + removed, ``False`` if nothing was there. Used by the + ``datatools-license deactivate`` command and by test cleanup.""" + p = path or default_license_path() + try: + p.unlink() + return True + except FileNotFoundError: + return False diff --git a/src/license_cli.py b/src/license_cli.py new file mode 100644 index 0000000..049a75c --- /dev/null +++ b/src/license_cli.py @@ -0,0 +1,182 @@ +"""CLI for license management. + +Five commands: + +- ``activate BLOB --name NAME --email EMAIL`` + First-time activation. Verifies the signed blob, ensures the + name + email match, writes ``~/.datatools/license.json``. + +- ``renew BLOB`` + Apply a renewal blob to the currently-active license. The blob's + embedded name + email must match the active license; tier may + differ (upgrade path). + +- ``status [--json]`` + Print the current license state. Human-readable by default; + ``--json`` emits the same payload as a JSON document for piping + into shell scripts / monitoring. + +- ``trial --name NAME --email EMAIL [--years N]`` + Self-issue a trial license without a paid blob. Useful for + evaluating the product or for support to repro a buyer's issue + locally without needing a real key. + +- ``deactivate`` + Remove the local license file. + +This CLI is exempt from the guard that protects every tool CLI — +otherwise a user with no license couldn't run ``activate``. +""" + +from __future__ import annotations + +import json +import sys +from typing import Optional + +import typer + +from src.license import ( + LicenseError, + Tier, + get_manager, +) +from src.license.manager import LicenseManager + + +app = typer.Typer( + name="license", + help=( + "Manage your DataTools license: activate a paid blob, renew, " + "check status, or self-issue a 1-year trial.\n\n" + "All operations are local — no internet calls. The signed " + "license file lives at ~/.datatools/license.json (override " + "with $DATATOOLS_LICENSE_PATH)." + ), + add_completion=False, + no_args_is_help=True, +) + + +@app.command() +def activate( + blob: str = typer.Argument(..., help="License blob from the delivery email (starts with DTLIC1:)."), + name: str = typer.Option(..., "--name", "-n", help="Buyer name (must match the blob)."), + email: str = typer.Option(..., "--email", "-e", help="Buyer email (must match the blob)."), +) -> None: + """Verify and install a license blob.""" + mgr = get_manager() + try: + lic = mgr.activate_from_blob(blob, name=name, email=email) + except LicenseError as e: + typer.echo(f"Activation failed: {e}", err=True) + raise typer.Exit(code=2) + typer.echo( + f"Activated. Tier: {lic.tier.value} · " + f"Key: {lic.license_key} · " + f"Expires: {lic.expires_at[:10]}" + ) + + +@app.command() +def renew( + blob: str = typer.Argument(..., help="Renewal blob from the renewal email."), +) -> None: + """Apply a renewal blob to the currently active license.""" + mgr = get_manager() + try: + lic = mgr.renew(blob) + except LicenseError as e: + typer.echo(f"Renewal failed: {e}", err=True) + raise typer.Exit(code=2) + typer.echo( + f"Renewed. New expiry: {lic.expires_at[:10]} " + f"({lic.days_remaining()} days)" + ) + + +@app.command() +def status( + json_output: bool = typer.Option(False, "--json", help="Emit JSON instead of human-readable text."), +) -> None: + """Print the current license state.""" + state = get_manager().current_state() + if json_output: + typer.echo(json.dumps(state.as_dict(), indent=2)) + return + + if not state.activated: + typer.echo("Status: not activated.") + typer.echo( + "Run: python -m src.license_cli activate " + "--name 'Your Name' --email you@example.com" + ) + raise typer.Exit(code=1) + if state.error_kind == "invalid": + typer.echo(f"Status: invalid. {state.error_message}") + raise typer.Exit(code=2) + if state.error_kind == "expired": + typer.echo( + f"Status: expired on {state.expires_at[:10]}. " + f"Renew with: python -m src.license_cli renew " + ) + raise typer.Exit(code=2) + + typer.echo( + f"Status: active.\n" + f" Name: {state.name}\n" + f" Email: {state.email}\n" + f" Tier: {state.tier}\n" + f" Key: {state.license_key}\n" + f" Issued: {state.issued_at[:10]}\n" + f" Expires: {state.expires_at[:10]} ({state.days_remaining} days)\n" + f" Features: {', '.join(state.features)}" + ) + + +@app.command() +def trial( + name: str = typer.Option(..., "--name", "-n"), + email: str = typer.Option(..., "--email", "-e"), + years: int = typer.Option(1, "--years", help="Trial length (default: 1 year)."), +) -> None: + """Self-issue a trial license without a paid blob.""" + mgr = get_manager() + try: + lic = mgr.issue_trial(name=name, email=email, years=years) + except LicenseError as e: + typer.echo(f"Trial issuance failed: {e}", err=True) + raise typer.Exit(code=2) + typer.echo( + f"Trial issued. Key: {lic.license_key} · " + f"Expires: {lic.expires_at[:10]} ({lic.days_remaining()} days)" + ) + + +@app.command() +def deactivate( + confirm: bool = typer.Option( + False, "--yes", "-y", help="Skip the interactive confirmation.", + ), +) -> None: + """Remove the local license file (does NOT contact a server).""" + if not confirm: + if not typer.confirm( + "This removes the license file at ~/.datatools/license.json. Continue?", + default=False, + ): + typer.echo("Aborted.") + raise typer.Exit(code=1) + removed = get_manager().deactivate() + if removed: + typer.echo("Deactivated. The license file has been removed.") + else: + typer.echo("No license was active; nothing to deactivate.") + + +def main() -> None: + app() + + +if __name__ == "__main__": + main() diff --git a/tests/conftest.py b/tests/conftest.py index ab48128..98e03c3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,7 @@ """Shared test fixtures.""" +import os + import pandas as pd import pytest from pathlib import Path @@ -7,6 +9,84 @@ from pathlib import Path SAMPLES_DIR = Path(__file__).parent.parent / "samples" +# --------------------------------------------------------------------------- +# License gating bypass +# --------------------------------------------------------------------------- +# +# Every CLI entry point and every GUI page now requires a valid license, +# but the test suite shouldn't be in the business of paying for or +# generating licenses on every run. The session-scoped autouse fixture +# below sets the dev-mode env var BEFORE any test code (including +# parametrize-time imports) runs, so all 1900+ existing tests continue +# to pass. +# +# Individual license tests that DO want to exercise the real activation +# flow either: +# - clear the env var themselves and point the manager at a tmp file, or +# - use the explicit ``activated_license_manager`` / ``unactivated_license_manager`` +# fixtures defined below. + +@pytest.fixture(scope="session", autouse=True) +def _enable_license_dev_mode(): + """Bypass license checks for every test by default. + + Set in the env so subprocess-based tests (test_install, test_e2e) + inherit it without each test needing to plumb the env var. + """ + previous = os.environ.get("DATATOOLS_DEV_MODE") + os.environ["DATATOOLS_DEV_MODE"] = "1" + try: + yield + finally: + if previous is None: + os.environ.pop("DATATOOLS_DEV_MODE", None) + else: + os.environ["DATATOOLS_DEV_MODE"] = previous + + +@pytest.fixture +def isolated_license_path(tmp_path, monkeypatch): + """Point the license manager at a fresh tmp file for one test. + + Useful when a test wants to exercise the real activation flow: + create + sign + verify the license bytes in a controlled location + without polluting ``~/.datatools/license.json``. + + Also clears the dev-mode bypass so the manager actually consults + the file. + """ + path = tmp_path / "license.json" + monkeypatch.setenv("DATATOOLS_LICENSE_PATH", str(path)) + monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False) + # The manager singleton caches its handle across tests; drop it + # so the new env vars take effect. + from src.license.manager import reset_singleton_for_tests + reset_singleton_for_tests() + yield path + reset_singleton_for_tests() + + +@pytest.fixture +def activated_license_manager(isolated_license_path): + """Yield a LicenseManager pointed at a tmp file, pre-activated as + a Core user. The license is freshly signed with the current + secret so verification succeeds. + """ + from src.license import LicenseManager, Tier + mgr = LicenseManager() + mgr._mint(name="Test User", email="test@example.com", tier=Tier.CORE) + return mgr + + +@pytest.fixture +def unactivated_license_manager(isolated_license_path): + """Yield a LicenseManager pointed at a tmp file with NO license + file. Useful for testing the activation flow + gate behaviour. + """ + from src.license import LicenseManager + return LicenseManager() + + @pytest.fixture def sample_csv_path(): return SAMPLES_DIR / "messy_sales.csv" diff --git a/tests/gui/test_activation.py b/tests/gui/test_activation.py new file mode 100644 index 0000000..d9ea370 --- /dev/null +++ b/tests/gui/test_activation.py @@ -0,0 +1,254 @@ +"""GUI activation + license-gate tests. + +These exercise the chrome-level gate that ``hide_streamlit_chrome`` +installs: when no valid license is on disk, every page renders the +activation form instead of the page body, and tool widgets do NOT +appear. We test against the Deduplicator page since it's the smallest +real-world tool that depends on chrome. + +The autouse fixture in ``tests/conftest.py`` sets +``DATATOOLS_DEV_MODE=1``, which the GUI gate respects. Each test +below uses ``monkeypatch`` to clear that env var so the real gate +fires; ``isolated_license_path`` then redirects the manager to a +tmp file. +""" + +from __future__ import annotations + +import pytest + +from streamlit.testing.v1 import AppTest + +from .conftest import collected_text, stash_upload, with_language + + +@pytest.fixture +def no_license_env(monkeypatch, tmp_path): + """Clear dev mode and point the license at a fresh empty tmp path.""" + monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False) + monkeypatch.setenv("DATATOOLS_LICENSE_PATH", str(tmp_path / "license.json")) + from src.license.manager import reset_singleton_for_tests + reset_singleton_for_tests() + yield tmp_path / "license.json" + reset_singleton_for_tests() + + +@pytest.fixture +def trial_license(no_license_env): + """Pre-activate a 1-year trial license for tests that need to + pass the gate.""" + from src.license import LicenseManager, Tier + mgr = LicenseManager() + mgr.issue_trial(name="Test User", email="test@example.com") + yield mgr + + +class TestGateBlocksWithoutLicense: + """When no license file exists, every page should render the + activation form and short-circuit the tool body.""" + + def test_home_renders_activation_form(self, no_license_env, home_app): + home_app.run() + text = collected_text(home_app) + assert "Activate DataTools" in text or "Activar DataTools" in text + + def test_dedup_page_does_not_render_tool_widgets( + self, no_license_env, app_factory, + ): + app = app_factory("1_Deduplicator") + app.run() + # Without a license, the page should NOT have the dedup- + # specific advanced-options expander or Find Duplicates button. + labels = [b.label for b in app.button] + assert not any("Find Duplicates" in lbl for lbl in labels), ( + f"tool widgets leaked past the gate; got: {labels}" + ) + + def test_activation_form_localizes_to_spanish( + self, no_license_env, home_app, + ): + with_language(home_app, "es") + home_app.run() + text = collected_text(home_app) + assert "Activar DataTools" in text + + def test_sidebar_shows_not_activated(self, no_license_env, home_app): + home_app.run() + # Sidebar caption "🔒 Not activated". + captions = [c.value for c in home_app.sidebar.caption] + joined = " ".join(captions) + assert "Not activated" in joined or "Sin activar" in joined + + +class TestGatePassesWithTrialLicense: + def test_home_renders_full_grid(self, trial_license, home_app): + home_app.run() + text = collected_text(home_app) + # With a valid license, the activation form should NOT be the + # primary content; we should see the home title + tool cards. + assert "Data Cleaning Mastery" in text + assert "Activate DataTools" not in text # form not shown inline + + def test_sidebar_shows_active_status(self, trial_license, home_app): + home_app.run() + captions = " ".join(c.value for c in home_app.sidebar.caption) + # "Trial · 364 days left" (give or take one). + assert "Trial" in captions or "Prueba" in captions + assert "days left" in captions or "días" in captions + + def test_dedup_page_renders_tool_widgets( + self, trial_license, app_factory, small_csv_bytes, + ): + app = app_factory("1_Deduplicator") + stash_upload(app, name="messy.csv", data=small_csv_bytes) + app.run() + labels = [b.label for b in app.button] + assert any("Find Duplicates" in lbl for lbl in labels), ( + f"tool widgets blocked by gate even with valid license; " + f"got: {labels}" + ) + + +class TestActivationFormSubmission: + """End-to-end: paste a generated blob into the inline form and + confirm the gate releases on next render.""" + + def test_paste_blob_activates_and_unlocks( + self, no_license_env, home_app, + ): + # Generate a blob the same way scripts/generate_license.py does. + from src.license import LicenseManager, Tier + from src.license.crypto import encode_blob + mint_mgr = LicenseManager() + # Use a separate tmp path so the trial we mint doesn't fight + # with the manager the GUI will use. + from tempfile import mkstemp + from pathlib import Path + _, p = mkstemp(suffix=".json") + mint_mgr._path = Path(p) + lic = mint_mgr._mint( + name="Buyer", email="buyer@example.com", tier=Tier.CORE, + ) + blob = encode_blob(lic.to_dict()) + # Wipe the temporary mint manager so its file doesn't collide. + mint_mgr.deactivate() + + # Now drive the real GUI manager. + home_app.run() + # The activation form is inline in chrome — its widget keys + # are prefixed ``gate_``. + home_app.text_input(key="gate_name").set_value("Buyer").run() + home_app.text_input(key="gate_email").set_value("buyer@example.com").run() + home_app.text_area(key="gate_blob").set_value(blob).run() + # Submit primary form button. + submit = next( + b for b in home_app.button + if b.label in ("Activate", "Apply renewal") + ) + submit.click().run() + + # After activation the page reruns and the activation form + # should be gone — we should see the home page proper. + text = collected_text(home_app) + assert "Data Cleaning Mastery" in text + + def test_trial_button_self_issues_license( + self, no_license_env, home_app, + ): + home_app.run() + home_app.text_input(key="gate_name").set_value("Trial").run() + home_app.text_input(key="gate_email").set_value("trial@example.com").run() + # Click the trial button on the same form. + trial_btn = next( + b for b in home_app.button + if "trial" in b.label.lower() or "prueba" in b.label.lower() + ) + trial_btn.click().run() + text = collected_text(home_app) + # Successful activation → home page renders fully. + assert "Data Cleaning Mastery" in text + + +class TestActivationPageDirect: + """``pages/_Activate.py`` renders the same form regardless of + license state — buyer can revisit it to review or deactivate.""" + + def test_activate_page_renders_with_valid_license( + self, trial_license, app_factory, + ): + app = app_factory("_Activate") + app.run() + text = collected_text(app) + # Page title localized. + assert "Activate DataTools" in text + # Deactivate option only shown after activation. + labels = [b.label for b in app.button] + assert any("Deactivate" in lbl for lbl in labels) + + def test_activate_page_renders_without_license( + self, no_license_env, app_factory, + ): + app = app_factory("_Activate") + app.run() + text = collected_text(app) + assert "Activate DataTools" in text + # Deactivate button should NOT appear when nothing is active. + labels = [b.label for b in app.button] + assert not any("Deactivate" in lbl for lbl in labels) + + +class TestSidebarRenewalWarning: + """A license with <30 days remaining surfaces a sidebar warning.""" + + def test_renewal_warning_appears_under_30_days( + self, no_license_env, home_app, monkeypatch, + ): + # Mint a license with 7 days left. + from datetime import datetime, timedelta, timezone + from src.license import License, LicenseManager, Tier + from src.license import crypto as _crypto + from src.license.features import all_features_for_tier + + future = (datetime.now(timezone.utc) + timedelta(days=7)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + lic = License( + name="X", email="x@x.com", + license_key="DT1-CORE-EXPIRING", + tier=Tier.CORE, + features=all_features_for_tier(Tier.CORE), + issued_at="2026-05-13T00:00:00Z", + expires_at=future, + signature="", + ) + sig = _crypto.sign(lic.to_canonical_dict()) + signed = License(**{**lic.__dict__, "signature": sig}) + LicenseManager().save(signed) + + home_app.run() + sidebar_warnings = [w.body for w in home_app.sidebar.warning if w.body] + joined = " ".join(sidebar_warnings) + assert "expires in" in joined.lower() or "caduca en" in joined.lower(), ( + f"expected renewal warning in sidebar; got: {sidebar_warnings}" + ) + + +class TestLicenseStatusBadgeI18n: + """Sidebar status badge tier name must localize.""" + + def test_core_tier_localizes_in_spanish( + self, no_license_env, home_app, monkeypatch, + ): + from src.license import LicenseManager, Tier + LicenseManager()._mint( + name="X", email="x@x.com", tier=Tier.CORE, + ) + with_language(home_app, "es") + home_app.run() + captions = " ".join(c.value for c in home_app.sidebar.caption) + # The es pack maps ``license.tier_core`` to "Core" — same word + # in Spanish — but the surrounding template (``días restantes``) + # localizes. + assert "días restantes" in captions, ( + f"Spanish status label missing; sidebar captions: {captions}" + ) diff --git a/tests/test_license.py b/tests/test_license.py new file mode 100644 index 0000000..f235e62 --- /dev/null +++ b/tests/test_license.py @@ -0,0 +1,430 @@ +"""Unit tests for the license layer. + +Covers: + +- Schema: License dataclass roundtrip + expiration helpers. +- Crypto: HMAC sign/verify, tamper detection, blob encode/decode. +- Manager: activation, renewal, deactivation, feature gating, + expiration handling, dev-mode bypass, name/email mismatch + rejection. + +The session-scoped autouse fixture in ``conftest.py`` sets +``DATATOOLS_DEV_MODE=1`` for the suite. Tests in this file that need +the real check explicitly use the ``isolated_license_path`` fixture +which clears it. +""" + +from __future__ import annotations + +import json +import os +from datetime import datetime, timedelta, timezone +from pathlib import Path + +import pytest + +from src.license import ( + ExpiredLicenseError, + FeatureFlag, + InvalidLicenseError, + License, + LicenseError, + LicenseManager, + NotActivatedError, + Tier, + UnsupportedFeatureError, +) +from src.license.crypto import ( + _DEFAULT_SECRET, + decode_blob, + encode_blob, + sign, + verify, +) +from src.license.features import FEATURES_BY_TIER, all_features_for_tier +from src.license.schema import default_expiry_iso + + +# --------------------------------------------------------------------------- +# Schema +# --------------------------------------------------------------------------- + +class TestLicenseSchema: + def _make(self, **overrides) -> License: + defaults = dict( + name="Jane Doe", + email="jane@example.com", + license_key="DT1-CORE-AAAA-BBBB", + tier=Tier.CORE, + features=("01_deduplicator",), + issued_at="2026-05-13T00:00:00Z", + expires_at="2027-05-13T00:00:00Z", + signature="deadbeef", + ) + defaults.update(overrides) + return License(**defaults) + + def test_to_dict_roundtrip(self): + lic = self._make() + again = License.from_dict(lic.to_dict()) + assert again == lic + + def test_canonical_dict_excludes_signature(self): + lic = self._make() + canon = lic.to_canonical_dict() + assert "signature" not in canon + assert canon["name"] == "Jane Doe" + + def test_is_expired_false_when_future(self): + future = (datetime.now(timezone.utc) + timedelta(days=30)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + lic = self._make(expires_at=future) + assert not lic.is_expired() + assert lic.days_remaining() >= 29 + + def test_is_expired_true_when_past(self): + past = (datetime.now(timezone.utc) - timedelta(days=1)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + lic = self._make(expires_at=past) + assert lic.is_expired() + + def test_has_feature_accepts_string_and_enum(self): + lic = self._make(features=("01_deduplicator", "02_text_cleaner")) + assert lic.has_feature("01_deduplicator") + assert lic.has_feature(FeatureFlag.TEXT_CLEANER) + assert not lic.has_feature(FeatureFlag.PIPELINE_RUNNER) + + def test_default_expiry_one_year_default(self): + now = datetime(2026, 5, 13, tzinfo=timezone.utc) + exp = default_expiry_iso(now=now) + # One year from 2026-05-13 is 2027-05-13 (2027 not a leap year). + assert exp.startswith("2027-05-13") + + def test_default_expiry_leap_day_fallback(self): + # Feb 29 + 1y where target year (2027) isn't a leap year — we + # slide to Feb 28. Pin that contract. + leap = datetime(2024, 2, 29, tzinfo=timezone.utc) + exp = default_expiry_iso(years=3, now=leap) + # 2024 + 3 = 2027; not a leap year. + assert exp.startswith("2027-02-28") + + +# --------------------------------------------------------------------------- +# Crypto +# --------------------------------------------------------------------------- + +class TestSignAndVerify: + def test_sign_is_deterministic(self): + payload = {"a": 1, "b": "hello"} + assert sign(payload) == sign(payload) + + def test_verify_accepts_matching_signature(self): + payload = {"a": 1, "b": "hello"} + sig = sign(payload) + assert verify(payload, sig) is True + + def test_verify_rejects_modified_payload(self): + payload = {"a": 1, "b": "hello"} + sig = sign(payload) + modified = dict(payload, b="goodbye") + assert verify(modified, sig) is False + + def test_verify_rejects_modified_signature(self): + payload = {"a": 1} + sig = sign(payload) + # Flip one nibble. + bad = sig[:-1] + ("0" if sig[-1] != "0" else "1") + assert verify(payload, bad) is False + + def test_sign_respects_secret_env_override(self, monkeypatch): + payload = {"a": 1} + monkeypatch.setenv("DATATOOLS_LICENSE_SECRET", "alternate") + alt = sign(payload) + monkeypatch.delenv("DATATOOLS_LICENSE_SECRET", raising=False) + default = sign(payload) + assert alt != default + + def test_canonical_form_is_key_order_invariant(self): + a = {"x": 1, "y": 2} + b = {"y": 2, "x": 1} + assert sign(a) == sign(b) + + +class TestBlobEncodeDecode: + def test_roundtrip(self): + payload = {"name": "Jane", "tier": "core", "signature": "abc"} + blob = encode_blob(payload) + again = decode_blob(blob) + assert again == payload + + def test_blob_has_human_readable_prefix(self): + blob = encode_blob({"x": 1}) + assert blob.startswith("DTLIC1:") + + def test_decode_rejects_missing_prefix(self): + with pytest.raises(ValueError, match="DTLIC1"): + decode_blob("not-a-blob") + + def test_decode_rejects_bad_base64(self): + with pytest.raises(ValueError, match="base64"): + decode_blob("DTLIC1:!!!notbase64!!!") + + def test_decode_rejects_truncated_blob(self): + blob = encode_blob({"x": 1}) + truncated = blob[:-5] + with pytest.raises(ValueError): + decode_blob(truncated) + + +# --------------------------------------------------------------------------- +# Features +# --------------------------------------------------------------------------- + +class TestFeatures: + def test_every_tier_has_features(self): + for tier in Tier: + assert FEATURES_BY_TIER[tier], ( + f"tier {tier!r} has an empty feature set" + ) + + def test_all_features_for_tier_returns_sorted_tuple(self): + flags = all_features_for_tier(Tier.CORE) + assert flags == tuple(sorted(flags)) + + def test_core_unlocks_every_tool(self): + """v1 SKU contract: Core = all 9 tools.""" + flags = set(all_features_for_tier(Tier.CORE)) + assert {f.value for f in FeatureFlag} <= flags + + +# --------------------------------------------------------------------------- +# Manager: activation flow +# --------------------------------------------------------------------------- + +class TestManagerActivation: + def test_first_load_returns_none_when_no_file( + self, unactivated_license_manager, + ): + assert unactivated_license_manager.load() is None + assert not unactivated_license_manager.is_activated() + assert not unactivated_license_manager.is_valid() + + def test_issue_trial_writes_file_and_returns_license( + self, unactivated_license_manager, isolated_license_path, + ): + lic = unactivated_license_manager.issue_trial( + name="Trial User", email="trial@example.com", + ) + assert lic.tier == Tier.TRIAL + assert lic.name == "Trial User" + assert isolated_license_path.exists() + + def test_trial_signature_round_trips( + self, unactivated_license_manager, isolated_license_path, + ): + unactivated_license_manager.issue_trial( + name="A", email="a@b.com", + ) + mgr2 = LicenseManager() + lic2 = mgr2.load() + assert lic2 is not None + assert lic2.name == "A" + + def test_activate_from_blob_round_trips( + self, unactivated_license_manager, + ): + # Use the manager itself to mint then re-activate from blob. + mgr = unactivated_license_manager + mgr.issue_trial(name="Buyer", email="buyer@example.com") + lic = mgr.load() + # Re-encode as if shipped via Gumroad. + blob = encode_blob(lic.to_dict()) + # Deactivate then re-activate from the blob. + mgr.deactivate() + mgr2 = LicenseManager() + again = mgr2.activate_from_blob(blob, name="Buyer", email="buyer@example.com") + assert again.license_key == lic.license_key + + def test_activate_rejects_wrong_name(self, unactivated_license_manager): + mgr = unactivated_license_manager + mgr.issue_trial(name="Buyer", email="buyer@example.com") + lic = mgr.load() + blob = encode_blob(lic.to_dict()) + mgr.deactivate() + with pytest.raises(InvalidLicenseError, match="do not match"): + mgr.activate_from_blob( + blob, name="Different Person", email="buyer@example.com", + ) + + def test_activate_rejects_wrong_email(self, unactivated_license_manager): + mgr = unactivated_license_manager + mgr.issue_trial(name="Buyer", email="buyer@example.com") + lic = mgr.load() + blob = encode_blob(lic.to_dict()) + mgr.deactivate() + with pytest.raises(InvalidLicenseError, match="do not match"): + mgr.activate_from_blob( + blob, name="Buyer", email="someone-else@example.com", + ) + + def test_activate_rejects_tampered_blob(self, unactivated_license_manager): + mgr = unactivated_license_manager + mgr.issue_trial(name="Buyer", email="buyer@example.com") + lic = mgr.load() + # Tamper: bump tier to enterprise without re-signing. + raw = lic.to_dict() + raw["tier"] = "enterprise" + bad = encode_blob(raw) + mgr.deactivate() + with pytest.raises(InvalidLicenseError, match="signature"): + mgr.activate_from_blob( + bad, name="Buyer", email="buyer@example.com", + ) + + def test_activate_rejects_invalid_email_format( + self, unactivated_license_manager, + ): + mgr = unactivated_license_manager + with pytest.raises(InvalidLicenseError, match="valid email"): + mgr.activate_from_blob("anything", name="x", email="not-an-email") + + def test_deactivate_returns_false_when_no_file( + self, unactivated_license_manager, + ): + assert unactivated_license_manager.deactivate() is False + + def test_deactivate_returns_true_after_activation( + self, unactivated_license_manager, + ): + unactivated_license_manager.issue_trial( + name="A", email="a@b.com", + ) + assert unactivated_license_manager.deactivate() is True + + +# --------------------------------------------------------------------------- +# Manager: expiration + renewal +# --------------------------------------------------------------------------- + +class TestExpirationAndRenewal: + def test_is_valid_false_when_expired( + self, unactivated_license_manager, isolated_license_path, + ): + # Mint a license with an expiry in the past. + from src.license import crypto as _crypto + past = (datetime.now(timezone.utc) - timedelta(days=2)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + lic = License( + name="X", email="x@x.com", + license_key="DT1-CORE-XXXX-YYYY", + tier=Tier.CORE, + features=all_features_for_tier(Tier.CORE), + issued_at="2025-01-01T00:00:00Z", + expires_at=past, + signature="", + ) + sig = _crypto.sign(lic.to_canonical_dict()) + signed = License( + **{**lic.__dict__, "signature": sig}, + ) + unactivated_license_manager.save(signed) + mgr2 = LicenseManager() + assert mgr2.is_activated() + assert not mgr2.is_valid() + state = mgr2.current_state() + assert state.error_kind == "expired" + + def test_renew_extends_expiry( + self, unactivated_license_manager, + ): + mgr = unactivated_license_manager + old = mgr.issue_trial(name="A", email="a@b.com", years=1) + # Mint a fresh blob with a longer expiry. + mgr2 = LicenseManager() + new = mgr2._mint(name="A", email="a@b.com", tier=Tier.CORE, years=2) + blob = encode_blob(new.to_dict()) + # Renew via the manager. + renewed = mgr.renew(blob) + assert renewed.tier == Tier.CORE + assert renewed.expires_dt > old.expires_dt + + def test_renew_rejects_for_different_buyer( + self, unactivated_license_manager, + ): + mgr = unactivated_license_manager + mgr.issue_trial(name="A", email="a@b.com") + # Mint a blob for a DIFFERENT buyer. + other = LicenseManager() + # Use a separate path so other doesn't overwrite a's file. + from tempfile import mkstemp + _, p = mkstemp(suffix=".json") + other._path = Path(p) + other_lic = other._mint(name="B", email="b@c.com", tier=Tier.CORE) + blob = encode_blob(other_lic.to_dict()) + with pytest.raises(InvalidLicenseError, match="different name/email"): + mgr.renew(blob) + + +# --------------------------------------------------------------------------- +# Manager: feature gating +# --------------------------------------------------------------------------- + +class TestFeatureGating: + def test_require_feature_passes_on_valid_license( + self, activated_license_manager, + ): + # CORE unlocks every flag in v1. + for flag in FeatureFlag: + activated_license_manager.require_feature(flag) + + def test_require_feature_raises_not_activated( + self, unactivated_license_manager, + ): + with pytest.raises(NotActivatedError): + unactivated_license_manager.require_feature( + FeatureFlag.DEDUPLICATOR, + ) + + def test_require_feature_returns_license( + self, activated_license_manager, + ): + lic = activated_license_manager.require_feature( + FeatureFlag.DEDUPLICATOR, + ) + assert lic.name == "Test User" + + +# --------------------------------------------------------------------------- +# Manager: dev mode +# --------------------------------------------------------------------------- + +class TestDevMode: + def test_dev_mode_bypasses_validity_check( + self, isolated_license_path, monkeypatch, + ): + monkeypatch.setenv("DATATOOLS_DEV_MODE", "1") + mgr = LicenseManager() + assert mgr.is_valid() is True + # No license file exists. + assert not isolated_license_path.exists() + + def test_dev_mode_state_reports_synthetic_license( + self, isolated_license_path, monkeypatch, + ): + monkeypatch.setenv("DATATOOLS_DEV_MODE", "1") + mgr = LicenseManager() + state = mgr.current_state() + assert state.activated is True + assert state.valid is True + assert state.tier == "enterprise" + assert state.error_kind == "" + + def test_dev_mode_off_in_test_default_env_via_explicit_clear( + self, isolated_license_path, monkeypatch, + ): + # ``isolated_license_path`` already clears DEV_MODE; double- + # check that contract here so the broader suite can rely on it. + assert "DATATOOLS_DEV_MODE" not in os.environ diff --git a/tests/test_license_cli.py b/tests/test_license_cli.py new file mode 100644 index 0000000..d21e517 --- /dev/null +++ b/tests/test_license_cli.py @@ -0,0 +1,268 @@ +"""Tests for the license CLI + the per-CLI guard. + +Two layers: + +1. ``src/license_cli.py`` commands — ``activate``, ``renew``, + ``status``, ``trial``, ``deactivate``. Invoked via Typer's testing + helper so we get a clean ``CliRunner.invoke`` interface without + spawning subprocesses for every test. + +2. ``src/cli_license_guard.py`` — verify that every existing tool CLI + refuses to run when no license is present, and that ``--help`` + always works regardless of license state. +""" + +from __future__ import annotations + +import json +import os +import subprocess +import sys +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from src.license_cli import app as license_app + + +PROJECT_ROOT = Path(__file__).resolve().parent.parent + + +# --------------------------------------------------------------------------- +# license_cli commands +# --------------------------------------------------------------------------- + +class TestLicenseCliStatus: + def test_status_without_activation_exits_nonzero(self, unactivated_license_manager): + runner = CliRunner() + result = runner.invoke(license_app, ["status"]) + assert result.exit_code == 1 + assert "not activated" in result.stdout.lower() + + def test_status_with_active_license(self, activated_license_manager): + runner = CliRunner() + result = runner.invoke(license_app, ["status"]) + assert result.exit_code == 0 + assert "active" in result.stdout.lower() + assert "Test User" in result.stdout + + def test_status_json_emits_valid_json(self, activated_license_manager): + runner = CliRunner() + result = runner.invoke(license_app, ["status", "--json"]) + assert result.exit_code == 0 + data = json.loads(result.stdout) + assert data["activated"] is True + assert data["name"] == "Test User" + assert data["tier"] == "core" + assert isinstance(data["features"], list) + assert data["days_remaining"] >= 0 + + +class TestLicenseCliTrial: + def test_trial_issues_one_year_license(self, unactivated_license_manager): + runner = CliRunner() + result = runner.invoke(license_app, [ + "trial", "--name", "Trial User", "--email", "trial@example.com", + ]) + assert result.exit_code == 0 + assert "Trial issued" in result.stdout + # And the manager now sees it as active. + from src.license import LicenseManager + mgr = LicenseManager() + assert mgr.is_valid() + lic = mgr.load() + assert lic.tier.value == "trial" + + def test_trial_rejects_bad_email(self, unactivated_license_manager): + runner = CliRunner() + result = runner.invoke(license_app, [ + "trial", "--name", "T", "--email", "not-an-email", + ]) + assert result.exit_code == 2 + # ``typer.echo(..., err=True)`` lands in ``result.output`` when + # ``mix_stderr`` is the default True; ``result.stdout`` only has + # the bare stdout. + assert "valid email" in result.output.lower() + + +class TestLicenseCliActivate: + def _make_blob(self, name="Buyer", email="buyer@example.com", tier="core"): + """Mint a blob via the same machinery scripts/generate_license.py uses.""" + from src.license import LicenseManager, Tier + from src.license.crypto import encode_blob + # Use a throwaway manager (separate path) so we don't trample + # the one the test is exercising. + from tempfile import mkstemp + _, p = mkstemp(suffix=".json") + mgr = LicenseManager() + mgr._path = Path(p) + lic = mgr._mint(name=name, email=email, tier=Tier(tier)) + Path(p).unlink(missing_ok=True) + return encode_blob(lic.to_dict()) + + def test_activate_round_trip(self, unactivated_license_manager): + blob = self._make_blob() + runner = CliRunner() + result = runner.invoke(license_app, [ + "activate", blob, + "--name", "Buyer", "--email", "buyer@example.com", + ]) + assert result.exit_code == 0 + assert "Activated" in result.stdout + # State is now active. + from src.license import LicenseManager + assert LicenseManager().is_valid() + + def test_activate_rejects_wrong_name(self, unactivated_license_manager): + blob = self._make_blob() + runner = CliRunner() + result = runner.invoke(license_app, [ + "activate", blob, + "--name", "Wrong Person", "--email", "buyer@example.com", + ]) + assert result.exit_code == 2 + assert "do not match" in result.output.lower() + + +class TestLicenseCliRenew: + def test_renew_extends_expiry(self, activated_license_manager): + # Mint a longer-duration blob for the same buyer. + from src.license import LicenseManager, Tier + from src.license.crypto import encode_blob + from tempfile import mkstemp + _, p = mkstemp(suffix=".json") + other = LicenseManager() + other._path = Path(p) + lic = other._mint( + name="Test User", email="test@example.com", + tier=Tier.CORE, years=2, + ) + Path(p).unlink(missing_ok=True) + blob = encode_blob(lic.to_dict()) + + runner = CliRunner() + result = runner.invoke(license_app, ["renew", blob]) + assert result.exit_code == 0 + assert "Renewed" in result.stdout + + +class TestLicenseCliDeactivate: + def test_deactivate_with_yes(self, activated_license_manager): + runner = CliRunner() + result = runner.invoke(license_app, ["deactivate", "--yes"]) + assert result.exit_code == 0 + assert "Deactivated" in result.stdout + from src.license import LicenseManager + assert not LicenseManager().is_activated() + + +# --------------------------------------------------------------------------- +# Guard tests — every tool CLI refuses to run without a license +# --------------------------------------------------------------------------- + +class TestCliLicenseGuard: + """Run each tool CLI as a subprocess so we exercise the real + ``main()`` path, including the guard call. We bypass the suite's + DEV_MODE bypass by clearing the env var in the subprocess.""" + + @pytest.fixture + def clean_env(self, tmp_path): + """Subprocess env: no DEV_MODE, license path in tmp_path.""" + env = dict(os.environ) + env.pop("DATATOOLS_DEV_MODE", None) + env["DATATOOLS_LICENSE_PATH"] = str(tmp_path / "license.json") + return env + + def _run(self, env, *args, expect_success=False): + proc = subprocess.run( + [sys.executable, "-m", *args], + cwd=PROJECT_ROOT, + env=env, + capture_output=True, + text=True, + timeout=60, + ) + if expect_success: + assert proc.returncode == 0, ( + f"Expected success, got {proc.returncode}\n" + f"stdout:\n{proc.stdout}\nstderr:\n{proc.stderr}" + ) + return proc + + @pytest.mark.parametrize("module", [ + "src.cli", + "src.cli_text_clean", + "src.cli_format", + "src.cli_missing", + "src.cli_column_map", + "src.cli_pipeline", + "src.cli_analyze", + ]) + def test_cli_blocked_without_license(self, clean_env, module): + # Run with a dummy filename so we'd otherwise be running the + # tool; the guard should fire BEFORE typer parses argv. + proc = self._run(clean_env, module, "/nonexistent.csv") + assert proc.returncode == 2 + assert "not activated" in proc.stderr.lower() or "license" in proc.stderr.lower() + + @pytest.mark.parametrize("module", [ + "src.cli", + "src.cli_text_clean", + "src.cli_format", + "src.cli_missing", + "src.cli_column_map", + "src.cli_pipeline", + "src.cli_analyze", + ]) + def test_help_always_works(self, clean_env, module): + # ``--help`` must bypass the guard so users can see usage + # before they activate. + proc = self._run(clean_env, module, "--help", expect_success=True) + assert "usage" in (proc.stdout + proc.stderr).lower() + + def test_dev_mode_bypasses_guard(self, clean_env, tmp_path): + # Set DEV_MODE; the guard should allow the CLI to run (and + # then fail on the missing input file with a non-license + # error — which is what we're asserting via stderr). + env = dict(clean_env) + env["DATATOOLS_DEV_MODE"] = "1" + proc = self._run(env, "src.cli_analyze", "/nonexistent.csv") + # We expect typer / our code to fail on the missing path, + # NOT on the license. Look for evidence the license check + # was bypassed. + assert "not activated" not in proc.stderr.lower() + # Either pandas / our io.py raises a FileNotFoundError-ish. + combined = (proc.stdout + proc.stderr).lower() + assert "no such file" in combined or "not found" in combined or proc.returncode != 0 + + +class TestGuardBypassesHelp: + """Sanity check: ``--help`` and friends must skip the guard so the + Typer help screen renders even when no license is on disk.""" + + def test_runs_under_help_flag_without_license(self, tmp_path, monkeypatch): + # In-process check via the guard helper. + monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False) + monkeypatch.setenv("DATATOOLS_LICENSE_PATH", str(tmp_path / "license.json")) + from src.license.manager import reset_singleton_for_tests + reset_singleton_for_tests() + + monkeypatch.setattr("sys.argv", ["progname", "--help"]) + from src.cli_license_guard import guard + # No exception expected: --help bypasses. + guard() + + def test_blocks_under_real_command_without_license( + self, tmp_path, monkeypatch, + ): + monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False) + monkeypatch.setenv("DATATOOLS_LICENSE_PATH", str(tmp_path / "license.json")) + from src.license.manager import reset_singleton_for_tests + reset_singleton_for_tests() + + monkeypatch.setattr("sys.argv", ["progname", "input.csv", "--apply"]) + from src.cli_license_guard import guard + with pytest.raises(SystemExit) as ei: + guard() + assert ei.value.code == 2