Files
datatools-dev/src/audit.py
Michael b3ae913bb9 feat(audit): daily filename + 7-day retention sweep
Replaces the per-session ``datatools-<ts>-<sid>.jsonl`` filename
with a single daily file ``datatools-YYYY-MM-DD.jsonl`` (local
date). Sessions on the same calendar day share a file via the
writer thread's per-batch open+append; multiple DataTools
instances running concurrently on the same day fan into the same
file (append-mode small writes are atomic on POSIX, safe-enough on
Windows under realistic load).

Drops the ``_LOG_PATH`` module global and the lock around it —
``audit_log_path()`` is now pure date math, recomputed on every
call so a session that crosses midnight follows the rollover into
the next day's file.

Adds ``_sweep_old_logs()`` invoked once per process at writer-
thread start. Deletes any ``datatools-*.jsonl`` whose mtime is
older than 7 days. The glob deliberately matches the legacy
per-session filename too, so users upgrading from the previous
build don't keep a permanent backlog of pre-retention files.

Event ``ts`` fields stay UTC; only the filename uses local date,
because users go looking for "today's log" on their wall clock.

Tests cover: daily filename shape, sweep removes stale files,
sweep keeps fresh files, sweep also clears legacy filenames.

Rollback: ``git revert HEAD`` restores the per-session filename
and removes the sweep. No data migration needed either way —
existing files keep working as JSONL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 21:22:47 +00:00

465 lines
16 KiB
Python

