"""
X-ray Reflectivity (XRR) processing library.
This library is used for the processing of Surface X-ray Scattering data
obtained on the ID10 beamline at ESRF.
"""
import copy
import logging
import os
import time
from math import pi
from typing import Optional, Tuple, Union, List, Dict, Any
import h5py
import orsopy.fileio as orso
from datetime import datetime
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as stats
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)
def rebin(q_vectors: np.ndarray,
Refl: np.ndarray,
Relf_E: np.ndarray,
new_q: Optional[np.ndarray] = None,
rebin_as: str = "linear",
number_of_bins: int = 5000) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Rebin the data on a linear or logarithmic q-scale.
This rebinning procedure is based on one from islatu by Andrew R. McCluskey:
https://github.com/DiamondLightSource/islatu
Args:
q_vectors (np.ndarray): The current q vectors.
Refl (np.ndarray): The current reflected intensities.
Relf_E (np.ndarray): The current reflected intensity errors.
new_q (Optional[np.ndarray]): Array of potential q-values. Defaults to None.
If this argument is not specified, then the new q, R values are binned
according to rebin_as and number_of_bins.
rebin_as (str): String specifying how the data should be rebinned.
Options are "linear" and "log". This is only used if new_q is unspecified.
Defaults to "linear".
number_of_bins (int, optional): The max number of q-vectors to be using
initially in the rebinning of the data. Defaults to 5000.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: Containing:
- cleaned_q: rebinned q-values.
- cleaned_R: rebinned intensities.
- cleaned_R_e: rebinned intensity errors.
"""
# Unpack the arguments.
q = q_vectors
R = Refl
R_e = Relf_E
# Required so that logspace/linspace encapsulates the whole data.
epsilon = 0.001
if new_q is None:
# Our new q vectors have not been specified, so we should generate some.
if rebin_as == "log":
new_q = np.logspace(
np.log10(q[0]),
np.log10(q[-1] + epsilon), number_of_bins)
elif rebin_as == "linear":
new_q = np.linspace(q.min(), q.max() + epsilon,
number_of_bins)
binned_q = np.zeros_like(new_q)
binned_R = np.zeros_like(new_q)
binned_R_e = np.zeros_like(new_q)
# Use vectorized operations instead of nested loops
weights = 1.0 / (R_e ** 2)
weighted_R = R * weights
weighted_q = q * weights
# np.digitize returns indices such that bins[i-1] <= x < bins[i]
# new_q are the bin edges.
# We want q in [new_q[i], new_q[i+1]) to fall into bin i.
# digitize returns i+1 for this range.
bin_indices = np.digitize(q, new_q) - 1
# Filter out indices that are out of bounds
valid_mask = (bin_indices >= 0) & (bin_indices < len(new_q) - 1)
valid_indices = bin_indices[valid_mask]
valid_weights = weights[valid_mask]
valid_weighted_R = weighted_R[valid_mask]
valid_weighted_q = weighted_q[valid_mask]
# Use bincount to sum up weights and weighted values for each bin
sum_weights = np.bincount(valid_indices, weights=valid_weights, minlength=len(new_q))
sum_wR = np.bincount(valid_indices, weights=valid_weighted_R, minlength=len(new_q))
sum_wq = np.bincount(valid_indices, weights=valid_weighted_q, minlength=len(new_q))
# Avoid division by zero
nonzero_mask = sum_weights > 0
# Calculate weighted averages
binned_R[nonzero_mask] = sum_wR[nonzero_mask] / sum_weights[nonzero_mask]
binned_q[nonzero_mask] = sum_wq[nonzero_mask] / sum_weights[nonzero_mask]
binned_R_e[nonzero_mask] = np.sqrt(1.0 / sum_weights[nonzero_mask])
# Get rid of any empty, unused elements of the array.
cleaned_q = np.delete(binned_q, np.argwhere(binned_R == 0))
cleaned_R = np.delete(binned_R, np.argwhere(binned_R == 0))
cleaned_R_e = np.delete(binned_R_e, np.argwhere(binned_R == 0))
return cleaned_q, cleaned_R, cleaned_R_e
[docs]
class XRR:
"""
Class for the processing of Surface X-ray Scattering data.
This class handles loading h5 files, region-of-interest based integration
of 2D detector data, corrections (footprint, transmission), and plotting.
Attributes:
file (str): Path to the HDF5 file containing the data.
scans (np.ndarray): Array of scan numbers to process.
alpha_i_name (str): Name of the incident angle motor/counter.
detector_name (str): Name of the detector data entry.
monitor_name (str): Name of the monitor counter.
transmission_name (str): Name of the transmission counter.
att_name (str): Name of the attenuator positioner.
energy_name (str): Name of the energy positioner.
cnttime_name (str): Name of the counting time counter.
PX0 (int): Direct beam X pixel coordinate on detector.
PY0 (int): Direct beam Y pixel coordinate on detector.
dPX (int): Half-width of the ROI in X direction.
dPY (int): Half-width of the ROI in Y direction.
pixel_size_qxz (float): Pixel size in reciprocal space (x/z).
pixel_size_qy (float): Pixel size in reciprocal space (y).
I0 (float): Incident intensity normalization factor.
footprint_correction_applied (bool): Flag if footprint correction was applied.
corrected_doubles (bool): Flag if double points were corrected.
replaced_transmission (bool): Flag if transmission was manually replaced.
is_rebinned (bool): Flag if data has been rebinned.
qz (Optional[np.ndarray]): Calculated qz vector.
reflectivity (Optional[np.ndarray]): Calculated reflectivity.
reflectivity_error (Optional[np.ndarray]): Calculated reflectivity error.
Smap2D (List): List of 2D maps.
Smap2D_e (List): List of 2D map errors.
Qx_map (Optional[np.ndarray]): Qx map for 2D plotting.
Qz_map (Optional[np.ndarray]): Qz map for 2D plotting.
sample_name (str): Name of the sample.
"""
def __init__(self,
file: str,
scans: Union[List[int], np.ndarray],
alpha_i_name: str = 'chi',
detector_name: str = 'mpx_cdte_22_eh1',
monitor_name: str = 'mon',
transmission_name: str = 'autof_eh1_transm',
att_name: str = 'autof_eh1_curratt',
cnttime_name: str = 'sec',
PX0: int = 404,
PY0: int = 165,
dPX: int = 5,
dPY: int = 5,
bckg_gap = 1,
pixel_size_qxz: float = 0.055,
pixel_size_qy: float = 0.055,
energy_name: str = 'monoe',
I0: float = 1e13,
saving_dir = None):
"""
Initialize the XRR processing class.
Args:
file (str): Path to the HDF5 file.
scans (Union[List[int], np.ndarray]): List of scan numbers.
alpha_i_name (str, optional): Motor name for incident angle. Defaults to 'chi'.
detector_name (str, optional): Detector dataset name. Defaults to 'mpx_cdte_22_eh1'.
monitor_name (str, optional): Monitor counter name. Defaults to 'mon'.
transmission_name (str, optional): Transmission counter name. Defaults to 'autof_eh1_transm'.
att_name (str, optional): Attenuator name. Defaults to 'autof_eh1_curratt'.
cnttime_name (str, optional): Count time name. Defaults to 'sec'.
PX0 (int, optional): Direct beam X pixel. Defaults to 404.
PY0 (int, optional): Direct beam Y pixel. Defaults to 165.
dPX (int, optional): ROI half-width X. Defaults to 5.
dPY (int, optional): ROI half-width Y. Defaults to 5.
bckg_gap (int, optional): Gap between signal and backgroung ROIs in pixels. Defaults to 1.
pixel_size_qxz (float, optional): Pixel size factor. Defaults to 0.055.
pixel_size_qy (float, optional): Pixel size factor. Defaults to 0.055.
energy_name (str, optional): Energy motor name. Defaults to 'monoe'.
I0 (float, optional): Intensity normalization. Defaults to 1e13.
"""
self.file = file
self.scans = np.array(scans)
self.alpha_i_name = alpha_i_name
self.detector_name = detector_name
self.monitor_name = monitor_name
self.transmission_name = transmission_name
self.att_name = att_name
self.energy_name = energy_name
self.cnttime_name = cnttime_name
self.footprint_correction_applied = False
self.corrected_doubles = False
self.replaced_transmission = False
self.is_rebinned = False
self.zgH_scan = False
self.PX0 = PX0
self.PY0 = PY0
self.dPX = dPX
self.dPY = dPY
self.bckg_gap = bckg_gap
self.I0 = I0
self.pixel_size_qy = pixel_size_qy
self.pixel_size_qxz = pixel_size_qxz
self.qz = None
self.reflectivity = None
self.reflectivity_error = None
self.Qx_map = None
self.Qz_map = None
self.Smap2D = []
self.Smap2D_e = []
self.sample_name = ""
self.Pi = 100
self.saving_dir = saving_dir
# Initialize data attributes to avoid attribute error if accessed before loading
self.data = None
self.alpha_i = None
self.monitor = None
self.transmission = None
self.attenuator = None
self.cnttime = None
self.energy = None
self.bckg = None
self.raw_counts = None
self.__load_data__()
self.__process_2D_data__()
self._check_saving_dir()
def __load_single_scan__(self, scan_n: str) -> Dict[str, Any]:
"""
Load data for a single scan.
Args:
scan_n (str): Scan number as a string.
Returns:
Dict[str, Any]: Dictionary containing loaded data components.
"""
#logger.info('Loading scan #%s', scan_n)
with h5py.File(self.file, "r") as f:
base_path = f"{scan_n}.1"
meas_path = f"{base_path}/measurement/"
data_dict = {
'data': np.array(f.get(f"{meas_path}{self.detector_name}")),
'alpha_i': np.array(f.get(f"{meas_path}{self.alpha_i_name}")),
'monitor': np.array(f.get(f"{meas_path}{self.monitor_name}")),
'transmission': np.array(f.get(f"{meas_path}{self.transmission_name}")),
'attenuator': np.array(f.get(f"{meas_path}{self.att_name}")),
'cnttime': np.array(f.get(f"{meas_path}{self.cnttime_name}")),
'energy': np.array(f.get(f"{base_path}/instrument/positioners/{self.energy_name}")),
'sample_name': str(f.get(f"{base_path}/sample/name/")[()])[2:-1:1],
'Pi': np.mean(f.get(f"{meas_path}{'fb_Pi'}")),
}
logger.info('Loaded scan #%s', scan_n)
return data_dict
def __load_data__(self, skip_points: int = 1):
"""
Load data from all scans and concatenate them.
Args:
skip_points (int, optional): Number of initial points to skip for subsequent scans.
Defaults to 1.
"""
t0 = time.time()
#logger.info("Start loading data.")
first_scan_n = str(self.scans[0])
first_scan_data = self.__load_single_scan__(first_scan_n)
self.data = first_scan_data['data']
self.alpha_i = first_scan_data['alpha_i']
self.monitor = first_scan_data['monitor']
self.transmission = first_scan_data['transmission']
self.attenuator = first_scan_data['attenuator']
self.cnttime = first_scan_data['cnttime']
self.energy = first_scan_data['energy']
self.sample_name = first_scan_data['sample_name']
self.Pi = first_scan_data['Pi']
if len(self.scans) > 1:
for scan_num in self.scans[1:]:
scan_n = str(scan_num)
scan_data = self.__load_single_scan__(scan_n)
# Append with skipping points
self.data = np.append(self.data, scan_data['data'][skip_points:], axis=0)
self.alpha_i = np.append(self.alpha_i, scan_data['alpha_i'][skip_points:])
self.monitor = np.append(self.monitor, scan_data['monitor'][skip_points:])
self.transmission = np.append(self.transmission, scan_data['transmission'][skip_points:])
self.attenuator = np.append(self.attenuator, scan_data['attenuator'][skip_points:])
self.cnttime = np.append(self.cnttime, scan_data['cnttime'][skip_points:])
self.energy = np.append(self.energy, scan_data['energy'])
#logger.info("Loading completed. Reading time %3.3f sec", time.time() - t0)
def __process_2D_data__(self):
"""
Process the 2D detector data to calculate reflectivity.
Performs ROI integration for signal and background, calculates qz,
and normalizes by monitor and transmission.
"""
t0 = time.time()
#logger.info('Starting 2D data processing.')
if self.detector_name == 'eiger4m':
self.data = self.data.transpose(0,2,1)
else:
pass
nic, nxc, nyc = np.shape(self.data)
# Handle legacy data where energy might be an array
try:
if np.size(self.energy) > 1:
self.energy = np.mean(self.energy)
except Exception:
pass
# Pre-calculate slice indices for performance and readability
roi_y_slice = slice(self.PY0 - self.dPY, self.PY0 + self.dPY)
roi_x_slice = slice(self.PX0 - self.dPX, self.PX0 + self.dPX)
bkg_low_y_slice = slice(self.PY0 + 2 * self.dPY + self.bckg_gap - self.dPY, self.PY0 + 2 * self.dPY + self.bckg_gap + self.dPY)
bkg_high_y_slice = slice(self.PY0 - 2 * self.dPY - self.bckg_gap - self.dPY, self.PY0 - 2 * self.dPY - self.bckg_gap + self.dPY)
# Vectorized Signal and Background calculation
# Summing over Y (axis 1) and X (axis 2) for each image
# self.data has shape (nic, nxc, nyc) where nxc is Y and nyc is X based on logic
IqxyBL_all = np.sum(self.data[:, bkg_low_y_slice, roi_x_slice], axis=(1, 2))
IqxyBH_all = np.sum(self.data[:, bkg_high_y_slice, roi_x_slice], axis=(1, 2))
IqxyS_all = np.sum(self.data[:, roi_y_slice, roi_x_slice], axis=(1, 2))
Ib_cut = (IqxyBL_all + IqxyBH_all) / 2
Is_cut = IqxyS_all
Is_cut_err = np.sqrt(Is_cut)
Ib_cut_err = np.sqrt(Ib_cut)
# Vectorized Error calculation
with np.errstate(divide='ignore', invalid='ignore'):
numerator = np.sqrt(Is_cut_err**2 + Ib_cut_err**2)
denominator = Is_cut - Ib_cut
I_err = numerator / denominator
# Identify invalid entries (inf or nan)
mask_invalid = ~np.isfinite(I_err)
# Calculate fallback: Is_cut_err / Is_cut
# Use np.divide to handle div by zero in fallback gracefully
fallback = np.divide(Is_cut_err, Is_cut, where=(Is_cut > 0))
# Apply fallback
I_err[mask_invalid] = fallback[mask_invalid]
# Smap2D processing
limit_map = min(nic, len(self.alpha_i))
if limit_map > 0:
# Slicing data up to limit_map
data_slice = self.data[:limit_map]
# Sum along axis 1 (Y axis) to get profile along X
Qzcut_all = np.sum(data_slice[:, roi_y_slice, :], axis=1)
Qzcut_bckg1_all = np.sum(data_slice[:, bkg_low_y_slice, :], axis=1)
Qzcut_bckg2_all = np.sum(data_slice[:, bkg_high_y_slice, :], axis=1)
norm_factor = self.transmission[:limit_map] * self.monitor[:limit_map] / self.monitor[0]
# Reshape for broadcasting: (N, 1)
norm_factor = norm_factor[:, np.newaxis]
self.Smap2D = (Qzcut_all - (Qzcut_bckg1_all + Qzcut_bckg2_all) / 2) / norm_factor
self.Smap2D_e = ((np.sqrt(np.abs(Qzcut_all)) + np.sqrt((Qzcut_bckg1_all + Qzcut_bckg2_all) / 2)) / norm_factor)
else:
self.Smap2D = []
self.Smap2D_e = []
logger.info('Number of points in the scan %6d', len(self.alpha_i))
limit_len = len(self.alpha_i)
I_Signal_cut = np.nan_to_num(Is_cut[:limit_len], nan=1e-11)
I_Backgr_cut = np.nan_to_num(Ib_cut[:limit_len], nan=1e-11)
I_error = np.nan_to_num(I_err[:limit_len], nan=1e-11)
self.qz = 4 * pi * np.sin(np.deg2rad(self.alpha_i)) / (12.398 / self.energy)
# Reflectivity calculation
norm_factor = self.transmission * self.monitor / self.monitor[0]
self.reflectivity = (I_Signal_cut - I_Backgr_cut) / norm_factor / self.I0
self.reflectivity_error = np.abs(I_error * self.reflectivity)
self.bckg = I_Backgr_cut / norm_factor
self.raw_counts = I_Signal_cut
# Clip values
self.reflectivity[self.reflectivity <= 1e-12] = 1e-12
self.reflectivity_error[self.reflectivity_error <= 1e-13] = 1e-13
self.Smap2D = np.array(self.Smap2D)
self.Smap2D_e = np.array(self.Smap2D_e)
#logger.info("Processing completed. Processing time %3.3f sec", time.time() - t0)
[docs]
def reprocess(self):
"""
Reload and reprocess the data, resetting corrections.
"""
self.__load_data__()
self.__process_2D_data__()
self.footprint_correction_applied = False
self.corrected_doubles = False
self.replaced_transmission = False
self.is_rebinned = False
logger.info("Reloaded and reprocessed data.")
[docs]
def apply_auto_corrections(self, sample_size:float, beam_size:float, z_scan:'XRR'):
self.correct_doubles()
self.assert_i0(z_scan)
self.reprocess()
self.correct_doubles()
self.footprint_correction(sample_size, beam_size, correct_dir_beam=True)
logger.info("Reflectivity is fully corrected.")
[docs]
def produce_Qmap(self, SDD: float = 900):
"""
Calculate Q-space map.
Args:
SDD (float, optional): Sample-to-Detector Distance in mm. Defaults to 900.
"""
t0 = time.time()
logger.info('Starting q-space mapping.')
chi_r = np.deg2rad(self.alpha_i)
nic, nxc, nyc = np.shape(self.data)
pixels = np.array(range(nyc))
k0 = 2 * pi / (12.398 / self.energy)
# Vectorized calculation
chi_r_col = chi_r[:, np.newaxis]
pixels_row = pixels[np.newaxis, :]
term_px = (self.PX0 - pixels_row) * self.pixel_size_qxz / SDD
angle_term = chi_r_col + term_px
self.Qz_map = np.round(k0 * (np.sin(angle_term) + np.sin(chi_r_col)), 10)
self.Qx_map = np.round(k0 * (np.cos(angle_term) - np.cos(chi_r_col)), 10)
logger.info("2D map calculated. Processing time %3.3f sec", time.time() - t0)
[docs]
def plot_Qmap(self, save: bool = False, fig: Optional[plt.Figure] = None, axes: Optional[plt.Axes] = None) -> Tuple[plt.Figure, plt.Axes]:
"""
Plot the Q-space map.
Args:
save (bool, optional): Whether to save the plot. Defaults to False.
fig (Optional[plt.Figure], optional): Matplotlib figure object. Defaults to None.
axes (Optional[plt.Axes], optional): Matplotlib axes object. Defaults to None.
Returns:
Tuple[plt.Figure, plt.Axes]: The figure and axes objects.
"""
if axes is None:
fig, (ax0) = plt.subplots(nrows=1, ncols=1, figsize=(6, 6), layout='tight')
else:
ax0 = axes
if fig is None:
fig = ax0.get_figure()
# Check if Qx_map and Qz_map are computed
if self.Qx_map is None or self.Qz_map is None:
self.produce_Qmap()
ax0.pcolormesh(self.Qx_map, self.Qz_map, np.log10(self.Smap2D), cmap='viridis', vmin=2, vmax=10,
shading='gouraud', snap=True)
ax0.set_xlabel(r'$q_x, \AA^{-1}$')
ax0.set_ylabel(r'$q_z, \AA^{-1}$')
ax0.set_ylim(0, 0.5)
ax0.set_xlim(-4e-4, 2e-4)
ax0.ticklabel_format(axis='x', style='sci', scilimits=(0, 0))
if save:
logger.info('Saving Q-space map.')
plt.savefig('Qmap_{}_scan_{}.png'.format(self.sample_name, self.scans), dpi=300)
return fig, ax0
[docs]
def save_Qmap(self, log_scale=False):
"""
Save the Q-space map to a text file. Format is 3 columns: Q_x, Q_z, Intensity
Args:
log_scale (bool, optional): Whether to save the logarithm of intensity to the file. Defaults to False.
"""
if self.Qx_map is None or self.Qz_map is None:
self.produce_Qmap()
self._ensure_sample_dir()
map_qx = np.ravel(self.Qx_map)
map_qz = np.ravel(self.Qz_map)
map_int = np.ravel(self.Smap2D)
if log_scale:
map_int = np.log10(map_int)
out = np.array([map_qx, map_qz, map_int]).T
if self.Pi<80:
filename = self.saving_dir + '/{}_2DMap_scan_{}_Pi_{:.0f}.dat'.format(
self.sample_name, self.scans, self.Pi)
else:
filename = self.saving_dir + '/{}_XRR_scan_{}.dat'.format(
self.sample_name, self.scans)
np.savetxt(filename, out)
logger.info('Q-space map in text saved to: %s', filename)
[docs]
def get_reflectivity(self) -> np.ndarray:
"""
Get the reflectivity data sorted by qz.
Returns:
np.ndarray: Array containing [qz, reflectivity, reflectivity_error].
"""
# Ensure indices sort by qz
sort_indices = self.qz.argsort()
qz_tr = np.sort(self.qz) # or self.qz[sort_indices]
R_tr = self.reflectivity[sort_indices]
Rerr_tr = self.reflectivity_error[sort_indices]
return np.array([qz_tr, R_tr, Rerr_tr])
[docs]
def plot_reflectivity(self, save: bool = False, ax: Optional[plt.Axes] = None, **kwargs) -> Tuple[plt.Figure, plt.Axes]:
"""
Plot the reflectivity curve.
Args:
save (bool, optional): Whether to save the plot. Defaults to False.
ax (Optional[plt.Axes], optional): Matplotlib axes object. Defaults to None.
Returns:
Tuple[plt.Figure, plt.Axes]: The figure and axes objects.
"""
if ax is None:
fig = plt.figure(figsize=(6, 6), layout='tight')
ax = plt.gca()
else:
fig = ax.get_figure()
ax.errorbar(*self.get_reflectivity(), **kwargs)
ax.semilogy()
ax.set_xlim(left=0)
ax.set_ylim(top=2)
ax.set_yticks([1e-10, 1e-8, 1e-6, 1e-4, 1e-2, 1])
ax.set_ylim(5e-11, 2e0)
ax.set_xlabel(r'$q_z, \AA^{-1}$')
ax.set_ylabel(r'$\mathrm{Reflectivity}$')
if save:
self._save_figure(plt.gcf(),'log')
return fig, ax
[docs]
def plot_reflectivity_qz4(self, save: bool = False, ax: Optional[plt.Axes] = None, **kwargs) -> Tuple[plt.Figure, plt.Axes]:
"""
Plot reflectivity multiplied by qz^4 (Porod plot).
Args:
save (bool, optional): Whether to save the plot. Defaults to False.
ax (Optional[plt.Axes], optional): Matplotlib axes object. Defaults to None.
Returns:
Tuple[plt.Figure, plt.Axes]: The figure and axes objects.
"""
if ax is None:
fig = plt.figure(figsize=(6, 6), layout='tight')
ax = plt.gca()
else:
fig = ax.get_figure()
ax.errorbar(self.qz, self.reflectivity * self.qz ** 4, self.reflectivity_error * self.qz ** 4, **kwargs)
ax.semilogy()
ax.set_xlim(left=0)
ax.set_ylim(bottom=1e-11)
ax.set_xlabel(r'$q_z, \AA^{-1}$')
ax.set_ylabel(r'$\mathrm{Reflectivity}\cdot q_z^4$')
if save:
self._save_figure(plt.gcf(),'qz4')
return fig, ax
[docs]
def save_reflectivity(self, format: str = 'dat', owner: str = 'ESRF', creator: str = 'opid10', affiliation: str='Harvard', zgh_scans: Optional[List[int]] = None):
"""
Save the reflectivity data to a text file.
Args:
format (str, optional): Format of the saved file. Options are 'dat' and 'orso'. Defaults to 'dat'.
owner (str, optional): Owner of the data. Defaults to 'ESRF'.
creator (str, optional): Creator of the reduced file. Defaults to 'opid10'.
zgh_scans (Optional[List[int]], optional): List of zgH scan numbers. Defaults to None.
"""
self._ensure_sample_dir()
if format == 'orso':
self._save_orso(owner, creator, affiliation)
return
out = self.get_reflectivity().T
if self.Pi<80:
filename = self.saving_dir + '/{}_XRR_scan_{}_Pi_{:.0f}.dat'.format(
self.sample_name, self.scans, self.Pi)
else:
filename = self.saving_dir + '/{}_XRR_scan_{}.dat'.format(
self.sample_name, self.scans)
np.savetxt(filename, out)
logger.info('Reflectivity saved to: %s', filename)
def _save_orso(self, owner: str, creator: str, affiliation: str):
"""
Save the reflectivity data in ORSO format.
owner --- PI
creator --- person, who processed the data
affiliation --- shared between the two
"""
# 1. Define Columns
columns = [
orso.Column(name='Qz', unit='1/angstrom', physical_quantity='momentum transfer'),
orso.Column(name='R', unit=None, physical_quantity='reflectivity'),
orso.ErrorColumn(error_of='R', error_type='uncertainty', value_is='sigma'),
orso.ErrorColumn(error_of='Qz', error_type='resolution', value_is='sigma')
]
# 2. DataSource
# Try to get date, else default
try:
# Try to read start_time from the first scan in the file
with h5py.File(self.file, "r") as f:
# Assuming standard ESRF structure where start_time might be in the scan group
# Need to find where it is located. Often in 'scan_number.1/start_time'
scan_n = str(self.scans[0])
base_path = f"{scan_n}.1"
start_time_str = f[base_path].attrs.get('start_time') or f[f"{base_path}/start_time"][()].decode('utf-8')
# Parse date string, e.g., '2023-10-27T10:00:00' or similar
# ESRF format can vary, often it is isoformat-like
try:
start_date = datetime.fromisoformat(str(start_time_str))
except ValueError:
# Fallback for other formats if needed, or just use now
start_date = datetime.now()
except Exception:
start_date = datetime.now()
owner_person = orso.Person(name=owner, affiliation=affiliation)
experiment = orso.Experiment(
title='XRR',
instrument='ID10-SURF',
start_date=start_date,
probe='x-ray',
facility='ESRF',
)
sample = orso.Sample(name=self.sample_name)
# Measurement
data_files = [self.file]
comment = f"Scans: {self.scans}, Pi: {self.Pi}"
if self.zgH_scan:
comment += f", zgH Scans: {self.zgH_scan}"
measurement = orso.Measurement(
instrument_settings=orso.InstrumentSettings(
incident_angle=orso.ValueRange(min=np.min(self.alpha_i), max=np.max(self.alpha_i), unit='deg'),
wavelength=orso.Value(magnitude=12.398/self.energy, unit='angstrom'),
),
data_files=data_files,
comment=comment
)
data_source = orso.DataSource(
owner=owner_person,
experiment=experiment,
sample=sample,
measurement=measurement
)
# 3. Reduction
reduction = orso.Reduction(
software=orso.Software(name='ESRF_ID10_SURF', version='0.1'),
creator=orso.Person(name=creator, affiliation=affiliation),
timestamp=datetime.now()
)
# 4. Data
qz, R, dR = self.get_reflectivity()
# Create resolution array
dqz = np.full_like(qz, 1e-5)
data = np.column_stack((qz, R, dR, dqz))
# 5. Create Dataset and Save
orso_info = orso.Orso(
data_source=data_source,
reduction=reduction,
columns=columns,
data_set=self.sample_name
)
dataset = orso.OrsoDataset(info=orso_info, data=data)
# Filename logic
if self.Pi < 80:
filename = os.path.join(self.saving_dir, '{}_XRR_scan_{}_Pi_{:.0f}.ort'.format(
self.sample_name, self.scans, self.Pi))
else:
filename = os.path.join(self.saving_dir, '{}_XRR_scan_{}.ort'.format(
self.sample_name, self.scans))
orso.save_orso([dataset], filename)
logger.info('Reflectivity saved to: %s', filename)
[docs]
def show_detector_image(self, frame_number: int = 50, ax: Optional[plt.Axes] = None, plot_cross: bool = True):
"""
Show a single frame of the detector image with ROI and background regions.
Args:
frame_number (int, optional): Frame number to display. Defaults to 50.
ax (Optional[plt.Axes], optional): Matplotlib axes object. Defaults to None.
plot_cross (bool, optional): Whether to plot crosshairs at beam center. Defaults to True.
"""
if ax is None:
fig = plt.figure(figsize=(6, 6), layout='tight')
ax = plt.gca()
ax.imshow(np.log10(self.data[frame_number] + 1e-3))
ax.set_ylim(self.PY0 - 10 * self.dPY, self.PY0 + 10 * self.dPY)
ax.set_xlim(self.PX0 - 10 * self.dPY, self.PX0 + 10 * self.dPY)
signal = patches.Rectangle((self.PX0 - self.dPX, self.PY0 - self.dPY), 2 * self.dPX, 2 * self.dPY, linewidth=1,
edgecolor='r', facecolor='none', label='Signal')
b1 = patches.Rectangle((self.PX0 - self.dPX, self.PY0 + 2 * self.dPY + self.bckg_gap - self.dPY), 2 * self.dPX,
2 * self.dPY, linewidth=1, edgecolor='cyan', facecolor='none', label='Background')
b2 = patches.Rectangle((self.PX0 - self.dPX, self.PY0 - 2 * self.dPY - self.bckg_gap - self.dPY), 2 * self.dPX,
2 * self.dPY, linewidth=1, edgecolor='cyan', facecolor='none')
ax.add_patch(signal)
ax.add_patch(b1)
ax.add_patch(b2)
if plot_cross:
ax.hlines(self.PY0, self.PX0 - 30, self.PX0 + 30)
ax.vlines(self.PX0, self.PY0 - 30, self.PY0 + 30)
ax.set_xlabel('Detector pixel, X')
ax.set_ylabel('Detector pixel, Y')
ax.set_title('Detector {}, frame #{}'.format(self.detector_name, frame_number))
ax.legend()
[docs]
def replace_transmission(self, filter_transmission: Dict[Any, float]):
"""
Replace transmission values using a dictionary of filter transmissions.
Args:
filter_transmission (Dict[Any, float]): Mapping of attenuator position to transmission.
"""
_new_transmission = np.array([])
for point in self.attenuator:
_new_transmission = np.append(_new_transmission, [float(filter_transmission.get(point, 1.0))])
# Calculate scaling factor
# Avoid division by zero
with np.errstate(divide='ignore', invalid='ignore'):
scaling_factor = self.transmission / _new_transmission
new_reflectivity = self.reflectivity * scaling_factor
new_reflectivity_error = self.reflectivity_error * scaling_factor
new_background = self.bckg * scaling_factor
self.transmission = _new_transmission
self.reflectivity = new_reflectivity
self.reflectivity_error = new_reflectivity_error
self.bckg = new_background
self.replaced_transmission = True
#logger.info('Transmission corrected. Recalculation will reset it to defaults.')
@staticmethod
def _find_double_(x: np.ndarray) -> Dict[float, np.ndarray]:
"""
Find indices of repeated values in an array.
Args:
x (np.ndarray): Input array.
Returns:
Dict[float, np.ndarray]: Dictionary mapping values to their indices.
"""
u, c = np.unique(x, return_counts=True)
values_of_interest = u[c > 1]
indexes_multiple_values = {value: np.where(x == value)[0] for value in values_of_interest}
return indexes_multiple_values
[docs]
def calculate_corrected_transmission(self) -> Dict[Any, float]:
"""
Calculate corrected transmission based on overlapping points (doubles).
Returns:
Dict[Any, float]: Dictionary of corrected transmissions.
"""
_new_transmission_dict = dict(zip(self.attenuator, self.transmission))
if self.replaced_transmission:
logger.warning('Transmission was changed. Consider reprocessing data.')
else:
double_x = XRR._find_double_(self.qz)
sorted_double_x = dict(sorted(double_x.items(), reverse=True))
coeff = []
# This logic assumes the array is sorted by angle/qz in general but contains overlaps
# It calculates ratio between the last point of the first segment and first point of second segment
# for a given Q value.
# Note: i is the key (qz value), sorted_double_x[i] are indices
for i in sorted_double_x:
indices = sorted_double_x[i]
# Assuming indices are ordered like [idx1, idx2], where idx1 is from higher intensity (lower attenuation) usually?
# The original code used [-2] and [-1].
if len(indices) >= 2:
coeff.append(self.reflectivity[indices[-2]] / self.reflectivity[indices[-1]])
else:
coeff.append(1.0) # Fallback
coeff_dict = {i: {'coeff': k, 'indexes': sorted_double_x[i]} for i, k in zip(sorted_double_x, coeff)}
_new_transm = copy.copy(self.transmission)
for i in coeff_dict:
# Apply correction to all points before the overlap
# This assumes scan direction and that overlaps are sequential adjustments
idx_limit = coeff_dict[i]['indexes'][1]
_new_transm[0:idx_limit] = _new_transm[0:idx_limit] * coeff_dict[i]['coeff']
_new_transmission_dict = dict(zip(self.attenuator, _new_transm))
return _new_transmission_dict
[docs]
def correct_doubles(self):
"""
Correct transmission using overlapping points.
"""
if not self.corrected_doubles:
logger.info('Correcting transmission using double points.')
new_transm = self.calculate_corrected_transmission()
self.replace_transmission(new_transm)
self.corrected_doubles = True
else:
logger.warning('Double points already corrected.')
[docs]
def do_rebin(self, *args, **kwargs):
"""
Rebin the reflectivity data.
Args:
*args: Arguments passed to rebin function.
**kwargs: Keyword arguments passed to rebin function.
"""
new_qz, new_R, new_R_err = rebin(*self.get_reflectivity(), **kwargs)
self.qz = new_qz
self.reflectivity = new_R
self.reflectivity_error = new_R_err
self.is_rebinned = True
[docs]
@staticmethod
def find_i0_from_z_scan(z: np.ndarray, I: np.ndarray) -> Tuple[float, int]:
"""
Find I0 (direct beam intensity) from a Z-scan.
Args:
z (np.ndarray): Z positions.
I (np.ndarray): Intensity.
Returns:
Tuple[float, int]: I0 value and index of maximum intensity.
"""
n_max = np.argmax(I)
I_cutoff = np.mean(I[:n_max]) - np.sqrt(np.mean(I[:n_max]))
zscan_new = I[np.where(I >= I_cutoff)]
if len(zscan_new) > 0:
I0 = np.median(zscan_new)
else:
I0 = I[n_max]
return I0, n_max
[docs]
def find_i0(self, to_print: bool = True) -> Tuple[float, Any, float]:
"""
Find I0, attenuator and transmission at the direct beam.
Args:
to_print (bool, optional): Whether to print details. Defaults to True.
Returns:
Tuple[float, Any, float]: I0, attenuator value, transmission value.
"""
if to_print:
logger.info('Processing scan of motor %s.', self.alpha_i_name)
I0, n_max = XRR.find_i0_from_z_scan(self.alpha_i, self.raw_counts)
if to_print:
logger.info('I0 = %.5e, attenuator = %s, transmission = %.5e',
I0, self.attenuator[n_max], self.transmission[n_max])
return I0, self.attenuator[n_max], self.transmission[n_max]
def _assert_i0_from_calc(self, calc_I0: float, atten: Any, transmission: float):
"""
Internal method to update I0 from calculated values.
"""
if self.alpha_i_name.lower() == 'zgh':
logger.warning('You are trying to replace I0 in zgH scan. Load reflectivity data.')
else:
if atten in self.attenuator:
if self.corrected_doubles:
# Find indices where attenuator matches
indices = np.where(self.attenuator == atten)
# Use the transmission of the first matching point for adjustment
# This logic seems specific to the beamline workflow
current_transmission = self.transmission[indices][0]
I0 = calc_I0 * transmission / current_transmission
self.I0 = I0
logger.info('I0 replaced.')
else:
logger.warning('Attenuator %s not found in the scan of motor %s.\nCheck inputs.',
atten, self.alpha_i)
[docs]
def assert_i0(self, zgH_scan: 'XRR'):
"""
Assert I0 from a zgH scan (direct beam scan).
Args:
zgH_scan (XRR): Another XRR object representing the direct beam scan.
"""
if self.alpha_i_name.lower() == 'zgh':
logger.warning('You are trying to replace I0 in zgH scan. Load reflectivity data.')
else:
calc_I0, atten, transmission = zgH_scan.find_i0(to_print=False)
calc_I0 = calc_I0 * zgH_scan.monitor[0] / self.monitor[0]
if atten in self.attenuator:
if self.corrected_doubles:
indices = np.where(self.attenuator == atten)
if len(indices[0]) > 0:
current_trans = self.transmission[indices[0][0]]
I0 = calc_I0 / current_trans
logger.info('Flux set to {:.4e}'.format(I0))
self.I0 = I0
self.zgH_scan = zgH_scan.scans
logger.info('I0 replaced.')
else:
logger.warning('Attenuator %s not found in the scan of motor %s.\nCheck inputs.',
atten, self.alpha_i)
def _check_saving_dir(self):
"""
Check if the saving directory is set; if not, set a default based on the current working directory and sample name.
"""
if self.saving_dir:
pass
else:
self.saving_dir = os.getcwd() + f"/{self.sample_name}"
def _ensure_sample_dir(self):
"""
Ensure the saving directory exists, creating it if necessary.
"""
try:
os.makedirs(self.saving_dir, exist_ok=True)
except OSError as e:
print('Saving directory is impossible: ', e)
def _save_figure(self, fig, suffix):
"""
Helper method to save a matplotlib figure.
Args:
fig (matplotlib.figure.Figure): The figure object to save.
suffix (str): Suffix to append to the filename.
"""
self._ensure_sample_dir()
if self.Pi<80:
filename = self.saving_dir + '/{}_XRR_scan_{}_Pi_{:.0f}_{}.png'.format(
self.sample_name, self.scans, self.Pi, suffix)
else:
filename = self.saving_dir + '/{}_XRR_scan_{}_{}.png'.format(
self.sample_name, self.scans, suffix)
fig.savefig(filename, dpi=200)
logger.info('Plot saved to {}.'.format(filename))