"""Unit tests for the license layer. Covers: - Schema: License dataclass roundtrip + expiration helpers. - Crypto: HMAC sign/verify, tamper detection, blob encode/decode. - Manager: activation, renewal, deactivation, feature gating, expiration handling, dev-mode bypass, name/email mismatch rejection. The session-scoped autouse fixture in ``conftest.py`` sets ``DATATOOLS_DEV_MODE=1`` for the suite. Tests in this file that need the real check explicitly use the ``isolated_license_path`` fixture which clears it. """ from __future__ import annotations import json import os from datetime import datetime, timedelta, timezone from pathlib import Path import pytest from src.license import ( ExpiredLicenseError, FeatureFlag, InvalidLicenseError, License, LicenseError, LicenseManager, NotActivatedError, Tier, UnsupportedFeatureError, ) from src.license.crypto import ( decode_blob, encode_blob, is_using_dev_key, sign, verify, ) from src.license.features import FEATURES_BY_TIER, all_features_for_tier from src.license.schema import default_expiry_iso # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- class TestLicenseSchema: def _make(self, **overrides) -> License: defaults = dict( name="Jane Doe", email="jane@example.com", license_key="DT1-CORE-AAAA-BBBB", tier=Tier.CORE, features=("01_deduplicator",), issued_at="2026-05-13T00:00:00Z", expires_at="2027-05-13T00:00:00Z", signature="deadbeef", ) defaults.update(overrides) return License(**defaults) def test_to_dict_roundtrip(self): lic = self._make() again = License.from_dict(lic.to_dict()) assert again == lic def test_canonical_dict_excludes_signature(self): lic = self._make() canon = lic.to_canonical_dict() assert "signature" not in canon assert canon["name"] == "Jane Doe" def test_is_expired_false_when_future(self): future = (datetime.now(timezone.utc) + timedelta(days=30)).strftime( "%Y-%m-%dT%H:%M:%SZ" ) lic = self._make(expires_at=future) assert not lic.is_expired() assert lic.days_remaining() >= 29 def test_is_expired_true_when_past(self): past = (datetime.now(timezone.utc) - timedelta(days=1)).strftime( "%Y-%m-%dT%H:%M:%SZ" ) lic = self._make(expires_at=past) assert lic.is_expired() def test_has_feature_accepts_string_and_enum(self): lic = self._make(features=("01_deduplicator", "02_text_cleaner")) assert lic.has_feature("01_deduplicator") assert lic.has_feature(FeatureFlag.TEXT_CLEANER) assert not lic.has_feature(FeatureFlag.PIPELINE_RUNNER) def test_default_expiry_one_year_default(self): now = datetime(2026, 5, 13, tzinfo=timezone.utc) exp = default_expiry_iso(now=now) # One year from 2026-05-13 is 2027-05-13 (2027 not a leap year). assert exp.startswith("2027-05-13") def test_default_expiry_leap_day_fallback(self): # Feb 29 + 1y where target year (2027) isn't a leap year — we # slide to Feb 28. Pin that contract. leap = datetime(2024, 2, 29, tzinfo=timezone.utc) exp = default_expiry_iso(years=3, now=leap) # 2024 + 3 = 2027; not a leap year. assert exp.startswith("2027-02-28") # --------------------------------------------------------------------------- # Crypto # --------------------------------------------------------------------------- class TestSignAndVerify: def test_sign_is_deterministic(self): payload = {"a": 1, "b": "hello"} assert sign(payload) == sign(payload) def test_verify_accepts_matching_signature(self): payload = {"a": 1, "b": "hello"} sig = sign(payload) assert verify(payload, sig) is True def test_verify_rejects_modified_payload(self): payload = {"a": 1, "b": "hello"} sig = sign(payload) modified = dict(payload, b="goodbye") assert verify(modified, sig) is False def test_verify_rejects_modified_signature(self): payload = {"a": 1} sig = sign(payload) # Flip one nibble. bad = sig[:-1] + ("0" if sig[-1] != "0" else "1") assert verify(payload, bad) is False def test_sign_respects_privkey_env_override(self, monkeypatch): # Use a different valid Ed25519 private key (32 bytes hex). # Picked arbitrarily; doesn't need to match the dev key. alt_priv = "00" * 32 payload = {"a": 1} monkeypatch.setenv("DATATOOLS_LICENSE_PRIVKEY", alt_priv) alt_sig = sign(payload) monkeypatch.delenv("DATATOOLS_LICENSE_PRIVKEY", raising=False) default_sig = sign(payload) assert alt_sig != default_sig def test_verify_with_wrong_pubkey_returns_false(self, monkeypatch): # Sign with the dev key (default), then swap the pubkey and # confirm verification fails. payload = {"a": 1} sig = sign(payload) # 32-byte hex that isn't the matching dev pubkey. wrong_pub = "11" * 32 monkeypatch.setenv("DATATOOLS_LICENSE_PUBKEY", wrong_pub) assert verify(payload, sig) is False def test_canonical_form_is_key_order_invariant(self): a = {"x": 1, "y": 2} b = {"y": 2, "x": 1} assert sign(a) == sign(b) class TestBlobEncodeDecode: def test_roundtrip(self): payload = {"name": "Jane", "tier": "core", "signature": "abc"} blob = encode_blob(payload) again = decode_blob(blob) assert again == payload def test_blob_uses_v2_prefix(self): """v1.6 switched HMAC → Ed25519; blob version bumped to DTLIC2. Pin the prefix so any future scheme change is intentional.""" blob = encode_blob({"x": 1}) assert blob.startswith("DTLIC2:") def test_decode_rejects_missing_prefix(self): with pytest.raises(ValueError, match="DTLIC2"): decode_blob("not-a-blob") def test_decode_rejects_v1_blob_with_clear_message(self): """A v1 (HMAC) blob must surface a clear 'old format' message rather than 'signature mismatch' — buyers redeeming an old delivery email need to know to request a new blob.""" with pytest.raises(ValueError, match="DTLIC1"): decode_blob("DTLIC1:eyJhIjogMX0=") def test_decode_rejects_bad_base64(self): with pytest.raises(ValueError, match="base64"): decode_blob("DTLIC2:!!!notbase64!!!") def test_decode_rejects_truncated_blob(self): blob = encode_blob({"x": 1}) truncated = blob[:-5] with pytest.raises(ValueError): decode_blob(truncated) class TestDevKeypair: """The embedded dev keypair must match the seed phrase so anyone reproducing the build gets the same values. Catches a hand-edit to ``_dev_keypair.py`` that drifts the constants from the seed.""" def test_dev_keypair_matches_seed(self): from src.license._dev_keypair import ( DEV_PRIVATE_KEY_HEX, DEV_PUBLIC_KEY_HEX, _derive_from_seed, ) derived_priv, derived_pub = _derive_from_seed() assert derived_priv == DEV_PRIVATE_KEY_HEX assert derived_pub == DEV_PUBLIC_KEY_HEX def test_is_using_dev_key_true_by_default(self, monkeypatch): monkeypatch.delenv("DATATOOLS_LICENSE_PUBKEY", raising=False) assert is_using_dev_key() is True def test_is_using_dev_key_false_when_overridden(self, monkeypatch): monkeypatch.setenv("DATATOOLS_LICENSE_PUBKEY", "22" * 32) assert is_using_dev_key() is False class TestProductionSafe: """``assert_production_safe`` is a tripwire that fires only in frozen / shipped builds. Tests simulate the frozen state via monkeypatching ``sys.frozen``.""" def test_no_op_in_source_run(self): # Default test run: sys.frozen is unset; nothing should raise. from src.license import assert_production_safe assert_production_safe() # no exception def test_raises_on_dev_mode_in_frozen_build(self, monkeypatch): from src.license import ( ProductionBuildError, assert_production_safe, ) monkeypatch.setattr("sys.frozen", True, raising=False) monkeypatch.setenv("DATATOOLS_DEV_MODE", "1") # Override pubkey so the dev-key check doesn't fire first. monkeypatch.setenv("DATATOOLS_LICENSE_PUBKEY", "22" * 32) with pytest.raises(ProductionBuildError, match="DATATOOLS_DEV_MODE"): assert_production_safe() def test_raises_on_dev_key_in_frozen_build(self, monkeypatch): from src.license import ( ProductionBuildError, assert_production_safe, ) monkeypatch.setattr("sys.frozen", True, raising=False) monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False) monkeypatch.delenv("DATATOOLS_LICENSE_PUBKEY", raising=False) with pytest.raises(ProductionBuildError, match="development license key"): assert_production_safe() def test_passes_in_frozen_build_with_prod_pubkey(self, monkeypatch): from src.license import assert_production_safe monkeypatch.setattr("sys.frozen", True, raising=False) monkeypatch.delenv("DATATOOLS_DEV_MODE", raising=False) monkeypatch.setenv("DATATOOLS_LICENSE_PUBKEY", "22" * 32) # Should not raise. assert_production_safe() # --------------------------------------------------------------------------- # Features # --------------------------------------------------------------------------- class TestFeatures: def test_every_tier_has_features(self): for tier in Tier: assert FEATURES_BY_TIER[tier], ( f"tier {tier!r} has an empty feature set" ) def test_all_features_for_tier_returns_sorted_tuple(self): flags = all_features_for_tier(Tier.CORE) assert flags == tuple(sorted(flags)) def test_core_unlocks_every_tool(self): """v1 SKU contract: Core = all 9 tools.""" flags = set(all_features_for_tier(Tier.CORE)) assert {f.value for f in FeatureFlag} <= flags # --------------------------------------------------------------------------- # Manager: activation flow # --------------------------------------------------------------------------- class TestManagerActivation: def test_first_load_returns_none_when_no_file( self, unactivated_license_manager, ): assert unactivated_license_manager.load() is None assert not unactivated_license_manager.is_activated() assert not unactivated_license_manager.is_valid() def test_issue_trial_writes_file_and_returns_license( self, unactivated_license_manager, isolated_license_path, ): lic = unactivated_license_manager.issue_trial( name="Trial User", email="trial@example.com", ) assert lic.tier == Tier.TRIAL assert lic.name == "Trial User" assert isolated_license_path.exists() def test_trial_signature_round_trips( self, unactivated_license_manager, isolated_license_path, ): unactivated_license_manager.issue_trial( name="A", email="a@b.com", ) mgr2 = LicenseManager() lic2 = mgr2.load() assert lic2 is not None assert lic2.name == "A" def test_activate_from_blob_round_trips( self, unactivated_license_manager, ): # Use the manager itself to mint then re-activate from blob. mgr = unactivated_license_manager mgr.issue_trial(name="Buyer", email="buyer@example.com") lic = mgr.load() # Re-encode as if shipped via Gumroad. blob = encode_blob(lic.to_dict()) # Deactivate then re-activate from the blob. mgr.deactivate() mgr2 = LicenseManager() again = mgr2.activate_from_blob(blob, name="Buyer", email="buyer@example.com") assert again.license_key == lic.license_key def test_activate_rejects_wrong_name(self, unactivated_license_manager): mgr = unactivated_license_manager mgr.issue_trial(name="Buyer", email="buyer@example.com") lic = mgr.load() blob = encode_blob(lic.to_dict()) mgr.deactivate() with pytest.raises(InvalidLicenseError, match="do not match"): mgr.activate_from_blob( blob, name="Different Person", email="buyer@example.com", ) def test_activate_rejects_wrong_email(self, unactivated_license_manager): mgr = unactivated_license_manager mgr.issue_trial(name="Buyer", email="buyer@example.com") lic = mgr.load() blob = encode_blob(lic.to_dict()) mgr.deactivate() with pytest.raises(InvalidLicenseError, match="do not match"): mgr.activate_from_blob( blob, name="Buyer", email="someone-else@example.com", ) def test_activate_rejects_tampered_blob(self, unactivated_license_manager): mgr = unactivated_license_manager mgr.issue_trial(name="Buyer", email="buyer@example.com") lic = mgr.load() # Tamper: bump tier to enterprise without re-signing. raw = lic.to_dict() raw["tier"] = "enterprise" bad = encode_blob(raw) mgr.deactivate() with pytest.raises(InvalidLicenseError, match="signature"): mgr.activate_from_blob( bad, name="Buyer", email="buyer@example.com", ) def test_activate_rejects_invalid_email_format( self, unactivated_license_manager, ): mgr = unactivated_license_manager with pytest.raises(InvalidLicenseError, match="valid email"): mgr.activate_from_blob("anything", name="x", email="not-an-email") def test_deactivate_returns_false_when_no_file( self, unactivated_license_manager, ): assert unactivated_license_manager.deactivate() is False def test_deactivate_returns_true_after_activation( self, unactivated_license_manager, ): unactivated_license_manager.issue_trial( name="A", email="a@b.com", ) assert unactivated_license_manager.deactivate() is True # --------------------------------------------------------------------------- # Manager: expiration + renewal # --------------------------------------------------------------------------- class TestExpirationAndRenewal: def test_is_valid_false_when_expired( self, unactivated_license_manager, isolated_license_path, ): # Mint a license with an expiry in the past. from src.license import crypto as _crypto past = (datetime.now(timezone.utc) - timedelta(days=2)).strftime( "%Y-%m-%dT%H:%M:%SZ" ) lic = License( name="X", email="x@x.com", license_key="DT1-CORE-XXXX-YYYY", tier=Tier.CORE, features=all_features_for_tier(Tier.CORE), issued_at="2025-01-01T00:00:00Z", expires_at=past, signature="", ) sig = _crypto.sign(lic.to_canonical_dict()) signed = License( **{**lic.__dict__, "signature": sig}, ) unactivated_license_manager.save(signed) mgr2 = LicenseManager() assert mgr2.is_activated() assert not mgr2.is_valid() state = mgr2.current_state() assert state.error_kind == "expired" def test_renew_extends_expiry( self, unactivated_license_manager, ): mgr = unactivated_license_manager old = mgr.issue_trial(name="A", email="a@b.com", years=1) # Mint a fresh blob with a longer expiry. mgr2 = LicenseManager() new = mgr2._mint(name="A", email="a@b.com", tier=Tier.CORE, years=2) blob = encode_blob(new.to_dict()) # Renew via the manager. renewed = mgr.renew(blob) assert renewed.tier == Tier.CORE assert renewed.expires_dt > old.expires_dt def test_renew_rejects_for_different_buyer( self, unactivated_license_manager, ): mgr = unactivated_license_manager mgr.issue_trial(name="A", email="a@b.com") # Mint a blob for a DIFFERENT buyer. other = LicenseManager() # Use a separate path so other doesn't overwrite a's file. from tempfile import mkstemp _, p = mkstemp(suffix=".json") other._path = Path(p) other_lic = other._mint(name="B", email="b@c.com", tier=Tier.CORE) blob = encode_blob(other_lic.to_dict()) with pytest.raises(InvalidLicenseError, match="different name/email"): mgr.renew(blob) # --------------------------------------------------------------------------- # Manager: feature gating # --------------------------------------------------------------------------- class TestFeatureGating: def test_require_feature_passes_on_valid_license( self, activated_license_manager, ): # CORE unlocks every flag in v1. for flag in FeatureFlag: activated_license_manager.require_feature(flag) def test_require_feature_raises_not_activated( self, unactivated_license_manager, ): with pytest.raises(NotActivatedError): unactivated_license_manager.require_feature( FeatureFlag.DEDUPLICATOR, ) def test_require_feature_returns_license( self, activated_license_manager, ): lic = activated_license_manager.require_feature( FeatureFlag.DEDUPLICATOR, ) assert lic.name == "Test User" # --------------------------------------------------------------------------- # Manager: dev mode # --------------------------------------------------------------------------- class TestDevMode: def test_dev_mode_bypasses_validity_check( self, isolated_license_path, monkeypatch, ): monkeypatch.setenv("DATATOOLS_DEV_MODE", "1") mgr = LicenseManager() assert mgr.is_valid() is True # No license file exists. assert not isolated_license_path.exists() def test_dev_mode_state_reports_synthetic_license( self, isolated_license_path, monkeypatch, ): monkeypatch.setenv("DATATOOLS_DEV_MODE", "1") mgr = LicenseManager() state = mgr.current_state() assert state.activated is True assert state.valid is True assert state.tier == "enterprise" assert state.error_kind == "" def test_dev_mode_off_in_test_default_env_via_explicit_clear( self, isolated_license_path, monkeypatch, ): # ``isolated_license_path`` already clears DEV_MODE; double- # check that contract here so the broader suite can rely on it. assert "DATATOOLS_DEV_MODE" not in os.environ