"""Audit log — async, never-blocks-the-GUI file writer.
A client running DataTools who hits a bug should be able to grab one
file off disk, mail it to support, and have us reconstruct what they
were doing when things broke. That file is the audit log written by
this module.
Hard contract: ``log_event`` and friends are **non-blocking**. They
push events onto an in-memory queue in microseconds and return. A
single background daemon thread drains the queue and writes batches
to disk. If the disk is hostile (Windows antivirus locks, slow
network drive, permission failures), only the writer thread waits —
the GUI render thread keeps moving. This is the contract that lets
us re-enable the audit log after the earlier blank-pages incident:
a stuck ``open(...)`` call can't hang the page anymore because the
``open`` happens off the request path.
JSONL on-disk format (one event per line):
{"ts": "...", "level": "info", "category": "upload",
"session": "a1b2c3d4", "message": "Uploaded customers.csv",
"filename": "customers.csv", "bytes": 24813}
Public API:
- ``log_event(category, message, **extra)`` — enqueue one event.
- ``log_session_start()`` — idempotent session banner.
- ``log_page_open(slug)`` — nav event, deduplicated per session.
- ``log_exception(where, exc, **extra)`` — convenience wrapper.
- ``audit_log_path()`` — pure path computation, no disk touch.
- ``audit_log_dir()`` — directory holding all session files.
- ``flush_audit_log(timeout_s=0.5)`` — drain queue (shutdown hook).
"""
from __future__ import annotations
import getpass
import json
import os
import platform
import sys
import threading
import time
import uuid
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Module-level state. All access either lock-free (immutable after
# init) or guarded by ``_LOCK`` / ``_QUEUE_COND``.
_LOCK = threading.Lock()
_SESSION_ID: str | None = None
_SESSION_STARTED: bool = False
# Kill switch — held off by default now that the blank-pages symptom
# is traced to the audit_log_path/_session_id deadlock fixed in
# a8ff8f4. End-to-end validation in session cf2ebbd5 (2026-05-19)
# wrote session/upload/analyze/nav/session-end events with no
# regression. Flip back to ``True`` if the symptom recurs; the env
# vars below stay as escape hatches.
#
# Env-var overrides (still wired):
# DATATOOLS_AUDIT_ENABLED=1 — force-on even if _DISABLED is True
# DATATOOLS_AUDIT_TRACE=1 — print "[audit] ..." lines to stderr
# DATATOOLS_AUDIT_PROBE=... — bisect the producer path. Values:
# full (default) | noop | no-events | no-page-open | no-session-start
_DISABLED: bool = False
_ENABLE_OVERRIDE: bool = os.environ.get("DATATOOLS_AUDIT_ENABLED") == "1"
_TRACE: bool = os.environ.get("DATATOOLS_AUDIT_TRACE") == "1"
_PROBE: str = os.environ.get("DATATOOLS_AUDIT_PROBE", "full").lower()
def _trace(_msg: str, **_kw: Any) -> None:
"""Emit a stderr trace line when DATATOOLS_AUDIT_TRACE=1.
No-op otherwise — the default configuration adds zero extra I/O."""
if not _TRACE:
return
try:
extras = " ".join(f"{k}={v!r}" for k, v in _kw.items())
line = f"[audit] {_msg}"
if extras:
line = f"{line} {extras}"
print(line, file=sys.stderr, flush=True)
except Exception:
pass
# Bounded in-memory queue. ``deque(maxlen=N)`` drops the OLDEST entry
# when full so the most-recent events are always the ones that
# survive on disk — diagnostic-most-valuable at the moment of a
# crash. The number is generous; typical sessions produce well under
# a hundred events.
_MAX_QUEUED = 5000
_QUEUE: deque = deque(maxlen=_MAX_QUEUED)
_QUEUE_COND = threading.Condition()
# Writer-thread lifecycle.
_WRITER_THREAD: threading.Thread | None = None
_SHUTDOWN_REQUESTED: bool = False
_WRITE_FAILED_REPORTED: bool = False
_trace(
"module-import",
disabled=_DISABLED,
enabled_override=_ENABLE_OVERRIDE,
probe=_PROBE,
)
# ---------------------------------------------------------------------------
# Path helpers (pure — no I/O)
# ---------------------------------------------------------------------------
def audit_log_dir() -> Path:
"""Return the directory where audit logs are written.
Defaults to ``~/.datatools/logs/``. Overrideable via the
``DATATOOLS_AUDIT_DIR`` env var (used by tests to point at a
tmp dir). No filesystem I/O happens here — caller is responsible
for ``mkdir`` if they want the dir to exist.
"""
override = os.environ.get("DATATOOLS_AUDIT_DIR")
if override:
return Path(override)
try:
return Path.home() / ".datatools" / "logs"
except Exception:
# In some sandboxed environments Path.home() can raise; fall
# back to a temp dir without touching the disk.
import tempfile
return Path(tempfile.gettempdir()) / "datatools-logs"
def _session_id() -> str:
global _SESSION_ID
if _SESSION_ID is None:
with _LOCK:
if _SESSION_ID is None:
_SESSION_ID = uuid.uuid4().hex
return _SESSION_ID
def audit_log_path() -> Path:
"""Return today's log file path. **No I/O performed.**
Filename is ``datatools-YYYY-MM-DD.jsonl`` keyed on **local** date
(event ``ts`` fields stay UTC). Local date is what a user sees on
their calendar when they go looking for "today's log"; UTC would
flip a day early in the evening on west-coast timezones, which is
surprising to humans. Concurrent DataTools instances writing on
the same day share the file via the writer thread's per-batch
``open`` + append; append-mode for small writes is atomic on
POSIX and safe-enough on Windows.
Recomputed on every call so a session that crosses midnight
follows the rollover. No caching, no locking — pure path math.
"""
try:
date_str = datetime.now().strftime("%Y-%m-%d")
except Exception:
date_str = "unknown"
return audit_log_dir() / f"datatools-{date_str}.jsonl"
_RETENTION_DAYS = 7
def _sweep_old_logs() -> None:
"""Delete ``datatools-*.jsonl`` files older than ``_RETENTION_DAYS``.
Best-effort housekeeping invoked once per process at writer-thread
start. Matches the new daily filename AND the legacy per-session
pattern (``datatools-<ts>-<sid>.jsonl``), so this also clears
leftovers from before the retention switch.
"""
try:
log_dir = audit_log_dir()
if not log_dir.exists():
return
cutoff = time.time() - (_RETENTION_DAYS * 86400)
deleted = 0
for p in log_dir.glob("datatools-*.jsonl"):
try:
if p.stat().st_mtime < cutoff:
p.unlink()
deleted += 1
except Exception:
continue
_trace("sweep:done", deleted=deleted, retention_days=_RETENTION_DAYS)
except Exception as e:
_trace("sweep:failed", exc=type(e).__name__, msg=str(e))
# ---------------------------------------------------------------------------
# Writer thread
# ---------------------------------------------------------------------------
def _writer_loop() -> None:
"""Drain ``_QUEUE`` to disk forever.
Runs in a daemon thread. Wakes on new events, batches everything
currently in the queue into a single ``open``/``write``/``close``
cycle (cheaper than per-event opens), then goes back to waiting.
File-system errors are reported once per session to stderr and
swallowed — the audit log is best-effort.
"""
global _WRITE_FAILED_REPORTED
_trace("writer-loop:enter")
_sweep_old_logs()
try:
while True:
# Wait for something to do.
with _QUEUE_COND:
while not _QUEUE and not _SHUTDOWN_REQUESTED:
_QUEUE_COND.wait(timeout=5.0)
if _SHUTDOWN_REQUESTED and not _QUEUE:
_trace("writer-loop:exit-shutdown")
return
# Snapshot + clear under the lock; release before doing
# I/O so producers can keep enqueueing.
batch = list(_QUEUE)
_QUEUE.clear()
_trace("writer-loop:wake", batch=len(batch))
try:
path = audit_log_path()
_trace("writer-loop:mkdir", dir=str(path.parent))
path.parent.mkdir(parents=True, exist_ok=True)
_trace("writer-loop:open", path=str(path))
with path.open("a", encoding="utf-8") as f:
for ev in batch:
f.write(json.dumps(ev, ensure_ascii=False) + "\n")
_trace("writer-loop:wrote", count=len(batch))
except Exception as e:
_trace(
"writer-loop:write-failed",
exc=type(e).__name__,
msg=str(e),
)
if not _WRITE_FAILED_REPORTED:
_WRITE_FAILED_REPORTED = True
try:
print(
f"DataTools audit: writes failing — log degraded "
f"for the rest of this session. Error: "
f"{type(e).__name__}: {e}",
file=sys.stderr,
flush=True,
)
except Exception:
pass
except BaseException as e:
# Outer guard. If the writer thread dies for any reason
# (unexpected exception, sys.exit from a foreign module),
# surface it in the launcher terminal instead of going silent.
try:
print(
f"DataTools audit: writer thread died: "
f"{type(e).__name__}: {e}",
file=sys.stderr,
flush=True,
)
except Exception:
pass
raise
def _ensure_writer_started() -> None:
"""Lazy-start the writer thread on first event.
Idempotent. Cheap enough to call on every event (a sentinel
check + an occasional lock acquisition).
"""
global _WRITER_THREAD
if _WRITER_THREAD is not None and _WRITER_THREAD.is_alive():
return
with _LOCK:
if _WRITER_THREAD is None or not _WRITER_THREAD.is_alive():
_trace("ensure-writer:starting")
t = threading.Thread(
target=_writer_loop,
daemon=True,
name="datatools-audit-writer",
)
t.start()
_WRITER_THREAD = t
_trace("ensure-writer:started", ident=t.ident)
# ---------------------------------------------------------------------------
# Public producer API
# ---------------------------------------------------------------------------
def log_event(
category: str,
message: str,
*,
level: str = "info",
**extra: Any,
) -> None:
"""Enqueue one event. Non-blocking; returns in microseconds.
Failures inside this function are swallowed; a broken audit log
must never take the GUI down.
"""
if _DISABLED and not _ENABLE_OVERRIDE:
return
if _PROBE in ("noop", "no-events"):
_trace("log_event:probe-skip", category=category)
return
_trace("log_event:enter", category=category)
try:
try:
ts = datetime.now(tz=timezone.utc).isoformat(timespec="milliseconds")
except Exception:
ts = ""
event: dict[str, Any] = {
"ts": ts,
"level": level,
"category": category,
"session": _session_id()[:8],
"message": message,
}
for k, v in extra.items():
try:
json.dumps(v)
event[k] = v
except (TypeError, ValueError):
event[k] = str(v)
_ensure_writer_started()
with _QUEUE_COND:
_QUEUE.append(event)
_QUEUE_COND.notify()
except Exception:
pass
def log_session_start() -> None:
"""Idempotent session-start banner with platform info."""
if _DISABLED and not _ENABLE_OVERRIDE:
return
if _PROBE in ("noop", "no-session-start"):
_trace("log_session_start:probe-skip")
return
_trace("log_session_start:enter")
global _SESSION_STARTED
with _LOCK:
if _SESSION_STARTED:
return
_SESSION_STARTED = True
try:
user = getpass.getuser()
except Exception:
user = "?"
try:
cwd = str(Path.cwd())
except Exception:
cwd = "?"
log_event(
"session",
"Session started",
platform=f"{platform.system()} {platform.release()}",
python=sys.version.split()[0],
user=user,
cwd=cwd,
log_file=str(audit_log_path()),
)
def log_page_open(slug: str) -> None:
"""Emit a deduplicated 'page open' nav event."""
if _DISABLED and not _ENABLE_OVERRIDE:
return
if _PROBE in ("noop", "no-page-open"):
_trace("log_page_open:probe-skip", slug=slug)
return
_trace("log_page_open:enter", slug=slug)
try:
try:
import streamlit as st
prev = st.session_state.get("_audit_current_page")
if prev == slug:
return
st.session_state["_audit_current_page"] = slug
except Exception:
pass
log_event("nav", f"Opened {slug}", page=slug)
except Exception:
pass
def log_exception(where: str, exc: BaseException, **extra: Any) -> None:
"""Convenience wrapper for caught exceptions."""
log_event(
"error",
f"{where}: {type(exc).__name__}: {exc}",
level="error",
exc_type=type(exc).__name__,
exc_message=str(exc),
**extra,
)
# ---------------------------------------------------------------------------
# Shutdown
# ---------------------------------------------------------------------------
def flush_audit_log(timeout_s: float = 0.5) -> None:
"""Drain queued events to disk, then signal the writer to exit.
Called from ``shutdown_app`` so the closing session's events make
it to disk before the process dies. Bounded by ``timeout_s`` so a
stuck disk can never delay shutdown — events still in the queue
when the timer expires are dropped.
"""
if _DISABLED and not _ENABLE_OVERRIDE:
return
_trace("flush:enter", timeout_s=timeout_s)
global _SHUTDOWN_REQUESTED
deadline = time.monotonic() + max(0.0, timeout_s)
with _QUEUE_COND:
_SHUTDOWN_REQUESTED = True
_QUEUE_COND.notify_all()
while time.monotonic() < deadline:
with _QUEUE_COND:
if not _QUEUE:
return
time.sleep(0.02)
# ---------------------------------------------------------------------------
# Test helpers
# ---------------------------------------------------------------------------
def reset_for_tests() -> None:
"""Reset module-level state. Call from a pytest fixture for
isolation between tests."""
global _SESSION_ID, _SESSION_STARTED
global _WRITER_THREAD, _SHUTDOWN_REQUESTED, _WRITE_FAILED_REPORTED
with _LOCK:
_SESSION_ID = None
_SESSION_STARTED = False
_SHUTDOWN_REQUESTED = False
_WRITE_FAILED_REPORTED = False
# The previous writer thread (if any) will exit on its own
# the next time it wakes — daemon thread, no need to join.
_WRITER_THREAD = None
with _QUEUE_COND:
_QUEUE.clear()
__all__ = [
"audit_log_dir",
"audit_log_path",
"flush_audit_log",
"log_event",
"log_exception",
"log_page_open",
"log_session_start",
"reset_for_tests",
]