"""LicenseManager — the public face of the license layer. Singleton-by-default (``get_manager()`` returns a process-wide instance), but tests can construct standalone managers via the constructor for full isolation. Lifecycle:: assert_production_safe() # guard against build-config errors mgr = get_manager() if not mgr.is_activated(): mgr.activate_from_blob(blob, name, email) mgr.require_feature(FeatureFlag.DEDUPLICATOR) state = mgr.current_state() # snapshot for the sidebar / CLI status """ from __future__ import annotations import os import re import sys import uuid from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Optional from . import crypto, storage from .errors import ( ExpiredLicenseError, InvalidLicenseError, LicenseError, NotActivatedError, UnsupportedFeatureError, ) from .features import all_features_for_tier from .schema import FeatureFlag, License, Tier, default_expiry_iso, _utcnow_iso # --------------------------------------------------------------------------- # State snapshot # --------------------------------------------------------------------------- @dataclass(frozen=True) class LicenseState: """A read-only snapshot for status widgets / CLI ``--status`` JSON. Always safe to render — even when no license is activated the dataclass is populated with explanatory defaults so the GUI never needs to None-check before formatting. """ activated: bool valid: bool # activated AND not expired AND signature OK name: str email: str tier: str license_key: str issued_at: str expires_at: str days_remaining: int features: tuple[str, ...] error_kind: str # "", "not_activated", "expired", "invalid" error_message: str def as_dict(self) -> dict: from dataclasses import asdict d = asdict(self) d["features"] = list(self.features) return d _EMPTY_STATE = LicenseState( activated=False, valid=False, name="", email="", tier="", license_key="", issued_at="", expires_at="", days_remaining=0, features=(), error_kind="not_activated", error_message="No license activated.", ) # --------------------------------------------------------------------------- # Manager # --------------------------------------------------------------------------- _EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") class LicenseManager: """Read/write license state. Cheap to construct; the singleton at module level just avoids reload churn. Storage path defaults to :func:`storage.default_license_path` — pass ``path=`` to override for tests. """ def __init__(self, *, path: Optional[Path] = None) -> None: self._path = path self._cached: Optional[License] = None self._dev_mode: Optional[bool] = None # --- Dev bypass --------------------------------------------------------- @property def dev_mode(self) -> bool: """``DATATOOLS_DEV_MODE=1`` short-circuits every check. Cached on the instance so a test that sets the env after construction still picks it up (re-read on each access). """ return _truthy_env("DATATOOLS_DEV_MODE") # --- Load / save -------------------------------------------------------- def load(self) -> Optional[License]: """Read + verify the on-disk license. Returns ``None`` when no file exists. Raises :class:`InvalidLicenseError` on signature mismatch / tampering.""" raw = storage.read_raw(self._path) if raw is None: self._cached = None return None lic = License.from_dict(raw) # Verify signature against the canonical payload. if not crypto.verify(lic.to_canonical_dict(), lic.signature): raise InvalidLicenseError( "License signature does not verify. The file may have " "been tampered with, or it was issued by a different " "build. Re-paste the original license blob to recover." ) self._cached = lic return lic def save(self, lic: License) -> Path: """Persist *lic* to the configured path. Caller is responsible for having signed the license already; this function does NOT re-sign.""" path = storage.write_raw(lic.to_dict(), self._path) self._cached = lic return path def deactivate(self) -> bool: """Remove the on-disk license. Returns whether a file was removed (False if nothing was active).""" self._cached = None return storage.remove(self._path) # --- Activation --------------------------------------------------------- def activate_from_blob( self, blob: str, *, name: str, email: str, ) -> License: """Verify *blob* and write the activated license to disk. The buyer pastes the blob; the page collects their *name* and *email* separately. We require both registered values to match the values embedded in the signed blob — defends against blob-sharing between buyers. """ _validate_registration(name, email) try: payload = crypto.decode_blob(blob) except ValueError as e: raise InvalidLicenseError(str(e)) from e signature = payload.get("signature", "") if not signature: raise InvalidLicenseError( "License blob is missing the ``signature`` field. " "The blob may have been truncated when pasted." ) canonical = {k: v for k, v in payload.items() if k != "signature"} if not crypto.verify(canonical, signature): raise InvalidLicenseError( "License blob signature did not verify. The blob may " "be corrupt, intended for a different product build, " "or modified after issue." ) # Reconstruct the License dataclass after verification so the # canonical dict we hashed matches the on-disk JSON. lic = License.from_dict(payload) # Personal-name and email matching is a soft attestation. We # enforce case-insensitive equality after stripping whitespace, # so " jane@Example.com " matches the embedded canonical # form without surprising the user about case. if name.strip().casefold() != lic.name.casefold() or ( email.strip().casefold() != lic.email.casefold() ): raise InvalidLicenseError( "Registered name / email do not match the values " "embedded in the license blob. Contact support if you " "believe this is in error." ) if lic.is_expired(): raise ExpiredLicenseError( f"License expired on {lic.expires_at}. " "Paste a renewal blob to extend access." ) self.save(lic) return lic def issue_trial(self, *, name: str, email: str, years: int = 1) -> License: """Self-sign a 1-year trial license. The seller's ``scripts/generate_license.py`` produces these for buyers; the same code path is reused at activation time as a fallback when a buyer wants to evaluate without a key. Trial licenses are functionally identical to CORE in v1; only the tier label differs (so the sidebar can say "TRIAL" if we ever want to nudge a conversion). """ _validate_registration(name, email) return self._mint(name=name, email=email, tier=Tier.TRIAL, years=years) def renew(self, blob: str) -> License: """Renew an existing license using a fresh blob. Verification: the blob must verify, its name+email must match the currently-active license, and its expiry must be in the future. We allow tier changes during renewal (upgrade path). """ current = self._cached or self.load() if current is None: raise NotActivatedError( "No active license to renew. Use ``activate`` instead " "of ``renew`` for first-time setup." ) try: payload = crypto.decode_blob(blob) except ValueError as e: raise InvalidLicenseError(str(e)) from e signature = payload.get("signature", "") canonical = {k: v for k, v in payload.items() if k != "signature"} if not crypto.verify(canonical, signature): raise InvalidLicenseError("Renewal blob signature did not verify.") lic = License.from_dict(payload) if ( lic.name.casefold() != current.name.casefold() or lic.email.casefold() != current.email.casefold() ): raise InvalidLicenseError( "Renewal blob is for a different name/email than the " "currently-active license." ) if lic.is_expired(): raise ExpiredLicenseError( "Renewal blob is itself expired. Generate a new one." ) self.save(lic) return lic # --- Inspection --------------------------------------------------------- def is_activated(self) -> bool: if self._cached is not None: return True return storage.read_raw(self._path) is not None def is_valid(self) -> bool: if self.dev_mode: return True try: lic = self._cached or self.load() except LicenseError: return False if lic is None: return False return not lic.is_expired() def current_state(self) -> LicenseState: if self.dev_mode: return LicenseState( activated=True, valid=True, name="dev", email="dev@local", tier=Tier.ENTERPRISE.value, license_key="DEV-BYPASS", issued_at=_utcnow_iso(), expires_at=default_expiry_iso(years=99), days_remaining=36500, features=all_features_for_tier(Tier.ENTERPRISE), error_kind="", error_message="", ) try: lic = self._cached or self.load() except InvalidLicenseError as e: return _EMPTY_STATE.__class__( activated=True, valid=False, name="", email="", tier="", license_key="", issued_at="", expires_at="", days_remaining=0, features=(), error_kind="invalid", error_message=str(e), ) if lic is None: return _EMPTY_STATE if lic.is_expired(): return LicenseState( activated=True, valid=False, name=lic.name, email=lic.email, tier=lic.tier.value, license_key=lic.license_key, issued_at=lic.issued_at, expires_at=lic.expires_at, days_remaining=lic.days_remaining(), features=lic.features, error_kind="expired", error_message=( f"License expired on {lic.expires_at}. " "Paste a renewal blob to extend access." ), ) return LicenseState( activated=True, valid=True, name=lic.name, email=lic.email, tier=lic.tier.value, license_key=lic.license_key, issued_at=lic.issued_at, expires_at=lic.expires_at, days_remaining=max(lic.days_remaining(), 0), features=lic.features, error_kind="", error_message="", ) def require_feature(self, feature: str | FeatureFlag) -> License: """Raise the right error if *feature* isn't accessible. Returns the active :class:`License` on success so callers can log the tier / days-remaining alongside their own work. """ if self.dev_mode: # Synthesize a dev license so callers expecting a return # value don't blow up. The dev license unlocks every flag. return License( name="dev", email="dev@local", license_key="DEV-BYPASS", tier=Tier.ENTERPRISE, features=all_features_for_tier(Tier.ENTERPRISE), issued_at=_utcnow_iso(), expires_at=default_expiry_iso(years=99), signature="", ) try: lic = self._cached or self.load() except InvalidLicenseError: raise if lic is None: raise NotActivatedError( "DataTools is not activated. Run " "``datatools-license activate `` or use the " "Activate page in the GUI." ) if lic.is_expired(): raise ExpiredLicenseError( f"License expired on {lic.expires_at}. " "Renew before continuing." ) if not lic.has_feature(feature): tier_name = lic.tier.value if isinstance(lic.tier, Tier) else lic.tier raise UnsupportedFeatureError( f"Feature {feature!r} is not enabled on the active " f"{tier_name!r} license." ) return lic # --- Internals --------------------------------------------------------- def _mint( self, *, name: str, email: str, tier: Tier, years: int = 1, license_key: Optional[str] = None, ) -> License: """Self-sign a new license. Used by ``issue_trial`` and by the seller-side key generation utility (which calls the same code via the bare manager).""" now = _utcnow_iso() exp = default_expiry_iso(years=years) features = all_features_for_tier(tier) key = license_key or _generate_license_key(tier) unsigned = License( name=name, email=email, license_key=key, tier=tier, features=features, issued_at=now, expires_at=exp, signature="", ) sig = crypto.sign(unsigned.to_canonical_dict()) signed = License( name=unsigned.name, email=unsigned.email, license_key=unsigned.license_key, tier=unsigned.tier, features=unsigned.features, issued_at=unsigned.issued_at, expires_at=unsigned.expires_at, signature=sig, ) self.save(signed) return signed def _generate_license_key(tier: Tier) -> str: """Human-readable but unguessable key id. Format: ``DT1-{TIER}-{8 hex}-{8 hex}``. The two random hex blocks come from a single UUID4 so the key has 64 bits of entropy. Not used as the cryptographic identity — that's the signature — but it's a stable handle for support emails. """ rid = uuid.uuid4().hex return f"DT1-{tier.value.upper()}-{rid[:8]}-{rid[8:16]}" def _validate_registration(name: str, email: str) -> None: """Reject obviously-bad inputs before touching crypto. The activation page should call this too so the error surfaces immediately instead of from inside the verifier. """ if not name or not name.strip(): raise InvalidLicenseError("Name is required for registration.") if not email or not _EMAIL_RE.match(email.strip()): raise InvalidLicenseError( f"{email!r} is not a valid email address. " "Expected: ``local@domain.tld``." ) def _truthy_env(name: str) -> bool: v = os.environ.get(name, "") return v.strip().lower() in {"1", "true", "yes", "on"} # --------------------------------------------------------------------------- # Singleton + module-level convenience # --------------------------------------------------------------------------- _singleton: Optional[LicenseManager] = None def get_manager() -> LicenseManager: """Return the process-wide :class:`LicenseManager`. Re-uses the same instance across imports so the GUI's sidebar, the chrome gate, and the CLI guard share one cached license read. Tests that need isolation should construct their own manager instead. """ global _singleton if _singleton is None: _singleton = LicenseManager() return _singleton def reset_singleton_for_tests() -> None: """Drop the cached singleton. Used by the test fixture so each test session starts with a fresh manager pointed at its tmp license path.""" global _singleton _singleton = None def current_state() -> LicenseState: return get_manager().current_state() def require_feature(feature: str | FeatureFlag) -> License: return get_manager().require_feature(feature) # --------------------------------------------------------------------------- # Production-build sanity check # --------------------------------------------------------------------------- class ProductionBuildError(RuntimeError): """Raised when a frozen / shipped build is misconfigured in a way that would defeat licensing. Always loud, always fatal — the binary must not boot in this state.""" def _is_shipped_build() -> bool: """True when running from a PyInstaller bundle (``sys.frozen``). Set automatically by PyInstaller; not set in source / pytest runs. The whole purpose of the prod-safe check is to enforce invariants that only matter in a shipped build, so the rest of the codebase can stay flexible. """ return getattr(sys, "frozen", False) def assert_production_safe() -> None: """Fail loudly if a shipped build is misconfigured. Two tripwires: 1. ``DATATOOLS_DEV_MODE`` is set in a frozen build. The dev-mode env var unconditionally bypasses license verification — if a buyer's installer somehow ships it enabled (build pipeline bug, mis-set environment), every license check is a no-op. Refuse to start instead. 2. The active verification key is still the dev key. The build pipeline is supposed to override ``DATATOOLS_LICENSE_PUBKEY`` with the production key; if it didn't, the binary will reject every legitimate license (signed with the prod private key) AND would *accept* anything signed with the dev key (which is checked into the source tree). Refuse to start. No-ops in non-frozen runs (development, tests) so the dev key + dev mode keep working in those contexts. Production builds call this from :func:`src.cli_license_guard.guard` and :func:`src.gui.components.hide_streamlit_chrome`. """ if not _is_shipped_build(): return if _truthy_env("DATATOOLS_DEV_MODE"): raise ProductionBuildError( "DATATOOLS_DEV_MODE is set in a shipped build. This env " "var disables every license check and must never be set " "on a buyer machine. If you see this message in a release " "build, the install was misconfigured — contact support." ) if crypto.is_using_dev_key(): raise ProductionBuildError( "Shipped build is verifying against the development " "license key. The build pipeline must set " "DATATOOLS_LICENSE_PUBKEY to the production public key " "before packaging. This binary will reject every real " "license blob — re-download from the official channel." )