Source code for xpcsviewer.utils.performance_monitor

"""
Performance Monitoring and Benchmarking Infrastructure for XPCS Viewer

This module provides comprehensive performance monitoring capabilities to track,
analyze, and benchmark the various optimizations implemented in the XPCS toolkit.
"""

import builtins
import json
import time
import traceback
from collections import defaultdict, deque
from collections.abc import Callable
from contextlib import contextmanager, suppress
from dataclasses import asdict, dataclass, field
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Any

import numpy as np
import psutil

from .logging_config import get_logger
from .memory_manager import CacheType, get_memory_manager

logger = get_logger(__name__)


@dataclass
class PerformanceMetrics:
    """Measurements captured for one profiled operation.

    Fields ending in ``_mb`` are megabytes; ``duration`` is the elapsed time
    used for the MB/s throughput figure. ``metadata`` holds arbitrary
    caller-supplied context and gets a fresh dict per instance.
    """

    operation_name: str
    start_time: float
    end_time: float
    duration: float
    memory_before_mb: float
    memory_after_mb: float
    memory_peak_mb: float
    memory_delta_mb: float
    cpu_percent: float
    input_size_mb: float = 0.0
    output_size_mb: float = 0.0
    success: bool = True
    error_message: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def throughput_mbs(self) -> float:
        """Input megabytes per second; 0.0 unless duration and input size are positive."""
        measurable = self.duration > 0 and self.input_size_mb > 0
        return self.input_size_mb / self.duration if measurable else 0.0

    @property
    def memory_efficiency(self) -> float:
        """Input size divided by peak memory; 0.0 unless the peak is positive."""
        return (
            self.input_size_mb / self.memory_peak_mb
            if self.memory_peak_mb > 0
            else 0.0
        )
@dataclass
class BenchmarkResult:
    """Outcome of one benchmark-suite run, ready for JSON export."""

    benchmark_name: str
    # ISO-format creation time of the result
    timestamp: str
    system_info: dict[str, Any]
    # Every metric gathered by the run, successful or not
    metrics: list[PerformanceMetrics]
    summary: dict[str, Any]
    # Wall time of the whole run
    duration_total: float
    # Fraction (0.0-1.0) of metrics whose ``success`` flag is set
    success_rate: float
