"""
Logging utilities for comprehensive application monitoring.
This module provides logging utilities for session context management,
rate-limited logging, method timing, and path sanitization.
Features:
- LoggingContext: Session-level context with correlation IDs (contextvars)
- SessionContextFilter: Logging filter that adds session context to records
- RateLimitedLogger: Token bucket rate-limited logging wrapper
- log_timing: Decorator for logging method entry/exit with timing
- sanitize_path: Path sanitization for privacy in logs
Environment Variables:
- ``PYXPCS_LOG_RATE_LIMIT``: Default rate limit in msgs/sec (default: 10.0)
- ``PYXPCS_LOG_SANITIZE_PATHS``: Path sanitization mode: none/home/hash (default: home)
- ``PYXPCS_LOG_SESSION_ID``: Enable session IDs: 1/0 (default: 1)
Example::
from xpcsviewer.utils.log_utils import (
LoggingContext,
RateLimitedLogger,
log_timing,
sanitize_path,
)
# Session context
with LoggingContext(operation="data_analysis") as ctx:
logger.info("Starting analysis") # Includes session_id
# Rate-limited logging
rate_limited = RateLimitedLogger(logger, rate_per_second=5.0)
rate_limited.debug("High-frequency event")
# Timing decorator
@log_timing()
def process_data(data):
...
# Path sanitization
logger.info(f"Loaded: {sanitize_path(file_path)}")
"""
from __future__ import annotations
import contextvars
import functools
import hashlib
import logging
import os
import time
import uuid
from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, TypeVar
if TYPE_CHECKING:
from typing import ParamSpec
P = ParamSpec("P")
F = TypeVar("F", bound=Callable[..., Any])
# Context variables for session tracking
_session_id: contextvars.ContextVar[str] = contextvars.ContextVar(
"session_id", default="no-session"
)
_operation: contextvars.ContextVar[str] = contextvars.ContextVar(
"operation", default=""
)
_current_file: contextvars.ContextVar[str] = contextvars.ContextVar(
"current_file", default=""
)
def _get_env_float(name: str, default: float) -> float:
"""Get a float environment variable with fallback to default."""
value = os.environ.get(name)
if value is None:
return default
try:
result = float(value)
return result if result > 0 else default
except ValueError:
return default
def _get_env_bool(name: str, default: bool) -> bool:
"""Get a boolean environment variable with fallback to default."""
value = os.environ.get(name)
if value is None:
return default
return value.lower() in ("1", "true", "yes", "on")
def _get_sanitize_mode() -> Literal["none", "home", "hash"]:
"""Get path sanitization mode from environment."""
value = os.environ.get("PYXPCS_LOG_SANITIZE_PATHS", "home").lower()
if value in ("none", "home", "hash"):
return value # type: ignore[return-value]
return "home"
[docs]
class LoggingContext:
"""
Manage session-level logging context with correlation IDs.
Uses contextvars for thread-safe session tracking. All log entries
within a context share the same session_id for correlation.
Attributes:
session_id: UUID4 identifier for the session (read-only after creation)
Example:
with LoggingContext(operation="batch_analysis") as ctx:
logger.info("Starting batch") # Includes session_id
ctx.update_file("/path/to/data.hdf5")
process_file()
"""
_token_session: contextvars.Token[str] | None = None
_token_operation: contextvars.Token[str] | None = None
_token_file: contextvars.Token[str] | None = None
[docs]
def __init__(
self,
session_id: str | None = None,
operation: str | None = None,
current_file: str | None = None,
) -> None:
"""
Initialize a logging context.
Args:
session_id: Session ID (auto-generated UUID4 if not provided)
operation: Current high-level operation name
current_file: Currently loaded file path (will be sanitized)
"""
self._session_id = session_id or str(uuid.uuid4())[:8]
self._operation = operation or ""
self._current_file = sanitize_path(current_file) if current_file else ""
self._started_at = time.time()
[docs]
def __enter__(self) -> LoggingContext:
"""Enter the context manager, setting contextvars."""
self._token_session = _session_id.set(self._session_id)
self._token_operation = _operation.set(self._operation)
self._token_file = _current_file.set(self._current_file)
return self
[docs]
def __exit__(self, *args: object) -> None:
"""Exit the context manager, restoring previous contextvars."""
if self._token_session is not None:
_session_id.reset(self._token_session)
if self._token_operation is not None:
_operation.reset(self._token_operation)
if self._token_file is not None:
_current_file.reset(self._token_file)
@property
def session_id(self) -> str:
"""Get the session ID."""
return self._session_id
[docs]
def update_operation(self, operation: str) -> None:
"""Update the current operation context."""
self._operation = operation[:100] # Max 100 chars per data model
_operation.set(self._operation)
[docs]
def update_file(self, file_path: str | os.PathLike[str] | None) -> None:
"""Update the current file context (path will be sanitized)."""
if file_path is None:
self._current_file = ""
else:
self._current_file = sanitize_path(file_path)
_current_file.set(self._current_file)
[docs]
@staticmethod
def get_current() -> LoggingContext | None:
"""
Get the current logging context if one is active.
Returns:
Current LoggingContext or None if no context is active
"""
session = _session_id.get()
if session == "no-session":
return None
# Create a read-only view of current context
ctx = LoggingContext.__new__(LoggingContext)
ctx._session_id = session
ctx._operation = _operation.get()
ctx._current_file = _current_file.get()
ctx._started_at = 0.0 # Unknown for recovered context
return ctx
[docs]
class SessionContextFilter(logging.Filter):
"""
Logging filter that adds session context to log records.
Adds the following attributes to each LogRecord:
- session_id: Current session ID or 'no-session'
- operation: Current operation name or ''
- current_file: Current file path (sanitized) or ''
This filter should be added to handlers via logging_config.py.
"""
[docs]
def filter(self, record: logging.LogRecord) -> bool:
"""
Add session context attributes to the log record.
Args:
record: The log record to enrich
Returns:
True (always allows the record through)
"""
record.session_id = _session_id.get()
record.operation = _operation.get()
record.current_file = _current_file.get()
return True
[docs]
class RateLimitedLogger:
"""
Rate-limited logger wrapper using token bucket algorithm.
Prevents log flooding by limiting messages per second. Each unique
message template is rate-limited independently.
Attributes:
suppressed_count: Total number of suppressed messages
Example:
logger = logging.getLogger(__name__)
rate_limited = RateLimitedLogger(logger, rate_per_second=5.0)
def on_mouse_move(x, y):
rate_limited.debug(f"Mouse at ({x}, {y})") # Max 5/sec
"""
[docs]
def __init__(
self,
logger: logging.Logger,
rate_per_second: float | None = None,
burst_size: float | None = None,
) -> None:
"""
Initialize the rate-limited logger.
Args:
logger: The underlying logger to wrap
rate_per_second: Maximum messages per second (default from env var)
burst_size: Initial token bucket size (default: rate_per_second)
"""
self._logger = logger
default_rate = _get_env_float("PYXPCS_LOG_RATE_LIMIT", 10.0)
self._rate = rate_per_second if rate_per_second is not None else default_rate
self._burst_size = burst_size if burst_size is not None else self._rate
# Token bucket state: {message_key: (tokens, last_update, suppressed_count)}
self._buckets: dict[str, tuple[float, float, int]] = {}
def _get_message_key(self, msg: str) -> str:
"""Generate a key for the message (first 50 chars for grouping)."""
return msg[:50]
def _try_consume(self, msg_key: str) -> bool:
"""
Try to consume a token for the given message key.
Returns True if message should be logged, False if suppressed.
"""
now = time.monotonic()
if msg_key not in self._buckets:
# Initialize bucket with full tokens
self._buckets[msg_key] = (self._burst_size - 1, now, 0)
return True
tokens, last_update, suppressed = self._buckets[msg_key]
# Refill tokens based on elapsed time
elapsed = now - last_update
tokens = min(self._burst_size, tokens + elapsed * self._rate)
if tokens >= 1.0:
# Consume a token and log
self._buckets[msg_key] = (tokens - 1, now, 0)
return True
# Suppress and increment counter
self._buckets[msg_key] = (tokens, now, suppressed + 1)
return False
def _log(self, level: int, msg: str, *args: object, **kwargs: object) -> bool:
"""Internal logging method with rate limiting."""
msg_key = self._get_message_key(msg)
if self._try_consume(msg_key):
self._logger.log(level, msg, *args, **kwargs) # type: ignore[arg-type]
return True
return False
[docs]
def debug(self, msg: str, *args: object, **kwargs: object) -> bool:
"""Log DEBUG message if not rate-limited. Returns True if logged."""
return self._log(logging.DEBUG, msg, *args, **kwargs)
[docs]
def info(self, msg: str, *args: object, **kwargs: object) -> bool:
"""Log INFO message if not rate-limited. Returns True if logged."""
return self._log(logging.INFO, msg, *args, **kwargs)
[docs]
def warning(self, msg: str, *args: object, **kwargs: object) -> bool:
"""Log WARNING message if not rate-limited. Returns True if logged."""
return self._log(logging.WARNING, msg, *args, **kwargs)
[docs]
def error(self, msg: str, *args: object, **kwargs: object) -> bool:
"""Log ERROR message if not rate-limited. Returns True if logged."""
return self._log(logging.ERROR, msg, *args, **kwargs)
[docs]
def critical(self, msg: str, *args: object, **kwargs: object) -> bool:
"""Log CRITICAL message if not rate-limited. Returns True if logged."""
return self._log(logging.CRITICAL, msg, *args, **kwargs)
[docs]
def get_suppressed_count(self, msg_key: str | None = None) -> int:
"""
Get count of suppressed messages.
Args:
msg_key: Specific message key, or None for total
Returns:
Number of suppressed messages
"""
if msg_key is not None:
if msg_key in self._buckets:
return self._buckets[msg_key][2]
return 0
return sum(bucket[2] for bucket in self._buckets.values())
[docs]
def reset(self) -> None:
"""Clear all rate limit state."""
self._buckets.clear()
[docs]
def log_timing(
logger: logging.Logger | None = None,
level: int = logging.DEBUG,
threshold_ms: float | None = None,
threshold_level: int = logging.WARNING,
include_args: bool = False,
) -> Callable[[F], F]:
"""
Decorator for logging method entry/exit with timing.
Logs the method name and execution time. Optionally logs at a higher
level if execution exceeds a threshold.
Args:
logger: Logger to use (default: logger for decorated function's module)
level: Normal log level (default: DEBUG)
threshold_ms: Time threshold in ms for elevated logging
threshold_level: Level to use when threshold exceeded (default: WARNING)
include_args: Whether to include function arguments in log
Returns:
Decorated function
Example::
@log_timing()
def process_data(data):
... # Logs: "process_data completed in 123.45ms"
@log_timing(threshold_ms=1000)
def slow_operation():
... # Logs WARNING if > 1000ms
"""
def decorator(func: F) -> F:
func_logger = logger or logging.getLogger(func.__module__)
func_name = func.__qualname__
@functools.wraps(func)
def wrapper(*args: object, **kwargs: object) -> object:
start_time = time.perf_counter()
try:
result = func(*args, **kwargs)
elapsed_ms = (time.perf_counter() - start_time) * 1000
# Determine log level based on threshold
log_level = level
if threshold_ms is not None and elapsed_ms > threshold_ms:
log_level = threshold_level
# Build message
if include_args:
args_str = ", ".join(
[repr(a)[:50] for a in args[:3]]
+ [f"{k}={v!r}"[:30] for k, v in list(kwargs.items())[:3]]
)
msg = f"{func_name}({args_str}) completed in {elapsed_ms:.2f}ms"
else:
msg = f"{func_name} completed in {elapsed_ms:.2f}ms"
func_logger.log(log_level, msg)
return result
except Exception:
elapsed_ms = (time.perf_counter() - start_time) * 1000
func_logger.error(
f"{func_name} failed after {elapsed_ms:.2f}ms",
exc_info=True,
)
raise
return wrapper # type: ignore[return-value]
return decorator
[docs]
def sanitize_path(
path: str | os.PathLike[str] | None,
mode: Literal["none", "home", "hash"] | None = None,
) -> str:
"""
Sanitize file paths for logging privacy.
Args:
path: File path to sanitize
mode: Sanitization mode (default from PYXPCS_LOG_SANITIZE_PATHS env var)
- 'none': No sanitization, full path
- 'home': Replace home directory with ~
- 'hash': Replace home with ~ and hash the filename
Returns:
Sanitized path string
Example:
sanitize_path('/Users/john/data/file.h5')
# mode='none' -> '/Users/john/data/file.h5'
# mode='home' -> '~/data/file.h5'
# mode='hash' -> '~/data/a1b2c3d4.h5'
"""
if path is None:
return "<none>"
path_str = str(path)
effective_mode = mode if mode is not None else _get_sanitize_mode()
if effective_mode == "none":
return path_str
# Get home directory
home = os.path.expanduser("~")
# Replace home directory with ~
if path_str.startswith(home):
path_str = "~" + path_str[len(home) :]
if effective_mode == "hash":
# Hash the filename portion
p = Path(path_str)
if p.name:
name_hash = hashlib.sha256(p.name.encode()).hexdigest()[:8]
path_str = str(p.parent / f"{name_hash}{p.suffix}")
# Normalize to forward slashes for cross-platform log consistency
return path_str.replace("\\", "/")
[docs]
def get_session_context() -> dict[str, str]:
"""
Get the current session context as a dictionary.
Returns:
Dictionary with session_id, operation, and current_file
"""
return {
"session_id": _session_id.get(),
"operation": _operation.get(),
"current_file": _current_file.get(),
}