"""
Unified Memory Management System for XPCS Viewer
This module provides a comprehensive memory management solution that consolidates
all caching strategies and implements intelligent memory pressure handling.
"""
import gc
import threading
import time
import weakref
from collections import OrderedDict
from collections.abc import Callable
from contextlib import contextmanager
from enum import Enum
from typing import Any
import numpy as np
import psutil
from xpcsviewer.constants import CACHE_LONG_EXPIRY, MIN_CACHE_ENTRIES
from xpcsviewer.utils.logging_config import get_logger
# Module-level logger obtained from the project's logging configuration.
logger = get_logger(__name__)
class CacheType(Enum):
    """Category of cached data.

    UnifiedMemoryManager keeps a separate cache (and memory share) per
    category and uses the category to decide what to evict first.
    """

    # Fitting results, FFT data, etc.
    COMPUTATION = "computation"
    # SAXS 2D, log data, etc.
    ARRAY_DATA = "array_data"
    # File headers, qmaps, etc.
    METADATA = "metadata"
    # Plot configurations, curves, etc.
    PLOT_DATA = "plot_data"
class MemoryPressure(Enum):
    """System memory pressure levels used for adaptive cache management.

    The percentage ranges below correspond to UnifiedMemoryManager's default
    ``pressure_thresholds`` (fraction of system memory in use, as reported
    by ``psutil.virtual_memory().percent``). Custom thresholds shift them.
    """

    LOW = "low"  # < 75% memory usage
    MODERATE = "moderate"  # 75-85% memory usage
    HIGH = "high"  # 85-95% memory usage
    CRITICAL = "critical"  # >= 95% memory usage
