Source code for ESRF_ID10_SURF.XRR.XRR

"""
X-ray Reflectivity (XRR) processing library.

This library is used for the processing of Surface X-ray Scattering data
obtained on the ID10 beamline at ESRF.
"""

import copy
import logging
import os
import time
from math import pi
from typing import Optional, Tuple, Union, List, Dict, Any

import h5py
import orsopy.fileio as orso
from datetime import datetime
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as stats

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)


def rebin(q_vectors: np.ndarray,
          Refl: np.ndarray,
          Relf_E: np.ndarray,
          new_q: Optional[np.ndarray] = None,
          rebin_as: str = "linear",
          number_of_bins: int = 5000) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Rebin the data on a linear or logarithmic q-scale.

    This rebinning procedure is based on one from islatu by Andrew R. McCluskey:
    https://github.com/DiamondLightSource/islatu

    Args:
        q_vectors (np.ndarray): The current q vectors.
        Refl (np.ndarray): The current reflected intensities.
        Relf_E (np.ndarray): The current reflected intensity errors.
        new_q (Optional[np.ndarray]): Array of potential q-values. Defaults to None.
            If this argument is not specified, then the new q, R values are binned
            according to rebin_as and number_of_bins.
        rebin_as (str): String specifying how the data should be rebinned.
            Options are "linear" and "log". This is only used if new_q is unspecified.
            Defaults to "linear".
        number_of_bins (int, optional): The max number of q-vectors to be using
            initially in the rebinning of the data. Defaults to 5000.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: Containing:
            - cleaned_q: rebinned q-values.
            - cleaned_R: rebinned intensities.
            - cleaned_R_e: rebinned intensity errors.
    """
    # Unpack the arguments.
    q = q_vectors
    R = Refl
    R_e = Relf_E

    # Required so that logspace/linspace encapsulates the whole data.
    epsilon = 0.001

    if new_q is None:
        # Our new q vectors have not been specified, so we should generate some.
        if rebin_as == "log":
            new_q = np.logspace(
                np.log10(q[0]),
                np.log10(q[-1] + epsilon), number_of_bins)
        elif rebin_as == "linear":
            new_q = np.linspace(q.min(), q.max() + epsilon,
                                number_of_bins)

    binned_q = np.zeros_like(new_q)
    binned_R = np.zeros_like(new_q)
    binned_R_e = np.zeros_like(new_q)

    # Use vectorized operations instead of nested loops
    weights = 1.0 / (R_e ** 2)
    weighted_R = R * weights
    weighted_q = q * weights

    # np.digitize returns indices such that bins[i-1] <= x < bins[i]
    # new_q are the bin edges.
    # We want q in [new_q[i], new_q[i+1]) to fall into bin i.
    # digitize returns i+1 for this range.
    bin_indices = np.digitize(q, new_q) - 1

    # Filter out indices that are out of bounds
    valid_mask = (bin_indices >= 0) & (bin_indices < len(new_q) - 1)

    valid_indices = bin_indices[valid_mask]
    valid_weights = weights[valid_mask]
    valid_weighted_R = weighted_R[valid_mask]
    valid_weighted_q = weighted_q[valid_mask]

    # Use bincount to sum up weights and weighted values for each bin
    sum_weights = np.bincount(valid_indices, weights=valid_weights, minlength=len(new_q))
    sum_wR = np.bincount(valid_indices, weights=valid_weighted_R, minlength=len(new_q))
    sum_wq = np.bincount(valid_indices, weights=valid_weighted_q, minlength=len(new_q))

    # Avoid division by zero
    nonzero_mask = sum_weights > 0

    # Calculate weighted averages
    binned_R[nonzero_mask] = sum_wR[nonzero_mask] / sum_weights[nonzero_mask]
    binned_q[nonzero_mask] = sum_wq[nonzero_mask] / sum_weights[nonzero_mask]
    binned_R_e[nonzero_mask] = np.sqrt(1.0 / sum_weights[nonzero_mask])

    # Get rid of any empty, unused elements of the array.
    cleaned_q = np.delete(binned_q, np.argwhere(binned_R == 0))
    cleaned_R = np.delete(binned_R, np.argwhere(binned_R == 0))
    cleaned_R_e = np.delete(binned_R_e, np.argwhere(binned_R == 0))

    return cleaned_q, cleaned_R, cleaned_R_e


[docs] class XRR: """ Class for the processing of Surface X-ray Scattering data. This class handles loading h5 files, region-of-interest based integration of 2D detector data, corrections (footprint, transmission), and plotting. Attributes: file (str): Path to the HDF5 file containing the data. scans (np.ndarray): Array of scan numbers to process. alpha_i_name (str): Name of the incident angle motor/counter. detector_name (str): Name of the detector data entry. monitor_name (str): Name of the monitor counter. transmission_name (str): Name of the transmission counter. att_name (str): Name of the attenuator positioner. energy_name (str): Name of the energy positioner. cnttime_name (str): Name of the counting time counter. PX0 (int): Direct beam X pixel coordinate on detector. PY0 (int): Direct beam Y pixel coordinate on detector. dPX (int): Half-width of the ROI in X direction. dPY (int): Half-width of the ROI in Y direction. pixel_size_qxz (float): Pixel size in reciprocal space (x/z). pixel_size_qy (float): Pixel size in reciprocal space (y). I0 (float): Incident intensity normalization factor. footprint_correction_applied (bool): Flag if footprint correction was applied. corrected_doubles (bool): Flag if double points were corrected. replaced_transmission (bool): Flag if transmission was manually replaced. is_rebinned (bool): Flag if data has been rebinned. qz (Optional[np.ndarray]): Calculated qz vector. reflectivity (Optional[np.ndarray]): Calculated reflectivity. reflectivity_error (Optional[np.ndarray]): Calculated reflectivity error. Smap2D (List): List of 2D maps. Smap2D_e (List): List of 2D map errors. Qx_map (Optional[np.ndarray]): Qx map for 2D plotting. Qz_map (Optional[np.ndarray]): Qz map for 2D plotting. sample_name (str): Name of the sample. """ def __init__(self, file: str, scans: Union[List[int], np.ndarray], alpha_i_name: str = 'chi', detector_name: str = 'mpx_cdte_22_eh1', monitor_name: str = 'mon', transmission_name: str = 'autof_eh1_transm', att_name: str = 'autof_eh1_curratt', cnttime_name: str = 'sec', PX0: int = 404, PY0: int = 165, dPX: int = 5, dPY: int = 5, bckg_gap = 1, pixel_size_qxz: float = 0.055, pixel_size_qy: float = 0.055, energy_name: str = 'monoe', I0: float = 1e13, saving_dir = None): """ Initialize the XRR processing class. Args: file (str): Path to the HDF5 file. scans (Union[List[int], np.ndarray]): List of scan numbers. alpha_i_name (str, optional): Motor name for incident angle. Defaults to 'chi'. detector_name (str, optional): Detector dataset name. Defaults to 'mpx_cdte_22_eh1'. monitor_name (str, optional): Monitor counter name. Defaults to 'mon'. transmission_name (str, optional): Transmission counter name. Defaults to 'autof_eh1_transm'. att_name (str, optional): Attenuator name. Defaults to 'autof_eh1_curratt'. cnttime_name (str, optional): Count time name. Defaults to 'sec'. PX0 (int, optional): Direct beam X pixel. Defaults to 404. PY0 (int, optional): Direct beam Y pixel. Defaults to 165. dPX (int, optional): ROI half-width X. Defaults to 5. dPY (int, optional): ROI half-width Y. Defaults to 5. bckg_gap (int, optional): Gap between signal and backgroung ROIs in pixels. Defaults to 1. pixel_size_qxz (float, optional): Pixel size factor. Defaults to 0.055. pixel_size_qy (float, optional): Pixel size factor. Defaults to 0.055. energy_name (str, optional): Energy motor name. Defaults to 'monoe'. I0 (float, optional): Intensity normalization. Defaults to 1e13. """ self.file = file self.scans = np.array(scans) self.alpha_i_name = alpha_i_name self.detector_name = detector_name self.monitor_name = monitor_name self.transmission_name = transmission_name self.att_name = att_name self.energy_name = energy_name self.cnttime_name = cnttime_name self.footprint_correction_applied = False self.corrected_doubles = False self.replaced_transmission = False self.is_rebinned = False self.zgH_scan = False self.PX0 = PX0 self.PY0 = PY0 self.dPX = dPX self.dPY = dPY self.bckg_gap = bckg_gap self.I0 = I0 self.pixel_size_qy = pixel_size_qy self.pixel_size_qxz = pixel_size_qxz self.qz = None self.reflectivity = None self.reflectivity_error = None self.Qx_map = None self.Qz_map = None self.Smap2D = [] self.Smap2D_e = [] self.sample_name = "" self.Pi = 100 self.saving_dir = saving_dir # Initialize data attributes to avoid attribute error if accessed before loading self.data = None self.alpha_i = None self.monitor = None self.transmission = None self.attenuator = None self.cnttime = None self.energy = None self.bckg = None self.raw_counts = None self.__load_data__() self.__process_2D_data__() self._check_saving_dir() def __load_single_scan__(self, scan_n: str) -> Dict[str, Any]: """ Load data for a single scan. Args: scan_n (str): Scan number as a string. Returns: Dict[str, Any]: Dictionary containing loaded data components. """ #logger.info('Loading scan #%s', scan_n) with h5py.File(self.file, "r") as f: base_path = f"{scan_n}.1" meas_path = f"{base_path}/measurement/" data_dict = { 'data': np.array(f.get(f"{meas_path}{self.detector_name}")), 'alpha_i': np.array(f.get(f"{meas_path}{self.alpha_i_name}")), 'monitor': np.array(f.get(f"{meas_path}{self.monitor_name}")), 'transmission': np.array(f.get(f"{meas_path}{self.transmission_name}")), 'attenuator': np.array(f.get(f"{meas_path}{self.att_name}")), 'cnttime': np.array(f.get(f"{meas_path}{self.cnttime_name}")), 'energy': np.array(f.get(f"{base_path}/instrument/positioners/{self.energy_name}")), 'sample_name': str(f.get(f"{base_path}/sample/name/")[()])[2:-1:1], 'Pi': np.mean(f.get(f"{meas_path}{'fb_Pi'}")), } logger.info('Loaded scan #%s', scan_n) return data_dict def __load_data__(self, skip_points: int = 1): """ Load data from all scans and concatenate them. Args: skip_points (int, optional): Number of initial points to skip for subsequent scans. Defaults to 1. """ t0 = time.time() #logger.info("Start loading data.") first_scan_n = str(self.scans[0]) first_scan_data = self.__load_single_scan__(first_scan_n) self.data = first_scan_data['data'] self.alpha_i = first_scan_data['alpha_i'] self.monitor = first_scan_data['monitor'] self.transmission = first_scan_data['transmission'] self.attenuator = first_scan_data['attenuator'] self.cnttime = first_scan_data['cnttime'] self.energy = first_scan_data['energy'] self.sample_name = first_scan_data['sample_name'] self.Pi = first_scan_data['Pi'] if len(self.scans) > 1: for scan_num in self.scans[1:]: scan_n = str(scan_num) scan_data = self.__load_single_scan__(scan_n) # Append with skipping points self.data = np.append(self.data, scan_data['data'][skip_points:], axis=0) self.alpha_i = np.append(self.alpha_i, scan_data['alpha_i'][skip_points:]) self.monitor = np.append(self.monitor, scan_data['monitor'][skip_points:]) self.transmission = np.append(self.transmission, scan_data['transmission'][skip_points:]) self.attenuator = np.append(self.attenuator, scan_data['attenuator'][skip_points:]) self.cnttime = np.append(self.cnttime, scan_data['cnttime'][skip_points:]) self.energy = np.append(self.energy, scan_data['energy']) #logger.info("Loading completed. Reading time %3.3f sec", time.time() - t0) def __process_2D_data__(self): """ Process the 2D detector data to calculate reflectivity. Performs ROI integration for signal and background, calculates qz, and normalizes by monitor and transmission. """ t0 = time.time() #logger.info('Starting 2D data processing.') if self.detector_name == 'eiger4m': self.data = self.data.transpose(0,2,1) else: pass nic, nxc, nyc = np.shape(self.data) # Handle legacy data where energy might be an array try: if np.size(self.energy) > 1: self.energy = np.mean(self.energy) except Exception: pass # Pre-calculate slice indices for performance and readability roi_y_slice = slice(self.PY0 - self.dPY, self.PY0 + self.dPY) roi_x_slice = slice(self.PX0 - self.dPX, self.PX0 + self.dPX) bkg_low_y_slice = slice(self.PY0 + 2 * self.dPY + self.bckg_gap - self.dPY, self.PY0 + 2 * self.dPY + self.bckg_gap + self.dPY) bkg_high_y_slice = slice(self.PY0 - 2 * self.dPY - self.bckg_gap - self.dPY, self.PY0 - 2 * self.dPY - self.bckg_gap + self.dPY) # Vectorized Signal and Background calculation # Summing over Y (axis 1) and X (axis 2) for each image # self.data has shape (nic, nxc, nyc) where nxc is Y and nyc is X based on logic IqxyBL_all = np.sum(self.data[:, bkg_low_y_slice, roi_x_slice], axis=(1, 2)) IqxyBH_all = np.sum(self.data[:, bkg_high_y_slice, roi_x_slice], axis=(1, 2)) IqxyS_all = np.sum(self.data[:, roi_y_slice, roi_x_slice], axis=(1, 2)) Ib_cut = (IqxyBL_all + IqxyBH_all) / 2 Is_cut = IqxyS_all Is_cut_err = np.sqrt(Is_cut) Ib_cut_err = np.sqrt(Ib_cut) # Vectorized Error calculation with np.errstate(divide='ignore', invalid='ignore'): numerator = np.sqrt(Is_cut_err**2 + Ib_cut_err**2) denominator = Is_cut - Ib_cut I_err = numerator / denominator # Identify invalid entries (inf or nan) mask_invalid = ~np.isfinite(I_err) # Calculate fallback: Is_cut_err / Is_cut # Use np.divide to handle div by zero in fallback gracefully fallback = np.divide(Is_cut_err, Is_cut, where=(Is_cut > 0)) # Apply fallback I_err[mask_invalid] = fallback[mask_invalid] # Smap2D processing limit_map = min(nic, len(self.alpha_i)) if limit_map > 0: # Slicing data up to limit_map data_slice = self.data[:limit_map] # Sum along axis 1 (Y axis) to get profile along X Qzcut_all = np.sum(data_slice[:, roi_y_slice, :], axis=1) Qzcut_bckg1_all = np.sum(data_slice[:, bkg_low_y_slice, :], axis=1) Qzcut_bckg2_all = np.sum(data_slice[:, bkg_high_y_slice, :], axis=1) norm_factor = self.transmission[:limit_map] * self.monitor[:limit_map] / self.monitor[0] # Reshape for broadcasting: (N, 1) norm_factor = norm_factor[:, np.newaxis] self.Smap2D = (Qzcut_all - (Qzcut_bckg1_all + Qzcut_bckg2_all) / 2) / norm_factor self.Smap2D_e = ((np.sqrt(np.abs(Qzcut_all)) + np.sqrt((Qzcut_bckg1_all + Qzcut_bckg2_all) / 2)) / norm_factor) else: self.Smap2D = [] self.Smap2D_e = [] logger.info('Number of points in the scan %6d', len(self.alpha_i)) limit_len = len(self.alpha_i) I_Signal_cut = np.nan_to_num(Is_cut[:limit_len], nan=1e-11) I_Backgr_cut = np.nan_to_num(Ib_cut[:limit_len], nan=1e-11) I_error = np.nan_to_num(I_err[:limit_len], nan=1e-11) self.qz = 4 * pi * np.sin(np.deg2rad(self.alpha_i)) / (12.398 / self.energy) # Reflectivity calculation norm_factor = self.transmission * self.monitor / self.monitor[0] self.reflectivity = (I_Signal_cut - I_Backgr_cut) / norm_factor / self.I0 self.reflectivity_error = np.abs(I_error * self.reflectivity) self.bckg = I_Backgr_cut / norm_factor self.raw_counts = I_Signal_cut # Clip values self.reflectivity[self.reflectivity <= 1e-12] = 1e-12 self.reflectivity_error[self.reflectivity_error <= 1e-13] = 1e-13 self.Smap2D = np.array(self.Smap2D) self.Smap2D_e = np.array(self.Smap2D_e) #logger.info("Processing completed. Processing time %3.3f sec", time.time() - t0)
[docs] def footprint_correction(self, sample_size: float = 1.0, beam_size: float = 9.6, correct_dir_beam: bool = True): """ Apply footprint correction to the reflectivity data. Args: sample_size (float, optional): Sample size in cm. Defaults to 1. beam_size (float, optional): Beam size in microns. Defaults to 9.6. correct_dir_beam (bool, optional): Whether to correct direct beam region. Defaults to False. """ if not self.footprint_correction_applied: Si_critical_angle = 4 * np.pi * np.sin(np.deg2rad(8.103E-02)) / (12.398 / self.energy) samplesize_microns = sample_size * 10000 # converting cm to microns # Avoid division by zero by using a small epsilon or handling alpha_i == 0 alpha_rad = np.deg2rad(self.alpha_i) effective_alpha_rad = np.where(self.alpha_i != 0, alpha_rad, np.deg2rad(1e-3)) footprint = 0.5 * beam_size / np.sin(effective_alpha_rad) beam_fraction = (stats.norm.cdf(samplesize_microns / 2, 0, footprint) - stats.norm.cdf(-samplesize_microns / 2, 0, footprint)) Icor = self.reflectivity / beam_fraction if correct_dir_beam: i = 0 while i < len(self.qz) and self.qz[i] < Si_critical_angle: if Icor[i] > 1.15: Icor[i] = 1 i += 1 self.reflectivity = Icor self.footprint_correction_applied = True logger.info('Footprint correction completed with beam size = %s microns and sample size = %s cm', beam_size, sample_size) else: logger.warning('Footprint correction already applied! To apply again use reprocess() method.')
[docs] def reprocess(self): """ Reload and reprocess the data, resetting corrections. """ self.__load_data__() self.__process_2D_data__() self.footprint_correction_applied = False self.corrected_doubles = False self.replaced_transmission = False self.is_rebinned = False logger.info("Reloaded and reprocessed data.")
[docs] def apply_auto_corrections(self, sample_size:float, beam_size:float, z_scan:'XRR'): self.correct_doubles() self.assert_i0(z_scan) self.reprocess() self.correct_doubles() self.footprint_correction(sample_size, beam_size, correct_dir_beam=True) logger.info("Reflectivity is fully corrected.")
[docs] def produce_Qmap(self, SDD: float = 900): """ Calculate Q-space map. Args: SDD (float, optional): Sample-to-Detector Distance in mm. Defaults to 900. """ t0 = time.time() logger.info('Starting q-space mapping.') chi_r = np.deg2rad(self.alpha_i) nic, nxc, nyc = np.shape(self.data) pixels = np.array(range(nyc)) k0 = 2 * pi / (12.398 / self.energy) # Vectorized calculation chi_r_col = chi_r[:, np.newaxis] pixels_row = pixels[np.newaxis, :] term_px = (self.PX0 - pixels_row) * self.pixel_size_qxz / SDD angle_term = chi_r_col + term_px self.Qz_map = np.round(k0 * (np.sin(angle_term) + np.sin(chi_r_col)), 10) self.Qx_map = np.round(k0 * (np.cos(angle_term) - np.cos(chi_r_col)), 10) logger.info("2D map calculated. Processing time %3.3f sec", time.time() - t0)
[docs] def plot_Qmap(self, save: bool = False, fig: Optional[plt.Figure] = None, axes: Optional[plt.Axes] = None) -> Tuple[plt.Figure, plt.Axes]: """ Plot the Q-space map. Args: save (bool, optional): Whether to save the plot. Defaults to False. fig (Optional[plt.Figure], optional): Matplotlib figure object. Defaults to None. axes (Optional[plt.Axes], optional): Matplotlib axes object. Defaults to None. Returns: Tuple[plt.Figure, plt.Axes]: The figure and axes objects. """ if axes is None: fig, (ax0) = plt.subplots(nrows=1, ncols=1, figsize=(6, 6), layout='tight') else: ax0 = axes if fig is None: fig = ax0.get_figure() # Check if Qx_map and Qz_map are computed if self.Qx_map is None or self.Qz_map is None: self.produce_Qmap() ax0.pcolormesh(self.Qx_map, self.Qz_map, np.log10(self.Smap2D), cmap='viridis', vmin=2, vmax=10, shading='gouraud', snap=True) ax0.set_xlabel(r'$q_x, \AA^{-1}$') ax0.set_ylabel(r'$q_z, \AA^{-1}$') ax0.set_ylim(0, 0.5) ax0.set_xlim(-4e-4, 2e-4) ax0.ticklabel_format(axis='x', style='sci', scilimits=(0, 0)) if save: logger.info('Saving Q-space map.') plt.savefig('Qmap_{}_scan_{}.png'.format(self.sample_name, self.scans), dpi=300) return fig, ax0
[docs] def save_Qmap(self, log_scale=False): """ Save the Q-space map to a text file. Format is 3 columns: Q_x, Q_z, Intensity Args: log_scale (bool, optional): Whether to save the logarithm of intensity to the file. Defaults to False. """ if self.Qx_map is None or self.Qz_map is None: self.produce_Qmap() self._ensure_sample_dir() map_qx = np.ravel(self.Qx_map) map_qz = np.ravel(self.Qz_map) map_int = np.ravel(self.Smap2D) if log_scale: map_int = np.log10(map_int) out = np.array([map_qx, map_qz, map_int]).T if self.Pi<80: filename = self.saving_dir + '/{}_2DMap_scan_{}_Pi_{:.0f}.dat'.format( self.sample_name, self.scans, self.Pi) else: filename = self.saving_dir + '/{}_XRR_scan_{}.dat'.format( self.sample_name, self.scans) np.savetxt(filename, out) logger.info('Q-space map in text saved to: %s', filename)
[docs] def get_reflectivity(self) -> np.ndarray: """ Get the reflectivity data sorted by qz. Returns: np.ndarray: Array containing [qz, reflectivity, reflectivity_error]. """ # Ensure indices sort by qz sort_indices = self.qz.argsort() qz_tr = np.sort(self.qz) # or self.qz[sort_indices] R_tr = self.reflectivity[sort_indices] Rerr_tr = self.reflectivity_error[sort_indices] return np.array([qz_tr, R_tr, Rerr_tr])
[docs] def plot_reflectivity(self, save: bool = False, ax: Optional[plt.Axes] = None, **kwargs) -> Tuple[plt.Figure, plt.Axes]: """ Plot the reflectivity curve. Args: save (bool, optional): Whether to save the plot. Defaults to False. ax (Optional[plt.Axes], optional): Matplotlib axes object. Defaults to None. Returns: Tuple[plt.Figure, plt.Axes]: The figure and axes objects. """ if ax is None: fig = plt.figure(figsize=(6, 6), layout='tight') ax = plt.gca() else: fig = ax.get_figure() ax.errorbar(*self.get_reflectivity(), **kwargs) ax.semilogy() ax.set_xlim(left=0) ax.set_ylim(top=2) ax.set_yticks([1e-10, 1e-8, 1e-6, 1e-4, 1e-2, 1]) ax.set_ylim(5e-11, 2e0) ax.set_xlabel(r'$q_z, \AA^{-1}$') ax.set_ylabel(r'$\mathrm{Reflectivity}$') if save: self._save_figure(plt.gcf(),'log') return fig, ax
[docs] def plot_reflectivity_qz4(self, save: bool = False, ax: Optional[plt.Axes] = None, **kwargs) -> Tuple[plt.Figure, plt.Axes]: """ Plot reflectivity multiplied by qz^4 (Porod plot). Args: save (bool, optional): Whether to save the plot. Defaults to False. ax (Optional[plt.Axes], optional): Matplotlib axes object. Defaults to None. Returns: Tuple[plt.Figure, plt.Axes]: The figure and axes objects. """ if ax is None: fig = plt.figure(figsize=(6, 6), layout='tight') ax = plt.gca() else: fig = ax.get_figure() ax.errorbar(self.qz, self.reflectivity * self.qz ** 4, self.reflectivity_error * self.qz ** 4, **kwargs) ax.semilogy() ax.set_xlim(left=0) ax.set_ylim(bottom=1e-11) ax.set_xlabel(r'$q_z, \AA^{-1}$') ax.set_ylabel(r'$\mathrm{Reflectivity}\cdot q_z^4$') if save: self._save_figure(plt.gcf(),'qz4') return fig, ax
[docs] def save_reflectivity(self, format: str = 'dat', owner: str = 'ESRF', creator: str = 'opid10', affiliation: str='Harvard', zgh_scans: Optional[List[int]] = None): """ Save the reflectivity data to a text file. Args: format (str, optional): Format of the saved file. Options are 'dat' and 'orso'. Defaults to 'dat'. owner (str, optional): Owner of the data. Defaults to 'ESRF'. creator (str, optional): Creator of the reduced file. Defaults to 'opid10'. zgh_scans (Optional[List[int]], optional): List of zgH scan numbers. Defaults to None. """ self._ensure_sample_dir() if format == 'orso': self._save_orso(owner, creator, affiliation) return out = self.get_reflectivity().T if self.Pi<80: filename = self.saving_dir + '/{}_XRR_scan_{}_Pi_{:.0f}.dat'.format( self.sample_name, self.scans, self.Pi) else: filename = self.saving_dir + '/{}_XRR_scan_{}.dat'.format( self.sample_name, self.scans) np.savetxt(filename, out) logger.info('Reflectivity saved to: %s', filename)
def _save_orso(self, owner: str, creator: str, affiliation: str): """ Save the reflectivity data in ORSO format. owner --- PI creator --- person, who processed the data affiliation --- shared between the two """ # 1. Define Columns columns = [ orso.Column(name='Qz', unit='1/angstrom', physical_quantity='momentum transfer'), orso.Column(name='R', unit=None, physical_quantity='reflectivity'), orso.ErrorColumn(error_of='R', error_type='uncertainty', value_is='sigma'), orso.ErrorColumn(error_of='Qz', error_type='resolution', value_is='sigma') ] # 2. DataSource # Try to get date, else default try: # Try to read start_time from the first scan in the file with h5py.File(self.file, "r") as f: # Assuming standard ESRF structure where start_time might be in the scan group # Need to find where it is located. Often in 'scan_number.1/start_time' scan_n = str(self.scans[0]) base_path = f"{scan_n}.1" start_time_str = f[base_path].attrs.get('start_time') or f[f"{base_path}/start_time"][()].decode('utf-8') # Parse date string, e.g., '2023-10-27T10:00:00' or similar # ESRF format can vary, often it is isoformat-like try: start_date = datetime.fromisoformat(str(start_time_str)) except ValueError: # Fallback for other formats if needed, or just use now start_date = datetime.now() except Exception: start_date = datetime.now() owner_person = orso.Person(name=owner, affiliation=affiliation) experiment = orso.Experiment( title='XRR', instrument='ID10-SURF', start_date=start_date, probe='x-ray', facility='ESRF', ) sample = orso.Sample(name=self.sample_name) # Measurement data_files = [self.file] comment = f"Scans: {self.scans}, Pi: {self.Pi}" if self.zgH_scan: comment += f", zgH Scans: {self.zgH_scan}" measurement = orso.Measurement( instrument_settings=orso.InstrumentSettings( incident_angle=orso.ValueRange(min=np.min(self.alpha_i), max=np.max(self.alpha_i), unit='deg'), wavelength=orso.Value(magnitude=12.398/self.energy, unit='angstrom'), ), data_files=data_files, comment=comment ) data_source = orso.DataSource( owner=owner_person, experiment=experiment, sample=sample, measurement=measurement ) # 3. Reduction reduction = orso.Reduction( software=orso.Software(name='ESRF_ID10_SURF', version='0.1'), creator=orso.Person(name=creator, affiliation=affiliation), timestamp=datetime.now() ) # 4. Data qz, R, dR = self.get_reflectivity() # Create resolution array dqz = np.full_like(qz, 1e-5) data = np.column_stack((qz, R, dR, dqz)) # 5. Create Dataset and Save orso_info = orso.Orso( data_source=data_source, reduction=reduction, columns=columns, data_set=self.sample_name ) dataset = orso.OrsoDataset(info=orso_info, data=data) # Filename logic if self.Pi < 80: filename = os.path.join(self.saving_dir, '{}_XRR_scan_{}_Pi_{:.0f}.ort'.format( self.sample_name, self.scans, self.Pi)) else: filename = os.path.join(self.saving_dir, '{}_XRR_scan_{}.ort'.format( self.sample_name, self.scans)) orso.save_orso([dataset], filename) logger.info('Reflectivity saved to: %s', filename)
[docs] def show_detector_image(self, frame_number: int = 50, ax: Optional[plt.Axes] = None, plot_cross: bool = True): """ Show a single frame of the detector image with ROI and background regions. Args: frame_number (int, optional): Frame number to display. Defaults to 50. ax (Optional[plt.Axes], optional): Matplotlib axes object. Defaults to None. plot_cross (bool, optional): Whether to plot crosshairs at beam center. Defaults to True. """ if ax is None: fig = plt.figure(figsize=(6, 6), layout='tight') ax = plt.gca() ax.imshow(np.log10(self.data[frame_number] + 1e-3)) ax.set_ylim(self.PY0 - 10 * self.dPY, self.PY0 + 10 * self.dPY) ax.set_xlim(self.PX0 - 10 * self.dPY, self.PX0 + 10 * self.dPY) signal = patches.Rectangle((self.PX0 - self.dPX, self.PY0 - self.dPY), 2 * self.dPX, 2 * self.dPY, linewidth=1, edgecolor='r', facecolor='none', label='Signal') b1 = patches.Rectangle((self.PX0 - self.dPX, self.PY0 + 2 * self.dPY + self.bckg_gap - self.dPY), 2 * self.dPX, 2 * self.dPY, linewidth=1, edgecolor='cyan', facecolor='none', label='Background') b2 = patches.Rectangle((self.PX0 - self.dPX, self.PY0 - 2 * self.dPY - self.bckg_gap - self.dPY), 2 * self.dPX, 2 * self.dPY, linewidth=1, edgecolor='cyan', facecolor='none') ax.add_patch(signal) ax.add_patch(b1) ax.add_patch(b2) if plot_cross: ax.hlines(self.PY0, self.PX0 - 30, self.PX0 + 30) ax.vlines(self.PX0, self.PY0 - 30, self.PY0 + 30) ax.set_xlabel('Detector pixel, X') ax.set_ylabel('Detector pixel, Y') ax.set_title('Detector {}, frame #{}'.format(self.detector_name, frame_number)) ax.legend()
[docs] def replace_transmission(self, filter_transmission: Dict[Any, float]): """ Replace transmission values using a dictionary of filter transmissions. Args: filter_transmission (Dict[Any, float]): Mapping of attenuator position to transmission. """ _new_transmission = np.array([]) for point in self.attenuator: _new_transmission = np.append(_new_transmission, [float(filter_transmission.get(point, 1.0))]) # Calculate scaling factor # Avoid division by zero with np.errstate(divide='ignore', invalid='ignore'): scaling_factor = self.transmission / _new_transmission new_reflectivity = self.reflectivity * scaling_factor new_reflectivity_error = self.reflectivity_error * scaling_factor new_background = self.bckg * scaling_factor self.transmission = _new_transmission self.reflectivity = new_reflectivity self.reflectivity_error = new_reflectivity_error self.bckg = new_background self.replaced_transmission = True
#logger.info('Transmission corrected. Recalculation will reset it to defaults.') @staticmethod def _find_double_(x: np.ndarray) -> Dict[float, np.ndarray]: """ Find indices of repeated values in an array. Args: x (np.ndarray): Input array. Returns: Dict[float, np.ndarray]: Dictionary mapping values to their indices. """ u, c = np.unique(x, return_counts=True) values_of_interest = u[c > 1] indexes_multiple_values = {value: np.where(x == value)[0] for value in values_of_interest} return indexes_multiple_values
[docs] def calculate_corrected_transmission(self) -> Dict[Any, float]: """ Calculate corrected transmission based on overlapping points (doubles). Returns: Dict[Any, float]: Dictionary of corrected transmissions. """ _new_transmission_dict = dict(zip(self.attenuator, self.transmission)) if self.replaced_transmission: logger.warning('Transmission was changed. Consider reprocessing data.') else: double_x = XRR._find_double_(self.qz) sorted_double_x = dict(sorted(double_x.items(), reverse=True)) coeff = [] # This logic assumes the array is sorted by angle/qz in general but contains overlaps # It calculates ratio between the last point of the first segment and first point of second segment # for a given Q value. # Note: i is the key (qz value), sorted_double_x[i] are indices for i in sorted_double_x: indices = sorted_double_x[i] # Assuming indices are ordered like [idx1, idx2], where idx1 is from higher intensity (lower attenuation) usually? # The original code used [-2] and [-1]. if len(indices) >= 2: coeff.append(self.reflectivity[indices[-2]] / self.reflectivity[indices[-1]]) else: coeff.append(1.0) # Fallback coeff_dict = {i: {'coeff': k, 'indexes': sorted_double_x[i]} for i, k in zip(sorted_double_x, coeff)} _new_transm = copy.copy(self.transmission) for i in coeff_dict: # Apply correction to all points before the overlap # This assumes scan direction and that overlaps are sequential adjustments idx_limit = coeff_dict[i]['indexes'][1] _new_transm[0:idx_limit] = _new_transm[0:idx_limit] * coeff_dict[i]['coeff'] _new_transmission_dict = dict(zip(self.attenuator, _new_transm)) return _new_transmission_dict
[docs] def correct_doubles(self): """ Correct transmission using overlapping points. """ if not self.corrected_doubles: logger.info('Correcting transmission using double points.') new_transm = self.calculate_corrected_transmission() self.replace_transmission(new_transm) self.corrected_doubles = True else: logger.warning('Double points already corrected.')
[docs] def do_rebin(self, *args, **kwargs): """ Rebin the reflectivity data. Args: *args: Arguments passed to rebin function. **kwargs: Keyword arguments passed to rebin function. """ new_qz, new_R, new_R_err = rebin(*self.get_reflectivity(), **kwargs) self.qz = new_qz self.reflectivity = new_R self.reflectivity_error = new_R_err self.is_rebinned = True
[docs] @staticmethod def find_i0_from_z_scan(z: np.ndarray, I: np.ndarray) -> Tuple[float, int]: """ Find I0 (direct beam intensity) from a Z-scan. Args: z (np.ndarray): Z positions. I (np.ndarray): Intensity. Returns: Tuple[float, int]: I0 value and index of maximum intensity. """ n_max = np.argmax(I) I_cutoff = np.mean(I[:n_max]) - np.sqrt(np.mean(I[:n_max])) zscan_new = I[np.where(I >= I_cutoff)] if len(zscan_new) > 0: I0 = np.median(zscan_new) else: I0 = I[n_max] return I0, n_max
[docs] def find_i0(self, to_print: bool = True) -> Tuple[float, Any, float]: """ Find I0, attenuator and transmission at the direct beam. Args: to_print (bool, optional): Whether to print details. Defaults to True. Returns: Tuple[float, Any, float]: I0, attenuator value, transmission value. """ if to_print: logger.info('Processing scan of motor %s.', self.alpha_i_name) I0, n_max = XRR.find_i0_from_z_scan(self.alpha_i, self.raw_counts) if to_print: logger.info('I0 = %.5e, attenuator = %s, transmission = %.5e', I0, self.attenuator[n_max], self.transmission[n_max]) return I0, self.attenuator[n_max], self.transmission[n_max]
def _assert_i0_from_calc(self, calc_I0: float, atten: Any, transmission: float): """ Internal method to update I0 from calculated values. """ if self.alpha_i_name.lower() == 'zgh': logger.warning('You are trying to replace I0 in zgH scan. Load reflectivity data.') else: if atten in self.attenuator: if self.corrected_doubles: # Find indices where attenuator matches indices = np.where(self.attenuator == atten) # Use the transmission of the first matching point for adjustment # This logic seems specific to the beamline workflow current_transmission = self.transmission[indices][0] I0 = calc_I0 * transmission / current_transmission self.I0 = I0 logger.info('I0 replaced.') else: logger.warning('Attenuator %s not found in the scan of motor %s.\nCheck inputs.', atten, self.alpha_i)
[docs] def assert_i0(self, zgH_scan: 'XRR'): """ Assert I0 from a zgH scan (direct beam scan). Args: zgH_scan (XRR): Another XRR object representing the direct beam scan. """ if self.alpha_i_name.lower() == 'zgh': logger.warning('You are trying to replace I0 in zgH scan. Load reflectivity data.') else: calc_I0, atten, transmission = zgH_scan.find_i0(to_print=False) calc_I0 = calc_I0 * zgH_scan.monitor[0] / self.monitor[0] if atten in self.attenuator: if self.corrected_doubles: indices = np.where(self.attenuator == atten) if len(indices[0]) > 0: current_trans = self.transmission[indices[0][0]] I0 = calc_I0 / current_trans logger.info('Flux set to {:.4e}'.format(I0)) self.I0 = I0 self.zgH_scan = zgH_scan.scans logger.info('I0 replaced.') else: logger.warning('Attenuator %s not found in the scan of motor %s.\nCheck inputs.', atten, self.alpha_i)
def _check_saving_dir(self): """ Check if the saving directory is set; if not, set a default based on the current working directory and sample name. """ if self.saving_dir: pass else: self.saving_dir = os.getcwd() + f"/{self.sample_name}" def _ensure_sample_dir(self): """ Ensure the saving directory exists, creating it if necessary. """ try: os.makedirs(self.saving_dir, exist_ok=True) except OSError as e: print('Saving directory is impossible: ', e) def _save_figure(self, fig, suffix): """ Helper method to save a matplotlib figure. Args: fig (matplotlib.figure.Figure): The figure object to save. suffix (str): Suffix to append to the filename. """ self._ensure_sample_dir() if self.Pi<80: filename = self.saving_dir + '/{}_XRR_scan_{}_Pi_{:.0f}_{}.png'.format( self.sample_name, self.scans, self.Pi, suffix) else: filename = self.saving_dir + '/{}_XRR_scan_{}_{}.png'.format( self.sample_name, self.scans, suffix) fig.savefig(filename, dpi=200) logger.info('Plot saved to {}.'.format(filename))