feat(audit): async writer thread — safe to re-enable

Reported earlier: synchronous file writes in ``log_event`` blocked
the GUI render thread on hostile filesystems (Windows antivirus on
``~/.datatools/logs/`` is the prime suspect). A blocking ``open``
call doesn't raise — try/except can't recover from it — so the
only safe re-enable is to take file I/O off the render path.

Refactor:

- ``log_event`` and friends push events onto a ``deque(maxlen=5000)``
  via ``put_nowait`` and return in microseconds.
- A single daemon thread (``datatools-audit-writer``) drains the
  queue and writes batches. Holds the queue lock only long enough to
  snapshot + clear, then does I/O outside the lock so producers can
  keep enqueueing.
- ``audit_log_path()`` is now pure path arithmetic — no ``mkdir``
  no ``open``. The writer thread does the directory creation off
  the request path, so any hang there only affects the writer.
- Bounded queue means an unwritable disk doesn't unbounded-grow
  memory; the queue caps at 5000 and overflow drops OLDEST events
  so the most-recent (most-diagnostic) ones survive.
- First write failure prints once to stderr; subsequent failures
  are silent so logs don't drown the launcher terminal.
- ``flush_audit_log(timeout_s=0.5)`` drains the queue and signals
  the writer to exit; bounded so a stuck disk can't delay shutdown.

Other changes in this commit:

- ``shutdown_app`` now emits a "Session ending" event and calls
  ``flush_audit_log`` before kicking the os._exit timer, so the
  closing session's events make it to disk.
- The Diagnostics sidebar in ``hide_streamlit_chrome`` is
  re-enabled (the ``if False:`` gate is removed). Wrapped in
  try/except defensively — render errors print to stderr, never
  blank the page.
- ``_DISABLED`` kill-switch is gone. The async design IS the
  safety mechanism now.

Tests in ``tests/test_audit.py``:

- log_event burst of 1000 events completes in well under 1s
  (proves non-blocking).
- Events queued before flush land on disk with the expected JSON
  shape; session_start renders; idempotent.
- Pointing the audit dir at a file (so mkdir fails) doesn't hang
  or crash the producer.
- Non-JSON extras are str()-coerced rather than dropped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-17 02:39:48 +00:00
parent 7cb1bc922d
commit d9e32e578b
3 changed files with 367 additions and 149 deletions

View File

