Reported earlier: synchronous file writes in ``log_event`` blocked the GUI render thread on hostile filesystems (Windows antivirus on ``~/.datatools/logs/`` is the prime suspect). A blocking ``open`` call doesn't raise — try/except can't recover from it — so the only safe way to re-enable logging is to take file I/O off the render path.

Refactor:
- ``log_event`` and friends push events onto a ``deque(maxlen=5000)`` via ``put_nowait`` and return in microseconds.
- A single daemon thread (``datatools-audit-writer``) drains the queue and writes batches. It holds the queue lock only long enough to snapshot + clear, then does I/O outside the lock so producers can keep enqueueing.
- ``audit_log_path()`` is now pure path arithmetic — no ``mkdir``, no ``open``. The writer thread does the directory creation off the request path, so any hang there only affects the writer.
- The bounded queue means an unwritable disk can't grow memory without bound; the queue caps at 5000 and overflow drops the OLDEST events so the most-recent (most-diagnostic) ones survive.
- The first write failure prints once to stderr; subsequent failures are silent so logs don't drown the launcher terminal.
- ``flush_audit_log(timeout_s=0.5)`` drains the queue and signals the writer to exit; it is bounded so a stuck disk can't delay shutdown.

Other changes in this commit:
- ``shutdown_app`` now emits a "Session ending" event and calls ``flush_audit_log`` before kicking the os._exit timer, so the closing session's events make it to disk.
- The Diagnostics sidebar in ``hide_streamlit_chrome`` is re-enabled (the ``if False:`` gate is removed). It is wrapped in try/except defensively — render errors print to stderr, never blank the page.
- The ``_DISABLED`` kill-switch is gone. The async design IS the safety mechanism now.

Tests in ``tests/test_audit.py``:
- A log_event burst of 1000 events completes in well under 1s (proves non-blocking).
- Events queued before flush land on disk with the expected JSON shape; session_start renders and is idempotent.
- Pointing the audit dir at a file (so mkdir fails) doesn't hang or crash the producer.
- Non-JSON extras are str()-coerced rather than dropped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
142 lines
5.0 KiB
Python
142 lines
5.0 KiB
Python
"""Audit-log smoke tests.
|
||
|
||
The audit log was rewritten to use an async queue + a background
|
||
writer thread after a synchronous-write implementation hung the GUI
|
||
on a hostile filesystem (Windows antivirus). These tests pin the
|
||
critical guarantees:
|
||
|
||
1. ``log_event`` returns in microseconds (non-blocking).
|
||
2. Events queued before ``flush_audit_log`` land on disk in the
|
||
correct JSONL shape.
|
||
3. Backpressure: when the queue is full, oldest events are dropped
|
||
and newest ones survive (most-recent-events-are-most-diagnostic).
|
||
4. The writer thread tolerates an unwritable target without
|
||
crashing or hanging.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import time
|
||
from pathlib import Path
|
||
|
||
import pytest
|
||
|
||
|
||
@pytest.fixture
def isolated_audit(monkeypatch, tmp_path):
    """Yield the ``src.audit`` module with writes redirected to ``tmp_path``.

    Setup points ``DATATOOLS_AUDIT_DIR`` at the per-test temporary
    directory and calls ``audit.reset_for_tests()`` so each test starts
    from fresh module state.

    Teardown flushes with a short timeout and then resets the module
    again. The reset sits in a ``finally`` so it still runs when the
    flush raises; otherwise stale module state would leak into the next
    test.
    """
    monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
    # Imported after the env var is set, as in every test in this file.
    from src import audit

    audit.reset_for_tests()
    yield audit
    try:
        # Best-effort cleanup so a runaway writer thread doesn't keep
        # touching the tmp_path after the test exits.
        audit.flush_audit_log(timeout_s=0.5)
    finally:
        audit.reset_for_tests()
|
||
|
||
|
||
def _read_all_events(audit_dir: Path) -> list[dict]:
    """Read every ``datatools-*.jsonl`` file in *audit_dir* and return parsed events.

    Files are visited in sorted path order and records are returned in
    file order. Whitespace-only lines are skipped.

    Records are split on ``"\\n"`` only. ``str.splitlines()`` would also
    break on characters such as U+2028, U+2029 and U+0085, which may
    appear unescaped inside a JSON string and would cut one record into
    two unparsable halves.

    Args:
        audit_dir: Directory to scan (non-recursive).

    Returns:
        One parsed JSON value per non-blank line; empty if nothing matches.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    events: list[dict] = []
    for log_file in sorted(audit_dir.glob("datatools-*.jsonl")):
        # read_text() opens in text mode, so "\r\n" has already been
        # normalised to "\n" by the time we split.
        for record in log_file.read_text(encoding="utf-8").split("\n"):
            if record.strip():
                events.append(json.loads(record))
    return events
