Source code for xpcsviewer.xpcs_file.cache

"""Data caching utilities for XpcsFile.

This module provides LRU caching with memory management for XPCS data.
"""

from __future__ import annotations

import sys
import threading
import time
from collections import OrderedDict
from typing import Any

import numpy as np

from xpcsviewer.utils.logging_config import get_logger
from xpcsviewer.xpcs_file.memory import MemoryMonitor

logger = get_logger(__name__)


class CacheItem:
    """A cached payload together with its bookkeeping metadata.

    Attributes set by the constructor:

    - ``data``: the cached object itself.
    - ``size_mb``: the size the owner accounts for this entry, in MB.
    - ``access_count``: number of recorded accesses (creation counts as one).
    - ``last_accessed``: ``time.time()`` value of the most recent access.
    - ``created_at``: ``time.time()`` value recorded at construction.
    """

    def __init__(self, data: Any, size_mb: float):
        self.data = data
        self.size_mb = size_mb
        # Zeroed first; the initial access recorded below fills in the
        # timestamps and bumps the counter to 1.
        self.access_count = self.last_accessed = self.created_at = 0
        self._update_access_time()

    def _update_access_time(self):
        """Record an access: bump the counter and refresh the timestamp."""
        now = time.time()
        self.access_count += 1
        self.last_accessed = now
        # created_at is still 0 only during the very first recorded access.
        if self.created_at == 0:
            self.created_at = now

    def touch(self):
        """Mark the item as accessed just now."""
        self._update_access_time()
