From 4b9c5184b2fa4b63577434c0a4521fbf251c3bfd Mon Sep 17 00:00:00 2001 From: Lukas Pielsticker <50139597+lukaspie@users.noreply.github.com> Date: Wed, 29 Nov 2023 10:19:35 +0100 Subject: [PATCH] black formatting all xps readers --- .../dataconverter/readers/xps/file_parser.py | 8 +- .../dataconverter/readers/xps/reader_utils.py | 9 +- .../readers/xps/txt/txt_vamas_export.py | 210 +++++++++--------- .../readers/xps/xml/xml_specs.py | 14 +- 4 files changed, 118 insertions(+), 123 deletions(-) diff --git a/pynxtools/dataconverter/readers/xps/file_parser.py b/pynxtools/dataconverter/readers/xps/file_parser.py index b1f66d78f..48357fdbc 100644 --- a/pynxtools/dataconverter/readers/xps/file_parser.py +++ b/pynxtools/dataconverter/readers/xps/file_parser.py @@ -26,14 +26,16 @@ from pynxtools.dataconverter.readers.xps.sle.sle_specs import SleParserSpecs from pynxtools.dataconverter.readers.xps.slh.slh_specs import SlhParserSpecs from pynxtools.dataconverter.readers.xps.txt.txt_scienta import TxtParserScienta + # from pynxtools.dataconverter.readers.xps.txt.txt_specs import TxtParserSpecs -from pynxtools.dataconverter.readers.xps.txt.txt_vamas_export import TxtParserVamasExport +from pynxtools.dataconverter.readers.xps.txt.txt_vamas_export import ( + TxtParserVamasExport, +) from pynxtools.dataconverter.readers.xps.vms.vamas import VamasParser from pynxtools.dataconverter.readers.xps.xy.xy_specs import XyParserSpecs from pynxtools.dataconverter.readers.xps.xml.xml_specs import XmlParserSpecs - class XpsDataFileParser: """Class intended for receiving any type of XPS data file.""" @@ -49,7 +51,7 @@ class XpsDataFileParser: }, "vms": {"unkwown": VamasParser}, "xml": {"specs": XmlParserSpecs}, - "xy": {'specs': XyParserSpecs}, + "xy": {"specs": XyParserSpecs}, } __config_files: Dict = { diff --git a/pynxtools/dataconverter/readers/xps/reader_utils.py b/pynxtools/dataconverter/readers/xps/reader_utils.py index 80df92f03..5d6e24824 100644 --- a/pynxtools/dataconverter/readers/xps/reader_utils.py +++ b/pynxtools/dataconverter/readers/xps/reader_utils.py @@ -21,6 +21,7 @@ from scipy.interpolate import interp1d import numpy as np + def safe_arange_with_edges(start, stop, step): """ In order to avoid float point errors in the division by step. @@ -43,6 +44,7 @@ def safe_arange_with_edges(start, stop, step): """ return step * np.arange(start / step, (stop + step) / step) + def check_uniform_step_width(x): """ Check to see if a non-uniform step width is used in the spectrum @@ -66,6 +68,7 @@ def check_uniform_step_width(x): return False return True + def get_minimal_step(x): """ Return the minimal difference between two consecutive values @@ -90,6 +93,7 @@ def get_minimal_step(x): return step + def _resample_array(y, x0, x1): """ Resample an array (y) which has the same initial spacing @@ -114,6 +118,7 @@ def _resample_array(y, x0, x1): fn = interp1d(x0, y, axis=0, fill_value="extrapolate") return fn(x1) + def interpolate_arrays(x, array_list): """ Interpolate data points in case a non-uniform step width was used. 
@@ -195,9 +200,7 @@ def construct_entry_name(key): try: # entry example : sample__name_of_scan_region entry_name = ( - f'{key_parts[2].split("_", 1)[1]}' - f"__" - f'{key_parts[4].split("_", 1)[1]}' + f'{key_parts[2].split("_", 1)[1]}' f"__" f'{key_parts[4].split("_", 1)[1]}' ) except IndexError: entry_name = "" diff --git a/pynxtools/dataconverter/readers/xps/txt/txt_vamas_export.py b/pynxtools/dataconverter/readers/xps/txt/txt_vamas_export.py index 56ab78e73..7db1aee5e 100644 --- a/pynxtools/dataconverter/readers/xps/txt/txt_vamas_export.py +++ b/pynxtools/dataconverter/readers/xps/txt/txt_vamas_export.py @@ -51,9 +51,9 @@ def __init__(self): self._root_path = "/ENTRY[entry]" self.parser_map = { - 'rows_of_tables': TextParserRows, - 'columns_of_tables':TextParserColumns - } + "rows_of_tables": TextParserRows, + "columns_of_tables": TextParserColumns, + } self.raw_data: list = [] self._xps_dict: dict = {} @@ -92,9 +92,9 @@ def _get_file_type(self, file): """ with open(file) as f: first_line = f.readline() - if first_line.startswith('Cycle'): - return 'columns_of_tables' - return 'rows_of_tables' + if first_line.startswith("Cycle"): + return "columns_of_tables" + return "rows_of_tables" @property def data_dict(self) -> dict: @@ -102,42 +102,35 @@ def data_dict(self) -> dict: return self._xps_dict def construct_data(self): - """ Map TXT format to NXmpes-ready dict. """ + """Map TXT format to NXmpes-ready dict.""" spectra = copy.deepcopy(self.raw_data) self._xps_dict["data"]: dict = {} key_map = { - 'file_info': [], - 'user': [], - 'instrument': [], - 'source': [], - 'beam': [ - 'excitation_energy', - 'excitation_energy/@units', - ], - 'analyser': [], - 'collectioncolumn': [], - 'energydispersion': [], - 'detector': [ - 'dwell_time', - 'dwell_time/@units', - ], - 'manipulator': [], - 'calibration': [], - 'sample': [ - 'sample_name' + "file_info": [], + "user": [], + "instrument": [], + "source": [], + "beam": [ + "excitation_energy", + "excitation_energy/@units", ], - 'data': [ - 'dwell_time', - 'y_units', + "analyser": [], + "collectioncolumn": [], + "energydispersion": [], + "detector": [ + "dwell_time", + "dwell_time/@units", ], - 'region': [ - 'region_name', - 'start_energy', - 'stop_energy', - 'step_size' + "manipulator": [], + "calibration": [], + "sample": ["sample_name"], + "data": [ + "dwell_time", + "y_units", ], + "region": ["region_name", "start_energy", "stop_energy", "step_size"], } for spectrum in spectra: @@ -151,25 +144,25 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map): # pylint: disable=too-many-locals group_parent = f'{self._root_path}/RegionGroup_{spectrum["spectrum_type"]}' region_parent = f'{group_parent}/regions/RegionData_{spectrum["region_name"]}' - file_parent = f'{region_parent}/file_info' - instrument_parent = f'{region_parent}/instrument' - analyser_parent = f'{instrument_parent}/analyser' + file_parent = f"{region_parent}/file_info" + instrument_parent = f"{region_parent}/instrument" + analyser_parent = f"{instrument_parent}/analyser" path_map = { - 'file_info': f'{file_parent}', - 'user': f'{region_parent}/user', - 'instrument': f'{instrument_parent}', - 'source': f'{instrument_parent}/source', - 'beam': f'{instrument_parent}/beam', - 'analyser': f'{analyser_parent}', - 'collectioncolumn': f'{analyser_parent}/collectioncolumn', - 'energydispersion': f'{analyser_parent}/energydispersion', - 'detector': f'{analyser_parent}/detector', - 'manipulator': f'{instrument_parent}/manipulator', - 'calibration': f'{instrument_parent}/calibration', - 
'sample': f'{region_parent}/sample', - 'data': f'{region_parent}/data', - 'region': f'{region_parent}' + "file_info": f"{file_parent}", + "user": f"{region_parent}/user", + "instrument": f"{instrument_parent}", + "source": f"{instrument_parent}/source", + "beam": f"{instrument_parent}/beam", + "analyser": f"{analyser_parent}", + "collectioncolumn": f"{analyser_parent}/collectioncolumn", + "energydispersion": f"{analyser_parent}/energydispersion", + "detector": f"{analyser_parent}/detector", + "manipulator": f"{instrument_parent}/manipulator", + "calibration": f"{instrument_parent}/calibration", + "sample": f"{region_parent}/sample", + "data": f"{region_parent}/data", + "region": f"{region_parent}", } for grouping, spectrum_keys in key_map.items(): @@ -177,7 +170,7 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map): for spectrum_key in spectrum_keys: try: mpes_key = spectrum_key - self._xps_dict[f'{root}/{mpes_key}'] = spectrum[spectrum_key] + self._xps_dict[f"{root}/{mpes_key}"] = spectrum[spectrum_key] except KeyError: pass @@ -186,17 +179,16 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map): scan_key = construct_data_key(spectrum) - energy = np.array(spectrum["data"]['binding_energy']) + energy = np.array(spectrum["data"]["binding_energy"]) - self._xps_dict["data"][entry][scan_key] = \ - xr.DataArray( - data=spectrum["data"]['intensity'], - coords={"energy": energy}) + self._xps_dict["data"][entry][scan_key] = xr.DataArray( + data=spectrum["data"]["intensity"], coords={"energy": energy} + ) detector_data_key_child = construct_detector_data_key(spectrum) detector_data_key = f'{path_map["detector"]}/{detector_data_key_child}/counts' - self._xps_dict[detector_data_key] = spectrum["data"]['intensity'] + self._xps_dict[detector_data_key] = spectrum["data"]["intensity"] class TextParser(ABC): @@ -335,30 +327,30 @@ def _parse_blocks(self): return self.lines def _build_list_of_dicts(self, blocks): - """ - Build list of dictionaries, with each dict containing data - and metadata of one spectrum. + """ + Build list of dictionaries, with each dict containing data + and metadata of one spectrum. - Parameters - ---------- - blocks : list - List of data blocks containing one spectrum each. + Parameters + ---------- + blocks : list + List of data blocks containing one spectrum each. - Returns - ------- - spectra : list - List of dicts with spectrum data and metadata. + Returns + ------- + spectra : list + List of dicts with spectrum data and metadata. 
- """ - spectra = [] + """ + spectra = [] - header, data_lines = self._separate_header_and_data(blocks) - settings = self._parse_header(header) - data = self._parse_data(data_lines) - for spec_settings, spec_data in zip(settings, data): - spectra += [spec_settings|spec_data] + header, data_lines = self._separate_header_and_data(blocks) + settings = self._parse_header(header) + data = self._parse_data(data_lines) + for spec_settings, spec_data in zip(settings, data): + spectra += [spec_settings | spec_data] - return spectra + return spectra def _parse_header(self, header): """ @@ -377,16 +369,16 @@ def _parse_header(self, header): """ settings = [] - for spec_header in header[-1].split('\t')[1::3]: + for spec_header in header[-1].split("\t")[1::3]: sample_name = spec_header.split(":")[1] region = spec_header.split(":")[2] - spectrum_type = sample_name + '_' + region + spectrum_type = sample_name + "_" + region spectrum_settings = { - 'sample_name': sample_name, - 'region_name': region, - 'spectrum_type': spectrum_type, - 'y_units': spec_header.split(":")[3], - } + "sample_name": sample_name, + "region_name": region, + "spectrum_type": spectrum_type, + "y_units": spec_header.split(":")[3], + } settings += [spectrum_settings] return settings @@ -407,7 +399,7 @@ def _parse_data(self, data_lines): and the intensity axes of one spectrum each. """ - data_lines = [x.split('\t') for x in data_lines] + data_lines = [x.split("\t") for x in data_lines] for line in data_lines: del line[2::3] del line[-1] @@ -417,27 +409,26 @@ def _parse_data(self, data_lines): for line in data_lines: for i, data_point in enumerate(line): try: - lines[i].append(float(data_point.strip('\n'))) + lines[i].append(float(data_point.strip("\n"))) except ValueError: pass data = [] - for (x_bin, y) in zip(lines[::2], lines[1::2]): + for x_bin, y in zip(lines[::2], lines[1::2]): x_bin, y = np.array(x_bin), np.array(y) - if (self.uniform_energy_steps and - not check_uniform_step_width(x_bin)): + if self.uniform_energy_steps and not check_uniform_step_width(x_bin): x_bin, y = interpolate_arrays(x_bin, y) spectrum = { "data": { "binding_energy": np.array(x_bin), "intensity": np.array(y).squeeze(), - }, + }, "start_energy": x_bin[0], "stop_energy": x_bin[-1], - } + } if check_uniform_step_width(x_bin): spectrum["step_size"] = get_minimal_step(x_bin) @@ -491,17 +482,17 @@ def _parse_block_header(self, header): """ sample_name = header[0].split(":")[1] - region = header[0].split(":")[2].split('\n')[0] - spectrum_type = sample_name + '_' + region + region = header[0].split(":")[2].split("\n")[0] + spectrum_type = sample_name + "_" + region settings = { - 'sample_name': sample_name, - 'region_name': region, - 'spectrum_type': spectrum_type, - "excitation_energy": header[1].split('\t')[2], - "excitation_energy/@units": header[1].split('\t')[1].split(' ')[-1], - "dwell_time": float(header[1].split('\t')[4].strip()), - "dwell_time/@units": header[1].split('\t')[3].split(' ')[-1], - } + "sample_name": sample_name, + "region_name": region, + "spectrum_type": spectrum_type, + "excitation_energy": header[1].split("\t")[2], + "excitation_energy/@units": header[1].split("\t")[1].split(" ")[-1], + "dwell_time": float(header[1].split("\t")[4].strip()), + "dwell_time/@units": header[1].split("\t")[3].split(" ")[-1], + } return settings @@ -527,8 +518,7 @@ def _parse_block_data(self, block_data): x_bin = lines[:, 2] y = lines[:, 1] - if (self.uniform_energy_steps and - not check_uniform_step_width(x_kin)): + if self.uniform_energy_steps and not 
check_uniform_step_width(x_kin): x_kin, (x_bin, y) = interpolate_arrays(x_kin, [x_bin, y]) return { @@ -560,15 +550,15 @@ def _build_list_of_dicts(self, blocks): block_settings = self._parse_block_header(header) block_data = {"data": self._parse_block_data(block_data_lines)} kinetic_energy = block_data["data"]["kinetic_energy"] - block_settings.update({ - "start_energy": kinetic_energy[0], - "stop_energy": kinetic_energy[-1], - }) + block_settings.update( + { + "start_energy": kinetic_energy[0], + "stop_energy": kinetic_energy[-1], + } + ) if check_uniform_step_width(kinetic_energy): block_settings["step_size"] = get_minimal_step(kinetic_energy) - spectra += [block_settings|block_data] + spectra += [block_settings | block_data] return spectra - - diff --git a/pynxtools/dataconverter/readers/xps/xml/xml_specs.py b/pynxtools/dataconverter/readers/xps/xml/xml_specs.py index b05b1cfd2..1164e1be5 100644 --- a/pynxtools/dataconverter/readers/xps/xml/xml_specs.py +++ b/pynxtools/dataconverter/readers/xps/xml/xml_specs.py @@ -31,12 +31,13 @@ def _construct_entry_name_xml(key): key_parts = key.split("/") try: # entry example : vendor__sample__name_of_scan_region - entry_name = (f'{key_parts[2]}' - f'__' - f'{key_parts[3].split("_", 1)[1]}' - f'__' - f'{key_parts[5].split("_", 1)[1]}' - ) + entry_name = ( + f"{key_parts[2]}" + f"__" + f'{key_parts[3].split("_", 1)[1]}' + f"__" + f'{key_parts[5].split("_", 1)[1]}' + ) except IndexError: entry_name = "" return entry_name @@ -722,4 +723,3 @@ def construct_data(self): self._xps_dict["data"][entry][scan_nm] = xr.DataArray( data=channel_counts[0, :], coords={"BE": binding_energy} ) -
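Because the patch above is intended to be formatting-only (no statement-level behavior changes), it can be regenerated and checked mechanically: assuming black is installed, running "black pynxtools/dataconverter/readers/xps/" from the repository root reproduces the reformatting, and "black --check" on the same path exits non-zero if any file would still be reformatted.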
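A note on the reader_utils.py helpers touched above: safe_arange_with_edges divides start and stop by the step so that np.arange counts over (near-)integer values, avoiding the floating-point drift that np.arange(start, stop, step) accumulates for fractional steps, and interpolate_arrays combines check_uniform_step_width, get_minimal_step, and scipy's interp1d to resample spectra recorded with a non-uniform energy step. A minimal sketch of how these pieces compose, with made-up energy and intensity values for illustration:

    import numpy as np
    from scipy.interpolate import interp1d

    def safe_arange_with_edges(start, stop, step):
        # Count in units of `step` so np.arange works on (near-)integer
        # values, then scale back; `stop + step` makes the last edge inclusive.
        return step * np.arange(start / step, (stop + step) / step)

    # Hypothetical non-uniform binding-energy axis and its intensities:
    x = np.array([284.0, 284.1, 284.25, 284.4])
    y = np.array([10.0, 12.0, 15.0, 11.0])

    step = np.min(np.diff(x))  # smallest consecutive difference, as in get_minimal_step
    x_new = safe_arange_with_edges(x[0], x[-1], step)
    # Linear interpolation onto the uniform axis, as in _resample_array:
    y_new = interp1d(x, y, axis=0, fill_value="extrapolate")(x_new)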
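Similarly, for txt_vamas_export.py: TxtParserVamasExport picks between TextParserRows and TextParserColumns by sniffing the first line of the input, since exports in the "columns of tables" layout begin with a "Cycle" header. A standalone sketch of that dispatch (the file name is hypothetical; the two parser classes are imported from the module shown in the patch):

    from pynxtools.dataconverter.readers.xps.txt.txt_vamas_export import (
        TextParserColumns,
        TextParserRows,
    )

    def get_file_type(file):
        # "Columns of tables" exports start with a "Cycle" header line;
        # any other first line is treated as the "rows of tables" layout.
        with open(file) as f:
            first_line = f.readline()
        if first_line.startswith("Cycle"):
            return "columns_of_tables"
        return "rows_of_tables"

    parser_map = {
        "rows_of_tables": TextParserRows,
        "columns_of_tables": TextParserColumns,
    }
    parser = parser_map[get_file_type("measurement.txt")]()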