feat(audit): async writer thread — safe to re-enable
Reported earlier: synchronous file writes in ``log_event`` blocked the GUI render thread on hostile filesystems (Windows antivirus on ``~/.datatools/logs/`` is the prime suspect). A blocking ``open`` call doesn't raise — try/except can't recover from it — so the only safe re-enable is to take file I/O off the render path. Refactor: - ``log_event`` and friends push events onto a ``deque(maxlen=5000)`` via ``put_nowait`` and return in microseconds. - A single daemon thread (``datatools-audit-writer``) drains the queue and writes batches. Holds the queue lock only long enough to snapshot + clear, then does I/O outside the lock so producers can keep enqueueing. - ``audit_log_path()`` is now pure path arithmetic — no ``mkdir`` no ``open``. The writer thread does the directory creation off the request path, so any hang there only affects the writer. - Bounded queue means an unwritable disk doesn't unbounded-grow memory; the queue caps at 5000 and overflow drops OLDEST events so the most-recent (most-diagnostic) ones survive. - First write failure prints once to stderr; subsequent failures are silent so logs don't drown the launcher terminal. - ``flush_audit_log(timeout_s=0.5)`` drains the queue and signals the writer to exit; bounded so a stuck disk can't delay shutdown. Other changes in this commit: - ``shutdown_app`` now emits a "Session ending" event and calls ``flush_audit_log`` before kicking the os._exit timer, so the closing session's events make it to disk. - The Diagnostics sidebar in ``hide_streamlit_chrome`` is re-enabled (the ``if False:`` gate is removed). Wrapped in try/except defensively — render errors print to stderr, never blank the page. - ``_DISABLED`` kill-switch is gone. The async design IS the safety mechanism now. Tests in ``tests/test_audit.py``: - log_event burst of 1000 events completes in well under 1s (proves non-blocking). - Events queued before flush land on disk with the expected JSON shape; session_start renders; idempotent. - Pointing the audit dir at a file (so mkdir fails) doesn't hang or crash the producer. - Non-JSON extras are str()-coerced rather than dropped. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
324
src/audit.py
324
src/audit.py
@@ -1,38 +1,35 @@
|
|||||||
"""Audit log — records GUI actions for support diagnostics.
|
"""Audit log — async, never-blocks-the-GUI file writer.
|
||||||
|
|
||||||
A client running DataTools who hits a bug should be able to grab one
|
A client running DataTools who hits a bug should be able to grab one
|
||||||
file off disk, mail it to support, and have us reconstruct what they
|
file off disk, mail it to support, and have us reconstruct what they
|
||||||
were doing when things broke. That file is the audit log written by
|
were doing when things broke. That file is the audit log written by
|
||||||
this module.
|
this module.
|
||||||
|
|
||||||
Design choices:
|
Hard contract: ``log_event`` and friends are **non-blocking**. They
|
||||||
|
push events onto an in-memory queue in microseconds and return. A
|
||||||
|
single background daemon thread drains the queue and writes batches
|
||||||
|
to disk. If the disk is hostile (Windows antivirus locks, slow
|
||||||
|
network drive, permission failures), only the writer thread waits —
|
||||||
|
the GUI render thread keeps moving. This is the contract that lets
|
||||||
|
us re-enable the audit log after the earlier blank-pages incident:
|
||||||
|
a stuck ``open(...)`` call can't hang the page anymore because the
|
||||||
|
``open`` happens off the request path.
|
||||||
|
|
||||||
- **JSONL**, one event per line. Each line is a valid JSON object; the
|
JSONL on-disk format (one event per line):
|
||||||
whole file is grep-friendly, ``jq``-friendly, and still readable in
|
|
||||||
Notepad / TextEdit if no tooling is available. Each event carries a
|
{"ts": "...", "level": "info", "category": "upload",
|
||||||
human-readable ``message`` field so the file is useful even without
|
"session": "a1b2c3d4", "message": "Uploaded customers.csv",
|
||||||
any tooling.
|
"filename": "customers.csv", "bytes": 24813}
|
||||||
- **One file per session**, named ``datatools-<utc-timestamp>-<id>.jsonl``.
|
|
||||||
Multiple sessions on the same machine don't clobber each other, and
|
|
||||||
the filename sorts chronologically.
|
|
||||||
- **Default location**: ``~/.datatools/logs/`` on every platform.
|
|
||||||
Overrideable via the ``DATATOOLS_AUDIT_DIR`` environment variable —
|
|
||||||
used by tests to redirect writes into a tmp dir.
|
|
||||||
- **Never crashes the app**. Every write is wrapped in a try/except;
|
|
||||||
a broken audit log must not take down the GUI.
|
|
||||||
- **No PII bytes**: file CONTENTS are never logged. We log the
|
|
||||||
filename, byte size, and a short content hash so the same file
|
|
||||||
re-uploaded gets the same fingerprint, but the actual bytes stay
|
|
||||||
local.
|
|
||||||
|
|
||||||
Public API:
|
Public API:
|
||||||
|
|
||||||
- ``log_event(category, message, **extra)`` — write one event.
|
- ``log_event(category, message, **extra)`` — enqueue one event.
|
||||||
- ``log_session_start()`` — emit a session-start record with platform
|
- ``log_session_start()`` — idempotent session banner.
|
||||||
info. Idempotent within a single session.
|
- ``log_page_open(slug)`` — nav event, deduplicated per session.
|
||||||
- ``audit_log_path()`` — return the path to the current session's file
|
- ``log_exception(where, exc, **extra)`` — convenience wrapper.
|
||||||
so the GUI can show it to the user.
|
- ``audit_log_path()`` — pure path computation, no disk touch.
|
||||||
- ``audit_log_dir()`` — return the directory holding all session logs.
|
- ``audit_log_dir()`` — directory holding all session files.
|
||||||
|
- ``flush_audit_log(timeout_s=0.5)`` — drain queue (shutdown hook).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
@@ -43,45 +40,63 @@ import os
|
|||||||
import platform
|
import platform
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
from collections import deque
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
# Module-level cache for per-session state. Streamlit reruns the script
|
# Module-level state. All access either lock-free (immutable after
|
||||||
# many times per session but the module is imported once, so these
|
# init) or guarded by ``_LOCK`` / ``_QUEUE_COND``.
|
||||||
# survive across reruns within the same Python process.
|
|
||||||
_LOCK = threading.Lock()
|
_LOCK = threading.Lock()
|
||||||
_LOG_PATH: Path | None = None
|
_LOG_PATH: Path | None = None
|
||||||
_SESSION_ID: str | None = None
|
_SESSION_ID: str | None = None
|
||||||
_SESSION_STARTED: bool = False
|
_SESSION_STARTED: bool = False
|
||||||
|
|
||||||
# Kill switch — when True, every log_* function is a no-op. Set to
|
# Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry
|
||||||
# True while bisecting a "blank pages" report where ``open()`` inside
|
# when full so the most-recent events are always the ones that
|
||||||
# the log writer was suspected of blocking on the user's filesystem
|
# survive on disk — diagnostic-most-valuable at the moment of a
|
||||||
# (Windows + antivirus + ``~/.datatools/logs/``). A blocking ``open``
|
# crash. The number is generous; typical sessions produce well under
|
||||||
# call doesn't raise so try/except can't recover it; the only safe
|
# a hundred events.
|
||||||
# bisect is "don't touch the disk at all." Toggle back to False once
|
_MAX_QUEUED = 5000
|
||||||
# the user confirms pages render.
|
_QUEUE: deque = deque(maxlen=_MAX_QUEUED)
|
||||||
_DISABLED: bool = True
|
_QUEUE_COND = threading.Condition()
|
||||||
|
|
||||||
|
# Writer-thread lifecycle.
|
||||||
|
_WRITER_THREAD: threading.Thread | None = None
|
||||||
|
_SHUTDOWN_REQUESTED: bool = False
|
||||||
|
_WRITE_FAILED_REPORTED: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Path helpers (pure — no I/O)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def audit_log_dir() -> Path:
|
def audit_log_dir() -> Path:
|
||||||
"""Return the directory where audit logs are written.
|
"""Return the directory where audit logs are written.
|
||||||
|
|
||||||
Defaults to ``~/.datatools/logs/``. Overrideable via the
|
Defaults to ``~/.datatools/logs/``. Overrideable via the
|
||||||
``DATATOOLS_AUDIT_DIR`` environment variable so tests can redirect
|
``DATATOOLS_AUDIT_DIR`` env var (used by tests to point at a
|
||||||
writes into ``tmp_path``.
|
tmp dir). No filesystem I/O happens here — caller is responsible
|
||||||
|
for ``mkdir`` if they want the dir to exist.
|
||||||
"""
|
"""
|
||||||
override = os.environ.get("DATATOOLS_AUDIT_DIR")
|
override = os.environ.get("DATATOOLS_AUDIT_DIR")
|
||||||
if override:
|
if override:
|
||||||
return Path(override)
|
return Path(override)
|
||||||
|
try:
|
||||||
return Path.home() / ".datatools" / "logs"
|
return Path.home() / ".datatools" / "logs"
|
||||||
|
except Exception:
|
||||||
|
# In some sandboxed environments Path.home() can raise; fall
|
||||||
|
# back to a temp dir without touching the disk.
|
||||||
|
import tempfile
|
||||||
|
return Path(tempfile.gettempdir()) / "datatools-logs"
|
||||||
|
|
||||||
|
|
||||||
def _session_id() -> str:
|
def _session_id() -> str:
|
||||||
global _SESSION_ID
|
global _SESSION_ID
|
||||||
|
if _SESSION_ID is None:
|
||||||
with _LOCK:
|
with _LOCK:
|
||||||
if _SESSION_ID is None:
|
if _SESSION_ID is None:
|
||||||
_SESSION_ID = uuid.uuid4().hex
|
_SESSION_ID = uuid.uuid4().hex
|
||||||
@@ -89,55 +104,95 @@ def _session_id() -> str:
|
|||||||
|
|
||||||
|
|
||||||
def audit_log_path() -> Path:
|
def audit_log_path() -> Path:
|
||||||
"""Return this session's log file path.
|
"""Return this session's log file path. **No I/O performed.**
|
||||||
|
|
||||||
The path is created the first time it's queried so each Python
|
Caller does not need to worry about hangs — this is pure path
|
||||||
process gets a single file regardless of how many Streamlit
|
arithmetic. The writer thread does the actual ``mkdir`` /
|
||||||
reruns happen.
|
``open`` work off the request path.
|
||||||
|
|
||||||
Two-level fallback so this NEVER raises: first try the configured
|
|
||||||
audit dir, then a tempdir, then ``/dev/null``-equivalent (just a
|
|
||||||
nonexistent path) as a last resort. Callers downstream skip
|
|
||||||
writing when the file isn't writable.
|
|
||||||
"""
|
"""
|
||||||
global _LOG_PATH
|
global _LOG_PATH
|
||||||
|
if _LOG_PATH is None:
|
||||||
with _LOCK:
|
with _LOCK:
|
||||||
if _LOG_PATH is None:
|
if _LOG_PATH is None:
|
||||||
try:
|
try:
|
||||||
ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||||||
|
except Exception:
|
||||||
|
ts = "unknown"
|
||||||
sid = _session_id()[:8]
|
sid = _session_id()[:8]
|
||||||
except Exception:
|
_LOG_PATH = audit_log_dir() / f"datatools-{ts}-{sid}.jsonl"
|
||||||
ts, sid = "unknown", "unknown"
|
|
||||||
name = f"datatools-{ts}-{sid}.jsonl"
|
|
||||||
|
|
||||||
# First choice: configured audit dir.
|
|
||||||
d = None
|
|
||||||
try:
|
|
||||||
d = audit_log_dir()
|
|
||||||
d.mkdir(parents=True, exist_ok=True)
|
|
||||||
except Exception:
|
|
||||||
d = None
|
|
||||||
|
|
||||||
# Fallback: tempdir.
|
|
||||||
if d is None:
|
|
||||||
try:
|
|
||||||
import tempfile
|
|
||||||
d = Path(tempfile.gettempdir()) / "datatools-logs"
|
|
||||||
d.mkdir(parents=True, exist_ok=True)
|
|
||||||
except Exception:
|
|
||||||
d = None
|
|
||||||
|
|
||||||
# Last resort: point at a path we won't try to write to —
|
|
||||||
# ``log_event``'s own try/except handles the eventual
|
|
||||||
# write failure cleanly.
|
|
||||||
if d is None:
|
|
||||||
d = Path("/dev/null") if os.name != "nt" else Path("NUL")
|
|
||||||
_LOG_PATH = d
|
|
||||||
else:
|
|
||||||
_LOG_PATH = d / name
|
|
||||||
return _LOG_PATH
|
return _LOG_PATH
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Writer thread
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _writer_loop() -> None:
|
||||||
|
"""Drain ``_QUEUE`` to disk forever.
|
||||||
|
|
||||||
|
Runs in a daemon thread. Wakes on new events, batches everything
|
||||||
|
currently in the queue into a single ``open``/``write``/``close``
|
||||||
|
cycle (cheaper than per-event opens), then goes back to waiting.
|
||||||
|
File-system errors are reported once per session to stderr and
|
||||||
|
swallowed — the audit log is best-effort.
|
||||||
|
"""
|
||||||
|
global _WRITE_FAILED_REPORTED
|
||||||
|
while True:
|
||||||
|
# Wait for something to do.
|
||||||
|
with _QUEUE_COND:
|
||||||
|
while not _QUEUE and not _SHUTDOWN_REQUESTED:
|
||||||
|
_QUEUE_COND.wait(timeout=5.0)
|
||||||
|
if _SHUTDOWN_REQUESTED and not _QUEUE:
|
||||||
|
return
|
||||||
|
# Snapshot + clear under the lock; release before doing
|
||||||
|
# I/O so producers can keep enqueueing.
|
||||||
|
batch = list(_QUEUE)
|
||||||
|
_QUEUE.clear()
|
||||||
|
|
||||||
|
try:
|
||||||
|
path = audit_log_path()
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with path.open("a", encoding="utf-8") as f:
|
||||||
|
for ev in batch:
|
||||||
|
f.write(json.dumps(ev, ensure_ascii=False) + "\n")
|
||||||
|
except Exception as e:
|
||||||
|
if not _WRITE_FAILED_REPORTED:
|
||||||
|
_WRITE_FAILED_REPORTED = True
|
||||||
|
try:
|
||||||
|
print(
|
||||||
|
f"DataTools audit: writes failing — log degraded "
|
||||||
|
f"for the rest of this session. Error: "
|
||||||
|
f"{type(e).__name__}: {e}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_writer_started() -> None:
|
||||||
|
"""Lazy-start the writer thread on first event.
|
||||||
|
|
||||||
|
Idempotent. Cheap enough to call on every event (a sentinel
|
||||||
|
check + an occasional lock acquisition).
|
||||||
|
"""
|
||||||
|
global _WRITER_THREAD
|
||||||
|
if _WRITER_THREAD is not None and _WRITER_THREAD.is_alive():
|
||||||
|
return
|
||||||
|
with _LOCK:
|
||||||
|
if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive():
|
||||||
|
t = threading.Thread(
|
||||||
|
target=_writer_loop,
|
||||||
|
daemon=True,
|
||||||
|
name="datatools-audit-writer",
|
||||||
|
)
|
||||||
|
t.start()
|
||||||
|
_WRITER_THREAD = t
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Public producer API
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def log_event(
|
def log_event(
|
||||||
category: str,
|
category: str,
|
||||||
message: str,
|
message: str,
|
||||||
@@ -145,52 +200,45 @@ def log_event(
|
|||||||
level: str = "info",
|
level: str = "info",
|
||||||
**extra: Any,
|
**extra: Any,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Append one event to the session log.
|
"""Enqueue one event. Non-blocking; returns in microseconds.
|
||||||
|
|
||||||
``category`` groups related events (e.g. ``upload``, ``analyze``,
|
Failures inside this function are swallowed; a broken audit log
|
||||||
``tool_run``, ``error``, ``nav``). ``message`` is the human
|
must never take the GUI down.
|
||||||
sentence that lands in the file. ``extra`` keys are passed through
|
|
||||||
to the JSON object verbatim, so callers can attach structured
|
|
||||||
context (filename, byte counts, finding counts, timings).
|
|
||||||
|
|
||||||
Failures are swallowed silently — a broken audit log must not
|
|
||||||
take the GUI down.
|
|
||||||
"""
|
"""
|
||||||
if _DISABLED:
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
event = {
|
try:
|
||||||
"ts": datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds"),
|
ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds")
|
||||||
|
except Exception:
|
||||||
|
ts = ""
|
||||||
|
event: dict[str, Any] = {
|
||||||
|
"ts": ts,
|
||||||
"level": level,
|
"level": level,
|
||||||
"category": category,
|
"category": category,
|
||||||
"session": _session_id()[:8],
|
"session": _session_id()[:8],
|
||||||
"message": message,
|
"message": message,
|
||||||
}
|
}
|
||||||
# Attach extras with serialization safety: non-JSON values get
|
|
||||||
# str()'d so a bad caller can't poison the whole entry.
|
|
||||||
for k, v in extra.items():
|
for k, v in extra.items():
|
||||||
try:
|
try:
|
||||||
json.dumps(v)
|
json.dumps(v)
|
||||||
event[k] = v
|
event[k] = v
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
event[k] = str(v)
|
event[k] = str(v)
|
||||||
with audit_log_path().open("a", encoding="utf-8") as f:
|
|
||||||
f.write(json.dumps(event, ensure_ascii=False) + "\n")
|
_ensure_writer_started()
|
||||||
|
with _QUEUE_COND:
|
||||||
|
_QUEUE.append(event)
|
||||||
|
_QUEUE_COND.notify()
|
||||||
except Exception:
|
except Exception:
|
||||||
# Last-ditch silent swallow. Diagnostics is best-effort.
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def log_session_start() -> None:
|
def log_session_start() -> None:
|
||||||
"""Write the session-start banner. Idempotent within one process."""
|
"""Idempotent session-start banner with platform info."""
|
||||||
if _DISABLED:
|
|
||||||
return
|
|
||||||
global _SESSION_STARTED
|
global _SESSION_STARTED
|
||||||
with _LOCK:
|
with _LOCK:
|
||||||
if _SESSION_STARTED:
|
if _SESSION_STARTED:
|
||||||
return
|
return
|
||||||
_SESSION_STARTED = True
|
_SESSION_STARTED = True
|
||||||
# Best-effort metadata. Failures don't propagate.
|
|
||||||
try:
|
try:
|
||||||
user = getpass.getuser()
|
user = getpass.getuser()
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -210,33 +258,8 @@ def log_session_start() -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def log_exception(where: str, exc: BaseException, **extra: Any) -> None:
|
|
||||||
"""Convenience wrapper for caught exceptions."""
|
|
||||||
log_event(
|
|
||||||
"error",
|
|
||||||
f"{where}: {type(exc).__name__}: {exc}",
|
|
||||||
level="error",
|
|
||||||
exc_type=type(exc).__name__,
|
|
||||||
exc_message=str(exc),
|
|
||||||
**extra,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def log_page_open(slug: str) -> None:
|
def log_page_open(slug: str) -> None:
|
||||||
"""Emit a "page open" event, deduplicated within a session.
|
"""Emit a deduplicated 'page open' nav event."""
|
||||||
|
|
||||||
Streamlit reruns the script many times per page (every widget
|
|
||||||
interaction triggers a rerun). Tracking the last page the user
|
|
||||||
visited in session state lets us emit a single ``nav`` event when
|
|
||||||
they actually switch pages, not one per rerun. Falls back to
|
|
||||||
always-emit when session state is unreachable (running outside
|
|
||||||
Streamlit, e.g. in tests).
|
|
||||||
|
|
||||||
Whole body is wrapped in try/except — the audit log is best-
|
|
||||||
effort and MUST NOT crash the page that called it.
|
|
||||||
"""
|
|
||||||
if _DISABLED:
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
import streamlit as st
|
import streamlit as st
|
||||||
@@ -251,19 +274,68 @@ def log_page_open(slug: str) -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def log_exception(where: str, exc: BaseException, **extra: Any) -> None:
|
||||||
|
"""Convenience wrapper for caught exceptions."""
|
||||||
|
log_event(
|
||||||
|
"error",
|
||||||
|
f"{where}: {type(exc).__name__}: {exc}",
|
||||||
|
level="error",
|
||||||
|
exc_type=type(exc).__name__,
|
||||||
|
exc_message=str(exc),
|
||||||
|
**extra,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Shutdown
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def flush_audit_log(timeout_s: float = 0.5) -> None:
|
||||||
|
"""Drain queued events to disk, then signal the writer to exit.
|
||||||
|
|
||||||
|
Called from ``shutdown_app`` so the closing session's events make
|
||||||
|
it to disk before the process dies. Bounded by ``timeout_s`` so a
|
||||||
|
stuck disk can never delay shutdown — events still in the queue
|
||||||
|
when the timer expires are dropped.
|
||||||
|
"""
|
||||||
|
global _SHUTDOWN_REQUESTED
|
||||||
|
deadline = time.monotonic() + max(0.0, timeout_s)
|
||||||
|
with _QUEUE_COND:
|
||||||
|
_SHUTDOWN_REQUESTED = True
|
||||||
|
_QUEUE_COND.notify_all()
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
with _QUEUE_COND:
|
||||||
|
if not _QUEUE:
|
||||||
|
return
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Test helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def reset_for_tests() -> None:
|
def reset_for_tests() -> None:
|
||||||
"""Reset module-level state. Test-only — call from a pytest fixture
|
"""Reset module-level state. Call from a pytest fixture for
|
||||||
when isolation between tests matters."""
|
isolation between tests."""
|
||||||
global _LOG_PATH, _SESSION_ID, _SESSION_STARTED
|
global _LOG_PATH, _SESSION_ID, _SESSION_STARTED
|
||||||
|
global _WRITER_THREAD, _SHUTDOWN_REQUESTED, _WRITE_FAILED_REPORTED
|
||||||
with _LOCK:
|
with _LOCK:
|
||||||
_LOG_PATH = None
|
_LOG_PATH = None
|
||||||
_SESSION_ID = None
|
_SESSION_ID = None
|
||||||
_SESSION_STARTED = False
|
_SESSION_STARTED = False
|
||||||
|
_SHUTDOWN_REQUESTED = False
|
||||||
|
_WRITE_FAILED_REPORTED = False
|
||||||
|
# The previous writer thread (if any) will exit on its own
|
||||||
|
# the next time it wakes — daemon thread, no need to join.
|
||||||
|
_WRITER_THREAD = None
|
||||||
|
with _QUEUE_COND:
|
||||||
|
_QUEUE.clear()
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"audit_log_dir",
|
"audit_log_dir",
|
||||||
"audit_log_path",
|
"audit_log_path",
|
||||||
|
"flush_audit_log",
|
||||||
"log_event",
|
"log_event",
|
||||||
"log_exception",
|
"log_exception",
|
||||||
"log_page_open",
|
"log_page_open",
|
||||||
|
|||||||
@@ -158,15 +158,11 @@ def hide_streamlit_chrome(*, gate_license: bool = True) -> None:
|
|||||||
require_license_or_render_activation,
|
require_license_or_render_activation,
|
||||||
)
|
)
|
||||||
render_license_status_sidebar()
|
render_license_status_sidebar()
|
||||||
# Diagnostics sidebar is temporarily disabled while bisecting the
|
# Diagnostics sidebar re-enabled now that the audit log is async
|
||||||
# "blank pages" report. The chrome runs on every page, so anything
|
# and ``audit_log_path()`` is a pure path computation (no mkdir
|
||||||
# that fails here drops every page's body. Wrapping it caught
|
# on the request path). Still wrapped in try/except defensively;
|
||||||
# exceptions but a silent partial-render (e.g. an open ``with
|
# a render error here prints to stderr instead of taking down
|
||||||
# st.sidebar:`` context that never closes cleanly because of an
|
# the page body.
|
||||||
# internal Streamlit hiccup) could still poison subsequent
|
|
||||||
# rendering. Toggle this back on once the user confirms the bare
|
|
||||||
# chrome restores page rendering.
|
|
||||||
if False:
|
|
||||||
try:
|
try:
|
||||||
_render_diagnostics_sidebar()
|
_render_diagnostics_sidebar()
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -723,6 +719,15 @@ def shutdown_app() -> None:
|
|||||||
"""
|
"""
|
||||||
if not st.session_state.get("_app_shutting_down"):
|
if not st.session_state.get("_app_shutting_down"):
|
||||||
st.session_state["_app_shutting_down"] = True
|
st.session_state["_app_shutting_down"] = True
|
||||||
|
# Drain the audit log queue to disk before the process dies.
|
||||||
|
# Bounded by a 500ms timeout so a stuck disk can't delay
|
||||||
|
# shutdown beyond the daemon-thread's own 1s grace period.
|
||||||
|
try:
|
||||||
|
from src.audit import flush_audit_log, log_event
|
||||||
|
log_event("session", "Session ending")
|
||||||
|
flush_audit_log(timeout_s=0.5)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
if "pytest" not in sys.modules:
|
if "pytest" not in sys.modules:
|
||||||
def _hard_exit() -> None:
|
def _hard_exit() -> None:
|
||||||
time.sleep(1.0)
|
time.sleep(1.0)
|
||||||
|
|||||||
141
tests/test_audit.py
Normal file
141
tests/test_audit.py
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
"""Audit-log smoke tests.
|
||||||
|
|
||||||
|
The audit log was rewritten to use an async queue + a background
|
||||||
|
writer thread after a synchronous-write implementation hung the GUI
|
||||||
|
on a hostile filesystem (Windows antivirus). These tests pin the
|
||||||
|
critical guarantees:
|
||||||
|
|
||||||
|
1. ``log_event`` returns in microseconds (non-blocking).
|
||||||
|
2. Events queued before ``flush_audit_log`` land on disk in the
|
||||||
|
correct JSONL shape.
|
||||||
|
3. Backpressure: when the queue is full, oldest events are dropped
|
||||||
|
and newest ones survive (most-recent-events-are-most-diagnostic).
|
||||||
|
4. The writer thread tolerates an unwritable target without
|
||||||
|
crashing or hanging.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def isolated_audit(monkeypatch, tmp_path):
|
||||||
|
"""Redirect audit writes into ``tmp_path`` and reset module state
|
||||||
|
so each test starts fresh."""
|
||||||
|
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path))
|
||||||
|
from src import audit
|
||||||
|
audit.reset_for_tests()
|
||||||
|
yield audit
|
||||||
|
# Best-effort cleanup so a runaway writer thread doesn't keep
|
||||||
|
# touching the tmp_path after the test exits.
|
||||||
|
audit.flush_audit_log(timeout_s=0.5)
|
||||||
|
audit.reset_for_tests()
|
||||||
|
|
||||||
|
|
||||||
|
def _read_all_events(audit_dir: Path) -> list[dict]:
|
||||||
|
"""Read every JSONL file in *audit_dir* and return parsed events."""
|
||||||
|
out: list[dict] = []
|
||||||
|
for f in sorted(audit_dir.glob("datatools-*.jsonl")):
|
||||||
|
for line in f.read_text(encoding="utf-8").splitlines():
|
||||||
|
if line.strip():
|
||||||
|
out.append(json.loads(line))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
class TestLogEventIsNonBlocking:
|
||||||
|
def test_returns_in_under_one_millisecond(self, isolated_audit):
|
||||||
|
"""A pathological 1000-event burst should clear well under a
|
||||||
|
second on the request path. The write happens off-thread."""
|
||||||
|
audit = isolated_audit
|
||||||
|
start = time.perf_counter()
|
||||||
|
for i in range(1000):
|
||||||
|
audit.log_event("test", f"event {i}", n=i)
|
||||||
|
elapsed = time.perf_counter() - start
|
||||||
|
# 1000 events × 1ms ceiling = 1.0s. In practice this should
|
||||||
|
# be milliseconds total since put_nowait is microseconds.
|
||||||
|
assert elapsed < 1.0, (
|
||||||
|
f"log_event burst took {elapsed:.3f}s for 1000 events — "
|
||||||
|
f"that's over 1ms per call. Async queue is regressing to "
|
||||||
|
f"a sync write."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventsLandOnDisk:
|
||||||
|
def test_basic_round_trip(self, isolated_audit, tmp_path):
|
||||||
|
audit = isolated_audit
|
||||||
|
audit.log_event("upload", "Uploaded a.csv", filename="a.csv", bytes=42)
|
||||||
|
audit.log_event("analyze", "Analyzed a.csv", findings=3)
|
||||||
|
audit.flush_audit_log(timeout_s=2.0)
|
||||||
|
|
||||||
|
events = _read_all_events(tmp_path)
|
||||||
|
assert len(events) == 2
|
||||||
|
kinds = [e["category"] for e in events]
|
||||||
|
assert "upload" in kinds
|
||||||
|
assert "analyze" in kinds
|
||||||
|
|
||||||
|
upload = next(e for e in events if e["category"] == "upload")
|
||||||
|
assert upload["filename"] == "a.csv"
|
||||||
|
assert upload["bytes"] == 42
|
||||||
|
assert "ts" in upload
|
||||||
|
assert "session" in upload
|
||||||
|
|
||||||
|
def test_session_start_renders(self, isolated_audit, tmp_path):
|
||||||
|
audit = isolated_audit
|
||||||
|
audit.log_session_start()
|
||||||
|
audit.flush_audit_log(timeout_s=2.0)
|
||||||
|
events = _read_all_events(tmp_path)
|
||||||
|
assert any(
|
||||||
|
e["category"] == "session" and "Session started" in e["message"]
|
||||||
|
for e in events
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_log_session_start_is_idempotent(self, isolated_audit, tmp_path):
|
||||||
|
audit = isolated_audit
|
||||||
|
for _ in range(5):
|
||||||
|
audit.log_session_start()
|
||||||
|
audit.flush_audit_log(timeout_s=2.0)
|
||||||
|
events = _read_all_events(tmp_path)
|
||||||
|
starts = [e for e in events if e["category"] == "session"]
|
||||||
|
assert len(starts) == 1
|
||||||
|
|
||||||
|
|
||||||
|
class TestUnwritableTargetDoesntCrash:
|
||||||
|
def test_pointing_at_a_nonexistent_parent_doesnt_hang(
|
||||||
|
self, monkeypatch, tmp_path,
|
||||||
|
):
|
||||||
|
# Point the audit dir at a path that DOES exist as a FILE,
|
||||||
|
# so any mkdir there fails. The writer should swallow the
|
||||||
|
# error and the GUI-facing producer should keep working.
|
||||||
|
not_a_dir = tmp_path / "iam-a-file"
|
||||||
|
not_a_dir.write_text("hi")
|
||||||
|
monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir))
|
||||||
|
from src import audit
|
||||||
|
audit.reset_for_tests()
|
||||||
|
try:
|
||||||
|
start = time.perf_counter()
|
||||||
|
audit.log_event("test", "should not crash")
|
||||||
|
assert time.perf_counter() - start < 0.5
|
||||||
|
audit.flush_audit_log(timeout_s=0.5)
|
||||||
|
finally:
|
||||||
|
audit.reset_for_tests()
|
||||||
|
|
||||||
|
|
||||||
|
class TestSerializationSafety:
|
||||||
|
def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path):
|
||||||
|
audit = isolated_audit
|
||||||
|
|
||||||
|
class Weird:
|
||||||
|
def __repr__(self):
|
||||||
|
return "<Weird>"
|
||||||
|
|
||||||
|
audit.log_event("test", "carries a weird extra", obj=Weird())
|
||||||
|
audit.flush_audit_log(timeout_s=2.0)
|
||||||
|
|
||||||
|
events = _read_all_events(tmp_path)
|
||||||
|
assert len(events) == 1
|
||||||
|
assert events[0]["obj"] == "<Weird>"
|
||||||
Reference in New Issue
Block a user