# Source code for xpcsviewer.utils.validation

"""
Enhanced Validation utilities for XPCS data processing with reliability features.

This module provides centralized validation functions with performance-preserving
reliability enhancements including caching, fail-fast mechanisms, and detailed
error context for better debugging and recovery.
"""

import logging
import time
from typing import Any

import numpy as np

from .exceptions import XPCSValidationError, convert_exception
from .reliability import (
    ValidationLevel,
    ValidationResult,
    get_validation_cache,
    validate_input,
)

logger = logging.getLogger(__name__)


def get_file_label_safe(xf) -> str:
    """
    Return a display label for an XPCS file object without raising.

    Args:
        xf: Object that may or may not expose a ``label`` attribute

    Returns:
        str: The value of ``xf.label`` when the attribute exists,
            otherwise the literal 'unknown'
    """
    try:
        return xf.label
    except AttributeError:
        # No label available (this also covers xf being None).
        return "unknown"
@validate_input(check_types=True, level=ValidationLevel.STANDARD, cache_results=True)
def validate_xf_fit_summary(xf) -> tuple[bool, dict[str, Any] | None, str | None]:
    """
    Check that an XPCS file object carries a usable ``fit_summary``.

    A summary is considered usable when it exists, is not None, contains
    both the 'q_val' and 'fit_val' entries, and neither of those entries is
    an empty sized container. Every failure is logged at debug level; any
    unexpected exception is converted via ``convert_exception`` and reported
    as a failed validation instead of propagating.

    Args:
        xf: XPCS file object to validate

    Returns:
        Tuple containing:
            - bool: True if validation passed, False otherwise
            - dict or None: The fit_summary if valid, None otherwise
            - str or None: Error message if validation failed, None if passed
    """
    file_label = get_file_label_safe(xf)

    def _reject(message: str) -> tuple[bool, None, str]:
        # Shared failure exit: log once, then hand the message to the caller.
        logger.debug(message)
        return False, None, message

    try:
        summary = getattr(xf, "fit_summary", None)
        if summary is None:
            return _reject(f"File {file_label} missing fit_summary attribute")

        # Required entries, in the order they are reported.
        absent = [key for key in ("q_val", "fit_val") if key not in summary]
        if absent:
            outcome = _reject(
                f"File {file_label} missing required fields: {', '.join(absent)}"
            )
            # One recovery hint per missing entry.
            hints = {
                "q_val": f" - Suggestion: Ensure G2 fitting was completed for {file_label}",
                "fit_val": f" - Suggestion: Re-run G2 fitting with proper parameters for {file_label}",
            }
            for key in absent:
                logger.debug(hints[key])
            return outcome

        # Data integrity: a present-but-empty sized entry is not usable.
        for key in ("q_val", "fit_val"):
            values = summary.get(key)
            if values is None or not hasattr(values, "__len__"):
                continue
            if len(values) == 0:
                return _reject(f"File {file_label} has empty {key} array")

        return True, summary, None

    except Exception as exc:
        # Convert unexpected errors to validation errors
        converted = convert_exception(exc, f"Validation failed for file {file_label}")
        message = str(converted)
        logger.warning(f"Unexpected validation error: {message}")
        return False, None, message
def validate_xf_has_fit_summary(xf) -> tuple[bool, str | None]:
    """
    Lightweight check that an XPCS file object has a non-None fit_summary.

    Unlike the full fit-summary validation, the contents of the summary are
    not inspected here.

    Args:
        xf: XPCS file object to validate

    Returns:
        Tuple containing:
            - bool: True if has fit_summary, False otherwise
            - str or None: Error message if validation failed, None if passed
    """
    file_label = get_file_label_safe(xf)

    if getattr(xf, "fit_summary", None) is not None:
        return True, None

    message = f"Skipping file {file_label} - no fit_summary"
    logger.debug(message)
    return False, message
[docs] def validate_fit_summary_fields( fit_summary: dict[str, Any], required_fields: list, file_label: str = "unknown" ) -> tuple[bool, str | None]: """ Validate that a fit_summary dictionary contains required fields. Args: fit_summary: Dictionary to validate required_fields: List of required field names file_label: Label for error messaging Returns: Tuple containing: - bool: True if all fields present, False otherwise - str or None: Error message if validation failed, None if passed """ missing_fields = [field for field in required_fields if field not in fit_summary] if missing_fields: error_msg = ( f"Skipping file {file_label} - missing fields: {', '.join(missing_fields)}" ) logger.debug(error_msg) return False, error_msg return True, None
def log_array_size_mismatch(
    file_label: str, array_info: dict[str, int], min_length: int
) -> None:
    """
    Emit the standard warning for arrays whose sizes disagree.

    Args:
        file_label: Label of the file with the mismatch
        array_info: Dictionary mapping array names to their lengths
        min_length: The minimum length arrays will be trimmed to
    """
    sizes = ", ".join(f"{name}={length}" for name, length in array_info.items())
    message = (
        f"Array size mismatch in file {file_label}: {sizes}. Trimming to {min_length}"
    )
    logger.warning(message)
