Files
datatools-dev/tests/test_license.py
Michael e534fb4989 sec(license): Ed25519 sigs + production-safe tripwire
Two coupled hardening upgrades.

1. Asymmetric signatures (HMAC → Ed25519)

The previous HMAC scheme used a symmetric secret that any motivated
reverse engineer could pull out of the shipped binary and use to
mint blobs for any tier / name / email. With Ed25519, the binary
ships only the public verification key; the signing key never
leaves the seller's environment, so binary compromise no longer
yields forgery.

- src/license/crypto.py rewritten around
  cryptography.hazmat.primitives.asymmetric.ed25519. Same public
  API surface (sign/verify/encode_blob/decode_blob), same canonical
  JSON encoding — drop-in for the manager / cli / GUI layers.
- DATATOOLS_LICENSE_PRIVKEY (seller-side) and
  DATATOOLS_LICENSE_PUBKEY (build-time) env vars supply the keys;
  the in-source dev keypair (src/license/_dev_keypair.py)
  deterministically derives from a seed phrase for repro builds and
  tests.
- Blob prefix bumped DTLIC1: → DTLIC2:. Decoding a DTLIC1 blob
  surfaces a clear "old format" error rather than a confusing
  signature mismatch.
- scripts/generate_keypair.py mints fresh production keypairs for
  the seller (run once, stash the private key offline). Adds
  cryptography>=41,<46 to requirements.txt (was an undeclared
  transitive dep).

2. Production-safe tripwire

assert_production_safe() refuses to boot a frozen / shipped build
when either:

- DATATOOLS_DEV_MODE=1 is set (would unconditionally bypass every
  license check — fine in source/test but catastrophic in a buyer
  install).
- The active verification key is still the embedded dev key (the
  build pipeline forgot to set DATATOOLS_LICENSE_PUBKEY).

No-op in source / pytest runs (sys.frozen is unset) so test
fixtures and dev workflows keep working without ceremony. Called
from src/cli_license_guard.guard() and from hide_streamlit_chrome
— so it fires on every CLI invocation and every GUI page load.

Tests: 49 license-layer unit tests (was 40); added Ed25519
wrong-key rejection, dev-keypair seed pin, blob v2 prefix, v1
rejection with clear message, and four production-safe scenarios
(no-op in source, fires on DEV_MODE in frozen, fires on dev key in
frozen, passes in frozen with prod pubkey). Total: 2024 → 2033.

Docs (REQUIREMENTS §17a, DEVELOPER licensing recipe, DECISIONS
§9b + decision log) updated with the new threat-model write-up,
key-storage workflow, and tripwire behaviour.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:34:48 +00:00

519 lines
19 KiB
Python

