feat(audit): diagnostic instrumentation env vars + writer-thread guard

Phase 1 of the audit-log re-enablement plan. Adds three opt-in env
vars that let us ship one instrumented build for the user to run,
without flipping the kill switch on for everybody. **Default
behaviour is byte-identical to today**: with no env vars set the
kill switch wins, no writer thread starts, no file is written, no
stderr line is printed.

Env vars (do NOT set in prod):

- ``DATATOOLS_AUDIT_ENABLED=1`` — bypass ``_DISABLED`` for one
  session. ``_DISABLED = True`` stays in the source so an upgrade
  with no env var is still safe.
- ``DATATOOLS_AUDIT_TRACE=1`` — print ``[audit] ...`` lines to
  stderr at module import, every writer-thread state change, and
  every producer entry point. Lets the user share a small log
  instead of attaching a debugger.
- ``DATATOOLS_AUDIT_PROBE=<value>`` — bisect the producer path
  for Phase 2. Values: ``full`` (default), ``noop``, ``no-events``,
  ``no-page-open``, ``no-session-start``. The named variants
  return early from the corresponding ``log_*`` function so we can
  isolate which call is implicated in the blank-pages symptom.

Also:

- ``_writer_loop`` gets an outer ``try/except BaseException`` so
  silent thread death now surfaces a ``"writer thread died: ..."``
  line in the launcher terminal instead of looking like a hang.
- Existing first-write-failure stderr print gets ``flush=True`` so
  the user actually sees it before the process is killed.
- Test fixture switches from the previous-commit ``_DISABLED = False``
  override to ``_ENABLE_OVERRIDE = True`` so tests exercise the same
  bypass path the diagnostic build uses.
- Two new tests pin the safety contract: with the kill switch on
  and no override, every producer is a true no-op (no writer
  thread, no file). And ``DATATOOLS_AUDIT_PROBE=no-events`` bypasses
  ``log_event`` even when the override is on — guards the bisect.

Rollback: ``git revert HEAD`` removes Phase 1 cleanly. The deadlock
fix from the previous commit stays in place.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-19 14:46:27 +00:00
parent a8ff8f4bd0
commit 76c9f5a679
2 changed files with 155 additions and 41 deletions

View File