class DataCache:
    """
    LRU cache with memory limit and automatic cleanup for XPCS data.

    Features:
    - LRU eviction policy
    - Memory limit enforcement (default 500MB)
    - Automatic cleanup on memory pressure
    - Memory usage tracking per item
    - Thread-safe operations (public methods run under a re-entrant lock)
    """

    def __init__(
        self, max_memory_mb: float = 500.0, memory_pressure_threshold: float = 0.85
    ):
        """
        Create an empty cache.

        Parameters
        ----------
        max_memory_mb : float
            Upper bound on the summed ``size_mb`` of all cached items.
        memory_pressure_threshold : float
            Threshold handed to ``MemoryMonitor.is_memory_pressure_high``
            when deciding whether to run the aggressive cleanup.
        """
        self.max_memory_mb = max_memory_mb
        self.memory_pressure_threshold = memory_pressure_threshold
        self._cache: OrderedDict[str, CacheItem] = OrderedDict()
        self._current_memory_mb = 0.0
        # Re-entrant so that a public method may call another while holding it.
        self._lock = threading.RLock()
        # Guards against a cleanup being started while one is already running.
        self._cleanup_in_progress = False

        logger.info(f"DataCache initialized with {max_memory_mb}MB limit")

    def _generate_key(self, file_path: str, data_type: str) -> str:
        """Generate cache key from file path and data type."""
        return f"{file_path}:{data_type}"

    def _evict_lru_items(self, required_memory_mb: float = 0) -> float:
        """
        Evict least recently used items to free memory.

        Eviction proceeds oldest-first and stops as soon as at least
        ``required_memory_mb`` has been freed *and* the cache is within
        ``max_memory_mb``. Callers are expected to hold ``self._lock``.

        Parameters
        ----------
        required_memory_mb : float
            Minimum memory to free

        Returns
        -------
        float
            Amount of memory freed in MB
        """
        freed_memory = 0.0
        items_to_remove = []

        # Sort by last access time (oldest first). The sort is stable, so
        # items with equal timestamps keep their OrderedDict order.
        sorted_items = sorted(self._cache.items(), key=lambda x: x[1].last_accessed)

        for key, item in sorted_items:
            if (
                freed_memory >= required_memory_mb
                and self._current_memory_mb <= self.max_memory_mb
            ):
                break

            items_to_remove.append(key)
            freed_memory += item.size_mb
            self._current_memory_mb -= item.size_mb

            logger.debug(f"Evicting cache item {key}, size: {item.size_mb:.2f}MB")

        # Remove items from cache
        for key in items_to_remove:
            del self._cache[key]

        # Repeated float add/subtract can leave a tiny residue; an empty
        # cache must account for exactly zero memory.
        if not self._cache:
            self._current_memory_mb = 0.0

        return freed_memory

    def _cleanup_on_memory_pressure(self):
        """Perform cleanup when system memory pressure is high."""
        if self._cleanup_in_progress:
            return

        self._cleanup_in_progress = True
        try:
            if MemoryMonitor.is_memory_pressure_high(self.memory_pressure_threshold):
                # Aggressive cleanup: shrink the cache to 50% of its limit.
                # When the cache is already below the target the requested
                # amount is negative and nothing is evicted.
                target_memory = self.max_memory_mb * 0.5
                freed = self._evict_lru_items(self._current_memory_mb - target_memory)
                logger.info(f"Memory pressure cleanup: freed {freed:.2f}MB")
        finally:
            self._cleanup_in_progress = False

    def put(self, file_path: str, data_type: str, data: Any) -> bool:
        """
        Store data in cache.

        Parameters
        ----------
        file_path : str
            File path identifier
        data_type : str
            Type of data ('saxs_2d', 'saxs_2d_log', etc.)
        data : Any
            Data to cache

        Returns
        -------
        bool
            True if successfully cached, False if the data exceeds 80% of
            the cache limit and was therefore not stored.
        """
        with self._lock:
            key = self._generate_key(file_path, data_type)

            # Estimate memory usage
            if isinstance(data, np.ndarray):
                size_mb = MemoryMonitor.estimate_array_memory(data.shape, data.dtype)
            else:
                # Rough estimate for other data types; sys.getsizeof is
                # shallow and does not follow references into containers.
                size_mb = sys.getsizeof(data) / (1024 * 1024)

            # Check if data is too large for cache
            # NOTE(review): on rejection an entry already stored under this
            # key is left in place, so a later get() returns the older data.
            if size_mb > self.max_memory_mb * 0.8:
                logger.warning(
                    f"Data too large for cache: {size_mb:.2f}MB > {self.max_memory_mb * 0.8:.2f}MB"
                )
                return False

            # Remove the entry being replaced *before* sizing the eviction,
            # otherwise its memory is counted twice and unrelated items may
            # be evicted for no reason.
            old_item = self._cache.pop(key, None)
            if old_item is not None:
                self._current_memory_mb -= old_item.size_mb

            # Evict only what is needed to fit the new item under the limit.
            overflow_mb = self._current_memory_mb + size_mb - self.max_memory_mb
            if overflow_mb > 0:
                self._evict_lru_items(overflow_mb)

            # Add new item (appended at the most-recently-used end)
            self._cache[key] = CacheItem(data, size_mb)
            self._current_memory_mb += size_mb

            # Check for memory pressure
            self._cleanup_on_memory_pressure()

            logger.debug(
                f"Cached {key}, size: {size_mb:.2f}MB, total: {self._current_memory_mb:.2f}MB"
            )
            return True

    def get(self, file_path: str, data_type: str) -> Any | None:
        """
        Retrieve data from cache.

        A hit refreshes the item's access metadata and moves it to the
        most-recently-used end of the cache.

        Parameters
        ----------
        file_path : str
            File path identifier
        data_type : str
            Type of data

        Returns
        -------
        Any or None
            Cached data or None if not found
        """
        with self._lock:
            key = self._generate_key(file_path, data_type)

            item = self._cache.get(key)
            if item is not None:
                item.touch()
                # Move to end (most recently used)
                self._cache.move_to_end(key)
                logger.debug(f"Cache hit for {key}")
                return item.data

            logger.debug(f"Cache miss for {key}")
            return None

    def clear(self):
        """Clear all cached data."""
        with self._lock:
            self._cache.clear()
            self._current_memory_mb = 0.0
            logger.info("Cache cleared")

    def clear_file(self, file_path: str):
        """Clear all cached data for a specific file.

        Entries are matched by the ``"<file_path>:"`` key prefix.
        """
        with self._lock:
            prefix = f"{file_path}:"
            keys_to_remove = [key for key in self._cache if key.startswith(prefix)]

            freed_memory = 0.0
            for key in keys_to_remove:
                item = self._cache.pop(key)
                freed_memory += item.size_mb
                self._current_memory_mb -= item.size_mb

            # An empty cache must account for exactly zero memory.
            if not self._cache:
                self._current_memory_mb = 0.0

            if freed_memory > 0:
                logger.debug(
                    f"Cleared {len(keys_to_remove)} items for {file_path}, freed {freed_memory:.2f}MB"
                )

    def get_stats(self) -> dict[str, Any]:
        """Get cache statistics.

        Returns
        -------
        dict
            Item count, cache memory usage / limit / utilization, the
            system memory figures reported by ``MemoryMonitor``, and
            ``items_by_type``: the number of cached items per data type.
        """
        with self._lock:
            used_mb, available_mb = MemoryMonitor.get_memory_usage()
            pressure = MemoryMonitor.get_memory_pressure()

            # Keys are "<file_path>:<data_type>"; the file path may itself
            # contain ':' so the type is taken after the last separator.
            # Assumes data_type contains no ':' — TODO confirm.
            items_by_type: dict[str, int] = {}
            for key in self._cache:
                data_type = key.rpartition(":")[2]
                items_by_type[data_type] = items_by_type.get(data_type, 0) + 1

            # Avoid ZeroDivisionError for a cache configured with no capacity.
            if self.max_memory_mb > 0:
                utilization = self._current_memory_mb / self.max_memory_mb
            else:
                utilization = 0.0

            return {
                "cache_items": len(self._cache),
                "cache_memory_mb": self._current_memory_mb,
                "cache_limit_mb": self.max_memory_mb,
                "cache_utilization": utilization,
                "system_memory_used_mb": used_mb,
                "system_memory_available_mb": available_mb,
                "system_memory_pressure": pressure,
                "items_by_type": items_by_type,
            }

    def force_cleanup(self, target_memory_mb: float | None = None):
        """Force cleanup to target memory usage.

        Parameters
        ----------
        target_memory_mb : float, optional
            Memory usage to shrink the cache to. Defaults to 50% of
            ``max_memory_mb``. Nothing happens if usage is already at or
            below the target.
        """
        with self._lock:
            if target_memory_mb is None:
                target_memory_mb = self.max_memory_mb * 0.5

            if self._current_memory_mb > target_memory_mb:
                freed = self._evict_lru_items(
                    self._current_memory_mb - target_memory_mb
                )
                logger.info(f"Forced cleanup: freed {freed:.2f}MB")