From f85d6d94a431a15bf9708aa2c107438bc3db96c3 Mon Sep 17 00:00:00 2001 From: Rubel Date: Wed, 29 Nov 2023 21:39:35 +0100 Subject: [PATCH] remove sts. --- pynxtools/dataconverter/readers/sts/helper.py | 268 ---------- pynxtools/dataconverter/readers/sts/reader.py | 225 -------- .../readers/sts/stm_file_parser.py | 388 -------------- .../readers/sts/sts_file_parser.py | 496 ------------------ 4 files changed, 1377 deletions(-) delete mode 100644 pynxtools/dataconverter/readers/sts/helper.py delete mode 100644 pynxtools/dataconverter/readers/sts/reader.py delete mode 100644 pynxtools/dataconverter/readers/sts/stm_file_parser.py delete mode 100644 pynxtools/dataconverter/readers/sts/sts_file_parser.py diff --git a/pynxtools/dataconverter/readers/sts/helper.py b/pynxtools/dataconverter/readers/sts/helper.py deleted file mode 100644 index 6ad99ca8a..000000000 --- a/pynxtools/dataconverter/readers/sts/helper.py +++ /dev/null @@ -1,268 +0,0 @@ -""" - Some generic function and class for on STM reader. -""" -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Tuple -import copy -import numpy as np -from pynxtools.dataconverter.helpers import (convert_data_dict_path_to_hdf5_path, - transform_to_intended_dt, - ) - - -# Here are some data or data type or unit or data to skip: -UNIT_TO_SKIP = ['on/off', 'off', 'on', 'off/on'] - - -def fill_template_from_eln_data(eln_data_dict, template): - """Fill out the template from dict that generated from eln yaml file. - Parameters: - ----------- - eln_data_dict : dict[str, Any] - Python dictionary from eln file. - template : dict[str, Any] - Return: - ------- - None - """ - - for e_key, e_val in eln_data_dict.items(): - template[e_key] = transform_to_intended_dt(e_val) - - -def overwrite_fields(template, data_dict, - sub_config_dict, nexus_path, - dict_orig_key_to_mod_key): - """ - Overwrite a field for multiple dimention of the same type of physical quantity. - - Parameters: - ----------- - template : dict[str, Any] - Capturing data elements. One to one dictionary for capturing data array, data axes - and so on from data_dict to be ploted. - data_dict : dict[str, Union[array, str]] - Data stored from dat file. Path (str) to data elements which mainly come from - dat file. Data from this dict will go to template - data_config_dict : dict[str, list] - This dictionary is numerical data order to list (list of path to data elements in - input file). Each order indicates a group of data set. - field_path : NeXus field full path - - Returns: - -------- - None - """ - # Find the overwriteable part - overwrite_part = "" - field_to_replace = "" - # Two possibilities are considered: tilt_N/@units and tilt_N - if '/@units' in nexus_path: - field_to_replace = nexus_path.rsplit('/', 2)[-2] - else: - field_to_replace = nexus_path.rsplit('/', 1)[-1] - for char in field_to_replace: - if char.isupper(): - overwrite_part = overwrite_part + char - - if not overwrite_part and not field_to_replace and isinstance(sub_config_dict, dict): - raise ValueError(f"No overwriteable part has been found but data structure " - f": {sub_config_dict} intended to overeritten.") - # sub_config_dict contains key that repalce the overwritable (upper case part) - # part from nexus path - for ch_to_replace, data_path in sub_config_dict.items(): - modified_field = field_to_replace.replace(overwrite_part, ch_to_replace) - # Considering renamed field - new_temp_key = nexus_path.replace(field_to_replace, f"{field_to_replace}[{modified_field}]") - value = "value" - unit = "unit" - dict_orig_key_to_mod_key[nexus_path] = new_temp_key - if value in data_path: - path_to_data = data_path[value] - template[new_temp_key] = transform_to_intended_dt(data_dict[path_to_data] - if path_to_data in data_dict - else None) - if unit in data_path: - path_to_data = data_path[unit] - unit = transform_to_intended_dt(data_dict[path_to_data] if path_to_data in data_dict - else None) - template[new_temp_key + "/@units"] = unit - - -def nested_path_to_slash_separated_path(nested_dict: dict, - flattened_dict: dict, - parent_path=''): - """Convert nested dict into slash separeted path upto certain level.""" - start = '/' - - for key, val in nested_dict.items(): - path = parent_path + start + key - if isinstance(val, dict): - nested_path_to_slash_separated_path(val, flattened_dict, path) - else: - flattened_dict[path] = val - - -def link_seperation(template, link_modified_dict): - """Rewrite the link compatible with hdf5 full path. - for e.g. convert /NXentry/NXinstrument/name to - /entry/instrument/name and rewrite in template. - - Parameters - ---------- - template : Template (dict) - To write out the hdf file - link_modified_dict : dict - The key corresponds to nxdl def path e.g. /ENTRY[entry]/INSTRUMENT[instrument]/NAME - and the value is the modified link path e.g. - /ENTRY[entry]/INSTRUMENT[special_instrument]/given_name where the - value is according to the implementaion of the NeXus def. - """ - for _, val in template.items(): - if isinstance(val, dict) and 'link' in val: - orig_link_path = val['link'] - # Check whether any concept has been rewriten stored in key value - if orig_link_path in link_modified_dict: - # modified concepts come in a list together. - modif_link_hdf_path = convert_data_dict_path_to_hdf5_path( - link_modified_dict[orig_link_path]) - val['link'] = modif_link_hdf_path - else: - val['link'] = convert_data_dict_path_to_hdf5_path(orig_link_path) - - -# pylint: disable=line-too-long -def link_seperation_from_hard_code(template, link_modified_dict): - """This function is intended to handle hard coded link. - In future, this function can be removed instead the upper function can be used, - once the application definition will be updated by link element. - """ - concept_to_data_link: dict = {"/ENTRY[entry]/reproducibility_indicators/backward_sweep": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/backward_sweep", - "/ENTRY[entry]/reproducibility_indicators/bias": - "/NXentry/NXinstrument/NXsample_bias/bias", - "/ENTRY[entry]/reproducibility_indicators/bias_calibration": - "/NXentry/NXnstrument/NXsample_bias/bias_calibration", - "/ENTRY[entry]/reproducibility_indicators/bias_offset": - "/NXentry/NXinstrument/NXsample_bias/bias_offset", - "/ENTRY[entry]/reproducibility_indicators/current": - "/NXentry/NXinstrument/NXenvironment/NXcurrent_sensor/current", - "/ENTRY[entry]/reproducibility_indicators/current_calibration": - "/NXentry/NXinstrument/NXenvironment/NXcurrent_sensor/current_calibration", - "/ENTRY[entry]/reproducibility_indicators/current_gain": - "/NXentry/NXinstrument/NXenvironment/NXcurrent_sensor/current_gain", - "/ENTRY[entry]/reproducibility_indicators/current_offset": - "/NXentry/NXinstrument/NXenvironment/NXcurrent_sensor/current_offset", - "/ENTRY[entry]/reproducibility_indicators/end_settling_time": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/end_settling_time", - "/ENTRY[entry]/reproducibility_indicators/final_z": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/record_final_z", - "/ENTRY[entry]/reproducibility_indicators/first_settling_time": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/first_settling_time", - "/ENTRY[entry]/reproducibility_indicators/max_slew_rate": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/max_slew_rate", - "/ENTRY[entry]/reproducibility_indicators/settling_time": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/", - "/ENTRY[entry]/reproducibility_indicators/y_control_p_gain": - "/NXentry/NXinstrument/NXenvironment/NXposition/NXz_controller/p_gain", - "/ENTRY[entry]/reproducibility_indicators/z_control_hold": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/z_ccontroller_hold", - "/ENTRY[entry]/reproducibility_indicators/z_control_i_gain": - "/NXentry/NXinstrument/NXenvironment/NXposition/NXz_controller/i_gain", - "/ENTRY[entry]/reproducibility_indicators/z_control_switchoff_delay": - "/NXentry/NXinstrument/NXenvironment/NXposition/NXz_controller/switchoff_delay", - "/ENTRY[entry]/reproducibility_indicators/z_control_time": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/z_control_time", - "/ENTRY[entry]/reproducibility_indicators/z_control_time_const": - "/NXentry/NXinstrument/NXenvironment/NXposition/NXz_controller/time_const", - "/ENTRY[entry]/reproducibility_indicators/z_control_tip_lift": - "/NXentry/NXinstrument/NXenvironment/NXposition/NXz_controller/tip_lift", - "/ENTRY[entry]/reproducibility_indicators/z_controller_name": - "/NXentry/NXinstrument/NXenvironment/NXposition/NXz_controller/controller_name", - "/ENTRY[entry]/reproducibility_indicators/z_controller_setpoint": - "/NXentry/NXinstrument/NXenvironment/NXposition/NXz_controller/set_point", - "/ENTRY[entry]/reproducibility_indicators/z_controller_status": - "/NXentry/NXinstrument/NXenvironment/NXposition/NXz_controller/controller_status", - "/ENTRY[entry]/reproducibility_indicators/z_offset": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/z_offset", - "/ENTRY[entry]/resolution_indicators/acquisition_period": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXcircuit/acquisition_period", - "/ENTRY[entry]/resolution_indicators/animations_period": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXcircuit/animations_period", - "/ENTRY[entry]/resolution_indicators/cryo_bottom_temp": - "/NXentry/NXinstrument/cryo_bottom_temp", - "/ENTRY[entry]/resolution_indicators/cryo_shield_temp": - "/NXentry/NXinstrument/temp_cryo_shield", - "/ENTRY[entry]/resolution_indicators/indicators_period": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXcircuit/indicators_period", - "/ENTRY[entry]/resolution_indicators/integration_time": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/NXintegration_time", - "/ENTRY[entry]/resolution_indicators/measurements_period": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXcircuit/measurements_period", - "/ENTRY[entry]/resolution_indicators/modulation_signal": - "/NXentry/NXinstrument/NXlock_in/modulation_signal", - "/ENTRY[entry]/resolution_indicators/num_pixel": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/num_pixel", - "/ENTRY[entry]/resolution_indicators/number_of_sweeps": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/number_of_sweeps", - "/ENTRY[entry]/resolution_indicators/rt_frequency": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXcircuit/rt_frequency", - "/ENTRY[entry]/resolution_indicators/signals_oversampling": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXcircuit/signals_oversampling", - "/ENTRY[entry]/resolution_indicators/stm_head_temp": - "/NXentry/NXinstrument/stm_head_temp", - "/ENTRY[entry]/resolution_indicators/sweep_end": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/sweep_end", - "/ENTRY[entry]/resolution_indicators/sweep_start": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/sweep_start", - "/ENTRY[entry]/resolution_indicators/z_avg_time": - "/NXentry/NXinstrument/NXenvironment/NXsweep_control/NXbias_spectroscopy/z_avg_time", - } - temp_template = copy.deepcopy(template) - for key, _ in temp_template.items(): - if key in concept_to_data_link: - concept = concept_to_data_link[key] - concept = concept.replace("NX", "") - # check concept already modified before - if concept in link_modified_dict: - concept = link_modified_dict[concept] - template[key] = {'link': concept} - - -def cal_dx_by_dy(x_val: np.ndarray, y_val: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: - """Calc conductance or gradiant dx/dy for x-variable and y-variable also return the result.""" - dx_ = x_val[0::2] - x_val[1::2] - dy_ = y_val[0::2] - y_val[1::2] - - dx_by_dy = dx_ / dy_ - - return dx_by_dy - - -def cal_x_multi_x(x_val: np.ndarray, y_val: np.ndarray) -> np.ndarray: - """Return multiplication of two array - """ - return x_val * y_val - - -def slice_before_last_element(np_array): - """Get all the elements before last element. - """ - if not isinstance(np_array, np.ndarray) and not len(np.shape(np_array)) == 1: - raise ValueError('Please provide a numpy array of 1D.') - return np_array[:-1] diff --git a/pynxtools/dataconverter/readers/sts/reader.py b/pynxtools/dataconverter/readers/sts/reader.py deleted file mode 100644 index 7c91bc7c0..000000000 --- a/pynxtools/dataconverter/readers/sts/reader.py +++ /dev/null @@ -1,225 +0,0 @@ -""" - A short description on STS reader which also suitable for file from STM . -""" - -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -from typing import Any, Dict, Tuple, Union -from collections.abc import Callable -import json -import yaml - -from pynxtools.dataconverter.readers.base.reader import BaseReader -from pynxtools.dataconverter.template import Template -from pynxtools.dataconverter.readers.sts.sts_file_parser import from_dat_file_into_template -from pynxtools.dataconverter.readers.sts.stm_file_parser import STM_Nanonis -from pynxtools.dataconverter.readers.utils import flatten_and_replace, FlattenSettings - - -CONVERT_DICT = { - 'Instrument': 'INSTRUMENT[instrument]', - 'Software': 'SOFTWARE[software]', - 'Hardware': 'Hardware[hardware]', - 'Analyser': 'ELECTRONANALYSER[electronanalyser]', - 'Beam': 'BEAM[beam]', - 'unit': '@units', - 'version': '@version', - 'Sample': 'SAMPLE[sample]', - 'User': 'USER[user]', - 'Data': 'DATA[data]', - 'Source': 'SOURCE[source]', - 'Environment': 'ENVIRONMENT[environment]', - 'Sample_bias': 'SAMPLE_BIAS[sample_bias]' -} -# For flatened key-value pair from nested dict. -REPLACE_NESTED: Dict[str, str] = {} - - -# pylint: disable=too-few-public-methods -class StmNanonisGeneric5e: - """Class to handle 'stm' experiment of software version 'Generic 5e' from 'nanonis' - vendor. - """ - - def __call__(self, template: Dict, data_file: str, config_dict: str, eln_dict: Dict) -> None: - """Convert class instace as callable function. - - Parameters - ---------- - template : Dict - Template that will be filled. - data_file : str - The file from experiment - config_dict : str - Config file to map application definition to the raw file - eln_dict : Dict - user provided dict - """ - - STM_Nanonis(file_name=data_file).from_sxm_file_into_template(template, - config_dict, - eln_dict) - - -# pylint: disable=too-few-public-methods -class StsNanonisGeneric5e: - """Class to handle 'sts' experiment of software version 'Generic 5e' from 'nanonis' - vendor. - """ - def __call__(self, template: Dict, data_file: str, config_dict: Dict, eln_dict: Dict) -> None: - """Convert class instace as callable function. - - Parameters - ---------- - template : Dict - Template that will be filled. - data_file : str - The file from experiment - config_dict : str - Config file to map application definition to the raw file - eln_dict : Dict - user provided dict - """ - from_dat_file_into_template(template, data_file, - config_dict, eln_dict) - - -# pylint: disable=too-few-public-methods -class Spm: - """This class is intended for taking care of vendor's name, - experiment (stm, sts, afm) and software versions. - - Raises - ------ - ValueError - If experiment is not in ['sts', 'stm', 'afm'] - ValueError - if vendor's name is not in ['nanonis'] - ValueError - if software version is not in ['Generic 5e'] - """ - - # parser navigate type - par_nav_t = Dict[str, Union['par_nav_t', Callable]] - __parser_navigation: Dict[str, par_nav_t] = \ - {'stm': {'nanonis': {'Generic 5e': StmNanonisGeneric5e}}, - 'sts': {'nanonis': {'Generic 5e': StsNanonisGeneric5e}} - } - - def get_appropriate_parser(self, eln_dict: Dict) -> Callable: - """Search for appropriate prser and pass it the reader. - - Parameters - ---------- - eln_dict : str - User provided eln file (yaml) that must contain all the info about - experiment, vendor's name and version of the vendor's software. - - Returns - ------- - Return callable function that has capability to run the correponding parser. - """ - - experiment_t_key: str = "/ENTRY[entry]/experiment_type" - experiment_t: str = eln_dict[experiment_t_key] - try: - experiment_dict: Spm.par_nav_t = self.__parser_navigation[experiment_t] - except KeyError as exc: - raise KeyError(f"Add correct experiment type in ELN file " - f" from {list(self.__parser_navigation.keys())}.") from exc - - vendor_key: str = "/ENTRY[entry]/INSTRUMENT[instrument]/SOFTWARE[software]/vendor" - vendor_t: str = eln_dict[vendor_key] - try: - vendor_dict: Spm.par_nav_t = experiment_dict[vendor_t] # type: ignore[assignment] - except KeyError as exc: - raise KeyError(f"Add correct vendor name in ELN file " - f" from {list(experiment_dict.keys())}.") from exc - - software_v_key: str = "/ENTRY[entry]/INSTRUMENT[instrument]/SOFTWARE[software]/@version" - software_v: str = eln_dict[software_v_key] - try: - parser_cls: Callable = vendor_dict[software_v] # type: ignore[assignment] - # cls instance - parser = parser_cls() - except KeyError as exc: - raise KeyError(f"Add correct software version in ELN file " - f" from {list(vendor_dict.keys())}.") from exc - - # Return callable function - return parser - - -# pylint: disable=invalid-name, too-few-public-methods -class STMReader(BaseReader): - """Reader for STM/STS.""" - - supported_nxdls = ["NXsts"] - - def read(self, - template: dict = None, - file_paths: Tuple[str] = None, - objects: Tuple[Any] = None): - """ - General read menthod to prepare the template. - """ - # has_sxm_file: bool = False - # sxm_file: str = "" - # has_dat_file: bool = False - # dat_file: str = "" - filled_template: Union[Dict, None] = Template() - # config_dict: Union[Dict[str, Any], None] = None - eln_dict: Union[Dict[str, Any], None] = None - config_dict: Dict = {} - - data_file: str = "" - for file in file_paths: - ext = file.rsplit('.', 1)[-1] - fl_obj: object - if ext in ['sxm', 'dat']: - data_file = file - if ext in ['yaml', 'yml']: - with open(file, encoding="utf-8", mode="r") as fl_obj: - eln_dict = flatten_and_replace( - FlattenSettings( - yaml.safe_load(fl_obj), - replace_nested=REPLACE_NESTED, - convert_dict=CONVERT_DICT, - ) - ) - if ext == 'json': - with open(file, mode="r", encoding="utf-8") as fl_obj: - config_dict = json.load(fl_obj) - - # Get callable object that has parser inside - parser = Spm().get_appropriate_parser(eln_dict) - parser(template, data_file, config_dict, eln_dict) - - for key, val in template.items(): - if val is not None: - filled_template[key] = val - else: - del template[key] - - if not filled_template.keys(): - raise ValueError("Reader could not read anything! Check for input files") - return filled_template - - -READER = STMReader diff --git a/pynxtools/dataconverter/readers/sts/stm_file_parser.py b/pynxtools/dataconverter/readers/sts/stm_file_parser.py deleted file mode 100644 index 29321f278..000000000 --- a/pynxtools/dataconverter/readers/sts/stm_file_parser.py +++ /dev/null @@ -1,388 +0,0 @@ -""" - A parser for files from stm experiment into a simple dict. -""" - -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -import os -from typing import Any, Dict -import logging -import re -import numpy as np -import nanonispy as nap - -from pynxtools.dataconverter.readers.sts.helper import (nested_path_to_slash_separated_path, - fill_template_from_eln_data, - overwrite_fields, - link_seperation_from_hard_code, - UNIT_TO_SKIP) -from pynxtools.dataconverter.helpers import transform_to_intended_dt - - -logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') - - -def has_separator_char(key, sep_char_li): - """ - Check string or key whether the separator char provided in - 'Separator Char List' exist or not. - """ - bool_k = [x in sep_char_li for x in key] - return np.any(bool_k) - - -# pylint: disable=invalid-name -class STM_Nanonis(): - """Specific class for stm reader from nanonis company. - - """ - - def __init__(self, file_name): - """Construct - """ - - self.file_name = file_name - - def get_nested_dict_from_concatenated_key(self, data_dict, dict_to_map_path=None, - sep_chars=None): - """ - Create nested dict. If key are concateneted with '_', '>' split the key and - construct nested dict. For example, {'x1': {'x2': {'x3': {'x4': {'x5': 3}}}} - from 'x1_x2_x3_x4>x5:3' - """ - if dict_to_map_path is not None: - spreaded_dict = dict_to_map_path - else: - spreaded_dict: Dict[str, Any] = {} - if sep_chars is None: - sep_chars = ['_', '>'] - for d_key, d_val in data_dict.items(): - if has_separator_char(d_key, sep_chars): - # Find out which separator char exist there - for k_c in d_key: - if k_c in sep_chars: - sep_char = k_c - break - l_key, r_key = d_key.split(sep_char, 1) - if not has_separator_char(r_key, sep_chars): - if l_key not in spreaded_dict: - spreaded_dict[l_key]: Dict[str, Any] = {} - spreaded_dict[l_key][r_key] = d_val - else: - if l_key in spreaded_dict: - spreaded_dict[l_key] = self.get_nested_dict_from_concatenated_key( - {r_key: d_val}, dict_to_map_path=spreaded_dict[l_key]) - else: - spreaded_dict[l_key]: Dict[str, Any] = {} - spreaded_dict[l_key] = self.get_nested_dict_from_concatenated_key( - {r_key: d_val}, dict_to_map_path=spreaded_dict[l_key]) - else: - spreaded_dict[d_key] = d_val - - return spreaded_dict - - def convert_key_to_unit_and_entity(self, key, val, start_bracket='', end_bracket=''): - """ - Split key into 'key' and 'key/@units' if key is designed as somthing like this 'key(A)'. - """ - if start_bracket and end_bracket: - if start_bracket in key and end_bracket in key: - tmp_l_part, tmp_r_part = key.rsplit(start_bracket) - unit = tmp_r_part.rsplit(end_bracket)[0] - full_key = tmp_l_part.strip() - if unit in UNIT_TO_SKIP: - unit = '' - return [(full_key, val), (f"{full_key}/@unit", unit)] - - # In case if value contain name and unit e.g. /.../demodulated_signal: 'current(A)' - if start_bracket in val and end_bracket in val: - unit_parts = val.rsplit(start_bracket) - # Assume that val does not have any key but decriptive text, - # e.g. Current (A);Bias (V); - if len(unit_parts) > 2: - return [(key, val)] - tmp_l_part, tmp_r_part = unit_parts - unit = tmp_r_part.rsplit(end_bracket)[0] - val = tmp_l_part.strip() - if unit in UNIT_TO_SKIP: - unit = '' - return [(key, val), (f"{key}/@unit", unit)] - - return [] - - def get_sxm_raw_metadata_and_signal(self, file_name): - """ - Retun metadata plain dict and signal - Convert header part (that contains metadata) of a file with 'sxm' extension into - plain dict. - """ - scan_file = nap.read.Scan(file_name) - header_end_byte = scan_file.start_byte() - h_part = scan_file.read_raw_header(header_end_byte) - while True: - # Ignore all starting chars of string h_part except Alphabat - if not re.match("[a-zA-Z]", h_part): - h_part = h_part[1:] - else: - break - - h_comp_iter = iter(re.split('\n:|:\n', h_part)) - return dict(zip(h_comp_iter, h_comp_iter)), scan_file.signals - - def get_SPM_metadata_dict_and_signal(self): - """ - Get meradata and signal from spm file. - """ - metadata_dict, signal = self.get_sxm_raw_metadata_and_signal(self.file_name) - nesteded_matadata_dict = self.get_nested_dict_from_concatenated_key(metadata_dict) - # Convert nested (dict) path to signal into slash_separated path to signal - temp_flattened_dict_sig = {} - nested_path_to_slash_separated_path(signal, - temp_flattened_dict_sig) - temp_flattened_dict = {} - nested_path_to_slash_separated_path(nesteded_matadata_dict, - temp_flattened_dict) - flattened_dict = {} - for key, val in temp_flattened_dict.items(): - # list of tuples of (data path, data) and (unit path/unit and unit value) - tuple_li = self.convert_key_to_unit_and_entity(key, val, - start_bracket='(', - end_bracket=')') - if tuple_li: - for tup in tuple_li: - flattened_dict[tup[0]] = tup[1] - else: - flattened_dict[key] = val - - flattened_dict.update(temp_flattened_dict_sig) - return flattened_dict - - # pylint: disable=too-many-arguments - def construct_nxdata_for_sxm(self, - template, - data_dict, - sub_config_dict, - coor_info, - data_group): - """ - Construct NXdata that includes all the groups, field and attributes. All the elements - will be stored in template. - - Parameters: - ----------- - template : dict[str, Any] - Capturing data elements. One to one dictionary for capturing data array, data axes - and so on from data_dict to be ploted. - data_dict : dict[str, Union[array, str]] - Data stored from dat file. Path (str) to data elements which mainly come from - dat file. Data from this dict will go to template - data_config_dict : dict[str, list] - This dictionary is numerical data order to list (list of path to data elements in - input file). Each order indicates a group of data set. - coor_info: Tuple[list] - Tuple (for X and Y coordinate respectively) of list and each list starting and - end point of x-axis. - - data_group : NeXus path for NXdata - - Return: - ------- - None - - Raise: - ------ - None - """ - # pylint: disable=global-variable-undefined - def indivisual_DATA_field(): - """Fill up template's indivisual data field and the descendant attribute. - e.g. /Entry[ENTRY]/data/DATA, - /Entry[ENTRY]/data/DATA/@axes and so on - """ - # To define a variable on global namespace - global nxdata_grp, field_name - # list of paths e.g. "/LI_Demod_2_X/forward" comes provided file .sxm. - for path in dt_path_list: - if path in data_dict: - grp_name, field_name = find_nxdata_group_and_name(path) - grp_name = '_'.join(grp_name.lower().split(' ')) - signals.append(field_name) - nxdata_grp = data_group.replace("DATA[data", f"DATA[{grp_name}") - temp_data_field = nxdata_grp + '/' + field_name - scan_dt_arr = transform_to_intended_dt(data_dict[path]) - x_cor_len, y_cor_len = scan_dt_arr.shape - # collect for only one data field e.g. forward or backward, as all the data - # fields must have the same length of co-ordinate - if not axes_data: - # coor_info[i] has start, end and unit - axes_data.append(np.linspace(*coor_info[0][0:2], x_cor_len)) - axes_data.append(np.linspace(*coor_info[1][0:2], y_cor_len)) - axes_units.append(coor_info[0][2]) - template[temp_data_field] = scan_dt_arr - else: - # to clean up nxdata_grp and field_name from previous loop - nxdata_grp = '' - field_name = '' - - def fill_out_NXdata_group(): - """To fill out NXdata which is root for all data fields and attributes for NXdata. - This function fills template with first level of descendent fields and attributes - of NXdata but not the fields and attributes under child of NXdata. - """ - if nxdata_grp: - auxiliary_signals_attr = f"{nxdata_grp}/@auxiliary_signals" - axes = f"{nxdata_grp}/@axes" - signal_attr = f"{nxdata_grp}/@signal" - template[auxiliary_signals_attr] = [] - template[axes] = axes_name - for ind, data_field_nm in enumerate(signals): - - if ind == 0: - template[signal_attr] = data_field_nm - else: - template[auxiliary_signals_attr].append(data_field_nm) - for axis, axis_data in zip(axes_name, axes_data): - template[f"{nxdata_grp}/{axis}"] = axis_data - - def find_nxdata_group_and_name(key): - """Find data group name from a data path in file. - E.g. 'Z', 'LI_Demod_2_X' from /Z/forward and /LI_Demod_2_X/forward - Note: Create a function in stm_helper.py to unit scale such as nm, micrometer - """ - tmp_key = key.split('/', 1)[1] - grp_name, data_field_name = tmp_key.split('/', 1) - grp_name = grp_name.upper() - return grp_name, data_field_name - - for _, dt_path_list in sub_config_dict.items(): - signals = [] - axes_name = ['x', 'y'] - axes_units = [] - axes_data = [] - # The following functions can be thought as unpacked function body here. - if not dt_path_list: - continue - indivisual_DATA_field() - fill_out_NXdata_group() - - # pylint: disable=too-many-locals - def get_dimension_info(self, config_dict, data_dict): - """ - Extract dimension info from scanfield. - - ../ENVIRONMENT[environment]/scan_control/positioner/scanfield" - The scanfield has four parts starting point of (x, y) co-ordinate - length on (x, y)-dimenstion and one last unknown values. - """ - scanfield: str = '' - for key, val in config_dict.items(): - if ("/ENTRY[entry]/INSTRUMENT[instrument]/ENVIRONMENT[environment]/" - "scan_control/positioner/scanfield") == key: - if val in data_dict: - scanfield = data_dict[val] - else: - raise ValueError("Scanfield data missing: /ENTRY[entry]/INSTRUMENT[instrument]" - "/ENVIRONMENT[environment]/scan_control/positioner/scanfield") - conf_unit_key = 'unit_of_x_y_coordinate' - try: - unit_info = data_dict[config_dict[conf_unit_key]] - except KeyError as exc: - raise KeyError(f'No info found about coordinate unit. check config file by' - f'key {conf_unit_key}') from exc - for sep in [";"]: - if sep in scanfield: - # parts are X_cor, Y_cor, X_len, Y_len and one unkown value - scanfield_parts = scanfield.split(sep) - - x_start = transform_to_intended_dt(scanfield_parts[0]) - x_len = transform_to_intended_dt(scanfield_parts[2]) - x_cor = [x_start, x_start + x_len, unit_info] - y_start = transform_to_intended_dt(scanfield_parts[1]) - y_len = transform_to_intended_dt(scanfield_parts[3]) - y_cor = [y_start, y_start + y_len, unit_info] - return (x_cor, y_cor) - return () - - # pylint: disable=too-many-branches - def from_sxm_file_into_template(self, template, config_dict, eln_data_dict): - """ - Pass metadata and signals into template. This should be last steps for writting - metadata and data into nexus template. - """ - - nxdl_key_to_modified_key: dict = {} - data_dict = self.get_SPM_metadata_dict_and_signal() - - fill_template_from_eln_data(eln_data_dict, template) - # Fill out template from config file - temp_keys = template.keys() - for c_key, c_val in config_dict.items(): - if c_val in ['None', ""] or c_key[0] != '/': - continue - if c_key in temp_keys: - if isinstance(c_val, str): - if c_val in data_dict: - template[c_key] = transform_to_intended_dt(data_dict[c_val]) - # Handling multiple possible raw data according to user's defined name. - if isinstance(c_val, list): - for search_key in c_val: - if search_key in data_dict: - template[c_key] = transform_to_intended_dt(data_dict[search_key]) - if isinstance(c_val, dict): - data_group = "/ENTRY[entry]/DATA[data]" - if c_key == data_group: - coor_info = self.get_dimension_info(config_dict, data_dict) - self.construct_nxdata_for_sxm(template, - data_dict, - c_val, - coor_info, - data_group) - else: - overwrite_fields(template, - data_dict, - c_val, - c_key, - nxdl_key_to_modified_key) - else: - if isinstance(c_val, dict): - overwrite_fields(template, - data_dict, - c_val, - c_key, - nxdl_key_to_modified_key) - else: - template[c_key] = transform_to_intended_dt(data_dict[c_val]) \ - if c_val in data_dict else None - # The following function can be used later it link come true in application def. - # link_implementation(template, nxdl_key_to_modified_key) - link_seperation_from_hard_code(template, nxdl_key_to_modified_key) - - -def get_stm_raw_file_info(raw_file): - """Parse the raw_file into a organised dictionary. It helps users as well as developers - to understand how the reader works and modify the config file.""" - - raw_file = os.path.basename(raw_file) - raw_name = raw_file.rsplit('.')[0] - data_dict = STM_Nanonis(raw_file).get_SPM_metadata_dict_and_signal() - temp_file = f"{raw_name}.txt" - with open(temp_file, mode='w', encoding='utf-8') as txt_f: - for key, val in data_dict.items(): - txt_f.write(f"{key} : {val}\n") - logging.info(' %s has been created to investigate raw data structure.', temp_file) diff --git a/pynxtools/dataconverter/readers/sts/sts_file_parser.py b/pynxtools/dataconverter/readers/sts/sts_file_parser.py deleted file mode 100644 index 33b0cce48..000000000 --- a/pynxtools/dataconverter/readers/sts/sts_file_parser.py +++ /dev/null @@ -1,496 +0,0 @@ -#!/usr/bin/env python3 -""" - To collect data from Bias Spectroscopy output file that is mainly a - file with dat extension. -""" -# -*- coding: utf-8 -*- -# -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -from typing import Dict, Union, Tuple -import logging -import os -import numpy as np -from pynxtools.dataconverter.readers.sts.helper import (fill_template_from_eln_data, - nested_path_to_slash_separated_path, - overwrite_fields, - link_seperation_from_hard_code, - UNIT_TO_SKIP) -from pynxtools.dataconverter.helpers import transform_to_intended_dt - - -logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') - - -# Type aliases -NestedDict = Dict[str, Union[int, str, float, 'NestedDict']] - - -# pylint: disable=invalid-name -class BiasSpecData_Nanonis(): - """This class collect and store data fo Bias spectroscopy of SPM experiment. - - The class splits the data and store in into nested python dictionary as follows. - E.g. - bais_data = {data_field_name:{value: value_for_data_field_of_any_data_typeS, - unit: unit name, - date: ---, - time: ---} - } - - """ - - def __init__(self, file_name: str) -> None: - """Innitialize object level variables.""" - # Note: If get some information about machines or vendors which makes - # the data file distinguished collecte them. - self.bias_spect_dict: NestedDict = {} - self.raw_file: str = file_name - self.nanonis_version = "" - self.choose_correct_function_to_extract_data() - - def get_data_nested_dict(self) -> NestedDict: - """Retrun nested dict as bellow - - bais_data = {data_field_name:{value: value_for_data_field_of_any_data_typeS, - unit: unit name, - date: ---, - time: ---} - } - """ - return self.bias_spect_dict - - # pylint: disable=too-many-arguments - def check_and_write_unit(self, dct, - key_or_line, unit_separators, - end_of_seperators, value=None): - """Check and write unit. - - Parameters - ---------- - dct : dict - - key_or_line : _type_ - The dict that tracks full nested paths and unit at deepest nest. - unit_separators : list - List of separator chars - end_of_seperators : list - List of end separator chars - value : dict, optional - dict to store dict - """ - for sep_unit, end_sep in zip(unit_separators, end_of_seperators): - if sep_unit in key_or_line: - key, unit = key_or_line.split(sep_unit, 1) - unit = unit.split(end_sep)[0] - if key_or_line in dct: - del dct[key_or_line] - # skiping some unit that are not part of standard e.g. on/off - if unit in UNIT_TO_SKIP: - unit = '' - if isinstance(value, dict): - value['unit'] = unit - else: - value: NestedDict = {} - value['unit'] = unit - dct[key] = value - break - - def retrive_key_recursively(self, line_to_analyse: str, - dict_to_store: NestedDict, - key_seperators: list) -> None: - """Store metadata path in recursive manner because the path is separated by chars. - - Parameters - ---------- - line_to_analyse : str - Line with metadata path where each part of path is separated by chars from - key_separated chars. - dict_to_store : NestedDict - Dict to store metadata path part in nested form - key_separators : list - List of chars separating metadata path. - """ - unit_separators = [' ('] - end_of_seperators = [')'] - - line_to_analyse = line_to_analyse.strip() - for k_sep in key_seperators: - new_dict: NestedDict = {} - if k_sep in line_to_analyse: - key, rest = line_to_analyse.split(k_sep, 1) - key = key.strip() - if key in dict_to_store: - new_dict = dict_to_store[key] # type: ignore - else: - new_dict = {} - dict_to_store[key] = new_dict - # check if key contains any unit inside bracket '()' - self.check_and_write_unit(dict_to_store, key, unit_separators, - end_of_seperators, new_dict) - self.retrive_key_recursively(rest, new_dict, key_seperators) - return - - for sep_unit in unit_separators: - if sep_unit in line_to_analyse: - self.check_and_write_unit(dict_to_store, line_to_analyse, - unit_separators, end_of_seperators) - return - - dict_to_store['value'] = line_to_analyse.strip() - return - - def check_matrix_data_block_has_started(self, line_to_analyse: str) -> Tuple[bool, list]: - """_summary_ - - Parameters - ---------- - line_to_analyse : str - Line to check whether matrix data has started. - - Returns - ------- - Bool flag: Flag for matarix data found - value list: List of row values if the matrix has found. - """ - wd_list = line_to_analyse.split() - int_list = [] - if not wd_list: - return False, [] - for word in wd_list: - try: - float_n = float(word) - int_list.append(float_n) - except ValueError: - return False, [] - return True, int_list - - def check_metadata_and_unit(self, key_and_unit: str): - """Check for metadata and unit. - - Parameters - ---------- - key_and_unit : str - String to check key, metadata and unit - """ - metadata = '' - key, unit = key_and_unit.split('(') - unit, rest = unit.split(')', 1) - # Some units have extra info e.g. Current (A) [filt] - if '[' in rest: - metadata = rest.split('[')[-1].split(']')[0] - if unit in UNIT_TO_SKIP: - unit = '' - return key, unit, metadata - - def extract_and_store_from_dat_file(self) -> None: - """Extract data from data file and store them into object level nested dictionary. - """ - - key_seperators = ['>', '\t'] - is_matrix_data_found = False - one_d_numpy_array = np.empty(0) - - def dismentle_matrix_into_dict_key_value_list(column_string, - one_d_np_array, - dict_to_store): - column_keys = column_string.split('\t') - np_2d_array = one_d_np_array.reshape(-1, len(column_keys)) - dat_mat_comp = 'dat_mat_components' - dict_to_store[dat_mat_comp] = {} - for ind, key_and_unit in enumerate(column_keys): - if '(' in key_and_unit: - key, unit, data_stage = self.check_metadata_and_unit(key_and_unit) - # data_stage could be 'filt' or something like this - if data_stage: - dict_to_store[dat_mat_comp][f"{key.strip()} [{data_stage}]"] = \ - {'unit': unit, - 'value': np_2d_array[:, ind], - 'metadata': data_stage} - else: - dict_to_store[dat_mat_comp][key.strip()] = {'unit': unit, - 'value': np_2d_array[:, ind]} - else: - dict_to_store[dat_mat_comp][key.strip()] = {'value': list(np_2d_array[:, ind])} - - with open(self.raw_file, mode='r', encoding='utf-8') as file_obj: - lines = file_obj.readlines() - # last two lines for getting matrix data block that comes at the end of the file - last_line: str - for ind, line in enumerate(lines): - if ind == 0: - last_line = line - continue - is_mat_data, data_list = self.check_matrix_data_block_has_started(line) - if is_mat_data: - is_matrix_data_found = True - one_d_numpy_array = np.append(one_d_numpy_array, data_list) - is_mat_data = False - # Map matrix data if file has at least two empty lines or starts - # other data or metadata except matrix data - elif (not is_mat_data) and is_matrix_data_found: - is_matrix_data_found = False - dismentle_matrix_into_dict_key_value_list(last_line, one_d_numpy_array, - self.bias_spect_dict) - last_line = line - else: - self.retrive_key_recursively(last_line, self.bias_spect_dict, - key_seperators) - last_line = line - - if (not is_mat_data) and is_matrix_data_found: - is_matrix_data_found = False - dismentle_matrix_into_dict_key_value_list(last_line, one_d_numpy_array, - self.bias_spect_dict) - - def choose_correct_function_to_extract_data(self) -> None: - """Choose correct function to extract data that data in organised format. - """ - if not os.path.isfile(self.raw_file): - raise ValueError("Provide correct file.") - - ext = self.raw_file.rsplit('.', 1)[-1] - if ext == 'dat': - self.extract_and_store_from_dat_file() - - def get_flip_number(self, eln_dict): - """Get the number to flip the data plot from user defined eln.""" - seach_key = "/ENTRY[entry]/INSTRUMENT[instrument]/lock_in/lock_in_data_flip_number" - if seach_key in eln_dict: - return eln_dict[seach_key] - - raise ValueError(f"To determine the plot fliping {seach_key} must be provided by eln.") - - -# pylint: disable=too-many-locals too-many-statements -def construct_nxdata_for_dat(template, - data_dict, - sub_config_dict, - data_group_concept, - flip_number): - """ - Construct NXdata that includes all the groups, field and attributes. All the elements - will be stored in template. - - Parameters: - ----------- - template : dict[str, Any] - Capturing data elements. One to one dictionary for capturing data array, data axes - and so on from data_dict to be ploted. - data_dict : dict[str, Union[array, str]] - Data stored from dat file. Path (str) to data elements which mainly come from - dat file. Data from this dict will go to template - sub_config_dict : dict[str, list] - This dictionary is numerically data order to list (list of path to data elements in - input file). Each order indicates a group of data set. - data_group_concept : NeXus path for NXdata - - Return: - ------- - None - - Raise: - ------ - None - """ - # pylint: disable=too-many-branches - def collect_into_indivisual_DATA_grp(): - """Fill up template's indivisual data field and the descendant attribute. - e.g. /Entry[ENTRY]/data/DATA, - /Entry[ENTRY]/data/DATA/@axes and so on - """ - dt_grps = [] - axes_name = [] - axes_unit = [] - axes_metadata = [] - axes_data = [] - # Bellow we are collecting: axes, and data field info. - # list of paths e.g. "/dat_mat_components/Bias/value" comes from - # dict value of /ENTRY[entry]/DATA[data] in config file. - for path in dt_val: - if path not in data_dict: - continue - # E.g. extra_annot:'filt', data_grp: LI Demod 1 X [filt] - dt_grp, extra_annot, trimed_path = find_extra_annot(path) - dt_grps.append(dt_grp) - is_axis_path = False - for axis in axes: - if axis + 'value' in trimed_path: - # removing forward slash - axes_name.append(axis[0:-1]) - axes_data.append(data_dict[path]) - axis_unit = path.replace('/value', '/unit') - axes_unit.append(data_dict[axis_unit] if axis_unit in data_dict else "") - axis_mtdt = path.replace('/value', '/metadata') - axes_metadata.append(data_dict[axis_mtdt] if axis_mtdt in data_dict else "") - is_axis_path = True - - # To collect field name for each dt_grp - if not is_axis_path and path[-6:] == '/value': - if extra_annot in dt_grp and '[' in dt_grp: - field = dt_grp[0:dt_grp.index('[')].strip() - else: - field = dt_grp - data_field_dt.append(data_dict[path]) - data_field_nm.append(field) - data_field_unit.append(get_unit(path, data_dict)) - - # Note: this value must come from ELN - # Note try to create link for axes - # Filling out field, axes, signal and so on of NXdata - if not axes_data and not axes_name: - axes_data = top_axes_data - axes_name = top_axes_name - axes_metadata = top_axes_metadata - axes_unit = top_axes_unit - - for dt_fd, dat_, unit in zip(data_field_nm, data_field_dt, data_field_unit): - dt_fd = '_'.join(dt_fd.lower().split(' ')) - if extra_annot: - temp_data_grp = data_group_concept.replace("DATA[data", f"DATA[{dt_fd}" - f"({extra_annot})") - else: - temp_data_grp = data_group_concept.replace("DATA[data", f"DATA[{dt_fd}") - template[temp_data_grp + '/@signal'] = dt_fd - template[temp_data_grp + '/@axes'] = axes_name - # template[temp_data_grp + '/title'] = - data_field = temp_data_grp + '/' + dt_fd - # To flip the data plot of Lock-in demodulated signal - if "li_demod" in dt_fd: - template[data_field] = dat_ * flip_number - else: - template[data_field] = dat_ # cal_dx_by_dy(current, volt) - - for axis, data_, a_unit in zip(axes_name, axes_data, axes_unit): - template[temp_data_grp + '/' + axis] = data_ - template[f"{temp_data_grp}/{axis}/@long_name"] = f"{axis}({a_unit})" - template[f"{temp_data_grp}/@{axis}_indices"] = 0 - if unit: - template[data_field + '/@long_name'] = f"{dt_fd} ({unit})" - else: - template[data_field + '/@long_name'] = dt_fd - - def get_unit(value_key, data_dict): - # value_key: /dat_mat_components/LI Demod 1 X/value - # unit_key: /dat_mat_components/LI Demod 1 X/unit - unit_key = value_key.replace('/value', '/unit') - if unit_key in data_dict: - return data_dict[unit_key] - return "" - - def find_extra_annot(key): - """Find out extra annotation that comes with data e.g. filt in - /dat_mat_components/Current [filt]/value, which refers scan in filter mode. - """ - data_grp = key.split('/')[-2] - extra_annot = data_grp.split('[')[-1] if '[' in data_grp else '' - extra_annot = extra_annot.split(']')[0].strip() - tmp_grp_nm = data_grp[0:data_grp.index('[')].strip() if '[' in data_grp else data_grp - - return data_grp, extra_annot, key.replace(data_grp, tmp_grp_nm) - - def top_level_Bias_axis(top_ax_list, data_dict): - """Sometimes Bias axis comes one with: /dat_mat_components/Bias calc/value. - Later on this bias will used as a Bias axis for all measurements. - """ - for path in top_ax_list: - for ax in axes: - if ax not in path: - continue - if '/value' == path[-6:] and path in data_dict: - top_axes_data.append(data_dict[path]) - top_axes_name.append('Bias') - unit_path = path.replace('/value', '/unit') - top_axes_unit.append(data_dict[unit_path] if unit_path in data_dict else "") - metadata_path = path.replace('/value', '/metadata') - top_axes_metadata.append(data_dict[metadata_path] if metadata_path - in data_dict else "") - top_axes_name = [] - top_axes_unit = [] - top_axes_metadata = [] - top_axes_data = [] - for dt_key, dt_val in sub_config_dict.items(): - # Possible axes - axes = ["Bias/", 'Bias calc/'] - # The axes and data list will be field globaly and used inside other local functions - data_field_nm = [] - data_field_dt = [] - data_field_unit = [] - # There are several scan data gourp in the given file. - if dt_key == '0': - # This is top level Bias axis which is the same for all the Lock-in signals - top_level_Bias_axis(dt_val, data_dict) - else: - collect_into_indivisual_DATA_grp() - - -def from_dat_file_into_template(template, dat_file, config_dict, eln_data_dict): - """Pass metadata, current and voltage into template from file - with dat extension. - """ - # To collect the concept if any nxdl concept is overwritten - dict_orig_key_to_mod_key: Dict[str, list] = {} - b_s_d = BiasSpecData_Nanonis(dat_file) - flattened_dict = {} - nested_path_to_slash_separated_path( - b_s_d.get_data_nested_dict(), - flattened_dict=flattened_dict) - - fill_template_from_eln_data(eln_data_dict, template) - for c_key, c_val in config_dict.items(): - if "@eln" in c_val: - continue - if c_val in ["", None, 'None', 'none']: - continue - if isinstance(c_val, str) and c_val in flattened_dict: - template[c_key] = transform_to_intended_dt(flattened_dict[c_val]) - if isinstance(c_val, dict) and c_val: - data_group_concept = "/ENTRY[entry]/DATA[data]" - if data_group_concept == c_key: - # pass exp. data section to NXdata group - flip_num = b_s_d.get_flip_number(eln_data_dict) - construct_nxdata_for_dat(template, flattened_dict, - c_val, data_group_concept, flip_num) - else: - # pass other physical quantity that has muliple dimensions or type for - # same physical quantity e.g. in drift_N N will be replaced X, Y and Z - overwrite_fields(template, flattened_dict, c_val, c_key, - dict_orig_key_to_mod_key) - # The following function can be used if links in application come true - # link_seperation(template, dict_orig_key_to_mod_key) - link_seperation_from_hard_code(template, dict_orig_key_to_mod_key) - - -def get_sts_raw_file_info(raw_file): - """Parse the raw_file into a organised dictionary. It helps users as well as developers - to understand how the reader works and modify the config file.""" - - raw_file = os.path.basename(raw_file) - raw_name = raw_file.split('.')[0] - temp_file = f"{raw_name}.txt" - b_s_d = BiasSpecData_Nanonis(raw_file) - flattened_dict = {} - nested_path_to_slash_separated_path( - b_s_d.get_data_nested_dict(), - flattened_dict=flattened_dict) - with open(temp_file, mode='w', encoding='utf-8') as txt_f: - for key, val in flattened_dict.items(): - txt_f.write(f"{key} : {val}\n") - - logging.info(' %s has been created to investigate raw data structure.', temp_file)