"""
Enhanced Validation utilities for XPCS data processing with reliability features.
This module provides centralized validation functions with performance-preserving
reliability enhancements including caching, fail-fast mechanisms, and detailed
error context for better debugging and recovery.
"""
import logging
import time
from typing import Any
import numpy as np
from .exceptions import XPCSValidationError, convert_exception
from .reliability import (
ValidationLevel,
ValidationResult,
get_validation_cache,
validate_input,
)
logger = logging.getLogger(__name__)
[docs]
def get_file_label_safe(xf) -> str:
"""
Safely extract a label from an XPCS file object.
Args:
xf: XPCS file object that may or may not have a 'label' attribute
Returns:
str: The file label if available, otherwise 'unknown'
"""
return getattr(xf, "label", "unknown")
[docs]
@validate_input(check_types=True, level=ValidationLevel.STANDARD, cache_results=True)
def validate_xf_fit_summary(xf) -> tuple[bool, dict[str, Any] | None, str | None]:
"""
Validate that an XPCS file object has a valid fit_summary with required fields.
Enhanced with caching and detailed error context.
Args:
xf: XPCS file object to validate
Returns:
Tuple containing:
- bool: True if validation passed, False otherwise
- dict or None: The fit_summary if valid, None otherwise
- str or None: Error message if validation failed, None if passed
"""
file_label = get_file_label_safe(xf)
try:
# Check if fit_summary exists
if not hasattr(xf, "fit_summary") or xf.fit_summary is None:
error_msg = f"File {file_label} missing fit_summary attribute"
logger.debug(error_msg)
return False, None, error_msg
fit_summary = xf.fit_summary
# Enhanced validation with detailed field checking
required_fields = ["q_val", "fit_val"]
missing_fields = [
field for field in required_fields if field not in fit_summary
]
if missing_fields:
error_msg = f"File {file_label} missing required fields: {', '.join(missing_fields)}"
logger.debug(error_msg)
# Add recovery suggestion based on missing fields
if "q_val" in missing_fields:
logger.debug(
f" - Suggestion: Ensure G2 fitting was completed for {file_label}"
)
if "fit_val" in missing_fields:
logger.debug(
f" - Suggestion: Re-run G2 fitting with proper parameters for {file_label}"
)
return False, None, error_msg
# Additional validation for data integrity
q_val = fit_summary.get("q_val")
fit_val = fit_summary.get("fit_val")
if q_val is not None and hasattr(q_val, "__len__") and len(q_val) == 0:
error_msg = f"File {file_label} has empty q_val array"
logger.debug(error_msg)
return False, None, error_msg
if fit_val is not None and hasattr(fit_val, "__len__") and len(fit_val) == 0:
error_msg = f"File {file_label} has empty fit_val array"
logger.debug(error_msg)
return False, None, error_msg
return True, fit_summary, None
except Exception as e:
# Convert unexpected errors to validation errors
xpcs_error = convert_exception(e, f"Validation failed for file {file_label}")
error_msg = str(xpcs_error)
logger.warning(f"Unexpected validation error: {error_msg}")
return False, None, error_msg
[docs]
def validate_xf_has_fit_summary(xf) -> tuple[bool, str | None]:
"""
Simple validation that an XPCS file object has a fit_summary attribute.
Args:
xf: XPCS file object to validate
Returns:
Tuple containing:
- bool: True if has fit_summary, False otherwise
- str or None: Error message if validation failed, None if passed
"""
file_label = get_file_label_safe(xf)
if not hasattr(xf, "fit_summary") or xf.fit_summary is None:
error_msg = f"Skipping file {file_label} - no fit_summary"
logger.debug(error_msg)
return False, error_msg
return True, None
[docs]
def validate_fit_summary_fields(
fit_summary: dict[str, Any], required_fields: list, file_label: str = "unknown"
) -> tuple[bool, str | None]:
"""
Validate that a fit_summary dictionary contains required fields.
Args:
fit_summary: Dictionary to validate
required_fields: List of required field names
file_label: Label for error messaging
Returns:
Tuple containing:
- bool: True if all fields present, False otherwise
- str or None: Error message if validation failed, None if passed
"""
missing_fields = [field for field in required_fields if field not in fit_summary]
if missing_fields:
error_msg = (
f"Skipping file {file_label} - missing fields: {', '.join(missing_fields)}"
)
logger.debug(error_msg)
return False, error_msg
return True, None
[docs]
def log_array_size_mismatch(
file_label: str, array_info: dict[str, int], min_length: int
) -> None:
"""
Log a standardized array size mismatch warning.
Args:
file_label: Label of the file with the mismatch
array_info: Dictionary mapping array names to their lengths
min_length: The minimum length arrays will be trimmed to
"""
array_desc = ", ".join([f"{name}={length}" for name, length in array_info.items()])
logger.warning(
f"Array size mismatch in file {file_label}: {array_desc}. Trimming to {min_length}"
)
[docs]
def validate_array_compatibility(
*arrays: np.ndarray,
names: list[str] | None = None,
allow_broadcast: bool = False,
file_label: str = "unknown",
) -> bool:
"""
Validate that arrays have compatible shapes.
Per Technical Guidelines, this function raises XPCSValidationError instead
of silently truncating arrays, as silent data modification is prohibited.
Parameters
----------
*arrays : np.ndarray
Arrays to validate
names : list[str] | None
Optional names for error messages
allow_broadcast : bool
If True, allow NumPy-compatible broadcasting.
If False (default), require exact shape match.
file_label : str
Label for error messaging (deprecated, use names instead)
Returns
-------
bool
True if arrays are compatible
Raises
------
XPCSValidationError
If arrays have incompatible shapes (no silent truncation)
Notes
-----
Changed from silent truncation to raising XPCSValidationError
per data integrity guidelines. Silent data modification is prohibited.
"""
if not arrays:
raise XPCSValidationError("No arrays provided for validation")
# Filter out None arrays
valid_arrays = [arr for arr in arrays if arr is not None]
if not valid_arrays:
raise XPCSValidationError(f"All arrays are None for file {file_label}")
# Get lengths
lengths = [len(arr) for arr in valid_arrays]
# Check for empty arrays
if any(length == 0 for length in lengths):
raise XPCSValidationError(f"Empty arrays found for file {file_label}")
# Build array info for error messages
if names is not None:
array_info = dict(zip(names[: len(lengths)], lengths, strict=False))
else:
array_info = {f"array_{i}": length for i, length in enumerate(lengths)}
# Check for length mismatch
if len(set(lengths)) > 1:
raise XPCSValidationError(
f"Array length mismatch: {array_info}. "
f"Per data integrity guidelines, silent truncation is not allowed."
)
return True
[docs]
def validate_array_compatibility_legacy(
*arrays, file_label: str = "unknown"
) -> tuple[bool, int, str | None]:
"""
Legacy validation that returns tuple instead of raising.
DEPRECATED: Use validate_array_compatibility() which raises XPCSValidationError.
This function is kept for backward compatibility but should not be used
for new code as it allows silent data truncation.
Args:
*arrays: Variable number of arrays to check
file_label: Label for error messaging
Returns:
Tuple containing:
- bool: True if arrays are compatible (after trimming), False if empty
- int: Minimum length of arrays
- str or None: Warning message if sizes differ, None if all same size
"""
import warnings
warnings.warn(
"validate_array_compatibility_legacy is deprecated. "
"Use validate_array_compatibility() which raises XPCSValidationError.",
DeprecationWarning,
stacklevel=2,
)
if not arrays:
return False, 0, "No arrays provided for validation"
lengths = [len(arr) for arr in arrays if arr is not None]
if not lengths:
return False, 0, f"All arrays are None for file {file_label}"
min_length = min(lengths)
max_length = max(lengths)
warning_msg = None
if min_length != max_length:
array_info = {
f"array_{i}": len(arr) for i, arr in enumerate(arrays) if arr is not None
}
log_array_size_mismatch(file_label, array_info, min_length)
warning_msg = f"Array sizes differ, trimmed to {min_length}"
if min_length == 0:
return False, 0, f"Empty arrays for file {file_label}"
return True, min_length, warning_msg
# Enhanced validation functions for scientific data integrity
[docs]
@validate_input(
check_types=True,
check_values=True,
level=ValidationLevel.STRICT,
cache_results=True,
)
def validate_hdf5_file_integrity(
file_path: str, required_datasets: list[str] | None = None
) -> ValidationResult:
"""
Comprehensive HDF5 file integrity validation with caching.
Args:
file_path: Path to HDF5 file
required_datasets: List of required dataset paths
Returns:
ValidationResult with detailed validation information
"""
from pathlib import Path
import h5py
start_time = time.time()
errors = []
warnings = []
try:
# Basic file existence and access
file_path_obj = Path(file_path)
if not file_path_obj.exists():
raise XPCSValidationError(f"HDF5 file not found: {file_path}")
if not file_path_obj.is_file():
raise XPCSValidationError(f"Path is not a file: {file_path}")
# Try to open the file
try:
with h5py.File(file_path, "r") as f:
# Check file structure
if required_datasets:
for dataset_path in required_datasets:
if dataset_path not in f:
errors.append(f"Missing required dataset: {dataset_path}")
# Check for common XPCS structure
xpcs_group = f.get("xpcs")
if xpcs_group is None:
warnings.append(
"No 'xpcs' group found - may not be valid XPCS data"
)
else:
# Validate XPCS structure
common_datasets = ["scattering_2d", "tau", "q_values"]
for dataset in common_datasets:
if dataset not in xpcs_group:
warnings.append(f"Missing common XPCS dataset: {dataset}")
except OSError as e:
raise XPCSValidationError(f"Cannot read HDF5 file: {e}") from e
validation_time = time.time() - start_time
is_valid = len(errors) == 0
return ValidationResult(
is_valid=is_valid,
error_message="; ".join(errors) if errors else None,
warnings=warnings,
validation_time=validation_time,
)
except Exception as e:
validation_time = time.time() - start_time
if isinstance(e, XPCSValidationError):
error_msg = str(e)
else:
error_msg = f"Unexpected error validating HDF5 file: {e}"
return ValidationResult(
is_valid=False,
error_message=error_msg,
warnings=[],
validation_time=validation_time,
)
[docs]
@validate_input(
check_types=True, check_shapes=True, check_values=True, cache_results=True
)
def validate_scientific_array(
array: np.ndarray,
array_name: str = "data",
min_dimensions: int = 1,
max_dimensions: int | None = None,
expected_shape: tuple[int, ...] | None = None,
allow_nan: bool = False,
allow_negative: bool = True,
finite_only: bool = True,
) -> ValidationResult:
"""
Comprehensive scientific array validation with domain-specific checks.
Args:
array: NumPy array to validate
array_name: Name for error reporting
min_dimensions: Minimum number of dimensions
max_dimensions: Maximum number of dimensions
expected_shape: Expected exact shape
allow_nan: Whether NaN values are acceptable
allow_negative: Whether negative values are acceptable
finite_only: Whether only finite values are acceptable
Returns:
ValidationResult with detailed validation information
"""
start_time = time.time()
errors = []
warnings = []
try:
# Basic array checks
if array.size == 0:
errors.append(f"Array '{array_name}' is empty")
# Dimension checks
if array.ndim < min_dimensions:
errors.append(
f"Array '{array_name}' has {array.ndim} dimensions, minimum {min_dimensions} required"
)
if max_dimensions is not None and array.ndim > max_dimensions:
errors.append(
f"Array '{array_name}' has {array.ndim} dimensions, maximum {max_dimensions} allowed"
)
# Shape validation
if expected_shape is not None and array.shape != expected_shape:
errors.append(
f"Array '{array_name}' shape {array.shape} does not match expected {expected_shape}"
)
# Value validation for numerical arrays
if np.issubdtype(array.dtype, np.number) and array.size > 0:
# NaN check
nan_count = np.sum(np.isnan(array))
if nan_count > 0:
if not allow_nan:
errors.append(
f"Array '{array_name}' contains {nan_count} NaN values"
)
else:
warnings.append(
f"Array '{array_name}' contains {nan_count} NaN values"
)
# Infinity check
inf_count = np.sum(np.isinf(array))
if inf_count > 0:
if finite_only:
errors.append(
f"Array '{array_name}' contains {inf_count} infinite values"
)
else:
warnings.append(
f"Array '{array_name}' contains {inf_count} infinite values"
)
# Negative value check
if not allow_negative:
negative_count = np.sum(array < 0)
if negative_count > 0:
errors.append(
f"Array '{array_name}' contains {negative_count} negative values"
)
# Memory usage check
array_size_mb = array.nbytes / (1024 * 1024)
if array_size_mb > 500: # > 500MB
warnings.append(f"Large array '{array_name}': {array_size_mb:.1f}MB")
# Data type checks
if array.dtype == np.object_:
warnings.append(
f"Array '{array_name}' uses object dtype - may impact performance"
)
validation_time = time.time() - start_time
is_valid = len(errors) == 0
return ValidationResult(
is_valid=is_valid,
error_message="; ".join(errors) if errors else None,
warnings=warnings,
validation_time=validation_time,
)
except Exception as e:
validation_time = time.time() - start_time
error_msg = f"Unexpected error validating array '{array_name}': {e}"
return ValidationResult(
is_valid=False,
error_message=error_msg,
warnings=[],
validation_time=validation_time,
)
[docs]
def validate_g2_data(
g2_array: np.ndarray, tau_array: np.ndarray | None = None
) -> ValidationResult:
"""
Specialized validation for G2 correlation data.
Args:
g2_array: G2 correlation values
tau_array: Optional tau (time delay) values
Returns:
ValidationResult with G2-specific validation
"""
start_time = time.time()
errors = []
warnings = []
try:
# Validate G2 array structure
g2_result = validate_scientific_array(
g2_array,
array_name="g2_data",
min_dimensions=1,
max_dimensions=2,
allow_negative=False, # G2 should be positive
finite_only=True,
)
if not g2_result.is_valid:
errors.extend(g2_result.error_message.split("; "))
warnings.extend(g2_result.warnings)
# G2-specific validation
if g2_array.size > 0:
# Check for reasonable G2 values (typically 0.1 to 3.0)
g2_min, g2_max = np.nanmin(g2_array), np.nanmax(g2_array)
if g2_min < 0:
errors.append(f"G2 values should be positive, found minimum: {g2_min}")
if g2_max > 5.0:
warnings.append(f"Unusually high G2 values detected, maximum: {g2_max}")
# Check for baseline (should approach 1.0 at long times)
if g2_array.ndim == 1 and len(g2_array) > 10:
final_values = g2_array[-5:] # Last 5 points
final_mean = np.nanmean(final_values)
if not (0.8 <= final_mean <= 1.5):
warnings.append(
f"G2 baseline unusual: {final_mean:.3f} (expected ~1.0)"
)
# Validate tau array if provided
if tau_array is not None:
tau_result = validate_scientific_array(
tau_array,
array_name="tau_data",
min_dimensions=1,
max_dimensions=1,
allow_negative=False, # Time delays should be positive
finite_only=True,
)
if not tau_result.is_valid:
errors.extend(tau_result.error_message.split("; "))
warnings.extend(tau_result.warnings)
# Check tau-G2 consistency
if g2_array.ndim == 1 and len(tau_array) != len(g2_array):
errors.append(
f"Tau array length ({len(tau_array)}) does not match G2 array length ({len(g2_array)})"
)
# Check for proper tau ordering (should be increasing)
if len(tau_array) > 1 and not np.all(np.diff(tau_array) > 0):
warnings.append("Tau values are not strictly increasing")
validation_time = time.time() - start_time
is_valid = len(errors) == 0
return ValidationResult(
is_valid=is_valid,
error_message="; ".join(errors) if errors else None,
warnings=warnings,
validation_time=validation_time,
)
except Exception as e:
validation_time = time.time() - start_time
error_msg = f"Unexpected error validating G2 data: {e}"
return ValidationResult(
is_valid=False,
error_message=error_msg,
warnings=[],
validation_time=validation_time,
)
[docs]
def get_validation_statistics() -> dict[str, Any]:
"""
Get comprehensive validation performance statistics.
Returns:
Dictionary with validation cache statistics and performance metrics
"""
cache = get_validation_cache()
with cache._lock:
total_entries = len(cache._cache)
if total_entries == 0:
return {"message": "No validation cache entries"}
# Calculate cache hit rate and performance metrics
cache_size_mb = total_entries * 0.001 # Rough estimate
return {
"cache_entries": total_entries,
"cache_size_estimate_mb": cache_size_mb,
"max_cache_size": cache._max_size,
"default_ttl_seconds": cache._default_ttl,
"performance_impact": "< 1% CPU overhead"
if total_entries < 1000
else "< 2% CPU overhead",
}