File I/O

HDF5 data reading and Q-space mapping utilities for XPCS datasets.

HDF5 Reader

Optimized HDF5 file reading with connection pooling and batch operations. Supports both synchronous and asynchronous reading patterns.

HDF5 file reading utilities with connection pooling.

Provides optimized HDF5 file reading with connection pooling, batch operations, and automatic resource management for efficient data access.

Classes:

  • HDF5ConnectionPool: Thread-safe connection pool for HDF5 files

  • HDF5Reader: High-level reader with caching and batch operations

class xpcsviewer.fileIO.hdf_reader.PooledConnection(file_handle, file_path)[source]

Bases: object

Wrapper for pooled HDF5 connections with metadata.

Parameters:
  • file_handle (File)

  • file_path (str)

__init__(file_handle, file_path)[source]
Parameters:
  • file_handle (File)

  • file_path (str)

touch()[source]

Update access time and count.

Return type:

None

check_health()[source]

Check if connection is still healthy.

Return type:

bool

close()[source]

Close the connection safely.

Return type:

None

class xpcsviewer.fileIO.hdf_reader.ConnectionStats[source]

Bases: object

Statistics tracker for HDF5 connections.

__init__()[source]
Return type:

None

record_connection_created()[source]
Return type:

None

record_connection_reused()[source]
Return type:

None

record_connection_evicted()[source]
Return type:

None

record_health_check(success)[source]
Parameters:

success (bool)

record_cache_miss()[source]
Return type:

None

record_io_time(duration)[source]
Parameters:

duration (float)

get_stats()[source]
Return type:

dict[str, Any]

class xpcsviewer.fileIO.hdf_reader.HDF5ConnectionPool(max_pool_size=20, health_check_interval=300.0, enable_memory_pressure_adaptation=True)[source]

Bases: object

Enhanced connection pool for HDF5 files with LRU eviction, health monitoring, and comprehensive I/O performance tracking.

Features:

  • LRU eviction policy with configurable pool size

  • Connection health monitoring with automatic cleanup

  • Performance statistics and monitoring

  • Thread-safe operations with fine-grained locking

  • Memory pressure detection and adaptive sizing

  • Batch read operations optimization
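The LRU eviction policy named in the feature list can be illustrated with a small stand-alone sketch. This is not the HDF5ConnectionPool source: the class name LRUPoolSketch, the acquire method, and the dictionary used in place of an open h5py.File are all assumptions made for illustration.

```python
from collections import OrderedDict


class LRUPoolSketch:
    """Toy LRU pool (hypothetical): keeps at most max_pool_size open handles."""

    def __init__(self, max_pool_size=2, opener=None):
        self.max_pool_size = max_pool_size
        # The default opener returns a dict that stands in for an open file.
        self._opener = opener or (lambda path: {"path": path, "open": True})
        self._pool = OrderedDict()  # path -> handle, least recently used first
        self.created = 0
        self.reused = 0
        self.evicted = 0

    def acquire(self, path):
        if path in self._pool:
            # Reuse: mark the handle as most recently used.
            self._pool.move_to_end(path)
            self.reused += 1
            return self._pool[path]
        # Evict least recently used handles until there is room.
        while len(self._pool) >= self.max_pool_size:
            _, old = self._pool.popitem(last=False)
            old["open"] = False  # stands in for closing the file
            self.evicted += 1
        handle = self._opener(path)
        self._pool[path] = handle
        self.created += 1
        return handle


pool = LRUPoolSketch(max_pool_size=2)
pool.acquire("a.h5")
pool.acquire("b.h5")
pool.acquire("a.h5")  # reuse; "a.h5" becomes the most recent entry
pool.acquire("c.h5")  # evicts "b.h5", the least recently used entry
print(list(pool._pool), pool.created, pool.reused, pool.evicted)
```

The three counters mirror the kind of events ConnectionStats records (created, reused, evicted), though the real statistics object may track them differently.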

Parameters:
  • max_pool_size (int)

  • health_check_interval (float)

  • enable_memory_pressure_adaptation (bool)

__init__(max_pool_size=20, health_check_interval=300.0, enable_memory_pressure_adaptation=True)[source]
Parameters:
  • max_pool_size (int)

  • health_check_interval (float)

  • enable_memory_pressure_adaptation (bool)

get_connection(fname, mode='r')[source]

Enhanced context manager to get an HDF5 file connection with comprehensive health monitoring, LRU management, and performance tracking.

Lock ordering guarantee (BUG-008): All lock acquisitions follow a strict total order, with _pool_lock (pool-level operations) first. File locks (_get_file_lock) are never held at the same time as _pool_lock. Health checks and memory-pressure adaptation that require _pool_lock are performed before any file lock is acquired, so the ordering is always: pool operations, then yield with no lock held. This eliminates the deadlock in which Thread A holds _pool_lock (via a yield inside the old with self._pool_lock: block) while Thread B waits for _pool_lock from inside its own file_lock critical section.

Parameters:
  • fname (str) – Path to HDF5 file

  • mode (str) – File access mode (default: ‘r’)

Yields:

h5py.File – HDF5 file handle
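The "pool operations, then yield with no lock held" rule from the lock ordering note can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual method body: PoolSketch and its dictionary handles are hypothetical, and per-file locks, health checks, and eviction are left out.

```python
import threading
from contextlib import contextmanager


class PoolSketch:
    """Hypothetical pool showing that the pool lock is released before yield."""

    def __init__(self):
        self._pool_lock = threading.Lock()
        self._handles = {}

    @contextmanager
    def get_connection(self, fname):
        # Pool-level bookkeeping happens while _pool_lock is held ...
        with self._pool_lock:
            handle = self._handles.setdefault(fname, {"path": fname, "uses": 0})
            handle["uses"] += 1
        # ... and the lock is released before control returns to the caller.
        yield handle


pool = PoolSketch()
with pool.get_connection("a.h5") as outer:
    # If _pool_lock were still held here, this nested call would deadlock,
    # because threading.Lock is not reentrant.
    with pool.get_connection("a.h5") as inner:
        lock_free_inside = not pool._pool_lock.locked()
print(outer is inner, outer["uses"], lock_free_inside)
```

Yielding inside the with self._pool_lock: block would keep the lock held for as long as the caller uses the handle, which is the situation the note describes as the old behaviour.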

clear_pool(from_destructor=False)[source]

Close all connections and clear the pool.

get_pool_stats()[source]

Get comprehensive pool statistics.

Return type:

dict[str, Any]

force_health_check()[source]

Force an immediate health check of all connections.

remove_unhealthy_file(fname)[source]

Remove a file from the unhealthy files set.

Parameters:

fname (str)

batch_read_datasets(fname, dataset_paths, use_cache=True)[source]

Optimized batch reading of multiple datasets from the same file.

Parameters:
  • fname (str) – HDF5 file path

  • dataset_paths (List[str]) – List of dataset paths to read

  • use_cache (bool) – Whether to use read cache

Returns:

Dictionary mapping dataset paths to their values

Return type:

Dict[str, Any]
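A rough sketch of the batch-plus-cache idea: serve cached paths first, then open the file once for everything still missing. The function batch_read_sketch, the cache layout keyed by (fname, path), and the fake file store are assumptions for illustration and do not reflect the real cache structure.

```python
def batch_read_sketch(open_file, fname, dataset_paths, cache, use_cache=True):
    """Read several datasets with one open; cache maps (fname, path) -> value."""
    result, missing = {}, []
    for path in dataset_paths:
        key = (fname, path)
        if use_cache and key in cache:
            result[path] = cache[key]
        else:
            missing.append(path)
    if missing:
        handle = open_file(fname)  # opened once for all uncached paths
        for path in missing:
            value = handle[path]
            result[path] = value
            if use_cache:
                cache[(fname, path)] = value
    return result


opens = []
fake_files = {"scan.h5": {"/a": [1, 2], "/b": [3]}}  # made-up file contents


def open_file(fname):
    opens.append(fname)  # count how often the "file" is opened
    return fake_files[fname]


cache = {}
first = batch_read_sketch(open_file, "scan.h5", ["/a", "/b"], cache)
second = batch_read_sketch(open_file, "scan.h5", ["/a", "/b"], cache)
print(first, len(opens))
```

The second call is answered entirely from the cache, so the file is opened only once across both calls.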

clear_read_cache(fname=None)[source]

Clear read cache for specific file or all files.

Parameters:

fname (str | None)

xpcsviewer.fileIO.hdf_reader.put(save_path, result, ftype='nexus', mode='raw')[source]

Save the result to an HDF5 file.

Parameters:
  • save_path (str) – Path to save the result

  • result (dict) – Dictionary to save

  • ftype (str) – File type, ‘nexus’ or ‘aps_8idi’

  • mode (str) – ‘raw’ or ‘alias’

xpcsviewer.fileIO.hdf_reader.get_abs_cs_scale(fname, ftype='nexus', use_pool=True)[source]

xpcsviewer.fileIO.hdf_reader.read_metadata_to_dict(file_path, use_pool=True)[source]

Reads an HDF5 file and loads its contents into a nested dictionary. Optimized to read all metadata groups in one file open operation.

Parameters:
  • file_path (str) – Path to the HDF5 file.

  • use_pool (bool) – Whether to use connection pool for optimization.

Returns:

A nested dictionary containing datasets as NumPy arrays.

Return type:

dict

xpcsviewer.fileIO.hdf_reader.get(fname, fields, mode='raw', ret_type='dict', ftype='nexus', use_pool=True)[source]

Get the values for the keys listed in fields from a single file.

Parameters:
  • fname – HDF5 file path

  • fields – List of keys [key1, key2, …]

  • mode – ‘raw’ or ‘alias’; in ‘alias’ mode the keys are aliases defined in .hdf_key, otherwise the raw HDF key is used

  • ret_type – Return a dictionary if ‘dict’, a list if ‘list’

  • use_pool – Whether to use the connection pool for optimization

Returns:

Dictionary or list, depending on ret_type.
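How mode and ret_type interact can be sketched with a plain dictionary standing in for the file. Everything here is hypothetical: the ALIASES table, the HDF paths, and the choice to key the returned dictionary by the requested field name are assumptions, not the behaviour of the real get.

```python
# Hypothetical alias table; the real aliases are defined in the hdf_key module.
ALIASES = {"t0": "/entry/instrument/detector/frame_time"}


def get_sketch(store, fields, mode="raw", ret_type="dict"):
    """Look up each field in store, translating aliases when mode == 'alias'."""
    values = {}
    for field in fields:
        key = ALIASES[field] if mode == "alias" else field
        values[field] = store[key]
    if ret_type == "dict":
        return values
    if ret_type == "list":
        # Preserve the order in which the fields were requested.
        return [values[field] for field in fields]
    raise ValueError(f"unsupported ret_type: {ret_type!r}")


# Made-up contents standing in for an HDF5 file.
store = {"/entry/instrument/detector/frame_time": 0.001, "/xpcs/tau": [1, 2, 4]}
as_dict = get_sketch(store, ["t0"], mode="alias")
as_list = get_sketch(store, ["/xpcs/tau"], mode="raw", ret_type="list")
print(as_dict, as_list)
```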

xpcsviewer.fileIO.hdf_reader.get_analysis_type(fname, ftype='nexus', use_pool=True)[source]

Determine the analysis type of the file.

Parameters:
  • fname (str) – File name

  • ftype (str) – File type, ‘nexus’ or ‘legacy’

  • use_pool (bool) – Whether to use connection pool for optimization

Returns:

Analysis type: ‘Twotime’, ‘Multitau’, or both

Return type:

tuple

xpcsviewer.fileIO.hdf_reader.batch_read_fields(fname, fields, mode='raw', ftype='nexus', use_pool=True)[source]

Optimized batch reading of multiple fields from HDF5 file.

Parameters:
  • fname (str) – HDF5 file path

  • fields (List[str]) – List of field names to read

  • mode (str) – ‘raw’ or ‘alias’ mode

  • ftype (str) – File type (‘nexus’ or ‘legacy’)

  • use_pool (bool) – Whether to use connection pooling

Returns:

Dictionary of field values

Return type:

Dict[str, Any]

xpcsviewer.fileIO.hdf_reader.get_file_info(fname, use_pool=True)[source]

Get basic file information and statistics.

Parameters:
  • fname (str) – HDF5 file path

  • use_pool (bool) – Whether to use connection pooling

Returns:

File information dictionary

Return type:

Dict[str, Any]

xpcsviewer.fileIO.hdf_reader.get_connection_pool_stats()[source]

Get comprehensive statistics about the global connection pool.

Returns:

Connection pool statistics

Return type:

Dict[str, Any]

xpcsviewer.fileIO.hdf_reader.clear_connection_pool()[source]

Clear all connections in the global connection pool.

xpcsviewer.fileIO.hdf_reader.force_connection_health_check()[source]

Force an immediate health check of all pooled connections.

xpcsviewer.fileIO.hdf_reader.get_chunked_dataset(fname, dataset_path, chunk_size=None, use_pool=True)[source]

Read a large dataset in chunks to manage memory usage.

Parameters:
  • fname (str) – HDF5 file path

  • dataset_path (str) – Path to dataset within HDF5 file

  • chunk_size (Tuple[int, ...], optional) – Size of chunks to read. If None, will use dataset’s native chunking

  • use_pool (bool) – Whether to use connection pooling

Returns:

The dataset array

Return type:

np.ndarray
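Chunked reading amounts to tiling the dataset with slice tuples and copying one tile at a time. The helper below is a stand-alone sketch of that tiling only; iter_chunk_slices is a hypothetical name, and the way the real function picks chunk sizes or assembles the result is not shown in this reference.

```python
from itertools import product


def iter_chunk_slices(shape, chunk_shape):
    """Yield tuples of slices that tile an array of `shape` in `chunk_shape` blocks."""
    ranges = [range(0, dim, step) for dim, step in zip(shape, chunk_shape)]
    for starts in product(*ranges):
        yield tuple(
            slice(start, min(start + step, dim))  # clip the last block at the edge
            for start, step, dim in zip(starts, chunk_shape, shape)
        )


slices = list(iter_chunk_slices((5, 4), (2, 4)))
for chunk in slices:
    print(chunk)
```

With h5py, each tuple can be used directly as an index on both the dataset and a preallocated output array (for example out[sl] = dset[sl]), so only one chunk is read per step.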

Enhanced HDF5 Reader

Advanced HDF5 reader with additional caching and performance optimizations. Built on top of the base HDF5 reader with extended functionality.

Enhanced HDF5 Reader with Intelligent Chunking and Read-ahead Caching

This module provides optimized HDF5 I/O operations with intelligent caching, chunking strategies, and read-ahead mechanisms for XPCS data analysis.

class xpcsviewer.fileIO.hdf_reader_enhanced.AccessPattern(*values)[source]

Bases: Enum

HDF5 data access patterns for optimization.

SEQUENTIAL = 'sequential'
RANDOM = 'random'
BLOCK = 'block'
SPARSE = 'sparse'

class xpcsviewer.fileIO.hdf_reader_enhanced.ReadRequest(file_path, dataset_path, slice_info, priority=0.5, requested_time=0.0, access_pattern=AccessPattern.RANDOM)[source]

Bases: object

Request for HDF5 data reading.

Parameters:
  • file_path (str)

  • dataset_path (str)

  • slice_info (tuple[slice, ...] | None)

  • priority (float)

  • requested_time (float)

  • access_pattern (AccessPattern)

file_path: str
dataset_path: str
slice_info: tuple[slice, ...] | None
priority: float = 0.5
requested_time: float = 0.0
access_pattern: AccessPattern = 'random'
__init__(file_path, dataset_path, slice_info, priority=0.5, requested_time=0.0, access_pattern=AccessPattern.RANDOM)
Parameters:
  • file_path (str)

  • dataset_path (str)

  • slice_info (tuple[slice, ...] | None)

  • priority (float)

  • requested_time (float)

  • access_pattern (AccessPattern)

Return type:

None

class xpcsviewer.fileIO.hdf_reader_enhanced.CacheEntry(data, file_path, dataset_path, slice_info, access_count=0, last_accessed=0.0, created_time=0.0, size_mb=0.0, access_pattern=AccessPattern.RANDOM)[source]

Bases: object

Enhanced cache entry for HDF5 data.

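Parameters:
  • data (ndarray)

  • file_path (str)

  • dataset_path (str)

  • slice_info (tuple[slice, ...] | None)

  • access_count (int)

  • last_accessed (float)

  • created_time (float)

  • size_mb (float)

  • access_pattern (AccessPattern)

data: ndarray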
file_path: str
dataset_path: str
slice_info: tuple[slice, ...] | None
access_count: int = 0
last_accessed: float = 0.0
created_time: float = 0.0
size_mb: float = 0.0
access_pattern: AccessPattern = 'random'
__init__(data, file_path, dataset_path, slice_info, access_count=0, last_accessed=0.0, created_time=0.0, size_mb=0.0, access_pattern=AccessPattern.RANDOM)
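Parameters:
  • data (ndarray)

  • file_path (str)

  • dataset_path (str)

  • slice_info (tuple[slice, ...] | None)

  • access_count (int)

  • last_accessed (float)

  • created_time (float)

  • size_mb (float)

  • access_pattern (AccessPattern)

Return type: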

None

class xpcsviewer.fileIO.hdf_reader_enhanced.IntelligentChunker[source]

Bases: object

Intelligent chunking strategy for HDF5 datasets based on access patterns.

__init__()[source]
analyze_access_pattern(file_path, dataset_path, recent_accesses)[source]

Analyze access pattern from recent slice requests.

Parameters:
  • file_path (str) – Path to HDF5 file

  • dataset_path (str) – Path to dataset within file

  • recent_accesses (list[tuple[slice, ...]]) – Recent access slice patterns

Returns:

Detected access pattern

Return type:

AccessPattern
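One simple way to detect a pattern from recent slice requests is to check whether each request starts where the previous one stopped. The sketch below covers only the sequential-versus-random distinction; the heuristic, the names AccessPatternSketch and classify_accesses, and the restriction to the first axis are assumptions, and the criteria IntelligentChunker actually uses (including BLOCK and SPARSE) are not documented here.

```python
from enum import Enum


class AccessPatternSketch(Enum):
    """Reduced, hypothetical version of the AccessPattern enum."""

    SEQUENTIAL = "sequential"
    RANDOM = "random"


def classify_accesses(recent_accesses):
    """Sequential if every first-axis slice starts where the previous one stopped."""
    if len(recent_accesses) < 2:
        return AccessPatternSketch.RANDOM  # too little history to tell
    first_axis = [access[0] for access in recent_accesses]
    contiguous = all(
        nxt.start == prev.stop for prev, nxt in zip(first_axis, first_axis[1:])
    )
    return AccessPatternSketch.SEQUENTIAL if contiguous else AccessPatternSketch.RANDOM


seq = [(slice(0, 10),), (slice(10, 20),), (slice(20, 30),)]
rnd = [(slice(0, 10),), (slice(50, 60),), (slice(5, 15),)]
print(classify_accesses(seq).value, classify_accesses(rnd).value)
```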

get_optimal_chunk_shape(dataset_shape, dtype, access_pattern, target_chunk_mb=10.0)[source]

Calculate optimal chunk shape for dataset based on access pattern.

Parameters:
  • dataset_shape (tuple[int, ...]) – Shape of the dataset

  • dtype (np.dtype) – Data type of the dataset

  • access_pattern (AccessPattern) – Detected access pattern

  • target_chunk_mb (float) – Target chunk size in MB

Returns:

Optimal chunk shape

Return type:

tuple[int, …]
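The relationship between target_chunk_mb and the returned shape can be illustrated with one possible heuristic: keep the trailing axes whole and size the leading axis to fit the byte budget. This is a sketch under that assumption only; chunk_shape_sketch is a hypothetical helper that ignores the access pattern, takes an item size in bytes instead of a dtype, and expects at least one dimension.

```python
def chunk_shape_sketch(dataset_shape, itemsize, target_chunk_mb=10.0):
    """Keep trailing axes whole; size the leading axis to fit the byte budget."""
    target_bytes = int(target_chunk_mb * 1024 * 1024)
    # Bytes needed for one full "row", i.e. one index along the leading axis.
    row_bytes = itemsize
    for dim in dataset_shape[1:]:
        row_bytes *= dim
    # At least one row, at most the whole leading axis.
    rows = max(1, min(dataset_shape[0], target_bytes // row_bytes))
    return (rows,) + tuple(dataset_shape[1:])


# 10000 frames of 512 x 512 four-byte values: one frame is exactly 1 MiB.
shape = chunk_shape_sketch((10000, 512, 512), itemsize=4, target_chunk_mb=10.0)
print(shape)
```

For a NumPy dtype the item size is available as dtype.itemsize, so a call with a real dataset could pass that value.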

class xpcsviewer.fileIO.hdf_reader_enhanced.ReadAheadCache(max_cache_mb=200.0)[source]

Bases: object

Intelligent read-ahead cache for HDF5 data based on access patterns.

Parameters:

max_cache_mb (float)

__init__(max_cache_mb=200.0)[source]
Parameters:

max_cache_mb (float)

cache: OrderedDict[str, CacheEntry]
access_patterns: dict[str, deque[tuple[float, tuple[slice, ...] | None]]]
prediction_history: dict[str, list[Any]]
record_access(file_path, dataset_path, slice_info)[source]

Record an access for pattern analysis.

Parameters:
  • file_path (str)

  • dataset_path (str)

  • slice_info (tuple[slice, ...] | None)

predict_next_access(file_path, dataset_path, current_slice)[source]

Predict next likely access patterns for read-ahead.

Parameters:
  • file_path (str) – Path to HDF5 file

  • dataset_path (str) – Path to dataset

  • current_slice (Optional[tuple[slice, ...]]) – Current slice being accessed

Returns:

Predicted next slice accesses

Return type:

list[tuple[slice, …]]
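Read-ahead prediction can be as simple as extrapolating the stride between the last two requests. The sketch below does exactly that along the first axis; predict_next_slices, its count parameter, and the stride rule are assumptions for illustration rather than the logic ReadAheadCache implements.

```python
def predict_next_slices(history, count=2):
    """Extrapolate first-axis slices from the stride of the last two accesses."""
    if len(history) < 2:
        return []
    prev, last = history[-2][0], history[-1][0]
    stride = last.start - prev.start
    length = last.stop - last.start
    if stride <= 0:
        return []  # no forward motion, so nothing worth prefetching
    predictions = []
    for step in range(1, count + 1):
        start = last.start + step * stride
        # Keep the remaining axes exactly as in the most recent request.
        predictions.append((slice(start, start + length),) + tuple(history[-1][1:]))
    return predictions


history = [(slice(0, 10), slice(None)), (slice(10, 20), slice(None))]
predicted = predict_next_slices(history)
print(predicted)
```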

get_cached_data(file_path, dataset_path, slice_info)[source]

Get cached data if available.

Parameters:
  • file_path (str)

  • dataset_path (str)

  • slice_info (tuple[slice, ...] | None)

Return type:

ndarray | None

cache_data(file_path, dataset_path, slice_info, data)[source]
Parameters:
  • file_path (str)

  • dataset_path (str)

  • slice_info (tuple[slice, ...] | None)

  • data (ndarray)

clear_cache()[source]

Clear all cached data.

get_cache_stats()[source]

Get cache statistics.

Return type:

dict[str, Any]

class xpcsviewer.fileIO.hdf_reader_enhanced.EnhancedHDF5Reader(max_cache_mb=200.0, enable_read_ahead=True)[source]

Bases: object

Enhanced HDF5 reader with intelligent chunking and read-ahead caching.

Parameters:
  • max_cache_mb (float)

  • enable_read_ahead (bool)

__init__(max_cache_mb=200.0, enable_read_ahead=True)[source]
Parameters:
  • max_cache_mb (float)

  • enable_read_ahead (bool)

stats: dict[str, Any]
get_file_connection(file_path)[source]

Get HDF5 file connection with connection pooling.

Parameters:

file_path (str)

read_dataset(file_path, dataset_path, slice_info=None, enable_read_ahead=None)[source]

Read dataset with intelligent caching and read-ahead.

Parameters:
  • file_path (str) – Path to HDF5 file

  • dataset_path (str) – Path to dataset within file

  • slice_info (Optional[tuple[slice, ...]]) – Slice to read (None for full dataset)

  • enable_read_ahead (Optional[bool]) – Override global read-ahead setting

Returns:

Requested data

Return type:

np.ndarray

read_multiple_datasets(file_path, dataset_paths, slice_info=None)[source]

Efficiently read multiple datasets from the same file.

Parameters:
  • file_path (str) – Path to HDF5 file

  • dataset_paths (list[str]) – List of dataset paths to read

  • slice_info (Optional[tuple[slice, ...]]) – Slice to apply to all datasets

Returns:

Dictionary of dataset_path -> data

Return type:

dict[str, np.ndarray]

get_dataset_info(file_path, dataset_path)[source]

Get dataset metadata without loading data.

Parameters:
  • file_path (str) – Path to HDF5 file

  • dataset_path (str) – Path to dataset

Returns:

Dataset metadata

Return type:

dict[str, Any]

optimize_chunking_for_dataset(file_path, dataset_path, access_pattern=None)[source]

Get optimal chunk shape for dataset based on access pattern.

Parameters:
  • file_path (str) – Path to HDF5 file

  • dataset_path (str) – Path to dataset

  • access_pattern (Optional[AccessPattern]) – Known access pattern (auto-detected if None)

Returns:

Optimal chunk shape

Return type:

tuple[int, …]

clear_caches()[source]

Clear all caches.

get_performance_stats()[source]

Get comprehensive performance statistics.

Return type:

dict[str, Any]

xpcsviewer.fileIO.hdf_reader_enhanced.get_enhanced_hdf5_reader()[source]

Get or create the global enhanced HDF5 reader.

Return type:

EnhancedHDF5Reader

xpcsviewer.fileIO.hdf_reader_enhanced.get_enhanced_reader()[source]

Alias for get_enhanced_hdf5_reader for backward compatibility.

Return type:

EnhancedHDF5Reader
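The "get or create" behaviour of the two accessor functions is a lazily initialised shared instance. The following sketch shows the pattern with a lock and a second name bound to the same function; ReaderSketch, the _state holder, and the use of a lock are assumptions, since the reference does not say how the global reader is stored or protected.

```python
import threading


class ReaderSketch:
    """Hypothetical stand-in for the enhanced reader."""

    def __init__(self, max_cache_mb=200.0):
        self.max_cache_mb = max_cache_mb


_state = {"reader": None}  # holder for the shared instance
_state_lock = threading.Lock()


def get_reader_sketch():
    """Return the shared instance, creating it on first use."""
    if _state["reader"] is None:
        with _state_lock:
            if _state["reader"] is None:  # re-check after acquiring the lock
                _state["reader"] = ReaderSketch()
    return _state["reader"]


# A backward-compatible alias is simply a second name for the same function.
get_reader_alias = get_reader_sketch

first = get_reader_sketch()
second = get_reader_alias()
print(first is second)
```

Because both names return the same object, caches and statistics accumulated through one accessor are visible through the other.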

xpcsviewer.fileIO.hdf_reader_enhanced.read_hdf5_optimized(file_path, dataset_path, slice_info=None)[source]

Convenience function for optimized HDF5 reading.

Parameters:
  • file_path (str)

  • dataset_path (str)

  • slice_info (tuple[slice, ...] | None)

Return type:

ndarray

xpcsviewer.fileIO.hdf_reader_enhanced.read_multiple_hdf5_optimized(file_path, dataset_paths, slice_info=None)[source]

Convenience function for reading multiple datasets.

Parameters:
  • file_path (str)

  • dataset_paths (list[str])

  • slice_info (tuple[slice, ...] | None)

Return type:

dict[str, ndarray]

Q-space Mapping

Detector geometry calculations and Q-space coordinate transformations. Essential for converting pixel coordinates to reciprocal space.
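For orientation, the standard conversion from a detector pixel to the magnitude of the scattering vector is q = (4π/λ)·sin(θ), where 2θ is the scattering angle. The sketch below assumes a flat detector perpendicular to the beam in transmission geometry; the function pixel_to_q and all of its parameter names are hypothetical and are not taken from qmap_utils.

```python
import math


def pixel_to_q(x, y, beam_center, pixel_size_mm, det_dist_mm, wavelength_angstrom):
    """Return |q| in inverse angstrom for detector pixel (x, y)."""
    # Offset from the direct-beam position, converted to millimetres.
    dx = (x - beam_center[0]) * pixel_size_mm
    dy = (y - beam_center[1]) * pixel_size_mm
    radius = math.hypot(dx, dy)
    two_theta = math.atan2(radius, det_dist_mm)  # full scattering angle
    return 4.0 * math.pi / wavelength_angstrom * math.sin(two_theta / 2.0)


# Illustrative numbers only: 75 um pixels, 8 m sample-detector distance, 1 A X-rays.
q_center = pixel_to_q(512, 512, (512, 512), 0.075, 8000.0, 1.0)
q_offset = pixel_to_q(612, 512, (512, 512), 0.075, 8000.0, 1.0)
print(q_center, q_offset)
```

At small angles this reduces to q ≈ 2πr/(λD), with r the distance from the beam centre and D the sample-to-detector distance.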

xpcsviewer.fileIO.qmap_utils.Q_UNIT_DISPLAY = 'Å⁻¹'

Canonical display string for inverse Ångström.

class xpcsviewer.fileIO.qmap_utils.QMapManager[source]

Bases: object

__init__()[source]
get_qmap(fname)[source]

class xpcsviewer.fileIO.qmap_utils.QMap(fname=None, root_key='/xpcs/qmap')[source]

Bases: object

__init__(fname=None, root_key='/xpcs/qmap')[source]
load_dataset()[source]
reshape_phi_analysis(compressed_data_raw, label='data', mode='saxs_1d')[source]

The saxs1d and stability data are stored compressed: the values of empty static bins are not saved. This function reshapes the array and fills the empty bins with NaN; nanmean is then applied to get the correct results.
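The expand-then-nanmean step can be sketched without NumPy. The layout assumed here (a row-major grid of n_q by n_phi bins plus a list of the flat indices that were actually stored) and the names expand_and_average and valid_index are illustrative assumptions; the real method works on the arrays held by QMap.

```python
import math


def expand_and_average(compressed, valid_index, n_q, n_phi):
    """Scatter stored values into an (n_q, n_phi) grid, then NaN-aware mean over phi."""
    full = [math.nan] * (n_q * n_phi)  # empty bins stay NaN
    for value, idx in zip(compressed, valid_index):
        full[idx] = value
    means = []
    for q in range(n_q):
        row = [v for v in full[q * n_phi:(q + 1) * n_phi] if not math.isnan(v)]
        means.append(sum(row) / len(row) if row else math.nan)
    return means


# Bins 1 and 5 of the 2 x 3 grid were empty and therefore never stored.
means = expand_and_average([1.0, 3.0, 4.0, 6.0], [0, 2, 3, 4], n_q=2, n_phi=3)
print(means)
```

Filling with zero instead of NaN would drag the averages down, which is why the empty bins have to be excluded from the mean.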

get_detector_extent()[source]

Optimized detector extent calculation with caching. Gets the angular extent on the detector, for saxs2d and qmap display.

get(key, default=None)[source]

Provide dictionary-like access to QMap attributes.

get_qmap_at_pos(x, y)[source]
create_qbin_labels()[source]
get_qbin_label(qbin, append_qbin=False)[source]
Parameters:

qbin (int)

get_qbin_in_qrange(qrange, zero_based=True)[source]

Optimized q-bin selection with improved vectorization and caching.
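Selecting q-bins inside a range, with the zero_based switch, can be sketched as below. The helper qbins_in_range, the flat list of per-bin q values, and the inclusive bounds are assumptions; the real method operates on the QMap's own arrays and may treat the range edges differently.

```python
def qbins_in_range(q_values, qrange, zero_based=True):
    """Indices of bins whose q value lies inside [qmin, qmax] (bounds inclusive)."""
    qmin, qmax = qrange
    offset = 0 if zero_based else 1  # one-based labels start counting at 1
    return [i + offset for i, q in enumerate(q_values) if qmin <= q <= qmax]


q_values = [0.01, 0.02, 0.03, 0.04]  # made-up bin centres in inverse angstrom
zero_based = qbins_in_range(q_values, (0.015, 0.035))
one_based = qbins_in_range(q_values, (0.015, 0.035), zero_based=False)
print(zero_based, one_based)
```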

get_qbinlist_at_qindex(qindex, zero_based=True)[source]
compute_qmap()[source]

Optimized qmap computation with improved vectorization and memory efficiency.

xpcsviewer.fileIO.qmap_utils.get_hash(fname, root_key='/xpcs/qmap')[source]

Extracts the hash from the HDF5 file.

xpcsviewer.fileIO.qmap_utils.get_qmap(fname, **kwargs)[source]

xpcsviewer.fileIO.qmap_utils.test_qmap_manager()[source]

APS 8-IDI Beamline Support

Beamline-specific data structure definitions and format handlers. Supports both “nexus” and legacy data formats from APS-8IDI.

File Type Utilities

File format detection and validation utilities for XPCS data files.

File type detection utilities for XPCS data files.

Provides functions to identify HDF5 file formats:

  • isNeXusFile: Check for NeXus format (APS-8IDI beamline)

  • isLegacyFile: Check for legacy XPCS format

  • get_ftype: Determine file type (‘nexus’, ‘legacy’, or False)

xpcsviewer.fileIO.ftype_utils.isNeXusFile(fname)[source]

Check if file is in NeXus format.

Parameters:

fname – Path to HDF5 file.

Returns:

True if file contains NeXus metadata structure.

xpcsviewer.fileIO.ftype_utils.isLegacyFile(fname)[source]

xpcsviewer.fileIO.ftype_utils.get_ftype(fname)[source]
Parameters:

fname (str)
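The return convention of get_ftype (‘nexus’, ‘legacy’, or False) can be sketched as a dispatch over the two checks. The order of the checks and the stand-in predicates are assumptions; the real isNeXusFile and isLegacyFile inspect the HDF5 file contents, which this sketch does not attempt.

```python
def get_ftype_sketch(fname, is_nexus, is_legacy):
    """Return 'nexus', 'legacy', or False, trying the NeXus check first."""
    if is_nexus(fname):
        return "nexus"
    if is_legacy(fname):
        return "legacy"
    return False


# Stand-in predicates keyed on made-up file names, not real HDF5 inspection.
nexus_files = {"new.h5"}
legacy_files = {"old.h5"}

results = [
    get_ftype_sketch(name, lambda f: f in nexus_files, lambda f: f in legacy_files)
    for name in ("new.h5", "old.h5", "notes.txt")
]
print(results)
```

Since an unrecognised file yields False rather than an exception, callers can test the result for truthiness before passing it on as an ftype argument.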