[docs]
class CacheEntry:
"""Enhanced cache entry with comprehensive metadata."""
[docs]
def __init__(self, data: Any, cache_type: CacheType, size_mb: float | None = None):
self.data = data
self.cache_type = cache_type
self.size_mb = size_mb or self._estimate_size_mb(data)
self.created_at = time.time()
self.last_accessed = self.created_at
self.access_count = 0
self.access_frequency = 0.0 # accesses per second
self.is_pinned = False # Prevent eviction if True
self.generation = 0 # For generational cache management
def _estimate_size_mb(self, data: Any) -> float:
"""Estimate memory usage of cached data."""
if isinstance(data, np.ndarray):
return data.nbytes / (1024 * 1024)
if hasattr(data, "__sizeof__"):
return data.__sizeof__() / (1024 * 1024)
import sys
return sys.getsizeof(data) / (1024 * 1024)
[docs]
def touch(self) -> None:
"""Update access metadata."""
current_time = time.time()
self.access_count += 1
# Calculate access frequency (exponential moving average)
time_delta = current_time - self.last_accessed
if time_delta > 0:
recent_frequency = 1.0 / time_delta
alpha = 0.3 # Smoothing factor
self.access_frequency = (
alpha * recent_frequency + (1 - alpha) * self.access_frequency
)
self.last_accessed = current_time
[docs]
def age_seconds(self) -> float:
"""Get age of cache entry in seconds."""
return time.time() - self.created_at
[docs]
def time_since_access(self) -> float:
"""Get time since last access in seconds."""
return time.time() - self.last_accessed
class UnifiedMemoryManager:
    """
    Unified memory management system that consolidates all caching strategies.

    Features:
    - One ``OrderedDict`` cache per ``CacheType``, each limited to a fixed
      share of ``max_memory_mb`` (see ``_setup_partitions``)
    - Eviction by idle time, LRU order and creation age. Pinned entries are
      skipped by every eviction path except an explicit
      ``clear_cache_type(..., include_pinned=True)`` / ``clear_all_caches()``
    - System memory pressure levels from ``psutil.virtual_memory()``, with
      automatic cleanup at HIGH and CRITICAL pressure
    - A priority preload queue processed while pressure is LOW
    - Periodic monitoring via a HealthMonitor callback, or a dedicated
      daemon thread as fallback

    Sizes are estimates (see ``CacheEntry``), and ``max_memory_mb`` is a soft
    limit: if pinned entries cannot be evicted, a put may still exceed it.

    Locking: ``_global_lock`` is always acquired before any per-type lock in
    ``_cache_locks``. Every method that adds or removes entries or changes
    the memory counters holds ``_global_lock``. This means two per-type locks
    are never taken in opposite orders by different threads. ``cache_get``
    takes only its own per-type lock.
    """

    # Default pressure thresholds (fraction of system memory in use). Only
    # "moderate", "high" and "critical" are consulted by get_memory_pressure.
    _DEFAULT_PRESSURE_THRESHOLDS: dict[str, float] = {
        "low": 0.6,
        "moderate": 0.75,
        "high": 0.85,
        "critical": 0.95,
    }
    # Number of preload requests attempted per monitoring step.
    _PRELOAD_BATCH_SIZE = 3
    # Failed attempts after which a preload request is dropped.
    _MAX_PRELOAD_FAILURES = 3

    def __init__(
        self,
        max_memory_mb: float = 2048,
        pressure_thresholds: dict[str, float] | None = None,
        enable_monitoring: bool = True,
    ):
        """
        Parameters
        ----------
        max_memory_mb : float
            Soft limit on the total estimated size of all cached entries.
        pressure_thresholds : dict[str, float], optional
            Fractions of system memory usage (0-1) at which pressure levels
            start. Keys missing from this dict fall back to the defaults.
        enable_monitoring : bool
            Start background monitoring. Monitoring is skipped when the
            environment variable ``XPCS_TEST_MODE`` is ``"1"``.
        """
        self.max_memory_mb = max_memory_mb
        # Merge over the defaults so a partial dict cannot cause a KeyError
        # in get_memory_pressure later.
        self.pressure_thresholds = {
            **self._DEFAULT_PRESSURE_THRESHOLDS,
            **(pressure_thresholds or {}),
        }
        self.enable_monitoring = enable_monitoring
        # Cache storage with type separation
        self._caches: dict[CacheType, OrderedDict[str, CacheEntry]] = {
            cache_type: OrderedDict() for cache_type in CacheType
        }
        # Memory tracking
        self._current_memory_mb = 0.0
        self._peak_memory_mb = 0.0
        # Per-partition running totals; avoids an O(n) sum on every cache_put
        self._partition_usage_mb: dict[CacheType, float] = dict.fromkeys(
            CacheType, 0.0
        )
        self._memory_history: list[tuple[float, float]] = []  # (timestamp, %)
        # Thread safety (lock order: _global_lock before any per-type lock)
        self._cache_locks = {cache_type: threading.RLock() for cache_type in CacheType}
        self._global_lock = threading.RLock()
        # Weak references for automatic cleanup
        self._object_registry: weakref.WeakSet[Any] = weakref.WeakSet()
        # Monitoring and statistics
        self._stats: dict[str, Any] = {
            "cache_hits": 0,
            "cache_misses": 0,
            "evictions": 0,
            "memory_pressure_events": 0,
            "automatic_cleanups": 0,
            "preload_hits": 0,
            "preload_requests": 0,
            "compression_saves_mb": 0,
        }
        # Advanced caching features
        # key -> (cache_type, loader_func, priority); guarded by _global_lock
        self._preload_queue: dict[
            str, tuple[CacheType, Callable[[], Any], int]
        ] = {}
        # key -> consecutive failed preload attempts; guarded by _global_lock
        self._preload_failures: dict[str, int] = {}
        self._compression_enabled = True
        self._hot_data_threshold = 5  # Access count to mark as hot
        self._cache_partitions = self._setup_partitions()
        # Background monitoring
        self._monitoring_thread: threading.Thread | None = None
        self._shutdown_event = threading.Event()
        if self.enable_monitoring:
            self._start_monitoring()
        logger.info(f"UnifiedMemoryManager initialized with {max_memory_mb}MB limit")

    def _make_space_in_partition(
        self, cache_type: CacheType, required_mb: float
    ) -> None:
        """Make room for ``required_mb`` inside one cache partition.

        The first pass evicts unpinned entries idle for over 15 minutes, most
        idle first. If usage is still above the target, LRU eviction frees
        the remainder. The target is 70% of the partition limit (a 30%
        buffer). If that would not leave room for ``required_mb``, the target
        is lowered to ``limit - required_mb``.
        """
        with self._global_lock, self._cache_locks[cache_type]:
            cache = self._caches[cache_type]
            partition_limit_mb = (
                self.max_memory_mb * self._cache_partitions[cache_type]
            )
            current_mb = self._partition_usage_mb[cache_type]
            if current_mb + required_mb <= partition_limit_mb:
                return  # No eviction needed

            target_mb = max(
                0.0, min(partition_limit_mb * 0.7, partition_limit_mb - required_mb)
            )
            evicted_mb = 0.0

            # First pass: entries idle for more than 15 minutes, most idle first
            idle_entries = []
            for key, entry in cache.items():
                idle_s = entry.time_since_access()
                if not entry.is_pinned and idle_s > 900:
                    idle_entries.append((idle_s, key, entry.size_mb))
            idle_entries.sort(key=lambda item: item[0], reverse=True)
            for _idle_s, key, size_mb in idle_entries:
                if current_mb - evicted_mb <= target_mb:
                    break
                self._evict_entry(cache_type, key)
                evicted_mb += size_mb

            # Second pass: LRU eviction of whatever remains above the target.
            # The amount must be positive for _evict_lru to free anything.
            excess_mb = (current_mb - evicted_mb) - target_mb
            if excess_mb > 0:
                self._evict_lru(cache_type, excess_mb)

    def preload_data(
        self,
        key: str,
        loader_func: Callable[[], Any],
        cache_type: CacheType,
        priority: int = 5,
    ):
        """
        Schedule data for intelligent preloading.

        Parameters
        ----------
        key : str
            Cache key for the data. Re-queuing an existing key replaces the
            earlier request and resets its failure count.
        loader_func : callable
            Zero-argument function that loads the data
        cache_type : CacheType
            Type of cache to store in
        priority : int
            Priority level (1-10, higher = more important)
        """
        with self._global_lock:
            self._preload_queue[key] = (cache_type, loader_func, priority)
            self._preload_failures.pop(key, None)
            self._stats["preload_requests"] += 1
        logger.debug(f"Queued preload for {key} with priority {priority}")

    def _process_preload_queue(self):
        """Attempt the highest-priority queued preloads while pressure is LOW.

        At most ``_PRELOAD_BATCH_SIZE`` requests are attempted per call.
        Loaders run without holding any lock. A request is removed once it
        has been attempted without raising, whether or not ``cache_put``
        accepted the data. A request is also removed after
        ``_MAX_PRELOAD_FAILURES`` failed attempts, so a persistently failing
        loader cannot occupy a batch slot forever.
        """
        if self.get_memory_pressure() != MemoryPressure.LOW:
            return
        # Snapshot under the lock: other threads may call preload_data.
        with self._global_lock:
            batch = sorted(
                self._preload_queue.items(),
                key=lambda item: item[1][2],
                reverse=True,
            )[: self._PRELOAD_BATCH_SIZE]
        for key, request in batch:
            cache_type, loader_func, _priority = request
            try:
                with self._cache_locks[cache_type]:
                    already_cached = key in self._caches[cache_type]
                if not already_cached:
                    data = loader_func()
                    if self.cache_put(key, data, cache_type):
                        logger.debug(f"Successfully preloaded {key}")
                        with self._global_lock:
                            self._stats["preload_hits"] += 1
            except Exception as e:
                # Loaders are arbitrary user code; treat any error as a
                # retryable failure rather than killing the monitoring step.
                self._record_preload_failure(key, request, e)
            else:
                self._discard_preload(key, request)

    def _record_preload_failure(
        self,
        key: str,
        request: tuple[CacheType, Callable[[], Any], int],
        error: Exception,
    ) -> None:
        """Count a failed preload attempt; drop the request after too many."""
        with self._global_lock:
            failures = self._preload_failures.get(key, 0) + 1
            self._preload_failures[key] = failures
        if failures < self._MAX_PRELOAD_FAILURES:
            logger.debug(f"Preload failed for {key}: {error}")
            return
        logger.warning(
            f"Dropping preload for {key} after {failures} failed attempts: {error}"
        )
        self._discard_preload(key, request)

    def _discard_preload(
        self, key: str, request: tuple[CacheType, Callable[[], Any], int]
    ) -> None:
        """Remove ``key`` from the queue if it still maps to ``request``.

        The identity check keeps a newer request for the same key (queued
        while ``request`` was being loaded) from being removed.
        """
        with self._global_lock:
            if self._preload_queue.get(key) is request:
                del self._preload_queue[key]
                self._preload_failures.pop(key, None)

    def _setup_partitions(self) -> dict[CacheType, float]:
        """Return each cache type's share of ``max_memory_mb``."""
        return {
            CacheType.ARRAY_DATA: 0.5,  # 50% for large array data
            CacheType.COMPUTATION: 0.25,  # 25% for computation results
            CacheType.PLOT_DATA: 0.15,  # 15% for plot data
            CacheType.METADATA: 0.10,  # 10% for metadata
        }

    def _start_monitoring(self):
        """Start background memory monitoring.

        Registers a periodic callback with the HealthMonitor if it is
        available, so that memory monitoring shares the health monitor's
        thread rather than spawning a separate one (BUG-050). Falls back to
        a dedicated daemon thread if registration fails.
        """
        import os

        # Skip starting background threads in test mode to prevent threading issues
        if os.environ.get("XPCS_TEST_MODE") == "1":
            return
        try:
            from .health_monitor import get_health_monitor

            get_health_monitor().register_periodic_callback(self._monitoring_step)
        except Exception as exc:
            logger.debug(
                f"HealthMonitor registration failed ({exc}); "
                "starting a dedicated memory monitoring thread"
            )
        else:
            logger.debug(
                "MemoryManager registered with HealthMonitor periodic callback "
                "(no separate monitoring thread started)"
            )
            return
        self._monitoring_thread = threading.Thread(
            target=self._monitoring_loop, daemon=True
        )
        self._monitoring_thread.start()

    def _monitoring_step(self):
        """Run one monitoring pass. This is a no-op once ``shutdown()`` was called.

        The shutdown check matters when the step is registered with the
        HealthMonitor, because this class never unregisters that callback.
        """
        if self._shutdown_event.is_set():
            return
        try:
            self._update_memory_history()
            self._check_memory_pressure()
            self._perform_maintenance()
            # Process preload queue during low pressure periods
            if self._preload_queue:
                self._process_preload_queue()
        except Exception as e:
            logger.warning(f"Memory monitoring error: {e}")

    def _monitoring_loop(self):
        """Fallback monitoring loop: one step every 10 seconds until shutdown."""
        while not self._shutdown_event.wait(timeout=10.0):
            self._monitoring_step()

    def _update_memory_history(self):
        """Append the current system memory percentage; keep the last hour."""
        system_memory = psutil.virtual_memory()
        current_time = time.time()
        with self._global_lock:
            self._memory_history.append((current_time, system_memory.percent))
            cutoff_time = current_time - 3600
            self._memory_history = [
                (t, m) for t, m in self._memory_history if t > cutoff_time
            ]

    def _check_memory_pressure(self):
        """Run a cleanup when system memory pressure is HIGH or CRITICAL."""
        pressure = self.get_memory_pressure()
        if pressure in (MemoryPressure.HIGH, MemoryPressure.CRITICAL):
            self._stats["memory_pressure_events"] += 1
            logger.warning(f"Memory pressure detected: {pressure.value}")
            if pressure == MemoryPressure.CRITICAL:
                self._emergency_cleanup()
            else:
                self._aggressive_cleanup()

    def _perform_maintenance(self):
        """Evict unpinned entries idle longer than ``CACHE_LONG_EXPIRY`` seconds."""
        with self._global_lock:
            for cache_type in CacheType:
                with self._cache_locks[cache_type]:
                    cache = self._caches[cache_type]
                    expired = [
                        key
                        for key, entry in cache.items()
                        if not entry.is_pinned
                        and entry.time_since_access() > CACHE_LONG_EXPIRY
                    ]
                    for key in expired:
                        self._evict_entry(cache_type, key)

    def get_memory_pressure(self) -> MemoryPressure:
        """Classify current system memory usage against ``pressure_thresholds``."""
        system_memory = psutil.virtual_memory()
        pressure_ratio = system_memory.percent / 100.0
        if pressure_ratio >= self.pressure_thresholds["critical"]:
            return MemoryPressure.CRITICAL
        if pressure_ratio >= self.pressure_thresholds["high"]:
            return MemoryPressure.HIGH
        if pressure_ratio >= self.pressure_thresholds["moderate"]:
            return MemoryPressure.MODERATE
        return MemoryPressure.LOW

    def cache_put(
        self, key: str, data: Any, cache_type: CacheType, pin: bool = False
    ) -> bool:
        """
        Store data in the appropriate cache with intelligent eviction.

        Parameters
        ----------
        key : str
            Unique cache key; an existing entry with this key is replaced
        data : Any
            Data to cache
        cache_type : CacheType
            Type of cache for management strategy
        pin : bool
            Prevent eviction if True

        Returns
        -------
        bool
            True if successfully cached. False if the data is too large
            (over 80% of its partition or over 50% of ``max_memory_mb``). In
            that case nothing is evicted and any existing entry is kept.
        """
        partition_limit_mb = self.max_memory_mb * self._cache_partitions[cache_type]
        entry = CacheEntry(data, cache_type)
        entry.is_pinned = pin

        # Reject oversized data before evicting anything on its behalf.
        if entry.size_mb > partition_limit_mb * 0.8:  # 80% of partition
            logger.warning(
                f"Data too large for {cache_type.value} partition: {entry.size_mb:.1f}MB > {partition_limit_mb * 0.8:.1f}MB"
            )
            return False
        if entry.size_mb > self.max_memory_mb * 0.5:
            logger.warning(f"Data too large to cache: {entry.size_mb:.1f}MB")
            return False

        # Global lock first: the eviction below may touch other partitions.
        with self._global_lock, self._cache_locks[cache_type]:
            cache = self._caches[cache_type]
            # Drop the previous value for this key first so its size is not
            # counted against the new entry during eviction.
            old_entry = cache.pop(key, None)
            if old_entry is not None:
                self._current_memory_mb -= old_entry.size_mb
                self._partition_usage_mb[cache_type] -= old_entry.size_mb

            if (
                self._partition_usage_mb[cache_type] + entry.size_mb
                > partition_limit_mb
            ):
                self._make_space_in_partition(cache_type, entry.size_mb)
            if self._current_memory_mb + entry.size_mb > self.max_memory_mb:
                self._make_space(cache_type, entry.size_mb)

            cache[key] = entry
            self._current_memory_mb += entry.size_mb
            self._partition_usage_mb[cache_type] += entry.size_mb
            self._peak_memory_mb = max(self._peak_memory_mb, self._current_memory_mb)
        logger.debug(f"Cached {key} ({entry.size_mb:.1f}MB, type: {cache_type.value})")
        return True

    def cache_get(self, key: str, cache_type: CacheType) -> Any | None:
        """
        Retrieve data from cache.

        Parameters
        ----------
        key : str
            Cache key
        cache_type : CacheType
            Type of cache to search

        Returns
        -------
        Any or None
            Cached data or None if not found
        """
        with self._cache_locks[cache_type]:
            cache = self._caches[cache_type]
            entry = cache.get(key)
            if entry is not None:
                entry.touch()
                cache.move_to_end(key)  # keep OrderedDict in LRU order
                self._stats["cache_hits"] += 1
                logger.debug(f"Cache hit for {key}")
                return entry.data
            self._stats["cache_misses"] += 1
            logger.debug(f"Cache miss for {key}")
            return None

    def _make_space(self, cache_type: CacheType, required_mb: float) -> float:
        """Free roughly ``required_mb`` across partitions; return MB freed.

        The same partition goes first (70% of the request). Then other
        partitions are tried in order of least importance (PLOT_DATA,
        COMPUTATION, METADATA, ARRAY_DATA). Last, every partition is swept
        for entries created more than 30 minutes ago.
        """
        with self._global_lock:
            freed_mb = self._evict_lru(cache_type, required_mb * 0.7)
            if freed_mb < required_mb:
                eviction_order = [
                    CacheType.PLOT_DATA,
                    CacheType.COMPUTATION,
                    CacheType.METADATA,
                    CacheType.ARRAY_DATA,
                ]
                for other_type in eviction_order:
                    if other_type != cache_type and freed_mb < required_mb:
                        freed_mb += self._evict_lru(
                            other_type, (required_mb - freed_mb) * 1.2
                        )
            if freed_mb < required_mb:
                freed_mb += self._evict_by_age(1800)  # 30 minutes
            return freed_mb

    def _evict_lru(self, cache_type: CacheType, target_mb: float) -> float:
        """Evict unpinned entries, least recently accessed first.

        Eviction stops once at least ``target_mb`` is freed. Returns the MB
        freed; a non-positive ``target_mb`` frees nothing.
        """
        freed_mb = 0.0
        with self._global_lock, self._cache_locks[cache_type]:
            cache = self._caches[cache_type]
            victims = []
            for key, entry in sorted(
                cache.items(), key=lambda item: item[1].last_accessed
            ):
                if freed_mb >= target_mb:
                    break
                if not entry.is_pinned:
                    victims.append(key)
                    freed_mb += entry.size_mb
            for key in victims:
                self._evict_entry(cache_type, key)
        return freed_mb

    def _evict_by_age(self, max_age_seconds: float) -> float:
        """Evict old unpinned entries from every partition; return MB freed.

        "Old" means created more than ``max_age_seconds`` ago, regardless of
        how recently the entry was accessed.
        """
        freed_mb = 0.0
        current_time = time.time()
        with self._global_lock:
            for cache_type in CacheType:
                with self._cache_locks[cache_type]:
                    cache = self._caches[cache_type]
                    old_keys = [
                        key
                        for key, entry in cache.items()
                        if current_time - entry.created_at > max_age_seconds
                        and not entry.is_pinned
                    ]
                    for key in old_keys:
                        freed_mb += cache[key].size_mb
                        self._evict_entry(cache_type, key)
        return freed_mb

    def _evict_entry(self, cache_type: CacheType, key: str):
        """Remove one entry (if present) and update counters and stats."""
        with self._global_lock, self._cache_locks[cache_type]:
            entry = self._caches[cache_type].pop(key, None)
            if entry is None:
                return
            self._current_memory_mb -= entry.size_mb
            self._partition_usage_mb[cache_type] -= entry.size_mb
            self._stats["evictions"] += 1
        logger.debug(f"Evicted {key} ({entry.size_mb:.1f}MB)")

    def _aggressive_cleanup(self):
        """Perform aggressive cleanup under memory pressure."""
        logger.info("Performing aggressive memory cleanup")
        # Drop non-essential plot data (pinned entries are kept)
        self.clear_cache_type(CacheType.PLOT_DATA, include_pinned=False)
        # Evict entries created more than 15 minutes ago (all partitions)
        self._evict_by_age(900)
        gc.collect()
        self._stats["automatic_cleanups"] += 1

    def _emergency_cleanup(self):
        """Emergency cleanup under critical memory pressure."""
        logger.warning("Performing emergency memory cleanup")
        # Clear all non-pinned entries from the least critical partitions
        for cache_type in (CacheType.PLOT_DATA, CacheType.COMPUTATION):
            self.clear_cache_type(cache_type, include_pinned=False)
        # Evict entries created more than 5 minutes ago (all partitions)
        self._evict_by_age(300)
        # Multiple garbage collection passes
        for _ in range(3):
            gc.collect()
        self._stats["automatic_cleanups"] += 1

    def clear_cache_type(self, cache_type: CacheType, include_pinned: bool = True):
        """Remove entries from one cache partition.

        Parameters
        ----------
        cache_type : CacheType
            Partition to clear.
        include_pinned : bool
            When True (default), every entry is removed, pinned or not. When
            False, pinned entries are kept; the automatic pressure cleanups
            use this so that pinning is honoured.
        """
        with self._global_lock, self._cache_locks[cache_type]:
            cache = self._caches[cache_type]
            if include_pinned:
                # Use the running partition total and reset it exactly to zero
                freed_mb = self._partition_usage_mb[cache_type]
                cache.clear()
                self._partition_usage_mb[cache_type] = 0.0
            else:
                unpinned = [key for key, entry in cache.items() if not entry.is_pinned]
                freed_mb = sum(cache.pop(key).size_mb for key in unpinned)
                self._partition_usage_mb[cache_type] -= freed_mb
            self._current_memory_mb -= freed_mb
        logger.info(f"Cleared {cache_type.value} cache, freed {freed_mb:.1f}MB")

    def clear_all_caches(self):
        """Clear all caches, including pinned entries."""
        for cache_type in CacheType:
            self.clear_cache_type(cache_type)
        logger.info("Cleared all caches")

    def get_cache_stats(self) -> dict[str, Any]:
        """Get counters plus current memory state and per-type entry/MB totals."""
        with self._global_lock:
            stats = self._stats.copy()
            total_lookups = stats["cache_hits"] + stats["cache_misses"]
            stats.update(
                {
                    "current_memory_mb": self._current_memory_mb,
                    "peak_memory_mb": self._peak_memory_mb,
                    "max_memory_mb": self.max_memory_mb,
                    "memory_utilization": (
                        self._current_memory_mb / self.max_memory_mb
                        if self.max_memory_mb > 0
                        else 0.0
                    ),
                    "cache_efficiency": stats["cache_hits"] / max(1, total_lookups),
                    "memory_pressure": self.get_memory_pressure().value,
                }
            )
            for cache_type in CacheType:
                with self._cache_locks[cache_type]:
                    cache = self._caches[cache_type]
                    stats[f"{cache_type.value}_entries"] = len(cache)
                    stats[f"{cache_type.value}_memory_mb"] = sum(
                        entry.size_mb for entry in cache.values()
                    )
        return stats

    def shutdown(self):
        """Stop monitoring and release all cached data.

        The shutdown event is set unconditionally, so a monitoring step
        registered with the HealthMonitor becomes a no-op. If a dedicated
        monitoring thread was started, it is joined (up to 5 s).
        """
        self._shutdown_event.set()
        if self._monitoring_thread is not None:
            self._monitoring_thread.join(timeout=5.0)
            self._monitoring_thread = None
        self.clear_all_caches()
        logger.info("UnifiedMemoryManager shutdown complete")

    def get_enhanced_stats(self) -> dict[str, Any]:
        """Get ``get_cache_stats()`` plus preload and per-partition figures.

        Partition usage reuses the per-type totals that ``get_cache_stats``
        computed under the cache locks. Re-iterating the caches unlocked could
        raise if another thread mutates a cache at the same time.
        """
        stats = self.get_cache_stats()
        partition_usage = {}
        for cache_type in CacheType:
            limit_mb = self.max_memory_mb * self._cache_partitions[cache_type]
            used_mb = stats[f"{cache_type.value}_memory_mb"]
            partition_usage[cache_type.value] = {
                "limit_mb": limit_mb,
                "used_mb": used_mb,
                "utilization": used_mb / limit_mb if limit_mb > 0 else 0.0,
            }
        with self._global_lock:
            queue_size = len(self._preload_queue)
        stats.update(
            {
                "preload_hit_ratio": (
                    stats["preload_hits"] / max(1, stats["preload_requests"])
                ),
                "preload_queue_size": queue_size,
                "partition_usage": partition_usage,
            }
        )
        return stats
