Source code for xpcsviewer.utils.memory_predictor

"""
Predictive Memory Pressure Detection for XPCS Viewer

This module provides advanced memory pressure prediction specifically tailored
for XPCS data analysis workflows, enabling proactive memory management.
"""

import os
import time
from collections import deque
from dataclasses import dataclass
from typing import Any

import numpy as np

from xpcsviewer.constants import (
    LOW_CONFIDENCE_THRESHOLD,
    MAX_HISTORY_ENTRIES,
    MEMORY_PRESSURE_CRITICAL,
    MEMORY_PRESSURE_HIGH,
    MEMORY_PRESSURE_MODERATE,
    MEMORY_WARNING_THRESHOLD_MB,
    MIN_HISTORY_SAMPLES,
    MIN_LEARNING_SAMPLES,
)
from xpcsviewer.xpcs_file.memory import _get_virtual_memory

from .logging_config import get_logger
from .memory_manager import MemoryPressure, get_memory_manager

logger = get_logger(__name__)

# Global flag to enable/disable predictive monitoring overhead.
# Set XPCS_MONITORING_ENABLED=1 to turn on predict/record instrumentation.
# Default OFF in production to eliminate ~10-50ms overhead per operation.
MONITORING_ENABLED: bool = os.environ.get("XPCS_MONITORING_ENABLED", "0") == "1"