[docs] class PerformanceProfiler: """Context manager for profiling performance of operations."""
[docs] def __init__( self, operation_name: str, input_size_mb: float = 0.0, metadata: dict[str, Any] | None = None, ): self.operation_name = operation_name self.input_size_mb = input_size_mb self.metadata = metadata or {} # Metrics tracking # Metrics tracking self.start_time: float = 0.0 self.end_time: float = 0.0 self.memory_before_mb: float = 0.0 self.memory_after_mb: float = 0.0 self.memory_peak_mb: float = 0.0 self.cpu_percent: float = 0.0 self.success = True self.error_message = "" # Memory manager for additional context self.memory_manager = get_memory_manager()
[docs] def __enter__(self): """Start profiling.""" self.start_time = time.time() # Capture initial memory state memory = psutil.virtual_memory() self.memory_before_mb = (memory.total - memory.available) / (1024 * 1024) self.memory_peak_mb = self.memory_before_mb # Start CPU monitoring psutil.cpu_percent() # First call to initialize logger.debug(f"Started profiling operation: {self.operation_name}") return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """End profiling and create metrics.""" self.end_time = time.time() # Capture final memory state memory = psutil.virtual_memory() self.memory_after_mb = (memory.total - memory.available) / (1024 * 1024) # Get CPU usage self.cpu_percent = psutil.cpu_percent() # Handle exceptions if exc_type is not None: self.success = False self.error_message = str(exc_val) logger.warning(f"Operation {self.operation_name} failed: {exc_val}") # Create performance metrics metrics = PerformanceMetrics( operation_name=self.operation_name, start_time=self.start_time, end_time=self.end_time, duration=self.end_time - self.start_time, memory_before_mb=self.memory_before_mb, memory_after_mb=self.memory_after_mb, memory_peak_mb=max(self.memory_peak_mb, self.memory_after_mb), memory_delta_mb=self.memory_after_mb - self.memory_before_mb, cpu_percent=self.cpu_percent, input_size_mb=self.input_size_mb, success=self.success, error_message=self.error_message, metadata=self.metadata, ) # Register metrics with global monitor PerformanceMonitor.get_instance().record_metrics(metrics) logger.debug( f"Completed profiling {self.operation_name}: " f"{metrics.duration:.3f}s, memory: {metrics.memory_delta_mb:+.1f}MB" )
[docs] def update_memory_peak(self, current_memory_mb: float): """Update peak memory usage during operation.""" self.memory_peak_mb = max(self.memory_peak_mb, current_memory_mb)
class PerformanceMonitor:
    """Global registry of recorded performance metrics.

    Keeps two bounded histories:

    * ``metrics_history``: the global history;
    * ``operation_stats``: per-operation lists, each capped at
      ``_MAX_PER_OPERATION`` entries.

    It can summarise both and export them as JSON. Obtain the shared
    instance through :meth:`get_instance`.
    """

    _instance = None

    # Maximum number of metrics retained per operation name.
    _MAX_PER_OPERATION = 1000

    def __init__(self, max_history: int = 10000):
        """Create an empty monitor.

        Args:
            max_history: Capacity of ``metrics_history``; the oldest entries
                are dropped automatically once it is full.
        """
        self.max_history = max_history
        self.metrics_history: deque = deque(maxlen=max_history)
        self.operation_stats: dict[str, list[PerformanceMetrics]] = defaultdict(list)
        self.benchmarks: list[BenchmarkResult] = []

        # System information, captured once at construction
        self.system_info = self._get_system_info()

        logger.info("PerformanceMonitor initialized")

    @classmethod
    def get_instance(cls) -> "PerformanceMonitor":
        """Return the shared monitor, creating it on first use (no locking)."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def record_metrics(self, metrics: "PerformanceMetrics"):
        """Append ``metrics`` to the global and per-operation histories."""
        self.metrics_history.append(metrics)

        history = self.operation_stats[metrics.operation_name]
        history.append(metrics)
        # Drop the oldest entries so per-operation history stays bounded.
        overflow = len(history) - self._MAX_PER_OPERATION
        if overflow > 0:
            del history[:overflow]

    def get_operation_stats(self, operation_name: str) -> dict[str, Any]:
        """Summarise the recorded metrics of one operation.

        Duration, memory and throughput statistics use successful calls only.
        Throughput additionally skips calls whose throughput is not positive.

        Returns:
            A statistics dict, or ``{}`` when the operation has no metrics.
        """
        metrics_list = self.operation_stats.get(operation_name, [])
        if not metrics_list:
            return {}

        successful = [m for m in metrics_list if m.success]
        durations = [m.duration for m in successful]
        memory_deltas = [m.memory_delta_mb for m in successful]
        throughputs = [
            m.throughput_mbs for m in successful if m.throughput_mbs > 0
        ]

        return {
            "operation_name": operation_name,
            "total_calls": len(metrics_list),
            "successful_calls": len(durations),
            "success_rate": len(durations) / len(metrics_list),
            "duration_stats": self._calculate_stats(durations),
            "memory_stats": self._calculate_stats(memory_deltas),
            "throughput_stats": self._calculate_stats(throughputs),
            "recent_performance": durations[-10:],
        }

    def get_overall_stats(self) -> dict[str, Any]:
        """Summarise all recorded operations plus system information."""
        history = self.metrics_history
        all_operations = list(self.operation_stats.keys())
        total_metrics = len(history)
        successful_operations = sum(1 for m in history if m.success)

        if history:
            monitoring_period = {
                "start_time": min(m.start_time for m in history),
                "end_time": max(m.end_time for m in history),
            }
        else:
            monitoring_period = {"start_time": None, "end_time": None}

        return {
            "total_operations": len(all_operations),
            "total_metrics": total_metrics,
            "overall_success_rate": successful_operations / total_metrics
            if total_metrics
            else 0.0,
            "operations_summary": {
                op: self.get_operation_stats(op) for op in all_operations
            },
            "system_info": self.system_info,
            "monitoring_period": monitoring_period,
        }

    def export_metrics(self, filepath: str):
        """Write :meth:`get_overall_stats` to ``filepath`` as JSON.

        Missing parent directories are created.
        """
        serializable_stats = self._make_serializable(self.get_overall_stats())

        file_path = Path(filepath)
        file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(serializable_stats, f, indent=2)

        logger.info(f"Performance metrics exported to {file_path}")

    def clear_history(self):
        """Clear both metric histories (``benchmarks`` is left untouched)."""
        self.metrics_history.clear()
        self.operation_stats.clear()
        logger.info("Performance metrics history cleared")

    def _get_system_info(self) -> dict[str, Any]:
        """Collect basic platform, memory, CPU and library information."""
        import sys

        memory = psutil.virtual_memory()

        cpu_info: dict[str, Any] = {}
        try:
            cpu_freq = psutil.cpu_freq()
            cpu_info = {
                # psutil.cpu_count() counts logical CPUs; kept for compatibility.
                "cpu_count": psutil.cpu_count(),
                "cpu_count_physical": psutil.cpu_count(logical=False),
                "cpu_count_logical": psutil.cpu_count(logical=True),
                "cpu_freq": cpu_freq._asdict() if cpu_freq else None,
            }
        except Exception as e:
            # CPU details are optional; some platforms do not expose them.
            logger.debug(f"Could not collect CPU information: {e}")

        return {
            # Previously psutil.WINDOWS (a bool); record the real platform name.
            "platform": sys.platform,
            "memory_total_gb": memory.total / (1024**3),
            "cpu_info": cpu_info,
            "python_version": f"{sys.version_info.major}.{sys.version_info.minor}",
            "numpy_version": np.__version__,
            "timestamp": datetime.now().isoformat(),
        }

    def _calculate_stats(self, values: list[float]) -> dict[str, float]:
        """Return count/mean/median/std/min/max/p95/p99, or ``{}`` if empty."""
        if not values:
            return {}

        values_array = np.array(values)
        return {
            "count": len(values),
            "mean": float(np.mean(values_array)),
            "median": float(np.median(values_array)),
            "std": float(np.std(values_array)),
            "min": float(np.min(values_array)),
            "max": float(np.max(values_array)),
            "p95": float(np.percentile(values_array, 95)),
            "p99": float(np.percentile(values_array, 99)),
        }

    def _make_serializable(self, obj):
        """Recursively convert ``obj`` into plain JSON-compatible values.

        Conversions:

        * numpy bool, integer and floating scalars become ``bool``, ``int``
          and ``float``;
        * arrays become lists;
        * namedtuples become dicts;
        * other tuples become lists.

        Anything else is returned unchanged.
        """
        if isinstance(obj, dict):
            return {k: self._make_serializable(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._make_serializable(item) for item in obj]
        if hasattr(obj, "_asdict"):  # namedtuple
            return self._make_serializable(obj._asdict())
        if isinstance(obj, tuple):
            return [self._make_serializable(item) for item in obj]
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return obj
[docs] class XPCSBenchmarkSuite: """Comprehensive benchmark suite for XPCS performance testing."""
[docs] def __init__(self, output_dir: str = "benchmarks"): self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.monitor = PerformanceMonitor.get_instance()
[docs] def run_comprehensive_benchmark( self, sample_data_path: str | None = None ) -> BenchmarkResult: """Run comprehensive benchmark of all XPCS optimizations.""" benchmark_start = time.time() logger.info("Starting comprehensive XPCS benchmark suite") # Clear previous metrics self.monitor.clear_history() benchmark_metrics = [] try: # Memory management benchmarks benchmark_metrics.extend(self._benchmark_memory_management()) # I/O performance benchmarks if sample_data_path: benchmark_metrics.extend( self._benchmark_io_operations(sample_data_path) ) # G2 fitting benchmarks benchmark_metrics.extend(self._benchmark_g2_fitting()) # ROI calculation benchmarks benchmark_metrics.extend(self._benchmark_roi_calculations()) # SAXS processing benchmarks benchmark_metrics.extend(self._benchmark_saxs_processing()) except Exception as e: logger.error(f"Benchmark suite failed: {e}") logger.error(traceback.format_exc()) # Calculate overall results benchmark_end = time.time() duration_total = benchmark_end - benchmark_start successful_metrics = [m for m in benchmark_metrics if m.success] success_rate = ( len(successful_metrics) / len(benchmark_metrics) if benchmark_metrics else 0.0 ) # Create summary summary = self._create_benchmark_summary(benchmark_metrics) result = BenchmarkResult( benchmark_name="comprehensive_xpcs_benchmark", timestamp=datetime.now().isoformat(), system_info=self.monitor.system_info, metrics=benchmark_metrics, summary=summary, duration_total=duration_total, success_rate=success_rate, ) # Export results self._export_benchmark_result(result) logger.info( f"Comprehensive benchmark completed: {duration_total:.1f}s, " f"success rate: {success_rate:.1%}" ) return result
def _benchmark_memory_management(self) -> list[PerformanceMetrics]: """Benchmark memory management optimizations.""" logger.info("Benchmarking memory management...") memory_manager = get_memory_manager() # Test cache operations with PerformanceProfiler( "memory_cache_operations", metadata={"test": "cache_ops"} ) as profiler: # Create test data test_data = np.random.random((1000, 1000)).astype(np.float32) data_size_mb = test_data.nbytes / (1024 * 1024) profiler.input_size_mb = data_size_mb # Cache operations for i in range(100): cache_key = f"test_data_{i}" memory_manager.cache_put(cache_key, test_data, CacheType.ARRAY_DATA) retrieved = memory_manager.cache_get(cache_key, CacheType.ARRAY_DATA) assert retrieved is not None # Test memory pressure detection with PerformanceProfiler( "memory_pressure_detection", metadata={"test": "pressure"} ) as profiler: for _i in range(1000): memory_manager.get_memory_pressure() memory_manager.get_cache_stats() # Test cleanup operations with PerformanceProfiler( "memory_cleanup", metadata={"test": "cleanup"} ) as profiler: memory_manager._aggressive_cleanup() return [ m for m in self.monitor.metrics_history if m.operation_name.startswith("memory_") ] def _benchmark_io_operations( self, sample_data_path: str ) -> list[PerformanceMetrics]: """Benchmark I/O operations with enhanced HDF5 reader.""" logger.info("Benchmarking I/O operations...") from ..fileIO.hdf_reader import batch_read_fields from ..fileIO.hdf_reader_enhanced import get_enhanced_reader enhanced_reader = get_enhanced_reader() # Benchmark enhanced vs standard HDF5 reading sample_path = Path(sample_data_path) if sample_path.exists(): # Test enhanced reader with PerformanceProfiler( "hdf5_enhanced_read", metadata={"file": str(sample_path)} ) as profiler: try: data = enhanced_reader.read_dataset( str(sample_path), "/xpcs/scattering_2d", enable_read_ahead=True ) if hasattr(data, "nbytes"): profiler.input_size_mb = data.nbytes / (1024 * 1024) except Exception as e: 
logger.warning(f"Enhanced HDF5 read test failed: {e}") # Test standard reader for comparison with PerformanceProfiler( "hdf5_standard_read", metadata={"file": str(sample_path)} ) as profiler: try: result = batch_read_fields( str(sample_path), ["saxs_2d"], "alias", ftype="nexus" ) if "saxs_2d" in result and hasattr(result["saxs_2d"], "nbytes"): profiler.input_size_mb = result["saxs_2d"].nbytes / ( 1024 * 1024 ) except Exception as e: logger.warning(f"Standard HDF5 read test failed: {e}") return [m for m in self.monitor.metrics_history if "hdf5" in m.operation_name] def _benchmark_g2_fitting(self) -> list[PerformanceMetrics]: """Benchmark G2 fitting performance (sequential vs parallel).""" logger.info("Benchmarking G2 fitting...") # Create synthetic G2 data for testing num_tau = 100 num_q = 50 tau = np.logspace(-6, 2, num_tau) g2_synthetic = np.zeros((num_tau, num_q)) g2_err_synthetic = np.zeros((num_tau, num_q)) # Generate synthetic G2 curves for q in range(num_q): # Single exponential with noise true_tau = 10 ** (np.random.uniform(-3, 1)) # Random relaxation time contrast = np.random.uniform(0.1, 0.5) baseline = 1.0 g2_true = contrast * np.exp(-2 * tau / true_tau) + baseline noise = np.random.normal(0, 0.01, num_tau) g2_synthetic[:, q] = g2_true + noise g2_err_synthetic[:, q] = np.abs(noise) + 0.001 # Import fitting functions from ..fitting import fit_with_fixed, fit_with_fixed_parallel, single_exp # Prepare fitting parameters bounds = np.array( [[0.01, 1e-6, 0.8], [1.0, 1e6, 1.2]] ) # [contrast, tau, baseline] fit_flag = np.array([True, True, True]) fit_x = np.logspace( np.log10(np.min(tau)) - 0.5, np.log10(np.max(tau)) + 0.5, 128 ) # Benchmark sequential fitting with PerformanceProfiler( "g2_fitting_sequential", input_size_mb=g2_synthetic.nbytes / (1024 * 1024), metadata={"num_q": num_q, "num_tau": num_tau}, ): try: _fit_line_seq, _fit_val_seq = fit_with_fixed( single_exp, tau, g2_synthetic, g2_err_synthetic, bounds, fit_flag, fit_x, ) except Exception as e: 
logger.warning(f"Sequential G2 fitting test failed: {e}") # Benchmark parallel fitting with PerformanceProfiler( "g2_fitting_parallel", input_size_mb=g2_synthetic.nbytes / (1024 * 1024), metadata={"num_q": num_q, "num_tau": num_tau}, ): try: _fit_line_par, _fit_val_par = fit_with_fixed_parallel( single_exp, tau, g2_synthetic, g2_err_synthetic, bounds, fit_flag, fit_x, max_workers=4, ) except Exception as e: logger.warning(f"Parallel G2 fitting test failed: {e}") return [ m for m in self.monitor.metrics_history if "g2_fitting" in m.operation_name ] def _benchmark_roi_calculations(self) -> list[PerformanceMetrics]: """Benchmark ROI calculation performance.""" logger.info("Benchmarking ROI calculations...") # Create synthetic SAXS data shape = (10, 512, 512) # 10 time frames, 512x512 detector saxs_data = np.random.poisson(100, shape).astype(np.float32) data_size_mb = saxs_data.nbytes / (1024 * 1024) # Create geometry data y, x = np.mgrid[: shape[1], : shape[2]] center_x, center_y = shape[2] // 2, shape[1] // 2 geometry_data = { "qmap": np.sqrt((x - center_x) ** 2 + (y - center_y) ** 2) * 0.01, # Synthetic q-map "pmap": np.degrees(np.arctan2(y - center_y, x - center_x)) % 360, # Angular map "rmap": np.sqrt((x - center_x) ** 2 + (y - center_y) ** 2), # Radial map "mask": np.ones(shape[1:], dtype=bool), } # Import ROI calculators from ..utils.vectorized_roi import ( PieROICalculator, RingROICalculator, ROIParameters, ROIType, ) # Benchmark pie ROI calculation pie_params = ROIParameters( roi_type=ROIType.PIE, parameters={ "angle_range": (45, 135), "qmap_idx": np.arange(np.max(geometry_data["qmap"]) + 1), "qsize": 100, "qspan": np.linspace(0, np.max(geometry_data["qmap"]), 101), }, ) with PerformanceProfiler( "roi_pie_vectorized", input_size_mb=data_size_mb, metadata={"roi_type": "pie", "shape": shape}, ): try: pie_calculator = PieROICalculator() pie_calculator.calculate_roi(saxs_data, geometry_data, pie_params) except Exception as e: logger.warning(f"Pie ROI benchmark 
failed: {e}") # Benchmark ring ROI calculation ring_params = ROIParameters( roi_type=ROIType.RING, parameters={ "radius_range": (50, 150), "pmap": geometry_data["pmap"], "phi_num": 180, }, ) with PerformanceProfiler( "roi_ring_vectorized", input_size_mb=data_size_mb, metadata={"roi_type": "ring", "shape": shape}, ): try: ring_calculator = RingROICalculator() ring_calculator.calculate_roi(saxs_data, geometry_data, ring_params) except Exception as e: logger.warning(f"Ring ROI benchmark failed: {e}") return [m for m in self.monitor.metrics_history if "roi_" in m.operation_name] def _benchmark_saxs_processing(self) -> list[PerformanceMetrics]: """Benchmark SAXS data processing (log computation, streaming).""" logger.info("Benchmarking SAXS processing...") # Create synthetic SAXS data shape = (100, 256, 256) # Large SAXS dataset saxs_data = np.random.poisson(50, shape).astype(np.float32) data_size_mb = saxs_data.nbytes / (1024 * 1024) # Benchmark standard log computation with PerformanceProfiler( "saxs_log_standard", input_size_mb=data_size_mb, metadata={"shape": shape} ): try: # Standard log computation saxs_copy = np.copy(saxs_data) roi = saxs_copy > 0 if np.sum(roi) > 0: min_val = np.min(saxs_copy[roi]) saxs_copy[~roi] = min_val np.log10(saxs_copy).astype(np.float32) except Exception as e: logger.warning(f"Standard SAXS log benchmark failed: {e}") # Benchmark streaming log computation with PerformanceProfiler( "saxs_log_streaming", input_size_mb=data_size_mb, metadata={"shape": shape} ): try: from ..utils.streaming_processor import process_saxs_log_streaming process_saxs_log_streaming(saxs_data, chunk_size_mb=20.0) except Exception as e: logger.warning(f"Streaming SAXS log benchmark failed: {e}") return [m for m in self.monitor.metrics_history if "saxs_" in m.operation_name] def _create_benchmark_summary( self, metrics: list[PerformanceMetrics] ) -> dict[str, Any]: """Create summary of benchmark results.""" if not metrics: return {} # Group metrics by operation 
operation_groups = defaultdict(list) for metric in metrics: operation_groups[metric.operation_name].append(metric) summary: dict[str, Any] = { "total_operations": len(operation_groups), "total_metrics": len(metrics), "overall_success_rate": sum(1 for m in metrics if m.success) / len(metrics), "performance_comparison": {}, "memory_efficiency": {}, "throughput_analysis": {}, } # Analyze performance comparisons for op_name, op_metrics in operation_groups.items(): successful_metrics = [m for m in op_metrics if m.success] if successful_metrics: durations = [m.duration for m in successful_metrics] memory_deltas = [m.memory_delta_mb for m in successful_metrics] throughputs = [ m.throughput_mbs for m in successful_metrics if m.throughput_mbs > 0 ] summary["performance_comparison"][op_name] = { "average_duration": np.mean(durations), "average_memory_delta": np.mean(memory_deltas), "average_throughput": np.mean(throughputs) if throughputs else 0.0, "sample_count": len(successful_metrics), } # Highlight performance improvements if ( "g2_fitting_sequential" in summary["performance_comparison"] and "g2_fitting_parallel" in summary["performance_comparison"] ): seq_time = summary["performance_comparison"]["g2_fitting_sequential"][ "average_duration" ] par_time = summary["performance_comparison"]["g2_fitting_parallel"][ "average_duration" ] if par_time > 0: speedup = seq_time / par_time summary["g2_fitting_speedup"] = speedup return summary def _export_benchmark_result(self, result: BenchmarkResult): """Export benchmark result to file.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"xpcs_benchmark_{timestamp}.json" filepath = self.output_dir / filename # Convert to serializable format result_dict = asdict(result) serializable_result = self.monitor._make_serializable(result_dict) with open(filepath, "w") as f: json.dump(serializable_result, f, indent=2) logger.info(f"Benchmark results exported to {filepath}")
# Convenience decorators for easy performance monitoring
[docs] def profile_performance( operation_name: str | None = None, track_input_size: bool = False ): """Decorator to automatically profile function performance.""" def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs): # Determine operation name op_name = operation_name or f"{func.__module__}.{func.__name__}" # Try to estimate input size if requested input_size_mb = 0.0 if track_input_size and args: try: # Look for numpy arrays in arguments for arg in args: if hasattr(arg, "nbytes"): input_size_mb += arg.nbytes / (1024 * 1024) except (TypeError, AttributeError): pass # Profile the function execution with PerformanceProfiler(op_name, input_size_mb=input_size_mb): return func(*args, **kwargs) return wrapper return decorator
[docs] @contextmanager def benchmark_context( operation_name: str, input_size_mb: float = 0.0, metadata: dict[str, Any] | None = None, ): """Context manager for benchmarking code blocks.""" with PerformanceProfiler(operation_name, input_size_mb, metadata): yield
# Accessor for the global performance monitor instance
def get_performance_monitor() -> PerformanceMonitor:
    """Return the shared :class:`PerformanceMonitor`, creating it if needed."""
    monitor = PerformanceMonitor.get_instance()
    return monitor
def run_xpcs_benchmark(
    sample_data_path: str | None = None, output_dir: str = "benchmarks"
) -> BenchmarkResult:
    """Run the full :class:`XPCSBenchmarkSuite` and return its result.

    Args:
        sample_data_path: Optional data file forwarded to the suite's I/O stage.
        output_dir: Directory the suite writes its JSON report into.
    """
    return XPCSBenchmarkSuite(output_dir).run_comprehensive_benchmark(
        sample_data_path
    )