feat(audit): diagnostic instrumentation env vars + writer-thread guard

Phase 1 of the audit-log re-enablement plan. Adds three opt-in env
vars that let us ship one instrumented build for the user to run,
without flipping the kill switch on for everybody. **Default
behaviour is byte-identical to today**: with no env vars set the
kill switch wins, no writer thread starts, no file is written, no
stderr line is printed.

Env vars (do NOT set in prod):

- ``DATATOOLS_AUDIT_ENABLED=1`` — bypass ``_DISABLED`` for one
  session. ``_DISABLED = True`` stays in the source so an upgrade
  with no env var is still safe.
- ``DATATOOLS_AUDIT_TRACE=1`` — print ``[audit] ...`` lines to
  stderr at module import, every writer-thread state change, and
  every producer entry point. Lets the user share a small log
  instead of attaching a debugger.
- ``DATATOOLS_AUDIT_PROBE=<value>`` — bisect the producer path
  for Phase 2. Values: ``full`` (default), ``noop``, ``no-events``,
  ``no-page-open``, ``no-session-start``. The named variants
  return early from the corresponding ``log_*`` function so we can
  isolate which call is implicated in the blank-pages symptom.

Also:

- ``_writer_loop`` gets an outer ``try/except BaseException`` so
  silent thread death now surfaces a ``"writer thread died: ..."``
  line in the launcher terminal instead of looking like a hang.
- Existing first-write-failure stderr print gets ``flush=True`` so
  the user actually sees it before the process is killed.
- Test fixture switches from the previous-commit ``_DISABLED = False``
  override to ``_ENABLE_OVERRIDE = True`` so tests exercise the same
  bypass path the diagnostic build uses.
- Two new tests pin the safety contract: with the kill switch on
  and no override, every producer is a true no-op (no writer
  thread, no file). And ``DATATOOLS_AUDIT_PROBE=no-events`` bypasses
  ``log_event`` even when the override is on — guards the bisect.

Rollback: ``git revert HEAD`` removes Phase 1 cleanly. The deadlock
fix from the previous commit stays in place.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-19 14:46:27 +00:00
parent a8ff8f4bd0
commit 76c9f5a679
2 changed files with 155 additions and 41 deletions

View File

