"""Audit-log smoke tests. The audit log was rewritten to use an async queue + a background writer thread after a synchronous-write implementation hung the GUI on a hostile filesystem (Windows antivirus). These tests pin the critical guarantees: 1. ``log_event`` returns in microseconds (non-blocking). 2. Events queued before ``flush_audit_log`` land on disk in the correct JSONL shape. 3. Backpressure: when the queue is full, oldest events are dropped and newest ones survive (most-recent-events-are-most-diagnostic). 4. The writer thread tolerates an unwritable target without crashing or hanging. """ from __future__ import annotations import json import time from pathlib import Path import pytest @pytest.fixture def isolated_audit(monkeypatch, tmp_path): """Redirect audit writes into ``tmp_path`` and reset module state so each test starts fresh. The kill switch (``_DISABLED``) is bypassed for the duration of the test by patching ``_ENABLE_OVERRIDE`` directly — env vars won't reach the module-level constant after import.""" monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path)) from src import audit monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True) monkeypatch.setattr(audit, "_PROBE", "full") audit.reset_for_tests() yield audit # Best-effort cleanup so a runaway writer thread doesn't keep # touching the tmp_path after the test exits. audit.flush_audit_log(timeout_s=0.5) audit.reset_for_tests() def _read_all_events(audit_dir: Path) -> list[dict]: """Read every JSONL file in *audit_dir* and return parsed events.""" out: list[dict] = [] for f in sorted(audit_dir.glob("datatools-*.jsonl")): for line in f.read_text(encoding="utf-8").splitlines(): if line.strip(): out.append(json.loads(line)) return out class TestLogEventIsNonBlocking: def test_returns_in_under_one_millisecond(self, isolated_audit): """A pathological 1000-event burst should clear well under a second on the request path. The write happens off-thread.""" audit = isolated_audit start = time.perf_counter() for i in range(1000): audit.log_event("test", f"event {i}", n=i) elapsed = time.perf_counter() - start # 1000 events × 1ms ceiling = 1.0s. In practice this should # be milliseconds total since put_nowait is microseconds. assert elapsed < 1.0, ( f"log_event burst took {elapsed:.3f}s for 1000 events — " f"that's over 1ms per call. Async queue is regressing to " f"a sync write." ) class TestEventsLandOnDisk: def test_basic_round_trip(self, isolated_audit, tmp_path): audit = isolated_audit audit.log_event("upload", "Uploaded a.csv", filename="a.csv", bytes=42) audit.log_event("analyze", "Analyzed a.csv", findings=3) audit.flush_audit_log(timeout_s=2.0) events = _read_all_events(tmp_path) assert len(events) == 2 kinds = [e["category"] for e in events] assert "upload" in kinds assert "analyze" in kinds upload = next(e for e in events if e["category"] == "upload") assert upload["filename"] == "a.csv" assert upload["bytes"] == 42 assert "ts" in upload assert "session" in upload def test_session_start_renders(self, isolated_audit, tmp_path): audit = isolated_audit audit.log_session_start() audit.flush_audit_log(timeout_s=2.0) events = _read_all_events(tmp_path) assert any( e["category"] == "session" and "Session started" in e["message"] for e in events ) def test_log_session_start_is_idempotent(self, isolated_audit, tmp_path): audit = isolated_audit for _ in range(5): audit.log_session_start() audit.flush_audit_log(timeout_s=2.0) events = _read_all_events(tmp_path) starts = [e for e in events if e["category"] == "session"] assert len(starts) == 1 class TestUnwritableTargetDoesntCrash: def test_pointing_at_a_nonexistent_parent_doesnt_hang( self, monkeypatch, tmp_path, ): # Point the audit dir at a path that DOES exist as a FILE, # so any mkdir there fails. The writer should swallow the # error and the GUI-facing producer should keep working. not_a_dir = tmp_path / "iam-a-file" not_a_dir.write_text("hi") monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir)) from src import audit monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True) monkeypatch.setattr(audit, "_PROBE", "full") audit.reset_for_tests() try: start = time.perf_counter() audit.log_event("test", "should not crash") assert time.perf_counter() - start < 0.5 audit.flush_audit_log(timeout_s=0.5) finally: audit.reset_for_tests() class TestKillSwitchContract: """With ``_DISABLED = True`` and no env-var override, every producer is a true no-op. Pins the safety contract: the default configuration must never touch disk or start a thread.""" def test_disabled_writes_nothing(self, monkeypatch, tmp_path): monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path)) from src import audit monkeypatch.setattr(audit, "_DISABLED", True) monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", False) audit.reset_for_tests() try: audit.log_event("test", "should be a no-op") audit.log_session_start() audit.log_page_open("test_page") audit.flush_audit_log(timeout_s=0.5) assert audit._WRITER_THREAD is None, ( "Writer thread must not start when the kill switch is on." ) assert list(tmp_path.glob("datatools-*.jsonl")) == [], ( "Kill switch leaked a log file." ) finally: audit.reset_for_tests() def test_probe_no_events_drops_writes(self, monkeypatch, tmp_path): """``DATATOOLS_AUDIT_PROBE=no-events`` bypasses log_event even when the override is on. Bisect aid for Phase 2.""" monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path)) from src import audit monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True) monkeypatch.setattr(audit, "_PROBE", "no-events") audit.reset_for_tests() try: audit.log_event("test", "should be dropped") audit.flush_audit_log(timeout_s=0.5) assert list(tmp_path.glob("datatools-*.jsonl")) == [] finally: audit.reset_for_tests() class TestDailyFileRetention: """Daily filename + 7-day retention sweep on writer-thread start.""" def test_filename_is_daily_local_date(self, isolated_audit, tmp_path): from datetime import datetime audit = isolated_audit audit.log_event("test", "today") audit.flush_audit_log(timeout_s=2.0) today = datetime.now().strftime("%Y-%m-%d") expected = tmp_path / f"datatools-{today}.jsonl" assert expected.exists(), ( f"expected daily file {expected.name}, got " f"{[p.name for p in tmp_path.iterdir()]}" ) def test_sweep_deletes_files_older_than_retention( self, isolated_audit, tmp_path, ): """Files with mtime > _RETENTION_DAYS old get pruned when the writer thread starts. Files within the window survive.""" import os audit = isolated_audit stale = tmp_path / "datatools-2025-01-01.jsonl" stale.write_text('{"ts": "stale"}\n', encoding="utf-8") old_mtime = time.time() - (audit._RETENTION_DAYS + 1) * 86400 os.utime(stale, (old_mtime, old_mtime)) fresh = tmp_path / "datatools-2026-05-18.jsonl" fresh.write_text('{"ts": "fresh"}\n', encoding="utf-8") recent_mtime = time.time() - 1 * 86400 os.utime(fresh, (recent_mtime, recent_mtime)) audit.log_event("test", "kick the writer") audit.flush_audit_log(timeout_s=2.0) assert not stale.exists(), "Stale log should have been swept." assert fresh.exists(), "Fresh log must not be swept." def test_sweep_also_clears_legacy_per_session_files( self, isolated_audit, tmp_path, ): """Old ``datatools--.jsonl`` filenames must also match the sweep glob, so upgrading users don't keep a permanent backlog from before the retention switch.""" import os audit = isolated_audit legacy = tmp_path / "datatools-20250101T120000Z-abc12345.jsonl" legacy.write_text('{"ts": "legacy"}\n', encoding="utf-8") old_mtime = time.time() - (audit._RETENTION_DAYS + 1) * 86400 os.utime(legacy, (old_mtime, old_mtime)) audit.log_event("test", "kick the writer") audit.flush_audit_log(timeout_s=2.0) assert not legacy.exists(), "Legacy per-session file not swept." class TestSerializationSafety: def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path): audit = isolated_audit class Weird: def __repr__(self): return "" audit.log_event("test", "carries a weird extra", obj=Weird()) audit.flush_audit_log(timeout_s=2.0) events = _read_all_events(tmp_path) assert len(events) == 1 assert events[0]["obj"] == ""