Utilities

Utility modules providing logging, memory management, performance monitoring, and other support functions.

Logging Configuration

Advanced logging setup with hierarchical loggers, custom formatters, and environment-variable driven configuration.

Centralized logging configuration for XPCS Viewer.

This module provides a robust, configurable logging system with support for:
  • Environment variable-based configuration

  • Multiple output handlers (console + file with rotation)

  • Structured logging with consistent formatting

  • JSON format option for structured logging

  • Thread-safe implementation for GUI applications

  • Integration with existing LoggerWriter helper

Environment Variables:
  • PYXPCS_LOG_LEVEL: DEBUG/INFO/WARNING/ERROR/CRITICAL (default: INFO)

  • PYXPCS_LOG_FILE: Custom log file path

  • PYXPCS_LOG_DIR: Log directory (default: ~/.xpcsviewer/logs)

  • PYXPCS_LOG_FORMAT: TEXT/JSON for structured logging (default: TEXT)

  • PYXPCS_LOG_MAX_SIZE: Maximum log file size in MB (default: 10)

  • PYXPCS_LOG_BACKUP_COUNT: Number of backup log files (default: 5)

  • PYXPCS_SUPPRESS_QT_WARNINGS: 1 to suppress Qt logging (default: 0)

  • PYXPCS_LOG_RATE_LIMIT: Rate limit for high-frequency logs in msgs/sec (default: 10.0)

  • PYXPCS_LOG_SANITIZE_PATHS: Path sanitization mode: none/home/hash (default: home)

  • PYXPCS_LOG_SESSION_ID: Enable session correlation IDs: 1/0 (default: 1)
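To make the variable-plus-default pattern concrete, here is a minimal sketch of how such settings could be read. `LogSettings` and `read_log_settings` are hypothetical names invented for this illustration and are not part of xpcsviewer; only the variable names and defaults come from the list above.

```python
import os
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class LogSettings:
    """Hypothetical container for a subset of the PYXPCS_* settings."""
    level: str
    log_dir: Path
    log_format: str
    max_size_mb: int
    backup_count: int
    rate_limit: float


def read_log_settings(env=None) -> LogSettings:
    """Read settings from a mapping (os.environ by default), using documented defaults."""
    env = os.environ if env is None else env
    return LogSettings(
        level=env.get("PYXPCS_LOG_LEVEL", "INFO").upper(),
        log_dir=Path(env.get("PYXPCS_LOG_DIR", "~/.xpcsviewer/logs")),
        log_format=env.get("PYXPCS_LOG_FORMAT", "TEXT").upper(),
        max_size_mb=int(env.get("PYXPCS_LOG_MAX_SIZE", "10")),
        backup_count=int(env.get("PYXPCS_LOG_BACKUP_COUNT", "5")),
        rate_limit=float(env.get("PYXPCS_LOG_RATE_LIMIT", "10.0")),
    )


# A plain dict stands in for the process environment so the example is reproducible.
settings = read_log_settings({"PYXPCS_LOG_LEVEL": "debug", "PYXPCS_LOG_MAX_SIZE": "25"})
print(settings.level, settings.max_size_mb, settings.backup_count)
```

Passing the environment as a mapping keeps the parsing logic testable without touching the real process environment.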

Usage:

from xpcsviewer.utils.logging_config import get_logger

logger = get_logger(__name__)
logger.info("Application started")

class xpcsviewer.utils.logging_config.LoggingConfig[source]

Bases: object

Singleton logging configuration manager.

__init__()[source]
setup_logging()[source]

Public method to setup or reset logging configuration.

get_logger_info()[source]

Get current logging configuration info.

Return type:

dict[str, Any]

update_log_level(level)[source]

Update the log level for all handlers.

Parameters:

level (str | int)

xpcsviewer.utils.logging_config.initialize_logging()[source]

Initialize the logging configuration (called automatically on first use).

Return type:

LoggingConfig

xpcsviewer.utils.logging_config.get_logger(name=None)[source]

Get a configured logger instance.

Parameters:

name (str | None) – Logger name (typically __name__)

Returns:

Configured logger instance

Return type:

Logger

Usage:

logger = get_logger(__name__)
logger.info("This is a log message")

xpcsviewer.utils.logging_config.get_logging_config()[source]

Get the current logging configuration instance.

Return type:

LoggingConfig

xpcsviewer.utils.logging_config.set_log_level(level)[source]

Set the logging level globally.

Parameters:

level (str | int) – Log level (string like ‘DEBUG’, ‘INFO’ or integer)

xpcsviewer.utils.logging_config.get_log_file_path()[source]

Get the current log file path.

Return type:

Path

xpcsviewer.utils.logging_config.get_log_directory()[source]

Get the current log directory path.

Return type:

Path

xpcsviewer.utils.logging_config.reset_logging_config()[source]

Reset the logging configuration singleton for testing purposes.

xpcsviewer.utils.logging_config.log_system_info()[source]

Log useful system information for debugging.

xpcsviewer.utils.logging_config.setup_exception_logging()[source]

Setup logging for uncaught exceptions.

xpcsviewer.utils.logging_config.setup_logging(level=None)[source]

Setup logging with optional level override.

This function provides compatibility with existing code patterns.

Parameters:

level (str | int | None) – Optional log level override

Returns:

Root logger instance

Return type:

Logger

Logging Utilities

Session context, rate-limited logging, method timing, and path sanitization.

Logging utilities for comprehensive application monitoring.

This module provides logging utilities for session context management, rate-limited logging, method timing, and path sanitization.

Features:
  • LoggingContext: Session-level context with correlation IDs (contextvars)

  • SessionContextFilter: Logging filter that adds session context to records

  • RateLimitedLogger: Token bucket rate-limited logging wrapper

  • log_timing: Decorator for logging method entry/exit with timing

  • sanitize_path: Path sanitization for privacy in logs

Environment Variables:
  • PYXPCS_LOG_RATE_LIMIT: Default rate limit in msgs/sec (default: 10.0)

  • PYXPCS_LOG_SANITIZE_PATHS: Path sanitization mode: none/home/hash (default: home)

  • PYXPCS_LOG_SESSION_ID: Enable session IDs: 1/0 (default: 1)

Example:

from xpcsviewer.utils.log_utils import (
    LoggingContext,
    RateLimitedLogger,
    log_timing,
    sanitize_path,
)

# Session context
with LoggingContext(operation="data_analysis") as ctx:
    logger.info("Starting analysis")  # Includes session_id

# Rate-limited logging
rate_limited = RateLimitedLogger(logger, rate_per_second=5.0)
rate_limited.debug("High-frequency event")

# Timing decorator
@log_timing()
def process_data(data):
    ...

# Path sanitization
logger.info(f"Loaded: {sanitize_path(file_path)}")
class xpcsviewer.utils.log_utils.LoggingContext(session_id=None, operation=None, current_file=None)[source]

Bases: object

Manage session-level logging context with correlation IDs.

Uses contextvars for thread-safe session tracking. All log entries within a context share the same session_id for correlation.

Parameters:
  • session_id (str | None)

  • operation (str | None)

  • current_file (str | None)

session_id

UUID4 identifier for the session (read-only after creation)

Example

with LoggingContext(operation="batch_analysis") as ctx:
    logger.info("Starting batch")  # Includes session_id
    ctx.update_file("/path/to/data.hdf5")
    process_file()

__init__(session_id=None, operation=None, current_file=None)[source]

Initialize a logging context.

Parameters:
  • session_id (str | None) – Session ID (auto-generated UUID4 if not provided)

  • operation (str | None) – Current high-level operation name

  • current_file (str | None) – Currently loaded file path (will be sanitized)

Return type:

None

__enter__()[source]

Enter the context manager, setting contextvars.

Return type:

LoggingContext

__exit__(*args)[source]

Exit the context manager, restoring previous contextvars.

Parameters:

args (object)

Return type:

None

property session_id: str

Get the session ID.

update_operation(operation)[source]

Update the current operation context.

Parameters:

operation (str)

Return type:

None

update_file(file_path)[source]

Update the current file context (path will be sanitized).

Parameters:

file_path (str | PathLike[str] | None)

Return type:

None

static get_current()[source]

Get the current logging context if one is active.

Returns:

Current LoggingContext or None if no context is active

Return type:

LoggingContext | None

class xpcsviewer.utils.log_utils.SessionContextFilter(name='')[source]

Bases: Filter

Logging filter that adds session context to log records.

Adds the following attributes to each LogRecord:
  • session_id: Current session ID or ‘no-session’

  • operation: Current operation name or ‘’

  • current_file: Current file path (sanitized) or ‘’

This filter should be added to handlers via logging_config.py.

filter(record)[source]

Add session context attributes to the log record.

Parameters:

record (LogRecord) – The log record to enrich

Returns:

True (always allows the record through)

Return type:

bool
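The interplay between a contextvars-based session and a logging filter can be sketched with the standard library alone. The names `_session_id`, `Session`, and `SessionFilter` below are hypothetical stand-ins; the internals of LoggingContext and SessionContextFilter are not shown in this reference, so treat this as an illustration of the general technique only.

```python
import contextvars
import io
import logging
import uuid

# Hypothetical context variable holding the active session ID.
_session_id = contextvars.ContextVar("session_id", default="no-session")


class SessionFilter(logging.Filter):
    """Attach the current session ID to every record and let it through."""

    def filter(self, record):
        record.session_id = _session_id.get()
        return True


class Session:
    """Context manager that sets the session ID and restores the previous one."""

    def __init__(self):
        self.session_id = str(uuid.uuid4())
        self._token = None

    def __enter__(self):
        self._token = _session_id.set(self.session_id)
        return self

    def __exit__(self, *args):
        _session_id.reset(self._token)


# Capture output in memory so the effect of the filter is visible.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(SessionFilter())
handler.setFormatter(logging.Formatter("%(session_id)s %(message)s"))
demo_logger = logging.getLogger("session_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(handler)

demo_logger.info("outside")
with Session() as session:
    demo_logger.info("inside")
demo_logger.info("after")
lines = stream.getvalue().splitlines()
```

Attaching the filter to the handler rather than to one logger means records from any logger routed through that handler carry the session fields.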

class xpcsviewer.utils.log_utils.RateLimitedLogger(logger, rate_per_second=None, burst_size=None)[source]

Bases: object

Rate-limited logger wrapper using token bucket algorithm.

Prevents log flooding by limiting messages per second. Each unique message template is rate-limited independently.

Parameters:
  • logger (Logger)

  • rate_per_second (float | None)

  • burst_size (float | None)

suppressed_count

Total number of suppressed messages

Example

logger = logging.getLogger(__name__)
rate_limited = RateLimitedLogger(logger, rate_per_second=5.0)

def on_mouse_move(x, y):
    rate_limited.debug(f"Mouse at ({x}, {y})")  # Max 5/sec

__init__(logger, rate_per_second=None, burst_size=None)[source]

Initialize the rate-limited logger.

Parameters:
  • logger (Logger) – The underlying logger to wrap

  • rate_per_second (float | None) – Maximum messages per second (default from env var)

  • burst_size (float | None) – Initial token bucket size (default: rate_per_second)

Return type:

None

debug(msg, *args, **kwargs)[source]

Log DEBUG message if not rate-limited. Returns True if logged.

Return type:

bool

info(msg, *args, **kwargs)[source]

Log INFO message if not rate-limited. Returns True if logged.

Return type:

bool

warning(msg, *args, **kwargs)[source]

Log WARNING message if not rate-limited. Returns True if logged.

Return type:

bool

error(msg, *args, **kwargs)[source]

Log ERROR message if not rate-limited. Returns True if logged.

Return type:

bool

critical(msg, *args, **kwargs)[source]

Log CRITICAL message if not rate-limited. Returns True if logged.

Return type:

bool

get_suppressed_count(msg_key=None)[source]

Get count of suppressed messages.

Parameters:

msg_key (str | None) – Specific message key, or None for total

Returns:

Number of suppressed messages

Return type:

int

reset()[source]

Clear all rate limit state.

Return type:

None
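For readers unfamiliar with the token bucket algorithm that RateLimitedLogger is described as using, the following is a minimal, self-contained sketch. `TokenBucket` and `FakeClock` are hypothetical names for this illustration; the real class presumably keeps one bucket per message template, which is omitted here.

```python
import time


class TokenBucket:
    """Allow up to `rate_per_second` events per second with bursts up to `burst_size`."""

    def __init__(self, rate_per_second, burst_size=None, clock=time.monotonic):
        self.rate = float(rate_per_second)
        self.capacity = float(rate_per_second if burst_size is None else burst_size)
        self.tokens = self.capacity          # start with a full bucket
        self._clock = clock
        self._last = clock()

    def allow(self):
        """Refill according to elapsed time, then spend one token if available."""
        now = self._clock()
        self.tokens = min(self.capacity, self.tokens + (now - self._last) * self.rate)
        self._last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class FakeClock:
    """Manually advanced clock so the example does not depend on real time."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
bucket = TokenBucket(rate_per_second=2.0, clock=clock)
burst = [bucket.allow() for _ in range(3)]   # two tokens available, third refused
clock.now += 0.5                             # half a second refills one token
after_wait = bucket.allow()
```

A logging wrapper built on this would call `allow()` before forwarding a message and increment a suppressed counter when it returns False.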

xpcsviewer.utils.log_utils.log_timing(logger=None, level=10, threshold_ms=None, threshold_level=30, include_args=False)[source]

Decorator for logging method entry/exit with timing.

Logs the method name and execution time. Optionally logs at a higher level if execution exceeds a threshold.

Parameters:
  • logger (Logger | None) – Logger to use (default: logger for decorated function’s module)

  • level (int) – Normal log level (default: DEBUG)

  • threshold_ms (float | None) – Time threshold in ms for elevated logging

  • threshold_level (int) – Level to use when threshold exceeded (default: WARNING)

  • include_args (bool) – Whether to include function arguments in log

Returns:

Decorated function

Return type:

Callable[[F], F]

Example:

@log_timing()
def process_data(data):
    ...  # Logs: "process_data completed in 123.45ms"

@log_timing(threshold_ms=1000)
def slow_operation():
    ...  # Logs WARNING if > 1000ms
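The threshold behaviour described above can be illustrated with a small stand-alone decorator. `timed` is a hypothetical simplification written for this sketch: unlike log_timing it requires an explicit logger and ignores `include_args`.

```python
import functools
import io
import logging
import time


def timed(logger, level=logging.DEBUG, threshold_ms=None, threshold_level=logging.WARNING):
    """Log how long the wrapped function took, escalating the level past a threshold."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed_ms = (time.perf_counter() - start) * 1000.0
            slow = threshold_ms is not None and elapsed_ms > threshold_ms
            logger.log(threshold_level if slow else level,
                       "%s completed in %.2fms", func.__name__, elapsed_ms)
            return result

        return wrapper

    return decorator


# In-memory handler so the emitted levels can be inspected.
stream = io.StringIO()
timing_logger = logging.getLogger("timing_demo")
timing_logger.setLevel(logging.DEBUG)
timing_logger.propagate = False
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
timing_logger.addHandler(handler)


@timed(timing_logger)
def fast():
    return 42


@timed(timing_logger, threshold_ms=0.0)   # any measurable duration exceeds 0 ms
def slow():
    time.sleep(0.01)
    return "done"


fast_result = fast()
slow_result = slow()
timing_lines = stream.getvalue().splitlines()
```

`functools.wraps` keeps the decorated function's name and docstring intact, which matters when the name is used in the log message.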
xpcsviewer.utils.log_utils.sanitize_path(path, mode=None)[source]

Sanitize file paths for logging privacy.

Parameters:
  • path (str | PathLike[str] | None) – File path to sanitize

  • mode (Literal['none', 'home', 'hash'] | None) – Sanitization mode (default from PYXPCS_LOG_SANITIZE_PATHS env var):
      - ‘none’: No sanitization, full path
      - ‘home’: Replace home directory with ~
      - ‘hash’: Replace home with ~ and hash the filename

Returns:

Sanitized path string

Return type:

str

Example

sanitize_path('/Users/john/data/file.h5')
# mode='none' -> '/Users/john/data/file.h5'
# mode='home' -> '~/data/file.h5'
# mode='hash' -> '~/data/a1b2c3d4.h5'
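The three modes can be approximated with `pathlib` and `hashlib`. This sketch is an assumption-laden illustration: the function name `sanitize`, the explicit `home` argument, the SHA-256 digest, and the 8-character truncation are all choices made here, not documented behaviour of sanitize_path.

```python
import hashlib
from pathlib import PurePosixPath


def sanitize(path, mode="home", home="/Users/john"):
    """Illustrative only: `home` is passed explicitly so results are reproducible."""
    p = PurePosixPath(path)
    if mode == "none":
        return str(p)
    try:
        shown = PurePosixPath("~") / p.relative_to(home)
    except ValueError:
        shown = p                      # path is not under the home directory
    if mode == "hash":
        # Assumed scheme: replace the file stem with a short digest, keep the suffix.
        digest = hashlib.sha256(p.stem.encode("utf-8")).hexdigest()[:8]
        shown = shown.with_name(digest + p.suffix)
    return str(shown)


hashed = sanitize("/Users/john/data/file.h5", "hash")
print(sanitize("/Users/john/data/file.h5", "home"), hashed)
```

Keeping the file suffix in hash mode preserves the file type in logs while hiding the name itself.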

xpcsviewer.utils.log_utils.get_session_context()[source]

Get the current session context as a dictionary.

Returns:

Dictionary with session_id, operation, and current_file

Return type:

dict[str, str]

Environment Variables:

Variable                     Default   Description
PYXPCS_LOG_LEVEL             INFO      DEBUG/INFO/WARNING/ERROR/CRITICAL
PYXPCS_LOG_FORMAT            TEXT      TEXT or JSON (for log aggregation)
PYXPCS_LOG_RATE_LIMIT        10.0      Rate limit (msgs/sec)
PYXPCS_LOG_SANITIZE_PATHS    home      none/home/hash

Log Formatters

Custom log formatters for console, file, JSON, and performance output.

Custom log formatters for XPCS Viewer.

This module provides specialized formatters for different logging outputs:
  • ColoredConsoleFormatter: Colored console output with clean formatting

  • StructuredFileFormatter: Detailed file logging with context

  • JSONFormatter: Structured JSON logging for log analysis

Features:
  • Color-coded log levels for console output

  • Thread-safe formatting

  • Structured data support

  • Performance-optimized formatters

  • Consistent timestamp formatting

class xpcsviewer.utils.log_formatters.ColoredConsoleFormatter(use_colors=None)[source]

Bases: Formatter

Colored console formatter with clean, readable output.

Features:
  • Color-coded log levels

  • Clean module name formatting

  • Thread information for debugging

  • Exception formatting

Parameters:

use_colors (bool | None)

COLORS: ClassVar[dict[str, str]] = {'BOLD': '\x1b[1m', 'CRITICAL': '\x1b[35m', 'DEBUG': '\x1b[36m', 'DIM': '\x1b[2m', 'ERROR': '\x1b[31m', 'INFO': '\x1b[32m', 'RESET': '\x1b[0m', 'WARNING': '\x1b[33m'}
__init__(use_colors=None)[source]

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.

Use a style parameter of ‘%’, ‘{’ or ‘$’ to specify that you want to use one of %-formatting, str.format() ({}) formatting or string.Template formatting in your format string.

Changed in version 3.2: Added the style parameter.

Parameters:

use_colors (bool | None)

format(record)[source]

Format the log record for console output.

Parameters:

record (LogRecord)

Return type:

str

formatException(ei)[source]

Format exception with colors.

Return type:

str

class xpcsviewer.utils.log_formatters.StructuredFileFormatter[source]

Bases: Formatter

Structured file formatter with detailed context information.

Features:
  • Detailed timestamps

  • Full module paths

  • Thread and process information

  • Exception context

  • Performance timing

__init__()[source]

Initialize the formatter with specified format strings.


format(record)[source]

Format the log record for file output.

Parameters:

record (LogRecord)

Return type:

str

class xpcsviewer.utils.log_formatters.JSONFormatter(app_name='XPCS Viewer', app_version='unknown')[source]

Bases: Formatter

JSON formatter for structured logging and log analysis.

Features:
  • Machine-readable JSON output

  • Structured exception handling

  • Application context

  • Performance metrics

  • Thread-safe JSON serialization

Parameters:
  • app_name (str)

  • app_version (str)

__init__(app_name='XPCS Viewer', app_version='unknown')[source]

Initialize the formatter with specified format strings.


Parameters:
  • app_name (str)

  • app_version (str)

format(record)[source]

Format the log record as JSON.

Parameters:

record (LogRecord)

Return type:

str
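A JSON formatter of this kind is typically a small `logging.Formatter` subclass. The sketch below uses only the standard library; `MiniJSONFormatter` and the field names in the payload (`timestamp`, `level`, `logger`, `message`, `app`, `exception`) are assumptions made for illustration, since the actual JSONFormatter output schema is not documented here.

```python
import json
import logging


class MiniJSONFormatter(logging.Formatter):
    """Serialize a log record to one JSON object per line (illustrative schema)."""

    def __init__(self, app_name="XPCS Viewer", app_version="unknown"):
        super().__init__()
        self.app_name = app_name
        self.app_version = app_version

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),   # applies %-style args to the template
            "app": {"name": self.app_name, "version": self.app_version},
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # default=str keeps serialization from failing on non-JSON types.
        return json.dumps(payload, default=str)


# Build a record by hand so no handler setup is needed for the demonstration.
record = logging.LogRecord("demo", logging.INFO, "demo.py", 1,
                           "loaded %d frames", (128,), None)
parsed = json.loads(MiniJSONFormatter(app_version="1.0").format(record))
print(parsed["message"])
```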

class xpcsviewer.utils.log_formatters.PerformanceFormatter[source]

Bases: StructuredFileFormatter

Enhanced formatter that includes performance timing information.

Useful for performance analysis and debugging slow operations.

__init__()[source]

Initialize the formatter with specified format strings.


format(record)[source]

Format record with performance timing.

Parameters:

record (LogRecord)

Return type:

str

xpcsviewer.utils.log_formatters.create_formatter(formatter_type, **kwargs)[source]

Factory function to create formatters.

Parameters:
  • formatter_type (str) – Type of formatter (‘console’, ‘file’, ‘json’, ‘performance’)

  • **kwargs – Additional arguments for the formatter

Returns:

Configured formatter instance

Return type:

Formatter

Memory Management

Intelligent memory management with LRU caching, memory pressure detection, and automatic cleanup.

Unified Memory Management System for XPCS Viewer

This module provides a comprehensive memory management solution that consolidates all caching strategies and implements intelligent memory pressure handling.

class xpcsviewer.utils.memory_manager.CacheType(*values)[source]

Bases: Enum

Types of cached data for different management strategies.

COMPUTATION = 'computation'
ARRAY_DATA = 'array_data'
METADATA = 'metadata'
PLOT_DATA = 'plot_data'
class xpcsviewer.utils.memory_manager.MemoryPressure(*values)[source]

Bases: Enum

Memory pressure levels for adaptive management.

LOW = 'low'
MODERATE = 'moderate'
HIGH = 'high'
CRITICAL = 'critical'
class xpcsviewer.utils.memory_manager.CacheEntry(data, cache_type, size_mb=None)[source]

Bases: object

Enhanced cache entry with comprehensive metadata.

Parameters:
  • data (Any)

  • cache_type (CacheType)

  • size_mb (float | None)

__init__(data, cache_type, size_mb=None)[source]
Parameters:
  • data (Any)

  • cache_type (CacheType)

  • size_mb (float | None)

touch()[source]

Update access metadata.

Return type:

None

age_seconds()[source]

Get age of cache entry in seconds.

Return type:

float

time_since_access()[source]

Get time since last access in seconds.

Return type:

float

class xpcsviewer.utils.memory_manager.UnifiedMemoryManager(max_memory_mb=2048, pressure_thresholds=None, enable_monitoring=True)[source]

Bases: object

Unified memory management system that consolidates all caching strategies.

Features:
  • Predictive memory pressure detection

  • Intelligent cache eviction with multiple strategies

  • Type-aware cache management

  • Memory-mapped file support for large arrays

  • Automatic memory optimization

__init__(max_memory_mb=2048, pressure_thresholds=None, enable_monitoring=True)[source]
preload_data(key, loader_func, cache_type, priority=5)[source]

Schedule data for intelligent preloading.

Parameters:
  • key (str) – Cache key for the data

  • loader_func (callable) – Function to load the data

  • cache_type (CacheType) – Type of cache to store in

  • priority (int) – Priority level (1-10, higher = more important)

get_memory_pressure()[source]

Get current memory pressure level.

Return type:

MemoryPressure

cache_put(key, data, cache_type, pin=False)[source]

Store data in the appropriate cache with intelligent eviction.

Parameters:
  • key (str) – Unique cache key

  • data (Any) – Data to cache

  • cache_type (CacheType) – Type of cache for management strategy

  • pin (bool) – Prevent eviction if True

Returns:

True if successfully cached

Return type:

bool

cache_get(key, cache_type)[source]

Retrieve data from cache.

Parameters:
  • key (str) – Cache key

  • cache_type (CacheType) – Type of cache to search

Returns:

Cached data or None if not found

Return type:

Any or None

clear_cache_type(cache_type)[source]

Clear all entries from specified cache type.

Parameters:

cache_type (CacheType)

clear_all_caches()[source]

Clear all caches.

get_cache_stats()[source]

Get comprehensive cache statistics.

Return type:

dict[str, Any]

shutdown()[source]

Shutdown memory manager and cleanup resources.

get_enhanced_stats()[source]

Get enhanced statistics, including new caching features.

Return type:

dict[str, Any]
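The combination of a memory budget, LRU eviction, and pinning offered by cache_put and cache_get can be illustrated in a few lines. `BudgetedLRUCache` is a hypothetical, single-threaded simplification written for this sketch; the eviction strategies actually used by UnifiedMemoryManager are not described in this reference, and callers here must supply the entry size themselves.

```python
from collections import OrderedDict


class BudgetedLRUCache:
    """LRU cache bounded by a size budget in MB; pinned entries are never evicted."""

    def __init__(self, max_mb):
        self.max_mb = float(max_mb)
        self.used_mb = 0.0
        self._entries = OrderedDict()          # key -> (data, size_mb, pinned)

    def put(self, key, data, size_mb, pin=False):
        pinned_mb = sum(size for k, (_, size, pinned) in self._entries.items()
                        if pinned and k != key)
        if pinned_mb + size_mb > self.max_mb:
            return False                       # would not fit even after full eviction
        if key in self._entries:
            self.used_mb -= self._entries.pop(key)[1]
        for victim in list(self._entries):     # iterate from least recently used
            if self.used_mb + size_mb <= self.max_mb:
                break
            if not self._entries[victim][2]:
                self.used_mb -= self._entries.pop(victim)[1]
        self._entries[key] = (data, size_mb, pin)
        self.used_mb += size_mb
        return True

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)         # mark as most recently used
        return self._entries[key][0]


cache = BudgetedLRUCache(max_mb=10)
cache.put("pinned", "P", 4, pin=True)
cache.put("old", "O", 4)
cache.put("new", "N", 4)                       # evicts "old"; "pinned" is protected
too_big = cache.put("huge", "H", 8)            # 4 MB pinned + 8 MB exceeds the budget
```

Checking the pinned total before evicting avoids discarding entries for an insert that cannot succeed anyway.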

xpcsviewer.utils.memory_manager.get_memory_manager()[source]

Get or create the global memory manager instance.

Uses double-checked locking to be thread-safe under concurrent first access from multiple threads without paying the lock cost on every subsequent call. (BUG-030)

Return type:

UnifiedMemoryManager
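Double-checked locking, as mentioned above, follows a well-known shape. The sketch below shows the pattern with a hypothetical `Manager` class and `get_manager` function; it is not the xpcsviewer implementation.

```python
import threading


class Manager:
    """Stand-in for an expensive global object (hypothetical)."""

    created = 0

    def __init__(self):
        Manager.created += 1


_instance = None
_instance_lock = threading.Lock()


def get_manager():
    """Return the shared instance, creating it at most once."""
    global _instance
    if _instance is None:                  # fast path: no lock once initialized
        with _instance_lock:
            if _instance is None:          # re-check: another thread may have won
                _instance = Manager()
    return _instance


# Hit the accessor from several threads; all of them must see the same object.
results = []
threads = [threading.Thread(target=lambda: results.append(get_manager()))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The second check inside the lock is what prevents two threads that both passed the first check from each constructing an instance.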

xpcsviewer.utils.memory_manager.shutdown_memory_manager()[source]

Shutdown the global memory manager.

xpcsviewer.utils.memory_manager.cache_computation(key, data)[source]

Cache computation result.

Return type:

bool

xpcsviewer.utils.memory_manager.get_computation(key)[source]

Get cached computation result.

Parameters:

key (str)

Return type:

Any | None

xpcsviewer.utils.memory_manager.cache_array(key, array)[source]

Cache large array data.

Return type:

bool

xpcsviewer.utils.memory_manager.get_array(key)[source]

Get cached array data.

Parameters:

key (str)

Return type:

ndarray | None

xpcsviewer.utils.memory_manager.memory_pressure_monitor()[source]

Context manager for monitoring memory pressure during operations.

Performance Monitoring

Real-time performance tracking for optimization and debugging.

Performance Monitoring and Benchmarking Infrastructure for XPCS Viewer

This module provides comprehensive performance monitoring capabilities to track, analyze, and benchmark the various optimizations implemented in the XPCS toolkit.

class xpcsviewer.utils.performance_monitor.PerformanceMetrics(operation_name, start_time, end_time, duration, memory_before_mb, memory_after_mb, memory_peak_mb, memory_delta_mb, cpu_percent, input_size_mb=0.0, output_size_mb=0.0, success=True, error_message='', metadata=<factory>)[source]

Bases: object

Container for performance metrics.

operation_name: str
start_time: float
end_time: float
duration: float
memory_before_mb: float
memory_after_mb: float
memory_peak_mb: float
memory_delta_mb: float
cpu_percent: float
input_size_mb: float = 0.0
output_size_mb: float = 0.0
success: bool = True
error_message: str = ''
metadata: dict[str, Any]
property throughput_mbs: float

Calculate throughput in MB/s.

property memory_efficiency: float

Calculate memory efficiency (input/peak memory).

__init__(operation_name, start_time, end_time, duration, memory_before_mb, memory_after_mb, memory_peak_mb, memory_delta_mb, cpu_percent, input_size_mb=0.0, output_size_mb=0.0, success=True, error_message='', metadata=<factory>)
Return type:

None

class xpcsviewer.utils.performance_monitor.BenchmarkResult(benchmark_name, timestamp, system_info, metrics, summary, duration_total, success_rate)[source]

Bases: object

Container for benchmark results.

benchmark_name: str
timestamp: str
system_info: dict[str, Any]
metrics: list[PerformanceMetrics]
summary: dict[str, Any]
duration_total: float
success_rate: float
__init__(benchmark_name, timestamp, system_info, metrics, summary, duration_total, success_rate)
Return type:

None

class xpcsviewer.utils.performance_monitor.PerformanceProfiler(operation_name, input_size_mb=0.0, metadata=None)[source]

Bases: object

Context manager for profiling performance of operations.

__init__(operation_name, input_size_mb=0.0, metadata=None)[source]
start_time: float
end_time: float
memory_before_mb: float
memory_after_mb: float
memory_peak_mb: float
cpu_percent: float
__enter__()[source]

Start profiling.

__exit__(exc_type, exc_val, exc_tb)[source]

End profiling and create metrics.

update_memory_peak(current_memory_mb)[source]

Update peak memory usage during operation.

Parameters:

current_memory_mb (float)
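A profiling context manager of this shape can be sketched with the standard library. `MiniProfiler` is a hypothetical illustration: it measures Python-level allocations with `tracemalloc`, which is an assumption made here and may differ from how PerformanceProfiler obtains its memory figures.

```python
import time
import tracemalloc


class MiniProfiler:
    """Record wall-clock duration and peak traced memory for a code block."""

    def __init__(self, operation_name, input_size_mb=0.0):
        self.operation_name = operation_name
        self.input_size_mb = input_size_mb
        self.duration = 0.0
        self.memory_peak_mb = 0.0
        self.success = True

    def __enter__(self):
        tracemalloc.start()
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = time.perf_counter()
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        self.duration = self.end_time - self.start_time
        self.memory_peak_mb = peak_bytes / (1024 * 1024)
        self.success = exc_type is None
        return False                       # never swallow exceptions

    @property
    def throughput_mbs(self):
        """Input size divided by duration, in MB/s."""
        return self.input_size_mb / self.duration if self.duration > 0 else 0.0


with MiniProfiler("allocate", input_size_mb=1.0) as prof:
    buffer = bytearray(1024 * 1024)        # allocate roughly 1 MB

print(f"{prof.operation_name}: {prof.duration * 1000:.3f} ms")
```

Returning False from `__exit__` lets errors propagate while still recording that the operation failed.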

class xpcsviewer.utils.performance_monitor.PerformanceMonitor(max_history=10000)[source]

Bases: object

Global performance monitoring system.

Parameters:

max_history (int)

__init__(max_history=10000)[source]
Parameters:

max_history (int)

metrics_history: deque
operation_stats: dict[str, list[PerformanceMetrics]]
benchmarks: list[BenchmarkResult]
classmethod get_instance()[source]

Get or create global performance monitor instance.

Return type:

PerformanceMonitor

record_metrics(metrics)[source]

Record performance metrics.

Parameters:

metrics (PerformanceMetrics)

get_operation_stats(operation_name)[source]

Get statistics for a specific operation.

Parameters:

operation_name (str)

Return type:

dict[str, Any]

get_overall_stats()[source]

Get overall performance statistics.

Return type:

dict[str, Any]

export_metrics(filepath)[source]

Export performance metrics to JSON file.

Parameters:

filepath (str)

clear_history()[source]

Clear performance metrics history.

class xpcsviewer.utils.performance_monitor.XPCSBenchmarkSuite(output_dir='benchmarks')[source]

Bases: object

Comprehensive benchmark suite for XPCS performance testing.

Parameters:

output_dir (str)

__init__(output_dir='benchmarks')[source]
Parameters:

output_dir (str)

run_comprehensive_benchmark(sample_data_path=None)[source]

Run comprehensive benchmark of all XPCS optimizations.

Parameters:

sample_data_path (str | None)

Return type:

BenchmarkResult

xpcsviewer.utils.performance_monitor.profile_performance(operation_name=None, track_input_size=False)[source]

Decorator to automatically profile function performance.

Parameters:
  • operation_name (str | None)

  • track_input_size (bool)

xpcsviewer.utils.performance_monitor.benchmark_context(operation_name, input_size_mb=0.0, metadata=None)[source]

Context manager for benchmarking code blocks.

xpcsviewer.utils.performance_monitor.get_performance_monitor()[source]

Get the global performance monitor instance.

Return type:

PerformanceMonitor

xpcsviewer.utils.performance_monitor.run_xpcs_benchmark(sample_data_path=None, output_dir='benchmarks')[source]

Convenience function to run the complete XPCS benchmark suite.

Parameters:
  • sample_data_path (str | None)

  • output_dir (str)

Return type:

BenchmarkResult

Reliability

Error handling, validation, and fallback mechanisms.

Enhanced Reliability Framework for XPCS Viewer.

This module provides zero-overhead reliability mechanisms including:
  • Fail-fast validation decorators

  • Exception result caching

  • Smart fallback strategies

  • Performance-preserving reliability checks

class xpcsviewer.utils.reliability.ValidationLevel(*values)[source]

Bases: Enum

Validation strictness levels for performance tuning.

MINIMAL = 'minimal'
STANDARD = 'standard'
STRICT = 'strict'
PARANOID = 'paranoid'
class xpcsviewer.utils.reliability.ValidationResult(is_valid, error_message=None, warnings=None, validation_time=0.0, cached=False)[source]

Bases: object

Result of validation operation with caching support.

is_valid: bool
error_message: str | None = None
warnings: list[str] | None = None
validation_time: float = 0.0
cached: bool = False
__init__(is_valid, error_message=None, warnings=None, validation_time=0.0, cached=False)
Return type:

None

class xpcsviewer.utils.reliability.ValidationCache(max_size=1000, default_ttl=300.0)[source]

Bases: object

Thread-safe cache for validation results with TTL support.

__init__(max_size=1000, default_ttl=300.0)[source]
get(key)[source]

Get cached validation result if still valid.

Parameters:

key (str)

Return type:

ValidationResult | None

put(key, result)[source]

Store validation result in cache.

Parameters:
  • key (str)

  • result (ValidationResult)

Return type:

None

clear()[source]

Clear all cached results.

Return type:

None
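A thread-safe cache with a size bound and time-to-live can be sketched as follows. `TTLCache` and its injectable `clock` are hypothetical and written for this illustration; how ValidationCache chooses entries to evict when full is not documented here, so oldest-insertion eviction is an assumption.

```python
import threading
import time
from collections import OrderedDict


class TTLCache:
    """Bounded cache whose entries expire `default_ttl` seconds after insertion."""

    def __init__(self, max_size=1000, default_ttl=300.0, clock=time.monotonic):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self._clock = clock
        self._lock = threading.Lock()
        self._data = OrderedDict()             # key -> (expires_at, value)

    def get(self, key):
        with self._lock:
            item = self._data.get(key)
            if item is None:
                return None
            expires_at, value = item
            if self._clock() >= expires_at:
                del self._data[key]            # drop the stale entry lazily
                return None
            return value

    def put(self, key, value):
        with self._lock:
            self._data.pop(key, None)
            while len(self._data) >= self.max_size:
                self._data.popitem(last=False)  # drop the oldest insertion
            self._data[key] = (self._clock() + self.default_ttl, value)

    def clear(self):
        with self._lock:
            self._data.clear()


now = [0.0]                                    # manually advanced clock for the demo
cache = TTLCache(max_size=2, default_ttl=5.0, clock=lambda: now[0])
cache.put("a", "A")
cache.put("b", "B")
cache.put("c", "C")                            # size bound reached: "a" is dropped
evicted = cache.get("a")
fresh = cache.get("b")
now[0] = 5.0                                   # TTL has elapsed
expired = cache.get("b")
```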

xpcsviewer.utils.reliability.validate_input(*, check_types=True, check_ranges=True, check_shapes=True, check_values=True, level=ValidationLevel.STANDARD, cache_results=True, fast_fail=True)[source]

Zero-overhead input validation decorator with intelligent caching.

Parameters:
  • check_types (bool) – Validate parameter types

  • check_ranges (bool) – Validate numerical ranges

  • check_shapes (bool) – Validate array shapes

  • check_values (bool) – Validate for NaN, infinity, etc.

  • level (ValidationLevel) – Validation strictness level

  • cache_results (bool) – Cache validation results for performance

  • fast_fail (bool) – Fail immediately on first validation error

Example:

import numpy as np
from xpcsviewer.utils.reliability import ValidationLevel, validate_input

@validate_input(check_ranges=True, level=ValidationLevel.STRICT)
def process_data(data: np.ndarray, threshold: float = 0.1):
    # Function implementation
    pass
class xpcsviewer.utils.reliability.SmartFallbackManager[source]

Bases: object

Manager for smart fallback strategies with pre-computed paths.

__init__()[source]
register_fallback_chain(operation_name, strategies)[source]

Register a chain of fallback strategies for an operation.

Return type:

None

execute_with_fallback(operation_name, *args, **kwargs)[source]

Execute operation with automatic fallback on failure.

Parameters:

operation_name (str)

Return type:

Any

get_performance_stats(operation_name)[source]

Get performance statistics for an operation.

Parameters:

operation_name (str)

Return type:

dict[str, float]

xpcsviewer.utils.reliability.with_fallback(operation_name, strategies=None)[source]

Decorator for automatic fallback execution with pre-computed strategies.

Parameters:
  • operation_name (str) – Name of the operation for strategy registration

  • strategies (list[Callable] | None) – List of fallback functions (if not already registered)

Example:

@with_fallback("data_loading", [load_enhanced, load_standard])
def load_data(file_path):
    # Primary implementation - automatically falls back if it fails
    pass
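The idea of a fallback chain can be demonstrated without the library. In this sketch `fallback_chain`, `load_standard`, `load_data`, and `always_fails` are hypothetical names; the decorator simply tries the primary function and then each strategy in order, which may be simpler than what with_fallback and SmartFallbackManager actually do.

```python
import functools


def fallback_chain(strategies):
    """Try the decorated function first, then each fallback in order."""

    def decorator(primary):
        @functools.wraps(primary)
        def wrapper(*args, **kwargs):
            last_error = None
            for candidate in (primary, *strategies):
                try:
                    return candidate(*args, **kwargs)
                except Exception as exc:       # remember the failure, try the next one
                    last_error = exc
            raise last_error                   # every candidate failed

        return wrapper

    return decorator


def load_standard(path):
    return f"standard:{path}"


@fallback_chain([load_standard])
def load_data(path):
    raise OSError("primary loader unavailable")


@fallback_chain([])
def always_fails():
    raise ValueError("no strategy succeeded")


result = load_data("scan.h5")
try:
    always_fails()
except ValueError as exc:
    final_error = str(exc)
```

Re-raising the last error keeps the failure visible to the caller when no strategy works.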
class xpcsviewer.utils.reliability.ReliabilityContext(max_retries=3, retry_delay=0.1, exponential_backoff=True, acceptable_exceptions=None)[source]

Bases: object

Context manager for enhanced reliability with retries and exponential backoff.

Usage – wrap each attempt in its own with block inside a loop:

ctx = ReliabilityContext(max_retries=3, retry_delay=0.1)
while True:
    with ctx:
        risky_operation()
    if ctx.should_stop:
        break

Alternatively use the run() helper which encapsulates the loop:

ctx = ReliabilityContext(max_retries=3)
result = ctx.run(risky_operation, arg1, arg2)
Parameters:
  • max_retries (int)

  • retry_delay (float)

  • exponential_backoff (bool)

  • acceptable_exceptions (tuple[type, ...] | None)

__init__(max_retries=3, retry_delay=0.1, exponential_backoff=True, acceptable_exceptions=None)[source]
Parameters:
  • max_retries (int)

  • retry_delay (float)

  • exponential_backoff (bool)

  • acceptable_exceptions (tuple[type, ...] | None)

run(func, *args, **kwargs)[source]

Execute func(*args, **kwargs) with automatic retry on failure.

Retries up to self.max_retries times on acceptable_exceptions. Raises the last exception if all retries are exhausted.

This is the preferred API because it avoids the __exit__ self-catch problem that arises when the context manager is used in a while loop manually (BUG-054).
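The retry loop that run() encapsulates can be pictured with a small stand-alone function. `run_with_retries` is a hypothetical sketch: the doubling delay and the reading of `max_retries` as retries after the first attempt are assumptions, not confirmed details of ReliabilityContext.

```python
import time


def run_with_retries(func, *args, max_retries=3, retry_delay=0.1,
                     exponential_backoff=True, acceptable_exceptions=(Exception,),
                     sleep=time.sleep, **kwargs):
    """Call func, retrying on acceptable exceptions; re-raise the last one."""
    for attempt in range(max_retries + 1):     # first try plus max_retries retries
        try:
            return func(*args, **kwargs)
        except acceptable_exceptions:
            if attempt == max_retries:
                raise                          # retries exhausted
            delay = retry_delay * (2 ** attempt) if exponential_backoff else retry_delay
            sleep(delay)


calls = {"n": 0}


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


delays = []                                    # record delays instead of sleeping
result = run_with_retries(flaky, retry_delay=0.1, sleep=delays.append)
print(result, delays)
```

Injecting `sleep` makes the backoff schedule observable in tests without slowing them down.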

xpcsviewer.utils.reliability.reliability_context(max_retries=3, retry_delay=0.1, exponential_backoff=True, acceptable_exceptions=None)[source]

Context manager for enhanced reliability with retries and exponential backoff.

Parameters:
  • max_retries (int) – Maximum number of retry attempts

  • retry_delay (float) – Base delay between retries (seconds)

  • exponential_backoff (bool) – Use exponential backoff for delays

  • acceptable_exceptions (tuple[type, ...] | None) – Exception types that should be retried

Example:

max_retries = 3
attempt = 0
while attempt <= max_retries:
    try:
        with reliability_context(max_retries=max_retries, retry_delay=0.5):
            risky_operation()
        break  # Success
    except (OSError, ConnectionError):
        attempt += 1
        if attempt > max_retries:
            raise
xpcsviewer.utils.reliability.get_validation_cache()[source]

Get the global validation cache instance.

Return type:

ValidationCache

xpcsviewer.utils.reliability.get_fallback_manager()[source]

Get the global fallback manager instance.

Return type:

SmartFallbackManager

xpcsviewer.utils.reliability.clear_reliability_caches()[source]

Clear all reliability-related caches for testing or memory management.

Return type:

None

class xpcsviewer.utils.reliability.ReliabilityProfiler[source]

Bases: object

Lightweight profiler for reliability overhead measurement.

__init__()[source]
record_overhead(operation, overhead_time)[source]

Record reliability overhead for an operation.

Return type:

None

get_overhead_stats(operation=None)[source]

Get overhead statistics for operations.

Parameters:

operation (str | None)

Return type:

dict[str, Any]

xpcsviewer.utils.reliability.get_reliability_profiler()[source]

Get the global reliability profiler instance.

Return type:

ReliabilityProfiler

Validation

Input validation utilities for data integrity checks.

Enhanced Validation utilities for XPCS data processing with reliability features.

This module provides centralized validation functions with performance-preserving reliability enhancements including caching, fail-fast mechanisms, and detailed error context for better debugging and recovery.

xpcsviewer.utils.validation.get_file_label_safe(xf)[source]

Safely extract a label from an XPCS file object.

Parameters:

xf – XPCS file object that may or may not have a ‘label’ attribute

Returns:

The file label if available, otherwise ‘unknown’

Return type:

str

xpcsviewer.utils.validation.validate_xf_fit_summary(xf)[source]

Validate that an XPCS file object has a valid fit_summary with required fields. Enhanced with caching and detailed error context.

Parameters:

xf – XPCS file object to validate

Returns:

A tuple containing:

  • bool: True if validation passed, False otherwise

  • dict or None: The fit_summary if valid, None otherwise

  • str or None: Error message if validation failed, None if passed

Return type:

tuple[bool, dict | None, str | None]

xpcsviewer.utils.validation.validate_xf_has_fit_summary(xf)[source]

Simple validation that an XPCS file object has a fit_summary attribute.

Parameters:

xf – XPCS file object to validate

Returns:

A tuple containing:

  • bool: True if has fit_summary, False otherwise

  • str or None: Error message if validation failed, None if passed

Return type:

tuple[bool, str | None]

xpcsviewer.utils.validation.validate_fit_summary_fields(fit_summary, required_fields, file_label='unknown')[source]

Validate that a fit_summary dictionary contains required fields.

Parameters:
  • fit_summary (dict[str, Any]) – Dictionary to validate

  • required_fields (list) – List of required field names

  • file_label (str) – Label for error messaging

Returns:

A tuple containing:

  • bool: True if all fields present, False otherwise

  • str or None: Error message if validation failed, None if passed

Return type:

tuple[bool, str | None]

xpcsviewer.utils.validation.log_array_size_mismatch(file_label, array_info, min_length)[source]

Log a standardized array size mismatch warning.

Parameters:
  • file_label (str) – Label of the file with the mismatch

  • array_info (dict[str, int]) – Dictionary mapping array names to their lengths

  • min_length (int) – The minimum length arrays will be trimmed to

Return type:

None

xpcsviewer.utils.validation.validate_array_compatibility(*arrays, names=None, allow_broadcast=False, file_label='unknown')[source]

Validate that arrays have compatible shapes.

Per Technical Guidelines, this function raises XPCSValidationError instead of silently truncating arrays, as silent data modification is prohibited.

Parameters:
  • *arrays (np.ndarray) – Arrays to validate

  • names (list[str] | None) – Optional names for error messages

  • allow_broadcast (bool) – If True, allow NumPy-compatible broadcasting. If False (default), require exact shape match.

  • file_label (str) – Label for error messaging (deprecated, use names instead)

Returns:

True if arrays are compatible

Return type:

bool

Raises:

XPCSValidationError – If arrays have incompatible shapes (no silent truncation)

Notes

Changed from silent truncation to raising XPCSValidationError per data integrity guidelines. Silent data modification is prohibited.
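The difference between exact matching and broadcasting can be illustrated with a small sketch. It is an assumption-laden illustration rather than the library's code: shapes_compatible is a hypothetical function that works on plain shape tuples instead of arrays, and ValueError stands in for XPCSValidationError.

```python
from itertools import zip_longest


def shapes_compatible(shapes, allow_broadcast=False):
    """Hypothetical check on shape tuples; returns True or raises ValueError."""
    if not shapes:
        return True
    if not allow_broadcast:
        # Strict mode: every shape must equal the first one exactly.
        if any(s != shapes[0] for s in shapes[1:]):
            raise ValueError(f"shape mismatch: {shapes}")
        return True
    # NumPy-style rule: align shapes from the trailing axis; on each axis
    # the sizes must be equal or one of them must be 1.
    reversed_shapes = [tuple(reversed(s)) for s in shapes]
    for sizes in zip_longest(*reversed_shapes, fillvalue=1):
        distinct = {n for n in sizes if n != 1}
        if len(distinct) > 1:
            raise ValueError(f"shapes cannot be broadcast: {shapes}")
    return True
```

In this sketch a mismatch always raises; nothing is trimmed to a common length, which mirrors the no-silent-truncation rule stated above.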

xpcsviewer.utils.validation.validate_array_compatibility_legacy(*arrays, file_label='unknown')[source]

Legacy validation that returns tuple instead of raising.

DEPRECATED: Use validate_array_compatibility() which raises XPCSValidationError. This function is kept for backward compatibility but should not be used for new code as it allows silent data truncation.

Parameters:
  • *arrays – Variable number of arrays to check

  • file_label (str) – Label for error messaging

Returns:

A tuple containing:

  • bool: True if arrays are compatible (after trimming), False if empty

  • int: Minimum length of arrays

  • str or None: Warning message if sizes differ, None if all same size

Return type:

tuple[bool, int, str | None]

xpcsviewer.utils.validation.validate_hdf5_file_integrity(file_path, required_datasets=None)[source]

Comprehensive HDF5 file integrity validation with caching.

Parameters:
  • file_path (str) – Path to HDF5 file

  • required_datasets (list[str] | None) – List of required dataset paths

Returns:

ValidationResult with detailed validation information

Return type:

ValidationResult

xpcsviewer.utils.validation.validate_scientific_array(array, array_name='data', min_dimensions=1, max_dimensions=None, expected_shape=None, allow_nan=False, allow_negative=True, finite_only=True)[source]

Comprehensive scientific array validation with domain-specific checks.

Parameters:
  • array (ndarray) – NumPy array to validate

  • array_name (str) – Name for error reporting

  • min_dimensions (int) – Minimum number of dimensions

  • max_dimensions (int | None) – Maximum number of dimensions

  • expected_shape (tuple[int, ...] | None) – Expected exact shape

  • allow_nan (bool) – Whether NaN values are acceptable

  • allow_negative (bool) – Whether negative values are acceptable

  • finite_only (bool) – Whether only finite values are acceptable

Returns:

ValidationResult with detailed validation information

Return type:

ValidationResult

xpcsviewer.utils.validation.validate_g2_data(g2_array, tau_array=None)[source]

Specialized validation for G2 correlation data.

Parameters:
  • g2_array (ndarray) – G2 correlation values

  • tau_array (ndarray | None) – Optional tau (time delay) values

Returns:

ValidationResult with G2-specific validation

Return type:

ValidationResult

xpcsviewer.utils.validation.get_validation_statistics()[source]

Get comprehensive validation performance statistics.

Returns:

Dictionary with validation cache statistics and performance metrics

Return type:

dict[str, Any]

Exceptions

Custom exception hierarchy for XPCS-specific error handling.

The validation system uses XPCSValidationError for structured error reporting. Analysis modules catch these exceptions to skip incompatible files gracefully rather than silently trimming data.

XPCS Viewer Exception Hierarchy for Enhanced Error Handling and Reliability.

This module provides a comprehensive exception hierarchy that enables:

  • Better error categorization and handling

  • Zero-overhead performance (exceptions only cost when raised)

  • Detailed error context and recovery suggestions

  • Exception chaining for full error traceability

  • Automated error reporting and classification

exception xpcsviewer.utils.exceptions.XPCSBaseError(message, *, error_code=None, context=None, recovery_suggestions=None, severity='ERROR')[source]

Bases: Exception

Base exception for all XPCS Viewer errors.

Provides common functionality for error context, recovery suggestions, and performance-aware error handling.

__init__(message, *, error_code=None, context=None, recovery_suggestions=None, severity='ERROR')[source]
add_context(key, value)[source]

Add context information to the error (fluent interface).

Return type:

XPCSBaseError

add_recovery_suggestion(suggestion)[source]

Add recovery suggestion (fluent interface).

Parameters:

suggestion (str)

Return type:

XPCSBaseError

get_error_details()[source]

Get comprehensive error details for logging/reporting.

Return type:

dict[str, Any]
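The fluent interface mentioned for add_context() and add_recovery_suggestion() usually means each method returns the exception itself so calls can be chained. A minimal sketch under that assumption follows; FluentErrorSketch is a hypothetical stand-in, and the dictionary keys returned by get_error_details() here are illustrative guesses, not the documented format.

```python
class FluentErrorSketch(Exception):
    """Hypothetical stand-in for an error class with fluent helpers."""

    def __init__(self, message, *, error_code=None, context=None,
                 recovery_suggestions=None, severity="ERROR"):
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        self.context = dict(context or {})
        self.recovery_suggestions = list(recovery_suggestions or [])
        self.severity = severity

    def add_context(self, key, value):
        self.context[key] = value
        return self  # returning self is what allows chained calls

    def add_recovery_suggestion(self, suggestion):
        self.recovery_suggestions.append(suggestion)
        return self

    def get_error_details(self):
        # The keys below are assumptions chosen for illustration.
        return {
            "message": self.message,
            "error_code": self.error_code,
            "severity": self.severity,
            "context": dict(self.context),
            "recovery_suggestions": list(self.recovery_suggestions),
        }


# Chained construction before raising or logging.
err = (
    FluentErrorSketch("bad file", error_code="E1")
    .add_context("path", "a.h5")
    .add_recovery_suggestion("Check the path")
)
```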

exception xpcsviewer.utils.exceptions.XPCSDataError(message, **kwargs)[source]

Bases: XPCSBaseError

Exceptions related to data validation, corruption, or format issues.

Used for:

  • Invalid data formats or structures

  • Data corruption detection

  • Missing required data fields

  • Data consistency violations

Return type:

None

__init__(message, **kwargs)[source]
Return type:

None

exception xpcsviewer.utils.exceptions.XPCSFileError(message, file_path=None, **kwargs)[source]

Bases: XPCSBaseError

Exceptions related to file operations and I/O issues.

Used for:

  • File not found or access issues

  • HDF5 file corruption or format problems

  • File permission or disk space issues

  • Network file access problems

Return type:

None

__init__(message, file_path=None, **kwargs)[source]
Return type:

None

exception xpcsviewer.utils.exceptions.XPCSComputationError(message, operation=None, **kwargs)[source]

Bases: XPCSBaseError

Exceptions related to scientific computations and analysis.

Used for:

  • Numerical computation failures

  • Fitting algorithm convergence issues

  • Invalid computation parameters

  • Mathematical domain errors

Parameters:
  • message (str)

  • operation (str | None)

  • kwargs (Any)

Return type:

None

__init__(message, operation=None, **kwargs)[source]
Parameters:
  • message (str)

  • operation (str | None)

  • kwargs (Any)

Return type:

None

exception xpcsviewer.utils.exceptions.XPCSMemoryError(message, requested_mb=None, **kwargs)[source]

Bases: XPCSBaseError

Exceptions related to memory management and resource exhaustion.

Used for:

  • Out of memory conditions

  • Memory allocation failures

  • Cache overflow situations

  • Resource limit exceeded

Parameters:
  • message (str)

  • requested_mb (float | None)

  • kwargs (Any)

Return type:

None

__init__(message, requested_mb=None, **kwargs)[source]
Parameters:
  • message (str)

  • requested_mb (float | None)

  • kwargs (Any)

Return type:

None

exception xpcsviewer.utils.exceptions.XPCSConfigurationError(message, config_key=None, **kwargs)[source]

Bases: XPCSBaseError

Exceptions related to configuration and setup issues.

Used for:

  • Invalid configuration values

  • Missing required configuration

  • Configuration validation failures

  • Environment setup problems

Parameters:
  • message (str)

  • config_key (str | None)

  • kwargs (Any)

Return type:

None

__init__(message, config_key=None, **kwargs)[source]
Parameters:
  • message (str)

  • config_key (str | None)

  • kwargs (Any)

Return type:

None

exception xpcsviewer.utils.exceptions.XPCSGUIError(message, component=None, **kwargs)[source]

Bases: XPCSBaseError

Exceptions related to GUI operations and Qt interactions.

Used for:

  • Qt signal/slot connection issues

  • GUI component initialization failures

  • Threading/concurrency problems in GUI

  • Display or rendering issues

Parameters:
  • message (str)

  • component (str | None)

  • kwargs (Any)

Return type:

None

__init__(message, component=None, **kwargs)[source]
Parameters:
  • message (str)

  • component (str | None)

  • kwargs (Any)

Return type:

None

exception xpcsviewer.utils.exceptions.XPCSValidationError(message, field=None, value=None, **kwargs)[source]

Bases: XPCSDataError

Specific validation failures with detailed field information.

Used for:

  • Input parameter validation

  • Data schema validation

  • Range and constraint validation

  • Type validation failures

Return type:

None

__init__(message, field=None, value=None, **kwargs)[source]
Return type:

None

exception xpcsviewer.utils.exceptions.XPCSNetworkError(message, url=None, **kwargs)[source]

Bases: XPCSBaseError

Exceptions related to network operations and remote access.

Used for:

  • Network connectivity issues

  • Remote file access failures

  • API call failures

  • Timeout conditions

Parameters:
  • message (str)

  • url (str | None)

  • kwargs (Any)

Return type:

None

__init__(message, url=None, **kwargs)[source]
Parameters:
  • message (str)

  • url (str | None)

  • kwargs (Any)

Return type:

None

exception xpcsviewer.utils.exceptions.XPCSCriticalError(message, **kwargs)[source]

Bases: XPCSBaseError

Critical system errors that require immediate attention.

Used for:

  • System state corruption

  • Unrecoverable failures

  • Security violations

  • Data integrity failures

Return type:

None

__init__(message, **kwargs)[source]
Return type:

None

exception xpcsviewer.utils.exceptions.XPCSWarning(message, **kwargs)[source]

Bases: XPCSBaseError

Non-fatal issues that should be reported but don’t stop execution.

Used for:

  • Performance degradation warnings

  • Deprecated feature usage

  • Partial operation failures

  • Resource usage warnings

Return type:

None

__init__(message, **kwargs)[source]
Return type:

None

xpcsviewer.utils.exceptions.chain_exception(new_exception, original_exception)[source]

Chain exceptions to preserve full error context with zero overhead.

Usage:

try:
    risky_operation()
except ValueError as e:
    raise chain_exception(
        XPCSDataError("Failed to process data"),
        e
    )
Parameters:
  • new_exception (XPCSBaseError)

  • original_exception (Exception)

Return type:

XPCSBaseError
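One plausible way to implement this kind of chaining is to set the standard __cause__ attribute, which is what Python's own "raise new from original" statement does. The sketch below assumes that approach; chain_exception_sketch and parse_number are hypothetical names, and RuntimeError stands in for an XPCS exception class.

```python
def chain_exception_sketch(new_exception, original_exception):
    """Hypothetical equivalent: attach the original as the explicit cause."""
    new_exception.__cause__ = original_exception
    return new_exception


def parse_number(text):
    try:
        return int(text)
    except ValueError as exc:
        # Same effect as: raise RuntimeError(...) from exc
        raise chain_exception_sketch(
            RuntimeError("Failed to process data"),
            exc,
        )
```

With the cause attached, the traceback printed for the new exception also shows the original ValueError as its direct cause.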

xpcsviewer.utils.exceptions.convert_exception(original_exception, message=None)[source]

Convert standard Python exceptions to XPCS exceptions with zero overhead.

Parameters:
  • original_exception (Exception) – The original exception to convert

  • message (str | None) – Optional custom message (uses original message if not provided)

Returns:

Appropriate XPCS exception with original exception chained

Return type:

XPCSBaseError

xpcsviewer.utils.exceptions.handle_exceptions(default_exception=<class 'xpcsviewer.utils.exceptions.XPCSBaseError'>, convert_exceptions=True, add_context=None)[source]

Decorator for automatic exception handling and conversion.

Zero overhead when no exceptions occur.

Usage:

@handle_exceptions(XPCSDataError, add_context={"operation": "data_loading"})
def load_data(file_path):
    # Function implementation
    pass
Return type:

Callable[[Callable[[…], Any]], Callable[[…], Any]]
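A converting decorator of this kind can be sketched as below. This is an illustration under stated assumptions, not the library's implementation: handle_exceptions_sketch and ConvertedError are hypothetical names, and the choice to pass through exceptions that already have the target type is a guess at reasonable behavior.

```python
import functools


class ConvertedError(Exception):
    """Hypothetical stand-in for an XPCS exception type."""

    def __init__(self, message, context=None):
        super().__init__(message)
        self.context = dict(context or {})


def handle_exceptions_sketch(default_exception=ConvertedError, add_context=None):
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except default_exception:
                raise  # already the target type: pass through unchanged
            except Exception as exc:
                # Convert and chain so the original traceback is kept.
                raise default_exception(str(exc), context=add_context) from exc
        return wrapper
    return decorator


@handle_exceptions_sketch(add_context={"operation": "data_loading"})
def load_data(file_path):
    raise OSError(f"cannot open {file_path}")
```

The try/except only does extra work when an exception is actually raised, which is consistent with the "zero overhead when no exceptions occur" remark above.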

class xpcsviewer.utils.exceptions.exception_context(exception_type=<class 'xpcsviewer.utils.exceptions.XPCSBaseError'>, message='Operation failed', cleanup_func=None)[source]

Bases: object

Context manager for safe exception handling with resource cleanup.

Zero overhead when no exceptions occur.

Usage:

with exception_context(XPCSFileError, "Failed to process file"):
    # Operations that might fail
    pass

Parameters:
  • exception_type (type)

  • message (str)

  • cleanup_func (Callable[[], None] | None)

__init__(exception_type=<class 'xpcsviewer.utils.exceptions.XPCSBaseError'>, message='Operation failed', cleanup_func=None)[source]
Parameters:
  • exception_type (type)

  • message (str)

  • cleanup_func (Callable[[], None] | None)
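A context manager with this signature could work roughly as sketched here. The details are assumptions: ExceptionContextSketch is a hypothetical class, RuntimeError stands in for an XPCS exception type, and running the cleanup hook only on failure is a guess, since the text above does not say when cleanup_func is called.

```python
class ExceptionContextSketch:
    """Hypothetical context manager: wrap failures and run a cleanup hook."""

    def __init__(self, exception_type=RuntimeError,
                 message="Operation failed", cleanup_func=None):
        self.exception_type = exception_type
        self.message = message
        self.cleanup_func = cleanup_func

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc is None or not isinstance(exc, Exception):
            # Success, or KeyboardInterrupt/SystemExit: do not interfere.
            return False
        if self.cleanup_func is not None:
            self.cleanup_func()  # assumed: cleanup runs only on failure
        if isinstance(exc, self.exception_type):
            return False  # already the target type: let it propagate
        # Replace the original exception with the wrapped, chained one.
        raise self.exception_type(f"{self.message}: {exc}") from exc
```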

Common Checks

Lightweight guard functions for quick input pre-checks.

Common utility functions for consistent validation and checking patterns.

This module provides standardized utility functions to reduce code duplication across validation, error checking, and common operations in the XPCS toolkit.

xpcsviewer.utils.common_checks.is_empty_list(obj)[source]

Standardized check for empty lists or list-like objects.

Parameters:

obj (Any) – Object to check for emptiness

Returns:

True if object is empty or None, False otherwise

Return type:

bool

xpcsviewer.utils.common_checks.is_empty_or_none(obj)[source]

Check if object is None or empty (for various container types).

Parameters:

obj (Any) – Object to check

Returns:

True if object is None or empty, False otherwise

Return type:

bool

xpcsviewer.utils.common_checks.safe_array_access(arr, index, default=None)[source]

Safely access array elements with bounds checking.

Parameters:
  • arr (Any) – Array-like object to access

  • index (int | slice) – Index or slice to access

  • default (Any) – Default value to return if access fails

Returns:

Array element/slice or default value

Return type:

Any

xpcsviewer.utils.common_checks.safe_dict_access(d, key, default=None)[source]

Safely access dictionary values with None checking.

Parameters:
  • d (dict | None) – Dictionary to access

  • key (str) – Key to access

  • default (Any) – Default value if key not found or dict is None

Returns:

Dictionary value or default

Return type:

Any

xpcsviewer.utils.common_checks.validate_numeric_range(value, min_val=None, max_val=None, param_name='value')[source]

Validate that a numeric value is within specified range.

Parameters:
  • value (Any) – Value to validate

  • min_val (float | None) – Minimum allowed value (inclusive)

  • max_val (float | None) – Maximum allowed value (inclusive)

  • param_name (str) – Parameter name for error messages

Returns:

Tuple of (is_valid, error_message)

Return type:

tuple[bool, str | None]
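The (is_valid, error_message) return convention can be illustrated with a small sketch. validate_numeric_range_sketch is a hypothetical re-implementation; rejecting booleans and NaN, and the wording of the messages, are assumptions made for the example.

```python
import math


def validate_numeric_range_sketch(value, min_val=None, max_val=None,
                                  param_name="value"):
    """Hypothetical version returning (is_valid, error_message)."""
    # Assumption: bool is rejected even though it subclasses int.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False, f"{param_name} must be numeric, got {type(value).__name__}"
    if math.isnan(value):
        return False, f"{param_name} is NaN"
    # Both bounds are inclusive, as in the parameter descriptions above.
    if min_val is not None and value < min_val:
        return False, f"{param_name}={value} is below minimum {min_val}"
    if max_val is not None and value > max_val:
        return False, f"{param_name}={value} is above maximum {max_val}"
    return True, None
```

Returning a tuple instead of raising lets callers decide whether an out-of-range value is fatal or merely worth logging.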

xpcsviewer.utils.common_checks.ensure_list(obj)[source]

Ensure object is a list, converting if necessary.

Parameters:

obj (Any) – Object to convert to list

Returns:

List representation of object

Return type:

list[Any]

xpcsviewer.utils.common_checks.safe_min_max(values, default_min=0.0, default_max=1.0)[source]

Safely compute min and max of a list with fallback defaults.

Parameters:
  • values (list[Any]) – List of values to compute min/max for

  • default_min (float) – Default minimum if computation fails

  • default_max (float) – Default maximum if computation fails

Returns:

Tuple of (min_value, max_value)

Return type:

tuple[float, float]

xpcsviewer.utils.common_checks.check_required_attributes(obj, required_attrs, obj_name='object')[source]

Check if an object has all required attributes.

Parameters:
  • obj (Any) – Object to check

  • required_attrs (list[str]) – List of required attribute names

  • obj_name (str) – Name of object for error messages

Returns:

Tuple of (all_present, missing_attributes)

Return type:

tuple[bool, list[str]]

xpcsviewer.utils.common_checks.log_validation_warning(message, category='validation')[source]

Log a standardized validation warning.

Parameters:
  • message (str) – Warning message

  • category (str) – Category for logging filtering

Return type:

None

xpcsviewer.utils.common_checks.log_validation_error(message, category='validation')[source]

Log a standardized validation error.

Parameters:
  • message (str) – Error message

  • category (str) – Category for logging filtering

Return type:

None

xpcsviewer.utils.common_checks.standardize_file_rows(rows)[source]

Standardize file row selection to a list of integers.

Parameters:

rows (Any) – Row selection (can be None, int, or list)

Returns:

List of integer row indices

Return type:

list[int]

xpcsviewer.utils.common_checks.create_progress_steps(base_steps, prefix=None)[source]

Create standardized progress step descriptions.

Parameters:
  • base_steps (list[str]) – Base list of step descriptions

  • prefix (str | None) – Optional prefix to add to each step

Returns:

List of formatted step descriptions

Return type:

list[str]

Data Processing

Optimized data processing utilities.

Lazy Loader – Deferred data loading with prefetching and caching:

Intelligent Lazy Loading System for XPCS Data

This module provides smart data loading that minimizes memory usage while maintaining performance through intelligent prefetching and caching strategies.

class xpcsviewer.utils.lazy_loader.AccessPattern(*values)[source]

Bases: Enum

Types of data access patterns for prediction.

SEQUENTIAL = 'sequential'
RANDOM = 'random'
SLIDING_WINDOW = 'sliding'
FULL_SCAN = 'full_scan'
SPARSE = 'sparse'
class xpcsviewer.utils.lazy_loader.AccessEvent(timestamp, data_key, slice_info, access_size_mb, cache_hit)[source]

Bases: object

Record of a data access event.

timestamp: float
data_key: str
slice_info: tuple[slice, ...] | None
access_size_mb: float
cache_hit: bool
__init__(timestamp, data_key, slice_info, access_size_mb, cache_hit)
Return type:

None

class xpcsviewer.utils.lazy_loader.PrefetchRequest(data_key, priority, estimated_access_time, slice_info, justification)[source]

Bases: object

Request for data prefetching.

data_key: str
priority: float
estimated_access_time: float
slice_info: tuple[slice, ...] | None
justification: str
__init__(data_key, priority, estimated_access_time, slice_info, justification)
Return type:

None

class xpcsviewer.utils.lazy_loader.LazyDataProxy(data_key, loader_func, estimated_size_mb)[source]

Bases: ABC

Abstract base class for lazy-loaded data proxies.

__init__(data_key, loader_func, estimated_size_mb)[source]
Return type:

None

abstractmethod __getitem__(key)[source]

Get data slice, loading if necessary.

Parameters:

key (Any)

Return type:

Any

abstractmethod __array__()[source]

Convert to numpy array, loading if necessary.

Return type:

ndarray[Any, Any]

property shape: tuple[int, ...]

Get data shape without loading.

class xpcsviewer.utils.lazy_loader.LazyHDF5Array(data_key, hdf5_path, dataset_path, estimated_size_mb, chunk_size_mb=100.0)[source]

Bases: LazyDataProxy

Lazy loader for HDF5 array data with intelligent slicing.

Parameters:
  • data_key (str)

  • hdf5_path (str)

  • dataset_path (str)

  • estimated_size_mb (float)

  • chunk_size_mb (float)

__init__(data_key, hdf5_path, dataset_path, estimated_size_mb, chunk_size_mb=100.0)[source]
Parameters:
  • data_key (str)

  • hdf5_path (str)

  • dataset_path (str)

  • estimated_size_mb (float)

  • chunk_size_mb (float)

__getitem__(key)[source]

Get data slice, with intelligent loading.

__array__()[source]

Convert to numpy array, loading if necessary.

property nbytes

Get size in bytes.

class xpcsviewer.utils.lazy_loader.IntelligentPrefetcher(max_prefetch_mb=500.0)[source]

Bases: object

Intelligent data prefetching based on access patterns.

Parameters:

max_prefetch_mb (float)

__init__(max_prefetch_mb=500.0)[source]
Parameters:

max_prefetch_mb (float)

access_history: deque
pattern_cache: dict[str, AccessPattern]
active_prefetches: set[str]
prefetch_queue: deque
analyze_access_pattern(data_key)[source]

Analyze access pattern for a specific data key.

Parameters:

data_key (str)

Return type:

AccessPattern

predict_next_access(data_key)[source]

Predict next data access and generate prefetch requests.

Parameters:

data_key (str)

Return type:

list[PrefetchRequest]

record_access(event)[source]

Record an access event for pattern analysis.

Parameters:

event (AccessEvent)

shutdown()[source]

Shutdown the prefetcher.

class xpcsviewer.utils.lazy_loader.IntelligentLazyLoader(max_memory_mb=1024.0, prefetch_memory_mb=256.0, enable_prefetching=True)[source]

Bases: object

Main lazy loading system with intelligent prefetching and memory management.

Parameters:
  • max_memory_mb (float)

  • prefetch_memory_mb (float)

  • enable_prefetching (bool)

__init__(max_memory_mb=1024.0, prefetch_memory_mb=256.0, enable_prefetching=True)[source]
Parameters:
  • max_memory_mb (float)

  • prefetch_memory_mb (float)

  • enable_prefetching (bool)

data_proxies: dict[str, LazyDataProxy]
weak_refs: WeakValueDictionary
access_history: deque
prefetcher: IntelligentPrefetcher | None
register_hdf5_data(data_key, hdf5_path, dataset_path, estimated_size_mb)[source]

Register HDF5 dataset for lazy loading.

Parameters:
  • data_key (str) – Unique identifier for this data

  • hdf5_path (str) – Path to HDF5 file

  • dataset_path (str) – Path within HDF5 file to dataset

  • estimated_size_mb (float) – Estimated size of dataset in MB

Returns:

Lazy data proxy

Return type:

LazyHDF5Array

get_data(data_key)[source]

Get lazy data proxy by key.

Parameters:

data_key (str)

Return type:

LazyDataProxy | None

get_memory_stats()[source]

Get memory usage statistics for lazy loader.

Return type:

dict[str, Any]

shutdown()[source]

Shutdown the lazy loader system.

xpcsviewer.utils.lazy_loader.get_lazy_loader()[source]

Get or create the global lazy loader instance.

Uses double-checked locking to be thread-safe under concurrent first access from multiple threads without paying the lock cost on every subsequent call. (BUG-030)

Return type:

IntelligentLazyLoader
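The double-checked locking pattern mentioned above generally looks like the following sketch. The names (_loader, _loader_lock, LoaderSketch, get_loader_sketch) are hypothetical and only illustrate the pattern; the actual module-level variables are not shown in this reference.

```python
import threading

_loader = None
_loader_lock = threading.Lock()


class LoaderSketch:
    """Hypothetical placeholder for the real loader class."""


def get_loader_sketch():
    global _loader
    if _loader is None:            # fast path: no lock once initialized
        with _loader_lock:
            if _loader is None:    # re-check: another thread may have won
                _loader = LoaderSketch()
    return _loader
```

The first check avoids taking the lock on every call; the second check, made while holding the lock, prevents two threads that both saw None from each creating an instance.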

xpcsviewer.utils.lazy_loader.register_lazy_hdf5(data_key, hdf5_path, dataset_path, estimated_size_mb)[source]

Convenience function for registering HDF5 data for lazy loading.

Parameters:
  • data_key (str)

  • hdf5_path (str)

  • dataset_path (str)

  • estimated_size_mb (float)

Return type:

LazyHDF5Array

xpcsviewer.utils.lazy_loader.shutdown_lazy_loader()[source]

Shutdown the global lazy loader.

Streaming Processor – Chunk-based data processing:

Streaming Data Processing for XPCS Viewer

This module provides memory-efficient streaming processing for large XPCS datasets, particularly for operations like logarithmic transformations of SAXS data.

class xpcsviewer.utils.streaming_processor.ChunkInfo(index, slice_obj, shape, estimated_size_mb, total_chunks)[source]

Bases: object

Information about a data chunk.

index: int
slice_obj: tuple[slice, ...]
shape: tuple[int, ...]
estimated_size_mb: float
total_chunks: int
__init__(index, slice_obj, shape, estimated_size_mb, total_chunks)
Return type:

None

xpcsviewer.utils.streaming_processor.calculate_chunk_slices(shape, dtype, chunk_size_mb)[source]

Calculate optimal chunk slices for streaming processing.

Return type:

list[tuple[slice, …]]
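As a rough illustration of how chunk slices can be derived from a memory budget, the sketch below splits an array along its first axis only. It is a simplification under stated assumptions: chunk_slices_sketch is a hypothetical function, it takes the element size in bytes (itemsize) instead of a dtype to stay dependency-free, and the real function may choose chunk boundaries differently.

```python
import math


def chunk_slices_sketch(shape, itemsize, chunk_size_mb):
    """Hypothetical chunking: split only along axis 0."""
    if not shape or shape[0] == 0:
        return []
    # Bytes needed for one "row", i.e. one index along axis 0.
    row_bytes = itemsize * math.prod(shape[1:])
    budget = chunk_size_mb * 1024 * 1024
    if row_bytes:
        rows_per_chunk = max(1, int(budget // row_bytes))
    else:
        rows_per_chunk = shape[0]  # empty trailing axes: one chunk suffices
    trailing = tuple(slice(None) for _ in shape[1:])
    return [
        (slice(start, min(start + rows_per_chunk, shape[0])),) + trailing
        for start in range(0, shape[0], rows_per_chunk)
    ]
```

For example, nine 1024 x 1024 frames of 4-byte values with an 8 MB budget give two frames per chunk, so the call returns five slice tuples and the last one covers a single frame.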

class xpcsviewer.utils.streaming_processor.StreamingProcessor(chunk_size_mb=50.0)[source]

Bases: ABC

Abstract base class for streaming data processors.

Parameters:

chunk_size_mb (float)

__init__(chunk_size_mb=50.0)[source]
Parameters:

chunk_size_mb (float)

abstractmethod process_chunk(chunk, chunk_info)[source]

Process a single data chunk.

Parameters:
  • chunk (ndarray)

  • chunk_info (ChunkInfo)

Return type:

Any

abstractmethod combine_chunks(processed_chunks, original_shape)[source]

Combine processed chunks into final result.

Return type:

Any

process_array_streaming(data, output_dtype=None)[source]

Process array data using streaming to reduce memory footprint.

Parameters:
  • data (np.ndarray) – Input array to process

  • output_dtype (np.dtype, optional) – Output data type for memory optimization

Returns:

Processed array

Return type:

np.ndarray

class xpcsviewer.utils.streaming_processor.SAXSLogProcessor(chunk_size_mb=50.0, epsilon=1e-10)[source]

Bases: StreamingProcessor

Streaming processor for SAXS logarithmic transformations.

__init__(chunk_size_mb=50.0, epsilon=1e-10)[source]
process_array_streaming(data, output_dtype=None)[source]

Two-pass streaming log transform that uses the global minimum positive value (across all chunks) as the replacement for non-positive pixels.

  • Pass 1: find global_min across all chunks.

  • Pass 2: apply log10 with non-positive pixels replaced by global_min.

This overrides the single-pass base implementation to fix a correctness bug where using a per-chunk local minimum would produce wrong values for non-positive pixels in chunks whose local minimum differs from the global minimum (e.g. detector beamstop surrounded by high-intensity regions).

Return type:

ndarray
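The two-pass idea can be shown on plain Python lists. This is a sketch of the technique only: two_pass_log10 is a hypothetical function, chunks are modeled as lists of floats rather than array slices, and raising ValueError when no positive value exists is an assumption (the epsilon parameter of the real class is not modeled here).

```python
import math


def two_pass_log10(chunks):
    """Hypothetical sketch: chunks is a list of lists of floats."""
    # Pass 1: global minimum over strictly positive values in all chunks.
    positives = [v for chunk in chunks for v in chunk if v > 0]
    if not positives:
        raise ValueError("no positive values to take a logarithm of")
    global_min = min(positives)
    # Pass 2: replace non-positive values by the same global floor,
    # then take log10, chunk by chunk.
    return [
        [math.log10(v if v > 0 else global_min) for v in chunk]
        for chunk in chunks
    ]


result = two_pass_log10([[100.0, 0.0], [0.01, -5.0]])
```

In this input the zero in the first chunk is replaced by 0.01, the global minimum, rather than by 100.0, the only positive value in its own chunk; a per-chunk minimum would have produced inconsistent floor values between the two chunks.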

process_chunk(chunk, chunk_info)[source]

Process SAXS data chunk with logarithmic transformation.

Uses the global minimum positive value (set by process_array_streaming pass 1) as the replacement for non-positive pixels so that all chunks use a consistent floor value.

Parameters:
  • chunk (np.ndarray) – Input SAXS data chunk

  • chunk_info (ChunkInfo) – Information about the chunk

Returns:

Log-transformed chunk

Return type:

np.ndarray

combine_chunks(processed_chunks, original_shape)[source]

Combine processed log chunks into final result array.

Parameters:
  • processed_chunks (list) – List of (slice_obj, processed_chunk) tuples

  • original_shape (tuple[int, ...]) – Shape of original array

Returns:

Combined log-transformed array

Return type:

np.ndarray

class xpcsviewer.utils.streaming_processor.ROIProcessor(roi_mask, chunk_size_mb=50.0)[source]

Bases: StreamingProcessor

Streaming processor for ROI (Region of Interest) calculations.

__init__(roi_mask, chunk_size_mb=50.0)[source]
process_chunk(chunk, chunk_info)[source]

Process chunk for ROI calculations.

Parameters:
  • chunk (np.ndarray) – Input data chunk

  • chunk_info (ChunkInfo) – Information about the chunk

Returns:

ROI statistics for the chunk

Return type:

dict

combine_chunks(processed_chunks, original_shape)[source]

Combine ROI chunk results into final statistics.

Parameters:
  • processed_chunks (list) – List of (slice_obj, roi_stats) tuples

  • original_shape (tuple[int, ...]) – Shape of original array

Returns:

Combined ROI statistics

Return type:

dict

class xpcsviewer.utils.streaming_processor.AdaptiveChunkSizer(base_chunk_size_mb=50.0)[source]

Bases: object

Adaptive chunk sizing based on memory pressure and system performance.

Parameters:

base_chunk_size_mb (float)

__init__(base_chunk_size_mb=50.0)[source]
Parameters:

base_chunk_size_mb (float)

performance_history: list[float]
get_optimal_chunk_size(data_shape, data_dtype)[source]

Calculate optimal chunk size based on current conditions.

Parameters:
  • data_shape (tuple[int, ...]) – Shape of data to be processed

  • data_dtype (np.dtype) – Data type of array

Returns:

Optimal chunk size in MB

Return type:

float

xpcsviewer.utils.streaming_processor.process_saxs_log_streaming(data, chunk_size_mb=None, epsilon=1e-10)[source]

Convenience function for streaming SAXS log processing.

Parameters:
  • data (np.ndarray) – Input SAXS data

  • chunk_size_mb (float, optional) – Chunk size in MB (auto-calculated if None)

  • epsilon (float) – Small value for handling non-positive data

Returns:

Log-transformed SAXS data

Return type:

np.ndarray

xpcsviewer.utils.streaming_processor.process_roi_streaming(data, roi_mask, chunk_size_mb=None)[source]

Convenience function for streaming ROI processing.

Parameters:
  • data (np.ndarray) – Input data array

  • roi_mask (np.ndarray) – ROI mask array

  • chunk_size_mb (float, optional) – Chunk size in MB (auto-calculated if None)

Returns:

ROI statistics

Return type:

dict

class xpcsviewer.utils.streaming_processor.MemoryEfficientIterator(data, chunk_size_mb=50.0)[source]

Bases: object

Memory-efficient iterator for large arrays with automatic cleanup.

__init__(data, chunk_size_mb=50.0)[source]

Vectorized ROI – Optimized region-of-interest extraction:

Vectorized ROI (Region of Interest) Calculations for XPCS Viewer

This module provides highly optimized vectorized ROI calculations with advanced memory management and parallel processing capabilities for XPCS data analysis. Uses JAX backend for GPU acceleration when available.

class xpcsviewer.utils.vectorized_roi.ROIType(*values)[source]

Bases: Enum

Types of ROI calculations.

PIE = 'pie'
RING = 'ring'
PHI_RING = 'phi_ring'
RECTANGLE = 'rectangle'
POLYGON = 'polygon'
class xpcsviewer.utils.vectorized_roi.ROIParameters(roi_type, parameters, label='')[source]

Bases: object

Parameters for ROI calculation.

roi_type: ROIType
parameters: dict[str, Any]
label: str = ''
__init__(roi_type, parameters, label='')
Return type:

None

class xpcsviewer.utils.vectorized_roi.ROIResult(x_values, roi_data, roi_type, parameters, metadata, processing_time, memory_used_mb)[source]

Bases: object

Result of ROI calculation.

x_values: ndarray
roi_data: ndarray
roi_type: ROIType
parameters: dict[str, Any]
metadata: dict[str, Any]
processing_time: float
memory_used_mb: float
__init__(x_values, roi_data, roi_type, parameters, metadata, processing_time, memory_used_mb)
Return type:

None

class xpcsviewer.utils.vectorized_roi.VectorizedROICalculator(chunk_size_mb=100.0)[source]

Bases: ABC

Abstract base class for vectorized ROI calculators.

Uses JAX backend for GPU acceleration when available, with automatic vmap for batch processing.

Parameters:

chunk_size_mb (float)

__init__(chunk_size_mb=100.0)[source]
Parameters:

chunk_size_mb (float)

abstractmethod calculate_roi_mask(geometry_data, roi_params)[source]

Calculate the ROI mask for the given parameters.

Return type:

ndarray

abstractmethod process_roi_data(saxs_data, roi_mask, roi_params)[source]

Process SAXS data with the ROI mask.

Return type:

ROIResult

calculate_roi(saxs_data, geometry_data, roi_params, use_streaming=None)[source]

Main ROI calculation method with automatic streaming detection.

Parameters:
  • saxs_data (np.ndarray) – SAXS data array

  • geometry_data (Dict[str, np.ndarray]) – Geometry arrays (qmap, rmap, pmap, mask, etc.)

  • roi_params (ROIParameters) – ROI calculation parameters

  • use_streaming (bool, optional) – Whether to use streaming processing (auto-detected if None)

Returns:

ROI calculation result

Return type:

ROIResult

process_batch_vmap(frames, roi_mask)[source]

Process batch of frames using JAX vmap for GPU acceleration.

Parameters:
  • frames (np.ndarray) – Stack of frames [N, H, W]

  • roi_mask (np.ndarray) – ROI mask [H, W]

Returns:

Summed intensities for each frame [N]

Return type:

np.ndarray
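
The per-frame reduction described for process_batch_vmap can be illustrated without JAX. The sketch below is a NumPy stand-in, not the library's implementation: masked_frame_sums is a hypothetical name, and it assumes the mask is boolean (or convertible to boolean) with the same [H, W] shape as each frame.

```python
import numpy as np


def masked_frame_sums(frames: np.ndarray, roi_mask: np.ndarray) -> np.ndarray:
    """Sum the pixels selected by roi_mask in every frame of an [N, H, W] stack."""
    if frames.shape[1:] != roi_mask.shape:
        raise ValueError("roi_mask must match the frame shape [H, W]")
    # Boolean indexing on the last two axes selects the ROI pixels of every
    # frame at once, giving shape [N, n_roi_pixels]; summing over axis 1
    # leaves one value per frame.
    return frames[:, roi_mask.astype(bool)].sum(axis=1)


frames = np.arange(2 * 3 * 3, dtype=np.float64).reshape(2, 3, 3)
roi_mask = np.zeros((3, 3), dtype=bool)
roi_mask[1, :] = True  # select the middle row of each frame
sums = masked_frame_sums(frames, roi_mask)
print(sums)  # one summed intensity per frame, shape [N]
```

A vmap-based version would map the same single-frame reduction over the leading axis; the result shape [N] is the same either way.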

class xpcsviewer.utils.vectorized_roi.PieROICalculator(chunk_size_mb=100.0)[source]

Bases: VectorizedROICalculator

Optimized calculator for pie-shaped ROI.

Parameters:

chunk_size_mb (float)

calculate_roi_mask(geometry_data, roi_params)[source]

Calculate pie ROI mask using vectorized operations.

Return type:

ndarray

process_roi_data(saxs_data, roi_mask, roi_params)[source]

Process pie ROI with optimized q-value binning.

Return type:

ROIResult

class xpcsviewer.utils.vectorized_roi.RingROICalculator(chunk_size_mb=100.0)[source]

Bases: VectorizedROICalculator

Optimized calculator for ring-shaped ROI.

Parameters:

chunk_size_mb (float)

calculate_roi_mask(geometry_data, roi_params)[source]

Calculate ring ROI mask using vectorized operations.

Return type:

ndarray

process_roi_data(saxs_data, roi_mask, roi_params)[source]

Process ring ROI with optimized angular binning.

Return type:

ROIResult

class xpcsviewer.utils.vectorized_roi.ParallelROIProcessor(max_workers=None)[source]

Bases: object

Process multiple ROIs in parallel for enhanced performance.

Parameters:

max_workers (int | None)

__init__(max_workers=None)[source]
Parameters:

max_workers (int | None)

calculate_multiple_rois(saxs_data, geometry_data, roi_list, use_parallel=True)[source]

Calculate multiple ROIs with optional parallel processing.

Parameters:
  • saxs_data (np.ndarray) – SAXS data array

  • geometry_data (Dict[str, np.ndarray]) – Geometry arrays

  • roi_list (List[ROIParameters]) – List of ROI parameters to calculate

  • use_parallel (bool) – Whether to use parallel processing

Returns:

List of ROI calculation results

Return type:

List[ROIResult]

xpcsviewer.utils.vectorized_roi.calculate_pie_roi(saxs_data, geometry_data, angle_range, **kwargs)[source]

Convenience function for pie ROI calculation.

Return type:

ROIResult

xpcsviewer.utils.vectorized_roi.calculate_ring_roi(saxs_data, geometry_data, radius_range, **kwargs)[source]

Convenience function for ring ROI calculation.

Return type:

ROIResult

xpcsviewer.utils.vectorized_roi.calculate_multiple_rois_parallel(saxs_data, geometry_data, roi_list, max_workers=None)[source]

Convenience function for parallel multiple ROI calculation.

Return type:

list[ROIResult]

Atomic I/O

Crash-safe JSON writes via atomic file replacement.

Atomic file I/O utilities.

Provides safe_json_write, which writes JSON data to a temporary file and then atomically replaces the target, so an interruption mid-write cannot leave a partially written file in place.

xpcsviewer.utils.atomic_io.safe_json_write(path, data, *, indent=2)[source]

Write data as JSON to path atomically.

Creates the parent directory if needed, writes to a temporary file in the same directory, then calls os.replace() to swap the temp file into place (atomic on POSIX).

Parameters:
  • path (str | Path) – Destination file path.

  • data (dict) – JSON-serialisable dictionary.

  • indent (int) – JSON indentation (default 2).

Raises:
  • OSError – If the write or replace fails.

  • TypeError – If data is not JSON-serialisable.

Return type:

None
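
The write-then-replace pattern described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the source of safe_json_write: the name atomic_json_write, the ".tmp" suffix, and the fsync call are choices made for this example.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_json_write(path, data, *, indent=2):
    """Write data as JSON to path via a temp file and os.replace()."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the same directory (same filesystem) as the
    # target, otherwise os.replace() could not rename it into place.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(data, fh, indent=indent)
            fh.flush()
            os.fsync(fh.fileno())  # push the bytes to disk before the swap
        os.replace(tmp_name, target)
    except BaseException:
        # Leave any previous target untouched and remove the partial temp file.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
```

If serialisation fails (for example with a TypeError), the existing file at path keeps its old contents because the replace step is never reached.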

Device Detection

GPU detection, system CUDA discovery, JAX backend diagnostics, and CUDA plugin conflict detection.

GPU detection and warning utilities (System CUDA version).

This module provides utilities for detecting GPU hardware, system CUDA installation, and JAX backend configuration. It helps users identify when GPU acceleration is available but not being used.

Example usage:
>>> from xpcsviewer.utils.device import check_gpu_availability, get_device_info
>>> check_gpu_availability()  # Prints warning if GPU available but not used
>>> info = get_device_info()  # Get comprehensive device information
xpcsviewer.utils.device.get_system_cuda_version()[source]

Detect system CUDA version from nvcc.

Returns:

Tuple of (full_version, major_version) or (None, None) if not found. Example: ("12.6", 12) or ("13.0", 13)

Return type:

tuple[str | None, int | None]

Examples

>>> version, major = get_system_cuda_version()
>>> if major is not None:
...     print(f"CUDA {version} detected")
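
How a version tuple might be derived from nvcc is sketched below. This is an assumption about the approach, not the module's code: parse_nvcc_version and detect_cuda_version are hypothetical names, and the regular expression relies on the "release X.Y" text that nvcc --version prints.

```python
import re
import shutil
import subprocess


def parse_nvcc_version(output):
    """Extract (full_version, major_version) from `nvcc --version` text."""
    match = re.search(r"release (\d+)\.(\d+)", output)
    if match is None:
        return None, None
    major, minor = match.groups()
    return f"{major}.{minor}", int(major)


def detect_cuda_version():
    """Run nvcc if it is on PATH; return (None, None) when it is not usable."""
    nvcc = shutil.which("nvcc")
    if nvcc is None:
        return None, None
    try:
        result = subprocess.run(
            [nvcc, "--version"], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.TimeoutExpired):
        return None, None
    return parse_nvcc_version(result.stdout)


sample = "Cuda compilation tools, release 12.6, V12.6.77"
print(parse_nvcc_version(sample))  # ('12.6', 12)
```

Keeping the parsing separate from the subprocess call makes the logic testable on machines without CUDA installed.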
xpcsviewer.utils.device.get_gpu_info()[source]

Detect GPU name and SM version.

Returns:

Tuple of (gpu_name, sm_version) or (None, None) if not found. Example: ("NVIDIA GeForce RTX 4090", 8.9)

Return type:

tuple[str | None, float | None]

Examples

>>> name, sm = get_gpu_info()
>>> if name is not None:
...     print(f"GPU: {name} (SM {sm})")
xpcsviewer.utils.device.get_recommended_package()[source]

Get recommended JAX package based on system CUDA.

Returns:

Package name like "jax[cuda12-local]" or "jax[cuda13-local]", or None if no compatible setup found.

Return type:

str | None

Examples

>>> pkg = get_recommended_package()
>>> if pkg:
...     print(f"Install with: pip install {pkg}")
xpcsviewer.utils.device.check_plugin_conflicts()[source]

Check for known JAX CUDA plugin conflicts.

Returns:

List of issue descriptions (empty = no issues).

Return type:

list[str]

Examples

>>> issues = check_plugin_conflicts()
>>> for issue in issues:
...     print(f"WARNING: {issue}")
xpcsviewer.utils.device.check_gpu_availability(warn=True)[source]

Check if GPU is available but not being used by JAX.

Prints a helpful warning if GPU hardware and system CUDA are detected but JAX is running in CPU-only mode.

Parameters:

warn (bool, optional) – If True, print a warning when a GPU is available but not in use (default: True).

Returns:

True if GPU is being used by JAX, False otherwise.

Return type:

bool

Examples

>>> if not check_gpu_availability():
...     print("Consider enabling GPU acceleration")
xpcsviewer.utils.device.get_device_info()[source]

Get comprehensive device information.

Returns:

Dictionary containing:
  • jax_version: JAX version string

  • jax_backend: Current backend (cpu, gpu)

  • devices: List of device strings

  • gpu_count: Number of GPU devices

  • using_gpu: Boolean

  • gpu_hardware: GPU name

  • gpu_sm_version: SM version (float)

  • system_cuda_version: System CUDA version string

  • system_cuda_major: System CUDA major version (int)

  • recommended_package: Recommended JAX package

  • plugin_issues: List of detected plugin conflict descriptions

Return type:

dict

Examples

>>> info = get_device_info()
>>> print(f"JAX backend: {info['jax_backend']}")
>>> if info['using_gpu']:
...     print(f"Using {info['gpu_count']} GPU(s)")

Health Monitoring

Background health monitoring for resource usage and reliability metrics.

Background Health Monitoring System for XPCS Viewer.

This module provides non-intrusive background monitoring using existing thread pools to track system health, resource usage, and reliability metrics without impacting performance of core operations.

class xpcsviewer.utils.health_monitor.HealthStatus(*values)[source]

Bases: Enum

Overall system health status levels.

EXCELLENT = 'excellent'
GOOD = 'good'
WARNING = 'warning'
CRITICAL = 'critical'
EMERGENCY = 'emergency'
class xpcsviewer.utils.health_monitor.ResourceType(*values)[source]

Bases: Enum

Types of system resources to monitor.

MEMORY = 'memory'
CPU = 'cpu'
DISK = 'disk'
THREADS = 'threads'
HDF5_CONNECTIONS = 'hdf5_connections'
GUI_RESPONSIVENESS = 'gui_responsiveness'
class xpcsviewer.utils.health_monitor.HealthMetric(name, current_value, threshold_warning, threshold_critical, unit='', history=<factory>, last_updated=<factory>)[source]

Bases: object

Individual health metric with thresholds and history.

name: str
current_value: float
threshold_warning: float
threshold_critical: float
unit: str = ''
history: deque
last_updated: float
update(value)[source]

Update metric value and history.

Parameters:

value (float)

Return type:

None

get_status()[source]

Get current status based on thresholds.

Return type:

HealthStatus
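
A threshold-based metric of this shape can be sketched as follows. The classes Status and Metric are hypothetical stand-ins for HealthStatus and HealthMetric, and the rule "higher values are worse, compare against the critical threshold first" is an assumption; the actual mapping to the five HealthStatus levels is not documented here.

```python
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum


class Status(Enum):  # hypothetical stand-in for HealthStatus
    GOOD = "good"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class Metric:  # hypothetical stand-in for HealthMetric
    name: str
    current_value: float
    threshold_warning: float
    threshold_critical: float
    unit: str = ""
    history: deque = field(default_factory=lambda: deque(maxlen=100))
    last_updated: float = field(default_factory=time.time)

    def update(self, value: float) -> None:
        """Store the new value with its timestamp in the bounded history."""
        self.current_value = value
        self.last_updated = time.time()
        self.history.append((self.last_updated, value))

    def get_status(self) -> Status:
        """Assumed rule: higher is worse; check the stricter threshold first."""
        if self.current_value >= self.threshold_critical:
            return Status.CRITICAL
        if self.current_value >= self.threshold_warning:
            return Status.WARNING
        return Status.GOOD


memory = Metric("memory", 40.0, threshold_warning=75.0, threshold_critical=90.0, unit="%")
memory.update(82.0)
print(memory.get_status())  # Status.WARNING
```

The bounded deque keeps memory use constant regardless of how long monitoring runs.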

get_trend(window_minutes=5.0)[source]

Get trend over specified time window.

Parameters:

window_minutes (float)

Return type:

str

__init__(name, current_value, threshold_warning, threshold_critical, unit='', history=<factory>, last_updated=<factory>)
Return type:

None

class xpcsviewer.utils.health_monitor.HealthMonitor(monitoring_interval=30.0, history_size=100)[source]

Bases: object

Non-intrusive health monitoring system using background threads.

Monitors system resources, application state, and reliability metrics without impacting performance of core XPCS operations.

Parameters:
  • monitoring_interval (float)

  • history_size (int)

__init__(monitoring_interval=30.0, history_size=100)[source]
Parameters:
  • monitoring_interval (float)

  • history_size (int)

start_monitoring()[source]

Start background health monitoring.

Return type:

None

stop_monitoring()[source]

Stop background health monitoring.

The lock is released before joining the monitor thread to avoid a potential deadlock: the monitor loop may attempt to acquire _lock (e.g. via get_health_summary), so holding it while waiting for the thread to finish would block both sides (SRE-6).

Return type:

None
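
The deadlock avoidance described for stop_monitoring follows a general pattern: change shared state under the lock, then join the thread only after the lock is released. The sketch below shows that pattern in isolation; Monitor and its attributes are hypothetical and not the HealthMonitor internals.

```python
import threading


class Monitor:  # hypothetical stand-in for HealthMonitor
    def __init__(self, interval=0.01):
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread = None
        self._interval = interval
        self.ticks = 0

    def _loop(self):
        # Event.wait() doubles as an interruptible sleep between iterations.
        while not self._stop_event.wait(self._interval):
            with self._lock:  # the loop itself needs the lock
                self.ticks += 1

    def start(self):
        with self._lock:
            if self._thread is not None:
                return
            self._stop_event.clear()
            self._thread = threading.Thread(target=self._loop, daemon=True)
            self._thread.start()

    def stop(self):
        with self._lock:
            thread, self._thread = self._thread, None
            self._stop_event.set()
        # The lock is released here. Joining while still holding it could
        # deadlock, because the loop may be blocked on `with self._lock`.
        if thread is not None:
            thread.join(timeout=5.0)
```

Taking a local reference to the thread inside the locked section keeps stop() safe even if start() is called again concurrently.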

register_health_callback(status, callback)[source]

Register callback for specific health status.

Warning: Callbacks are invoked on the daemon monitoring thread. They must be thread-safe. For Qt GUI updates, use signals or QMetaObject.invokeMethod to marshal work to the main thread.

Parameters:
  • status (HealthStatus)

  • callback (Callable)

Return type:

None

register_periodic_callback(callback)[source]

Register a callable to be invoked once per monitoring interval.

This allows subsystems to share the health monitor’s daemon thread instead of spawning their own background threads (BUG-050). Each registered callback is called with no arguments from the monitoring loop. Callbacks must be thread-safe and must not block for longer than monitoring_interval seconds.

Warning: Callbacks are invoked on the daemon monitoring thread.

Parameters:

callback (Callable)

Return type:

None
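
A shared registry of periodic callbacks might look like the sketch below. PeriodicCallbacks and run_once are hypothetical names used for illustration; the snapshot-then-call structure and the error collection are assumptions, not the documented behaviour of the monitoring loop.

```python
import threading


class PeriodicCallbacks:  # hypothetical helper, not part of xpcsviewer
    def __init__(self):
        self._lock = threading.Lock()
        self._callbacks = []

    def register(self, callback):
        with self._lock:
            if callback not in self._callbacks:
                self._callbacks.append(callback)

    def unregister(self, callback):
        with self._lock:
            if callback in self._callbacks:
                self._callbacks.remove(callback)

    def run_once(self):
        """Call every registered callback; meant to run once per interval."""
        with self._lock:
            snapshot = list(self._callbacks)  # copy so callbacks may (un)register
        errors = []
        for callback in snapshot:
            try:
                callback()  # called with no arguments, outside the lock
            except Exception as exc:  # one failing callback must not stop the rest
                errors.append(exc)
        return errors
```

Calling the callbacks outside the lock means a slow callback delays the loop but never blocks registration from other threads.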

unregister_periodic_callback(callback)[source]

Remove a previously registered periodic callback.

Parameters:

callback (Callable)

Return type:

None

track_object(obj)[source]

Track object for health monitoring.

Parameters:

obj (Any)

Return type:

None

get_health_summary()[source]

Get comprehensive health summary.

Return type:

dict[str, Any]

get_performance_impact()[source]

Get monitoring performance impact statistics.

Return type:

dict[str, float]

xpcsviewer.utils.health_monitor.get_health_monitor()[source]

Get or create the global health monitor instance.

Return type:

HealthMonitor

xpcsviewer.utils.health_monitor.start_health_monitoring(interval=30.0)[source]

Start background health monitoring.

Parameters:

interval (float)

Return type:

None

xpcsviewer.utils.health_monitor.stop_health_monitoring()[source]

Stop background health monitoring.

Return type:

None

xpcsviewer.utils.health_monitor.get_health_status()[source]

Get current health status summary.

Return type:

dict[str, Any]

xpcsviewer.utils.health_monitor.register_health_callback(status, callback)[source]

Register callback for specific health status changes.

Parameters:
  • status (HealthStatus)

  • callback (Callable)

Return type:

None

xpcsviewer.utils.health_monitor.track_object_health(obj)[source]

Track object for health monitoring.

Parameters:

obj (Any)

Return type:

None

class xpcsviewer.utils.health_monitor.health_monitoring_context(operation_name, alert_on_warning=True)[source]

Bases: object

Context manager for automatic health monitoring during operations.

Parameters:
  • operation_name (str)

  • alert_on_warning (bool)

__init__(operation_name, alert_on_warning=True)[source]
Parameters:
  • operation_name (str)

  • alert_on_warning (bool)

start_metrics: dict[str, float]

Memory Prediction

Predictive memory pressure detection for proactive memory management in XPCS data analysis workflows.

Predictive Memory Pressure Detection for XPCS Viewer

This module provides advanced memory pressure prediction specifically tailored for XPCS data analysis workflows, enabling proactive memory management.

class xpcsviewer.utils.memory_predictor.MemoryPrediction(predicted_mb, confidence, time_horizon_seconds, pressure_level, recommended_actions)[source]

Bases: object

Container for memory usage predictions.

Parameters:
  • predicted_mb (float)

  • confidence (float)

  • time_horizon_seconds (float)

  • pressure_level (MemoryPressure)

  • recommended_actions (list[str])

predicted_mb: float
confidence: float
time_horizon_seconds: float
pressure_level: MemoryPressure
recommended_actions: list[str]
__init__(predicted_mb, confidence, time_horizon_seconds, pressure_level, recommended_actions)
Parameters:
  • predicted_mb (float)

  • confidence (float)

  • time_horizon_seconds (float)

  • pressure_level (MemoryPressure)

  • recommended_actions (list[str])

Return type:

None

class xpcsviewer.utils.memory_predictor.OperationProfile(operation_type, avg_memory_mb, peak_memory_mb, duration_seconds, input_size_correlation)[source]

Bases: object

Memory profile for a specific operation type.

Parameters:
  • operation_type (str)

  • avg_memory_mb (float)

  • peak_memory_mb (float)

  • duration_seconds (float)

  • input_size_correlation (float)

operation_type: str
avg_memory_mb: float
peak_memory_mb: float
duration_seconds: float
input_size_correlation: float
__init__(operation_type, avg_memory_mb, peak_memory_mb, duration_seconds, input_size_correlation)
Parameters:
  • operation_type (str)

  • avg_memory_mb (float)

  • peak_memory_mb (float)

  • duration_seconds (float)

  • input_size_correlation (float)

Return type:

None

class xpcsviewer.utils.memory_predictor.XPCSMemoryPredictor(history_size=1000)[source]

Bases: object

Advanced memory pressure predictor for XPCS workflows.

This predictor learns from historical memory usage patterns and provides proactive warnings before memory pressure situations.

Parameters:

history_size (int)

__init__(history_size=1000)[source]
Parameters:

history_size (int)

memory_history: deque
operation_history: deque
operation_profiles: dict[str, OperationProfile]
record_operation(operation_type, input_size_mb, memory_before_mb, memory_after_mb, duration_seconds)[source]

Record memory usage for a completed operation.

Parameters:
  • operation_type (str) – Type of operation (‘load_saxs_2d’, ‘fit_g2’, etc.)

  • input_size_mb (float) – Size of input data in MB

  • memory_before_mb (float) – Memory usage before operation

  • memory_after_mb (float) – Memory usage after operation

  • duration_seconds (float) – How long the operation took

predict_operation_memory(operation_type, input_size_mb=0.0)[source]

Predict memory usage for a planned operation.

Parameters:
  • operation_type (str) – Type of operation to predict

  • input_size_mb (float) – Expected input data size

Returns:

Prediction with memory estimate and recommendations

Return type:

MemoryPrediction
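
One simple way to learn from recorded operations is to scale the observed memory growth by input size. The sketch below is an illustration only: SimplePredictor is hypothetical, the linear scaling model is an assumption, and the confidence formula (saturating at 10 samples) is invented for the example.

```python
from collections import defaultdict


class SimplePredictor:  # hypothetical stand-in for XPCSMemoryPredictor
    def __init__(self):
        # operation_type -> list of (input_size_mb, memory_delta_mb)
        self._samples = defaultdict(list)

    def record_operation(self, operation_type, input_size_mb,
                         memory_before_mb, memory_after_mb):
        delta = memory_after_mb - memory_before_mb
        self._samples[operation_type].append((input_size_mb, delta))

    def predict(self, operation_type, input_size_mb=0.0):
        """Return (predicted_mb, confidence) for a planned operation."""
        samples = self._samples.get(operation_type, [])
        if not samples:
            return 0.0, 0.0  # nothing learned yet
        total_input = sum(size for size, _ in samples)
        total_delta = sum(delta for _, delta in samples)
        if total_input > 0 and input_size_mb > 0:
            # Scale by the observed memory-per-input-MB ratio.
            predicted = input_size_mb * total_delta / total_input
        else:
            predicted = total_delta / len(samples)  # fall back to the mean delta
        confidence = min(1.0, len(samples) / 10)  # assumed: saturates at 10 samples
        return predicted, confidence


predictor = SimplePredictor()
predictor.record_operation("load_saxs_2d", 100.0, 500.0, 700.0)
predictor.record_operation("load_saxs_2d", 300.0, 500.0, 1100.0)
print(predictor.predict("load_saxs_2d", input_size_mb=200.0))  # (400.0, 0.2)
```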

detect_memory_trends()[source]

Analyze memory usage trends to predict future pressure.

Returns:

Trend metrics including growth rate and volatility

Return type:

dict[str, float]
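
Growth rate and volatility can be estimated from timestamped snapshots with a least-squares fit. The following sketch assumes that definition; memory_trend and the key names in the returned dictionary are hypothetical, and the real method may compute different metrics.

```python
import statistics


def memory_trend(samples):
    """samples: list of (timestamp_seconds, memory_mb), oldest first."""
    if len(samples) < 2:
        return {"growth_rate_mb_per_s": 0.0, "volatility_mb": 0.0}
    times = [t for t, _ in samples]
    values = [v for _, v in samples]
    t_mean = statistics.fmean(times)
    v_mean = statistics.fmean(values)
    # Ordinary least-squares slope of memory against time.
    denom = sum((t - t_mean) ** 2 for t in times)
    if denom == 0:
        slope = 0.0
    else:
        slope = sum(
            (t - t_mean) * (v - v_mean) for t, v in zip(times, values)
        ) / denom
    # Volatility as the standard deviation of the residuals around the fit.
    residuals = [v - (v_mean + slope * (t - t_mean)) for t, v in zip(times, values)]
    return {
        "growth_rate_mb_per_s": slope,
        "volatility_mb": statistics.pstdev(residuals),
    }


trend = memory_trend([(0, 100.0), (10, 120.0), (20, 140.0), (30, 160.0)])
print(trend)
```

Measuring volatility on the residuals rather than the raw values keeps a steady, predictable climb from being reported as noise.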

check_proactive_cleanup_needed()[source]

Check if proactive cleanup is recommended based on trends.

Returns:

(cleanup_needed, reasons)

Return type:

tuple[bool, list[str]]

update_memory_snapshot()[source]

Update the memory usage snapshot for trend analysis.

get_prediction_summary()[source]

Get a summary of current memory predictions and trends.

Return type:

dict[str, Any]

xpcsviewer.utils.memory_predictor.get_memory_predictor()[source]

Get or create the global memory predictor instance.

Return type:

XPCSMemoryPredictor

xpcsviewer.utils.memory_predictor.predict_operation_memory(operation_type, input_size_mb=0.0)[source]

Convenience function for memory prediction.

Returns a cheap no-op prediction when MONITORING_ENABLED is False.

Parameters:
  • operation_type (str)

  • input_size_mb (float)

Return type:

MemoryPrediction

xpcsviewer.utils.memory_predictor.record_operation_memory(operation_type, input_size_mb, memory_before_mb, memory_after_mb, duration_seconds)[source]

Convenience function for recording operation memory usage.

No-op when MONITORING_ENABLED is False.

Parameters:
  • operation_type (str)

  • input_size_mb (float)

  • memory_before_mb (float)

  • memory_after_mb (float)

  • duration_seconds (float)

Reliability Manager

Unified entry point for enabling and configuring reliability features.

Unified Reliability Manager for XPCS Viewer.

This module provides a single entry point for enabling and configuring all reliability features while keeping their overhead within the limits set in ReliabilityConfig (max_cpu_overhead_percent and max_memory_overhead_mb).

class xpcsviewer.utils.reliability_manager.ReliabilityProfile(*values)[source]

Bases: Enum

Predefined reliability profiles balancing safety vs performance.

MINIMAL = 'minimal'
BALANCED = 'balanced'
STRICT = 'strict'
PARANOID = 'paranoid'
class xpcsviewer.utils.reliability_manager.ReliabilityConfig(profile=ReliabilityProfile.BALANCED, enable_exception_conversion=True, enable_fallback_strategies=True, validation_level=ValidationLevel.STANDARD, enable_validation_caching=True, cache_size_limit=1000, enable_health_monitoring=True, health_monitoring_interval=30.0, enable_automatic_recovery=True, enable_state_validation=True, state_validation_level=ValidationLevel.STANDARD, state_monitoring_interval=60.0, max_cpu_overhead_percent=2.0, max_memory_overhead_mb=50.0, enable_performance_monitoring=True)[source]

Bases: object

Configuration for reliability features.

Parameters:
  • profile (ReliabilityProfile)

  • enable_exception_conversion (bool)

  • enable_fallback_strategies (bool)

  • validation_level (ValidationLevel)

  • enable_validation_caching (bool)

  • cache_size_limit (int)

  • enable_health_monitoring (bool)

  • health_monitoring_interval (float)

  • enable_automatic_recovery (bool)

  • enable_state_validation (bool)

  • state_validation_level (ValidationLevel)

  • state_monitoring_interval (float)

  • max_cpu_overhead_percent (float)

  • max_memory_overhead_mb (float)

  • enable_performance_monitoring (bool)

profile: ReliabilityProfile = 'balanced'
enable_exception_conversion: bool = True
enable_fallback_strategies: bool = True
validation_level: ValidationLevel = 'standard'
enable_validation_caching: bool = True
cache_size_limit: int = 1000
enable_health_monitoring: bool = True
health_monitoring_interval: float = 30.0
enable_automatic_recovery: bool = True
enable_state_validation: bool = True
state_validation_level: ValidationLevel = 'standard'
state_monitoring_interval: float = 60.0
max_cpu_overhead_percent: float = 2.0
max_memory_overhead_mb: float = 50.0
enable_performance_monitoring: bool = True
apply_profile()[source]

Apply predefined profile settings.

Return type:

None

__init__(profile=ReliabilityProfile.BALANCED, enable_exception_conversion=True, enable_fallback_strategies=True, validation_level=ValidationLevel.STANDARD, enable_validation_caching=True, cache_size_limit=1000, enable_health_monitoring=True, health_monitoring_interval=30.0, enable_automatic_recovery=True, enable_state_validation=True, state_validation_level=ValidationLevel.STANDARD, state_monitoring_interval=60.0, max_cpu_overhead_percent=2.0, max_memory_overhead_mb=50.0, enable_performance_monitoring=True)
Parameters:
  • profile (ReliabilityProfile)

  • enable_exception_conversion (bool)

  • enable_fallback_strategies (bool)

  • validation_level (ValidationLevel)

  • enable_validation_caching (bool)

  • cache_size_limit (int)

  • enable_health_monitoring (bool)

  • health_monitoring_interval (float)

  • enable_automatic_recovery (bool)

  • enable_state_validation (bool)

  • state_validation_level (ValidationLevel)

  • state_monitoring_interval (float)

  • max_cpu_overhead_percent (float)

  • max_memory_overhead_mb (float)

  • enable_performance_monitoring (bool)

Return type:

None

class xpcsviewer.utils.reliability_manager.XPCSReliabilityManager(config=None)[source]

Bases: object

Unified manager for all XPCS Viewer reliability features.

Provides single-point configuration and monitoring with guaranteed performance characteristics.

Parameters:

config (ReliabilityConfig | None)

__init__(config=None)[source]
Parameters:

config (ReliabilityConfig | None)

initialize(validate_performance=True)[source]

Initialize all reliability features according to configuration.

Parameters:

validate_performance (bool) – Whether to validate performance impact

Returns:

True if initialization successful and within performance limits

Return type:

bool

get_status()[source]

Get comprehensive reliability system status.

Return type:

dict[str, Any]

shutdown()[source]

Shutdown all reliability features.

Return type:

None

reconfigure(new_config)[source]

Reconfigure reliability features at runtime.

Parameters:

new_config (ReliabilityConfig)

Return type:

bool

classmethod create_from_environment()[source]

Create reliability manager from environment variables.

Return type:

XPCSReliabilityManager

xpcsviewer.utils.reliability_manager.get_reliability_manager()[source]

Get or create the global reliability manager.

Return type:

XPCSReliabilityManager

xpcsviewer.utils.reliability_manager.initialize_reliability(profile=ReliabilityProfile.BALANCED, validate_performance=True)[source]

Initialize XPCS reliability features with specified profile.

Parameters:
  • profile (ReliabilityProfile)

  • validate_performance (bool)

Return type:

bool

xpcsviewer.utils.reliability_manager.get_reliability_status()[source]

Get current reliability system status.

Return type:

dict[str, Any]

xpcsviewer.utils.reliability_manager.shutdown_reliability()[source]

Shutdown reliability features.

Return type:

None

xpcsviewer.utils.reliability_manager.enable_production_reliability()[source]

Enable reliability features optimized for production use.

Return type:

bool

xpcsviewer.utils.reliability_manager.enable_development_reliability()[source]

Enable reliability features optimized for development use.

Return type:

bool

xpcsviewer.utils.reliability_manager.enable_minimal_reliability()[source]

Enable minimal reliability features for maximum performance.

Return type:

bool

xpcsviewer.utils.reliability_manager.enable_maximum_reliability()[source]

Enable maximum reliability features for critical applications.

Return type:

bool

Startup Optimization

Application startup performance optimization via lazy loading, parallel initialization, and resource preloading.

Application Startup Performance Optimization for XPCS Viewer

This module optimizes application startup time by implementing lazy loading, parallel initialization, and intelligent resource preloading strategies.

class xpcsviewer.utils.startup_optimizer.StartupMetrics(component_name, start_time, end_time, duration, memory_before_mb, memory_after_mb, memory_delta_mb, success=True, error_message='')[source]

Bases: object

Container for startup performance metrics.

component_name: str
start_time: float
end_time: float
duration: float
memory_before_mb: float
memory_after_mb: float
memory_delta_mb: float
success: bool = True
error_message: str = ''
__init__(component_name, start_time, end_time, duration, memory_before_mb, memory_after_mb, memory_delta_mb, success=True, error_message='')
Return type:

None

class xpcsviewer.utils.startup_optimizer.ComponentInfo(name, init_func, dependencies, priority, lazy_load=False, critical=True)[source]

Bases: object

Information about a startup component.

name: str
init_func: Callable
dependencies: set[str]
priority: int
lazy_load: bool = False
critical: bool = True
__init__(name, init_func, dependencies, priority, lazy_load=False, critical=True)
Return type:

None

class xpcsviewer.utils.startup_optimizer.LazyImportManager[source]

Bases: object

Manages lazy importing of heavy modules to speed up startup.

__init__()[source]
register_lazy_import(alias, module_name)[source]

Register a module for lazy importing.

Parameters:
  • alias (str) – Alias to use for the module

  • module_name (str) – Full module name to import

get_module(alias)[source]

Get a module, importing it lazily if needed.

Parameters:

alias (str) – Module alias

Returns:

Imported module

Return type:

Any

preload_modules(aliases, background=True)[source]

Preload modules, in a background thread by default.

Parameters:
  • aliases (list[str]) – List of module aliases to preload

  • background (bool) – Whether to load in background thread

get_load_statistics()[source]

Get module loading statistics.

Return type:

dict[str, float]
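
The register / get / preload cycle can be sketched with importlib. LazyImports below is a hypothetical stand-in that mirrors the method names of LazyImportManager; the locking strategy and the decision to return the preload thread are assumptions made for this example.

```python
import importlib
import threading
import time


class LazyImports:  # hypothetical stand-in for LazyImportManager
    def __init__(self):
        self._lock = threading.Lock()
        self._registered = {}  # alias -> module name
        self._loaded = {}      # alias -> module object
        self._load_times = {}  # alias -> seconds spent importing

    def register_lazy_import(self, alias, module_name):
        with self._lock:
            self._registered[alias] = module_name

    def get_module(self, alias):
        with self._lock:
            if alias in self._loaded:
                return self._loaded[alias]
            module_name = self._registered[alias]  # KeyError if never registered
        start = time.perf_counter()
        module = importlib.import_module(module_name)
        elapsed = time.perf_counter() - start
        with self._lock:
            # setdefault keeps the first result if two threads raced here.
            self._load_times.setdefault(alias, elapsed)
            return self._loaded.setdefault(alias, module)

    def preload_modules(self, aliases, background=True):
        def load():
            for alias in aliases:
                self.get_module(alias)

        if not background:
            load()
            return None
        thread = threading.Thread(target=load, daemon=True)
        thread.start()
        return thread

    def get_load_statistics(self):
        with self._lock:
            return dict(self._load_times)


lazy = LazyImports()
lazy.register_lazy_import("js", "json")  # nothing is imported yet
print(lazy.get_module("js").dumps({"ok": True}))  # import happens here
```

The import itself runs outside the lock, so a slow module does not block lookups of modules that are already loaded.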

class xpcsviewer.utils.startup_optimizer.StartupProfiler[source]

Bases: object

Profiles application startup performance.

__init__()[source]
metrics: list[StartupMetrics]
start_startup_profiling()[source]

Start overall startup profiling.

end_startup_profiling()[source]

End overall startup profiling.

profile_component(component_name)[source]

Context manager for profiling component initialization.

Parameters:

component_name (str) – Name of the component being initialized
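
A profiling context manager of this kind can be built on contextlib. The sketch below records only duration and success; the module-level metrics list and the dictionary layout are assumptions for illustration, whereas StartupMetrics also carries memory fields that are omitted here.

```python
import time
from contextlib import contextmanager

metrics = []  # collected records, one dict per profiled component


@contextmanager
def profile_component(component_name):
    """Record duration and success of the wrapped initialization block."""
    record = {"component_name": component_name, "success": True, "error_message": ""}
    start = time.perf_counter()
    try:
        yield record
    except Exception as exc:
        record["success"] = False
        record["error_message"] = str(exc)
        raise  # profiling must not hide the failure
    finally:
        record["duration"] = time.perf_counter() - start
        metrics.append(record)


with profile_component("plot_backend"):
    time.sleep(0.01)  # stand-in for real initialization work
```

Because the record is appended in the finally clause, failed components still show up in the metrics with success set to False.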

add_metrics(metrics)[source]

Add component metrics.

Parameters:

metrics (StartupMetrics)

export_metrics(filepath)[source]

Export startup metrics to file.

Parameters:

filepath (str)

class xpcsviewer.utils.startup_optimizer.StartupComponentProfiler(profiler, component_name)[source]

Bases: object

Context manager for profiling individual startup components.

Parameters:
  • profiler (StartupProfiler)

  • component_name (str)

__init__(profiler, component_name)[source]
Parameters:
  • profiler (StartupProfiler)

  • component_name (str)

class xpcsviewer.utils.startup_optimizer.ParallelStartupManager(max_workers=None)[source]

Bases: object

Manages parallel initialization of application components.

Parameters:

max_workers (int | None)

__init__(max_workers=None)[source]
Parameters:

max_workers (int | None)

components: dict[str, ComponentInfo]
initialized_components: set[str]
register_component(name, init_func, dependencies=None, priority=5, lazy_load=False, critical=True)[source]

Register a component for initialization.

Parameters:
  • name (str) – Component name

  • init_func (Callable) – Initialization function

  • dependencies (set[str], optional) – Set of component names this depends on

  • priority (int) – Priority level (lower = higher priority)

  • lazy_load (bool) – Whether to load lazily on first use

  • critical (bool) – Whether failure should stop startup

initialize_all()[source]

Initialize all registered components in optimal order.

Returns:

True if all critical components initialized successfully

Return type:

bool
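
Dependency-ordered parallel initialization can be sketched with graphlib and a thread pool. This is one possible design, not the ParallelStartupManager implementation: Component is a hypothetical stand-in for ComponentInfo, lazy loading is left out, and every dependency name is assumed to be registered.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from graphlib import TopologicalSorter
from typing import Callable


@dataclass
class Component:  # hypothetical stand-in for ComponentInfo
    init_func: Callable[[], None]
    dependencies: set = field(default_factory=set)
    priority: int = 5
    critical: bool = True


def initialize_all(components, max_workers=None):
    """Run init functions in dependency order; independent ones run in parallel.

    Returns True when no critical component failed.
    """
    sorter = TopologicalSorter(
        {name: comp.dependencies for name, comp in components.items()}
    )
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Everything returned here has all of its dependencies finished.
            ready = sorted(sorter.get_ready(), key=lambda n: components[n].priority)
            futures = {name: pool.submit(components[name].init_func) for name in ready}
            for name, future in futures.items():
                try:
                    future.result()
                except Exception:
                    if components[name].critical:
                        return False  # a critical failure stops startup
                sorter.done(name)
    return True
```

In this sketch a failed non-critical component is still marked as done, so its dependents run anyway; a stricter policy would skip them.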

get_startup_metrics()[source]

Get startup profiling metrics.

Return type:

StartupProfiler

class xpcsviewer.utils.startup_optimizer.ConfigurationManager[source]

Bases: object

Manages application configuration for startup optimization.

__init__()[source]
load_config(config_path=None)[source]

Load configuration from file.

Parameters:

config_path (str | None)

get(key_path, default=None)[source]

Get configuration value by dot-separated path.

Parameters:
  • key_path (str)

  • default (Any)

Return type:

Any
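
Lookup by dot-separated path amounts to walking nested dictionaries one key at a time. The helper below is a minimal sketch; get_by_path and the example configuration keys are hypothetical and do not reflect the actual configuration schema.

```python
def get_by_path(config, key_path, default=None):
    """Walk nested dicts following a dot-separated key path."""
    node = config
    for key in key_path.split("."):
        if not isinstance(node, dict) or key not in node:
            return default  # missing key or non-dict intermediate value
        node = node[key]
    return node


config = {"startup": {"preload": {"enabled": True, "modules": ["numpy"]}}}
print(get_by_path(config, "startup.preload.enabled"))            # True
print(get_by_path(config, "startup.cache.size_mb", default=64))  # 64
```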

xpcsviewer.utils.startup_optimizer.get_lazy_import_manager()[source]

Get global lazy import manager.

Return type:

LazyImportManager

xpcsviewer.utils.startup_optimizer.get_startup_manager()[source]

Get global startup manager.

Return type:

ParallelStartupManager

xpcsviewer.utils.startup_optimizer.get_config_manager()[source]

Get global configuration manager.

Return type:

ConfigurationManager

xpcsviewer.utils.startup_optimizer.lazy_import(alias, module_name)[source]

Register a module for lazy importing.

Parameters:
  • alias (str)

  • module_name (str)

xpcsviewer.utils.startup_optimizer.get_module(alias)[source]

Get a lazily imported module.

Parameters:

alias (str)

xpcsviewer.utils.startup_optimizer.register_startup_component(name, init_func, **kwargs)[source]

Register a component for parallel startup initialization.

xpcsviewer.utils.startup_optimizer.initialize_application()[source]

Initialize the entire application with optimizations.

Return type:

bool

xpcsviewer.utils.startup_optimizer.profile_startup(component_name)[source]

Context manager for profiling startup components.

Parameters:

component_name (str)

State Validation

Lock-free state consistency validation using atomic operations and weak references.

Lock-Free State Consistency Validation for XPCS Viewer.

This module provides high-performance state validation using atomic operations, weak references, and lock-free data structures to ensure object consistency without blocking critical operations.

class xpcsviewer.utils.state_validator.StateTransition(*values)[source]

Bases: Enum

Valid state transitions for tracked objects.

INITIALIZING = 'initializing'
READY = 'ready'
PROCESSING = 'processing'
CACHED = 'cached'
INVALID = 'invalid'
DESTROYED = 'destroyed'
class xpcsviewer.utils.state_validator.StateSnapshot(object_id, state, checksum, timestamp, critical_attributes=<factory>)[source]

Bases: object

Immutable snapshot of object state for consistency checking.

Parameters:
  • object_id (int)

  • state (StateTransition)

  • checksum (str)

  • timestamp (float)

  • critical_attributes (dict[str, Any])

object_id: int
state: StateTransition
checksum: str
timestamp: float
critical_attributes: dict[str, Any]
__init__(object_id, state, checksum, timestamp, critical_attributes=<factory>)
Parameters:
  • object_id (int)

  • state (StateTransition)

  • checksum (str)

  • timestamp (float)

  • critical_attributes (dict[str, Any])

Return type:

None

class xpcsviewer.utils.state_validator.StateRecord(snapshot, version, last_validated)

Bases: tuple

last_validated

Alias for field number 2

snapshot

Alias for field number 0

version

Alias for field number 1

class xpcsviewer.utils.state_validator.AtomicCounter(initial_value=0)[source]

Bases: object

Thread-safe atomic counter using threading primitives.

Parameters:

initial_value (int)

__init__(initial_value=0)[source]
Parameters:

initial_value (int)

increment()[source]

Atomically increment and return new value.

Return type:

int

get()[source]

Get current value.

Return type:

int

set(value)[source]

Set value atomically.

Parameters:

value (int)

Return type:

None
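The documentation does not say which threading primitive backs the counter. One common way to get the documented `increment`/`get`/`set` behavior is to guard an integer with a `threading.Lock`; the sketch below assumes that approach, and `CounterSketch` and `hammer` are hypothetical names.

```python
import threading


class CounterSketch:
    """Hypothetical lock-guarded counter mirroring the documented methods."""

    def __init__(self, initial_value: int = 0) -> None:
        self._value = initial_value
        self._lock = threading.Lock()

    def increment(self) -> int:
        # Read-modify-write happens under the lock, so no update is lost.
        with self._lock:
            self._value += 1
            return self._value

    def get(self) -> int:
        with self._lock:
            return self._value

    def set(self, value: int) -> None:
        with self._lock:
            self._value = value


def hammer(counter: CounterSketch, n: int) -> None:
    """Increment the counter n times from the calling thread."""
    for _ in range(n):
        counter.increment()


counter = CounterSketch()
threads = [threading.Thread(target=hammer, args=(counter, 1000)) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
total = counter.get()
```

Because every increment runs under the lock, eight threads doing 1000 increments each always leave the counter at 8000.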

class xpcsviewer.utils.state_validator.LockFreeStateValidator(validation_level=ValidationLevel.STANDARD)[source]

Bases: object

Lock-free state consistency validator using atomic operations and weak references.

Provides high-performance state tracking without blocking operations. Uses copy-on-write semantics for state snapshots and atomic version counters.

Parameters:

validation_level (ValidationLevel)

__init__(validation_level=ValidationLevel.STANDARD)[source]
Parameters:

validation_level (ValidationLevel)

register_object(obj, initial_state=StateTransition.INITIALIZING)[source]

Register object for state tracking using weak references.

Parameters:
  • obj (Any)

  • initial_state (StateTransition)

Return type:

None

update_object_state(obj, new_state, **critical_attributes)[source]

Update object state with validation (lock-free).

Parameters:
  • obj (Any)

  • new_state (StateTransition)

Return type:

bool

validate_object_consistency(obj)[source]

Validate object consistency against stored state (lock-free).

Parameters:

obj (Any)

Return type:

tuple[bool, list[str]]
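The `tuple[bool, list[str]]` return type pairs a pass/fail flag with a list of human-readable issues. The sketch below shows one way such a check could work, comparing recorded attributes with the live object; `check_consistency` is a hypothetical helper and the issue message format is an assumption.

```python
from types import SimpleNamespace
from typing import Any


def check_consistency(obj: Any, expected: dict[str, Any]) -> tuple[bool, list[str]]:
    """Compare live attributes with recorded ones; return (is_consistent, issues)."""
    issues: list[str] = []
    for name, recorded in expected.items():
        if not hasattr(obj, name):
            issues.append(f"missing attribute: {name}")
        elif getattr(obj, name) != recorded:
            issues.append(f"{name}: expected {recorded!r}, found {getattr(obj, name)!r}")
    return (not issues, issues)


recorded = {"n_frames": 100, "path": "scan.h5"}
dataset = SimpleNamespace(n_frames=100, path="scan.h5")
ok, issues = check_consistency(dataset, recorded)

dataset.n_frames = 99  # the live object drifts away from the recorded state
ok_after, issues_after = check_consistency(dataset, recorded)
```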

validate_all_objects()[source]

Validate consistency of all tracked objects.

Return type:

dict[str, Any]

start_background_validation(interval=60.0)[source]

Start background consistency validation.

Parameters:

interval (float)

Return type:

None

stop_background_validation()[source]

Stop background consistency validation.

Return type:

None
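How the background validation is scheduled is not documented here. A typical pattern for a start/stop pair with an `interval` is a daemon thread that waits on a `threading.Event`; the sketch below assumes that pattern, and `PeriodicRunnerSketch` is a hypothetical name.

```python
import threading
from typing import Callable, Optional


class PeriodicRunnerSketch:
    """Hypothetical helper that calls `task` every `interval` seconds."""

    def __init__(self, task: Callable[[], None]) -> None:
        self._task = task
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self, interval: float = 60.0) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._stop.clear()
        self._thread = threading.Thread(target=self._loop, args=(interval,), daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

    def _loop(self, interval: float) -> None:
        # Event.wait returns False on timeout and True once stop() sets the event.
        while not self._stop.wait(interval):
            self._task()


ran = threading.Event()
runner = PeriodicRunnerSketch(ran.set)
runner.start(interval=0.01)
task_ran = ran.wait(timeout=5.0)
runner.stop()
```

Waiting on the event instead of sleeping lets `stop()` interrupt a long interval immediately.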

get_statistics()[source]

Get state validation statistics.

Return type:

dict[str, Any]

cleanup_destroyed_objects()[source]

Clean up state records for destroyed objects.

Return type:

int
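Registering objects through weak references means that tracking alone never keeps an object alive, and records for collected objects can be purged later. The sketch below illustrates that idea with the standard `weakref` module; `WeakRegistrySketch` is a hypothetical, simplified stand-in that stores plain strings as states and makes no attempt to be lock-free.

```python
import gc
import weakref
from typing import Any


class WeakRegistrySketch:
    """Hypothetical registry that tracks objects without keeping them alive."""

    def __init__(self) -> None:
        self._refs: dict[int, weakref.ref] = {}
        self._states: dict[int, str] = {}

    def register_object(self, obj: Any, initial_state: str = "initializing") -> None:
        key = id(obj)
        self._refs[key] = weakref.ref(obj)
        self._states[key] = initial_state

    def update_object_state(self, obj: Any, new_state: str) -> bool:
        ref = self._refs.get(id(obj))
        if ref is None or ref() is not obj:
            return False  # unknown object, or a recycled id
        self._states[id(obj)] = new_state
        return True

    def cleanup_destroyed_objects(self) -> int:
        # A weak reference returns None once its target has been collected.
        dead = [key for key, ref in self._refs.items() if ref() is None]
        for key in dead:
            del self._refs[key]
            del self._states[key]
        return len(dead)


class Dataset:
    """Plain class; instances of many built-in types cannot be weakly referenced."""


registry = WeakRegistrySketch()
data = Dataset()
registry.register_object(data)
updated = registry.update_object_state(data, "ready")

del data      # drop the only strong reference
gc.collect()  # make collection explicit on interpreters without reference counting
removed = registry.cleanup_destroyed_objects()
```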

xpcsviewer.utils.state_validator.get_state_validator(level=ValidationLevel.STANDARD)[source]

Get or create the global state validator instance.

Parameters:

level (ValidationLevel)

Return type:

LockFreeStateValidator

xpcsviewer.utils.state_validator.track_object_state(obj, initial_state=StateTransition.INITIALIZING)[source]

Register object for state consistency tracking.

Parameters:
  • obj (Any)

  • initial_state (StateTransition)

Return type:

None

xpcsviewer.utils.state_validator.update_object_state(obj, new_state, **critical_attributes)[source]

Update tracked object state.

Parameters:
  • obj (Any)

  • new_state (StateTransition)

Return type:

bool

xpcsviewer.utils.state_validator.validate_object_state(obj)[source]

Validate object state consistency.

Parameters:

obj (Any)

Return type:

tuple[bool, list[str]]

xpcsviewer.utils.state_validator.start_state_monitoring(interval=60.0, level=ValidationLevel.STANDARD)[source]

Start background state consistency monitoring.

Parameters:
  • interval (float)

  • level (ValidationLevel)

Return type:

None

xpcsviewer.utils.state_validator.stop_state_monitoring()[source]

Stop background state consistency monitoring.

Return type:

None

xpcsviewer.utils.state_validator.get_state_statistics()[source]

Get state validation statistics.

Return type:

dict[str, Any]

xpcsviewer.utils.state_validator.track_state(initial_state=StateTransition.INITIALIZING)[source]

Decorator to automatically track object state.

Parameters:

initial_state (StateTransition)
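The entry does not state what `track_state` decorates. Assuming it is applied to classes, a decorator of this shape can wrap `__init__` so that every new instance is registered automatically. In the sketch below, `track_state_sketch` and the `TRACKED` dict are hypothetical stand-ins for the real decorator and the validator's records.

```python
import functools
from typing import Any, Callable

TRACKED: dict[int, str] = {}  # hypothetical stand-in for the validator's records


def track_state_sketch(initial_state: str = "initializing") -> Callable[[type], type]:
    """Class decorator factory that registers every new instance."""

    def decorate(cls: type) -> type:
        original_init = cls.__init__

        @functools.wraps(original_init)
        def wrapped_init(self: Any, *args: Any, **kwargs: Any) -> None:
            original_init(self, *args, **kwargs)
            # Register only after construction succeeded.
            TRACKED[id(self)] = initial_state

        cls.__init__ = wrapped_init
        return cls

    return decorate


@track_state_sketch(initial_state="ready")
class Frame:
    def __init__(self, index: int) -> None:
        self.index = index


frame = Frame(3)
```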

class xpcsviewer.utils.state_validator.state_validation_context(obj, expected_final_state)[source]

Bases: object

Context manager for automatic state validation during operations.

Parameters:
  • obj (Any)

  • expected_final_state (StateTransition)

__init__(obj, expected_final_state)[source]
Parameters:
  • obj (Any)

  • expected_final_state (StateTransition)

initial_issues: list[str]
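The exact checks performed by this context manager are not listed here. As an illustrative sketch only, the hypothetical class below records issues found on entry in `initial_issues` and, on a clean exit, compares the object's state with the expected final state. The `state` attribute on the object and the `final_issues` list are assumptions made for the example.

```python
from types import SimpleNamespace
from typing import Any


class validation_context_sketch:
    """Hypothetical context manager that checks an object's state on exit."""

    def __init__(self, obj: Any, expected_final_state: str) -> None:
        self.obj = obj
        self.expected_final_state = expected_final_state
        self.initial_issues: list[str] = []
        self.final_issues: list[str] = []  # assumed attribute, not documented

    def __enter__(self) -> "validation_context_sketch":
        if not hasattr(self.obj, "state"):
            self.initial_issues.append("object has no 'state' attribute")
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> bool:
        actual = getattr(self.obj, "state", None)
        if exc_type is None and actual != self.expected_final_state:
            self.final_issues.append(
                f"expected {self.expected_final_state!r}, found {actual!r}"
            )
        return False  # never swallow exceptions raised inside the block


job = SimpleNamespace(state="processing")
with validation_context_sketch(job, "ready") as ctx:
    job.state = "ready"  # the operation under validation
```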

Visualization Optimization

Performance optimization for the visualization layer, including the PyQtGraph and Matplotlib backends.

Because this module imports PyQtGraph conditionally, it cannot be introspected during the documentation build. See the source at xpcsviewer/utils/visualization_optimizer.py for the full API.

See Also

  • Constants - Application-wide configuration constants

  • Backends - Backend abstraction layer