[docs] @dataclass class MemoryPrediction: """Container for memory usage predictions.""" predicted_mb: float confidence: float # 0.0 to 1.0 time_horizon_seconds: float pressure_level: MemoryPressure recommended_actions: list[str]
# Lightweight no-op prediction returned when monitoring is disabled. _NOOP_PREDICTION = MemoryPrediction( predicted_mb=0.0, confidence=0.0, time_horizon_seconds=0.0, pressure_level=MemoryPressure.LOW, recommended_actions=[], )
[docs] @dataclass class OperationProfile: """Memory profile for a specific operation type.""" operation_type: str avg_memory_mb: float peak_memory_mb: float duration_seconds: float input_size_correlation: float # How much memory scales with input size
[docs] class XPCSMemoryPredictor: """ Advanced memory pressure predictor for XPCS workflows. This predictor learns from historical memory usage patterns and provides proactive warnings before memory pressure situations. """
[docs] def __init__(self, history_size: int = 1000): self.history_size = history_size # Memory usage history self.memory_history: deque = deque(maxlen=history_size) self.operation_history: deque = deque(maxlen=history_size) # Operation profiles for different XPCS operations self.operation_profiles: dict[str, OperationProfile] = { "load_saxs_2d": OperationProfile("load_saxs_2d", 150.0, 300.0, 2.0, 1.2), "compute_saxs_log": OperationProfile( "compute_saxs_log", 100.0, 200.0, 1.5, 1.0 ), "fit_g2": OperationProfile("fit_g2", 50.0, 100.0, 5.0, 0.3), "fit_g2_parallel": OperationProfile( "fit_g2_parallel", 200.0, 400.0, 3.0, 0.3 ), "load_twotime_c2": OperationProfile( "load_twotime_c2", 500.0, 1000.0, 3.0, 2.0 ), "average_files": OperationProfile("average_files", 100.0, 300.0, 10.0, 0.8), "roi_calculation": OperationProfile( "roi_calculation", 30.0, 60.0, 1.0, 0.5 ), } # Memory manager for coordinated cleanup self.memory_manager = get_memory_manager() # Prediction models self._trend_window = 10 # Number of recent samples for trend analysis self._peak_detection_window = 5 # Window for peak detection logger.info("XPCSMemoryPredictor initialized")
[docs] def record_operation( self, operation_type: str, input_size_mb: float, memory_before_mb: float, memory_after_mb: float, duration_seconds: float, ): """ Record memory usage for a completed operation. Parameters ---------- operation_type : str Type of operation ('load_saxs_2d', 'fit_g2', etc.) input_size_mb : float Size of input data in MB memory_before_mb : float Memory usage before operation memory_after_mb : float Memory usage after operation duration_seconds : float How long the operation took """ memory_delta = memory_after_mb - memory_before_mb timestamp = time.time() # Record the operation operation_record = { "timestamp": timestamp, "operation_type": operation_type, "input_size_mb": input_size_mb, "memory_before_mb": memory_before_mb, "memory_after_mb": memory_after_mb, "memory_delta_mb": memory_delta, "duration_seconds": duration_seconds, } self.operation_history.append(operation_record) # Update operation profile with exponential moving average if operation_type in self.operation_profiles: profile = self.operation_profiles[operation_type] alpha = 0.1 # Learning rate # Update average memory usage profile.avg_memory_mb = ( alpha * abs(memory_delta) + (1 - alpha) * profile.avg_memory_mb ) # Update peak memory (max of recent observations) profile.peak_memory_mb = max( abs(memory_delta) * 1.5, # Safety factor profile.peak_memory_mb * 0.95, # Gradual decay ) # Update duration profile.duration_seconds = ( alpha * duration_seconds + (1 - alpha) * profile.duration_seconds ) # Update size correlation if we have meaningful input size if ( input_size_mb > MIN_LEARNING_SAMPLES ): # Only for operations with significant data size_correlation = abs(memory_delta) / input_size_mb profile.input_size_correlation = ( alpha * size_correlation + (1 - alpha) * profile.input_size_correlation ) logger.debug( f"Updated profile for {operation_type}: " f"avg={profile.avg_memory_mb:.1f}MB, " f"peak={profile.peak_memory_mb:.1f}MB" )
[docs] def predict_operation_memory( self, operation_type: str, input_size_mb: float = 0.0 ) -> MemoryPrediction: """ Predict memory usage for a planned operation. Parameters ---------- operation_type : str Type of operation to predict input_size_mb : float Expected input data size Returns ------- MemoryPrediction Prediction with memory estimate and recommendations """ current_memory = _get_virtual_memory() current_usage_mb = (current_memory.total - current_memory.available) / ( 1024 * 1024 ) if operation_type not in self.operation_profiles: # Create default profile for unknown operations predicted_mb = max(50.0, input_size_mb * 0.5) confidence = 0.3 else: profile = self.operation_profiles[operation_type] # Base prediction from profile base_prediction = profile.avg_memory_mb # Scale by input size if relevant if input_size_mb > MIN_LEARNING_SAMPLES: size_scaling = input_size_mb * profile.input_size_correlation predicted_mb = base_prediction + size_scaling else: predicted_mb = base_prediction # Add safety buffer based on historical variance safety_factor = 1.3 # 30% safety buffer predicted_mb *= safety_factor # Confidence based on amount of historical data num_observations = len( [ op for op in self.operation_history if op["operation_type"] == operation_type ] ) confidence = min(0.9, 0.3 + 0.1 * num_observations) # Predict resulting memory usage predicted_total_mb = current_usage_mb + predicted_mb # Determine pressure level memory_total_mb = current_memory.total / (1024 * 1024) pressure_ratio = predicted_total_mb / memory_total_mb if pressure_ratio >= MEMORY_PRESSURE_CRITICAL: pressure_level = MemoryPressure.CRITICAL elif pressure_ratio >= MEMORY_PRESSURE_HIGH: pressure_level = MemoryPressure.HIGH elif pressure_ratio >= MEMORY_PRESSURE_MODERATE: pressure_level = MemoryPressure.MODERATE else: pressure_level = MemoryPressure.LOW # Generate recommendations recommendations = self._generate_recommendations( pressure_level, operation_type, predicted_mb, current_usage_mb ) return MemoryPrediction( predicted_mb=predicted_mb, confidence=confidence, time_horizon_seconds=profile.duration_seconds if operation_type in self.operation_profiles else 5.0, pressure_level=pressure_level, recommended_actions=recommendations, )
def _generate_recommendations( self, pressure_level: MemoryPressure, operation_type: str, predicted_mb: float, current_usage_mb: float, ) -> list[str]: """Generate memory management recommendations.""" recommendations = [] if pressure_level == MemoryPressure.CRITICAL: recommendations.extend( [ "CRITICAL: Clear all non-essential caches immediately", "Consider breaking operation into smaller chunks", "Close other applications if possible", f"Operation will require {predicted_mb:.0f}MB additional memory", ] ) elif pressure_level == MemoryPressure.HIGH: recommendations.extend( [ "Clear computation and plot caches", "Consider using streaming/chunked processing", "Monitor memory usage closely during operation", ] ) elif pressure_level == MemoryPressure.MODERATE: recommendations.extend( [ "Consider clearing old cached data", "Monitor for memory leaks during operation", ] ) # Operation-specific recommendations if ( operation_type == "load_saxs_2d" and predicted_mb > MEMORY_WARNING_THRESHOLD_MB ): recommendations.append("Consider using memory-mapped file access") elif operation_type == "fit_g2" and predicted_mb > MAX_HISTORY_ENTRIES: recommendations.append( "Consider using sequential fitting instead of parallel" ) elif operation_type == "load_twotime_c2" and predicted_mb > 500: recommendations.extend( ["Use chunked C2 loading", "Clear SAXS data cache before loading C2"] ) return recommendations
[docs] def check_proactive_cleanup_needed(self) -> tuple[bool, list[str]]: """ Check if proactive cleanup is recommended based on trends. Returns ------- tuple[bool, list[str]] (cleanup_needed, reasons) """ trends = self.detect_memory_trends() current_pressure = self.memory_manager.get_memory_pressure() cleanup_needed = False reasons = [] # Check for rapid memory growth if ( trends["growth_rate_mb_per_hour"] > MAX_HISTORY_ENTRIES ): # Growing > 100MB/hour cleanup_needed = True reasons.append( f"Rapid memory growth: {trends['growth_rate_mb_per_hour']:.0f}MB/hour" ) # Check for high volatility (possible memory leaks) if trends["volatility"] > 50: # High volatility cleanup_needed = True reasons.append(f"High memory volatility: {trends['volatility']:.0f}MB") # Check current pressure level if current_pressure in [MemoryPressure.HIGH, MemoryPressure.CRITICAL]: cleanup_needed = True reasons.append(f"Current memory pressure: {current_pressure.value}") # Check cache efficiency cache_stats = self.memory_manager.get_cache_stats() if ( cache_stats.get("cache_efficiency", 1.0) < LOW_CONFIDENCE_THRESHOLD ): # Low hit rate cleanup_needed = True reasons.append("Low cache efficiency suggests wasted memory") return cleanup_needed, reasons
[docs] def update_memory_snapshot(self): """Update the memory usage snapshot for trend analysis.""" memory = _get_virtual_memory() timestamp = time.time() memory_snapshot = { "timestamp": timestamp, "memory_mb": (memory.total - memory.available) / (1024 * 1024), "memory_percent": memory.percent, "available_mb": memory.available / (1024 * 1024), } self.memory_history.append(memory_snapshot)
[docs] def get_prediction_summary(self) -> dict[str, Any]: """Get a summary of current memory predictions and trends.""" trends = self.detect_memory_trends() cleanup_needed, cleanup_reasons = self.check_proactive_cleanup_needed() current_memory = _get_virtual_memory() return { "current_memory_mb": (current_memory.total - current_memory.available) / (1024 * 1024), "current_pressure": self.memory_manager.get_memory_pressure().value, "memory_trends": trends, "proactive_cleanup_needed": cleanup_needed, "cleanup_reasons": cleanup_reasons, "operation_profiles": { name: { "avg_memory_mb": profile.avg_memory_mb, "peak_memory_mb": profile.peak_memory_mb, "duration_seconds": profile.duration_seconds, } for name, profile in self.operation_profiles.items() }, "cache_stats": self.memory_manager.get_cache_stats(), }
# Global predictor instance _global_predictor: XPCSMemoryPredictor | None = None
[docs] def get_memory_predictor() -> XPCSMemoryPredictor: """Get or create the global memory predictor instance.""" global _global_predictor # noqa: PLW0603 - intentional singleton pattern if _global_predictor is None: _global_predictor = XPCSMemoryPredictor() return _global_predictor
[docs] def predict_operation_memory( operation_type: str, input_size_mb: float = 0.0 ) -> MemoryPrediction: """Convenience function for memory prediction. Returns a cheap no-op prediction when MONITORING_ENABLED is False. """ if not MONITORING_ENABLED: return _NOOP_PREDICTION return get_memory_predictor().predict_operation_memory( operation_type, input_size_mb )
[docs] def record_operation_memory( operation_type: str, input_size_mb: float, memory_before_mb: float, memory_after_mb: float, duration_seconds: float, ): """Convenience function for recording operation memory usage. No-op when MONITORING_ENABLED is False. """ if not MONITORING_ENABLED: return None return get_memory_predictor().record_operation( operation_type, input_size_mb, memory_before_mb, memory_after_mb, duration_seconds, )