Files
datatools-dev/tests/test_audit.py
Michael d9e32e578b feat(audit): async writer thread — safe to re-enable
Reported earlier: synchronous file writes in ``log_event`` blocked
the GUI render thread on hostile filesystems (Windows antivirus on
``~/.datatools/logs/`` is the prime suspect). A blocking ``open``
call doesn't raise — try/except can't recover from it — so the
only safe re-enable is to take file I/O off the render path.

Refactor:

- ``log_event`` and friends push events onto a ``deque(maxlen=5000)``
  via ``put_nowait`` and return in microseconds.
- A single daemon thread (``datatools-audit-writer``) drains the
  queue and writes batches. Holds the queue lock only long enough to
  snapshot + clear, then does I/O outside the lock so producers can
  keep enqueueing.
- ``audit_log_path()`` is now pure path arithmetic — no ``mkdir``,
  no ``open``. The writer thread does the directory creation off
  the request path, so any hang there only affects the writer.
- A bounded queue means an unwritable disk can't make memory grow
  without bound; the queue caps at 5000 events and overflow drops the
  OLDEST events so the most recent (most diagnostic) ones survive.
- First write failure prints once to stderr; subsequent failures
  are silent so logs don't drown the launcher terminal.
- ``flush_audit_log(timeout_s=0.5)`` drains the queue and signals
  the writer to exit; bounded so a stuck disk can't delay shutdown.

Other changes in this commit:

- ``shutdown_app`` now emits a "Session ending" event and calls
  ``flush_audit_log`` before kicking the os._exit timer, so the
  closing session's events make it to disk.
- The Diagnostics sidebar in ``hide_streamlit_chrome`` is
  re-enabled (the ``if False:`` gate is removed). Wrapped in
  try/except defensively — render errors print to stderr, never
  blank the page.
- ``_DISABLED`` kill-switch is gone. The async design IS the
  safety mechanism now.

Tests in ``tests/test_audit.py``:

- log_event burst of 1000 events completes in well under 1s
  (proves non-blocking).
- Events queued before flush land on disk with the expected JSON
  shape; session_start renders; idempotent.
- Pointing the audit dir at a file (so mkdir fails) doesn't hang
  or crash the producer.
- Non-JSON extras are str()-coerced rather than dropped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 02:39:48 +00:00