"""Unit tests for the license layer.
Covers:
- Schema: License dataclass roundtrip + expiration helpers.
- Crypto: HMAC sign/verify, tamper detection, blob encode/decode.
- Manager: activation, renewal, deactivation, feature gating,
expiration handling, dev-mode bypass, name/email mismatch
rejection.
The session-scoped autouse fixture in ``conftest.py`` sets
``DATATOOLS_DEV_MODE=1`` for the suite. Tests in this file that need
the real check explicitly use the ``isolated_license_path`` fixture
which clears it.
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from src.license import (
ExpiredLicenseError,
FeatureFlag,
InvalidLicenseError,
License,
LicenseError,
LicenseManager,
NotActivatedError,
Tier,
UnsupportedFeatureError,
)
from src.license.crypto import (
decode_blob,
encode_blob,
is_using_dev_key,
sign,
verify,
)
from src.license.features import FEATURES_BY_TIER, all_features_for_tier
from src.license.schema import default_expiry_iso
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
class TestLicenseSchema:
def _make(self, **overrides) -> License:
defaults = dict(
name="Jane Doe",
email="jane@example.com",
license_key="DT1-CORE-AAAA-BBBB",
tier=Tier.CORE,
features=("01_deduplicator",),
issued_at="2026-05-13T00:00:00Z",
expires_at="2027-05-13T00:00:00Z",
signature="deadbeef",
)
defaults.update(overrides)
return License(**defaults)
def test_to_dict_roundtrip(self):
lic = self._make()
again = License.from_dict(lic.to_dict())
assert again == lic
def test_canonical_dict_excludes_signature(self):
lic = self._make()
canon = lic.to_canonical_dict()
assert "signature" not in canon
assert canon["name"] == "Jane Doe"
def test_is_expired_false_when_future(self):
future = (datetime.now(timezone.utc) + timedelta(days=30)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
lic = self._make(expires_at=future)
assert not lic.is_expired()
assert lic.days_remaining() >= 29
def test_is_expired_true_when_past(self):
past = (datetime.now(timezone.utc) - timedelta(days=1)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
lic = self._make(expires_at=past)
assert lic.is_expired()
def test_has_feature_accepts_string_and_enum(self):
lic = self._make(features=("01_deduplicator", "02_text_cleaner"))
assert lic.has_feature("01_deduplicator")
assert lic.has_feature(FeatureFlag.TEXT_CLEANER)
assert not lic.has_feature(FeatureFlag.PIPELINE_RUNNER)
def test_default_expiry_one_year_default(self):
now = datetime(2026, 5, 13, tzinfo=timezone.utc)
exp = default_expiry_iso(now=now)
# One year from 2026-05-13 is 2027-05-13 (2027 not a leap year).
assert exp.startswith("2027-05-13")
def test_default_expiry_leap_day_fallback(self):
# Feb 29 + 1y where target year (2027) isn't a leap year — we
# slide to Feb 28. Pin that contract.
leap = datetime(2024, 2, 29, tzinfo=timezone.utc)
exp = default_expiry_iso(years=3, now=leap)
# 2024 + 3 = 2027; not a leap year.
assert exp.startswith("2027-02-28")
# ---------------------------------------------------------------------------
# Crypto
# ---------------------------------------------------------------------------
class TestSignAndVerify:
def test_sign_is_deterministic(self):
payload = {"a": 1, "b": "hello"}
assert sign(payload) == sign(payload)
def test_verify_accepts_matching_signature(self):
payload = {"a": 1, "b": "hello"}
sig = sign(payload)
assert verify(payload, sig) is True
def test_verify_rejects_modified_payload(self):
payload = {"a": 1, "b": "hello"}
sig = sign(payload)
modified = dict(payload, b="goodbye")
assert verify(modified, sig) is False
def test_verify_rejects_modified_signature(self):
payload = {"a": 1}
sig = sign(payload)
# Flip one nibble.
bad = sig[:-1] + ("0" if sig[-1] != "0" else "1")
assert verify(payload, bad) is False
def test_sign_respects_privkey_env_override(self, monkeypatch):
# Use a different valid Ed25519 private key (32 bytes hex).
# Picked arbitrarily; doesn't need to match the dev key.
alt_priv = "00" * 32
payload = {"a": 1}
monkeypatch.setenv("DATATOOLS_LICENSE_PRIVKEY", alt_priv)
alt_sig = sign(payload)
monkeypatch.delenv("DATATOOLS_LICENSE_PRIVKEY", raising=False)
default_sig = sign(payload)
assert alt_sig != default_sig
def test_verify_with_wrong_pubkey_returns_false(self, monkeypatch):
# Sign with the dev key (default), then swap the pubkey and
# confirm verification fails.
payload = {"a": 1}
sig = sign(payload)
# 32-byte hex that isn't the matching dev pubkey.
wrong_pub = "11" * 32
monkeypatch.setenv("DATATOOLS_LICENSE_PUBKEY", wrong_pub)
assert verify(payload, sig) is False
def test_canonical_form_is_key_order_invariant(self):
a = {"x": 1, "y": 2}
b = {"y": 2, "x": 1}
assert sign(a) == sign(b)
class TestBlobEncodeDecode:
def test_roundtrip(self):
payload = {"name": "Jane", "tier": "core", "signature": "abc"}
blob = encode_blob(payload)
again = decode_blob(blob)
assert again == payload
def test_blob_uses_v2_prefix(self):
"""v1.6 switched HMAC → Ed25519; blob version bumped to DTLIC2.
Pin the prefix so any future scheme change is intentional."""
blob = encode_blob({"x": 1})
assert blob.startswith("DTLIC2:")
def test_decode_rejects_missing_prefix(self):
with pytest.raises(ValueError, match="DTLIC2"):
decode_blob("not-a-blob")
def test_decode_rejects_v1_blob_with_clear_message(self):
"""A v1 (HMAC) blob must surface a clear 'old format' message
rather than 'signature mismatch' — buyers redeeming an old
delivery email need to know to request a new blob."""
with pytest.raises(ValueError, match="DTLIC1"):
decode_blob("DTLIC1:eyJhIjogMX0=")
def test_decode_rejects_bad_base64(self):
with pytest.raises(ValueError, match="base64"):
decode_blob("DTLIC2:!!!notbase64!!!")
def test_decode_rejects_truncated_blob(self):
blob = encode_blob({"x": 1})
truncated = blob[:-5]
with pytest.raises(ValueError):
decode_blob(truncated)
class TestDevKeypair:
"""The embedded dev keypair must match the seed phrase so anyone
reproducing the build gets the same values. Catches a hand-edit
to ``_dev_keypair.py`` that drifts the constants from the seed."""
def test_dev_keypair_matches_seed(self):
from src.license._dev_keypair import (
DEV_PRIVATE_KEY_HEX,
DEV_PUBLIC_KEY_HEX,
_derive_from_seed,
)
derived_priv, derived_pub = _derive_from_seed()
assert derived_priv == DEV_PRIVATE_KEY_HEX
assert derived_pub == DEV_PUBLIC_KEY_HEX
def test_is_using_dev_key_true_by_default(self, monkeypatch):
monkeypatch.delenv("DATATOOLS_LICENSE_PUBKEY", raising=False)
assert is_using_dev_key() is True
def test_is_using_dev_key_false_when_overridden(self, monkeypatch):
monkeypatch.setenv("DATATOOLS_LICENSE_PUBKEY", "22" * 32)
assert is_using_dev_key() is False
class TestProductionSafe:
"""``assert_production_safe`` is a tripwire that fires only in
frozen / shipped builds. Tests simulate the frozen state via
monkeypatching ``sys.frozen``."""
def test_no_op_in_source_run(self):
# Default test run: sys.frozen is unset; nothing should raise.
from src.license import assert_production_safe
assert_production_safe() # no exception
def test_raises_on_dev_mode_in_frozen_build(self, monkeypatch):
from src.license import (
ProductionBuildError,
assert_production_safe,
)
monkeypatch.setattr("sys.frozen", True, raising=False)
monkeypatch.setenv("DATATOOLS_DEV_MODE", "1")
# Override pubkey so the dev-key check doesn't fire first.
monkeypatch.setenv("DATATOOLS_LICENSE_PUBKEY", "22" * 32)
with pytest.raises(ProductionBuildError, match="DATATOOLS_DEV_MODE"):
assert_production_safe()
def test_raises_on_dev_key_in_frozen_build(self, monkeypatch):
from src.license import (
ProductionBuildError,
assert_production_safe,
)
monkeypatch.setattr("sys.frozen", True, raising=False)
monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False)
monkeypatch.delenv("DATATOOLS_LICENSE_PUBKEY", raising=False)
with pytest.raises(ProductionBuildError, match="development license key"):
assert_production_safe()
def test_passes_in_frozen_build_with_prod_pubkey(self, monkeypatch):
from src.license import assert_production_safe
monkeypatch.setattr("sys.frozen", True, raising=False)
monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False)
monkeypatch.setenv("DATATOOLS_LICENSE_PUBKEY", "22" * 32)
# Should not raise.
assert_production_safe()
# ---------------------------------------------------------------------------
# Features
# ---------------------------------------------------------------------------
class TestFeatures:
def test_every_tier_has_features(self):
for tier in Tier:
assert FEATURES_BY_TIER[tier], (
f"tier {tier!r} has an empty feature set"
)
def test_all_features_for_tier_returns_sorted_tuple(self):
flags = all_features_for_tier(Tier.CORE)
assert flags == tuple(sorted(flags))
def test_core_unlocks_every_tool(self):
"""v1 SKU contract: Core = all 9 tools."""
flags = set(all_features_for_tier(Tier.CORE))
assert {f.value for f in FeatureFlag} <= flags
# ---------------------------------------------------------------------------
# Manager: activation flow
# ---------------------------------------------------------------------------
class TestManagerActivation:
def test_first_load_returns_none_when_no_file(
self, unactivated_license_manager,
):
assert unactivated_license_manager.load() is None
assert not unactivated_license_manager.is_activated()
assert not unactivated_license_manager.is_valid()
def test_issue_trial_writes_file_and_returns_license(
self, unactivated_license_manager, isolated_license_path,
):
lic = unactivated_license_manager.issue_trial(
name="Trial User", email="trial@example.com",
)
assert lic.tier == Tier.TRIAL
assert lic.name == "Trial User"
assert isolated_license_path.exists()
def test_trial_signature_round_trips(
self, unactivated_license_manager, isolated_license_path,
):
unactivated_license_manager.issue_trial(
name="A", email="a@b.com",
)
mgr2 = LicenseManager()
lic2 = mgr2.load()
assert lic2 is not None
assert lic2.name == "A"
def test_activate_from_blob_round_trips(
self, unactivated_license_manager,
):
# Use the manager itself to mint then re-activate from blob.
mgr = unactivated_license_manager
mgr.issue_trial(name="Buyer", email="buyer@example.com")
lic = mgr.load()
# Re-encode as if shipped via Gumroad.
blob = encode_blob(lic.to_dict())
# Deactivate then re-activate from the blob.
mgr.deactivate()
mgr2 = LicenseManager()
again = mgr2.activate_from_blob(blob, name="Buyer", email="buyer@example.com")
assert again.license_key == lic.license_key
def test_activate_rejects_wrong_name(self, unactivated_license_manager):
mgr = unactivated_license_manager
mgr.issue_trial(name="Buyer", email="buyer@example.com")
lic = mgr.load()
blob = encode_blob(lic.to_dict())
mgr.deactivate()
with pytest.raises(InvalidLicenseError, match="do not match"):
mgr.activate_from_blob(
blob, name="Different Person", email="buyer@example.com",
)
def test_activate_rejects_wrong_email(self, unactivated_license_manager):
mgr = unactivated_license_manager
mgr.issue_trial(name="Buyer", email="buyer@example.com")
lic = mgr.load()
blob = encode_blob(lic.to_dict())
mgr.deactivate()
with pytest.raises(InvalidLicenseError, match="do not match"):
mgr.activate_from_blob(
blob, name="Buyer", email="someone-else@example.com",
)
def test_activate_rejects_tampered_blob(self, unactivated_license_manager):
mgr = unactivated_license_manager
mgr.issue_trial(name="Buyer", email="buyer@example.com")
lic = mgr.load()
# Tamper: bump tier to enterprise without re-signing.
raw = lic.to_dict()
raw["tier"] = "enterprise"
bad = encode_blob(raw)
mgr.deactivate()
with pytest.raises(InvalidLicenseError, match="signature"):
mgr.activate_from_blob(
bad, name="Buyer", email="buyer@example.com",
)
def test_activate_rejects_invalid_email_format(
self, unactivated_license_manager,
):
mgr = unactivated_license_manager
with pytest.raises(InvalidLicenseError, match="valid email"):
mgr.activate_from_blob("anything", name="x", email="not-an-email")
def test_deactivate_returns_false_when_no_file(
self, unactivated_license_manager,
):
assert unactivated_license_manager.deactivate() is False
def test_deactivate_returns_true_after_activation(
self, unactivated_license_manager,
):
unactivated_license_manager.issue_trial(
name="A", email="a@b.com",
)
assert unactivated_license_manager.deactivate() is True
# ---------------------------------------------------------------------------
# Manager: expiration + renewal
# ---------------------------------------------------------------------------
class TestExpirationAndRenewal:
def test_is_valid_false_when_expired(
self, unactivated_license_manager, isolated_license_path,
):
# Mint a license with an expiry in the past.
from src.license import crypto as _crypto
past = (datetime.now(timezone.utc) - timedelta(days=2)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
lic = License(
name="X", email="x@x.com",
license_key="DT1-CORE-XXXX-YYYY",
tier=Tier.CORE,
features=all_features_for_tier(Tier.CORE),
issued_at="2025-01-01T00:00:00Z",
expires_at=past,
signature="",
)
sig = _crypto.sign(lic.to_canonical_dict())
signed = License(
**{**lic.__dict__, "signature": sig},
)
unactivated_license_manager.save(signed)
mgr2 = LicenseManager()
assert mgr2.is_activated()
assert not mgr2.is_valid()
state = mgr2.current_state()
assert state.error_kind == "expired"
def test_renew_extends_expiry(
self, unactivated_license_manager,
):
mgr = unactivated_license_manager
old = mgr.issue_trial(name="A", email="a@b.com", years=1)
# Mint a fresh blob with a longer expiry.
mgr2 = LicenseManager()
new = mgr2._mint(name="A", email="a@b.com", tier=Tier.CORE, years=2)
blob = encode_blob(new.to_dict())
# Renew via the manager.
renewed = mgr.renew(blob)
assert renewed.tier == Tier.CORE
assert renewed.expires_dt > old.expires_dt
def test_renew_rejects_for_different_buyer(
self, unactivated_license_manager,
):
mgr = unactivated_license_manager
mgr.issue_trial(name="A", email="a@b.com")
# Mint a blob for a DIFFERENT buyer.
other = LicenseManager()
# Use a separate path so other doesn't overwrite a's file.
from tempfile import mkstemp
_, p = mkstemp(suffix=".json")
other._path = Path(p)
other_lic = other._mint(name="B", email="b@c.com", tier=Tier.CORE)
blob = encode_blob(other_lic.to_dict())
with pytest.raises(InvalidLicenseError, match="different name/email"):
mgr.renew(blob)
# ---------------------------------------------------------------------------
# Manager: feature gating
# ---------------------------------------------------------------------------
class TestFeatureGating:
def test_require_feature_passes_on_valid_license(
self, activated_license_manager,
):
# CORE unlocks every flag in v1.
for flag in FeatureFlag:
activated_license_manager.require_feature(flag)
def test_require_feature_raises_not_activated(
self, unactivated_license_manager,
):
with pytest.raises(NotActivatedError):
unactivated_license_manager.require_feature(
FeatureFlag.DEDUPLICATOR,
)
def test_require_feature_returns_license(
self, activated_license_manager,
):
lic = activated_license_manager.require_feature(
FeatureFlag.DEDUPLICATOR,
)
assert lic.name == "Test User"
# ---------------------------------------------------------------------------
# Manager: dev mode
# ---------------------------------------------------------------------------
class TestDevMode:
def test_dev_mode_bypasses_validity_check(
self, isolated_license_path, monkeypatch,
):
monkeypatch.setenv("DATATOOLS_DEV_MODE", "1")
mgr = LicenseManager()
assert mgr.is_valid() is True
# No license file exists.
assert not isolated_license_path.exists()
def test_dev_mode_state_reports_synthetic_license(
self, isolated_license_path, monkeypatch,
):
monkeypatch.setenv("DATATOOLS_DEV_MODE", "1")
mgr = LicenseManager()
state = mgr.current_state()
assert state.activated is True
assert state.valid is True
assert state.tier == "enterprise"
assert state.error_kind == ""
def test_dev_mode_off_in_test_default_env_via_explicit_clear(
self, isolated_license_path, monkeypatch,
):
# ``isolated_license_path`` already clears DEV_MODE; double-
# check that contract here so the broader suite can rely on it.
assert "DATATOOLS_DEV_MODE" not in os.environ