@@ -58,11 +58,33 @@ _SESSION_STARTED: bool = False
# RESTORED kill switch — the async-writer redesign still triggers the
# blank-pages symptom on the user's machine despite no synchronous
# file I/O on the request path. Cause is not yet identified; keep all
# log_* calls as no-ops while we diagnose so the GUI keeps working.
# Diagnostic plan: ask the user for the launcher terminal output AND
# whether anything appears in ``~/.datatools/logs/`` at all — that
# bisects "writer thread starts" from "writer thread can't write."
# log_* calls as no-ops by default while we diagnose.
#
# Diagnostic overrides for an instrumented run (DO NOT set in prod):
# DATATOOLS_AUDIT_ENABLED=1 — bypass _DISABLED for one session
# DATATOOLS_AUDIT_TRACE=1 — print "[audit] ..." lines to stderr
# DATATOOLS_AUDIT_PROBE=... — bisect the producer path. Values:
# full (default) | noop | no-events | no-page-open | no-session-start
_DISABLED: bool = True
_ENABLE_OVERRIDE: bool = os.environ.get("DATATOOLS_AUDIT_ENABLED") == "1"
_TRACE: bool = os.environ.get("DATATOOLS_AUDIT_TRACE") == "1"
_PROBE: str = os.environ.get("DATATOOLS_AUDIT_PROBE", "full").lower()
def _trace(_msg: str, **_kw: Any) -> None:
"""Emit a stderr trace line when DATATOOLS_AUDIT_TRACE=1.
No-op otherwise — the default configuration adds zero extra I/O."""
if not _TRACE:
return
try:
extras = " ".join(f"{k}={v!r}" for k, v in _kw.items())
line = f"[audit] {_msg}"
if extras:
line = f"{line} {extras}"
print(line, file=sys.stderr, flush=True)
except Exception:
pass
# Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry
# when full so the most-recent events are always the ones that
@@ -78,6 +100,13 @@ _WRITER_THREAD: threading.Thread | None = None
_SHUTDOWN_REQUESTED: bool = False
_WRITE_FAILED_REPORTED: bool = False
_trace(
"module-import",
disabled=_DISABLED,
enabled_override=_ENABLE_OVERRIDE,
probe=_PROBE,
)
# ---------------------------------------------------------------------------
# Path helpers (pure — no I/O)
@@ -151,25 +180,37 @@ def _writer_loop() -> None:
swallowed — the audit log is best-effort.
"""
global _WRITE_FAILED_REPORTED
_trace("writer-loop:enter")
try:
while True:
# Wait for something to do.
with _QUEUE_COND:
while not _QUEUE and not _SHUTDOWN_REQUESTED:
_QUEUE_COND.wait(timeout=5.0)
if _SHUTDOWN_REQUESTED and not _QUEUE:
_trace("writer-loop:exit-shutdown")
return
# Snapshot + clear under the lock; release before doing
# I/O so producers can keep enqueueing.
batch = list(_QUEUE)
_QUEUE.clear()
_trace("writer-loop:wake", batch=len(batch))
try:
path = audit_log_path()
_trace("writer-loop:mkdir", dir=str(path.parent))
path.parent.mkdir(parents=True, exist_ok=True)
_trace("writer-loop:open", path=str(path))
with path.open("a", encoding="utf-8") as f:
for ev in batch:
f.write(json.dumps(ev, ensure_ascii=False) + "\n")
_trace("writer-loop:wrote", count=len(batch))
except Exception as e:
_trace(
"writer-loop:write-failed",
exc=type(e).__name__,
msg=str(e),
)
if not _WRITE_FAILED_REPORTED:
_WRITE_FAILED_REPORTED = True
try:
@@ -178,9 +219,24 @@ def _writer_loop() -> None:
f"for the rest of this session. Error: "
f"{type(e).__name__}: {e}",
file=sys.stderr,
flush=True,
)
except Exception:
pass
except BaseException as e:
# Outer guard. If the writer thread dies for any reason
# (unexpected exception, sys.exit from a foreign module),
# surface it in the launcher terminal instead of going silent.
try:
print(
f"DataTools audit: writer thread died: "
f"{type(e).__name__}: {e}",
file=sys.stderr,
flush=True,
)
except Exception:
pass
raise
def _ensure_writer_started() -> None:
@@ -194,6 +250,7 @@ def _ensure_writer_started() -> None:
return
with _LOCK:
if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive():
_trace("ensure-writer:starting")
t = threading.Thread(
target=_writer_loop,
daemon=True,
@@ -201,6 +258,7 @@ def _ensure_writer_started() -> None:
)
t.start()
_WRITER_THREAD = t
_trace("ensure-writer:started", ident=t.ident)
# ---------------------------------------------------------------------------
@@ -219,8 +277,12 @@ def log_event(
Failures inside this function are swallowed; a broken audit log
must never take the GUI down.
"""
if _DISABLED:
if _DISABLED and not _ENABLE_OVERRIDE:
return
if _PROBE in ("noop", "no-events"):
_trace("log_event:probe-skip", category=category)
return
_trace("log_event:enter", category=category)
try:
try:
ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds")
@@ -250,8 +312,12 @@ def log_event(
def log_session_start() -> None:
"""Idempotent session-start banner with platform info."""
if _DISABLED:
if _DISABLED and not _ENABLE_OVERRIDE:
return
if _PROBE in ("noop", "no-session-start"):
_trace("log_session_start:probe-skip")
return
_trace("log_session_start:enter")
global _SESSION_STARTED
with _LOCK:
if _SESSION_STARTED:
@@ -278,8 +344,12 @@ def log_session_start() -> None:
def log_page_open(slug: str) -> None:
"""Emit a deduplicated 'page open' nav event."""
if _DISABLED:
if _DISABLED and not _ENABLE_OVERRIDE:
return
if _PROBE in ("noop", "no-page-open"):
_trace("log_page_open:probe-skip", slug=slug)
return
_trace("log_page_open:enter", slug=slug)
try:
try:
import streamlit as st
@@ -318,8 +388,9 @@ def flush_audit_log(timeout_s: float = 0.5) -> None:
stuck disk can never delay shutdown — events still in the queue
when the timer expires are dropped.
"""
if _DISABLED:
if _DISABLED and not _ENABLE_OVERRIDE:
return
_trace("flush:enter", timeout_s=timeout_s)
global _SHUTDOWN_REQUESTED
deadline = time.monotonic() + max(0.0, timeout_s)
with _QUEUE_COND:

View File

@@ -28,12 +28,13 @@ def isolated_audit(monkeypatch, tmp_path):
"""Redirect audit writes into ``tmp_path`` and reset module state
so each test starts fresh.
The kill switch is bypassed for the duration of the test by
patching the module-level constant directly — these tests need
the real producer path to run."""
The kill switch (``_DISABLED``) is bypassed for the duration of
the test by patching ``_ENABLE_OVERRIDE`` directly — env vars
won't reach the module-level constant after import."""
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
from src import audit
monkeypatch.setattr(audit, "_DISABLED", False)
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
monkeypatch.setattr(audit, "_PROBE", "full")
audit.reset_for_tests()
yield audit
# Best-effort cleanup so a runaway writer thread doesn't keep
@@ -120,7 +121,8 @@ class TestUnwritableTargetDoesntCrash:
not_a_dir.write_text("hi")
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir))
from src import audit
monkeypatch.setattr(audit, "_DISABLED", False)
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
monkeypatch.setattr(audit, "_PROBE", "full")
audit.reset_for_tests()
try:
start = time.perf_counter()
@@ -131,6 +133,47 @@ class TestUnwritableTargetDoesntCrash:
audit.reset_for_tests()
class TestKillSwitchContract:
"""With ``_DISABLED = True`` and no env-var override, every
producer is a true no-op. Pins the safety contract: the default
configuration must never touch disk or start a thread."""
def test_disabled_writes_nothing(self, monkeypatch, tmp_path):
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
from src import audit
monkeypatch.setattr(audit, "_DISABLED", True)
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", False)
audit.reset_for_tests()
try:
audit.log_event("test", "should be a no-op")
audit.log_session_start()
audit.log_page_open("test_page")
audit.flush_audit_log(timeout_s=0.5)
assert audit._WRITER_THREAD is None, (
"Writer thread must not start when the kill switch is on."
)
assert list(tmp_path.glob("datatools-*.jsonl")) == [], (
"Kill switch leaked a log file."
)
finally:
audit.reset_for_tests()
def test_probe_no_events_drops_writes(self, monkeypatch, tmp_path):
"""``DATATOOLS_AUDIT_PROBE=no-events`` bypasses log_event even
when the override is on. Bisect aid for Phase 2."""
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
from src import audit
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
monkeypatch.setattr(audit, "_PROBE", "no-events")
audit.reset_for_tests()
try:
audit.log_event("test", "should be dropped")
audit.flush_audit_log(timeout_s=0.5)
assert list(tmp_path.glob("datatools-*.jsonl")) == []
finally:
audit.reset_for_tests()
class TestSerializationSafety:
def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
audit = isolated_audit