From 43e3437760e197a5ac70fe65010ff0b7ee57ae11 Mon Sep 17 00:00:00 2001 From: Ryuichi Arafune Date: Tue, 30 Apr 2024 11:39:16 +0900 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20=20Reorganize=20class=20ARPESAcc?= =?UTF-8?q?essorBase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/arpes/xarray_extensions.py | 2608 ++++++++++++++++---------------- 1 file changed, 1326 insertions(+), 1282 deletions(-) diff --git a/src/arpes/xarray_extensions.py b/src/arpes/xarray_extensions.py index e71784b3..c384d1c4 100644 --- a/src/arpes/xarray_extensions.py +++ b/src/arpes/xarray_extensions.py @@ -163,42 +163,117 @@ T = TypeVar("T") -class ARPESAccessorBase: - """Base class for the xarray extensions in PyARPES.""" +class ARPESAngleProperty: + def __init__(self, xarray_obj: XrTypes) -> None: + self._obj = xarray_obj - class _SliceAlongPathKwags(TypedDict, total=False): - axis_name: str - resolution: float - n_points: int - extend_to_edge: bool + @property + def angle_unit(self) -> Literal["Degrees", "Radians"]: + return self._obj.attrs.get("angle_unit", "Radians") - def along( - self, - directions: list[Hashable | dict[Hashable, float]], - **kwargs: Unpack[_SliceAlongPathKwags], - ) -> xr.Dataset: # TODO: [RA] xr.DataArray - """[TODO:summary]. + @angle_unit.setter + def angle_unit(self, angle_unit: Literal["Degrees", "Radians"]) -> None: + """Set "angle unit". + + Angle unit should be "Degrees" or "Radians" Args: - directions (list[Hashable] | dict[Hashable, float]): Direction to slice. - kwargs: axis_name, resolution, n_points, extend_to_edge_shift_gamma + angle_unit: Literal["Degrees", "Radians"] + """ + assert angle_unit in { + "Degrees", + "Radians", + }, "Angle unit should be 'Degrees' or 'Radians'" + self._obj.attrs["angle_unit"] = angle_unit - Returns: - xr.Dataset + def swap_angle_unit(self) -> None: + """Swap angle unit (radians <-> degrees). + + Change the value of angle related objects/variables in attrs and coords """ - assert isinstance(self._obj, xr.DataArray) - return slice_along_path(self._obj, interpolation_points=directions, **kwargs) + if self.angle_unit == "Radians" or self.angle_unit.startswith("rad"): + self._radian_to_degree() + elif self.angle_unit == "Degrees" or self.angle_unit.startswith("deg"): + self._degree_to_radian() + else: + msg = 'The angle_unit must be "Radians" or "Degrees"' + raise TypeError(msg) - def find(self, name: str) -> list[str]: - """Return the property names containing the "name". + def _radian_to_degree(self) -> None: + """A Helper function for swap_angle_unit. - Args: - name (str): string to find. + Degree -> Radian + """ + self.angle_unit = "Degrees" + for angle in ANGLE_VARS: + if angle in self._obj.attrs: + self._obj.attrs[angle] = np.rad2deg(self._obj.attrs.get(angle, np.nan)) + if angle + "_offset" in self._obj.attrs: + self._obj.attrs[angle + "_offset"] = np.rad2deg( + self._obj.attrs.get(angle + "_offset", np.nan), + ) + if angle in self._obj.coords: + self._obj.coords[angle] = np.rad2deg(self._obj.coords[angle]) - Returns: list[str] - Property list + def _degree_to_radian(self) -> None: + """A Helper function for swan_angle_unit. + + Radian -> Degree """ - return [n for n in dir(self) if name in n] + self.angle_unit = "Radians" + for angle in ANGLE_VARS: + if angle in self._obj.attrs: + self._obj.attrs[angle] = np.deg2rad(self._obj.attrs.get(angle, np.nan)) + if angle + "_offset" in self._obj.attrs: + self._obj.attrs[angle + "_offset"] = np.deg2rad( + self._obj.attrs.get(angle + "_offset", np.nan), + ) + if angle in self._obj.coords: + self._obj.coords[angle] = np.deg2rad(self._obj.coords[angle]) + + +class ARPESPhysicalProperty: + def __init__(self, xarray_obj: XrTypes) -> None: + self._obj = xarray_obj + + @property + def work_function(self) -> float: + """Provides the work function of the sample, if present in metadata. + + Otherwise, uses something approximate. + + Note: + This "work_function" should *NOT* be used for k-conversion! + """ + assert isinstance(self._obj, xr.DataArray | xr.Dataset) + if "sample_workfunction" in self._obj.attrs: + return self._obj.attrs["sample_workfunction"] + return 4.3 + + @property + def analyzer_work_function(self) -> float: + """Provides the work function of the analyzer, if present in metadata. + + otherwise, use appropriate + + Note: + Use this value for k-conversion. + """ + assert isinstance(self._obj, xr.DataArray | xr.Dataset) + if "workfunction" in self._obj.attrs: + return self._obj.attrs["workfunction"] + return 4.401 + + @property + def inner_potential(self) -> float: + """Provides the inner potential, if present in metadata. + + Otherwise, 10 eV is assumed. + """ + assert isinstance(self._obj, xr.DataArray | xr.Dataset) + if "inner_potential" in self._obj.attrs: + return self._obj.attrs["inner_potential"] + return 10 @property def sherman_function(self) -> float: @@ -218,6 +293,46 @@ def sherman_function(self) -> float: msg = "No Sherman function could be found on the data. Is this a spin dataset?" raise ValueError(msg) + @property + def hv(self) -> float | xr.DataArray: + """Return the photon energy. + + Returns: float | xr.DataArray + Photon energy in eV unit. (for hv_map type, xr.DataArray is returned.) + """ + assert isinstance(self._obj, xr.DataArray | xr.Dataset) + try: + return float(self._obj.coords["hv"]) + except TypeError: + return self._obj.coords["hv"] + + @property + def temp(self) -> float | Literal["RT", "LT"]: + """The temperature at which an experiment was performed.""" + prefered_attrs = [ + "TA", + "ta", + "t_a", + "T_A", + "T_1", + "t_1", + "t1", + "T1", + "temp", + "temp_sample", + "temperature", + "temp_cryotip", + "temperature_sensor_b", + "temperature_sensor_a", + "temperature_cryotip", + ] + for attr in prefered_attrs: + if attr in self._obj.attrs: + return self._obj.attrs[attr] + msg = "Could not read temperature off any standard attr" + logger.debug(msg, stacklevel=2) + return np.nan + @property def experimental_conditions( self, @@ -254,63 +369,53 @@ def polarization(self) -> float | str | tuple[float, float]: return np.nan @property - def is_subtracted(self) -> bool: # TODO: [RA] xr.DataArray - """Infers whether a given data is subtracted. - - Returns (bool): - Return True if the data is subtracted. - """ - assert isinstance(self._obj, xr.DataArray) - if self._obj.attrs.get("subtracted"): - return True + def sample_pos(self) -> tuple[float, float, float]: + return ( + float(self._obj.attrs["x"]), + float(self._obj.attrs["y"]), + float(self._obj.attrs["z"]), + ) - threshold_is_5_percent = 0.05 - return (((self._obj < 0) * 1).mean() > threshold_is_5_percent).item() + @property + def probe_polarization(self) -> tuple[float, float]: + """Provides the probe polarization of the UV/x-ray source.""" + return ( + self._obj.attrs.get("probe_polarization_theta", np.nan), + self._obj.attrs.get("probe_polarization_alpha", np.nan), + ) @property - def is_spatial(self) -> bool: - """Infers whether a given scan has real-space dimensions (SPEM or u/nARPES). + def pump_polarization(self) -> tuple[float, float]: + """For Tr-ARPES experiments, provides the pump polarization.""" + return ( + self._obj.attrs.get("pump_polarization_theta", np.nan), + self._obj.attrs.get("pump_polarization_alpha", np.nan), + ) - Returns: - True if the data is explicltly a "ucut" or "spem" or contains "X", "Y", or "Z" - dimensions. False otherwise. - """ - assert isinstance(self._obj, xr.DataArray | xr.Dataset) - if self.spectrum_type in {"ucut", "spem"}: - return True - return any(d in {"X", "Y", "Z"} for d in self._obj.dims) +class ARPESInfoProperty(ARPESPhysicalProperty): + def __init__(self, xarray_obj: XrTypes) -> None: + self._obj = xarray_obj @property - def is_kspace(self) -> bool: - """Infers whether the scan is k-space converted or not. - - Because of the way this is defined, it will return - true for XPS spectra, which I suppose is true but trivially. + def scan_name(self) -> str: + """Return scan name. - Returns: - True if the data is k-space converted. False otherwise. + Returns: (str) + If "scan" or "file" is set in attrs, return the file name. + If they are not set, return "id" if "id" is set. """ - assert isinstance(self._obj, xr.DataArray | xr.Dataset) - return not any(d in {"phi", "theta", "beta", "angle"} for d in self._obj.dims) + for option in ["scan", "file"]: + if option in self._obj.attrs: + return Path(self._obj.attrs[option]).name - @property - def is_slit_vertical(self) -> bool: # TODO: [RA] Refactoring ? - """Infers whether the scan is taken on an analyzer with vertical slit. + id_code = self._obj.attrs.get("id") - Caveat emptor: this assumes that the alpha coordinate is not some intermediate value. + return str(id_code) if id_code is not None else "No ID" - Returns: - True if the alpha value is consistent with a vertical slit analyzer. False otherwise. - """ - angle_tolerance = 1.0 - if self.angle_unit.startswith("Deg") or self.angle_unit.startswith("deg"): - return float(np.abs(self.lookup_offset_coord("alpha") - 90.0)) < angle_tolerance - return float(np.abs(self.lookup_offset_coord("alpha") - np.pi / 2)) < float( - np.deg2rad( - angle_tolerance, - ), - ) + @property + def label(self) -> str: + return str(self._obj.attrs.get("description", self.scan_name)) @property def endstation(self) -> str: @@ -321,65 +426,201 @@ def endstation(self) -> str: """ return str(self._obj.attrs["location"]) - def with_values( - self, - new_values: NDArray[np.float_], - ) -> xr.DataArray: # TODO: [RA] xr.DataArray - """Copy with new array values. - - Easy way of creating a DataArray that has the same shape as the calling object but data - populated from the array `new_values`. - - Notes: This method is applicable only for xr.DataArray. (Not xr.Dataset) + @property + def sample_info(self) -> SampleInfo: + """Return sample info property. - Args: - new_values: The new values which should be used for the data. + Returns (dict): + """ + sample_info: SampleInfo = { + "id": self._obj.attrs.get("sample_id"), + "sample_name": self._obj.attrs.get("sample_name"), + "source": self._obj.attrs.get("sample_source"), + "reflectivity": self._obj.attrs.get("sample_reflectivity", np.nan), + } + return sample_info - Returns: - A copy of the data with new values but identical dimensions, coordinates, and attrs. + @property + def scan_info(self) -> ScanInfo: + scan_info: ScanInfo = { + "time": self._obj.attrs.get("time", None), + "date": self._obj.attrs.get("date", None), + "type": self.scan_type, + "spectrum_type": self.spectrum_type, + "experimenter": self._obj.attrs.get("experimenter"), + "sample": self._obj.attrs.get("sample_name"), + } + return scan_info - ToDo: Test - """ - assert isinstance(self._obj, xr.DataArray) - return xr.DataArray( - new_values.reshape(self._obj.values.shape), - coords=self._obj.coords, - dims=self._obj.dims, - attrs=self._obj.attrs, - ) + @property + def experiment_info(self) -> ExperimentInfo: + """Return experiment info property.""" + experiment_info: ExperimentInfo = { + "temperature": self.temp, + "temperature_cryotip": self._obj.attrs.get("temperature_cryotip", np.nan), + "pressure": self._obj.attrs.get("pressure", np.nan), + "polarization": self.probe_polarization, + "photon_flux": self._obj.attrs.get("photon_flux", np.nan), + "photocurrent": self._obj.attrs.get("photocurrent", np.nan), + "probe": self._obj.attrs.get("probe"), + "probe_detail": self._obj.attrs.get("probe_detail"), + "analyzer_detail": self.analyzer_detail, + } + return experiment_info @property - def logical_offsets(self) -> dict[str, float | xr.DataArray]: - """Return logical offsets. + def pump_info(self) -> LightSourceInfo: + """Return pump info property.""" + pump_info: LightSourceInfo = { + "pump_wavelength": self._obj.attrs.get("pump_wavelength", np.nan), + "pump_energy": self._obj.attrs.get("pump_energy", np.nan), + "pump_fluence": self._obj.attrs.get("pump_fluence", np.nan), + "pump_pulse_energy": self._obj.attrs.get("pump_pulse_energy", np.nan), + "pump_spot_size": ( + self._obj.attrs.get("pump_spot_size_x", np.nan), + self._obj.attrs.get("pump_spot_size_y", np.nan), + ), + "pump_profile": self._obj.attrs.get("pump_profile"), + "pump_linewidth": self._obj.attrs.get("pump_linewidth", np.nan), + "pump_duration": self._obj.attrs.get("pump_duration", np.nan), + "pump_polarization": self.pump_polarization, + } + return pump_info - Returns: - dict object of long_* + physical_long_* (*: x, y, or z) + @property + def probe_info(self) -> LightSourceInfo: + """Return probe info property. + + Returns (LIGHTSOURCEINFO): """ - assert isinstance(self._obj, xr.DataArray | xr.Dataset) - if "long_x" not in self._obj.coords: - msg = "Logical offsets can currently only be accessed for hierarchical" - msg += " motor systems like nanoARPES." - raise ValueError( - msg, - ) + probe_info: LightSourceInfo = { + "probe_wavelength": self._obj.attrs.get("probe_wavelength", np.nan), + "probe_energy": self.hv, + "probe_fluence": self._obj.attrs.get("probe_fluence", np.nan), + "probe_pulse_energy": self._obj.attrs.get("probe_pulse_energy", np.nan), + "probe_spot_size": ( + self._obj.attrs.get("probe_spot_size_x", np.nan), + self._obj.attrs.get("probe_spot_size_y", np.nan), + ), + "probe_profile": self._obj.attrs.get("probe_profile"), + "probe_linewidth": self._obj.attrs.get("probe_linewidth", np.nan), + "probe_duration": self._obj.attrs.get("probe_duration", np.nan), + "probe_polarization": self.probe_polarization, + } + return probe_info + + @property + def laser_info(self) -> LightSourceInfo: return { - "x": self._obj.coords["long_x"] - self._obj.coords["physical_long_x"], - "y": self._obj.coords["long_y"] - self._obj.coords["physical_long_y"], - "z": self._obj.coords["long_z"] - self._obj.coords["physical_long_z"], + **self.probe_info, + **self.pump_info, + "repetition_rate": self._obj.attrs.get("repetition_rate", np.nan), } @property - def hv(self) -> float | xr.DataArray: - """Return the photon energy. + def analyzer_info(self) -> AnalyzerInfo: + """General information about the photoelectron analyzer used.""" + analyzer_info: AnalyzerInfo = { + "lens_mode": self._obj.attrs.get("lens_mode"), + "lens_mode_name": self._obj.attrs.get("lens_mode_name"), + "acquisition_mode": self._obj.attrs.get("acquisition_mode", None), + "pass_energy": self._obj.attrs.get("pass_energy", np.nan), + "slit_shape": self._obj.attrs.get("slit_shape", None), + "slit_width": self._obj.attrs.get("slit_width", np.nan), + "slit_number": self._obj.attrs.get("slit_number", np.nan), + "lens_table": self._obj.attrs.get("lens_table"), + "analyzer_type": self._obj.attrs.get("analyzer_type"), + "mcp_voltage": self._obj.attrs.get("mcp_voltage", np.nan), + "work_function": self._obj.attrs.get("workfunction", 4.401), + } + return analyzer_info - Returns: float | xr.DataArray - Photon energy in eV unit. (for hv_map type, xr.DataArray is returned.) - """ - assert isinstance(self._obj, xr.DataArray | xr.Dataset) - try: - return float(self._obj.coords["hv"]) - except TypeError: - return self._obj.coords["hv"] + @property + def daq_info(self) -> DAQInfo: + """General information about the acquisition settings for an ARPES experiment.""" + daq_info: DAQInfo = { + "daq_type": self._obj.attrs.get("daq_type"), + "region": self._obj.attrs.get("daq_region"), + "region_name": self._obj.attrs.get("daq_region_name"), + "center_energy": self._obj.attrs.get("daq_center_energy", np.nan), + "prebinning": self.prebinning, + "trapezoidal_correction_strategy": self._obj.attrs.get( + "trapezoidal_correction_strategy", + ), + "dither_settings": self._obj.attrs.get("dither_settings"), + "sweep_settings": self.sweep_settings, + "frames_per_slice": self._obj.attrs.get("frames_per_slice", np.nan), + "frame_duration": self._obj.attrs.get("frame_duration", np.nan), + } + return daq_info + + @property + def beamline_info(self) -> LightSourceInfo: + """Information about the beamline or light source used for a measurement.""" + beamline_info: LightSourceInfo = { + "hv": self.hv, + "linewidth": self._obj.attrs.get("probe_linewidth", np.nan), + "photon_polarization": self.probe_polarization, + "undulator_info": self.undulator_info, + "repetition_rate": self._obj.attrs.get("repetition_rate", np.nan), + "beam_current": self._obj.attrs.get("beam_current", np.nan), + "entrance_slit": self._obj.attrs.get("entrance_slit", None), + "exit_slit": self._obj.attrs.get("exit_slit", None), + "monochromator_info": self.monochromator_info, + } + return beamline_info + + @property + def sweep_settings(self) -> dict[str, xr.DataArray | NDArray[np.float_] | float | None]: + """For datasets acquired with swept acquisition settings, provides those settings.""" + return { + "high_energy": self._obj.attrs.get("sweep_high_energy"), + "low_energy": self._obj.attrs.get("sweep_low_energy"), + "n_sweeps": self._obj.attrs.get("n_sweeps"), + "step": self._obj.attrs.get("sweep_step"), + } + + @property + def prebinning(self) -> dict[str, Any]: + """Information about the prebinning performed during scan acquisition.""" + prebinning = {} + for d in self._obj.indexes: + if f"{d}_prebinning" in self._obj.attrs: + prebinning[d] = self._obj.attrs[f"{d}_prebinning"] + + return prebinning # type: ignore [return-value] # because RA don't know the format of FITS. + + @property + def monochromator_info(self) -> dict[str, float]: + """Details about the monochromator used on the UV/x-ray source.""" + return { + "grating_lines_per_mm": self._obj.attrs.get("grating_lines_per_mm", np.nan), + } + + @property + def undulator_info(self) -> dict[str, str | float | None]: + """Details about the undulator for data performed at an undulator source.""" + return { + "gap": self._obj.attrs.get("undulator_gap"), + "z": self._obj.attrs.get("undulator_z"), + "harmonic": self._obj.attrs.get("undulator_harmonic"), + "polarization": self._obj.attrs.get("undulator_polarization"), + "type": self._obj.attrs.get("undulator_type"), + } + + @property + def analyzer_detail(self) -> AnalyzerInfo: + """Details about the analyzer, its capabilities, and metadata.""" + return { + "analyzer_name": self._obj.attrs.get( + "analyzer_name", + self._obj.attrs.get("analyzer", ""), + ), + "parallel_deflectors": self._obj.attrs.get("parallel_deflectors", False), + "perpendicular_deflectors": self._obj.attrs.get("perpendicular_deflectors", False), + "analyzer_type": self._obj.attrs.get("analyzer_type", ""), + "analyzer_radius": self._obj.attrs.get("analyzer_radius", np.nan), + } @property def scan_type(self) -> str | None: @@ -418,274 +659,191 @@ def _dim_type_check( msg = "Dimension type may be incorrect" raise TypeError(msg) - @property - def is_differentiated(self) -> bool: - """Return True if the spectrum is differentiated data. - Returns: bool +class ARPESOffsetProperty(ARPESAngleProperty): + def __init__(self, xarray_obj: XrTypes) -> None: + self._obj = xarray_obj - ToDo: Test - """ - history = self.short_history() - return "dn_along_axis" in history or "curvature" in history + def symmetry_points( + self, + ) -> dict[HIGH_SYMMETRY_POINTS, dict[str, float]]: + """Return the dict object about symmetry point such as G-point in the ARPES data. - def transpose_to_front(self, dim: str) -> XrTypes: - """Transpose the dimensions (to front). + The original version was something complicated, but the coding seemed to be in + process and the purpose was unclear, so it was streamlined considerably. - Args: - dim: dimension to front - Returns: (XrTypes) - Transposed ARPES data + Returns (dict[HIGH_SYMMETRY_POINTS, dict[str, float]]): + Dict object representing the symmpetry points in the ARPES data. - ToDo: Test + Examples: + example of "symmetry_points": symmetry_points = {"G": {"phi": 0.405}} """ - dims = list(self._obj.dims) - assert dim in dims - dims.remove(dim) - return self._obj.transpose(*([dim, *dims])) + symmetry_points: dict[str, dict[str, float]] = {} + our_symmetry_points = self._obj.attrs.get("symmetry_points", {}) - def transpose_to_back(self, dim: str) -> XrTypes: - """Transpose the dimensions (to back). + symmetry_points.update(our_symmetry_points) - Args: - dim: dimension to back + return symmetry_points - Returns: (XrTypes) - Transposed ARPES data. + @property + def logical_offsets(self) -> dict[str, float | xr.DataArray]: + """Return logical offsets. - ToDo: Test + Returns: + dict object of long_* + physical_long_* (*: x, y, or z) """ - dims = list(self._obj.dims) - assert dim in dims - dims.remove(dim) - return self._obj.transpose(*([*dims, dim])) - - def select_around_data( - self, - points: dict[Hashable, xr.DataArray], - radius: dict[Hashable, float] | float | None = None, # radius={"phi": 0.005} - *, - mode: Literal["sum", "mean"] = "sum", - **kwargs: Incomplete, - ) -> xr.DataArray: - """Performs a binned selection around a point or points. + assert isinstance(self._obj, xr.DataArray | xr.Dataset) + if "long_x" not in self._obj.coords: + msg = "Logical offsets can currently only be accessed for hierarchical" + msg += " motor systems like nanoARPES." + raise ValueError( + msg, + ) + return { + "x": self._obj.coords["long_x"] - self._obj.coords["physical_long_x"], + "y": self._obj.coords["long_y"] - self._obj.coords["physical_long_y"], + "z": self._obj.coords["long_z"] - self._obj.coords["physical_long_z"], + } - Can be used to perform a selection along one axis as a function of another, integrating a - region in the other dimensions. + @property + def offsets(self) -> dict[str, float]: + return { + str(coord): self.lookup_offset(str(coord)) + for coord in self._obj.coords + if f"{coord}_offset" in self._obj.attrs + } - Example: - As an example, suppose we have a dataset with dimensions ('eV', 'kp', 'T',) - and we also by fitting determined the Fermi momentum as a function of T, kp_F('T'), - stored in the dataarray kFs. Then we could select momentum integrated EDCs in a small - window around the fermi momentum for each temperature by using + def lookup_offset_coord(self, name: str) -> xr.DataArray | float: + return self.lookup_coord(name) - self.lookup_offset(name) - >>> edcs = full_data.S.select_around_data({'kp': kFs}, radius={'kp': 0.04}, fast=True) + def lookup_coord(self, name: str) -> xr.DataArray | float: + if name in self._obj.coords: + return unwrap_xarray_item(self._obj.coords[name]) + self._obj.coords[name] = np.nan + return np.nan - The resulting data will be EDCs for each T, in a region of radius 0.04 inverse angstroms - around the Fermi momentum. + def lookup_offset(self, attr_name: str) -> float: + symmetry_points = self.symmetry_points() + assert isinstance(symmetry_points, dict) + if "G" in symmetry_points: + gamma_point = symmetry_points["G"] # {"phi": 0.405} (cut) + if attr_name in gamma_point: + return gamma_point[attr_name] - Args: - points: The set of points where the selection should be performed. - radius: The radius of the selection in each coordinate. If dimensions are omitted, a - standard sized selection will be made as a compromise. - mode: How the reduction should be performed, one of "sum" or "mean". Defaults to "sum" - kwargs: Can be used to pass radii parameters by keyword with `_r` postfix. + offset_name = attr_name + "_offset" + if offset_name in self._obj.attrs: + return self._obj.attrs[offset_name] - Returns: - The binned selection around the desired point or points. - """ - assert isinstance( - self._obj, - xr.DataArray, - ), "Cannot use select_around on Datasets only DataArrays!" + return self._obj.attrs.get("data_preparation", {}).get(offset_name, 0) - assert mode in {"sum", "mean"}, "mode parameter should be either sum or mean." - assert isinstance(points, dict | xr.Dataset) - radius = radius or {} - if isinstance(points, xr.Dataset): - points = {k: points[k].item() for k in points.data_vars} - assert isinstance(points, dict) - radius = self._radius(points, radius, **kwargs) - logger.debug(f"radius: {radius}") + @property + def beta_offset(self) -> float: + return self.lookup_offset("beta") - assert isinstance(radius, dict) - logger.debug(f"iter(points.values()): {iter(points.values())}") + @property + def psi_offset(self) -> float: + return self.lookup_offset("psi") - along_dims = next(iter(points.values())).dims - selected_dims = list(points.keys()) + @property + def theta_offset(self) -> float: + return self.lookup_offset("theta") - stride = self._obj.G.stride(generic_dim_names=False) + @property + def phi_offset(self) -> float: + return self.lookup_offset("phi") - new_dim_order = [d for d in self._obj.dims if d not in along_dims] + list(along_dims) + @property + def chi_offset(self) -> float: + return self.lookup_offset("chi") - data_for = self._obj.transpose(*new_dim_order) - new_data = data_for.sum(selected_dims, keep_attrs=True) - for coord, value in data_for.G.iterate_axis(along_dims): - nearest_sel_params = {} - # -- originally, if safe == True, the following liens starting from hear - for d, v in radius.items(): - if v < stride[d]: - nearest_sel_params[d] = points[d].sel(coord) + @property + def sample_angles( + self, + ) -> tuple[ + xr.DataArray | float, + xr.DataArray | float, + xr.DataArray | float, + xr.DataArray | float, + xr.DataArray | float, + xr.DataArray | float, + ]: + """Returns angle information. - radius = {d: v for d, v in radius.items() if d not in nearest_sel_params} - # -- to heari, but as name said, should be alwayws safe. + Returns: + ------- + tuple[xr.DataArray | float, ...] + beta, theta, chi, phi, psi, alpha + """ + return ( + # manipulator + self.lookup_coord("beta"), + self.lookup_coord("theta"), + self.lookup_coord("chi"), + # analyzer + self.lookup_coord("phi"), + self.lookup_coord("psi"), + self.lookup_coord("alpha"), + ) - selection_slices = { - d: slice( - points[d].sel(coord) - radius[d], - points[d].sel(coord) + radius[d], - ) - for d in points - if d in radius - } - selected = value.sel(selection_slices) + @property + def is_slit_vertical(self) -> bool: # TODO: [RA] Refactoring ? + """Infers whether the scan is taken on an analyzer with vertical slit. - if nearest_sel_params: - selected = selected.sel(nearest_sel_params, method="nearest") + Caveat emptor: this assumes that the alpha coordinate is not some intermediate value. - for d in nearest_sel_params: - # need to remove the extra dims from coords - del selected.coords[d] + Returns: + True if the alpha value is consistent with a vertical slit analyzer. False otherwise. + """ + angle_tolerance = 1.0 + if self.angle_unit.startswith("Deg") or self.angle_unit.startswith("deg"): + return float(np.abs(self.lookup_offset_coord("alpha") - 90.0)) < angle_tolerance + return float(np.abs(self.lookup_offset_coord("alpha") - np.pi / 2)) < float( + np.deg2rad( + angle_tolerance, + ), + ) - if mode == "sum": - new_data.loc[coord] = selected.sum(list(radius.keys())).values - elif mode == "mean": - new_data.loc[coord] = selected.mean(list(radius.keys())).values + @contextlib.contextmanager + def with_rotation_offset(self, offset: float) -> Generator: + """Temporarily rotates the chi_offset by `offset`. - return new_data + Args: + offset (float): offset value about chi. + """ + old_chi_offset = self.offsets.get("chi", 0) + self.apply_offsets({"chi": old_chi_offset + offset}) + yield old_chi_offset + offset + self.apply_offsets({"chi": old_chi_offset}) - def select_around( - self, - points: dict[Hashable, float] | xr.Dataset, - radius: dict[Hashable, float] | float, - *, - mode: Literal["sum", "mean"] = "sum", - **kwargs: float, - ) -> xr.DataArray: - """Selects and integrates a region around a one dimensional point. + def apply_offsets(self, offsets: dict[ANGLE, float]) -> None: + assert isinstance(self._obj, xr.Dataset | xr.DataArray) + for k, v in offsets.items(): + self._obj.attrs[f"{k}_offset"] = v - This method is useful to do a small region integration, especially around - points on a path of a k-point of interest. See also the companion method - `select_around_data`. - If the fast flag is set, we will use the Manhattan norm, i.e. sum over square regions - rather than ellipsoids, as this is less costly. +class ARPESProvenanceProperty(ARPESOffsetProperty): + def __init__(self, xarray_obj: XrTypes) -> None: + self._obj = xarray_obj - If radii are not set, or provided through kwargs as 'eV_r' or 'phi_r' for instance, - then we will try to use reasonable default values; buyer beware. + def short_history(self, key: str = "by") -> list: + """Return the short version of history. Args: - points: The points where the selection should be performed. - radius: The radius of the selection in each coordinate. If dimensions are omitted, a - standard sized selection will be made as a compromise. - safe: If true, infills radii with default values. Defaults to `True`. - mode: How the reduction should be performed, one of "sum" or "mean". Defaults to "sum" - **kwargs: Can be used to pass radii parameters by keyword with `_r` postfix. - - Returns: - The binned selection around the desired point or points. + key (str): [TODO:description] """ - assert isinstance( - self._obj, - xr.DataArray, - ), "Cannot use select_around on Datasets only DataArrays!" + return [h["record"][key] if isinstance(h, dict) else h for h in self.history] # type: ignore[literal-required] - assert mode in {"sum", "mean"}, "mode parameter should be either sum or mean." - assert isinstance(points, dict | xr.Dataset) - if isinstance(points, xr.Dataset): - points = {k: points[k].item() for k in points.data_vars} - logger.debug(f"points: {points}") - assert isinstance(points, dict) - radius = self._radius(points, radius, **kwargs) - logger.debug(f"radius: {radius}") - nearest_sel_params = {} + @property + def is_differentiated(self) -> bool: + """Return True if the spectrum is differentiated data. - # -- originally, if safe == True, the following liens starting from hear - stride = self._obj.G.stride(generic_dim_names=False) - for d, v in radius.items(): - if v < stride[d]: - nearest_sel_params[d] = points[d] + Returns: bool - radius = {d: v for d, v in radius.items() if d not in nearest_sel_params} - # -- to heari, but as name said, should be alwayws safe. - - selection_slices = { - d: slice(points[d] - radius[d], points[d] + radius[d]) for d in points if d in radius - } - selected = self._obj.sel(selection_slices) - - if nearest_sel_params: - selected = selected.sel(nearest_sel_params, method="nearest") - - for d in nearest_sel_params: - # need to remove the extra dims from coords - del selected.coords[d] - - if mode == "sum": - return selected.sum(list(radius.keys())) - return selected.mean(list(radius.keys())) - - @staticmethod - def _radius( - points: dict[Hashable, xr.DataArray] | dict[Hashable, float], - radius: float | dict[Hashable, float], - **kwargs: float, - ) -> dict[Hashable, float]: - """Helper function. Generate radius dict. - - When radius is dict form, nothing has been done, essentially. - - Args: - points (dict[Hashable, float]): Selection point - radius (dict[Hashable, float] | float | None): radius - kwargs (float): [TODO:description] - - Returns: dict[Hashable, float] - radius for selection. - """ - if isinstance(radius, float): - radius = {str(d): radius for d in points} - else: - collectted_terms = {f"{k}_r" for k in points}.intersection(set(kwargs.keys())) - if collectted_terms: - radius = { - d: kwargs.get(f"{d}_r", DEFAULT_RADII.get(str(d), UNSPESIFIED)) for d in points - } - elif radius is None: - radius = {d: DEFAULT_RADII.get(str(d), UNSPESIFIED) for d in points} - assert isinstance(radius, dict) - return {d: radius.get(str(d), DEFAULT_RADII.get(str(d), UNSPESIFIED)) for d in points} - - def short_history(self, key: str = "by") -> list: - """Return the short version of history. - - Args: - key (str): [TODO:description] - """ - return [h["record"][key] if isinstance(h, dict) else h for h in self.history] # type: ignore[literal-required] - - def symmetry_points( - self, - ) -> dict[HIGH_SYMMETRY_POINTS, dict[str, float]]: - """Return the dict object about symmetry point such as G-point in the ARPES data. - - The original version was something complicated, but the coding seemed to be in - process and the purpose was unclear, so it was streamlined considerably. - - - Returns (dict[HIGH_SYMMETRY_POINTS, dict[str, float]]): - Dict object representing the symmpetry points in the ARPES data. - - Examples: - example of "symmetry_points": symmetry_points = {"G": {"phi": 0.405}} - """ - symmetry_points: dict[str, dict[str, float]] = {} - our_symmetry_points = self._obj.attrs.get("symmetry_points", {}) - - symmetry_points.update(our_symmetry_points) - - return symmetry_points + ToDo: Test + """ + history = self.short_history() + return "dn_along_axis" in history or "curvature" in history @property def iter_own_symmetry_points(self) -> Iterator[tuple[HIGH_SYMMETRY_POINTS, dict[str, float]]]: @@ -728,1119 +886,1005 @@ def _unwrap_provenance(prov: Provenance | None) -> list[Provenance | None]: return _unwrap_provenance(provenance_recorded) + +class ARPESPropertyBase(ARPESInfoProperty, ARPESProvenanceProperty): + def __init__(self, xarray_obj: XrTypes) -> None: + self._obj = xarray_obj + @property - def scan_name(self) -> str: - """Return scan name. + def is_subtracted(self) -> bool: # TODO: [RA] xr.DataArray + """Infers whether a given data is subtracted. - Returns: (str) - If "scan" or "file" is set in attrs, return the file name. - If they are not set, return "id" if "id" is set. + Returns (bool): + Return True if the data is subtracted. """ - for option in ["scan", "file"]: - if option in self._obj.attrs: - return Path(self._obj.attrs[option]).name - - id_code = self._obj.attrs.get("id") + assert isinstance(self._obj, xr.DataArray) + if self._obj.attrs.get("subtracted"): + return True - return str(id_code) if id_code is not None else "No ID" + threshold_is_5_percent = 0.05 + return (((self._obj < 0) * 1).mean() > threshold_is_5_percent).item() @property - def label(self) -> str: - return str(self._obj.attrs.get("description", self.scan_name)) - - @contextlib.contextmanager - def with_rotation_offset(self, offset: float) -> Generator: - """Temporarily rotates the chi_offset by `offset`. + def is_spatial(self) -> bool: + """Infers whether a given scan has real-space dimensions (SPEM or u/nARPES). - Args: - offset (float): offset value about chi. + Returns: + True if the data is explicltly a "ucut" or "spem" or contains "X", "Y", or "Z" + dimensions. False otherwise. """ - old_chi_offset = self.offsets.get("chi", 0) - self.apply_offsets({"chi": old_chi_offset + offset}) - yield old_chi_offset + offset - self.apply_offsets({"chi": old_chi_offset}) + assert isinstance(self._obj, xr.DataArray | xr.Dataset) + if self.spectrum_type in {"ucut", "spem"}: + return True - def apply_offsets(self, offsets: dict[ANGLE, float]) -> None: - assert isinstance(self._obj, xr.Dataset | xr.DataArray) - for k, v in offsets.items(): - self._obj.attrs[f"{k}_offset"] = v + return any(d in {"X", "Y", "Z"} for d in self._obj.dims) @property - def offsets(self) -> dict[str, float]: - return { - str(coord): self.lookup_offset(str(coord)) - for coord in self._obj.coords - if f"{coord}_offset" in self._obj.attrs - } + def is_kspace(self) -> bool: + """Infers whether the scan is k-space converted or not. - def lookup_offset_coord(self, name: str) -> xr.DataArray | float: - return self.lookup_coord(name) - self.lookup_offset(name) + Because of the way this is defined, it will return + true for XPS spectra, which I suppose is true but trivially. - def lookup_coord(self, name: str) -> xr.DataArray | float: - if name in self._obj.coords: - return unwrap_xarray_item(self._obj.coords[name]) - self._obj.coords[name] = np.nan - return np.nan + Returns: + True if the data is k-space converted. False otherwise. + """ + assert isinstance(self._obj, xr.DataArray | xr.Dataset) + return not any(d in {"phi", "theta", "beta", "angle"} for d in self._obj.dims) - def lookup_offset(self, attr_name: str) -> float: - symmetry_points = self.symmetry_points() - assert isinstance(symmetry_points, dict) - if "G" in symmetry_points: - gamma_point = symmetry_points["G"] # {"phi": 0.405} (cut) - if attr_name in gamma_point: - return gamma_point[attr_name] + @property + def reference_settings(self) -> dict[str, Any]: + settings = self.spectrometer_settings or {} - offset_name = attr_name + "_offset" - if offset_name in self._obj.attrs: - return self._obj.attrs[offset_name] + settings.update( + { + "hv": self.hv, + }, + ) - return self._obj.attrs.get("data_preparation", {}).get(offset_name, 0) + return settings @property - def beta_offset(self) -> float: - return self.lookup_offset("beta") + def beamline_settings(self) -> BeamLineSettings: + settings: BeamLineSettings = {} + settings["entrance_slit"] = self._obj.attrs.get("entrance_slit", np.nan) + settings["exit_slit"] = self._obj.attrs.get("exit_slit", np.nan) + settings["hv"] = self._obj.attrs.get( + "exit_slit", + self._obj.attrs.get("photon_energy", np.nan), + ) + settings["grating"] = self._obj.attrs.get("grating", None) - @property - def psi_offset(self) -> float: - return self.lookup_offset("psi") + return settings @property - def theta_offset(self) -> float: - return self.lookup_offset("theta") + def spectrometer_settings(self) -> dict[str, Any]: + find_keys = { + "lens_mode": { + "lens_mode", + }, + "pass_energy": { + "pass_energy", + }, + "scan_mode": { + "scan_mode", + }, + "scan_region": { + "scan_region", + }, + "slit": { + "slit", + "slit_plate", + }, + } + settings = {} + for key, options in find_keys.items(): + for option in options: + if option in self._obj.attrs: + settings[key] = self._obj.attrs[option] + break - @property - def phi_offset(self) -> float: - return self.lookup_offset("phi") + if isinstance(settings.get("slit"), float): + settings["slit"] = int(round(settings["slit"])) - @property - def chi_offset(self) -> float: - return self.lookup_offset("chi") + return settings @property - def work_function(self) -> float: - """Provides the work function of the sample, if present in metadata. + def full_coords( + self, + ) -> xr.Coordinates: + """Return the coordinate. - Otherwise, uses something approximate. + Returns: xr.Coordinates + Coordinates data. - Note: - This "work_function" should *NOT* be used for k-conversion! """ - assert isinstance(self._obj, xr.DataArray | xr.Dataset) - if "sample_workfunction" in self._obj.attrs: - return self._obj.attrs["sample_workfunction"] - return 4.3 + full_coords: xr.Coordinates - @property - def analyzer_work_function(self) -> float: - """Provides the work function of the analyzer, if present in metadata. + full_coords = xr.Coordinates(dict(zip(["x", "y", "z"], self.sample_pos, strict=True))) + full_coords.update( + dict( + zip( + ["beta", "theta", "chi", "phi", "psi", "alpha"], + self.sample_angles, + strict=True, + ), + ), + ) + full_coords.update( + { + "hv": self.hv, + }, + ) + full_coords.update(self._obj.coords) + return full_coords - otherwise, use appropriate - Note: - Use this value for k-conversion. - """ - assert isinstance(self._obj, xr.DataArray | xr.Dataset) - if "workfunction" in self._obj.attrs: - return self._obj.attrs["workfunction"] - return 4.401 +class ARPESProperty(ARPESPropertyBase): + def __init__(self, xarray_obj: XrTypes) -> None: + self._obj = xarray_obj - @property - def inner_potential(self) -> float: - """Provides the inner potential, if present in metadata. + @staticmethod + def dict_to_html(d: Mapping[str, float | str]) -> str: + return """ + + + + + + + + + {rows} + +
KeyValue
+ """.format( + rows="".join([f"{k}{v}" for k, v in d.items()]), + ) - Otherwise, 10 eV is assumed. - """ - assert isinstance(self._obj, xr.DataArray | xr.Dataset) - if "inner_potential" in self._obj.attrs: - return self._obj.attrs["inner_potential"] - return 10 + @staticmethod + def _repr_html_full_coords( + coords: xr.Coordinates, + ) -> str: + significant_coords = {} + for k, v in coords.items(): + if v is None: + continue + if np.any(np.isnan(v)): + continue + significant_coords[k] = v - def find_spectrum_energy_edges( - self, - *, - indices: bool = False, - ) -> NDArray[np.float_]: # TODO: xr.DataArray - """Return energy position corresponding to the (1D) spectrum edge. + def coordinate_dataarray_to_flat_rep(value: xr.DataArray) -> str | float: + if not isinstance(value, xr.DataArray | DataArrayCoordinates | DatasetCoordinates): + return value + if len(value.dims) == 0: + tmp = "{var:.5g}" + return tmp.format(var=value.values) + tmp = "{min:.3g} to {max:.3g}" + tmp += " by {delta:.3g}" + return tmp.format( + min=value.min().item(), + max=value.max().item(), + delta=value.values[1] - value.values[0], + ) - Spectrum edge is infection point of the peak. + return ARPESProperty.dict_to_html( + {str(k): coordinate_dataarray_to_flat_rep(v) for k, v in significant_coords.items()}, + ) + + def _repr_html_spectrometer_info(self) -> str: + ordered_settings = OrderedDict(self.spectrometer_settings) + + return ARPESProperty.dict_to_html(ordered_settings) + + @staticmethod + def _repr_html_experimental_conditions(conditions: ExperimentInfo) -> str: + """Return the experimental conditions with html format. Args: - indices (bool): if True, return the pixel (index) number. + conditions (ExperimentInfo): self.confitions is usually used. - Returns: NDArray - Energy position + Returns (str): + html representation of the experimental conditions. """ - assert isinstance( - self._obj, - xr.DataArray, - ) # if self._obj is xr.Dataset, values is function - energy_marginal = self._obj.sum([d for d in self._obj.dims if d != "eV"]) - embed_size = 20 - embedded: NDArray[np.float_] = np.ndarray(shape=[embed_size, energy_marginal.sizes["eV"]]) - embedded[:] = energy_marginal.values - embedded = ndi.gaussian_filter(embedded, embed_size / 3) + def _experimentalinfo_to_dict(conditions: ExperimentInfo) -> dict[str, str]: + transformed_dict = {} + for k, v in conditions.items(): + if k == "polarrization": + assert isinstance(v, (float | str)) + transformed_dict[k] = { + "p": "Linear Horizontal", + "s": "Linear Vertical", + "rc": "Right Circular", + "lc": "Left Circular", + "s-p": "Linear Dichroism", + "p-s": "Linear Dichroism", + "rc-lc": "Circular Dichroism", + "lc-rc": "Circular Dichroism", + }.get(str(v), str(v)) + if k == "temp": + if isinstance(v, float) and not np.isnan(v): + transformed_dict[k] = f"{v} Kelvin" + elif isinstance(v, str): + transformed_dict[k] = v + if k == "hv": + if isinstance(v, xr.DataArray): + min_hv = float(v.min()) + max_hv = float(v.max()) + transformed_dict[k] = ( + f" from {min_hv} to {max_hv} eV" + ) + elif isinstance(v, float) and not np.isnan(v): + transformed_dict[k] = f"{v} eV" + return transformed_dict - from skimage import feature + transformed_dict = _experimentalinfo_to_dict(conditions) + return ARPESProperty.dict_to_html(transformed_dict) - edges = feature.canny( - embedded, - sigma=embed_size / 5, - use_quantiles=True, - low_threshold=0.1, - ) - edges = np.where(edges[int(embed_size / 2)] == 1)[0] - if indices: - return edges + def _repr_html_(self) -> str: + """Return html representation of ARPES data. - delta = self._obj.G.stride(generic_dim_names=False) - return edges * delta["eV"] + self._obj.coords["eV"].values[0] + Returns: + html representation. + """ + skip_data_vars = { + "time", + } - def find_spectrum_angular_edges_full( - self, - *, - indices: bool = False, - energy_division: float = 0.05, - ) -> tuple[NDArray[np.float_], NDArray[np.float_], xr.DataArray]: - # as a first pass, we need to find the bottom of the spectrum, we will use this - # to select the active region and then to rebin into course steps in energy from 0 - # down to this region - # we will then find the appropriate edge for each slice, and do a fit to the edge locations - energy_edge: NDArray[np.float_] = self.find_spectrum_energy_edges() - low_edge = np.min(energy_edge) + energy_division - high_edge = np.max(energy_edge) - energy_division + if isinstance(self._obj, xr.Dataset): + to_plot = [str(k) for k in self._obj.data_vars if k not in skip_data_vars] + to_plot = [str(k) for k in to_plot if 1 <= len(self._obj[k].dims) < 3] # noqa: PLR2004 + to_plot = to_plot[:5] - if high_edge - low_edge < 3 * energy_division: - # Doesn't look like the automatic inference of the energy edge was valid - high_edge = self._obj.coords["eV"].max().item() - low_edge = self._obj.coords["eV"].min().item() + if to_plot: + _, ax = plt.subplots( + 1, + len(to_plot), + figsize=(len(to_plot) * 3, 3), + ) + if len(to_plot) == 1: + ax = [ax] - angular_dim = "pixel" if "pixel" in self._obj.dims else "phi" - energy_cut = self._obj.sel(eV=slice(low_edge, high_edge)).S.sum_other(["eV", angular_dim]) + for i, plot_var in enumerate(to_plot): + self._obj[plot_var].T.plot(ax=ax[i]) + fancy_labels(ax[i]) + ax[i].set_title(plot_var.replace("_", " ")) - n_cuts = int(np.ceil((high_edge - low_edge) / energy_division)) - new_shape = {"eV": n_cuts} - new_shape[angular_dim] = energy_cut.sizes[angular_dim] - logger.debug(f"new_shape: {new_shape}") - rebinned = rebin(energy_cut, shape=new_shape) + remove_colorbars() - embed_size = 20 - embedded: NDArray[np.float_] = np.empty( - shape=[embed_size, rebinned.sizes[angular_dim]], - ) - low_edges = [] - high_edges = [] - for e_cut_index in range(rebinned.sizes["eV"]): - e_slice = rebinned.isel(eV=e_cut_index) - embedded[:] = e_slice.values - embedded = ndi.gaussian_filter(embedded, embed_size / 1.5) # < = Why 1.5 + elif 1 <= len(self._obj.dims) < 3: # noqa: PLR2004 + _, ax = plt.subplots(1, 1, figsize=(4, 3)) + self._obj.T.plot(ax=ax) + fancy_labels(ax, data=self._obj) + ax.set_title("") - from skimage import feature + remove_colorbars() + wrapper_style = 'style="display: flex; flex-direction: row;"' - edges = feature.canny( - embedded, - sigma=4, - use_quantiles=False, - low_threshold=0.7, - high_threshold=1.5, - ) - edges = np.where(edges[int(embed_size / 2)] == 1)[0] - low_edges.append(np.min(edges)) - high_edges.append(np.max(edges)) + if "id" in self._obj.attrs: + name = "ID: " + str(self._obj.attrs["id"])[:9] + "..." + else: + name = "No name" - if indices: - return np.array(low_edges), np.array(high_edges), rebinned.coords["eV"] + warning = "" - delta = self._obj.G.stride(generic_dim_names=False) + if len(self._obj.attrs) < 10: # noqa: PLR2004 + warning = ': Few Attributes, Data Is Summed?' - return ( - np.array(low_edges) * delta[angular_dim] + rebinned.coords[angular_dim].values[0], - np.array(high_edges) * delta[angular_dim] + rebinned.coords[angular_dim].values[0], - rebinned.coords["eV"], - ) + return f""" +
{name}{warning}
+
+
+ Experimental Conditions + {self._repr_html_experimental_conditions(self.experimental_conditions)} +
+
+ Full Coordinates + {self._repr_html_full_coords(self.full_coords)} +
+
+ Spectrometer + {self._repr_html_spectrometer_info()} +
+
+ """ - def zero_spectrometer_edges( - self, - cut_margin: int = 0, - interp_range: float | None = None, - low: Sequence[float] | NDArray[np.float_] | None = None, - high: Sequence[float] | NDArray[np.float_] | None = None, - ) -> xr.DataArray: # TODO: [RA] xr.DataArray - assert isinstance(self._obj, xr.DataArray) - if low is not None: - assert high is not None - assert len(low) == len(high) == TWO_DIMENSION - low_edges = low - high_edges = high +class ARPESAccessorBase(ARPESProperty): + """Base class for the xarray extensions in PyARPES.""" - ( - low_edges, - high_edges, - rebinned_eV_coord, - ) = self.find_spectrum_angular_edges_full(indices=True) - - angular_dim = "pixel" if "pixel" in self._obj.dims else "phi" - if not cut_margin: - if "pixel" in self._obj.dims: - cut_margin = 50 - else: - cut_margin = int(0.08 / self._obj.G.stride(generic_dim_names=False)[angular_dim]) - elif isinstance(cut_margin, float): - assert angular_dim == "phi" - cut_margin = int( - cut_margin / self._obj.G.stride(generic_dim_names=False)[angular_dim], - ) + def __init__(self, xarray_obj: XrTypes) -> None: + self._obj = xarray_obj - if interp_range is not None: - low_edge = xr.DataArray(low_edges, coords={"eV": rebinned_eV_coord}, dims=["eV"]) - high_edge = xr.DataArray(high_edges, coords={"eV": rebinned_eV_coord}, dims=["eV"]) - low_edge = low_edge.sel(eV=interp_range) - high_edge = high_edge.sel(eV=interp_range) - other_dims = list(self._obj.dims) - other_dims.remove("eV") - other_dims.remove(angular_dim) - copied = self._obj.copy(deep=True).transpose(*(["eV", angular_dim, *other_dims])) + class _SliceAlongPathKwags(TypedDict, total=False): + axis_name: str + resolution: float + n_points: int + extend_to_edge: bool - low_edges += cut_margin - high_edges -= cut_margin + def along( + self, + directions: list[Hashable | dict[Hashable, float]], + **kwargs: Unpack[_SliceAlongPathKwags], + ) -> xr.Dataset: # TODO: [RA] xr.DataArray + """[TODO:summary]. - for i, energy in enumerate(copied.coords["eV"].values): - index = np.searchsorted(rebinned_eV_coord, energy) - other = index + 1 - if other >= len(rebinned_eV_coord): - other = len(rebinned_eV_coord) - 1 - index = len(rebinned_eV_coord) - 2 + Args: + directions (list[Hashable] | dict[Hashable, float]): Direction to slice. + kwargs: axis_name, resolution, n_points, extend_to_edge_shift_gamma - low_index = int(np.interp(energy, rebinned_eV_coord, low_edges)) - high_index = int(np.interp(energy, rebinned_eV_coord, high_edges)) - copied.values[i, 0:low_index] = 0 - copied.values[i, high_index:-1] = 0 + Returns: + xr.Dataset + """ + assert isinstance(self._obj, xr.DataArray) + return slice_along_path(self._obj, interpolation_points=directions, **kwargs) - return copied + def find(self, name: str) -> list[str]: + """Return the property names containing the "name". - def sum_other( - self, - dim_or_dims: list[str], - *, - keep_attrs: bool = False, - ) -> XrTypes: - assert isinstance(dim_or_dims, list) + Args: + name (str): string to find. - return self._obj.sum( - [d for d in self._obj.dims if d not in dim_or_dims], - keep_attrs=keep_attrs, - ) + Returns: list[str] + Property list + """ + return [n for n in dir(self) if name in n] - def mean_other( + def with_values( self, - dim_or_dims: list[str] | str, + new_values: NDArray[np.float_], *, - keep_attrs: bool = False, - ) -> XrTypes: - assert isinstance(dim_or_dims, list) + with_attrs: bool = True, + ) -> xr.DataArray: # TODO: [RA] xr.DataArray + """Copy with new array values. - return self._obj.mean( - [d for d in self._obj.dims if d not in dim_or_dims], - keep_attrs=keep_attrs, - ) + Easy way of creating a DataArray that has the same shape as the calling object but data + populated from the array `new_values`. - def find_spectrum_angular_edges( - self, - *, - angle_name: str = "phi", - indices: bool = False, - ) -> NDArray[np.float_] | NDArray[np.int_]: # TODO: [RA] xr.DataArray - """Return angle position corresponding to the (1D) spectrum edge. + Notes: This method is applicable only for xr.DataArray. (Not xr.Dataset) Args: - angle_name (str): angle name to find the edge - indices (bool): if True, return the index not the angle value. + new_values: The new values which should be used for the data. + with_attrs (bool): If True, attributes are also copied. - Returns: NDArray - Angle position + Returns: + A copy of the data with new values but identical dimensions, coordinates, and attrs. + + ToDo: Test """ - angular_dim = "pixel" if "pixel" in self._obj.dims else angle_name assert isinstance(self._obj, xr.DataArray) - phi_marginal = self._obj.sum( - [d for d in self._obj.dims if d != angular_dim], + if with_attrs: + return xr.DataArray( + new_values.reshape(self._obj.values.shape), + coords=self._obj.coords, + dims=self._obj.dims, + attrs=self._obj.attrs, + ) + return xr.DataArray( + new_values.reshape(self._obj.values.shape), + coords=self._obj.coords, + dims=self._obj.dims, ) - embed_size = 20 - embedded: NDArray[np.float_] = np.ndarray( - shape=[embed_size, phi_marginal.sizes[angular_dim]], - ) - embedded[:] = phi_marginal.values - embedded = ndi.gaussian_filter(embedded, embed_size / 3) + def transpose_to_front(self, dim: str) -> XrTypes: + """Transpose the dimensions (to front). - # try to avoid dependency conflict with numpy v0.16 - from skimage import feature # pylint: disable=import-error + Args: + dim: dimension to front - edges = feature.canny( - embedded, - sigma=embed_size / 5, - use_quantiles=True, - low_threshold=0.2, - ) - edges = np.where(edges[int(embed_size / 2)] == 1)[0] - if indices: - return edges + Returns: (XrTypes) + Transposed ARPES data - delta = self._obj.G.stride(generic_dim_names=False) - return edges * delta[angular_dim] + self._obj.coords[angular_dim].values[0] + ToDo: Test + """ + dims = list(self._obj.dims) + assert dim in dims + dims.remove(dim) + return self._obj.transpose(*([dim, *dims])) - def wide_angle_selector(self, *, include_margin: bool = True) -> slice: - edges = self.find_spectrum_angular_edges() - low_edge, high_edge = np.min(edges), np.max(edges) + def transpose_to_back(self, dim: str) -> XrTypes: + """Transpose the dimensions (to back). - # go and build in a small margin - if include_margin: - if "pixels" in self._obj.dims: - low_edge += 50 - high_edge -= 50 - else: - low_edge += 0.05 - high_edge -= 0.05 + Args: + dim: dimension to back - return slice(low_edge, high_edge) + Returns: (XrTypes) + Transposed ARPES data. - def meso_effective_selector(self) -> slice: - energy_edge = self.find_spectrum_energy_edges() - return slice(np.max(energy_edge) - 0.3, np.max(energy_edge) - 0.1) + ToDo: Test + """ + dims = list(self._obj.dims) + assert dim in dims + dims.remove(dim) + return self._obj.transpose(*([*dims, dim])) - def region_sel( + def select_around_data( self, - *regions: Literal["copper_prior", "wide_angular", "narrow_angular"] - | dict[str, DesignatedRegions], - ) -> XrTypes: - def process_region_selector( - selector: slice | DesignatedRegions, - dimension_name: str, - ) -> slice | Callable[..., slice]: - if isinstance(selector, slice): - return selector - - options = { - "eV": ( - DesignatedRegions.ABOVE_EF, - DesignatedRegions.BELOW_EF, - DesignatedRegions.EF_NARROW, - DesignatedRegions.MESO_EF, - DesignatedRegions.MESO_EFFECTIVE_EF, - DesignatedRegions.ABOVE_EFFECTIVE_EF, - DesignatedRegions.BELOW_EFFECTIVE_EF, - DesignatedRegions.EFFECTIVE_EF_NARROW, - ), - "phi": ( - DesignatedRegions.NARROW_ANGLE, - DesignatedRegions.WIDE_ANGLE, - DesignatedRegions.TRIM_EMPTY, - ), - } - - options_for_dim = options.get(dimension_name, list(DesignatedRegions)) - assert selector in options_for_dim + points: dict[Hashable, xr.DataArray], + radius: dict[Hashable, float] | float | None = None, # radius={"phi": 0.005} + *, + mode: Literal["sum", "mean"] = "sum", + **kwargs: Incomplete, + ) -> xr.DataArray: + """Performs a binned selection around a point or points. - # now we need to resolve out the region - resolution_methods = { - DesignatedRegions.ABOVE_EF: slice(0, None), - DesignatedRegions.BELOW_EF: slice(None, 0), - DesignatedRegions.EF_NARROW: slice(-0.1, 0.1), - DesignatedRegions.MESO_EF: slice(-0.3, -0.1), - DesignatedRegions.MESO_EFFECTIVE_EF: self.meso_effective_selector, - # Implement me - # DesignatedRegions.TRIM_EMPTY: , - DesignatedRegions.WIDE_ANGLE: self.wide_angle_selector, - # DesignatedRegions.NARROW_ANGLE: self.narrow_angle_selector, - } - resolution_method = resolution_methods[selector] - if isinstance(resolution_method, slice): - return resolution_method - if callable(resolution_method): - return resolution_method() + Can be used to perform a selection along one axis as a function of another, integrating a + region in the other dimensions. - msg = "Unable to determine resolution method." - raise NotImplementedError(msg) + Example: + As an example, suppose we have a dataset with dimensions ('eV', 'kp', 'T',) + and we also by fitting determined the Fermi momentum as a function of T, kp_F('T'), + stored in the dataarray kFs. Then we could select momentum integrated EDCs in a small + window around the fermi momentum for each temperature by using - obj = self._obj + >>> edcs = full_data.S.select_around_data({'kp': kFs}, radius={'kp': 0.04}, fast=True) - def unpack_dim(dim_name: str) -> str: - if dim_name == "angular": - return "pixel" if "pixel" in obj.dims else "phi" + The resulting data will be EDCs for each T, in a region of radius 0.04 inverse angstroms + around the Fermi momentum. - return dim_name + Args: + points: The set of points where the selection should be performed. + radius: The radius of the selection in each coordinate. If dimensions are omitted, a + standard sized selection will be made as a compromise. + mode: How the reduction should be performed, one of "sum" or "mean". Defaults to "sum" + kwargs: Can be used to pass radii parameters by keyword with `_r` postfix. - for region in regions: - # remove missing dimensions from selection for permissiveness - # and to transparent composing of regions - obj = obj.sel( - { - k: process_region_selector(v, k) - for k, v in { - unpack_dim(k): v for k, v in normalize_region(region).items() - }.items() - if k in obj.dims - }, - ) - - return obj + Returns: + The binned selection around the desired point or points. + """ + assert isinstance( + self._obj, + xr.DataArray, + ), "Cannot use select_around on Datasets only DataArrays!" - def fat_sel( - self, - widths: dict[str, Any] | None = None, - **kwargs: Incomplete, - ) -> XrTypes: - """Allows integrating a selection over a small region. + assert mode in {"sum", "mean"}, "mode parameter should be either sum or mean." + assert isinstance(points, dict | xr.Dataset) + radius = radius or {} + if isinstance(points, xr.Dataset): + points = {k: points[k].item() for k in points.data_vars} + assert isinstance(points, dict) + radius = self._radius(points, radius, **kwargs) + logger.debug(f"radius: {radius}") - The produced dataset will be normalized by dividing by the number - of slices integrated over. + assert isinstance(radius, dict) + logger.debug(f"iter(points.values()): {iter(points.values())}") - This can be used to produce temporary datasets that have reduced - uncorrelated noise. + along_dims = next(iter(points.values())).dims + selected_dims = list(points.keys()) - Args: - widths: Override the widths for the slices. Reasonable defaults are used otherwise. - Defaults to None. - kwargs: slice dict. Has the same function as xarray.DataArray.sel + stride = self._obj.G.stride(generic_dim_names=False) - Returns: - The data after selection. - """ - if widths is None: - widths = {} - assert isinstance(widths, dict) - default_widths = { - "eV": 0.05, - "phi": 2, - "beta": 2, - "theta": 2, - "kx": 0.02, - "ky": 0.02, - "kp": 0.02, - "kz": 0.1, - } + new_dim_order = [d for d in self._obj.dims if d not in along_dims] + list(along_dims) - extra_kwargs = {k: v for k, v in kwargs.items() if k not in self._obj.dims} - slice_kwargs = {k: v for k, v in kwargs.items() if k not in extra_kwargs} - slice_widths = { - k: widths.get(k, extra_kwargs.get(k + "_width", default_widths.get(k))) - for k in slice_kwargs - } - slices = { - k: slice(v - slice_widths[k] / 2, v + slice_widths[k] / 2) - for k, v in slice_kwargs.items() - } + data_for = self._obj.transpose(*new_dim_order) + new_data = data_for.sum(selected_dims, keep_attrs=True) + for coord, value in data_for.G.iterate_axis(along_dims): + nearest_sel_params = {} + # -- originally, if safe == True, the following liens starting from hear + for d, v in radius.items(): + if v < stride[d]: + nearest_sel_params[d] = points[d].sel(coord) - sliced = self._obj.sel(slices) # Need check. "**" should not be required. - thickness = np.prod([len(sliced.coords[k]) for k in slice_kwargs]) - normalized = sliced.sum(slices.keys(), keep_attrs=True, min_count=1) / thickness - for k, v in slices.items(): - normalized.coords[k] = (v.start + v.stop) / 2 - normalized.attrs.update(self._obj.attrs.copy()) - return normalized + radius = {d: v for d, v in radius.items() if d not in nearest_sel_params} + # -- to heari, but as name said, should be alwayws safe. - @property - def reference_settings(self) -> dict[str, Any]: - settings = self.spectrometer_settings or {} + selection_slices = { + d: slice( + points[d].sel(coord) - radius[d], + points[d].sel(coord) + radius[d], + ) + for d in points + if d in radius + } + selected = value.sel(selection_slices) - settings.update( - { - "hv": self.hv, - }, - ) + if nearest_sel_params: + selected = selected.sel(nearest_sel_params, method="nearest") - return settings + for d in nearest_sel_params: + # need to remove the extra dims from coords + del selected.coords[d] - @property - def beamline_settings(self) -> BeamLineSettings: - settings: BeamLineSettings = {} - settings["entrance_slit"] = self._obj.attrs.get("entrance_slit", np.nan) - settings["exit_slit"] = self._obj.attrs.get("exit_slit", np.nan) - settings["hv"] = self._obj.attrs.get( - "exit_slit", - self._obj.attrs.get("photon_energy", np.nan), - ) - settings["grating"] = self._obj.attrs.get("grating", None) + if mode == "sum": + new_data.loc[coord] = selected.sum(list(radius.keys())).values + elif mode == "mean": + new_data.loc[coord] = selected.mean(list(radius.keys())).values - return settings + return new_data - @property - def spectrometer_settings(self) -> dict[str, Any]: - find_keys = { - "lens_mode": { - "lens_mode", - }, - "pass_energy": { - "pass_energy", - }, - "scan_mode": { - "scan_mode", - }, - "scan_region": { - "scan_region", - }, - "slit": { - "slit", - "slit_plate", - }, - } - settings = {} - for key, options in find_keys.items(): - for option in options: - if option in self._obj.attrs: - settings[key] = self._obj.attrs[option] - break + def select_around( + self, + points: dict[Hashable, float] | xr.Dataset, + radius: dict[Hashable, float] | float, + *, + mode: Literal["sum", "mean"] = "sum", + **kwargs: float, + ) -> xr.DataArray: + """Selects and integrates a region around a one dimensional point. - if isinstance(settings.get("slit"), float): - settings["slit"] = int(round(settings["slit"])) + This method is useful to do a small region integration, especially around + points on a path of a k-point of interest. See also the companion method + `select_around_data`. - return settings + If the fast flag is set, we will use the Manhattan norm, i.e. sum over square regions + rather than ellipsoids, as this is less costly. - @property - def sample_pos(self) -> tuple[float, float, float]: - return ( - float(self._obj.attrs["x"]), - float(self._obj.attrs["y"]), - float(self._obj.attrs["z"]), - ) + If radii are not set, or provided through kwargs as 'eV_r' or 'phi_r' for instance, + then we will try to use reasonable default values; buyer beware. - @property - def sample_angles( - self, - ) -> tuple[ - xr.DataArray | float, - xr.DataArray | float, - xr.DataArray | float, - xr.DataArray | float, - xr.DataArray | float, - xr.DataArray | float, - ]: - """Returns angle information. + Args: + points: The points where the selection should be performed. + radius: The radius of the selection in each coordinate. If dimensions are omitted, a + standard sized selection will be made as a compromise. + safe: If true, infills radii with default values. Defaults to `True`. + mode: How the reduction should be performed, one of "sum" or "mean". Defaults to "sum" + **kwargs: Can be used to pass radii parameters by keyword with `_r` postfix. Returns: - ------- - tuple[xr.DataArray | float, ...] - beta, theta, chi, phi, psi, alpha + The binned selection around the desired point or points. """ - return ( - # manipulator - self.lookup_coord("beta"), - self.lookup_coord("theta"), - self.lookup_coord("chi"), - # analyzer - self.lookup_coord("phi"), - self.lookup_coord("psi"), - self.lookup_coord("alpha"), - ) + assert isinstance( + self._obj, + xr.DataArray, + ), "Cannot use select_around on Datasets only DataArrays!" - @property - def full_coords( - self, - ) -> xr.Coordinates: - """Return the coordinate. + assert mode in {"sum", "mean"}, "mode parameter should be either sum or mean." + assert isinstance(points, dict | xr.Dataset) + if isinstance(points, xr.Dataset): + points = {k: points[k].item() for k in points.data_vars} + logger.debug(f"points: {points}") + assert isinstance(points, dict) + radius = self._radius(points, radius, **kwargs) + logger.debug(f"radius: {radius}") + nearest_sel_params = {} - Returns: xr.Coordinates - Coordinates data. + # -- originally, if safe == True, the following liens starting from hear + stride = self._obj.G.stride(generic_dim_names=False) + for d, v in radius.items(): + if v < stride[d]: + nearest_sel_params[d] = points[d] - """ - full_coords: xr.Coordinates + radius = {d: v for d, v in radius.items() if d not in nearest_sel_params} + # -- to heari, but as name said, should be alwayws safe. - full_coords = xr.Coordinates(dict(zip(["x", "y", "z"], self.sample_pos, strict=True))) - full_coords.update( - dict( - zip( - ["beta", "theta", "chi", "phi", "psi", "alpha"], - self.sample_angles, - strict=True, - ), - ), - ) - full_coords.update( - { - "hv": self.hv, - }, - ) - full_coords.update(self._obj.coords) - return full_coords + selection_slices = { + d: slice(points[d] - radius[d], points[d] + radius[d]) for d in points if d in radius + } + selected = self._obj.sel(selection_slices) - @property - def sample_info(self) -> SampleInfo: - """Return sample info property. + if nearest_sel_params: + selected = selected.sel(nearest_sel_params, method="nearest") - Returns (dict): - """ - sample_info: SampleInfo = { - "id": self._obj.attrs.get("sample_id"), - "sample_name": self._obj.attrs.get("sample_name"), - "source": self._obj.attrs.get("sample_source"), - "reflectivity": self._obj.attrs.get("sample_reflectivity", np.nan), - } - return sample_info + for d in nearest_sel_params: + # need to remove the extra dims from coords + del selected.coords[d] - @property - def scan_info(self) -> ScanInfo: - scan_info: ScanInfo = { - "time": self._obj.attrs.get("time", None), - "date": self._obj.attrs.get("date", None), - "type": self.scan_type, - "spectrum_type": self.spectrum_type, - "experimenter": self._obj.attrs.get("experimenter"), - "sample": self._obj.attrs.get("sample_name"), - } - return scan_info + if mode == "sum": + return selected.sum(list(radius.keys())) + return selected.mean(list(radius.keys())) - @property - def experiment_info(self) -> ExperimentInfo: - """Return experiment info property.""" - experiment_info: ExperimentInfo = { - "temperature": self.temp, - "temperature_cryotip": self._obj.attrs.get("temperature_cryotip", np.nan), - "pressure": self._obj.attrs.get("pressure", np.nan), - "polarization": self.probe_polarization, - "photon_flux": self._obj.attrs.get("photon_flux", np.nan), - "photocurrent": self._obj.attrs.get("photocurrent", np.nan), - "probe": self._obj.attrs.get("probe"), - "probe_detail": self._obj.attrs.get("probe_detail"), - "analyzer_detail": self.analyzer_detail, - } - return experiment_info + @staticmethod + def _radius( + points: dict[Hashable, xr.DataArray] | dict[Hashable, float], + radius: float | dict[Hashable, float], + **kwargs: float, + ) -> dict[Hashable, float]: + """Helper function. Generate radius dict. - @property - def pump_info(self) -> LightSourceInfo: - """Return pump info property.""" - pump_info: LightSourceInfo = { - "pump_wavelength": self._obj.attrs.get("pump_wavelength", np.nan), - "pump_energy": self._obj.attrs.get("pump_energy", np.nan), - "pump_fluence": self._obj.attrs.get("pump_fluence", np.nan), - "pump_pulse_energy": self._obj.attrs.get("pump_pulse_energy", np.nan), - "pump_spot_size": ( - self._obj.attrs.get("pump_spot_size_x", np.nan), - self._obj.attrs.get("pump_spot_size_y", np.nan), - ), - "pump_profile": self._obj.attrs.get("pump_profile"), - "pump_linewidth": self._obj.attrs.get("pump_linewidth", np.nan), - "pump_duration": self._obj.attrs.get("pump_duration", np.nan), - "pump_polarization": self.pump_polarization, - } - return pump_info + When radius is dict form, nothing has been done, essentially. - @property - def probe_info(self) -> LightSourceInfo: - """Return probe info property. + Args: + points (dict[Hashable, float]): Selection point + radius (dict[Hashable, float] | float | None): radius + kwargs (float): [TODO:description] - Returns (LIGHTSOURCEINFO): + Returns: dict[Hashable, float] + radius for selection. """ - probe_info: LightSourceInfo = { - "probe_wavelength": self._obj.attrs.get("probe_wavelength", np.nan), - "probe_energy": self.hv, - "probe_fluence": self._obj.attrs.get("probe_fluence", np.nan), - "probe_pulse_energy": self._obj.attrs.get("probe_pulse_energy", np.nan), - "probe_spot_size": ( - self._obj.attrs.get("probe_spot_size_x", np.nan), - self._obj.attrs.get("probe_spot_size_y", np.nan), - ), - "probe_profile": self._obj.attrs.get("probe_profile"), - "probe_linewidth": self._obj.attrs.get("probe_linewidth", np.nan), - "probe_duration": self._obj.attrs.get("probe_duration", np.nan), - "probe_polarization": self.probe_polarization, - } - return probe_info + if isinstance(radius, float): + radius = {str(d): radius for d in points} + else: + collectted_terms = {f"{k}_r" for k in points}.intersection(set(kwargs.keys())) + if collectted_terms: + radius = { + d: kwargs.get(f"{d}_r", DEFAULT_RADII.get(str(d), UNSPESIFIED)) for d in points + } + elif radius is None: + radius = {d: DEFAULT_RADII.get(str(d), UNSPESIFIED) for d in points} + assert isinstance(radius, dict) + return {d: radius.get(str(d), DEFAULT_RADII.get(str(d), UNSPESIFIED)) for d in points} - @property - def laser_info(self) -> LightSourceInfo: - return { - **self.probe_info, - **self.pump_info, - "repetition_rate": self._obj.attrs.get("repetition_rate", np.nan), - } + def find_spectrum_energy_edges( + self, + *, + indices: bool = False, + ) -> NDArray[np.float_]: # TODO: xr.DataArray + """Return energy position corresponding to the (1D) spectrum edge. - @property - def analyzer_info(self) -> AnalyzerInfo: - """General information about the photoelectron analyzer used.""" - analyzer_info: AnalyzerInfo = { - "lens_mode": self._obj.attrs.get("lens_mode"), - "lens_mode_name": self._obj.attrs.get("lens_mode_name"), - "acquisition_mode": self._obj.attrs.get("acquisition_mode", None), - "pass_energy": self._obj.attrs.get("pass_energy", np.nan), - "slit_shape": self._obj.attrs.get("slit_shape", None), - "slit_width": self._obj.attrs.get("slit_width", np.nan), - "slit_number": self._obj.attrs.get("slit_number", np.nan), - "lens_table": self._obj.attrs.get("lens_table"), - "analyzer_type": self._obj.attrs.get("analyzer_type"), - "mcp_voltage": self._obj.attrs.get("mcp_voltage", np.nan), - "work_function": self._obj.attrs.get("workfunction", 4.401), - } - return analyzer_info + Spectrum edge is infection point of the peak. - @property - def daq_info(self) -> DAQInfo: - """General information about the acquisition settings for an ARPES experiment.""" - daq_info: DAQInfo = { - "daq_type": self._obj.attrs.get("daq_type"), - "region": self._obj.attrs.get("daq_region"), - "region_name": self._obj.attrs.get("daq_region_name"), - "center_energy": self._obj.attrs.get("daq_center_energy", np.nan), - "prebinning": self.prebinning, - "trapezoidal_correction_strategy": self._obj.attrs.get( - "trapezoidal_correction_strategy", - ), - "dither_settings": self._obj.attrs.get("dither_settings"), - "sweep_settings": self.sweep_settings, - "frames_per_slice": self._obj.attrs.get("frames_per_slice", np.nan), - "frame_duration": self._obj.attrs.get("frame_duration", np.nan), - } - return daq_info + Args: + indices (bool): if True, return the pixel (index) number. - @property - def beamline_info(self) -> LightSourceInfo: - """Information about the beamline or light source used for a measurement.""" - beamline_info: LightSourceInfo = { - "hv": self.hv, - "linewidth": self._obj.attrs.get("probe_linewidth", np.nan), - "photon_polarization": self.probe_polarization, - "undulator_info": self.undulator_info, - "repetition_rate": self._obj.attrs.get("repetition_rate", np.nan), - "beam_current": self._obj.attrs.get("beam_current", np.nan), - "entrance_slit": self._obj.attrs.get("entrance_slit", None), - "exit_slit": self._obj.attrs.get("exit_slit", None), - "monochromator_info": self.monochromator_info, - } - return beamline_info + Returns: NDArray + Energy position + """ + assert isinstance( + self._obj, + xr.DataArray, + ) # if self._obj is xr.Dataset, values is function + energy_marginal = self._obj.sum([d for d in self._obj.dims if d != "eV"]) - @property - def sweep_settings(self) -> dict[str, xr.DataArray | NDArray[np.float_] | float | None]: - """For datasets acquired with swept acquisition settings, provides those settings.""" - return { - "high_energy": self._obj.attrs.get("sweep_high_energy"), - "low_energy": self._obj.attrs.get("sweep_low_energy"), - "n_sweeps": self._obj.attrs.get("n_sweeps"), - "step": self._obj.attrs.get("sweep_step"), - } + embed_size = 20 + embedded: NDArray[np.float_] = np.ndarray(shape=[embed_size, energy_marginal.sizes["eV"]]) + embedded[:] = energy_marginal.values + embedded = ndi.gaussian_filter(embedded, embed_size / 3) - @property - def probe_polarization(self) -> tuple[float, float]: - """Provides the probe polarization of the UV/x-ray source.""" - return ( - self._obj.attrs.get("probe_polarization_theta", np.nan), - self._obj.attrs.get("probe_polarization_alpha", np.nan), + from skimage import feature + + edges = feature.canny( + embedded, + sigma=embed_size / 5, + use_quantiles=True, + low_threshold=0.1, ) + edges = np.where(edges[int(embed_size / 2)] == 1)[0] + if indices: + return edges - @property - def pump_polarization(self) -> tuple[float, float]: - """For Tr-ARPES experiments, provides the pump polarization.""" - return ( - self._obj.attrs.get("pump_polarization_theta", np.nan), - self._obj.attrs.get("pump_polarization_alpha", np.nan), + delta = self._obj.G.stride(generic_dim_names=False) + return edges * delta["eV"] + self._obj.coords["eV"].values[0] + + def find_spectrum_angular_edges_full( + self, + *, + indices: bool = False, + energy_division: float = 0.05, + ) -> tuple[NDArray[np.float_], NDArray[np.float_], xr.DataArray]: + # as a first pass, we need to find the bottom of the spectrum, we will use this + # to select the active region and then to rebin into course steps in energy from 0 + # down to this region + # we will then find the appropriate edge for each slice, and do a fit to the edge locations + energy_edge: NDArray[np.float_] = self.find_spectrum_energy_edges() + low_edge = np.min(energy_edge) + energy_division + high_edge = np.max(energy_edge) - energy_division + + if high_edge - low_edge < 3 * energy_division: + # Doesn't look like the automatic inference of the energy edge was valid + high_edge = self._obj.coords["eV"].max().item() + low_edge = self._obj.coords["eV"].min().item() + + angular_dim = "pixel" if "pixel" in self._obj.dims else "phi" + energy_cut = self._obj.sel(eV=slice(low_edge, high_edge)).S.sum_other(["eV", angular_dim]) + + n_cuts = int(np.ceil((high_edge - low_edge) / energy_division)) + new_shape = {"eV": n_cuts} + new_shape[angular_dim] = energy_cut.sizes[angular_dim] + logger.debug(f"new_shape: {new_shape}") + rebinned = rebin(energy_cut, shape=new_shape) + + embed_size = 20 + embedded: NDArray[np.float_] = np.empty( + shape=[embed_size, rebinned.sizes[angular_dim]], ) + low_edges = [] + high_edges = [] + for e_cut_index in range(rebinned.sizes["eV"]): + e_slice = rebinned.isel(eV=e_cut_index) + embedded[:] = e_slice.values + embedded = ndi.gaussian_filter(embedded, embed_size / 1.5) # < = Why 1.5 + + from skimage import feature + + edges = feature.canny( + embedded, + sigma=4, + use_quantiles=False, + low_threshold=0.7, + high_threshold=1.5, + ) + edges = np.where(edges[int(embed_size / 2)] == 1)[0] + low_edges.append(np.min(edges)) + high_edges.append(np.max(edges)) - @property - def prebinning(self) -> dict[str, Any]: - """Information about the prebinning performed during scan acquisition.""" - prebinning = {} - for d in self._obj.indexes: - if f"{d}_prebinning" in self._obj.attrs: - prebinning[d] = self._obj.attrs[f"{d}_prebinning"] + if indices: + return np.array(low_edges), np.array(high_edges), rebinned.coords["eV"] - return prebinning # type: ignore [return-value] # because RA don't know the format of FITS. + delta = self._obj.G.stride(generic_dim_names=False) - @property - def monochromator_info(self) -> dict[str, float]: - """Details about the monochromator used on the UV/x-ray source.""" - return { - "grating_lines_per_mm": self._obj.attrs.get("grating_lines_per_mm", np.nan), - } + return ( + np.array(low_edges) * delta[angular_dim] + rebinned.coords[angular_dim].values[0], + np.array(high_edges) * delta[angular_dim] + rebinned.coords[angular_dim].values[0], + rebinned.coords["eV"], + ) - @property - def undulator_info(self) -> dict[str, str | float | None]: - """Details about the undulator for data performed at an undulator source.""" - return { - "gap": self._obj.attrs.get("undulator_gap"), - "z": self._obj.attrs.get("undulator_z"), - "harmonic": self._obj.attrs.get("undulator_harmonic"), - "polarization": self._obj.attrs.get("undulator_polarization"), - "type": self._obj.attrs.get("undulator_type"), - } + def zero_spectrometer_edges( + self, + cut_margin: int = 0, + interp_range: float | None = None, + low: Sequence[float] | NDArray[np.float_] | None = None, + high: Sequence[float] | NDArray[np.float_] | None = None, + ) -> xr.DataArray: # TODO: [RA] xr.DataArray + assert isinstance(self._obj, xr.DataArray) + if low is not None: + assert high is not None + assert len(low) == len(high) == TWO_DIMENSION - @property - def analyzer_detail(self) -> AnalyzerInfo: - """Details about the analyzer, its capabilities, and metadata.""" - return { - "analyzer_name": self._obj.attrs.get( - "analyzer_name", - self._obj.attrs.get("analyzer", ""), - ), - "parallel_deflectors": self._obj.attrs.get("parallel_deflectors", False), - "perpendicular_deflectors": self._obj.attrs.get("perpendicular_deflectors", False), - "analyzer_type": self._obj.attrs.get("analyzer_type", ""), - "analyzer_radius": self._obj.attrs.get("analyzer_radius", np.nan), - } + low_edges = low + high_edges = high - @property - def temp(self) -> float | Literal["RT", "LT"]: - """The temperature at which an experiment was performed.""" - prefered_attrs = [ - "TA", - "ta", - "t_a", - "T_A", - "T_1", - "t_1", - "t1", - "T1", - "temp", - "temp_sample", - "temperature", - "temp_cryotip", - "temperature_sensor_b", - "temperature_sensor_a", - "temperature_cryotip", - ] - for attr in prefered_attrs: - if attr in self._obj.attrs: - return self._obj.attrs[attr] - msg = "Could not read temperature off any standard attr" - logger.debug(msg, stacklevel=2) - return np.nan + ( + low_edges, + high_edges, + rebinned_eV_coord, + ) = self.find_spectrum_angular_edges_full(indices=True) - def generic_fermi_surface(self, fermi_energy: float) -> XrTypes: - return self.fat_sel(eV=fermi_energy, method="nearest") + angular_dim = "pixel" if "pixel" in self._obj.dims else "phi" + if not cut_margin: + if "pixel" in self._obj.dims: + cut_margin = 50 + else: + cut_margin = int(0.08 / self._obj.G.stride(generic_dim_names=False)[angular_dim]) + elif isinstance(cut_margin, float): + assert angular_dim == "phi" + cut_margin = int( + cut_margin / self._obj.G.stride(generic_dim_names=False)[angular_dim], + ) - @property - def fermi_surface(self) -> XrTypes: - return self.fat_sel(eV=0, method="nearest") + if interp_range is not None: + low_edge = xr.DataArray(low_edges, coords={"eV": rebinned_eV_coord}, dims=["eV"]) + high_edge = xr.DataArray(high_edges, coords={"eV": rebinned_eV_coord}, dims=["eV"]) + low_edge = low_edge.sel(eV=interp_range) + high_edge = high_edge.sel(eV=interp_range) + other_dims = list(self._obj.dims) + other_dims.remove("eV") + other_dims.remove(angular_dim) + copied = self._obj.copy(deep=True).transpose(*(["eV", angular_dim, *other_dims])) - def __init__(self, xarray_obj: XrTypes) -> None: - self._obj = xarray_obj + low_edges += cut_margin + high_edges -= cut_margin - @staticmethod - def dict_to_html(d: Mapping[str, float | str]) -> str: - return """ - - - - - - - - - {rows} - -
KeyValue
- """.format( - rows="".join([f"{k}{v}" for k, v in d.items()]), - ) + for i, energy in enumerate(copied.coords["eV"].values): + index = np.searchsorted(rebinned_eV_coord, energy) + other = index + 1 + if other >= len(rebinned_eV_coord): + other = len(rebinned_eV_coord) - 1 + index = len(rebinned_eV_coord) - 2 - @staticmethod - def _repr_html_full_coords( - coords: xr.Coordinates, - ) -> str: - significant_coords = {} - for k, v in coords.items(): - if v is None: - continue - if np.any(np.isnan(v)): - continue - significant_coords[k] = v + low_index = int(np.interp(energy, rebinned_eV_coord, low_edges)) + high_index = int(np.interp(energy, rebinned_eV_coord, high_edges)) + copied.values[i, 0:low_index] = 0 + copied.values[i, high_index:-1] = 0 - def coordinate_dataarray_to_flat_rep(value: xr.DataArray) -> str | float: - if not isinstance(value, xr.DataArray | DataArrayCoordinates | DatasetCoordinates): - return value - if len(value.dims) == 0: - tmp = "{var:.5g}" - return tmp.format(var=value.values) - tmp = "{min:.3g} to {max:.3g}" - tmp += " by {delta:.3g}" - return tmp.format( - min=value.min().item(), - max=value.max().item(), - delta=value.values[1] - value.values[0], - ) + return copied - return ARPESAccessorBase.dict_to_html( - {str(k): coordinate_dataarray_to_flat_rep(v) for k, v in significant_coords.items()}, + def sum_other( + self, + dim_or_dims: list[str], + *, + keep_attrs: bool = False, + ) -> XrTypes: + assert isinstance(dim_or_dims, list) + + return self._obj.sum( + [d for d in self._obj.dims if d not in dim_or_dims], + keep_attrs=keep_attrs, ) - def _repr_html_spectrometer_info(self) -> str: - ordered_settings = OrderedDict(self.spectrometer_settings) + def mean_other( + self, + dim_or_dims: list[str] | str, + *, + keep_attrs: bool = False, + ) -> XrTypes: + assert isinstance(dim_or_dims, list) - return ARPESAccessorBase.dict_to_html(ordered_settings) + return self._obj.mean( + [d for d in self._obj.dims if d not in dim_or_dims], + keep_attrs=keep_attrs, + ) - @staticmethod - def _repr_html_experimental_conditions(conditions: ExperimentInfo) -> str: - """Return the experimental conditions with html format. + def find_spectrum_angular_edges( + self, + *, + angle_name: str = "phi", + indices: bool = False, + ) -> NDArray[np.float_] | NDArray[np.int_]: # TODO: [RA] xr.DataArray + """Return angle position corresponding to the (1D) spectrum edge. Args: - conditions (ExperimentInfo): self.confitions is usually used. + angle_name (str): angle name to find the edge + indices (bool): if True, return the index not the angle value. - Returns (str): - html representation of the experimental conditions. + Returns: NDArray + Angle position """ + angular_dim = "pixel" if "pixel" in self._obj.dims else angle_name + assert isinstance(self._obj, xr.DataArray) + phi_marginal = self._obj.sum( + [d for d in self._obj.dims if d != angular_dim], + ) - def _experimentalinfo_to_dict(conditions: ExperimentInfo) -> dict[str, str]: - transformed_dict = {} - for k, v in conditions.items(): - if k == "polarrization": - assert isinstance(v, (float | str)) - transformed_dict[k] = { - "p": "Linear Horizontal", - "s": "Linear Vertical", - "rc": "Right Circular", - "lc": "Left Circular", - "s-p": "Linear Dichroism", - "p-s": "Linear Dichroism", - "rc-lc": "Circular Dichroism", - "lc-rc": "Circular Dichroism", - }.get(str(v), str(v)) - if k == "temp": - if isinstance(v, float) and not np.isnan(v): - transformed_dict[k] = f"{v} Kelvin" - elif isinstance(v, str): - transformed_dict[k] = v - if k == "hv": - if isinstance(v, xr.DataArray): - min_hv = float(v.min()) - max_hv = float(v.max()) - transformed_dict[k] = ( - f" from {min_hv} to {max_hv} eV" - ) - elif isinstance(v, float) and not np.isnan(v): - transformed_dict[k] = f"{v} eV" - return transformed_dict + embed_size = 20 + embedded: NDArray[np.float_] = np.ndarray( + shape=[embed_size, phi_marginal.sizes[angular_dim]], + ) + embedded[:] = phi_marginal.values + embedded = ndi.gaussian_filter(embedded, embed_size / 3) + + # try to avoid dependency conflict with numpy v0.16 + from skimage import feature # pylint: disable=import-error - transformed_dict = _experimentalinfo_to_dict(conditions) - return ARPESAccessorBase.dict_to_html(transformed_dict) + edges = feature.canny( + embedded, + sigma=embed_size / 5, + use_quantiles=True, + low_threshold=0.2, + ) + edges = np.where(edges[int(embed_size / 2)] == 1)[0] + if indices: + return edges - def _repr_html_(self) -> str: - """Return html representation of ARPES data. + delta = self._obj.G.stride(generic_dim_names=False) + return edges * delta[angular_dim] + self._obj.coords[angular_dim].values[0] - Returns: - html representation. - """ - skip_data_vars = { - "time", - } + def wide_angle_selector(self, *, include_margin: bool = True) -> slice: + edges = self.find_spectrum_angular_edges() + low_edge, high_edge = np.min(edges), np.max(edges) - if isinstance(self._obj, xr.Dataset): - to_plot = [str(k) for k in self._obj.data_vars if k not in skip_data_vars] - to_plot = [str(k) for k in to_plot if 1 <= len(self._obj[k].dims) < 3] # noqa: PLR2004 - to_plot = to_plot[:5] + # go and build in a small margin + if include_margin: + if "pixels" in self._obj.dims: + low_edge += 50 + high_edge -= 50 + else: + low_edge += 0.05 + high_edge -= 0.05 - if to_plot: - _, ax = plt.subplots( - 1, - len(to_plot), - figsize=(len(to_plot) * 3, 3), - ) - if len(to_plot) == 1: - ax = [ax] + return slice(low_edge, high_edge) - for i, plot_var in enumerate(to_plot): - self._obj[plot_var].T.plot(ax=ax[i]) - fancy_labels(ax[i]) - ax[i].set_title(plot_var.replace("_", " ")) + def meso_effective_selector(self) -> slice: + energy_edge = self.find_spectrum_energy_edges() + return slice(np.max(energy_edge) - 0.3, np.max(energy_edge) - 0.1) - remove_colorbars() + def region_sel( + self, + *regions: Literal["copper_prior", "wide_angular", "narrow_angular"] + | dict[str, DesignatedRegions], + ) -> XrTypes: + def process_region_selector( + selector: slice | DesignatedRegions, + dimension_name: str, + ) -> slice | Callable[..., slice]: + if isinstance(selector, slice): + return selector - elif 1 <= len(self._obj.dims) < 3: # noqa: PLR2004 - _, ax = plt.subplots(1, 1, figsize=(4, 3)) - self._obj.T.plot(ax=ax) - fancy_labels(ax, data=self._obj) - ax.set_title("") + options = { + "eV": ( + DesignatedRegions.ABOVE_EF, + DesignatedRegions.BELOW_EF, + DesignatedRegions.EF_NARROW, + DesignatedRegions.MESO_EF, + DesignatedRegions.MESO_EFFECTIVE_EF, + DesignatedRegions.ABOVE_EFFECTIVE_EF, + DesignatedRegions.BELOW_EFFECTIVE_EF, + DesignatedRegions.EFFECTIVE_EF_NARROW, + ), + "phi": ( + DesignatedRegions.NARROW_ANGLE, + DesignatedRegions.WIDE_ANGLE, + DesignatedRegions.TRIM_EMPTY, + ), + } - remove_colorbars() - wrapper_style = 'style="display: flex; flex-direction: row;"' + options_for_dim = options.get(dimension_name, list(DesignatedRegions)) + assert selector in options_for_dim - if "id" in self._obj.attrs: - name = "ID: " + str(self._obj.attrs["id"])[:9] + "..." - else: - name = "No name" + # now we need to resolve out the region + resolution_methods = { + DesignatedRegions.ABOVE_EF: slice(0, None), + DesignatedRegions.BELOW_EF: slice(None, 0), + DesignatedRegions.EF_NARROW: slice(-0.1, 0.1), + DesignatedRegions.MESO_EF: slice(-0.3, -0.1), + DesignatedRegions.MESO_EFFECTIVE_EF: self.meso_effective_selector, + # Implement me + # DesignatedRegions.TRIM_EMPTY: , + DesignatedRegions.WIDE_ANGLE: self.wide_angle_selector, + # DesignatedRegions.NARROW_ANGLE: self.narrow_angle_selector, + } + resolution_method = resolution_methods[selector] + if isinstance(resolution_method, slice): + return resolution_method + if callable(resolution_method): + return resolution_method() - warning = "" + msg = "Unable to determine resolution method." + raise NotImplementedError(msg) - if len(self._obj.attrs) < 10: # noqa: PLR2004 - warning = ': Few Attributes, Data Is Summed?' + obj = self._obj - return f""" -
{name}{warning}
-
-
- Experimental Conditions - {self._repr_html_experimental_conditions(self.experimental_conditions)} -
-
- Full Coordinates - {self._repr_html_full_coords(self.full_coords)} -
-
- Spectrometer - {self._repr_html_spectrometer_info()} -
-
- """ + def unpack_dim(dim_name: str) -> str: + if dim_name == "angular": + return "pixel" if "pixel" in obj.dims else "phi" - @property - def angle_unit(self) -> Literal["Degrees", "Radians"]: - return self._obj.attrs.get("angle_unit", "Radians") + return dim_name - @angle_unit.setter - def angle_unit(self, angle_unit: Literal["Degrees", "Radians"]) -> None: - """Set "angle unit". + for region in regions: + # remove missing dimensions from selection for permissiveness + # and to transparent composing of regions + obj = obj.sel( + { + k: process_region_selector(v, k) + for k, v in { + unpack_dim(k): v for k, v in normalize_region(region).items() + }.items() + if k in obj.dims + }, + ) - Angle unit should be "Degrees" or "Radians" + return obj - Args: - angle_unit: Literal["Degrees", "Radians"] - """ - assert angle_unit in { - "Degrees", - "Radians", - }, "Angle unit should be 'Degrees' or 'Radians'" - self._obj.attrs["angle_unit"] = angle_unit + def fat_sel( + self, + widths: dict[str, Any] | None = None, + **kwargs: Incomplete, + ) -> XrTypes: + """Allows integrating a selection over a small region. - def swap_angle_unit(self) -> None: - """Swap angle unit (radians <-> degrees). + The produced dataset will be normalized by dividing by the number + of slices integrated over. - Change the value of angle related objects/variables in attrs and coords - """ - if self.angle_unit == "Radians" or self.angle_unit.startswith("rad"): - self._radian_to_degree() - elif self.angle_unit == "Degrees" or self.angle_unit.startswith("deg"): - self._degree_to_radian() - else: - msg = 'The angle_unit must be "Radians" or "Degrees"' - raise TypeError(msg) + This can be used to produce temporary datasets that have reduced + uncorrelated noise. - def _radian_to_degree(self) -> None: - """A Helper function for swap_angle_unit. + Args: + widths: Override the widths for the slices. Reasonable defaults are used otherwise. + Defaults to None. + kwargs: slice dict. Has the same function as xarray.DataArray.sel - Degree -> Radian + Returns: + The data after selection. """ - self.angle_unit = "Degrees" - for angle in ANGLE_VARS: - if angle in self._obj.attrs: - self._obj.attrs[angle] = np.rad2deg(self._obj.attrs.get(angle, np.nan)) - if angle + "_offset" in self._obj.attrs: - self._obj.attrs[angle + "_offset"] = np.rad2deg( - self._obj.attrs.get(angle + "_offset", np.nan), - ) - if angle in self._obj.coords: - self._obj.coords[angle] = np.rad2deg(self._obj.coords[angle]) + if widths is None: + widths = {} + assert isinstance(widths, dict) + default_widths = { + "eV": 0.05, + "phi": 2, + "beta": 2, + "theta": 2, + "kx": 0.02, + "ky": 0.02, + "kp": 0.02, + "kz": 0.1, + } - def _degree_to_radian(self) -> None: - """A Helper function for swan_angle_unit. + extra_kwargs = {k: v for k, v in kwargs.items() if k not in self._obj.dims} + slice_kwargs = {k: v for k, v in kwargs.items() if k not in extra_kwargs} + slice_widths = { + k: widths.get(k, extra_kwargs.get(k + "_width", default_widths.get(k))) + for k in slice_kwargs + } + slices = { + k: slice(v - slice_widths[k] / 2, v + slice_widths[k] / 2) + for k, v in slice_kwargs.items() + } - Radian -> Degree - """ - self.angle_unit = "Radians" - for angle in ANGLE_VARS: - if angle in self._obj.attrs: - self._obj.attrs[angle] = np.deg2rad(self._obj.attrs.get(angle, np.nan)) - if angle + "_offset" in self._obj.attrs: - self._obj.attrs[angle + "_offset"] = np.deg2rad( - self._obj.attrs.get(angle + "_offset", np.nan), - ) - if angle in self._obj.coords: - self._obj.coords[angle] = np.deg2rad(self._obj.coords[angle]) + sliced = self._obj.sel(slices) # Need check. "**" should not be required. + thickness = np.prod([len(sliced.coords[k]) for k in slice_kwargs]) + normalized = sliced.sum(slices.keys(), keep_attrs=True, min_count=1) / thickness + for k, v in slices.items(): + normalized.coords[k] = (v.start + v.stop) / 2 + normalized.attrs.update(self._obj.attrs.copy()) + return normalized + + def generic_fermi_surface(self, fermi_energy: float) -> XrTypes: + return self.fat_sel(eV=fermi_energy, method="nearest") + + @property + def fermi_surface(self) -> XrTypes: + return self.fat_sel(eV=0, method="nearest") @xr.register_dataarray_accessor("S")