@@ -58,11 +58,33 @@ _SESSION_STARTED: bool = False
# RESTORED kill switch — the async-writer redesign still triggers the # RESTORED kill switch — the async-writer redesign still triggers the
# blank-pages symptom on the user's machine despite no synchronous # blank-pages symptom on the user's machine despite no synchronous
# file I/O on the request path. Cause is not yet identified; keep all # file I/O on the request path. Cause is not yet identified; keep all
# log_* calls as no-ops while we diagnose so the GUI keeps working. # log_* calls as no-ops by default while we diagnose.
# Diagnostic plan: ask the user for the launcher terminal output AND #
# whether anything appears in ``~/.datatools/logs/`` at all — that # Diagnostic overrides for an instrumented run (DO NOT set in prod):
# bisects "writer thread starts" from "writer thread can't write." # DATATOOLS_AUDIT_ENABLED=1 — bypass _DISABLED for one session
# DATATOOLS_AUDIT_TRACE=1 — print "[audit] ..." lines to stderr
# DATATOOLS_AUDIT_PROBE=... — bisect the producer path. Values:
# full (default) | noop | no-events | no-page-open | no-session-start
_DISABLED: bool = True _DISABLED: bool = True
_ENABLE_OVERRIDE: bool = os.environ.get("DATATOOLS_AUDIT_ENABLED") == "1"
_TRACE: bool = os.environ.get("DATATOOLS_AUDIT_TRACE") == "1"
_PROBE: str = os.environ.get("DATATOOLS_AUDIT_PROBE", "full").lower()
def _trace(_msg: str, **_kw: Any) -> None:
"""Emit a stderr trace line when DATATOOLS_AUDIT_TRACE=1.
No-op otherwise — the default configuration adds zero extra I/O."""
if not _TRACE:
return
try:
extras = " ".join(f"{k}={v!r}" for k, v in _kw.items())
line = f"[audit] {_msg}"
if extras:
line = f"{line} {extras}"
print(line, file=sys.stderr, flush=True)
except Exception:
pass
# Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry # Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry
# when full so the most-recent events are always the ones that # when full so the most-recent events are always the ones that
@@ -78,6 +100,13 @@ _WRITER_THREAD: threading.Thread | None = None
_SHUTDOWN_REQUESTED: bool = False _SHUTDOWN_REQUESTED: bool = False
_WRITE_FAILED_REPORTED: bool = False _WRITE_FAILED_REPORTED: bool = False
_trace(
"module-import",
disabled=_DISABLED,
enabled_override=_ENABLE_OVERRIDE,
probe=_PROBE,
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Path helpers (pure — no I/O) # Path helpers (pure — no I/O)
@@ -151,36 +180,63 @@ def _writer_loop() -> None:
swallowed — the audit log is best-effort. swallowed — the audit log is best-effort.
""" """
global _WRITE_FAILED_REPORTED global _WRITE_FAILED_REPORTED
while True: _trace("writer-loop:enter")
# Wait for something to do. try:
with _QUEUE_COND: while True:
while not _QUEUE and not _SHUTDOWN_REQUESTED: # Wait for something to do.
_QUEUE_COND.wait(timeout=5.0) with _QUEUE_COND:
if _SHUTDOWN_REQUESTED and not _QUEUE: while not _QUEUE and not _SHUTDOWN_REQUESTED:
return _QUEUE_COND.wait(timeout=5.0)
# Snapshot + clear under the lock; release before doing if _SHUTDOWN_REQUESTED and not _QUEUE:
# I/O so producers can keep enqueueing. _trace("writer-loop:exit-shutdown")
batch = list(_QUEUE) return
_QUEUE.clear() # Snapshot + clear under the lock; release before doing
# I/O so producers can keep enqueueing.
batch = list(_QUEUE)
_QUEUE.clear()
_trace("writer-loop:wake", batch=len(batch))
try:
path = audit_log_path()
_trace("writer-loop:mkdir", dir=str(path.parent))
path.parent.mkdir(parents=True, exist_ok=True)
_trace("writer-loop:open", path=str(path))
with path.open("a", encoding="utf-8") as f:
for ev in batch:
f.write(json.dumps(ev, ensure_ascii=False) + "\n")
_trace("writer-loop:wrote", count=len(batch))
except Exception as e:
_trace(
"writer-loop:write-failed",
exc=type(e).__name__,
msg=str(e),
)
if not _WRITE_FAILED_REPORTED:
_WRITE_FAILED_REPORTED = True
try:
print(
f"DataTools audit: writes failing — log degraded "
f"for the rest of this session. Error: "
f"{type(e).__name__}: {e}",
file=sys.stderr,
flush=True,
)
except Exception:
pass
except BaseException as e:
# Outer guard. If the writer thread dies for any reason
# (unexpected exception, sys.exit from a foreign module),
# surface it in the launcher terminal instead of going silent.
try: try:
path = audit_log_path() print(
path.parent.mkdir(parents=True, exist_ok=True) f"DataTools audit: writer thread died: "
with path.open("a", encoding="utf-8") as f: f"{type(e).__name__}: {e}",
for ev in batch: file=sys.stderr,
f.write(json.dumps(ev, ensure_ascii=False) + "\n") flush=True,
except Exception as e: )
if not _WRITE_FAILED_REPORTED: except Exception:
_WRITE_FAILED_REPORTED = True pass
try: raise
print(
f"DataTools audit: writes failing — log degraded "
f"for the rest of this session. Error: "
f"{type(e).__name__}: {e}",
file=sys.stderr,
)
except Exception:
pass
def _ensure_writer_started() -> None: def _ensure_writer_started() -> None:
@@ -194,6 +250,7 @@ def _ensure_writer_started() -> None:
return return
with _LOCK: with _LOCK:
if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive(): if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive():
_trace("ensure-writer:starting")
t = threading.Thread( t = threading.Thread(
target=_writer_loop, target=_writer_loop,
daemon=True, daemon=True,
@@ -201,6 +258,7 @@ def _ensure_writer_started() -> None:
) )
t.start() t.start()
_WRITER_THREAD = t _WRITER_THREAD = t
_trace("ensure-writer:started", ident=t.ident)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -219,8 +277,12 @@ def log_event(
Failures inside this function are swallowed; a broken audit log Failures inside this function are swallowed; a broken audit log
must never take the GUI down. must never take the GUI down.
""" """
if _DISABLED: if _DISABLED and not _ENABLE_OVERRIDE:
return return
if _PROBE in ("noop", "no-events"):
_trace("log_event:probe-skip", category=category)
return
_trace("log_event:enter", category=category)
try: try:
try: try:
ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds") ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds")
@@ -250,8 +312,12 @@ def log_event(
def log_session_start() -> None: def log_session_start() -> None:
"""Idempotent session-start banner with platform info.""" """Idempotent session-start banner with platform info."""
if _DISABLED: if _DISABLED and not _ENABLE_OVERRIDE:
return return
if _PROBE in ("noop", "no-session-start"):
_trace("log_session_start:probe-skip")
return
_trace("log_session_start:enter")
global _SESSION_STARTED global _SESSION_STARTED
with _LOCK: with _LOCK:
if _SESSION_STARTED: if _SESSION_STARTED:
@@ -278,8 +344,12 @@ def log_session_start() -> None:
def log_page_open(slug: str) -> None: def log_page_open(slug: str) -> None:
"""Emit a deduplicated 'page open' nav event.""" """Emit a deduplicated 'page open' nav event."""
if _DISABLED: if _DISABLED and not _ENABLE_OVERRIDE:
return return
if _PROBE in ("noop", "no-page-open"):
_trace("log_page_open:probe-skip", slug=slug)
return
_trace("log_page_open:enter", slug=slug)
try: try:
try: try:
import streamlit as st import streamlit as st
@@ -318,8 +388,9 @@ def flush_audit_log(timeout_s: float = 0.5) -> None:
stuck disk can never delay shutdown — events still in the queue stuck disk can never delay shutdown — events still in the queue
when the timer expires are dropped. when the timer expires are dropped.
""" """
if _DISABLED: if _DISABLED and not _ENABLE_OVERRIDE:
return return
_trace("flush:enter", timeout_s=timeout_s)
global _SHUTDOWN_REQUESTED global _SHUTDOWN_REQUESTED
deadline = time.monotonic() + max(0.0, timeout_s) deadline = time.monotonic() + max(0.0, timeout_s)
with _QUEUE_COND: with _QUEUE_COND:

View File

@@ -28,12 +28,13 @@ def isolated_audit(monkeypatch, tmp_path):
"""Redirect audit writes into ``tmp_path`` and reset module state """Redirect audit writes into ``tmp_path`` and reset module state
so each test starts fresh. so each test starts fresh.
The kill switch is bypassed for the duration of the test by The kill switch (``_DISABLED``) is bypassed for the duration of
patching the module-level constant directly — these tests need the test by patching ``_ENABLE_OVERRIDE`` directly — env vars
the real producer path to run.""" won't reach the module-level constant after import."""
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path)) monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
from src import audit from src import audit
monkeypatch.setattr(audit, "_DISABLED", False) monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
monkeypatch.setattr(audit, "_PROBE", "full")
audit.reset_for_tests() audit.reset_for_tests()
yield audit yield audit
# Best-effort cleanup so a runaway writer thread doesn't keep # Best-effort cleanup so a runaway writer thread doesn't keep
@@ -120,7 +121,8 @@ class TestUnwritableTargetDoesntCrash:
not_a_dir.write_text("hi") not_a_dir.write_text("hi")
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir)) monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir))
from src import audit from src import audit
monkeypatch.setattr(audit, "_DISABLED", False) monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
monkeypatch.setattr(audit, "_PROBE", "full")
audit.reset_for_tests() audit.reset_for_tests()
try: try:
start = time.perf_counter() start = time.perf_counter()
@@ -131,6 +133,47 @@ class TestUnwritableTargetDoesntCrash:
audit.reset_for_tests() audit.reset_for_tests()
class TestKillSwitchContract:
"""With ``_DISABLED = True`` and no env-var override, every
producer is a true no-op. Pins the safety contract: the default
configuration must never touch disk or start a thread."""
def test_disabled_writes_nothing(self, monkeypatch, tmp_path):
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
from src import audit
monkeypatch.setattr(audit, "_DISABLED", True)
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", False)
audit.reset_for_tests()
try:
audit.log_event("test", "should be a no-op")
audit.log_session_start()
audit.log_page_open("test_page")
audit.flush_audit_log(timeout_s=0.5)
assert audit._WRITER_THREAD is None, (
"Writer thread must not start when the kill switch is on."
)
assert list(tmp_path.glob("datatools-*.jsonl")) == [], (
"Kill switch leaked a log file."
)
finally:
audit.reset_for_tests()
def test_probe_no_events_drops_writes(self, monkeypatch, tmp_path):
"""``DATATOOLS_AUDIT_PROBE=no-events`` bypasses log_event even
when the override is on. Bisect aid for Phase 2."""
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
from src import audit
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
monkeypatch.setattr(audit, "_PROBE", "no-events")
audit.reset_for_tests()
try:
audit.log_event("test", "should be dropped")
audit.flush_audit_log(timeout_s=0.5)
assert list(tmp_path.glob("datatools-*.jsonl")) == []
finally:
audit.reset_for_tests()
class TestSerializationSafety: class TestSerializationSafety:
def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path): def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
audit = isolated_audit audit = isolated_audit