# Global memory manager instance, created lazily by get_memory_manager().
# The lock guards that lazy creation so concurrent first calls build only one.
_global_memory_manager_lock = threading.Lock()
_global_memory_manager: UnifiedMemoryManager | None = None
def get_memory_manager() -> UnifiedMemoryManager:
    """Return the process-wide memory manager, creating it on first use.

    The fast path reads the global without locking. Only when it is unset
    does the caller take the lock and re-check before constructing, so
    concurrent first calls still create exactly one instance. (BUG-030)
    """
    global _global_memory_manager  # noqa: PLW0603 - intentional singleton pattern
    existing = _global_memory_manager
    if existing is not None:
        return existing
    with _global_memory_manager_lock:
        if _global_memory_manager is None:
            _global_memory_manager = UnifiedMemoryManager()
        return _global_memory_manager
def shutdown_memory_manager():
    """Shut down the global memory manager and clear the singleton.

    The instance is detached under ``_global_memory_manager_lock``, the same
    lock get_memory_manager() uses for creation. A get_memory_manager() call
    that starts after detachment therefore builds a fresh instance instead of
    receiving the one being shut down. ``shutdown()`` itself runs outside the
    lock because it may join the monitoring thread.
    """
    global _global_memory_manager  # noqa: PLW0603 - intentional singleton pattern
    with _global_memory_manager_lock:
        manager = _global_memory_manager
        _global_memory_manager = None
    if manager is not None:
        manager.shutdown()
