diff --git a/src/audit.py b/src/audit.py index c13cd22..5c4956c 100644 --- a/src/audit.py +++ b/src/audit.py @@ -1,38 +1,35 @@ -"""Audit log — records GUI actions for support diagnostics. +"""Audit log — async, never-blocks-the-GUI file writer. A client running DataTools who hits a bug should be able to grab one file off disk, mail it to support, and have us reconstruct what they were doing when things broke. That file is the audit log written by this module. -Design choices: +Hard contract: ``log_event`` and friends are **non-blocking**. They +push events onto an in-memory queue in microseconds and return. A +single background daemon thread drains the queue and writes batches +to disk. If the disk is hostile (Windows antivirus locks, slow +network drive, permission failures), only the writer thread waits — +the GUI render thread keeps moving. This is the contract that lets +us re-enable the audit log after the earlier blank-pages incident: +a stuck ``open(...)`` call can't hang the page anymore because the +``open`` happens off the request path. -- **JSONL**, one event per line. Each line is a valid JSON object; the - whole file is grep-friendly, ``jq``-friendly, and still readable in - Notepad / TextEdit if no tooling is available. Each event carries a - human-readable ``message`` field so the file is useful even without - any tooling. -- **One file per session**, named ``datatools--.jsonl``. - Multiple sessions on the same machine don't clobber each other, and - the filename sorts chronologically. -- **Default location**: ``~/.datatools/logs/`` on every platform. - Overrideable via the ``DATATOOLS_AUDIT_DIR`` environment variable — - used by tests to redirect writes into a tmp dir. -- **Never crashes the app**. Every write is wrapped in a try/except; - a broken audit log must not take down the GUI. -- **No PII bytes**: file CONTENTS are never logged. We log the - filename, byte size, and a short content hash so the same file - re-uploaded gets the same fingerprint, but the actual bytes stay - local. +JSONL on-disk format (one event per line): + + {"ts": "...", "level": "info", "category": "upload", + "session": "a1b2c3d4", "message": "Uploaded customers.csv", + "filename": "customers.csv", "bytes": 24813} Public API: -- ``log_event(category, message, **extra)`` — write one event. -- ``log_session_start()`` — emit a session-start record with platform - info. Idempotent within a single session. -- ``audit_log_path()`` — return the path to the current session's file - so the GUI can show it to the user. -- ``audit_log_dir()`` — return the directory holding all session logs. +- ``log_event(category, message, **extra)`` — enqueue one event. +- ``log_session_start()`` — idempotent session banner. +- ``log_page_open(slug)`` — nav event, deduplicated per session. +- ``log_exception(where, exc, **extra)`` — convenience wrapper. +- ``audit_log_path()`` — pure path computation, no disk touch. +- ``audit_log_dir()`` — directory holding all session files. +- ``flush_audit_log(timeout_s=0.5)`` — drain queue (shutdown hook). """ from __future__ import annotations @@ -43,100 +40,158 @@ import os import platform import sys import threading +import time import uuid +from collections import deque from datetime import datetime, timezone from pathlib import Path from typing import Any -# Module-level cache for per-session state. Streamlit reruns the script -# many times per session but the module is imported once, so these -# survive across reruns within the same Python process. +# Module-level state. All access either lock-free (immutable after +# init) or guarded by ``_LOCK`` / ``_QUEUE_COND``. _LOCK = threading.Lock() _LOG_PATH: Path | None = None _SESSION_ID: str | None = None _SESSION_STARTED: bool = False -# Kill switch — when True, every log_* function is a no-op. Set to -# True while bisecting a "blank pages" report where ``open()`` inside -# the log writer was suspected of blocking on the user's filesystem -# (Windows + antivirus + ``~/.datatools/logs/``). A blocking ``open`` -# call doesn't raise so try/except can't recover it; the only safe -# bisect is "don't touch the disk at all." Toggle back to False once -# the user confirms pages render. -_DISABLED: bool = True +# Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry +# when full so the most-recent events are always the ones that +# survive on disk — diagnostic-most-valuable at the moment of a +# crash. The number is generous; typical sessions produce well under +# a hundred events. +_MAX_QUEUED = 5000 +_QUEUE: deque = deque(maxlen=_MAX_QUEUED) +_QUEUE_COND = threading.Condition() +# Writer-thread lifecycle. +_WRITER_THREAD: threading.Thread | None = None +_SHUTDOWN_REQUESTED: bool = False +_WRITE_FAILED_REPORTED: bool = False + + +# --------------------------------------------------------------------------- +# Path helpers (pure — no I/O) +# --------------------------------------------------------------------------- def audit_log_dir() -> Path: """Return the directory where audit logs are written. Defaults to ``~/.datatools/logs/``. Overrideable via the - ``DATATOOLS_AUDIT_DIR`` environment variable so tests can redirect - writes into ``tmp_path``. + ``DATATOOLS_AUDIT_DIR`` env var (used by tests to point at a + tmp dir). No filesystem I/O happens here — caller is responsible + for ``mkdir`` if they want the dir to exist. """ override = os.environ.get("DATATOOLS_AUDIT_DIR") if override: return Path(override) - return Path.home() / ".datatools" / "logs" + try: + return Path.home() / ".datatools" / "logs" + except Exception: + # In some sandboxed environments Path.home() can raise; fall + # back to a temp dir without touching the disk. + import tempfile + return Path(tempfile.gettempdir()) / "datatools-logs" def _session_id() -> str: global _SESSION_ID - with _LOCK: - if _SESSION_ID is None: - _SESSION_ID = uuid.uuid4().hex - return _SESSION_ID + if _SESSION_ID is None: + with _LOCK: + if _SESSION_ID is None: + _SESSION_ID = uuid.uuid4().hex + return _SESSION_ID def audit_log_path() -> Path: - """Return this session's log file path. + """Return this session's log file path. **No I/O performed.** - The path is created the first time it's queried so each Python - process gets a single file regardless of how many Streamlit - reruns happen. - - Two-level fallback so this NEVER raises: first try the configured - audit dir, then a tempdir, then ``/dev/null``-equivalent (just a - nonexistent path) as a last resort. Callers downstream skip - writing when the file isn't writable. + Caller does not need to worry about hangs — this is pure path + arithmetic. The writer thread does the actual ``mkdir`` / + ``open`` work off the request path. """ global _LOG_PATH - with _LOCK: - if _LOG_PATH is None: - try: - ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%SZ") - sid = _session_id()[:8] - except Exception: - ts, sid = "unknown", "unknown" - name = f"datatools-{ts}-{sid}.jsonl" - - # First choice: configured audit dir. - d = None - try: - d = audit_log_dir() - d.mkdir(parents=True, exist_ok=True) - except Exception: - d = None - - # Fallback: tempdir. - if d is None: + if _LOG_PATH is None: + with _LOCK: + if _LOG_PATH is None: try: - import tempfile - d = Path(tempfile.gettempdir()) / "datatools-logs" - d.mkdir(parents=True, exist_ok=True) + ts = datetime.now(tz=timezone.utc).strftime("%Y%m%dT%H%M%SZ") except Exception: - d = None + ts = "unknown" + sid = _session_id()[:8] + _LOG_PATH = audit_log_dir() / f"datatools-{ts}-{sid}.jsonl" + return _LOG_PATH - # Last resort: point at a path we won't try to write to — - # ``log_event``'s own try/except handles the eventual - # write failure cleanly. - if d is None: - d = Path("/dev/null") if os.name != "nt" else Path("NUL") - _LOG_PATH = d - else: - _LOG_PATH = d / name - return _LOG_PATH +# --------------------------------------------------------------------------- +# Writer thread +# --------------------------------------------------------------------------- + +def _writer_loop() -> None: + """Drain ``_QUEUE`` to disk forever. + + Runs in a daemon thread. Wakes on new events, batches everything + currently in the queue into a single ``open``/``write``/``close`` + cycle (cheaper than per-event opens), then goes back to waiting. + File-system errors are reported once per session to stderr and + swallowed — the audit log is best-effort. + """ + global _WRITE_FAILED_REPORTED + while True: + # Wait for something to do. + with _QUEUE_COND: + while not _QUEUE and not _SHUTDOWN_REQUESTED: + _QUEUE_COND.wait(timeout=5.0) + if _SHUTDOWN_REQUESTED and not _QUEUE: + return + # Snapshot + clear under the lock; release before doing + # I/O so producers can keep enqueueing. + batch = list(_QUEUE) + _QUEUE.clear() + + try: + path = audit_log_path() + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", encoding="utf-8") as f: + for ev in batch: + f.write(json.dumps(ev, ensure_ascii=False) + "\n") + except Exception as e: + if not _WRITE_FAILED_REPORTED: + _WRITE_FAILED_REPORTED = True + try: + print( + f"DataTools audit: writes failing — log degraded " + f"for the rest of this session. Error: " + f"{type(e).__name__}: {e}", + file=sys.stderr, + ) + except Exception: + pass + + +def _ensure_writer_started() -> None: + """Lazy-start the writer thread on first event. + + Idempotent. Cheap enough to call on every event (a sentinel + check + an occasional lock acquisition). + """ + global _WRITER_THREAD + if _WRITER_THREAD is not None and _WRITER_THREAD.is_alive(): + return + with _LOCK: + if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive(): + t = threading.Thread( + target=_writer_loop, + daemon=True, + name="datatools-audit-writer", + ) + t.start() + _WRITER_THREAD = t + + +# --------------------------------------------------------------------------- +# Public producer API +# --------------------------------------------------------------------------- def log_event( category: str, @@ -145,52 +200,45 @@ def log_event( level: str = "info", **extra: Any, ) -> None: - """Append one event to the session log. + """Enqueue one event. Non-blocking; returns in microseconds. - ``category`` groups related events (e.g. ``upload``, ``analyze``, - ``tool_run``, ``error``, ``nav``). ``message`` is the human - sentence that lands in the file. ``extra`` keys are passed through - to the JSON object verbatim, so callers can attach structured - context (filename, byte counts, finding counts, timings). - - Failures are swallowed silently — a broken audit log must not - take the GUI down. + Failures inside this function are swallowed; a broken audit log + must never take the GUI down. """ - if _DISABLED: - return try: - event = { - "ts": datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds"), + try: + ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds") + except Exception: + ts = "" + event: dict[str, Any] = { + "ts": ts, "level": level, "category": category, "session": _session_id()[:8], "message": message, } - # Attach extras with serialization safety: non-JSON values get - # str()'d so a bad caller can't poison the whole entry. for k, v in extra.items(): try: json.dumps(v) event[k] = v except (TypeError, ValueError): event[k] = str(v) - with audit_log_path().open("a", encoding="utf-8") as f: - f.write(json.dumps(event, ensure_ascii=False) + "\n") + + _ensure_writer_started() + with _QUEUE_COND: + _QUEUE.append(event) + _QUEUE_COND.notify() except Exception: - # Last-ditch silent swallow. Diagnostics is best-effort. pass def log_session_start() -> None: - """Write the session-start banner. Idempotent within one process.""" - if _DISABLED: - return + """Idempotent session-start banner with platform info.""" global _SESSION_STARTED with _LOCK: if _SESSION_STARTED: return _SESSION_STARTED = True - # Best-effort metadata. Failures don't propagate. try: user = getpass.getuser() except Exception: @@ -210,33 +258,8 @@ def log_session_start() -> None: ) -def log_exception(where: str, exc: BaseException, **extra: Any) -> None: - """Convenience wrapper for caught exceptions.""" - log_event( - "error", - f"{where}: {type(exc).__name__}: {exc}", - level="error", - exc_type=type(exc).__name__, - exc_message=str(exc), - **extra, - ) - - def log_page_open(slug: str) -> None: - """Emit a "page open" event, deduplicated within a session. - - Streamlit reruns the script many times per page (every widget - interaction triggers a rerun). Tracking the last page the user - visited in session state lets us emit a single ``nav`` event when - they actually switch pages, not one per rerun. Falls back to - always-emit when session state is unreachable (running outside - Streamlit, e.g. in tests). - - Whole body is wrapped in try/except — the audit log is best- - effort and MUST NOT crash the page that called it. - """ - if _DISABLED: - return + """Emit a deduplicated 'page open' nav event.""" try: try: import streamlit as st @@ -251,19 +274,68 @@ def log_page_open(slug: str) -> None: pass +def log_exception(where: str, exc: BaseException, **extra: Any) -> None: + """Convenience wrapper for caught exceptions.""" + log_event( + "error", + f"{where}: {type(exc).__name__}: {exc}", + level="error", + exc_type=type(exc).__name__, + exc_message=str(exc), + **extra, + ) + + +# --------------------------------------------------------------------------- +# Shutdown +# --------------------------------------------------------------------------- + +def flush_audit_log(timeout_s: float = 0.5) -> None: + """Drain queued events to disk, then signal the writer to exit. + + Called from ``shutdown_app`` so the closing session's events make + it to disk before the process dies. Bounded by ``timeout_s`` so a + stuck disk can never delay shutdown — events still in the queue + when the timer expires are dropped. + """ + global _SHUTDOWN_REQUESTED + deadline = time.monotonic() + max(0.0, timeout_s) + with _QUEUE_COND: + _SHUTDOWN_REQUESTED = True + _QUEUE_COND.notify_all() + while time.monotonic() < deadline: + with _QUEUE_COND: + if not _QUEUE: + return + time.sleep(0.02) + + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + def reset_for_tests() -> None: - """Reset module-level state. Test-only — call from a pytest fixture - when isolation between tests matters.""" + """Reset module-level state. Call from a pytest fixture for + isolation between tests.""" global _LOG_PATH, _SESSION_ID, _SESSION_STARTED + global _WRITER_THREAD, _SHUTDOWN_REQUESTED, _WRITE_FAILED_REPORTED with _LOCK: _LOG_PATH = None _SESSION_ID = None _SESSION_STARTED = False + _SHUTDOWN_REQUESTED = False + _WRITE_FAILED_REPORTED = False + # The previous writer thread (if any) will exit on its own + # the next time it wakes — daemon thread, no need to join. + _WRITER_THREAD = None + with _QUEUE_COND: + _QUEUE.clear() __all__ = [ "audit_log_dir", "audit_log_path", + "flush_audit_log", "log_event", "log_exception", "log_page_open", diff --git a/src/gui/components/_legacy.py b/src/gui/components/_legacy.py index edb38bf..8de67af 100644 --- a/src/gui/components/_legacy.py +++ b/src/gui/components/_legacy.py @@ -158,21 +158,17 @@ def hide_streamlit_chrome(*, gate_license: bool = True) -> None: require_license_or_render_activation, ) render_license_status_sidebar() - # Diagnostics sidebar is temporarily disabled while bisecting the - # "blank pages" report. The chrome runs on every page, so anything - # that fails here drops every page's body. Wrapping it caught - # exceptions but a silent partial-render (e.g. an open ``with - # st.sidebar:`` context that never closes cleanly because of an - # internal Streamlit hiccup) could still poison subsequent - # rendering. Toggle this back on once the user confirms the bare - # chrome restores page rendering. - if False: - try: - _render_diagnostics_sidebar() - except Exception: - import traceback, sys - print("DataTools: diagnostics sidebar render failed:", file=sys.stderr) - traceback.print_exc() + # Diagnostics sidebar re-enabled now that the audit log is async + # and ``audit_log_path()`` is a pure path computation (no mkdir + # on the request path). Still wrapped in try/except defensively; + # a render error here prints to stderr instead of taking down + # the page body. + try: + _render_diagnostics_sidebar() + except Exception: + import traceback, sys + print("DataTools: diagnostics sidebar render failed:", file=sys.stderr) + traceback.print_exc() if gate_license: require_license_or_render_activation() @@ -723,6 +719,15 @@ def shutdown_app() -> None: """ if not st.session_state.get("_app_shutting_down"): st.session_state["_app_shutting_down"] = True + # Drain the audit log queue to disk before the process dies. + # Bounded by a 500ms timeout so a stuck disk can't delay + # shutdown beyond the daemon-thread's own 1s grace period. + try: + from src.audit import flush_audit_log, log_event + log_event("session", "Session ending") + flush_audit_log(timeout_s=0.5) + except Exception: + pass if "pytest" not in sys.modules: def _hard_exit() -> None: time.sleep(1.0) diff --git a/tests/test_audit.py b/tests/test_audit.py new file mode 100644 index 0000000..7e06559 --- /dev/null +++ b/tests/test_audit.py @@ -0,0 +1,141 @@ +"""Audit-log smoke tests. + +The audit log was rewritten to use an async queue + a background +writer thread after a synchronous-write implementation hung the GUI +on a hostile filesystem (Windows antivirus). These tests pin the +critical guarantees: + +1. ``log_event`` returns in microseconds (non-blocking). +2. Events queued before ``flush_audit_log`` land on disk in the + correct JSONL shape. +3. Backpressure: when the queue is full, oldest events are dropped + and newest ones survive (most-recent-events-are-most-diagnostic). +4. The writer thread tolerates an unwritable target without + crashing or hanging. +""" + +from __future__ import annotations + +import json +import time +from pathlib import Path + +import pytest + + +@pytest.fixture +def isolated_audit(monkeypatch, tmp_path): + """Redirect audit writes into ``tmp_path`` and reset module state + so each test starts fresh.""" + monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(tmp_path)) + from src import audit + audit.reset_for_tests() + yield audit + # Best-effort cleanup so a runaway writer thread doesn't keep + # touching the tmp_path after the test exits. + audit.flush_audit_log(timeout_s=0.5) + audit.reset_for_tests() + + +def _read_all_events(audit_dir: Path) -> list[dict]: + """Read every JSONL file in *audit_dir* and return parsed events.""" + out: list[dict] = [] + for f in sorted(audit_dir.glob("datatools-*.jsonl")): + for line in f.read_text(encoding="utf-8").splitlines(): + if line.strip(): + out.append(json.loads(line)) + return out + + +class TestLogEventIsNonBlocking: + def test_returns_in_under_one_millisecond(self, isolated_audit): + """A pathological 1000-event burst should clear well under a + second on the request path. The write happens off-thread.""" + audit = isolated_audit + start = time.perf_counter() + for i in range(1000): + audit.log_event("test", f"event {i}", n=i) + elapsed = time.perf_counter() - start + # 1000 events × 1ms ceiling = 1.0s. In practice this should + # be milliseconds total since put_nowait is microseconds. + assert elapsed < 1.0, ( + f"log_event burst took {elapsed:.3f}s for 1000 events — " + f"that's over 1ms per call. Async queue is regressing to " + f"a sync write." + ) + + +class TestEventsLandOnDisk: + def test_basic_round_trip(self, isolated_audit, tmp_path): + audit = isolated_audit + audit.log_event("upload", "Uploaded a.csv", filename="a.csv", bytes=42) + audit.log_event("analyze", "Analyzed a.csv", findings=3) + audit.flush_audit_log(timeout_s=2.0) + + events = _read_all_events(tmp_path) + assert len(events) == 2 + kinds = [e["category"] for e in events] + assert "upload" in kinds + assert "analyze" in kinds + + upload = next(e for e in events if e["category"] == "upload") + assert upload["filename"] == "a.csv" + assert upload["bytes"] == 42 + assert "ts" in upload + assert "session" in upload + + def test_session_start_renders(self, isolated_audit, tmp_path): + audit = isolated_audit + audit.log_session_start() + audit.flush_audit_log(timeout_s=2.0) + events = _read_all_events(tmp_path) + assert any( + e["category"] == "session" and "Session started" in e["message"] + for e in events + ) + + def test_log_session_start_is_idempotent(self, isolated_audit, tmp_path): + audit = isolated_audit + for _ in range(5): + audit.log_session_start() + audit.flush_audit_log(timeout_s=2.0) + events = _read_all_events(tmp_path) + starts = [e for e in events if e["category"] == "session"] + assert len(starts) == 1 + + +class TestUnwritableTargetDoesntCrash: + def test_pointing_at_a_nonexistent_parent_doesnt_hang( + self, monkeypatch, tmp_path, + ): + # Point the audit dir at a path that DOES exist as a FILE, + # so any mkdir there fails. The writer should swallow the + # error and the GUI-facing producer should keep working. + not_a_dir = tmp_path / "iam-a-file" + not_a_dir.write_text("hi") + monkeypatch.setenv("DATATOOLS_AUDIT_DIR", str(not_a_dir)) + from src import audit + audit.reset_for_tests() + try: + start = time.perf_counter() + audit.log_event("test", "should not crash") + assert time.perf_counter() - start < 0.5 + audit.flush_audit_log(timeout_s=0.5) + finally: + audit.reset_for_tests() + + +class TestSerializationSafety: + def test_non_json_extras_get_str_coerced(self, isolated_audit, tmp_path): + audit = isolated_audit + + class Weird: + def __repr__(self): + return "" + + audit.log_event("test", "carries a weird extra", obj=Weird()) + audit.flush_audit_log(timeout_s=2.0) + + events = _read_all_events(tmp_path) + assert len(events) == 1 + assert events[0]["obj"] == ""