Replaces the per-session ``datatools-<ts>-<sid>.jsonl`` filename with a single daily file ``datatools-YYYY-MM-DD.jsonl`` (local date). Sessions on the same calendar day share a file via the writer thread's per-batch open+append; multiple DataTools instances running concurrently on the same day fan into the same file (append-mode small writes are atomic on POSIX, safe-enough on Windows under realistic load). Drops the ``_LOG_PATH`` module global and the lock around it — ``audit_log_path()`` is now pure date math, recomputed on every call so a session that crosses midnight follows the rollover into the next day's file. Adds ``_sweep_old_logs()``, invoked once per process at writer-thread start, which deletes any ``datatools-*.jsonl`` whose mtime is older than 7 days. The glob deliberately matches the legacy per-session filename too, so users upgrading from the previous build don't keep a permanent backlog of pre-retention files. Event ``ts`` fields stay UTC; only the filename uses local date, because users go looking for "today's log" on their wall clock. Tests cover: daily filename shape, sweep removes stale files, sweep keeps fresh files, sweep also clears legacy filenames. Rollback: ``git revert HEAD`` restores the per-session filename and removes the sweep. No data migration needed either way — existing files keep working as JSONL. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
248 lines
9.4 KiB
Python
248 lines
9.4 KiB
Python
"""Audit-log smoke tests.
|
||
|
||
The audit log was rewritten to use an async queue + a background
|
||
writer thread after a synchronous-write implementation hung the GUI
|
||
on a hostile filesystem (Windows antivirus). These tests pin the
|
||
critical guarantees:
|
||
|
||
1. ``log_event`` returns in microseconds (non-blocking).
|
||
2. Events queued before ``flush_audit_log`` land on disk in the
|
||
correct JSONL shape.
|
||
3. Backpressure: when the queue is full, oldest events are dropped
|
||
and newest ones survive (most-recent-events-are-most-diagnostic).
|
||
4. The writer thread tolerates an unwritable target without
|
||
crashing or hanging.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import time
|
||
from pathlib import Path
|
||
|
||
import pytest
|
||
|
||
|
||
@pytest.fixture
def isolated_audit(monkeypatch, tmp_path):
    """Yield the ``src.audit`` module redirected into ``tmp_path``.

    Setup sets ``DATATOOLS_AUDIT_DIR`` to ``tmp_path``, then resets
    module state so each test starts fresh.

    The kill switch (``_DISABLED``) is bypassed for the duration of
    the test by patching ``_ENABLE_OVERRIDE`` directly — env vars
    won't reach the module-level constant after import. ``_PROBE`` is
    patched to ``"full"`` the same way.

    Teardown flushes with a short timeout and resets module state
    again. The reset runs in a ``finally`` so it still happens when
    the flush raises; otherwise a failed flush would leak state into
    every later test. This mirrors the ``try``/``finally`` cleanup the
    non-fixture tests in this file use.
    """
    # Set the env var before importing so an import-time read (if the
    # module does one) sees the redirected directory.
    monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
    from src import audit

    monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
    monkeypatch.setattr(audit, "_PROBE", "full")
    audit.reset_for_tests()
    yield audit
    try:
        # Best-effort cleanup so a runaway writer thread doesn't keep
        # touching the tmp_path after the test exits.
        audit.flush_audit_log(timeout_s=0.5)
    finally:
        audit.reset_for_tests()
|
||
|
||
|
||
def _read_all_events(audit_dir: Path) -> list[dict]:
    """Parse every ``datatools-*.jsonl`` file directly under *audit_dir*.

    Files are visited in sorted path order. Each line that is not
    blank (after stripping whitespace) is decoded as one JSON
    document. The decoded events are returned as one flat list in
    file order, then line order.
    """
    return [
        json.loads(raw_line)
        for log_file in sorted(audit_dir.glob("datatools-*.jsonl"))
        for raw_line in log_file.read_text(encoding="utf-8").splitlines()
        if raw_line.strip()
    ]
|
||
|
||
|
||
class TestLogEventIsNonBlocking:
    """``log_event`` must stay cheap on the calling thread."""

    def test_returns_in_under_one_millisecond(self, isolated_audit):
        """Fire a 1000-event burst and require the whole loop to finish
        in under one second, i.e. under 1ms per call on average."""
        audit = isolated_audit
        burst_size = 1000
        # 1000 events × 1ms ceiling = 1.0s for the whole burst.
        budget_s = 1.0

        t0 = time.perf_counter()
        for n in range(burst_size):
            audit.log_event("test", f"event {n}", n=n)
        elapsed = time.perf_counter() - t0

        # The budget is deliberately generous; a healthy run is
        # expected to take only milliseconds in total.
        assert elapsed < budget_s, (
            f"log_event burst took {elapsed:.3f}s for 1000 events — "
            "that's over 1ms per call. Async queue is regressing to "
            "a sync write."
        )
|
||
|
||
|
||
class TestEventsLandOnDisk:
    """Events logged before a flush must be readable back as JSONL."""

    def test_basic_round_trip(self, isolated_audit, tmp_path):
        """Two events with extras come back with their fields intact."""
        isolated_audit.log_event(
            "upload", "Uploaded a.csv", filename="a.csv", bytes=42
        )
        isolated_audit.log_event("analyze", "Analyzed a.csv", findings=3)
        isolated_audit.flush_audit_log(timeout_s=2.0)

        written = _read_all_events(tmp_path)
        assert len(written) == 2
        categories = [event["category"] for event in written]
        for wanted in ("upload", "analyze"):
            assert wanted in categories

        upload = next(
            event for event in written if event["category"] == "upload"
        )
        assert upload["filename"] == "a.csv"
        assert upload["bytes"] == 42
        # Every record also carries a timestamp and a session field.
        for required_key in ("ts", "session"):
            assert required_key in upload

    def test_session_start_renders(self, isolated_audit, tmp_path):
        """``log_session_start`` emits a "Session started" session event."""

        def is_session_start(event):
            if event["category"] != "session":
                return False
            return "Session started" in event["message"]

        isolated_audit.log_session_start()
        isolated_audit.flush_audit_log(timeout_s=2.0)
        assert any(map(is_session_start, _read_all_events(tmp_path)))

    def test_log_session_start_is_idempotent(self, isolated_audit, tmp_path):
        """Five ``log_session_start`` calls produce exactly one event."""
        calls_made = 0
        while calls_made < 5:
            isolated_audit.log_session_start()
            calls_made += 1
        isolated_audit.flush_audit_log(timeout_s=2.0)

        session_count = sum(
            1
            for event in _read_all_events(tmp_path)
            if event["category"] == "session"
        )
        assert session_count == 1
|
||
|
||
|
||
class TestUnwritableTargetDoesntCrash:
    """An unusable audit directory must not crash or stall the caller."""

    def test_pointing_at_a_nonexistent_parent_doesnt_hang(
        self, monkeypatch, tmp_path,
    ):
        """Log and flush against an audit dir that is really a file."""
        # The configured audit dir exists, but as a regular FILE, so
        # any mkdir there fails. The writer is expected to swallow
        # the error while the GUI-facing producer keeps working.
        blocker = tmp_path / "iam-a-file"
        blocker.write_text("hi")
        monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(blocker))
        from src import audit

        for attr_name, forced in (("_ENABLE_OVERRIDE", True), ("_PROBE", "full")):
            monkeypatch.setattr(audit, attr_name, forced)
        audit.reset_for_tests()
        try:
            began = time.perf_counter()
            audit.log_event("test", "should not crash")
            producer_latency = time.perf_counter() - began
            assert producer_latency < 0.5
            audit.flush_audit_log(timeout_s=0.5)
        finally:
            # Always restore module state, even when an assertion fails.
            audit.reset_for_tests()
|
||
|
||
|
||
class TestKillSwitchContract:
    """With ``_DISABLED = True`` and no env-var override, every
    producer is a true no-op. Pins the safety contract: the default
    configuration must never touch disk or start a thread."""

    def test_disabled_writes_nothing(self, monkeypatch, tmp_path):
        """Kill switch on, override off: no writer thread, no log file."""
        monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
        from src import audit

        for attr_name, forced in (("_DISABLED", True), ("_ENABLE_OVERRIDE", False)):
            monkeypatch.setattr(audit, attr_name, forced)
        audit.reset_for_tests()
        try:
            # Exercise each producer entry point once before flushing.
            audit.log_event("test", "should be a no-op")
            audit.log_session_start()
            audit.log_page_open("test_page")
            audit.flush_audit_log(timeout_s=0.5)

            writer = audit._WRITER_THREAD
            assert writer is None, (
                "Writer thread must not start when the kill switch is on."
            )
            leaked = list(tmp_path.glob("datatools-*.jsonl"))
            assert leaked == [], "Kill switch leaked a log file."
        finally:
            audit.reset_for_tests()

    def test_probe_no_events_drops_writes(self, monkeypatch, tmp_path):
        """``DATATOOLS_AUDIT_PROBE=no-events`` bypasses log_event even
        when the override is on. Bisect aid for Phase 2."""
        monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
        from src import audit

        for attr_name, forced in (("_ENABLE_OVERRIDE", True), ("_PROBE", "no-events")):
            monkeypatch.setattr(audit, attr_name, forced)
        audit.reset_for_tests()
        try:
            audit.log_event("test", "should be dropped")
            audit.flush_audit_log(timeout_s=0.5)
            produced = list(tmp_path.glob("datatools-*.jsonl"))
            assert produced == []
        finally:
            audit.reset_for_tests()
|
||
|
||
|
||
class TestDailyFileRetention:
    """Daily filename + 7-day retention sweep on writer-thread start."""

    @staticmethod
    def _write_aged_log(path, marker, age_days):
        """Create *path* holding one JSONL line and backdate its mtime.

        The file content is ``{"ts": "<marker>"}`` followed by a
        newline; atime and mtime are both set to *age_days* days
        before now. Returns *path* for convenience.
        """
        import os

        path.write_text(f'{{"ts": "{marker}"}}\n', encoding="utf-8")
        stamp = time.time() - age_days * 86400
        os.utime(path, (stamp, stamp))
        return path

    def test_filename_is_daily_local_date(self, isolated_audit, tmp_path):
        """The log file is named ``datatools-<local YYYY-MM-DD>.jsonl``.

        The local date is sampled both before the event is logged and
        after the flush returns, and either name is accepted. Sampling
        it only afterwards would fail spuriously whenever the run
        straddles local midnight.
        """
        from datetime import datetime

        audit = isolated_audit
        day_before = datetime.now().strftime("%Y-%m-%d")
        audit.log_event("test", "today")
        audit.flush_audit_log(timeout_s=2.0)
        day_after = datetime.now().strftime("%Y-%m-%d")

        # Usually a single name; two only when midnight passed mid-test.
        acceptable = sorted(
            {f"datatools-{day}.jsonl" for day in (day_before, day_after)}
        )
        assert any((tmp_path / name).exists() for name in acceptable), (
            f"expected daily file {' or '.join(acceptable)}, got "
            f"{[p.name for p in tmp_path.iterdir()]}"
        )

    def test_sweep_deletes_files_older_than_retention(
        self, isolated_audit, tmp_path,
    ):
        """Files with mtime > _RETENTION_DAYS old get pruned when the
        writer thread starts. Files within the window survive."""
        audit = isolated_audit
        stale = self._write_aged_log(
            tmp_path / "datatools-2025-01-01.jsonl",
            "stale",
            audit._RETENTION_DAYS + 1,
        )
        # Only the mtime decides retention here; the date embedded in
        # the filename is arbitrary.
        fresh = self._write_aged_log(
            tmp_path / "datatools-2026-05-18.jsonl", "fresh", 1,
        )

        audit.log_event("test", "kick the writer")
        audit.flush_audit_log(timeout_s=2.0)

        assert not stale.exists(), "Stale log should have been swept."
        assert fresh.exists(), "Fresh log must not be swept."

    def test_sweep_also_clears_legacy_per_session_files(
        self, isolated_audit, tmp_path,
    ):
        """Old ``datatools-<ts>-<sid>.jsonl`` filenames must also match
        the sweep glob, so upgrading users don't keep a permanent
        backlog from before the retention switch."""
        audit = isolated_audit
        legacy = self._write_aged_log(
            tmp_path / "datatools-20250101T120000Z-abc12345.jsonl",
            "legacy",
            audit._RETENTION_DAYS + 1,
        )

        audit.log_event("test", "kick the writer")
        audit.flush_audit_log(timeout_s=2.0)

        assert not legacy.exists(), "Legacy per-session file not swept."
|
||
|
||
|
||
class TestSerializationSafety:
    """Extras that are not JSON-native must not break the log."""

    def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
        """An arbitrary object passed as an extra is read back as the
        string ``"<Weird>"``."""

        class Weird:
            def __repr__(self):
                return "<Weird>"

        isolated_audit.log_event("test", "carries a weird extra", obj=Weird())
        isolated_audit.flush_audit_log(timeout_s=2.0)

        recorded = _read_all_events(tmp_path)
        assert len(recorded) == 1
        only_event = recorded[0]
        assert only_event["obj"] == "<Weird>"
|