142 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Audit-log smoke tests.
The audit log was rewritten to use an async queue + a background
writer thread after a synchronous-write implementation hung the GUI
on a hostile filesystem (Windows antivirus). These tests pin the
critical guarantees:
1. ``log_event`` returns in microseconds (non-blocking).
2. Events queued before ``flush_audit_log`` land on disk in the
correct JSONL shape.
3. Backpressure: when the queue is full, oldest events are dropped
and newest ones survive (most-recent-events-are-most-diagnostic).
NOTE: no test in this file exercises this guarantee yet.
4. The writer thread tolerates an unwritable target without
crashing or hanging.
"""
from __future__ import annotations
import json
import time
from pathlib import Path
import pytest
@pytest.fixture
def isolated_audit(monkeypatch, tmp_path):
    """Yield the ``audit`` module wired to a per-test directory.

    ``DATATOOLS_AUDIT_DIR`` is pointed at ``tmp_path`` and the module's
    state is reset before the test body runs, so every test starts
    from a clean slate.
    """
    monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))

    from src import audit as audit_module

    audit_module.reset_for_tests()

    yield audit_module

    # Teardown (best effort): flush and reset so a writer thread left
    # running cannot keep touching ``tmp_path`` once the test is over.
    audit_module.flush_audit_log(timeout_s=0.5)
    audit_module.reset_for_tests()
def _read_all_events(audit_dir: Path) -> list[dict]:
"""Read every JSONL file in *audit_dir* and return parsed events."""
out: list[dict] = []
for f in sorted(audit_dir.glob("datatools-*.jsonl")):
for line in f.read_text(encoding="utf-8").splitlines():
if line.strip():
out.append(json.loads(line))
return out
class TestLogEventIsNonBlocking:
    """Guard the producer side of the audit log against blocking I/O."""

    def test_returns_in_under_one_millisecond(self, isolated_audit):
        """Time a 1000-event burst on the calling thread.

        The burst must finish well inside one second; the actual file
        write is expected to happen off-thread.
        """
        began = time.perf_counter()
        for index in range(1000):
            isolated_audit.log_event("test", f"event {index}", n=index)
        took = time.perf_counter() - began

        # Budget: 1000 events at a 1ms ceiling each is 1.0s in total.
        # A healthy enqueue-only path should need only milliseconds
        # for the whole burst.
        assert took < 1.0, (
            f"log_event burst took {took:.3f}s for 1000 events — "
            "that's over 1ms per call. Async queue is regressing to "
            "a sync write."
        )
class TestEventsLandOnDisk:
    """Events logged before a flush must show up in the JSONL output."""

    def test_basic_round_trip(self, isolated_audit, tmp_path):
        """Two events with extras are written with the expected fields."""
        isolated_audit.log_event(
            "upload", "Uploaded a.csv", filename="a.csv", bytes=42
        )
        isolated_audit.log_event("analyze", "Analyzed a.csv", findings=3)
        isolated_audit.flush_audit_log(timeout_s=2.0)

        on_disk = _read_all_events(tmp_path)
        assert len(on_disk) == 2

        categories = [event["category"] for event in on_disk]
        assert "upload" in categories
        assert "analyze" in categories

        upload_events = [
            event for event in on_disk if event["category"] == "upload"
        ]
        upload = upload_events[0]
        # Extras passed as keyword arguments come back as top-level keys.
        assert upload["filename"] == "a.csv"
        assert upload["bytes"] == 42
        # Every record also carries a timestamp and a session field.
        for required_key in ("ts", "session"):
            assert required_key in upload

    def test_session_start_renders(self, isolated_audit, tmp_path):
        """``log_session_start`` emits a "Session started" session event."""
        isolated_audit.log_session_start()
        isolated_audit.flush_audit_log(timeout_s=2.0)

        def is_session_start(event):
            return (
                event["category"] == "session"
                and "Session started" in event["message"]
            )

        assert any(map(is_session_start, _read_all_events(tmp_path)))

    def test_log_session_start_is_idempotent(self, isolated_audit, tmp_path):
        """Repeated ``log_session_start`` calls yield one session event."""
        for _ in range(5):
            isolated_audit.log_session_start()
        isolated_audit.flush_audit_log(timeout_s=2.0)

        session_count = sum(
            1
            for event in _read_all_events(tmp_path)
            if event["category"] == "session"
        )
        assert session_count == 1
class TestUnwritableTargetDoesntCrash:
    """The producer must stay healthy when the audit target is unusable."""

    def test_pointing_at_a_nonexistent_parent_doesnt_hang(
        self, monkeypatch, tmp_path,
    ):
        """Log against an audit "directory" that is really a file.

        Despite the test's name, the target path DOES exist — as a
        regular file — so any ``mkdir`` there fails. The writer is
        expected to swallow that error while the GUI-facing producer
        keeps working.
        """
        blocker = tmp_path / "iam-a-file"
        blocker.write_text("hi")
        monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(blocker))

        from src import audit

        audit.reset_for_tests()
        try:
            began = time.perf_counter()
            audit.log_event("test", "should not crash")
            producer_seconds = time.perf_counter() - began
            assert producer_seconds < 0.5

            audit.flush_audit_log(timeout_s=0.5)
        finally:
            # Always restore module state, even when an assertion fails.
            audit.reset_for_tests()
class TestSerializationSafety:
    """Extras that JSON cannot encode natively must still be recorded."""

    def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
        """An arbitrary object passed as an extra is stored as its string form."""

        class Weird:
            # Only ``__repr__`` is defined; ``str()`` falls back to it.
            def __repr__(self):
                return "<Weird>"

        isolated_audit.log_event(
            "test", "carries a weird extra", obj=Weird()
        )
        isolated_audit.flush_audit_log(timeout_s=2.0)

        recorded = _read_all_events(tmp_path)
        assert len(recorded) == 1

        only_event = recorded[0]
        assert only_event["obj"] == "<Weird>"