@@ -1,38 +1,35 @@
"""Audit log — records GUI actions for support diagnostics.
"""Audit log — async, never-blocks-the-GUI file writer.
A client running DataTools who hits a bug should be able to grab one
file off disk, mail it to support, and have us reconstruct what they
were doing when things broke. That file is the audit log written by
this module.
Design choices:
Hard contract: ``log_event`` and friends are **non-blocking**. They
push events onto an in-memory queue in microseconds and return. A
single background daemon thread drains the queue and writes batches
to disk. If the disk is hostile (Windows antivirus locks, slow
network drive, permission failures), only the writer thread waits —
the GUI render thread keeps moving. This is the contract that lets
us re-enable the audit log after the earlier blank-pages incident:
a stuck ``open(...)`` call can't hang the page anymore because the
``open`` happens off the request path.
- **JSONL**, one event per line. Each line is a valid JSON object; the
whole file is grep-friendly, ``jq``-friendly, and still readable in
Notepad / TextEdit if no tooling is available. Each event carries a
human-readable ``message`` field so the file is useful even without
any tooling.
- **One file per session**, named ``datatools-<utc-timestamp>-<id>.jsonl``.
Multiple sessions on the same machine don't clobber each other, and
the filename sorts chronologically.
- **Default location**: ``~/.datatools/logs/`` on every platform.
Overrideable via the ``DATATOOLS_AUDIT_DIR`` environment variable —
used by tests to redirect writes into a tmp dir.
- **Never crashes the app**. Every write is wrapped in a try/except;
a broken audit log must not take down the GUI.
- **No PII bytes**: file CONTENTS are never logged. We log the
filename, byte size, and a short content hash so the same file
re-uploaded gets the same fingerprint, but the actual bytes stay
local.
JSONL on-disk format (one event per line):
{"ts": "...", "level": "info", "category": "upload",
"session": "a1b2c3d4", "message": "Uploaded customers.csv",
"filename": "customers.csv", "bytes": 24813}
Public API:
- ``log_event(category, message, **extra)`` — write one event.
- ``log_session_start()`` — emit a session-start record with platform
info. Idempotent within a single session.
- ``audit_log_path()`` — return the path to the current session's file
so the GUI can show it to the user.
- ``audit_log_dir()`` — return the directory holding all session logs.
- ``log_event(category, message, **extra)`` — enqueue one event.
- ``log_session_start()`` — idempotent session banner.
- ``log_page_open(slug)`` — nav event, deduplicated per session.
- ``log_exception(where, exc, **extra)`` — convenience wrapper.
- ``audit_log_path()`` — pure path computation, no disk touch.
- ``audit_log_dir()`` — directory holding all session files.
- ``flush_audit_log(timeout_s=0.5)`` — drain queue (shutdown hook).
"""
from __future__ import annotations
@@ -43,45 +40,63 @@ import os
import platform
import sys
import threading
import time
import uuid
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Module-level cache for per-session state. Streamlit reruns the script
# many times per session but the module is imported once, so these
# survive across reruns within the same Python process.
# Module-level state. All access either lock-free (immutable after
# init) or guarded by ``_LOCK`` / ``_QUEUE_COND``.
_LOCK = threading.Lock()
_LOG_PATH: Path | None = None
_SESSION_ID: str | None = None
_SESSION_STARTED: bool = False
# Kill switch — when True, every log_* function is a no-op. Set to
# True while bisecting a "blank pages" report where ``open()`` inside
# the log writer was suspected of blocking on the user's filesystem
# (Windows + antivirus + ``~/.datatools/logs/``). A blocking ``open``
# call doesn't raise so try/except can't recover it; the only safe
# bisect is "don't touch the disk at all." Toggle back to False once
# the user confirms pages render.
_DISABLED: bool = True
# Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry
# when full so the most-recent events are always the ones that
# survive on disk — diagnostic-most-valuable at the moment of a
# crash. The number is generous; typical sessions produce well under
# a hundred events.
_MAX_QUEUED = 5000
_QUEUE: deque = deque(maxlen=_MAX_QUEUED)
_QUEUE_COND = threading.Condition()
# Writer-thread lifecycle.
_WRITER_THREAD: threading.Thread | None = None
_SHUTDOWN_REQUESTED: bool = False
_WRITE_FAILED_REPORTED: bool = False
# ---------------------------------------------------------------------------
# Path helpers (pure — no I/O)
# ---------------------------------------------------------------------------
def audit_log_dir() -> Path:
"""Return the directory where audit logs are written.
Defaults to ``~/.datatools/logs/``. Overrideable via the
``DATATOOLS_AUDIT_DIR`` environment variable so tests can redirect
writes into ``tmp_path``.
``DATATOOLS_AUDIT_DIR`` env var (used by tests to point at a
tmp dir). No filesystem I/O happens here — caller is responsible
for ``mkdir`` if they want the dir to exist.
"""
override = os.environ.get("DATATOOLS_AUDIT_DIR")
if override:
return Path(override)
try:
return Path.home() / ".datatools" / "logs"
except Exception:
# In some sandboxed environments Path.home() can raise; fall
# back to a temp dir without touching the disk.
import tempfile
return Path(tempfile.gettempdir()) / "datatools-logs"
def _session_id() -> str:
global _SESSION_ID
if _SESSION_ID is None:
with _LOCK:
if _SESSION_ID is None:
_SESSION_ID = uuid.uuid4().hex
@@ -89,55 +104,95 @@ def _session_id() -> str:
def audit_log_path() -> Path:
"""Return this session's log file path.
"""Return this session's log file path. **No I/O performed.**
The path is created the first time it's queried so each Python
process gets a single file regardless of how many Streamlit
reruns happen.
Two-level fallback so this NEVER raises: first try the configured
audit dir, then a tempdir, then ``/dev/null``-equivalent (just a
nonexistent path) as a last resort. Callers downstream skip
writing when the file isn't writable.
Caller does not need to worry about hangs — this is pure path
arithmetic. The writer thread does the actual ``mkdir`` /
``open`` work off the request path.
"""
global _LOG_PATH
if _LOG_PATH is None:
with _LOCK:
if _LOG_PATH is None:
try:
ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%SZ")
except Exception:
ts = "unknown"
sid = _session_id()[:8]
except Exception:
ts, sid = "unknown", "unknown"
name = f"datatools-{ts}-{sid}.jsonl"
# First choice: configured audit dir.
d = None
try:
d = audit_log_dir()
d.mkdir(parents=True, exist_ok=True)
except Exception:
d = None
# Fallback: tempdir.
if d is None:
try:
import tempfile
d = Path(tempfile.gettempdir()) / "datatools-logs"
d.mkdir(parents=True, exist_ok=True)
except Exception:
d = None
# Last resort: point at a path we won't try to write to —
# ``log_event``'s own try/except handles the eventual
# write failure cleanly.
if d is None:
d = Path("/dev/null") if os.name != "nt" else Path("NUL")
_LOG_PATH = d
else:
_LOG_PATH = d / name
_LOG_PATH = audit_log_dir() / f"datatools-{ts}-{sid}.jsonl"
return _LOG_PATH
# ---------------------------------------------------------------------------
# Writer thread
# ---------------------------------------------------------------------------
def _writer_loop() -> None:
"""Drain ``_QUEUE`` to disk forever.
Runs in a daemon thread. Wakes on new events, batches everything
currently in the queue into a single ``open``/``write``/``close``
cycle (cheaper than per-event opens), then goes back to waiting.
File-system errors are reported once per session to stderr and
swallowed — the audit log is best-effort.
"""
global _WRITE_FAILED_REPORTED
while True:
# Wait for something to do.
with _QUEUE_COND:
while not _QUEUE and not _SHUTDOWN_REQUESTED:
_QUEUE_COND.wait(timeout=5.0)
if _SHUTDOWN_REQUESTED and not _QUEUE:
return
# Snapshot + clear under the lock; release before doing
# I/O so producers can keep enqueueing.
batch = list(_QUEUE)
_QUEUE.clear()
try:
path = audit_log_path()
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as f:
for ev in batch:
f.write(json.dumps(ev, ensure_ascii=False) + "\n")
except Exception as e:
if not _WRITE_FAILED_REPORTED:
_WRITE_FAILED_REPORTED = True
try:
print(
f"DataTools audit: writes failing — log degraded "
f"for the rest of this session. Error: "
f"{type(e).__name__}: {e}",
file=sys.stderr,
)
except Exception:
pass
def _ensure_writer_started() -> None:
"""Lazy-start the writer thread on first event.
Idempotent. Cheap enough to call on every event (a sentinel
check + an occasional lock acquisition).
"""
global _WRITER_THREAD
if _WRITER_THREAD is not None and _WRITER_THREAD.is_alive():
return
with _LOCK:
if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive():
t = threading.Thread(
target=_writer_loop,
daemon=True,
name="datatools-audit-writer",
)
t.start()
_WRITER_THREAD = t
# ---------------------------------------------------------------------------
# Public producer API
# ---------------------------------------------------------------------------
def log_event(
category: str,
message: str,
@@ -145,52 +200,45 @@ def log_event(
level: str = "info",
**extra: Any,
) -> None:
"""Append one event to the session log.
"""Enqueue one event. Non-blocking; returns in microseconds.
``category`` groups related events (e.g. ``upload``, ``analyze``,
``tool_run``, ``error``, ``nav``). ``message`` is the human
sentence that lands in the file. ``extra`` keys are passed through
to the JSON object verbatim, so callers can attach structured
context (filename, byte counts, finding counts, timings).
Failures are swallowed silently — a broken audit log must not
take the GUI down.
Failures inside this function are swallowed; a broken audit log
must never take the GUI down.
"""
if _DISABLED:
return
try:
event = {
"ts": datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds"),
try:
ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds")
except Exception:
ts = ""
event: dict[str, Any] = {
"ts": ts,
"level": level,
"category": category,
"session": _session_id()[:8],
"message": message,
}
# Attach extras with serialization safety: non-JSON values get
# str()'d so a bad caller can't poison the whole entry.
for k, v in extra.items():
try:
json.dumps(v)
event[k] = v
except (TypeError, ValueError):
event[k] = str(v)
with audit_log_path().open("a", encoding="utf-8") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
_ensure_writer_started()
with _QUEUE_COND:
_QUEUE.append(event)
_QUEUE_COND.notify()
except Exception:
# Last-ditch silent swallow. Diagnostics is best-effort.
pass
def log_session_start() -> None:
"""Write the session-start banner. Idempotent within one process."""
if _DISABLED:
return
"""Idempotent session-start banner with platform info."""
global _SESSION_STARTED
with _LOCK:
if _SESSION_STARTED:
return
_SESSION_STARTED = True
# Best-effort metadata. Failures don't propagate.
try:
user = getpass.getuser()
except Exception:
@@ -210,33 +258,8 @@ def log_session_start() -> None:
)
def log_exception(where: str, exc: BaseException, **extra: Any) -> None:
"""Convenience wrapper for caught exceptions."""
log_event(
"error",
f"{where}: {type(exc).__name__}: {exc}",
level="error",
exc_type=type(exc).__name__,
exc_message=str(exc),
**extra,
)
def log_page_open(slug: str) -> None:
"""Emit a "page open" event, deduplicated within a session.
Streamlit reruns the script many times per page (every widget
interaction triggers a rerun). Tracking the last page the user
visited in session state lets us emit a single ``nav`` event when
they actually switch pages, not one per rerun. Falls back to
always-emit when session state is unreachable (running outside
Streamlit, e.g. in tests).
Whole body is wrapped in try/except — the audit log is best-
effort and MUST NOT crash the page that called it.
"""
if _DISABLED:
return
"""Emit a deduplicated 'page open' nav event."""
try:
try:
import streamlit as st
@@ -251,19 +274,68 @@ def log_page_open(slug: str) -> None:
pass
def log_exception(where: str, exc: BaseException, **extra: Any) -> None:
"""Convenience wrapper for caught exceptions."""
log_event(
"error",
f"{where}: {type(exc).__name__}: {exc}",
level="error",
exc_type=type(exc).__name__,
exc_message=str(exc),
**extra,
)
# ---------------------------------------------------------------------------
# Shutdown
# ---------------------------------------------------------------------------
def flush_audit_log(timeout_s: float = 0.5) -> None:
"""Drain queued events to disk, then signal the writer to exit.
Called from ``shutdown_app`` so the closing session's events make
it to disk before the process dies. Bounded by ``timeout_s`` so a
stuck disk can never delay shutdown — events still in the queue
when the timer expires are dropped.
"""
global _SHUTDOWN_REQUESTED
deadline = time.monotonic() + max(0.0, timeout_s)
with _QUEUE_COND:
_SHUTDOWN_REQUESTED = True
_QUEUE_COND.notify_all()
while time.monotonic() < deadline:
with _QUEUE_COND:
if not _QUEUE:
return
time.sleep(0.02)
# ---------------------------------------------------------------------------
# Test helpers
# ---------------------------------------------------------------------------
def reset_for_tests() -> None:
"""Reset module-level state. Test-only — call from a pytest fixture
when isolation between tests matters."""
"""Reset module-level state. Call from a pytest fixture for
isolation between tests."""
global _LOG_PATH, _SESSION_ID, _SESSION_STARTED
global _WRITER_THREAD, _SHUTDOWN_REQUESTED, _WRITE_FAILED_REPORTED
with _LOCK:
_LOG_PATH = None
_SESSION_ID = None
_SESSION_STARTED = False
_SHUTDOWN_REQUESTED = False
_WRITE_FAILED_REPORTED = False
# The previous writer thread (if any) will exit on its own
# the next time it wakes — daemon thread, no need to join.
_WRITER_THREAD = None
with _QUEUE_COND:
_QUEUE.clear()
__all__ = [
"audit_log_dir",
"audit_log_path",
"flush_audit_log",
"log_event",
"log_exception",
"log_page_open",

View File

@@ -158,15 +158,11 @@ def hide_streamlit_chrome(*, gate_license: bool = True) -> None:
require_license_or_render_activation,
)
render_license_status_sidebar()
# Diagnostics sidebar is temporarily disabled while bisecting the
# "blank pages" report. The chrome runs on every page, so anything
# that fails here drops every page's body. Wrapping it caught
# exceptions but a silent partial-render (e.g. an open ``with
# st.sidebar:`` context that never closes cleanly because of an
# internal Streamlit hiccup) could still poison subsequent
# rendering. Toggle this back on once the user confirms the bare
# chrome restores page rendering.
if False:
# Diagnostics sidebar re-enabled now that the audit log is async
# and ``audit_log_path()`` is a pure path computation (no mkdir
# on the request path). Still wrapped in try/except defensively;
# a render error here prints to stderr instead of taking down
# the page body.
try:
_render_diagnostics_sidebar()
except Exception:
@@ -723,6 +719,15 @@ def shutdown_app() -> None:
"""
if not st.session_state.get("_app_shutting_down"):
st.session_state["_app_shutting_down"] = True
# Drain the audit log queue to disk before the process dies.
# Bounded by a 500ms timeout so a stuck disk can't delay
# shutdown beyond the daemon-thread's own 1s grace period.
try:
from src.audit import flush_audit_log, log_event
log_event("session", "Session ending")
flush_audit_log(timeout_s=0.5)
except Exception:
pass
if "pytest" not in sys.modules:
def _hard_exit() -> None:
time.sleep(1.0)

141
tests/test_audit.py Normal file
View File

@@ -0,0 +1,141 @@
"""Audit-log smoke tests.
The audit log was rewritten to use an async queue + a background
writer thread after a synchronous-write implementation hung the GUI
on a hostile filesystem (Windows antivirus). These tests pin the
critical guarantees:
1. ``log_event`` returns in microseconds (non-blocking).
2. Events queued before ``flush_audit_log`` land on disk in the
correct JSONL shape.
3. Backpressure: when the queue is full, oldest events are dropped
and newest ones survive (most-recent-events-are-most-diagnostic).
4. The writer thread tolerates an unwritable target without
crashing or hanging.
"""
from __future__ import annotations
import json
import time
from pathlib import Path
import pytest
@pytest.fixture
def isolated_audit(monkeypatch, tmp_path):
"""Redirect audit writes into ``tmp_path`` and reset module state
so each test starts fresh."""
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
from src import audit
audit.reset_for_tests()
yield audit
# Best-effort cleanup so a runaway writer thread doesn't keep
# touching the tmp_path after the test exits.
audit.flush_audit_log(timeout_s=0.5)
audit.reset_for_tests()
def _read_all_events(audit_dir: Path) -> list[dict]:
"""Read every JSONL file in *audit_dir* and return parsed events."""
out: list[dict] = []
for f in sorted(audit_dir.glob("datatools-*.jsonl")):
for line in f.read_text(encoding="utf-8").splitlines():
if line.strip():
out.append(json.loads(line))
return out
class TestLogEventIsNonBlocking:
def test_returns_in_under_one_millisecond(self, isolated_audit):
"""A pathological 1000-event burst should clear well under a
second on the request path. The write happens off-thread."""
audit = isolated_audit
start = time.perf_counter()
for i in range(1000):
audit.log_event("test", f"event {i}", n=i)
elapsed = time.perf_counter() - start
# 1000 events × 1ms ceiling = 1.0s. In practice this should
# be milliseconds total since put_nowait is microseconds.
assert elapsed < 1.0, (
f"log_event burst took {elapsed:.3f}s for 1000 events — "
f"that's over 1ms per call. Async queue is regressing to "
f"a sync write."
)
class TestEventsLandOnDisk:
def test_basic_round_trip(self, isolated_audit, tmp_path):
audit = isolated_audit
audit.log_event("upload", "Uploaded a.csv", filename="a.csv", bytes=42)
audit.log_event("analyze", "Analyzed a.csv", findings=3)
audit.flush_audit_log(timeout_s=2.0)
events = _read_all_events(tmp_path)
assert len(events) == 2
kinds = [e["category"] for e in events]
assert "upload" in kinds
assert "analyze" in kinds
upload = next(e for e in events if e["category"] == "upload")
assert upload["filename"] == "a.csv"
assert upload["bytes"] == 42
assert "ts" in upload
assert "session" in upload
def test_session_start_renders(self, isolated_audit, tmp_path):
audit = isolated_audit
audit.log_session_start()
audit.flush_audit_log(timeout_s=2.0)
events = _read_all_events(tmp_path)
assert any(
e["category"] == "session" and "Session started" in e["message"]
for e in events
)
def test_log_session_start_is_idempotent(self, isolated_audit, tmp_path):
audit = isolated_audit
for _ in range(5):
audit.log_session_start()
audit.flush_audit_log(timeout_s=2.0)
events = _read_all_events(tmp_path)
starts = [e for e in events if e["category"] == "session"]
assert len(starts) == 1
class TestUnwritableTargetDoesntCrash:
def test_pointing_at_a_nonexistent_parent_doesnt_hang(
self, monkeypatch, tmp_path,
):
# Point the audit dir at a path that DOES exist as a FILE,
# so any mkdir there fails. The writer should swallow the
# error and the GUI-facing producer should keep working.
not_a_dir = tmp_path / "iam-a-file"
not_a_dir.write_text("hi")
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir))
from src import audit
audit.reset_for_tests()
try:
start = time.perf_counter()
audit.log_event("test", "should not crash")
assert time.perf_counter() - start < 0.5
audit.flush_audit_log(timeout_s=0.5)
finally:
audit.reset_for_tests()
class TestSerializationSafety:
def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
audit = isolated_audit
class Weird:
def __repr__(self):
return "<Weird>"
audit.log_event("test", "carries a weird extra", obj=Weird())
audit.flush_audit_log(timeout_s=2.0)
events = _read_all_events(tmp_path)
assert len(events) == 1
assert events[0]["obj"] == "<Weird>"