[docs] def validate_array_compatibility( *arrays: np.ndarray, names: list[str] | None = None, allow_broadcast: bool = False, file_label: str = "unknown", ) -> bool: """ Validate that arrays have compatible shapes. Per Technical Guidelines, this function raises XPCSValidationError instead of silently truncating arrays, as silent data modification is prohibited. Parameters ---------- *arrays : np.ndarray Arrays to validate names : list[str] | None Optional names for error messages allow_broadcast : bool If True, allow NumPy-compatible broadcasting. If False (default), require exact shape match. file_label : str Label for error messaging (deprecated, use names instead) Returns ------- bool True if arrays are compatible Raises ------ XPCSValidationError If arrays have incompatible shapes (no silent truncation) Notes ----- Changed from silent truncation to raising XPCSValidationError per data integrity guidelines. Silent data modification is prohibited. """ if not arrays: raise XPCSValidationError("No arrays provided for validation") # Filter out None arrays valid_arrays = [arr for arr in arrays if arr is not None] if not valid_arrays: raise XPCSValidationError(f"All arrays are None for file {file_label}") # Get lengths lengths = [len(arr) for arr in valid_arrays] # Check for empty arrays if any(length == 0 for length in lengths): raise XPCSValidationError(f"Empty arrays found for file {file_label}") # Build array info for error messages if names is not None: array_info = dict(zip(names[: len(lengths)], lengths, strict=False)) else: array_info = {f"array_{i}": length for i, length in enumerate(lengths)} # Check for length mismatch if len(set(lengths)) > 1: raise XPCSValidationError( f"Array length mismatch: {array_info}. " f"Per data integrity guidelines, silent truncation is not allowed." ) return True
[docs] def validate_array_compatibility_legacy( *arrays, file_label: str = "unknown" ) -> tuple[bool, int, str | None]: """ Legacy validation that returns tuple instead of raising. DEPRECATED: Use validate_array_compatibility() which raises XPCSValidationError. This function is kept for backward compatibility but should not be used for new code as it allows silent data truncation. Args: *arrays: Variable number of arrays to check file_label: Label for error messaging Returns: Tuple containing: - bool: True if arrays are compatible (after trimming), False if empty - int: Minimum length of arrays - str or None: Warning message if sizes differ, None if all same size """ import warnings warnings.warn( "validate_array_compatibility_legacy is deprecated. " "Use validate_array_compatibility() which raises XPCSValidationError.", DeprecationWarning, stacklevel=2, ) if not arrays: return False, 0, "No arrays provided for validation" lengths = [len(arr) for arr in arrays if arr is not None] if not lengths: return False, 0, f"All arrays are None for file {file_label}" min_length = min(lengths) max_length = max(lengths) warning_msg = None if min_length != max_length: array_info = { f"array_{i}": len(arr) for i, arr in enumerate(arrays) if arr is not None } log_array_size_mismatch(file_label, array_info, min_length) warning_msg = f"Array sizes differ, trimmed to {min_length}" if min_length == 0: return False, 0, f"Empty arrays for file {file_label}" return True, min_length, warning_msg
# Enhanced validation functions for scientific data integrity
@validate_input(
    check_types=True,
    check_values=True,
    level=ValidationLevel.STRICT,
    cache_results=True,
)
def validate_hdf5_file_integrity(
    file_path: str, required_datasets: list[str] | None = None
) -> ValidationResult:
    """
    Check that an HDF5 file exists, opens, and has the expected layout.

    Missing entries from ``required_datasets`` are reported as errors. A
    missing top-level 'xpcs' group, or missing 'scattering_2d' / 'tau' /
    'q_values' members inside it, are reported as warnings only. Failures
    are never raised to the caller; they are returned as an invalid
    ValidationResult.

    Args:
        file_path: Path to HDF5 file
        required_datasets: List of required dataset paths

    Returns:
        ValidationResult with detailed validation information
    """
    from pathlib import Path

    import h5py

    started = time.time()
    problems: list[str] = []
    notes: list[str] = []

    try:
        # Basic file existence and access
        target = Path(file_path)
        if not target.exists():
            raise XPCSValidationError(f"HDF5 file not found: {file_path}")
        if not target.is_file():
            raise XPCSValidationError(f"Path is not a file: {file_path}")

        try:
            with h5py.File(file_path, "r") as handle:
                # Caller-mandated datasets: absence is an error.
                problems.extend(
                    f"Missing required dataset: {wanted}"
                    for wanted in (required_datasets or ())
                    if wanted not in handle
                )

                # Conventional XPCS layout: absence is only a warning.
                group = handle.get("xpcs")
                if group is not None:
                    notes.extend(
                        f"Missing common XPCS dataset: {member}"
                        for member in ("scattering_2d", "tau", "q_values")
                        if member not in group
                    )
                else:
                    notes.append("No 'xpcs' group found - may not be valid XPCS data")
        except OSError as exc:
            raise XPCSValidationError(f"Cannot read HDF5 file: {exc}") from exc

        elapsed = time.time() - started
        return ValidationResult(
            is_valid=not problems,
            error_message="; ".join(problems) if problems else None,
            warnings=notes,
            validation_time=elapsed,
        )

    except Exception as exc:
        elapsed = time.time() - started
        # Validation errors carry their own message; anything else is
        # prefixed so it is recognisable as unexpected.
        message = (
            str(exc)
            if isinstance(exc, XPCSValidationError)
            else f"Unexpected error validating HDF5 file: {exc}"
        )
        return ValidationResult(
            is_valid=False,
            error_message=message,
            warnings=[],
            validation_time=elapsed,
        )