# Convenience functions for common operations
def cache_computation(key: str, data: Any) -> bool:
    """Store ``data`` under ``key`` in the global manager's COMPUTATION cache.

    Returns True if the manager accepted the entry.
    """
    manager = get_memory_manager()
    return manager.cache_put(key, data, cache_type=CacheType.COMPUTATION)
def get_computation(key: str) -> Any | None:
    """Look up ``key`` in the COMPUTATION cache; ``None`` on a miss."""
    manager = get_memory_manager()
    return manager.cache_get(key, cache_type=CacheType.COMPUTATION)
def cache_array(key: str, array: np.ndarray) -> bool:
    """Store ``array`` under ``key`` in the global manager's ARRAY_DATA cache.

    Returns True if the manager accepted the entry.
    """
    manager = get_memory_manager()
    return manager.cache_put(key, array, cache_type=CacheType.ARRAY_DATA)
def get_array(key: str) -> np.ndarray | None:
    """Look up ``key`` in the ARRAY_DATA cache; ``None`` on a miss."""
    manager = get_memory_manager()
    return manager.cache_get(key, cache_type=CacheType.ARRAY_DATA)
@contextmanager
def memory_pressure_monitor(*, significant_change_mb: float | None = None):
    """Context manager that logs memory changes across the wrapped operation.

    Yields the global memory manager. On exit (including when the body
    raises), it logs at INFO level if the system memory pressure level
    changed. It also logs if the manager's cached total changed by more than
    the threshold.

    Parameters
    ----------
    significant_change_mb : float, optional
        Absolute change in the manager's cached total (MB) worth logging.
        Defaults to ``MIN_CACHE_ENTRIES``, the value previously hard-wired
        here. NOTE(review): that constant's name suggests an entry count, and
        an earlier comment described this threshold as 50MB. Confirm the
        intended value.
    """
    threshold_mb = (
        MIN_CACHE_ENTRIES if significant_change_mb is None else significant_change_mb
    )
    manager = get_memory_manager()
    initial_pressure = manager.get_memory_pressure()
    initial_memory = manager._current_memory_mb
    try:
        yield manager
    finally:
        final_pressure = manager.get_memory_pressure()
        final_memory = manager._current_memory_mb
        if final_pressure != initial_pressure:
            logger.info(
                f"Memory pressure changed: {initial_pressure.value} -> {final_pressure.value}"
            )
        memory_delta = final_memory - initial_memory
        if abs(memory_delta) > threshold_mb:
            logger.info(f"Significant memory change: {memory_delta:+.1f}MB")