feat(audit): diagnostic instrumentation env vars + writer-thread guard
Phase 1 of the audit-log re-enablement plan. Adds three opt-in env vars that let us ship one instrumented build for the user to run, without flipping the kill switch on for everybody. **Default behaviour is byte-identical to today**: with no env vars set the kill switch wins, no writer thread starts, no file is written, no stderr line is printed. Env vars (do NOT set in prod): - ``DATATOOLS_AUDIT_ENABLED=1`` — bypass ``_DISABLED`` for one session. ``_DISABLED = True`` stays in the source so an upgrade with no env var is still safe. - ``DATATOOLS_AUDIT_TRACE=1`` — print ``[audit] ...`` lines to stderr at module import, every writer-thread state change, and every producer entry point. Lets the user share a small log instead of attaching a debugger. - ``DATATOOLS_AUDIT_PROBE=<value>`` — bisect the producer path for Phase 2. Values: ``full`` (default), ``noop``, ``no-events``, ``no-page-open``, ``no-session-start``. The named variants return early from the corresponding ``log_*`` function so we can isolate which call is implicated in the blank-pages symptom. Also: - ``_writer_loop`` gets an outer ``try/except BaseException`` so silent thread death now surfaces a ``"writer thread died: ..."`` line in the launcher terminal instead of looking like a hang. - Existing first-write-failure stderr print gets ``flush=True`` so the user actually sees it before the process is killed. - Test fixture switches from the previous-commit ``_DISABLED = False`` override to ``_ENABLE_OVERRIDE = True`` so tests exercise the same bypass path the diagnostic build uses. - Two new tests pin the safety contract: with the kill switch on and no override, every producer is a true no-op (no writer thread, no file). And ``DATATOOLS_AUDIT_PROBE=no-events`` bypasses ``log_event`` even when the override is on — guards the bisect. Rollback: ``git revert HEAD`` removes Phase 1 cleanly. The deadlock fix from the previous commit stays in place. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
87
src/audit.py
87
src/audit.py
@@ -58,11 +58,33 @@ _SESSION_STARTED: bool = False
|
|||||||
# RESTORED kill switch — the async-writer redesign still triggers the
|
# RESTORED kill switch — the async-writer redesign still triggers the
|
||||||
# blank-pages symptom on the user's machine despite no synchronous
|
# blank-pages symptom on the user's machine despite no synchronous
|
||||||
# file I/O on the request path. Cause is not yet identified; keep all
|
# file I/O on the request path. Cause is not yet identified; keep all
|
||||||
# log_* calls as no-ops while we diagnose so the GUI keeps working.
|
# log_* calls as no-ops by default while we diagnose.
|
||||||
# Diagnostic plan: ask the user for the launcher terminal output AND
|
#
|
||||||
# whether anything appears in ``~/.datatools/logs/`` at all — that
|
# Diagnostic overrides for an instrumented run (DO NOT set in prod):
|
||||||
# bisects "writer thread starts" from "writer thread can't write."
|
# DATATOOLS_AUDIT_ENABLED=1 — bypass _DISABLED for one session
|
||||||
|
# DATATOOLS_AUDIT_TRACE=1 — print "[audit] ..." lines to stderr
|
||||||
|
# DATATOOLS_AUDIT_PROBE=... — bisect the producer path. Values:
|
||||||
|
# full (default) | noop | no-events | no-page-open | no-session-start
|
||||||
_DISABLED: bool = True
|
_DISABLED: bool = True
|
||||||
|
_ENABLE_OVERRIDE: bool = os.environ.get("DATATOOLS_AUDIT_ENABLED") == "1"
|
||||||
|
_TRACE: bool = os.environ.get("DATATOOLS_AUDIT_TRACE") == "1"
|
||||||
|
_PROBE: str = os.environ.get("DATATOOLS_AUDIT_PROBE", "full").lower()
|
||||||
|
|
||||||
|
|
||||||
|
def _trace(_msg: str, **_kw: Any) -> None:
|
||||||
|
"""Emit a stderr trace line when DATATOOLS_AUDIT_TRACE=1.
|
||||||
|
|
||||||
|
No-op otherwise — the default configuration adds zero extra I/O."""
|
||||||
|
if not _TRACE:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
extras = " ".join(f"{k}={v!r}" for k, v in _kw.items())
|
||||||
|
line = f"[audit] {_msg}"
|
||||||
|
if extras:
|
||||||
|
line = f"{line} {extras}"
|
||||||
|
print(line, file=sys.stderr, flush=True)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
# Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry
|
# Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry
|
||||||
# when full so the most-recent events are always the ones that
|
# when full so the most-recent events are always the ones that
|
||||||
@@ -78,6 +100,13 @@ _WRITER_THREAD: threading.Thread | None = None
|
|||||||
_SHUTDOWN_REQUESTED: bool = False
|
_SHUTDOWN_REQUESTED: bool = False
|
||||||
_WRITE_FAILED_REPORTED: bool = False
|
_WRITE_FAILED_REPORTED: bool = False
|
||||||
|
|
||||||
|
_trace(
|
||||||
|
"module-import",
|
||||||
|
disabled=_DISABLED,
|
||||||
|
enabled_override=_ENABLE_OVERRIDE,
|
||||||
|
probe=_PROBE,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Path helpers (pure — no I/O)
|
# Path helpers (pure — no I/O)
|
||||||
@@ -151,25 +180,37 @@ def _writer_loop() -> None:
|
|||||||
swallowed — the audit log is best-effort.
|
swallowed — the audit log is best-effort.
|
||||||
"""
|
"""
|
||||||
global _WRITE_FAILED_REPORTED
|
global _WRITE_FAILED_REPORTED
|
||||||
|
_trace("writer-loop:enter")
|
||||||
|
try:
|
||||||
while True:
|
while True:
|
||||||
# Wait for something to do.
|
# Wait for something to do.
|
||||||
with _QUEUE_COND:
|
with _QUEUE_COND:
|
||||||
while not _QUEUE and not _SHUTDOWN_REQUESTED:
|
while not _QUEUE and not _SHUTDOWN_REQUESTED:
|
||||||
_QUEUE_COND.wait(timeout=5.0)
|
_QUEUE_COND.wait(timeout=5.0)
|
||||||
if _SHUTDOWN_REQUESTED and not _QUEUE:
|
if _SHUTDOWN_REQUESTED and not _QUEUE:
|
||||||
|
_trace("writer-loop:exit-shutdown")
|
||||||
return
|
return
|
||||||
# Snapshot + clear under the lock; release before doing
|
# Snapshot + clear under the lock; release before doing
|
||||||
# I/O so producers can keep enqueueing.
|
# I/O so producers can keep enqueueing.
|
||||||
batch = list(_QUEUE)
|
batch = list(_QUEUE)
|
||||||
_QUEUE.clear()
|
_QUEUE.clear()
|
||||||
|
_trace("writer-loop:wake", batch=len(batch))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
path = audit_log_path()
|
path = audit_log_path()
|
||||||
|
_trace("writer-loop:mkdir", dir=str(path.parent))
|
||||||
path.parent.mkdir(parents=True, exist_ok=True)
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
_trace("writer-loop:open", path=str(path))
|
||||||
with path.open("a", encoding="utf-8") as f:
|
with path.open("a", encoding="utf-8") as f:
|
||||||
for ev in batch:
|
for ev in batch:
|
||||||
f.write(json.dumps(ev, ensure_ascii=False) + "\n")
|
f.write(json.dumps(ev, ensure_ascii=False) + "\n")
|
||||||
|
_trace("writer-loop:wrote", count=len(batch))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
_trace(
|
||||||
|
"writer-loop:write-failed",
|
||||||
|
exc=type(e).__name__,
|
||||||
|
msg=str(e),
|
||||||
|
)
|
||||||
if not _WRITE_FAILED_REPORTED:
|
if not _WRITE_FAILED_REPORTED:
|
||||||
_WRITE_FAILED_REPORTED = True
|
_WRITE_FAILED_REPORTED = True
|
||||||
try:
|
try:
|
||||||
@@ -178,9 +219,24 @@ def _writer_loop() -> None:
|
|||||||
f"for the rest of this session. Error: "
|
f"for the rest of this session. Error: "
|
||||||
f"{type(e).__name__}: {e}",
|
f"{type(e).__name__}: {e}",
|
||||||
file=sys.stderr,
|
file=sys.stderr,
|
||||||
|
flush=True,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
except BaseException as e:
|
||||||
|
# Outer guard. If the writer thread dies for any reason
|
||||||
|
# (unexpected exception, sys.exit from a foreign module),
|
||||||
|
# surface it in the launcher terminal instead of going silent.
|
||||||
|
try:
|
||||||
|
print(
|
||||||
|
f"DataTools audit: writer thread died: "
|
||||||
|
f"{type(e).__name__}: {e}",
|
||||||
|
file=sys.stderr,
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
def _ensure_writer_started() -> None:
|
def _ensure_writer_started() -> None:
|
||||||
@@ -194,6 +250,7 @@ def _ensure_writer_started() -> None:
|
|||||||
return
|
return
|
||||||
with _LOCK:
|
with _LOCK:
|
||||||
if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive():
|
if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive():
|
||||||
|
_trace("ensure-writer:starting")
|
||||||
t = threading.Thread(
|
t = threading.Thread(
|
||||||
target=_writer_loop,
|
target=_writer_loop,
|
||||||
daemon=True,
|
daemon=True,
|
||||||
@@ -201,6 +258,7 @@ def _ensure_writer_started() -> None:
|
|||||||
)
|
)
|
||||||
t.start()
|
t.start()
|
||||||
_WRITER_THREAD = t
|
_WRITER_THREAD = t
|
||||||
|
_trace("ensure-writer:started", ident=t.ident)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -219,8 +277,12 @@ def log_event(
|
|||||||
Failures inside this function are swallowed; a broken audit log
|
Failures inside this function are swallowed; a broken audit log
|
||||||
must never take the GUI down.
|
must never take the GUI down.
|
||||||
"""
|
"""
|
||||||
if _DISABLED:
|
if _DISABLED and not _ENABLE_OVERRIDE:
|
||||||
return
|
return
|
||||||
|
if _PROBE in ("noop", "no-events"):
|
||||||
|
_trace("log_event:probe-skip", category=category)
|
||||||
|
return
|
||||||
|
_trace("log_event:enter", category=category)
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds")
|
ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds")
|
||||||
@@ -250,8 +312,12 @@ def log_event(
|
|||||||
|
|
||||||
def log_session_start() -> None:
|
def log_session_start() -> None:
|
||||||
"""Idempotent session-start banner with platform info."""
|
"""Idempotent session-start banner with platform info."""
|
||||||
if _DISABLED:
|
if _DISABLED and not _ENABLE_OVERRIDE:
|
||||||
return
|
return
|
||||||
|
if _PROBE in ("noop", "no-session-start"):
|
||||||
|
_trace("log_session_start:probe-skip")
|
||||||
|
return
|
||||||
|
_trace("log_session_start:enter")
|
||||||
global _SESSION_STARTED
|
global _SESSION_STARTED
|
||||||
with _LOCK:
|
with _LOCK:
|
||||||
if _SESSION_STARTED:
|
if _SESSION_STARTED:
|
||||||
@@ -278,8 +344,12 @@ def log_session_start() -> None:
|
|||||||
|
|
||||||
def log_page_open(slug: str) -> None:
|
def log_page_open(slug: str) -> None:
|
||||||
"""Emit a deduplicated 'page open' nav event."""
|
"""Emit a deduplicated 'page open' nav event."""
|
||||||
if _DISABLED:
|
if _DISABLED and not _ENABLE_OVERRIDE:
|
||||||
return
|
return
|
||||||
|
if _PROBE in ("noop", "no-page-open"):
|
||||||
|
_trace("log_page_open:probe-skip", slug=slug)
|
||||||
|
return
|
||||||
|
_trace("log_page_open:enter", slug=slug)
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
import streamlit as st
|
import streamlit as st
|
||||||
@@ -318,8 +388,9 @@ def flush_audit_log(timeout_s: float = 0.5) -> None:
|
|||||||
stuck disk can never delay shutdown — events still in the queue
|
stuck disk can never delay shutdown — events still in the queue
|
||||||
when the timer expires are dropped.
|
when the timer expires are dropped.
|
||||||
"""
|
"""
|
||||||
if _DISABLED:
|
if _DISABLED and not _ENABLE_OVERRIDE:
|
||||||
return
|
return
|
||||||
|
_trace("flush:enter", timeout_s=timeout_s)
|
||||||
global _SHUTDOWN_REQUESTED
|
global _SHUTDOWN_REQUESTED
|
||||||
deadline = time.monotonic() + max(0.0, timeout_s)
|
deadline = time.monotonic() + max(0.0, timeout_s)
|
||||||
with _QUEUE_COND:
|
with _QUEUE_COND:
|
||||||
|
|||||||
@@ -28,12 +28,13 @@ def isolated_audit(monkeypatch, tmp_path):
|
|||||||
"""Redirect audit writes into ``tmp_path`` and reset module state
|
"""Redirect audit writes into ``tmp_path`` and reset module state
|
||||||
so each test starts fresh.
|
so each test starts fresh.
|
||||||
|
|
||||||
The kill switch is bypassed for the duration of the test by
|
The kill switch (``_DISABLED``) is bypassed for the duration of
|
||||||
patching the module-level constant directly — these tests need
|
the test by patching ``_ENABLE_OVERRIDE`` directly — env vars
|
||||||
the real producer path to run."""
|
won't reach the module-level constant after import."""
|
||||||
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
|
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
|
||||||
from src import audit
|
from src import audit
|
||||||
monkeypatch.setattr(audit, "_DISABLED", False)
|
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
|
||||||
|
monkeypatch.setattr(audit, "_PROBE", "full")
|
||||||
audit.reset_for_tests()
|
audit.reset_for_tests()
|
||||||
yield audit
|
yield audit
|
||||||
# Best-effort cleanup so a runaway writer thread doesn't keep
|
# Best-effort cleanup so a runaway writer thread doesn't keep
|
||||||
@@ -120,7 +121,8 @@ class TestUnwritableTargetDoesntCrash:
|
|||||||
not_a_dir.write_text("hi")
|
not_a_dir.write_text("hi")
|
||||||
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir))
|
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir))
|
||||||
from src import audit
|
from src import audit
|
||||||
monkeypatch.setattr(audit, "_DISABLED", False)
|
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
|
||||||
|
monkeypatch.setattr(audit, "_PROBE", "full")
|
||||||
audit.reset_for_tests()
|
audit.reset_for_tests()
|
||||||
try:
|
try:
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
@@ -131,6 +133,47 @@ class TestUnwritableTargetDoesntCrash:
|
|||||||
audit.reset_for_tests()
|
audit.reset_for_tests()
|
||||||
|
|
||||||
|
|
||||||
|
class TestKillSwitchContract:
|
||||||
|
"""With ``_DISABLED = True`` and no env-var override, every
|
||||||
|
producer is a true no-op. Pins the safety contract: the default
|
||||||
|
configuration must never touch disk or start a thread."""
|
||||||
|
|
||||||
|
def test_disabled_writes_nothing(self, monkeypatch, tmp_path):
|
||||||
|
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
|
||||||
|
from src import audit
|
||||||
|
monkeypatch.setattr(audit, "_DISABLED", True)
|
||||||
|
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", False)
|
||||||
|
audit.reset_for_tests()
|
||||||
|
try:
|
||||||
|
audit.log_event("test", "should be a no-op")
|
||||||
|
audit.log_session_start()
|
||||||
|
audit.log_page_open("test_page")
|
||||||
|
audit.flush_audit_log(timeout_s=0.5)
|
||||||
|
assert audit._WRITER_THREAD is None, (
|
||||||
|
"Writer thread must not start when the kill switch is on."
|
||||||
|
)
|
||||||
|
assert list(tmp_path.glob("datatools-*.jsonl")) == [], (
|
||||||
|
"Kill switch leaked a log file."
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
audit.reset_for_tests()
|
||||||
|
|
||||||
|
def test_probe_no_events_drops_writes(self, monkeypatch, tmp_path):
|
||||||
|
"""``DATATOOLS_AUDIT_PROBE=no-events`` bypasses log_event even
|
||||||
|
when the override is on. Bisect aid for Phase 2."""
|
||||||
|
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
|
||||||
|
from src import audit
|
||||||
|
monkeypatch.setattr(audit, "_ENABLE_OVERRIDE", True)
|
||||||
|
monkeypatch.setattr(audit, "_PROBE", "no-events")
|
||||||
|
audit.reset_for_tests()
|
||||||
|
try:
|
||||||
|
audit.log_event("test", "should be dropped")
|
||||||
|
audit.flush_audit_log(timeout_s=0.5)
|
||||||
|
assert list(tmp_path.glob("datatools-*.jsonl")) == []
|
||||||
|
finally:
|
||||||
|
audit.reset_for_tests()
|
||||||
|
|
||||||
|
|
||||||
class TestSerializationSafety:
|
class TestSerializationSafety:
|
||||||
def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
|
def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
|
||||||
audit = isolated_audit
|
audit = isolated_audit
|
||||||
|
|||||||
Reference in New Issue
Block a user