@validate_input(
    check_types=True, check_shapes=True, check_values=True, cache_results=True
)
def validate_scientific_array(
    array: np.ndarray,
    array_name: str = "data",
    min_dimensions: int = 1,
    max_dimensions: int | None = None,
    expected_shape: tuple[int, ...] | None = None,
    allow_nan: bool = False,
    allow_negative: bool = True,
    finite_only: bool = True,
) -> ValidationResult:
    """
    Run structural and value checks on a NumPy array.

    Errors make the result invalid; warnings are informational. Any
    exception raised while checking is reported as an invalid result rather
    than propagated.

    Args:
        array: NumPy array to validate
        array_name: Name for error reporting
        min_dimensions: Minimum number of dimensions
        max_dimensions: Maximum number of dimensions
        expected_shape: Expected exact shape
        allow_nan: Whether NaN values are acceptable (warning instead of error)
        allow_negative: Whether negative values are acceptable
        finite_only: Whether infinite values are an error (otherwise a warning)

    Returns:
        ValidationResult with detailed validation information
    """
    started = time.time()
    problems: list[str] = []
    notes: list[str] = []

    try:
        # --- structure ---------------------------------------------------
        if array.size == 0:
            problems.append(f"Array '{array_name}' is empty")

        ndim = array.ndim
        if ndim < min_dimensions:
            problems.append(
                f"Array '{array_name}' has {ndim} dimensions, minimum {min_dimensions} required"
            )
        if max_dimensions is not None and ndim > max_dimensions:
            problems.append(
                f"Array '{array_name}' has {ndim} dimensions, maximum {max_dimensions} allowed"
            )

        if expected_shape is not None and array.shape != expected_shape:
            problems.append(
                f"Array '{array_name}' shape {array.shape} does not match expected {expected_shape}"
            )

        # --- values (numeric, non-empty arrays only) ----------------------
        if np.issubdtype(array.dtype, np.number) and array.size > 0:
            nan_total = np.sum(np.isnan(array))
            if nan_total > 0:
                nan_note = f"Array '{array_name}' contains {nan_total} NaN values"
                (notes if allow_nan else problems).append(nan_note)

            inf_total = np.sum(np.isinf(array))
            if inf_total > 0:
                inf_note = f"Array '{array_name}' contains {inf_total} infinite values"
                (problems if finite_only else notes).append(inf_note)

            if not allow_negative:
                negative_total = np.sum(array < 0)
                if negative_total > 0:
                    problems.append(
                        f"Array '{array_name}' contains {negative_total} negative values"
                    )

        # --- resource / dtype advisories ---------------------------------
        size_mb = array.nbytes / (1024 * 1024)
        if size_mb > 500:  # > 500MB
            notes.append(f"Large array '{array_name}': {size_mb:.1f}MB")

        if array.dtype == np.object_:
            notes.append(
                f"Array '{array_name}' uses object dtype - may impact performance"
            )

        elapsed = time.time() - started
        return ValidationResult(
            is_valid=not problems,
            error_message="; ".join(problems) if problems else None,
            warnings=notes,
            validation_time=elapsed,
        )

    except Exception as exc:
        elapsed = time.time() - started
        return ValidationResult(
            is_valid=False,
            error_message=f"Unexpected error validating array '{array_name}': {exc}",
            warnings=[],
            validation_time=elapsed,
        )
