Source code for xpcsviewer.utils.health_monitor

"""
Background Health Monitoring System for XPCS Viewer.

This module provides non-intrusive background monitoring using existing thread pools
to track system health, resource usage, and reliability metrics without impacting
performance of core operations.
"""

import gc
import os
import threading
import time
import weakref
from collections import defaultdict, deque
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

import numpy as np
import psutil

from xpcsviewer.constants import MIN_DISPLAY_POINTS, NDIM_2D

from .logging_config import get_logger

logger = get_logger(__name__)


[docs] class HealthStatus(Enum): """Overall system health status levels.""" EXCELLENT = "excellent" # All systems optimal GOOD = "good" # Minor issues, no impact WARNING = "warning" # Issues detected, monitoring CRITICAL = "critical" # Immediate attention required EMERGENCY = "emergency" # System stability at risk
[docs] class ResourceType(Enum): """Types of system resources to monitor.""" MEMORY = "memory" CPU = "cpu" DISK = "disk" THREADS = "threads" HDF5_CONNECTIONS = "hdf5_connections" GUI_RESPONSIVENESS = "gui_responsiveness"
[docs] @dataclass class HealthMetric: """Individual health metric with thresholds and history.""" name: str current_value: float threshold_warning: float threshold_critical: float unit: str = "" history: deque = field(default_factory=lambda: deque(maxlen=100)) last_updated: float = field(default_factory=time.time)
[docs] def update(self, value: float) -> None: """Update metric value and history.""" self.current_value = value self.history.append((time.time(), value)) self.last_updated = time.time()
[docs] def get_status(self) -> HealthStatus: """Get current status based on thresholds.""" if self.current_value >= self.threshold_critical: return HealthStatus.CRITICAL if self.current_value >= self.threshold_warning: return HealthStatus.WARNING return HealthStatus.GOOD
[docs] def get_trend(self, window_minutes: float = 5.0) -> str: """Get trend over specified time window.""" if len(self.history) < NDIM_2D: return "insufficient_data" cutoff_time = time.time() - (window_minutes * 60) recent_values = [val for ts, val in self.history if ts >= cutoff_time] if len(recent_values) < NDIM_2D: return "insufficient_data" # Simple linear trend trend = np.polyfit(range(len(recent_values)), recent_values, 1)[0] if abs(trend) < 0.01: # Threshold for "stable" return "stable" if trend > 0: return "increasing" return "decreasing"
[docs] class HealthMonitor: """ Non-intrusive health monitoring system using background threads. Monitors system resources, application state, and reliability metrics without impacting performance of core XPCS operations. """
[docs] def __init__(self, monitoring_interval: float = 30.0, history_size: int = 100): self.monitoring_interval = monitoring_interval self.history_size = history_size self._metrics: dict[str, HealthMetric] = {} self._monitoring_active = False self._monitor_thread: threading.Thread | None = None self._lock = threading.RLock() self._callbacks: dict[HealthStatus, list[Callable]] = defaultdict(list) self._last_alert_times: dict[str, float] = {} self._alert_cooldown = 300.0 # 5 minutes between same alerts # Track application objects for health monitoring self._tracked_objects: set[weakref.ref] = set() # Periodic callbacks registered by subsystems that want to reuse the # health monitor's daemon thread instead of spawning their own (BUG-050). # Each entry is a callable() with no required arguments. self._periodic_callbacks: list[Callable] = [] # Initialize core metrics self._initialize_metrics()
def _initialize_metrics(self) -> None: """Initialize core health metrics with appropriate thresholds.""" self._metrics = { "memory_usage_percent": HealthMetric( name="Memory Usage", current_value=0.0, threshold_warning=75.0, threshold_critical=90.0, unit="%", ), "memory_available_gb": HealthMetric( name="Available Memory", current_value=0.0, threshold_warning=2.0, # < 2GB available threshold_critical=0.5, # < 500MB available unit="GB", ), "cpu_usage_percent": HealthMetric( name="CPU Usage", current_value=0.0, threshold_warning=80.0, threshold_critical=95.0, unit="%", ), "thread_count": HealthMetric( name="Thread Count", current_value=0.0, threshold_warning=100, threshold_critical=200, unit="", ), "hdf5_connections": HealthMetric( name="HDF5 Connections", current_value=0.0, threshold_warning=50, threshold_critical=100, unit="", ), "disk_usage_percent": HealthMetric( name="Disk Usage", current_value=0.0, threshold_warning=85.0, threshold_critical=95.0, unit="%", ), "gc_collections_per_hour": HealthMetric( name="GC Collections/Hour", current_value=0.0, threshold_warning=100, threshold_critical=300, unit="", ), }
[docs] def start_monitoring(self) -> None: """Start background health monitoring.""" import os # Skip starting background threads in test mode to prevent threading issues if os.environ.get("XPCS_TEST_MODE") == "1": return with self._lock: if self._monitoring_active: logger.debug("Health monitoring already active") return self._monitoring_active = True self._monitor_thread = threading.Thread( target=self._monitoring_loop, name="XPCS-HealthMonitor", daemon=True ) self._monitor_thread.start() logger.info( f"Health monitoring started (interval: {self.monitoring_interval}s)" )
[docs] def stop_monitoring(self) -> None: """Stop background health monitoring. The lock is released before joining the monitor thread to avoid a potential deadlock: the monitor loop may attempt to acquire ``_lock`` (e.g. via ``get_health_summary``), so holding it while waiting for the thread to finish would block both sides (SRE-6). """ thread_to_join = None with self._lock: if not self._monitoring_active: return self._monitoring_active = False if self._monitor_thread and self._monitor_thread.is_alive(): thread_to_join = self._monitor_thread # Join outside the lock to prevent deadlock (SRE-6). if thread_to_join is not None: thread_to_join.join(timeout=5.0) if thread_to_join.is_alive(): logger.warning("Health monitor thread did not stop within timeout") with self._lock: self._monitor_thread = None logger.info("Health monitoring stopped")
def _monitoring_loop(self) -> None: """Main monitoring loop running in background thread.""" logger.debug("Health monitoring loop started") # Track GC statistics -- reset at the start of *each* interval so that # _update_gc_metrics reports a true per-interval delta rather than a # cumulative total since monitoring started (BUG-052). gc_stats_start = gc.get_stats() while self._monitoring_active: try: start_time = time.time() # Update all metrics self._update_system_metrics() self._update_application_metrics() self._update_gc_metrics(gc_stats_start) # Reset GC baseline to current stats so the *next* iteration # measures only collections that occurred during this interval. gc_stats_start = gc.get_stats() # Invoke subsystem callbacks that have opted in to share this # thread instead of running their own daemon threads (BUG-050). for cb in list(self._periodic_callbacks): try: cb() except Exception as cb_exc: logger.debug(f"Error in periodic health callback: {cb_exc}") # Check for alerts self._check_health_alerts() # Clean up dead weak references self._cleanup_tracked_objects() # Sleep for remaining interval time elapsed = time.time() - start_time sleep_time = max(0, self.monitoring_interval - elapsed) if sleep_time > 0: time.sleep(sleep_time) except Exception as e: # Don't let monitoring errors crash the monitor logger.debug(f"Health monitoring error: {e}") time.sleep(self.monitoring_interval) logger.debug("Health monitoring loop ended") def _update_system_metrics(self) -> None: """Update system-level metrics using psutil.""" try: # Memory metrics memory = psutil.virtual_memory() self._metrics["memory_usage_percent"].update(memory.percent) self._metrics["memory_available_gb"].update(memory.available / (1024**3)) # CPU metrics (average over short period to avoid blocking) cpu_percent = psutil.cpu_percent(interval=0.1) self._metrics["cpu_usage_percent"].update(cpu_percent) # Thread count process = psutil.Process() thread_count = process.num_threads() self._metrics["thread_count"].update(thread_count) # Disk usage for current working directory disk_usage = psutil.disk_usage(os.getcwd()) disk_percent = (disk_usage.used / disk_usage.total) * 100 self._metrics["disk_usage_percent"].update(disk_percent) except Exception as e: logger.debug(f"Error updating system metrics: {e}") def _update_application_metrics(self) -> None: """Update application-specific metrics.""" try: # HDF5 connection count (if available) hdf5_count = self._get_hdf5_connection_count() self._metrics["hdf5_connections"].update(hdf5_count) except Exception as e: logger.debug(f"Error updating application metrics: {e}") def _update_gc_metrics(self, initial_stats: list[dict]) -> None: """Update garbage collection metrics. ``initial_stats`` must be the GC stats snapshot taken at the *start of the current interval*, not at monitoring start time. The caller (``_monitoring_loop``) refreshes the baseline after each call so that this method always computes a true per-interval delta. (BUG-052) """ try: current_stats = gc.get_stats() if len(current_stats) == len(initial_stats): # Per-interval delta: only collections since the last snapshot. total_collections = sum( current_stats[i]["collections"] - initial_stats[i]["collections"] for i in range(len(current_stats)) ) # Scale to collections-per-hour using the monitoring interval so # the metric is comparable across different interval settings. # Avoid division by zero if interval is somehow 0. interval_hours = max(self.monitoring_interval, 1e-6) / 3600 collections_per_hour = total_collections / interval_hours self._metrics["gc_collections_per_hour"].update(collections_per_hour) except Exception as e: logger.debug(f"Error updating GC metrics: {e}") def _get_hdf5_connection_count(self) -> float: """Get current HDF5 connection count.""" try: # Try to get connection count from HDF5 reader from ..fileIO.hdf_reader import _connection_pool if hasattr(_connection_pool, "get_pool_size"): return float(_connection_pool.get_pool_size()) if hasattr(_connection_pool, "_pool_size"): return float(_connection_pool._pool_size) except (ImportError, AttributeError): pass return 0.0 def _check_health_alerts(self) -> None: """Check metrics and trigger alerts if thresholds exceeded.""" current_time = time.time() overall_status = HealthStatus.EXCELLENT for metric_name, metric in self._metrics.items(): status = metric.get_status() # Update overall status to worst individual status if status.value == "critical": overall_status = HealthStatus.CRITICAL elif status.value == "warning" and overall_status.value in [ "excellent", "good", ]: overall_status = HealthStatus.WARNING elif status.value == "good" and overall_status.value == "excellent": overall_status = HealthStatus.GOOD # Check if we should send alert alert_key = f"{metric_name}_{status.value}" last_alert = self._last_alert_times.get(alert_key, 0) if ( status in [HealthStatus.WARNING, HealthStatus.CRITICAL] and current_time - last_alert > self._alert_cooldown ): self._trigger_alert(metric_name, metric, status) self._last_alert_times[alert_key] = current_time # Trigger overall status callbacks self._trigger_status_callbacks(overall_status) def _trigger_alert( self, metric_name: str, metric: HealthMetric, status: HealthStatus ) -> None: """Trigger alert for specific metric.""" trend = metric.get_trend() message = ( f"Health Alert [{status.value.upper()}]: {metric.name} = " f"{metric.current_value:.1f}{metric.unit} " f"(threshold: {metric.threshold_warning if status == HealthStatus.WARNING else metric.threshold_critical:.1f}{metric.unit}, " f"trend: {trend})" ) if status == HealthStatus.CRITICAL: logger.error(message) else: logger.warning(message) # Trigger specific actions based on metric and status self._handle_metric_alert(metric_name, metric, status) def _handle_metric_alert( self, metric_name: str, metric: HealthMetric, status: HealthStatus ) -> None: """Handle specific metric alerts with automatic recovery actions.""" if metric_name == "memory_usage_percent" and status == HealthStatus.CRITICAL: # Trigger emergency memory cleanup logger.warning( "Triggering emergency memory cleanup due to high memory usage" ) self._emergency_memory_cleanup() elif metric_name == "hdf5_connections" and status == HealthStatus.WARNING: # Clean up HDF5 connections logger.warning("Cleaning up HDF5 connections due to high connection count") self._cleanup_hdf5_connections() elif ( metric_name == "gc_collections_per_hour" and status == HealthStatus.CRITICAL ): # Log GC pressure warning logger.warning( "High garbage collection pressure detected - potential memory leaks" ) def _emergency_memory_cleanup(self) -> None: """Perform emergency memory cleanup.""" try: # Force garbage collection collected = gc.collect() logger.debug(f"Emergency GC freed {collected} objects") # Try to trigger cleanup in memory manager if available try: from .memory_manager import get_memory_manager memory_manager = get_memory_manager() memory_manager._emergency_cleanup() logger.debug("Triggered memory manager emergency cleanup") except (ImportError, AttributeError): pass except Exception as e: logger.debug(f"Error during emergency memory cleanup: {e}") def _cleanup_hdf5_connections(self) -> None: """Clean up HDF5 connections.""" try: from ..fileIO.hdf_reader import _connection_pool if hasattr(_connection_pool, "cleanup_idle_connections"): _connection_pool.cleanup_idle_connections() logger.debug("Cleaned up idle HDF5 connections") except (ImportError, AttributeError): pass def _trigger_status_callbacks(self, status: HealthStatus) -> None: """Trigger registered callbacks for overall status. Warning: Callbacks are invoked on the daemon monitoring thread, not on the main/GUI thread. Registered callbacks must be thread-safe. For Qt-aware callbacks, use QMetaObject.invokeMethod or signals to marshal work to the GUI thread. """ callbacks = self._callbacks.get(status, []) if callbacks: logger.debug( f"Triggering {len(callbacks)} health callback(s) for " f"status={status.value} on daemon thread" ) for callback in callbacks: try: callback(status, self.get_health_summary()) except Exception as e: logger.debug(f"Error in health status callback: {e}") def _cleanup_tracked_objects(self) -> None: """Clean up dead weak references.""" self._tracked_objects = { ref for ref in self._tracked_objects if ref() is not None }
[docs] def register_health_callback( self, status: HealthStatus, callback: Callable ) -> None: """Register callback for specific health status. Warning: Callbacks are invoked on the daemon monitoring thread. They must be thread-safe. For Qt GUI updates, use signals or QMetaObject.invokeMethod to marshal work to the main thread. """ self._callbacks[status].append(callback)
[docs] def register_periodic_callback(self, callback: Callable) -> None: """Register a callable to be invoked once per monitoring interval. This allows subsystems to share the health monitor's daemon thread instead of spawning their own background threads (BUG-050). Each registered callback is called with no arguments from the monitoring loop. Callbacks must be thread-safe and must not block for longer than ``monitoring_interval`` seconds. Warning: Callbacks are invoked on the daemon monitoring thread. """ self._periodic_callbacks.append(callback)
[docs] def unregister_periodic_callback(self, callback: Callable) -> None: """Remove a previously registered periodic callback.""" try: self._periodic_callbacks.remove(callback) except ValueError: pass
[docs] def track_object(self, obj: Any) -> None: """Track object for health monitoring.""" self._tracked_objects.add(weakref.ref(obj))
[docs] def get_health_summary(self) -> dict[str, Any]: """Get comprehensive health summary.""" with self._lock: summary: dict[str, Any] = { "overall_status": self._get_overall_status().value, "monitoring_active": self._monitoring_active, "metrics": {}, "alerts": [], "recommendations": [], } # Add metric summaries for name, metric in self._metrics.items(): summary["metrics"][name] = { "value": metric.current_value, "unit": metric.unit, "status": metric.get_status().value, "trend": metric.get_trend(), "threshold_warning": metric.threshold_warning, "threshold_critical": metric.threshold_critical, } # Add to alerts if problematic status = metric.get_status() if status in [HealthStatus.WARNING, HealthStatus.CRITICAL]: summary["alerts"].append( { "metric": name, "status": status.value, "value": metric.current_value, "message": f"{metric.name} is {status.value}: {metric.current_value:.1f}{metric.unit}", } ) # Add recommendations summary["recommendations"] = self._get_health_recommendations() return summary
def _get_overall_status(self) -> HealthStatus: """Calculate overall health status.""" worst_status = HealthStatus.EXCELLENT for metric in self._metrics.values(): status = metric.get_status() if status == HealthStatus.CRITICAL: return HealthStatus.CRITICAL if status == HealthStatus.WARNING and worst_status != HealthStatus.CRITICAL: worst_status = HealthStatus.WARNING elif status == HealthStatus.GOOD and worst_status == HealthStatus.EXCELLENT: worst_status = HealthStatus.GOOD return worst_status def _get_health_recommendations(self) -> list[str]: """Get health improvement recommendations.""" recommendations = [] memory_metric = self._metrics.get("memory_usage_percent") if memory_metric and memory_metric.current_value > 70: recommendations.append( "Consider closing unused data files or reducing dataset sizes" ) cpu_metric = self._metrics.get("cpu_usage_percent") if cpu_metric and cpu_metric.current_value > 80: recommendations.append( "High CPU usage detected - consider reducing parallel operations" ) hdf5_metric = self._metrics.get("hdf5_connections") if hdf5_metric and hdf5_metric.current_value > 30: recommendations.append( "Many HDF5 connections open - consider closing unused files" ) gc_metric = self._metrics.get("gc_collections_per_hour") if gc_metric and gc_metric.current_value > 200: recommendations.append( "High garbage collection rate - potential memory leaks detected" ) return recommendations
[docs] def get_performance_impact(self) -> dict[str, float]: """Get monitoring performance impact statistics.""" return { "monitoring_cpu_percent": 0.1, # Estimated < 0.1% CPU "monitoring_memory_mb": 5.0, # Estimated < 5MB memory "monitoring_interval_seconds": self.monitoring_interval, "metrics_tracked": len(self._metrics), "objects_tracked": len(self._tracked_objects), }
# Global health monitor instance _health_monitor: HealthMonitor | None = None _monitor_lock = threading.Lock()
[docs] def get_health_monitor() -> HealthMonitor: """Get or create the global health monitor instance.""" global _health_monitor # noqa: PLW0603 - intentional singleton pattern if _health_monitor is None: with _monitor_lock: if _health_monitor is None: _health_monitor = HealthMonitor() return _health_monitor
[docs] def start_health_monitoring(interval: float = 30.0) -> None: """Start background health monitoring.""" monitor = get_health_monitor() monitor.monitoring_interval = interval monitor.start_monitoring()
[docs] def stop_health_monitoring() -> None: """Stop background health monitoring.""" if _health_monitor: _health_monitor.stop_monitoring()
[docs] def get_health_status() -> dict[str, Any]: """Get current health status summary.""" monitor = get_health_monitor() return monitor.get_health_summary()
[docs] def register_health_callback(status: HealthStatus, callback: Callable) -> None: """Register callback for specific health status changes.""" monitor = get_health_monitor() monitor.register_health_callback(status, callback)
[docs] def track_object_health(obj: Any) -> None: """Track object for health monitoring.""" monitor = get_health_monitor() monitor.track_object(obj)
# Context manager for automatic health monitoring
[docs] class health_monitoring_context: """Context manager for automatic health monitoring during operations."""
[docs] def __init__(self, operation_name: str, alert_on_warning: bool = True): self.operation_name = operation_name self.alert_on_warning = alert_on_warning self.start_metrics: dict[str, float] = {}
def __enter__(self): # Capture initial metrics monitor = get_health_monitor() self.start_metrics = { name: metric.current_value for name, metric in monitor._metrics.items() } return self def __exit__(self, exc_type, exc_value, traceback): # Check for significant changes monitor = get_health_monitor() significant_changes = [] for name, start_value in self.start_metrics.items(): current_metric = monitor._metrics.get(name) if current_metric: change_percent = ( abs(current_metric.current_value - start_value) / max(start_value, 1.0) * 100 ) if change_percent > MIN_DISPLAY_POINTS: # > 10% change significant_changes.append( f"{name}: {start_value:.1f} -> {current_metric.current_value:.1f}" ) if significant_changes: logger.debug( f"Operation '{self.operation_name}' caused significant resource changes: {', '.join(significant_changes)}" ) return False # Don't suppress exceptions