"""Unit tests for the license layer. Covers: - Schema: License dataclass roundtrip + expiration helpers. - Crypto: HMAC sign/verify, tamper detection, blob encode/decode. - Manager: activation, renewal, deactivation, feature gating, expiration handling, dev-mode bypass, name/email mismatch rejection. The session-scoped autouse fixture in ``conftest.py`` sets ``DATATOOLS_DEV_MODE=1`` for the suite. Tests in this file that need the real check explicitly use the ``isolated_license_path`` fixture which clears it. """ from __future__ import annotations import json import os from datetime import datetime, timedelta, timezone from pathlib import Path import pytest from src.license import ( ExpiredLicenseError, FeatureFlag, InvalidLicenseError, License, LicenseError, LicenseManager, NotActivatedError, Tier, UnsupportedFeatureError, ) from src.license.crypto import ( _DEFAULT_SECRET, decode_blob, encode_blob, sign, verify, ) from src.license.features import FEATURES_BY_TIER, all_features_for_tier from src.license.schema import default_expiry_iso # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- class TestLicenseSchema: def _make(self, **overrides) -> License: defaults = dict( name="Jane Doe", email="jane@example.com", license_key="DT1-CORE-AAAA-BBBB", tier=Tier.CORE, features=("01_deduplicator",), issued_at="2026-05-13T00:00:00Z", expires_at="2027-05-13T00:00:00Z", signature="deadbeef", ) defaults.update(overrides) return License(**defaults) def test_to_dict_roundtrip(self): lic = self._make() again = License.from_dict(lic.to_dict()) assert again == lic def test_canonical_dict_excludes_signature(self): lic = self._make() canon = lic.to_canonical_dict() assert "signature" not in canon assert canon["name"] == "Jane Doe" def test_is_expired_false_when_future(self): future = (datetime.now(timezone.utc) + timedelta(days=30)).strftime( "%Y-%m-%dT%H:%M:%SZ" ) lic = self._make(expires_at=future) assert not lic.is_expired() assert lic.days_remaining() >= 29 def test_is_expired_true_when_past(self): past = (datetime.now(timezone.utc) - timedelta(days=1)).strftime( "%Y-%m-%dT%H:%M:%SZ" ) lic = self._make(expires_at=past) assert lic.is_expired() def test_has_feature_accepts_string_and_enum(self): lic = self._make(features=("01_deduplicator", "02_text_cleaner")) assert lic.has_feature("01_deduplicator") assert lic.has_feature(FeatureFlag.TEXT_CLEANER) assert not lic.has_feature(FeatureFlag.PIPELINE_RUNNER) def test_default_expiry_one_year_default(self): now = datetime(2026, 5, 13, tzinfo=timezone.utc) exp = default_expiry_iso(now=now) # One year from 2026-05-13 is 2027-05-13 (2027 not a leap year). assert exp.startswith("2027-05-13") def test_default_expiry_leap_day_fallback(self): # Feb 29 + 1y where target year (2027) isn't a leap year — we # slide to Feb 28. Pin that contract. leap = datetime(2024, 2, 29, tzinfo=timezone.utc) exp = default_expiry_iso(years=3, now=leap) # 2024 + 3 = 2027; not a leap year. assert exp.startswith("2027-02-28") # --------------------------------------------------------------------------- # Crypto # --------------------------------------------------------------------------- class TestSignAndVerify: def test_sign_is_deterministic(self): payload = {"a": 1, "b": "hello"} assert sign(payload) == sign(payload) def test_verify_accepts_matching_signature(self): payload = {"a": 1, "b": "hello"} sig = sign(payload) assert verify(payload, sig) is True def test_verify_rejects_modified_payload(self): payload = {"a": 1, "b": "hello"} sig = sign(payload) modified = dict(payload, b="goodbye") assert verify(modified, sig) is False def test_verify_rejects_modified_signature(self): payload = {"a": 1} sig = sign(payload) # Flip one nibble. bad = sig[:-1] + ("0" if sig[-1] != "0" else "1") assert verify(payload, bad) is False def test_sign_respects_secret_env_override(self, monkeypatch): payload = {"a": 1} monkeypatch.setenv("DATATOOLS_LICENSE_SECRET", "alternate") alt = sign(payload) monkeypatch.delenv("DATATOOLS_LICENSE_SECRET", raising=False) default = sign(payload) assert alt != default def test_canonical_form_is_key_order_invariant(self): a = {"x": 1, "y": 2} b = {"y": 2, "x": 1} assert sign(a) == sign(b) class TestBlobEncodeDecode: def test_roundtrip(self): payload = {"name": "Jane", "tier": "core", "signature": "abc"} blob = encode_blob(payload) again = decode_blob(blob) assert again == payload def test_blob_has_human_readable_prefix(self): blob = encode_blob({"x": 1}) assert blob.startswith("DTLIC1:") def test_decode_rejects_missing_prefix(self): with pytest.raises(ValueError, match="DTLIC1"): decode_blob("not-a-blob") def test_decode_rejects_bad_base64(self): with pytest.raises(ValueError, match="base64"): decode_blob("DTLIC1:!!!notbase64!!!") def test_decode_rejects_truncated_blob(self): blob = encode_blob({"x": 1}) truncated = blob[:-5] with pytest.raises(ValueError): decode_blob(truncated) # --------------------------------------------------------------------------- # Features # --------------------------------------------------------------------------- class TestFeatures: def test_every_tier_has_features(self): for tier in Tier: assert FEATURES_BY_TIER[tier], ( f"tier {tier!r} has an empty feature set" ) def test_all_features_for_tier_returns_sorted_tuple(self): flags = all_features_for_tier(Tier.CORE) assert flags == tuple(sorted(flags)) def test_core_unlocks_every_tool(self): """v1 SKU contract: Core = all 9 tools.""" flags = set(all_features_for_tier(Tier.CORE)) assert {f.value for f in FeatureFlag} <= flags # --------------------------------------------------------------------------- # Manager: activation flow # --------------------------------------------------------------------------- class TestManagerActivation: def test_first_load_returns_none_when_no_file( self, unactivated_license_manager, ): assert unactivated_license_manager.load() is None assert not unactivated_license_manager.is_activated() assert not unactivated_license_manager.is_valid() def test_issue_trial_writes_file_and_returns_license( self, unactivated_license_manager, isolated_license_path, ): lic = unactivated_license_manager.issue_trial( name="Trial User", email="trial@example.com", ) assert lic.tier == Tier.TRIAL assert lic.name == "Trial User" assert isolated_license_path.exists() def test_trial_signature_round_trips( self, unactivated_license_manager, isolated_license_path, ): unactivated_license_manager.issue_trial( name="A", email="a@b.com", ) mgr2 = LicenseManager() lic2 = mgr2.load() assert lic2 is not None assert lic2.name == "A" def test_activate_from_blob_round_trips( self, unactivated_license_manager, ): # Use the manager itself to mint then re-activate from blob. mgr = unactivated_license_manager mgr.issue_trial(name="Buyer", email="buyer@example.com") lic = mgr.load() # Re-encode as if shipped via Gumroad. blob = encode_blob(lic.to_dict()) # Deactivate then re-activate from the blob. mgr.deactivate() mgr2 = LicenseManager() again = mgr2.activate_from_blob(blob, name="Buyer", email="buyer@example.com") assert again.license_key == lic.license_key def test_activate_rejects_wrong_name(self, unactivated_license_manager): mgr = unactivated_license_manager mgr.issue_trial(name="Buyer", email="buyer@example.com") lic = mgr.load() blob = encode_blob(lic.to_dict()) mgr.deactivate() with pytest.raises(InvalidLicenseError, match="do not match"): mgr.activate_from_blob( blob, name="Different Person", email="buyer@example.com", ) def test_activate_rejects_wrong_email(self, unactivated_license_manager): mgr = unactivated_license_manager mgr.issue_trial(name="Buyer", email="buyer@example.com") lic = mgr.load() blob = encode_blob(lic.to_dict()) mgr.deactivate() with pytest.raises(InvalidLicenseError, match="do not match"): mgr.activate_from_blob( blob, name="Buyer", email="someone-else@example.com", ) def test_activate_rejects_tampered_blob(self, unactivated_license_manager): mgr = unactivated_license_manager mgr.issue_trial(name="Buyer", email="buyer@example.com") lic = mgr.load() # Tamper: bump tier to enterprise without re-signing. raw = lic.to_dict() raw["tier"] = "enterprise" bad = encode_blob(raw) mgr.deactivate() with pytest.raises(InvalidLicenseError, match="signature"): mgr.activate_from_blob( bad, name="Buyer", email="buyer@example.com", ) def test_activate_rejects_invalid_email_format( self, unactivated_license_manager, ): mgr = unactivated_license_manager with pytest.raises(InvalidLicenseError, match="valid email"): mgr.activate_from_blob("anything", name="x", email="not-an-email") def test_deactivate_returns_false_when_no_file( self, unactivated_license_manager, ): assert unactivated_license_manager.deactivate() is False def test_deactivate_returns_true_after_activation( self, unactivated_license_manager, ): unactivated_license_manager.issue_trial( name="A", email="a@b.com", ) assert unactivated_license_manager.deactivate() is True # --------------------------------------------------------------------------- # Manager: expiration + renewal # --------------------------------------------------------------------------- class TestExpirationAndRenewal: def test_is_valid_false_when_expired( self, unactivated_license_manager, isolated_license_path, ): # Mint a license with an expiry in the past. from src.license import crypto as _crypto past = (datetime.now(timezone.utc) - timedelta(days=2)).strftime( "%Y-%m-%dT%H:%M:%SZ" ) lic = License( name="X", email="x@x.com", license_key="DT1-CORE-XXXX-YYYY", tier=Tier.CORE, features=all_features_for_tier(Tier.CORE), issued_at="2025-01-01T00:00:00Z", expires_at=past, signature="", ) sig = _crypto.sign(lic.to_canonical_dict()) signed = License( **{**lic.__dict__, "signature": sig}, ) unactivated_license_manager.save(signed) mgr2 = LicenseManager() assert mgr2.is_activated() assert not mgr2.is_valid() state = mgr2.current_state() assert state.error_kind == "expired" def test_renew_extends_expiry( self, unactivated_license_manager, ): mgr = unactivated_license_manager old = mgr.issue_trial(name="A", email="a@b.com", years=1) # Mint a fresh blob with a longer expiry. mgr2 = LicenseManager() new = mgr2._mint(name="A", email="a@b.com", tier=Tier.CORE, years=2) blob = encode_blob(new.to_dict()) # Renew via the manager. renewed = mgr.renew(blob) assert renewed.tier == Tier.CORE assert renewed.expires_dt > old.expires_dt def test_renew_rejects_for_different_buyer( self, unactivated_license_manager, ): mgr = unactivated_license_manager mgr.issue_trial(name="A", email="a@b.com") # Mint a blob for a DIFFERENT buyer. other = LicenseManager() # Use a separate path so other doesn't overwrite a's file. from tempfile import mkstemp _, p = mkstemp(suffix=".json") other._path = Path(p) other_lic = other._mint(name="B", email="b@c.com", tier=Tier.CORE) blob = encode_blob(other_lic.to_dict()) with pytest.raises(InvalidLicenseError, match="different name/email"): mgr.renew(blob) # --------------------------------------------------------------------------- # Manager: feature gating # --------------------------------------------------------------------------- class TestFeatureGating: def test_require_feature_passes_on_valid_license( self, activated_license_manager, ): # CORE unlocks every flag in v1. for flag in FeatureFlag: activated_license_manager.require_feature(flag) def test_require_feature_raises_not_activated( self, unactivated_license_manager, ): with pytest.raises(NotActivatedError): unactivated_license_manager.require_feature( FeatureFlag.DEDUPLICATOR, ) def test_require_feature_returns_license( self, activated_license_manager, ): lic = activated_license_manager.require_feature( FeatureFlag.DEDUPLICATOR, ) assert lic.name == "Test User" # --------------------------------------------------------------------------- # Manager: dev mode # --------------------------------------------------------------------------- class TestDevMode: def test_dev_mode_bypasses_validity_check( self, isolated_license_path, monkeypatch, ): monkeypatch.setenv("DATATOOLS_DEV_MODE", "1") mgr = LicenseManager() assert mgr.is_valid() is True # No license file exists. assert not isolated_license_path.exists() def test_dev_mode_state_reports_synthetic_license( self, isolated_license_path, monkeypatch, ): monkeypatch.setenv("DATATOOLS_DEV_MODE", "1") mgr = LicenseManager() state = mgr.current_state() assert state.activated is True assert state.valid is True assert state.tier == "enterprise" assert state.error_kind == "" def test_dev_mode_off_in_test_default_env_via_explicit_clear( self, isolated_license_path, monkeypatch, ): # ``isolated_license_path`` already clears DEV_MODE; double- # check that contract here so the broader suite can rely on it. assert "DATATOOLS_DEV_MODE" not in os.environ