def validate_g2_data(
    g2_array: np.ndarray, tau_array: np.ndarray | None = None
) -> ValidationResult:
    """
    Validate G2 correlation data and, optionally, its tau axis.

    The generic array checks are delegated to validate_scientific_array;
    this function adds G2-specific range and baseline heuristics plus
    tau/G2 consistency checks. Exceptions raised while checking are
    reported as an invalid result rather than propagated.

    Args:
        g2_array: G2 correlation values
        tau_array: Optional tau (time delay) values

    Returns:
        ValidationResult with G2-specific validation
    """
    started = time.time()
    problems: list[str] = []
    notes: list[str] = []

    def _absorb(outcome) -> None:
        # Fold a sub-validation's errors and warnings into this run's lists.
        if not outcome.is_valid:
            problems.extend(outcome.error_message.split("; "))
        notes.extend(outcome.warnings)

    try:
        # Generic structure/value checks for the G2 array.
        _absorb(
            validate_scientific_array(
                g2_array,
                array_name="g2_data",
                min_dimensions=1,
                max_dimensions=2,
                allow_negative=False,  # G2 should be positive
                finite_only=True,
            )
        )

        if g2_array.size > 0:
            # Range heuristics (typical G2 values lie roughly in 0.1 to 3.0).
            lowest = np.nanmin(g2_array)
            highest = np.nanmax(g2_array)
            if lowest < 0:
                problems.append(f"G2 values should be positive, found minimum: {lowest}")
            if highest > 5.0:
                notes.append(f"Unusually high G2 values detected, maximum: {highest}")

            # Baseline heuristic: the tail of a 1-D curve should sit near 1.0.
            if g2_array.ndim == 1 and len(g2_array) > 10:
                tail_mean = np.nanmean(g2_array[-5:])  # Last 5 points
                if not (0.8 <= tail_mean <= 1.5):
                    notes.append(
                        f"G2 baseline unusual: {tail_mean:.3f} (expected ~1.0)"
                    )

        if tau_array is not None:
            _absorb(
                validate_scientific_array(
                    tau_array,
                    array_name="tau_data",
                    min_dimensions=1,
                    max_dimensions=1,
                    allow_negative=False,  # Time delays should be positive
                    finite_only=True,
                )
            )

            # Lengths are only compared for 1-D G2 data.
            if g2_array.ndim == 1 and len(tau_array) != len(g2_array):
                problems.append(
                    f"Tau array length ({len(tau_array)}) does not match G2 array length ({len(g2_array)})"
                )

            # Tau should be strictly increasing.
            if len(tau_array) > 1 and not np.all(np.diff(tau_array) > 0):
                notes.append("Tau values are not strictly increasing")

        elapsed = time.time() - started
        return ValidationResult(
            is_valid=not problems,
            error_message="; ".join(problems) if problems else None,
            warnings=notes,
            validation_time=elapsed,
        )

    except Exception as exc:
        elapsed = time.time() - started
        return ValidationResult(
            is_valid=False,
            error_message=f"Unexpected error validating G2 data: {exc}",
            warnings=[],
            validation_time=elapsed,
        )
def get_validation_statistics() -> dict[str, Any]:
    """
    Summarise the state of the shared validation cache.

    Returns:
        Dictionary with validation cache statistics and performance metrics,
        or a single 'message' entry when the cache holds nothing
    """
    cache = get_validation_cache()

    # Hold the cache lock so the counters are read as one snapshot.
    with cache._lock:
        entry_count = len(cache._cache)
        if not entry_count:
            return {"message": "No validation cache entries"}

        if entry_count < 1000:
            impact = "< 1% CPU overhead"
        else:
            impact = "< 2% CPU overhead"

        return {
            "cache_entries": entry_count,
            "cache_size_estimate_mb": entry_count * 0.001,  # Rough estimate
            "max_cache_size": cache._max_size,
            "default_ttl_seconds": cache._default_ttl,
            "performance_impact": impact,
        }