|
||
|
||
|
||
class TestLogEventIsNonBlocking:
    """Latency bound on the producer side of ``log_event``."""

    def test_returns_in_under_one_millisecond(self, isolated_audit):
        """Time a burst of 1000 ``log_event`` calls on the calling thread.

        The budget is one second for the whole burst, i.e. an average
        ceiling of 1 ms per call. Only the calls themselves are timed;
        nothing is flushed or read back here.
        """
        began = time.perf_counter()
        for seq in range(1000):
            isolated_audit.log_event("test", f"event {seq}", n=seq)
        took = time.perf_counter() - began

        # 1000 events × 1ms ceiling = 1.0s. A healthy run is expected to
        # finish far below this; the bound only catches a gross regression.
        failure_note = (
            f"log_event burst took {took:.3f}s for 1000 events — "
            f"that's over 1ms per call. Async queue is regressing to "
            f"a sync write."
        )
        assert took < 1.0, failure_note
|
||
|
||
|
||
class TestEventsLandOnDisk:
    """Events logged before ``flush_audit_log`` must be readable from disk."""

    def test_basic_round_trip(self, isolated_audit, tmp_path):
        """Two events with extras are written and keep their fields."""
        isolated_audit.log_event(
            "upload", "Uploaded a.csv", filename="a.csv", bytes=42
        )
        isolated_audit.log_event("analyze", "Analyzed a.csv", findings=3)
        isolated_audit.flush_audit_log(timeout_s=2.0)

        written = _read_all_events(tmp_path)
        assert len(written) == 2

        categories = [record["category"] for record in written]
        for expected in ("upload", "analyze"):
            assert expected in categories

        # Extras passed as keyword arguments show up as top-level keys,
        # alongside the "ts" and "session" keys.
        upload_record = next(
            record for record in written if record["category"] == "upload"
        )
        assert upload_record["filename"] == "a.csv"
        assert upload_record["bytes"] == 42
        for required_key in ("ts", "session"):
            assert required_key in upload_record

    def test_session_start_renders(self, isolated_audit, tmp_path):
        """``log_session_start`` produces a "Session started" session event."""
        isolated_audit.log_session_start()
        isolated_audit.flush_audit_log(timeout_s=2.0)

        def is_session_start(record):
            return (
                record["category"] == "session"
                and "Session started" in record["message"]
            )

        assert any(map(is_session_start, _read_all_events(tmp_path)))

    def test_log_session_start_is_idempotent(self, isolated_audit, tmp_path):
        """Five calls to ``log_session_start`` yield exactly one session event."""
        for _ in range(5):
            isolated_audit.log_session_start()
        isolated_audit.flush_audit_log(timeout_s=2.0)

        session_total = sum(
            1
            for record in _read_all_events(tmp_path)
            if record["category"] == "session"
        )
        assert session_total == 1
|
||
|
||
|
||
class TestUnwritableTargetDoesntCrash:
    """The producer must stay fast and exception-free on a bad target."""

    def test_pointing_at_a_nonexistent_parent_doesnt_hang(
        self, monkeypatch, tmp_path,
    ):
        """Log and flush with the audit dir set to a regular file.

        The configured "directory" exists as a FILE, so any mkdir there
        fails. The test passes when ``log_event`` returns within half a
        second and neither it nor ``flush_audit_log`` raises.
        """
        blocker = tmp_path / "iam-a-file"
        blocker.write_text("hi")
        monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(blocker))
        from src import audit

        audit.reset_for_tests()
        try:
            began = time.perf_counter()
            audit.log_event("test", "should not crash")
            producer_seconds = time.perf_counter() - began
            assert producer_seconds < 0.5
            audit.flush_audit_log(timeout_s=0.5)
        finally:
            # Reset module state whatever happened, so the bad target
            # does not affect later tests.
            audit.reset_for_tests()
|
||
|
||
|
||
class TestSerializationSafety:
    """Extras that JSON cannot encode natively must still be recorded."""

    def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
        """An arbitrary object passed as an extra is written as a string."""

        class Weird:
            # Only __repr__ is defined; the assertion below expects this
            # exact text back as the stored value.
            def __repr__(self):
                return "<Weird>"

        isolated_audit.log_event("test", "carries a weird extra", obj=Weird())
        isolated_audit.flush_audit_log(timeout_s=2.0)

        recorded = _read_all_events(tmp_path)
        assert len(recorded) == 1
        only_record = recorded[0]
        assert only_record["obj"] == "<Weird>"
|