diff --git a/pynxtools/nyaml2nxdl/README.md b/pynxtools/nyaml2nxdl/README.md
deleted file mode 100644
index ff083e189..000000000
--- a/pynxtools/nyaml2nxdl/README.md
+++ /dev/null
@@ -1,72 +0,0 @@
-# YAML to NXDL converter and NXDL to YAML converter
-
-**NOTE: Please use Python 3.8 or above to run this converter.**
-
-**Tool purpose**: Offer a simple YAML-based schema and an XML-based schema to describe NeXus instances. These can be NeXus application definitions or classes
-such as base or contributed classes. Users create NeXus instances by writing either a YAML file or an XML file which details a hierarchy of data/metadata elements.
-The forward (YAML -> NXDL.XML) and backward (NXDL.XML -> YAML) conversions are implemented.
-
-**How the tool works**:
-- yaml2nxdl.py
-1. Reads the user-specified NeXus instance, either in YML or XML format.
-2. If the input is in YAML, creates an instantiated NXDL schema XML tree by walking the dictionary nest.
-   If the input is in XML, creates a YML file by walking the dictionary nest.
-3. Writes the tree to disk, either as a YAML file or as a properly formatted NXDL XML schema file.
-4. Optionally, if the --append argument is given,
-   the XML or YAML input file is interpreted as an extension of a base class, and the entries contained in it
-   are appended below a standard NeXus base class.
-   You need to specify both your input file (with YAML or XML extension) and the NeXus class (with no extension).
-   Both the .yml and the .nxdl.xml file of the extended class are written.
-
-```console
-user@box:~$ python yaml2nxdl.py
-
-Usage: python yaml2nxdl.py [OPTIONS]
-
-Options:
-   --input-file TEXT     The path to the input data file to read.
-   --append TEXT         Parse an xml NeXus file and append it to the specified base class;
-                         write the base class name with no extension.
-   --check-consistency   Check consistency by generating another version of the input file,
-                         e.g. for the input file NXexample.nxdl.xml the output file
-                         NXexample_consistency.nxdl.xml.
-   --verbose             Additional std output info is printed to help debugging.
-   --help                Show this message and exit.
-
-```
-
-## Documentation
-
-**Rule set**: When transcoding YAML files, several rules have to be followed.
-* Named NeXus groups, which are instances of NeXus classes, especially base or contributed classes. Creating (NXbeam) is a simple example of a request to define a group named according to NeXus default rules. mybeam1(NXbeam) or mybeam2(NXbeam) are examples of how to create multiple named instances at the same hierarchy level.
-* Members of groups, so-called fields or attributes. A simple example of a member is voltage. Here the datatype is implied automatically as the default NeXus NX_CHAR type. By contrast, voltage(NX_FLOAT) can be used to instantiate a member of the class which should be of NeXus type NX_FLOAT.
-* And attributes of either groups or fields. Names of attributes have to be preceded by \@ to mark them as attributes.
-* Optionality: All fields, groups and attributes in `application definitions` are `required` by default, unless they are explicitly marked as `recommended` or `optional`.
-
-**Special keywords**: Several keywords can be used as children of groups, fields, and attributes to specify the members of these. Groups, fields and attributes are nodes of the XML tree.
-* **doc**: A human-readable description/docstring.
-* **exists**: Options are `recommended`, `required`, or `[min, 1, max, infty]`; numbers like the 1 here can be replaced by any uint, or by infty to indicate no restriction on how frequently the entry can occur inside the NXDL schema at the same hierarchy level.
-* **link**: Defines links between nodes.
-* **units**: A statement introducing NeXus-compliant NXDL units arguments, like NX_VOLTAGE.
-* **dimensions**: Details which dimensional arrays to expect.
-* **enumeration**: Python list of strings which are considered as recommended entries to choose from.
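-
-As a sketch of how the rules above combine, a minimal (hypothetical) snippet could look as follows. NXexample, mybeam1 and the member names are invented for this illustration and are not real NeXus definitions:
-
-```yaml
-# Hypothetical application definition illustrating the rule set
-NXexample(NXobject):
-  (NXentry):
-    mybeam1(NXbeam):             # named instance of the NXbeam base class
-      exists: recommended
-      voltage(NX_FLOAT):         # field with an explicit NeXus type
-        units: NX_VOLTAGE
-        doc: Acceleration voltage of the beam.
-        \@direction:             # attribute, marked by the leading \@
-          doc: Hypothetical attribute of the voltage field.
-      mode:                      # NX_CHAR field by default
-        enumeration: [continuous, pulsed]
-```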
-* **dim_parameters**: `dim` is a child of `dimensions`, and a `dim` entry may have several attributes, such as `ref` and
-`incr`, in addition to `index` and `value`. So while writing the `yaml` schema definition, please follow this structure:
-```
-dimensions:
-   rank: integer value
-   dim: [[ind_1, val_1], [ind_2, val_2], ...]
-   dim_parameters:
-      ref: [ref_value_1, ref_value_2, ...]
-      incr: [incr_value_1, incr_value_2, ...]
-```
-Keep in mind that all of these lists must have the same length.
-
-## Next steps
-
-The NOMAD team is currently working on establishing a one-to-one mapping between
-NeXus definitions and the NOMAD MetaInfo. As soon as this is in place, the YAML files will
-be annotated with further metadata so that they can serve two purposes.
-On the one hand, they can serve as an instance of a schema to create a GUI representation
-of a NOMAD Oasis ELN schema. On the other hand, the YAML to NXDL converter will skip all
-those pieces of information which are irrelevant from a NeXus perspective.
diff --git a/pynxtools/nyaml2nxdl/__init__.py b/pynxtools/nyaml2nxdl/__init__.py
deleted file mode 100644
index 22eb35f68..000000000
--- a/pynxtools/nyaml2nxdl/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env python3
-"""
-# Load paths
-"""
-# -*- coding: utf-8 -*-
-#
-# Copyright The NOMAD Authors.
-#
-# This file is part of NOMAD. See https://nomad-lab.eu for further info.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
diff --git a/pynxtools/nyaml2nxdl/comment_collector.py b/pynxtools/nyaml2nxdl/comment_collector.py
deleted file mode 100644
index 515f55ddd..000000000
--- a/pynxtools/nyaml2nxdl/comment_collector.py
+++ /dev/null
@@ -1,513 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-#
-# Copyright The NOMAD Authors.
-#
-# This file is part of NOMAD. See https://nomad-lab.eu for further info.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Collect comments in a list via the CommentCollector class. A comment is an instance of
-Comment, where each comment includes the comment text and the line or neighbour
-information the comment must be assigned to.
-
-The class Comment is an abstract class for the general functions and methods to be
-implemented by the XMLComment and YAMLComment classes.
-
-NOTE: Here a comment block mainly stands for (comment text + the line or element the
-comment is intended for).
-"""
-
-
-from typing import List, Type, Any, Tuple, Union, Dict
-from pynxtools.nyaml2nxdl.nyaml2nxdl_helper import LineLoader
-
-__all__ = ["Comment", "CommentCollector", "XMLComment", "YAMLComment"]
-
-
-# pylint: disable=inconsistent-return-statements
-class CommentCollector:
-    """CommentCollector will store a full comment ('Comment') object in
-    _comment_chain.
- """ - - def __init__(self, input_file: str = None, loaded_obj: Union[object, Dict] = None): - """ - Initialise CommentCollector - parameters: - input_file: raw input file (xml, yml) - loaded_obj: file loaded by third party library - """ - self._comment_chain: List = [] - self.file = input_file - self._comment_tracker = 0 - self._comment_hash: Dict[Tuple, Type[Comment]] = {} - self.comment: Type[Comment] - if self.file and not loaded_obj: - if self.file.split(".")[-1] == "xml": - self.comment = XMLComment - if self.file.split(".")[-1] == "yaml": - self.comment = YAMLComment - with open(self.file, "r", encoding="utf-8") as plain_text_yaml: - loader = LineLoader(plain_text_yaml) - self.comment.__yaml_dict__ = loader.get_single_data() - elif self.file and loaded_obj: - if self.file.split(".")[-1] == "yaml" and isinstance(loaded_obj, dict): - self.comment = YAMLComment - self.comment.__yaml_dict__ = loaded_obj - else: - raise ValueError( - "Incorrect inputs for CommentCollector e.g. Wrong file extension." - ) - - else: - raise ValueError("Incorrect inputs for CommentCollector") - - def extract_all_comment_blocks(self): - """ - Collect all comments. Note that here comment means (comment text + element or line info - intended for comment. 
- """ - id_ = 0 - single_comment = self.comment(comment_id=id_) - with open(self.file, mode="r", encoding="UTF-8") as enc_f: - lines = enc_f.readlines() - # Make an empty line for last comment if no empty lines in original file - if lines[-1] != "": - lines.append("") - for line_num, line in enumerate(lines): - if single_comment.is_storing_single_comment(): - # If the last comment comes without post nxdl fields, groups and attributes - if "++ SHA HASH ++" in line: - # Handle with stored nxdl.xml file that is not part of yaml - line = "" - single_comment.process_each_line( - line + "post_comment", (line_num + 1) - ) - self._comment_chain.append(single_comment) - break - if line_num < (len(lines) - 1): - # Processing file from Line number 1 - single_comment.process_each_line(line, (line_num + 1)) - else: - # For processing last line of file - single_comment.process_each_line( - line + "post_comment", (line_num + 1) - ) - self._comment_chain.append(single_comment) - else: - self._comment_chain.append(single_comment) - single_comment = self.comment(last_comment=single_comment) - single_comment.process_each_line(line, (line_num + 1)) - - def get_comment(self): - """ - Return comment from comment_chain that must come earlier in order. - """ - return self._comment_chain[self._comment_tracker] - - def get_coment_by_line_info(self, comment_locs: Tuple[str, Union[int, str]]): - """ - Get comment using line information. - """ - if comment_locs in self._comment_hash: - return self._comment_hash[comment_locs] - - line_annot, line_loc = comment_locs - for cmnt in self._comment_chain: - if line_annot in cmnt: - line_loc_ = cmnt.get_line_number(line_annot) - if line_loc == line_loc_: - self._comment_hash[comment_locs] = cmnt - return cmnt - - def remove_comment(self, ind): - """Remove a comment from comment list.""" - if ind < len(self._comment_chain): - del self._comment_chain[ind] - else: - raise ValueError("Oops! 
Index is out of range.")
-
-    def reload_comment(self):
-        """
-        Update self._comment_tracker after the last comment has been processed.
-        """
-        self._comment_tracker += 1
-
-    def __contains__(self, comment_locs: tuple):
-        """
-        Check whether the comment corresponding to key_line and line_loc
-        exists or not.
-        comment_locs is equivalent to (line_annotation, line_loc), e.g.
-        (__line__doc, 35).
-        """
-        if not isinstance(comment_locs, tuple):
-            raise TypeError(
-                "Comment_locs should be 'tuple' containing line annotation "
-                "(e.g. __line__doc) and line_loc (e.g. 35)."
-            )
-        line_annot, line_loc = comment_locs
-        for cmnt in self._comment_chain:
-            if line_annot in cmnt:
-                line_loc_ = cmnt.get_line_number(line_annot)
-                if line_loc == line_loc_:
-                    self._comment_hash[comment_locs] = cmnt
-                    return True
-        return False
-
-    def __getitem__(self, ind):
-        """Get a comment from self._comment_chain by index."""
-        if isinstance(ind, int):
-            if ind >= len(self._comment_chain):
-                raise IndexError(
-                    f"Oops! Comment index {ind} in {__class__} is out of range!"
-                )
-            return self._comment_chain[ind]
-
-        if isinstance(ind, slice):
-            start_n = ind.start or 0
-            end_n = ind.stop or len(self._comment_chain)
-            return self._comment_chain[start_n:end_n]
-
-    def __iter__(self):
-        """Get comments iteratively."""
-        return iter(self._comment_chain)
-
-
-# pylint: disable=too-many-instance-attributes
-class Comment:
-    """
-    This class builds a yaml comment together with the intended line the comment is written for.
-    """
-
-    def __init__(self, comment_id: int = -1, last_comment: "Comment" = None) -> None:
-        """A Comment object can be considered a block element that includes the
-        document element (an entity the comment is written for).
-        """
-        self._elemt: Any = None
-        self._elemt_text: str = None
-        self._is_elemt_found: bool = None
-        self._is_elemt_stored: bool = None
-
-        self._comnt: str = ""
-        # If multiple comments exist for one element or entity
-        self._comnt_list: List[str] = []
-        self.last_comment: "Comment" = last_comment if last_comment else None
-        if comment_id >= 0 and last_comment:
-            self.cid = comment_id
-            self.last_comment = last_comment
-        elif comment_id == 0 and not last_comment:
-            self.cid = comment_id
-            self.last_comment = None
-        elif last_comment:
-            self.cid = self.last_comment.cid + 1
-            self.last_comment = last_comment
-        else:
-            raise ValueError("Neither last comment nor comment id found")
-        self._comnt_start_found: bool = False
-        self._comnt_end_found: bool = False
-        self.is_storing_single_comment = lambda: not (
-            self._comnt_end_found and self._is_elemt_stored
-        )
-
-    def get_comment_text(self) -> Union[List, str]:
-        """
-        Extract the comment text from the entire comment (comment text + the element
-        or line the comment is intended for).
-        """
-
-    def append_comment(self, text: str) -> None:
-        """
-        Append lines of the same comment.
-        """
-
-    def store_element(self, args) -> None:
-        """
-        Store the comment text and the line or element the comment is intended for.
-        """
-
-
-class XMLComment(Comment):
-    """
-    XMLComment to store an xml comment element.
-    """
-
-    def __init__(self, comment_id: int = -1, last_comment: "Comment" = None) -> None:
-        super().__init__(comment_id, last_comment)
-
-    def process_each_line(self, text, line_num):
-        """Take care of each line of text. The function the text
-        must be passed through should be decided here.
-        """
-        text = text.strip()
-        if text and line_num:
-            self.append_comment(text)
-            if self._comnt_end_found and not self._is_elemt_found:
-                # for multiple comments, if they exist
-                if self._comnt:
-                    self._comnt_list.append(self._comnt)
-                    self._comnt = ""
-
-            if self._comnt_end_found:
-                self.store_element(text)
-
-    def append_comment(self, text: str) -> None:
-        # Comment in a single line
-        if "<!--" == text[0:4]:
-            self._comnt_start_found = True
-            self._comnt_end_found = False
-            self._comnt = self._comnt + text.replace("<!--", "")
-            if "-->" == text[-3:]:
-                self._comnt_end_found = True
-                self._comnt_start_found = False
-                self._comnt = self._comnt.replace("-->", "")
-
-        elif "-->" == text[0:3] and self._comnt_start_found:
-            self._comnt_end_found = True
-            self._comnt_start_found = False
-            self._comnt = self._comnt + "\n" + text.replace("-->", "")
-        elif self._comnt_start_found:
-            self._comnt = self._comnt + "\n" + text
-
-    # pylint: disable=arguments-differ, arguments-renamed
-    def store_element(self, text) -> None:
-        def collect_xml_attributes(text_part):
-            for part in text_part:
-                part = part.strip()
-                if part and '">' == "".join(part[-2:]):
-                    self._is_elemt_stored = True
-                    self._is_elemt_found = False
-                    part = "".join(part[0:-2])
-                elif part and '"/>' == "".join(part[-3:]):
-                    self._is_elemt_stored = True
-                    self._is_elemt_found = False
-                    part = "".join(part[0:-3])
-                elif part and "/>" == "".join(part[-2:]):
-                    self._is_elemt_stored = True
-                    self._is_elemt_found = False
-                    part = "".join(part[0:-2])
-                elif part and ">" == part[-1]:
-                    self._is_elemt_stored = True
-                    self._is_elemt_found = False
-                    part = "".join(part[0:-1])
-                elif part and '"' == part[-1]:
-                    part = "".join(part[0:-1])
-
-                if '="' in part:
-                    lf_prt, rt_prt = part.split('="')
-                else:
-                    continue
-                if ":" in lf_prt:
-                    continue
-                self._elemt[lf_prt] = str(rt_prt)
-
-        if not self._elemt:
-            self._elemt = {}
-        # First check whether the comment part has been collected perfectly
-        if "<!--" == text[0:4]:
-            return
-        self._is_elemt_found = True
-        text_part = text.split(" ")
-        collect_xml_attributes(text_part)
-
-    def get_comment_text(self) -> Union[List, str]:
-        """
-        This method returns the list of comment texts, as some xml elements might have
-        multiple separate comments intended for a single element.
- """ - return self._comnt_list - - -class YAMLComment(Comment): - """ - This class for stroing comment text as well as location of the comment e.g. line - number of other in the file. - NOTE: - 1. Do not delete any element form yaml dictionary (for loaded_obj. check: Comment_collector - class. because this loaded file has been exploited in nyaml2nxdl forward tools.) - """ - - # Class level variable. The main reason behind that to follow structure of - # abstract class 'Comment' - __yaml_dict__: dict = {} - __yaml_line_info: dict = {} - __comment_escape_char = {"--": "-\\-"} - - def __init__(self, comment_id: int = -1, last_comment: "Comment" = None) -> None: - """Initialization of YAMLComment follow Comment class.""" - super().__init__(comment_id, last_comment) - self.collect_yaml_line_info( - YAMLComment.__yaml_dict__, YAMLComment.__yaml_line_info - ) - - def process_each_line(self, text, line_num): - """Take care of each line of text. Through which function the text - must be passed should be decide here. - """ - text = text.strip() - self.append_comment(text) - if self._comnt_end_found and not self._is_elemt_found: - if self._comnt: - self._comnt_list.append(self._comnt) - self._comnt = "" - - if self._comnt_end_found: - line_key = "" - if ":" in text: - ind = text.index(":") - line_key = "__line__" + "".join(text[0:ind]) - - for l_num, l_key in self.__yaml_line_info.items(): - if line_num == int(l_num) and line_key == l_key: - self.store_element(line_key, line_num) - break - # Comment comes very end of the file - if text == "post_comment" and line_key == "": - line_key = "__line__post_comment" - self.store_element(line_key, line_num) - - def has_post_comment(self): - """ - Ensure is this a post coment or not. - Post comment means the comment that come at the very end without having any - nxdl element(class, group, filed and attribute.) 
-        """
-        for key, _ in self._elemt.items():
-            if "__line__post_comment" == key:
-                return True
-        return False
-
-    def append_comment(self, text: str) -> None:
-        """
-        Collect all the lines of the same comment and
-        append them to that single comment.
-        """
-        # check for escape char
-        text = self.replace_scape_char(text)
-        # Empty line after the last line of the comment
-        if not text and self._comnt_start_found:
-            self._comnt_end_found = True
-            self._comnt_start_found = False
-        # For an empty line inside a doc or the yaml file.
-        elif not text:
-            return
-        elif "# " == "".join(text[0:2]):
-            self._comnt_start_found = True
-            self._comnt_end_found = False
-            self._comnt = "" if not self._comnt else self._comnt + "\n"
-            self._comnt = self._comnt + "".join(text[2:])
-        elif "#" == text[0]:
-            self._comnt_start_found = True
-            self._comnt_end_found = False
-            self._comnt = "" if not self._comnt else self._comnt + "\n"
-            self._comnt = self._comnt + "".join(text[1:])
-        elif "post_comment" == text:
-            self._comnt_end_found = True
-            self._comnt_start_found = False
-        # for any line after a 'comment block' has been found
-        elif self._comnt_start_found:
-            self._comnt_start_found = False
-            self._comnt_end_found = True
-
-    # pylint: disable=arguments-differ
-    def store_element(self, line_key, line_number):
-        """
-        Store the comment content and the location information (what the comment is
-        created for).
-        """
-        self._elemt = {}
-        self._elemt[line_key] = int(line_number)
-        self._is_elemt_found = False
-        self._is_elemt_stored = True
-
-    def get_comment_text(self):
-        """
-        Return the list of comments if there are multiple comments for the same yaml line.
-        """
-        return self._comnt_list
-
-    def get_line_number(self, line_key):
-        """
-        Return the line number of the line the comment is created for.
-        """
-        return self._elemt[line_key]
-
-    def get_line_info(self):
-        """
-        Return the line annotation and line number from a comment.
-        """
-        for line_anno, line_loc in self._elemt.items():
-            return line_anno, line_loc
-
-    def replace_scape_char(self, text):
-        """Replace escape chars according to the __comment_escape_char dict."""
-        for ecp_char, ecp_alt in YAMLComment.__comment_escape_char.items():
-            if ecp_char in text:
-                text = text.replace(ecp_char, ecp_alt)
-        return text
-
-    def get_element_location(self):
-        """
-        Return the yaml line '__line__KEY' info and the line number.
-        """
-        if len(self._elemt) > 1:
-            raise ValueError(f"Comment element should be one but got " f"{self._elemt}")
-
-        for key, val in self._elemt.items():
-            yield key, val
-
-    def collect_yaml_line_info(self, yaml_dict, line_info_dict):
-        """Collect each __line__key and the corresponding value from
-        a yaml file dictionary into another dictionary.
-        """
-        for line_key, line_n in yaml_dict.items():
-            if "__line__" in line_key:
-                line_info_dict[line_n] = line_key
-
-        for _, val in yaml_dict.items():
-            if isinstance(val, dict):
-                self.collect_yaml_line_info(val, line_info_dict)
-
-    def __contains__(self, line_key):
-        """Check whether __line__NAME is in the _elemt dict or not."""
-        return line_key in self._elemt
-
-    def __eq__(self, comment_obj):
-        """Check whether self has the same value as the right-hand comment."""
-        if len(self._comnt_list) != len(comment_obj._comnt_list):
-            return False
-        for left_cmnt, right_cmnt in zip(self._comnt_list, comment_obj._comnt_list):
-            left_cmnt = left_cmnt.split("\n")
-            right_cmnt = right_cmnt.split("\n")
-            for left_line, right_line in zip(left_cmnt, right_cmnt):
-                if left_line.strip() != right_line.strip():
-                    return False
-        return True
diff --git a/pynxtools/nyaml2nxdl/nyaml2nxdl.py b/pynxtools/nyaml2nxdl/nyaml2nxdl.py
deleted file mode 100755
index f9703f160..000000000
--- a/pynxtools/nyaml2nxdl/nyaml2nxdl.py
+++ /dev/null
@@ -1,253 +0,0 @@
-#!/usr/bin/env python3
-"""Main file of the yaml2nxdl tool.
-Users create NeXus instances by writing a YAML file -which details a hierarchy of data/metadata elements - -""" -# -*- coding: utf-8 -*- -# -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import os -import xml.etree.ElementTree as ET - -import click -from pynxtools.nyaml2nxdl.nyaml2nxdl_helper import ( - get_sha256_hash, - extend_yamlfile_with_comment, - separate_hash_yaml_and_nxdl, -) -from pynxtools.nyaml2nxdl.nyaml2nxdl_forward_tools import nyaml2nxdl, pretty_print_xml -from pynxtools.nyaml2nxdl.nyaml2nxdl_backward_tools import ( - Nxdl2yaml, - compare_niac_and_my, -) - - -DEPTH_SIZE = 4 * " " - -# NOTE: Some handful links for nyaml2nxdl converter: -# https://manual.nexusformat.org/nxdl_desc.html?highlight=optional - - -def generate_nxdl_or_retrieve_nxdl(yaml_file, out_xml_file, verbose): - """ - Generate yaml, nxdl and hash. - if the extracted hash is exactly the same as producd from generated yaml then - retrieve the nxdl part from provided yaml. 
- Else, generate nxdl from separated yaml with the help of nyaml2nxdl function - """ - pa_path, rel_file = os.path.split(yaml_file) - sep_yaml = os.path.join(pa_path, f"temp_{rel_file}") - hash_found = separate_hash_yaml_and_nxdl(yaml_file, sep_yaml, out_xml_file) - - if hash_found: - gen_hash = get_sha256_hash(sep_yaml) - if hash_found == gen_hash: - os.remove(sep_yaml) - return - - nyaml2nxdl(sep_yaml, out_xml_file, verbose) - os.remove(sep_yaml) - - -# pylint: disable=too-many-locals -def append_yml(input_file, append, verbose): - """Append to an existing NeXus base class new elements provided in YML input file \ -and print both an XML and YML file of the extended base class. - -""" - nexus_def_path = os.path.join( - os.path.abspath(os.path.dirname(__file__)), "../../definitions" - ) - assert [ - s - for s in os.listdir(os.path.join(nexus_def_path, "base_classes")) - if append.strip() == s.replace(".nxdl.xml", "") - ], "Your base class extension does not match any existing NeXus base classes" - tree = ET.parse( - os.path.join(nexus_def_path + "/base_classes", append + ".nxdl.xml") - ) - root = tree.getroot() - # warning: tmp files are printed on disk and removed at the ends!! 
- pretty_print_xml(root, "tmp.nxdl.xml") - input_tmp_xml = "tmp.nxdl.xml" - out_tmp_yml = "tmp_parsed.yaml" - converter = Nxdl2yaml([], []) - converter.print_yml(input_tmp_xml, out_tmp_yml, verbose) - nyaml2nxdl(input_file=out_tmp_yml, out_file="tmp_parsed.nxdl.xml", verbose=verbose) - tree = ET.parse("tmp_parsed.nxdl.xml") - tree2 = ET.parse(input_file) - root_no_duplicates = ET.Element( - "definition", - { - "xmlns": "http://definition.nexusformat.org/nxdl/3.1", - "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", - "xsi:schemaLocation": "http://www.w3.org/2001/XMLSchema-instance", - }, - ) - for attribute_keys in root.attrib.keys(): - if ( - attribute_keys - != "{http://www.w3.org/2001/XMLSchema-instance}schemaLocation" - ): - attribute_value = root.attrib[attribute_keys] - root_no_duplicates.set(attribute_keys, attribute_value) - for elems in root.iter(): - if "doc" in elems.tag: - root_doc = ET.SubElement(root_no_duplicates, "doc") - root_doc.text = elems.text - break - group = "{http://definition.nexusformat.org/nxdl/3.1}group" - root_no_duplicates = compare_niac_and_my( - tree, tree2, verbose, group, root_no_duplicates - ) - field = "{http://definition.nexusformat.org/nxdl/3.1}field" - root_no_duplicates = compare_niac_and_my( - tree, tree2, verbose, field, root_no_duplicates - ) - attribute = "{http://definition.nexusformat.org/nxdl/3.1}attribute" - root_no_duplicates = compare_niac_and_my( - tree, tree2, verbose, attribute, root_no_duplicates - ) - pretty_print_xml( - root_no_duplicates, - f"{input_file.replace('.nxdl.xml', '')}" f"_appended.nxdl.xml", - ) - - input_file_xml = input_file.replace(".nxdl.xml", "_appended.nxdl.xml") - out_file_yml = input_file.replace(".nxdl.xml", "_appended_parsed.yaml") - converter = Nxdl2yaml([], []) - converter.print_yml(input_file_xml, out_file_yml, verbose) - nyaml2nxdl( - input_file=out_file_yml, - out_file=out_file_yml.replace(".yaml", ".nxdl.xml"), - verbose=verbose, - ) - os.rename( - 
-        f"{input_file.replace('.nxdl.xml', '_appended_parsed.yaml')}",
-        f"{input_file.replace('.nxdl.xml', '_appended.yaml')}",
-    )
-    os.rename(
-        f"{input_file.replace('.nxdl.xml', '_appended_parsed.nxdl.xml')}",
-        f"{input_file.replace('.nxdl.xml', '_appended.nxdl.xml')}",
-    )
-    os.remove("tmp.nxdl.xml")
-    os.remove("tmp_parsed.yaml")
-    os.remove("tmp_parsed.nxdl.xml")
-
-
-def split_name_and_extension(file_name):
-    """
-    Split a file name into the extension and the rest of the name.
-    Return the raw file name and the extension.
-    """
-    parts = file_name.rsplit(".", 3)
-    if len(parts) == 2:
-        raw = parts[0]
-        ext = parts[1]
-    elif len(parts) == 3:
-        raw = parts[0]
-        ext = ".".join(parts[1:])
-    else:
-        raise ValueError(f"Cannot split a valid extension from file name {file_name}.")
-
-    return raw, ext
-
-
-@click.command()
-@click.option(
-    "--input-file",
-    required=True,
-    prompt=True,
-    help="The path to the XML or YAML input data file to read and create \
-a YAML or XML file from, respectively.",
-)
-@click.option(
-    "--append",
-    help="Parse an xml file and append it to a base class, given that the xml file has \
-the same name as an existing base class",
-)
-@click.option(
-    "--check-consistency",
-    is_flag=True,
-    default=False,
-    help=(
-        "Check whether the yaml or nxdl file follows the general schema rules and "
-        "whether your comments are in the right place. The option renders an "
-        "output file of the same extension (*_consistency.yaml or *_consistency.nxdl.xml)."
-    ),
-)
-@click.option(
-    "--verbose",
-    is_flag=True,
-    default=False,
-    help="Print keywords and value types to standard output to help debug \
-possible issues in yaml files",
-)
-def launch_tool(input_file, verbose, append, check_consistency):
-    """
-    Main function that distinguishes the input file format and launches the tools.
- """ - if os.path.isfile(input_file): - raw_name, ext = split_name_and_extension(input_file) - else: - raise ValueError("Need a valid input file.") - - if ext == "yaml": - xml_out_file = raw_name + ".nxdl.xml" - generate_nxdl_or_retrieve_nxdl(input_file, xml_out_file, verbose) - if append: - append_yml(raw_name + ".nxdl.xml", append, verbose) - # For consistency running - if check_consistency: - yaml_out_file = raw_name + "_consistency." + ext - converter = Nxdl2yaml([], []) - converter.print_yml(xml_out_file, yaml_out_file, verbose) - os.remove(xml_out_file) - elif ext == "nxdl.xml": - if not append: - yaml_out_file = raw_name + "_parsed" + ".yaml" - converter = Nxdl2yaml([], []) - converter.print_yml(input_file, yaml_out_file, verbose) - # Append nxdl.xml file with yaml output file - yaml_hash = get_sha256_hash(yaml_out_file) - # Lines as divider between yaml and nxdl - top_lines = [ - ( - "\n# ++++++++++++++++++++++++++++++++++ SHA HASH" - " ++++++++++++++++++++++++++++++++++\n" - ), - f"# {yaml_hash}\n", - ] - - extend_yamlfile_with_comment( - yaml_file=yaml_out_file, - file_to_be_appended=input_file, - top_lines_list=top_lines, - ) - else: - append_yml(input_file, append, verbose) - # Taking care of consistency running - if check_consistency: - xml_out_file = raw_name + "_consistency." + ext - generate_nxdl_or_retrieve_nxdl(yaml_out_file, xml_out_file, verbose) - os.remove(yaml_out_file) - else: - raise ValueError("Provide correct file with extension '.yaml or '.nxdl.xml") - - -if __name__ == "__main__": - launch_tool().parse() # pylint: disable=no-value-for-parameter diff --git a/pynxtools/nyaml2nxdl/nyaml2nxdl_backward_tools.py b/pynxtools/nyaml2nxdl/nyaml2nxdl_backward_tools.py deleted file mode 100755 index f69e30665..000000000 --- a/pynxtools/nyaml2nxdl/nyaml2nxdl_backward_tools.py +++ /dev/null @@ -1,1047 +0,0 @@ -#!/usr/bin/env python3 -"""This file collects the function used in the reverse tool nxdl2yaml. 
-
-"""
-# -*- coding: utf-8 -*-
-#
-# Copyright The NOMAD Authors.
-#
-# This file is part of NOMAD. See https://nomad-lab.eu for further info.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import sys
-from typing import List, Dict
-import xml.etree.ElementTree as ET
-import os
-
-from pynxtools.nyaml2nxdl.nyaml2nxdl_helper import (
-    get_node_parent_info,
-    get_yaml_escape_char_dict,
-    cleaning_empty_lines,
-)
-from pynxtools.dataconverter.helpers import remove_namespace_from_tag
-
-
-DEPTH_SIZE = "  "
-CMNT_TAG = "!--"
-
-
-def separate_pi_comments(input_file):
-    """
-    Separate the comments that appear around the processing instructions (PI)
-    from the XML content.
-    """
-    comments_list = []
-    comment = []
-    xml_lines = []
-
-    with open(input_file, "r", encoding="utf-8") as file:
-        lines = file.readlines()
-        has_pi = True
-        for line in lines:
-            c_start = "<!--"
-            cmnt_end = "-->"
-            def_tag = "<definition"
-            if c_start in line and has_pi:
-                line = line.replace(c_start, "")
-                if cmnt_end in line:
-                    line = line.replace(cmnt_end, "")
-                    comments_list.append(line)
-                else:
-                    comment.append(line)
-            elif cmnt_end in line and len(comment) > 0 and has_pi:
-                comment.append(line.replace(cmnt_end, ""))
-                comments_list.append("".join(comment))
-                comment = []
-            elif def_tag in line or not has_pi:
-                has_pi = False
-                xml_lines.append(line)
-            elif len(comment) > 0 and has_pi:
-                comment.append(line)
-            else:
-                xml_lines.append(line)
-    return comments_list, "".join(xml_lines)
-
-
-# Collected: https://dustinoprea.com/2019/01/22/python-parsing-xml-and-retaining-the-comments/
-class _CommentedTreeBuilder(ET.TreeBuilder):
-    def comment(self, text):
-        """
-        Define the comment builder in TreeBuilder.
-        """
-        self.start("!--", {})
-        self.data(text)
-        self.end("--")
-
-
-def parse(filepath):
-    """
- Construct parse function for modified tree builder for including modified TreeBuilder - and rebuilding XMLParser. - """ - comments, xml_str = separate_pi_comments(filepath) - ctb = _CommentedTreeBuilder() - xp_parser = ET.XMLParser(target=ctb) - root = ET.fromstring(xml_str, parser=xp_parser) - return comments, root - - -def handle_mapping_char(text, depth=-1, skip_n_line_on_top=False): - """Check for ":" char and replace it by "':'".""" - - escape_char = get_yaml_escape_char_dict() - for esc_key, val in escape_char.items(): - if esc_key in text: - text = text.replace(esc_key, val) - if not skip_n_line_on_top: - if depth > 0: - text = add_new_line_with_pipe_on_top(text, depth) - else: - raise ValueError("Need depth size to co-ordinate text line in yaml file.") - return text - - -def add_new_line_with_pipe_on_top(text, depth): - """ - Return modified text for what we get error in converter, such as ':'. After adding a - new line at the start of text the error is solved. - """ - char_list_to_add_new_line_on_top_of_text = [":"] - for char in char_list_to_add_new_line_on_top_of_text: - if char in text: - return "|" + "\n" + depth * DEPTH_SIZE + text - return text - - -# pylint: disable=too-many-instance-attributes -class Nxdl2yaml: - """ - Parse XML file and print a YML file - """ - - def __init__( - self, - symbol_list: List[str], - root_level_definition: List[str], - root_level_doc="", - root_level_symbols="", - ): - # updated part of yaml_dict - self.found_definition = False - self.root_level_doc = root_level_doc - self.root_level_symbols = root_level_symbols - self.root_level_definition = root_level_definition - self.symbol_list = symbol_list - self.is_last_element_comment = False - self.include_comment = True - self.pi_comments = None - # NOTE: Here is how root_level_comments organised for storing comments - # root_level_comment= {'root_doc': comment, - # 'symbols': comment, - # The 'symbol_doc_comments' list is for comments from all 'symbol doc' - # 
'symbol_doc_comments' : [comments] - # 'symbol_list': [symbols], - # The 'symbol_comments' contains comments for 'symbols doc' and all 'symbol' - # 'symbol_comments': [comments]} - self.root_level_comment: Dict[str, str] = {} - - def print_yml(self, input_file, output_yml, verbose): - """ - Parse an XML file provided as input and print a YML file - """ - if os.path.isfile(output_yml): - os.remove(output_yml) - - depth = 0 - - self.pi_comments, root = parse(input_file) - xml_tree = {"tree": root, "node": root} - self.xmlparse(output_yml, xml_tree, depth, verbose) - - def handle_symbols(self, depth, node): - """Handle symbols field and its childs symbol""" - - # pylint: disable=consider-using-f-string - self.root_level_symbols = ( - f"{remove_namespace_from_tag(node.tag)}: " - f"{node.text.strip() if node.text else ''}" - ) - depth += 1 - last_comment = "" - sbl_doc_cmnt_list = [] - # Comments that come above symbol tag - symbol_cmnt_list = [] - for child in list(node): - tag = remove_namespace_from_tag(child.tag) - if tag == CMNT_TAG and self.include_comment: - last_comment = self.comvert_to_ymal_comment( - depth * DEPTH_SIZE, child.text - ) - if tag == "doc": - symbol_cmnt_list.append(last_comment) - # The bellow line is for handling lenth of 'symbol_comments' and - # 'symbol_doc_comments'. 
Otherwise print_root_level_info() gets inconsistency - # over for the loop while writting comment on file - sbl_doc_cmnt_list.append("") - last_comment = "" - self.symbol_list.append( - self.handle_not_root_level_doc(depth, text=child.text) - ) - elif tag == "symbol": - # place holder is symbol name - symbol_cmnt_list.append(last_comment) - last_comment = "" - if "doc" in child.attrib: - self.symbol_list.append( - self.handle_not_root_level_doc( - depth, tag=child.attrib["name"], text=child.attrib["doc"] - ) - ) - else: - for symbol_doc in list(child): - tag = remove_namespace_from_tag(symbol_doc.tag) - if tag == CMNT_TAG and self.include_comment: - last_comment = self.comvert_to_ymal_comment( - depth * DEPTH_SIZE, symbol_doc.text - ) - if tag == "doc": - sbl_doc_cmnt_list.append(last_comment) - last_comment = "" - self.symbol_list.append( - self.handle_not_root_level_doc( - depth, - tag=child.attrib["name"], - text=symbol_doc.text, - ) - ) - self.store_root_level_comments("symbol_doc_comments", sbl_doc_cmnt_list) - self.store_root_level_comments("symbol_comments", symbol_cmnt_list) - - def store_root_level_comments(self, holder, comment): - """Store yaml text or section line and the comments inteded for that lines or section""" - - self.root_level_comment[holder] = comment - - def handle_definition(self, node): - """ - Handle definition group and its attributes - NOTE: Here we tried to store the order of the xml element attributes. So that we get - exactly the same file in nxdl from yaml. 
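The `handle_definition` routine that follows preserves XML attribute order by inserting a `#xx#` placeholder where `name`/`extends` first appear, so the root keyword `NXname(NXextends):` comes out first while the other attributes keep their document order. A minimal, self-contained sketch of that idea (the helper name and simplified ordering are illustrative, not the module's actual implementation):

```python
import xml.etree.ElementTree as ET

# Hypothetical sketch: turn the root <definition> attributes into the YAML
# root keyword "NXname(NXextends):" followed by the remaining attributes
# in their original XML order (name/extends/schemaLocation are consumed).
def definition_to_yaml_keyword(xml_str):
    node = ET.fromstring(xml_str)
    keyword = node.attrib.get("name", "")
    if "extends" in node.attrib:
        keyword = f"{keyword}({node.attrib['extends']})"
    lines = [f"{keyword}:"]
    for attr, val in node.attrib.items():
        if attr in ("name", "extends") or "schemaLocation" in attr:
            continue
        lines.append(f"{attr}: {val}")
    return lines
```

Attribute order is stable here because `ElementTree` exposes `attrib` as an insertion-ordered dict on Python 3.8+, which is what lets the round trip reproduce the original NXDL attribute order.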
- """ - # pylint: disable=consider-using-f-string - # self.root_level_definition[0] = '' - keyword = "" - # tmp_word for reseving the location - tmp_word = "#xx#" - attribs = node.attrib - # for tracking the order of name and type - keyword_order = -1 - for item in attribs: - if "name" in item: - keyword = keyword + attribs[item] - if keyword_order == -1: - self.root_level_definition.append(tmp_word) - keyword_order = self.root_level_definition.index(tmp_word) - elif "extends" in item: - keyword = f"{keyword}({attribs[item]})" - if keyword_order == -1: - self.root_level_definition.append(tmp_word) - keyword_order = self.root_level_definition.index(tmp_word) - elif "schemaLocation" not in item and "extends" != item: - text = f"{item}: {attribs[item]}" - self.root_level_definition.append(text) - self.root_level_definition[keyword_order] = f"{keyword}:" - - def handle_root_level_doc(self, node): - """ - Handle the documentation field found at root level. - """ - # tag = remove_namespace_from_tag(node.tag) - text = node.text - text = self.handle_not_root_level_doc(depth=0, text=text) - self.root_level_doc = text - - # pylint: disable=too-many-branches - def handle_not_root_level_doc(self, depth, text, tag="doc", file_out=None): - """ - Handle docs field along the yaml file. In this function we also tried to keep - the track of intended indentation. E.g. the bollow doc block. - * Topic name - Description of topic - """ - - # Handling empty doc - if not text: - text = "" - else: - text = handle_mapping_char(text, -1, True) - if "\n" in text: - # To remove '\n' character as it will be added before text. 
- text = cleaning_empty_lines(text.split("\n")) - text_tmp = [] - yaml_indent_n = len((depth + 1) * DEPTH_SIZE) - # Find indentaion in the first text line with alphabet - tmp_i = 0 - while tmp_i != -1: - first_line_indent_n = 0 - # Taking care of empty text whitout any character - if len(text) == 1 and text[0] == "": - break - for ch_ in text[tmp_i]: - if ch_ == " ": - first_line_indent_n = first_line_indent_n + 1 - elif ch_ != "": - tmp_i = -2 - break - tmp_i = tmp_i + 1 - # Taking care of doc like bellow: - # Text liness - # text continues - # So no indentaion at the staring or doc. So doc group will come along general - # alignment - if first_line_indent_n == 0: - first_line_indent_n = yaml_indent_n - - # for indent_diff -ve all lines will move left by the same ammout - # for indect_diff +ve all lines will move right the same amount - indent_diff = yaml_indent_n - first_line_indent_n - # CHeck for first line empty if not keep first line empty - - for _, line in enumerate(text): - line_indent_n = 0 - # Collect first empty space without alphabate - for ch_ in line: - if ch_ == " ": - line_indent_n = line_indent_n + 1 - else: - break - line_indent_n = line_indent_n + indent_diff - if line_indent_n < yaml_indent_n: - # if line still under yaml identation - text_tmp.append(yaml_indent_n * " " + line.strip()) - else: - text_tmp.append(line_indent_n * " " + line.strip()) - - text = "\n" + "\n".join(text_tmp) - if "}" in tag: - tag = remove_namespace_from_tag(tag) - indent = depth * DEPTH_SIZE - elif text: - text = "\n" + (depth + 1) * DEPTH_SIZE + text.strip() - if "}" in tag: - tag = remove_namespace_from_tag(tag) - indent = depth * DEPTH_SIZE - else: - text = "" - if "}" in tag: - tag = remove_namespace_from_tag(tag) - indent = depth * DEPTH_SIZE - - doc_str = f"{indent}{tag}: |{text}\n" - if file_out: - file_out.write(doc_str) - return None - return doc_str - - def write_out(self, indent, text, file_out): - """ - Write text line in output file. 
- """ - line_string = f"{indent}{text.rstrip()}\n" - file_out.write(line_string) - - def print_root_level_doc(self, file_out): - """ - Print at the root level of YML file \ - the general documentation field found in XML file - """ - indent = 0 * DEPTH_SIZE - - if ( - "root_doc" in self.root_level_comment - and self.root_level_comment["root_doc"] != "" - ): - text = self.root_level_comment["root_doc"] - self.write_out(indent, text, file_out) - - text = self.root_level_doc - self.write_out(indent, text, file_out) - self.root_level_doc = "" - - def comvert_to_ymal_comment(self, indent, text): - """ - Convert into yaml comment by adding exta '#' char in front of comment lines - """ - lines = text.split("\n") - mod_lines = [] - for line in lines: - line = line.strip() - if line and line[0] != "#": - line = indent + "# " + line - mod_lines.append(line) - elif line: - line = indent + line - mod_lines.append(line) - # The starting '\n' to keep multiple comments separate - return "\n" + "\n".join(mod_lines) - - def print_root_level_info(self, depth, file_out): - """ - Print at the root level of YML file \ - the information stored as definition attributes in the XML file - """ - # pylint: disable=consider-using-f-string - if depth < 0: - raise ValueError("Somthing wrong with indentaion in root level.") - - has_categoty = False - for def_line in self.root_level_definition: - if def_line in ("category: application", "category: base"): - self.write_out(indent=0 * DEPTH_SIZE, text=def_line, file_out=file_out) - # file_out.write(f"{def_line}\n") - has_categoty = True - - if not has_categoty: - raise ValueError( - "Definition dose not get any category from 'base or application'." 
- ) - self.print_root_level_doc(file_out) - if ( - "symbols" in self.root_level_comment - and self.root_level_comment["symbols"] != "" - ): - indent = depth * DEPTH_SIZE - text = self.root_level_comment["symbols"] - self.write_out(indent, text, file_out) - if self.root_level_symbols: - self.write_out( - indent=0 * DEPTH_SIZE, text=self.root_level_symbols, file_out=file_out - ) - # symbol_list include 'symbols doc', and all 'symbol' - for ind, symbol in enumerate(self.symbol_list): - # Taking care of comments that come on to of 'symbols doc' and 'symbol' - if ( - "symbol_comments" in self.root_level_comment - and self.root_level_comment["symbol_comments"][ind] != "" - ): - indent = depth * DEPTH_SIZE - self.write_out( - indent, - self.root_level_comment["symbol_comments"][ind], - file_out, - ) - if ( - "symbol_doc_comments" in self.root_level_comment - and self.root_level_comment["symbol_doc_comments"][ind] != "" - ): - indent = depth * DEPTH_SIZE - self.write_out( - indent, - self.root_level_comment["symbol_doc_comments"][ind], - file_out, - ) - - self.write_out(indent=(0 * DEPTH_SIZE), text=symbol, file_out=file_out) - if len(self.pi_comments) > 1: - indent = DEPTH_SIZE * depth - # The first comment is top level copy-right doc string - for comment in self.pi_comments[1:]: - self.write_out( - indent, self.comvert_to_ymal_comment(indent, comment), file_out - ) - if self.root_level_definition: - # Soring NXname for writting end of the definition attributes - nx_name = "" - for defs in self.root_level_definition: - if "NX" in defs and defs[-1] == ":": - nx_name = defs - continue - if defs in ("category: application", "category: base"): - continue - self.write_out(indent=0 * DEPTH_SIZE, text=defs, file_out=file_out) - self.write_out(indent=0 * DEPTH_SIZE, text=nx_name, file_out=file_out) - self.found_definition = False - - def handle_exists(self, exists_dict, key, val): - """ - Create exist component as folows: - - {'min' : value for min, - 'max' : value for max, - 
'optional' : value for optional} - - This is created separately so that the keys stays in order. - """ - if not val: - val = "" - else: - val = str(val) - if "minOccurs" == key: - exists_dict["minOccurs"] = ["min", val] - if "maxOccurs" == key: - exists_dict["maxOccurs"] = ["max", val] - if "optional" == key: - exists_dict["optional"] = ["optional", val] - if "recommended" == key: - exists_dict["recommended"] = ["recommended", val] - if "required" == key: - exists_dict["required"] = ["required", val] - - # pylint: disable=too-many-branches, consider-using-f-string - def handle_group_or_field(self, depth, node, file_out): - """Handle all the possible attributes that come along a field or group""" - - allowed_attr = [ - "optional", - "recommended", - "name", - "type", - "axes", - "axis", - "data_offset", - "interpretation", - "long_name", - "maxOccurs", - "minOccurs", - "nameType", - "optional", - "primary", - "signal", - "stride", - "units", - "required", - "deprecated", - "exists", - ] - - name_type = "" - node_attr = node.attrib - rm_key_list = [] - # Maintain order: name and type in form name(type) or (type)name that come first - for key, val in node_attr.items(): - if key == "name": - name_type = name_type + val - rm_key_list.append(key) - if key == "type": - name_type = name_type + "(%s)" % val - rm_key_list.append(key) - if not name_type: - raise ValueError( - f"No 'name' or 'type' hase been found. 
But, 'group' or 'field' " - f"must have at list a nme.We got attributes: {node_attr}" - ) - file_out.write( - "{indent}{name_type}:\n".format( - indent=depth * DEPTH_SIZE, name_type=name_type - ) - ) - - for key in rm_key_list: - del node_attr[key] - - # tmp_dict intended to persevere order of attribnutes - tmp_dict = {} - exists_dict = {} - for key, val in node_attr.items(): - # As both 'minOccurs', 'maxOccurs' and optionality move to the 'exists' - if key in ["minOccurs", "maxOccurs", "optional", "recommended", "required"]: - if "exists" not in tmp_dict: - tmp_dict["exists"] = [] - self.handle_exists(exists_dict, key, val) - elif key == "units": - tmp_dict["unit"] = str(val) - else: - tmp_dict[key] = str(val) - if key not in allowed_attr: - raise ValueError( - f"An attribute ({key}) in 'field' or 'group' has been found " - f"that is not allowed. The allowed attr is {allowed_attr}." - ) - - if exists_dict: - for key, val in exists_dict.items(): - if key in ["minOccurs", "maxOccurs"]: - tmp_dict["exists"] = tmp_dict["exists"] + val - elif key in ["optional", "recommended", "required"]: - tmp_dict["exists"] = key - - depth_ = depth + 1 - for key, val in tmp_dict.items(): - # Increase depth size inside handle_map...() for writting text with one - # more indentation. - file_out.write( - f"{depth_ * DEPTH_SIZE}{key}: " - f"{handle_mapping_char(val, depth_ + 1, False)}\n" - ) - - # pylint: disable=too-many-branches, too-many-locals - def handle_dimension(self, depth, node, file_out): - """ - Handle the dimension field. - NOTE: Usually we take care of any xml element in xmlparse(...) and - recursion_in_xml_tree(...) functions. But Here it is a bit different. The doc dimension - and attributes of dim has been handled inside this function here. 
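`handle_dimension` below collapses each `<dim index="..." value="..."/>` child into the compact `dim: [[index, value], ...]` YAML notation, with `rank` written as a sibling key. A simplified standalone sketch of just that transformation (function name hypothetical; doc and extra `dim` attributes are ignored here):

```python
import xml.etree.ElementTree as ET

# Hypothetical sketch: render a <dimensions> element in the YAML notation
# used by the converter, i.e. rank as a key and dims as [[index, value], ...].
def dims_to_yaml(xml_str, indent="  "):
    node = ET.fromstring(xml_str)
    out = ["dimensions:"]
    if "rank" in node.attrib:
        out.append(f"{indent}rank: {node.attrib['rank']}")
    pairs = ", ".join(
        f"[{d.attrib.get('index', '')}, {d.attrib.get('value', '')}]"
        for d in node.findall("dim")
    )
    out.append(f"{indent}dim: [{pairs}]")
    return "\n".join(out)
```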
- """ - # pylint: disable=consider-using-f-string - possible_dim_attrs = ["ref", "required", "incr", "refindex"] - possible_dimemsion_attrs = ["rank"] - - # taking care of Dimension tag - file_out.write( - "{indent}{tag}:\n".format( - indent=depth * DEPTH_SIZE, tag=node.tag.split("}", 1)[1] - ) - ) - # Taking care of dimension attributes - for attr, value in node.attrib.items(): - if attr in possible_dimemsion_attrs and not isinstance(value, dict): - indent = (depth + 1) * DEPTH_SIZE - file_out.write(f"{indent}{attr}: {value}\n") - else: - raise ValueError( - f"Dimension has got an attribute {attr} that is not valid." - f"Current the allowd atributes are {possible_dimemsion_attrs}." - f" Please have a look" - ) - # taking carew of dimension doc - for child in list(node): - tag = remove_namespace_from_tag(child.tag) - if tag == "doc": - text = self.handle_not_root_level_doc(depth + 1, child.text) - file_out.write(text) - node.remove(child) - - dim_index_value = "" - dim_other_parts = {} - dim_cmnt_node = [] - # taking care of dim and doc childs of dimension - for child in list(node): - tag = remove_namespace_from_tag(child.tag) - child_attrs = child.attrib - # taking care of index and value attributes - if tag == ("dim"): - # taking care of index and value in format [[index, value]] - dim_index_value = dim_index_value + "[{index}, {value}], ".format( - index=child_attrs["index"] if "index" in child_attrs else "", - value=child_attrs["value"] if "value" in child_attrs else "", - ) - if "index" in child_attrs: - del child_attrs["index"] - if "value" in child_attrs: - del child_attrs["value"] - - # Taking care of doc comes as child of dim - for cchild in list(child): - ttag = cchild.tag.split("}", 1)[1] - if ttag == ("doc"): - if ttag not in dim_other_parts: - dim_other_parts[ttag] = [] - text = cchild.text - dim_other_parts[ttag].append(text.strip()) - child.remove(cchild) - continue - # taking care of other attributes except index and value - for attr, value in 
child_attrs.items(): - if attr in possible_dim_attrs: - if attr not in dim_other_parts: - dim_other_parts[attr] = [] - dim_other_parts[attr].append(value) - if tag == CMNT_TAG and self.include_comment: - # Store and remove node so that comment nodes from dim node so - # that it does not call in xmlparser function - dim_cmnt_node.append(child) - node.remove(child) - - # All 'dim' element comments on top of 'dim' yaml key - if dim_cmnt_node: - for ch_nd in dim_cmnt_node: - self.handel_comment(depth + 1, ch_nd, file_out) - # index and value attributes of dim elements - file_out.write( - "{indent}dim: [{value}]\n".format( - indent=(depth + 1) * DEPTH_SIZE, value=dim_index_value[:-2] or "" - ) - ) - # Write the attributes, except index and value, and doc of dim as child of dim_parameter. - # But tthe doc or attributes for each dim come inside list according to the order of dim. - if dim_other_parts: - file_out.write( - "{indent}dim_parameters:\n".format(indent=(depth + 1) * DEPTH_SIZE) - ) - # depth = depth + 2 dim_paramerter has child such as doc of dim - indent = (depth + 2) * DEPTH_SIZE - for key, value in dim_other_parts.items(): - if key == "doc": - value = self.handle_not_root_level_doc( - depth + 2, str(value), key, file_out - ) - else: - # Increase depth size inside handle_map...() for writting text with one - # more indentation. - file_out.write( - f"{indent}{key}: " - f"{handle_mapping_char(value, depth + 3, False)}\n" - ) - - def handle_enumeration(self, depth, node, file_out): - """ - Handle the enumeration field parsed from the xml file. - - If the enumeration items contain a doc field, the yaml file will contain items as child - fields of the enumeration field. - - If no doc are inherited in the enumeration items, a list of the items is given for the - enumeration list. 
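As the docstring above describes, enumerations are emitted in one of two YAML shapes: a flat list when no item carries a `doc`, and a nested mapping when any item does. A minimal sketch of that branching (helper name illustrative, comment handling omitted):

```python
import xml.etree.ElementTree as ET

# Hypothetical sketch of the two enumeration layouts: items without docs
# collapse to "enumeration: [a, b]", items with docs become child mappings.
def enumeration_to_yaml(xml_str, depth=0, indent="  "):
    node = ET.fromstring(xml_str)
    items = node.findall("item")
    pad = depth * indent
    if not any(list(item) for item in items):
        values = ", ".join(i.attrib["value"] for i in items)
        return f"{pad}enumeration: [{values}]"
    lines = [f"{pad}enumeration:"]
    for item in items:
        lines.append(f"{pad}{indent}{item.attrib['value']}:")
        for doc in item.findall("doc"):
            lines.append(f"{pad}{indent * 2}doc: |")
            lines.append(f"{pad}{indent * 3}{(doc.text or '').strip()}")
    return "\n".join(lines)
```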
- - """ - # pylint: disable=consider-using-f-string - - check_doc = [] - for child in list(node): - if list(child): - check_doc.append(list(child)) - # pylint: disable=too-many-nested-blocks - if check_doc: - file_out.write( - "{indent}{tag}: \n".format( - indent=depth * DEPTH_SIZE, tag=node.tag.split("}", 1)[1] - ) - ) - for child in list(node): - tag = remove_namespace_from_tag(child.tag) - itm_depth = depth + 1 - if tag == ("item"): - file_out.write( - "{indent}{value}: \n".format( - indent=(itm_depth) * DEPTH_SIZE, value=child.attrib["value"] - ) - ) - - if list(child): - for item_doc in list(child): - if remove_namespace_from_tag(item_doc.tag) == "doc": - item_doc_depth = itm_depth + 1 - self.handle_not_root_level_doc( - item_doc_depth, - item_doc.text, - item_doc.tag, - file_out, - ) - if ( - remove_namespace_from_tag(item_doc.tag) == CMNT_TAG - and self.include_comment - ): - self.handel_comment(itm_depth + 1, item_doc, file_out) - if tag == CMNT_TAG and self.include_comment: - self.handel_comment(itm_depth + 1, child, file_out) - else: - enum_list = "" - remove_nodes = [] - for item_child in list(node): - tag = remove_namespace_from_tag(item_child.tag) - if tag == ("item"): - enum_list = enum_list + "{value}, ".format( - value=item_child.attrib["value"] - ) - if tag == CMNT_TAG and self.include_comment: - self.handel_comment(depth, item_child, file_out) - remove_nodes.append(item_child) - for ch_node in remove_nodes: - node.remove(ch_node) - - file_out.write( - "{indent}{tag}: [{enum_list}]\n".format( - indent=depth * DEPTH_SIZE, - tag=remove_namespace_from_tag(node.tag), - enum_list=enum_list[:-2] or "", - ) - ) - - def handle_attributes(self, depth, node, file_out): - """Handle the attributes parsed from the xml file""" - - allowed_attr = [ - "name", - "type", - "units", - "nameType", - "recommended", - "optional", - "minOccurs", - "maxOccurs", - "deprecated", - ] - - name = "" - node_attr = node.attrib - if "name" in node_attr: - pass - else: - raise 
ValueError("Attribute must have an name key.") - rm_key_list = [] - # Maintain order: name and type in form name(type) or (type)name that come first - for key, val in node_attr.items(): - if key == "name": - name = val - rm_key_list.append(key) - - for key in rm_key_list: - del node_attr[key] - - file_out.write( - "{indent}{escapesymbol}{name}:\n".format( - indent=depth * DEPTH_SIZE, escapesymbol=r"\@", name=name - ) - ) - - tmp_dict = {} - exists_dict = {} - for key, val in node_attr.items(): - # As both 'minOccurs', 'maxOccurs' and optionality move to the 'exists' - if key in ["minOccurs", "maxOccurs", "optional", "recommended", "required"]: - if "exists" not in tmp_dict: - tmp_dict["exists"] = [] - self.handle_exists(exists_dict, key, val) - elif key == "units": - tmp_dict["unit"] = val - else: - tmp_dict[key] = val - if key not in allowed_attr: - raise ValueError( - f"An attribute ({key}) has been found that is not allowed." - f"The allowed attr is {allowed_attr}." - ) - - has_min_max = False - has_opt_reco_requ = False - if exists_dict: - for key, val in exists_dict.items(): - if key in ["minOccurs", "maxOccurs"]: - tmp_dict["exists"] = tmp_dict["exists"] + val - has_min_max = True - elif key in ["optional", "recommended", "required"]: - tmp_dict["exists"] = key - has_opt_reco_requ = True - if has_min_max and has_opt_reco_requ: - raise ValueError( - "Optionality 'exists' can take only either from ['minOccurs'," - " 'maxOccurs'] or from ['optional', 'recommended', 'required']" - ". But not from both of the groups together. Please check in" - " attributes" - ) - - depth_ = depth + 1 - for key, val in tmp_dict.items(): - # Increase depth size inside handle_map...() for writting text with one - # more indentation. 
- file_out.write( - f"{depth_ * DEPTH_SIZE}{key}: " - f"{handle_mapping_char(val, depth_ + 1, False)}\n" - ) - - def handel_link(self, depth, node, file_out): - """ - Handle link elements of nxdl - """ - - possible_link_attrs = ["name", "target", "napimount"] - node_attr = node.attrib - # Handle special cases - if "name" in node_attr: - file_out.write( - "{indent}{name}(link):\n".format( - indent=depth * DEPTH_SIZE, name=node_attr["name"] or "" - ) - ) - del node_attr["name"] - - depth_ = depth + 1 - # Handle general cases - for attr_key, val in node_attr.items(): - if attr_key in possible_link_attrs: - file_out.write( - "{indent}{attr}: {value}\n".format( - indent=depth_ * DEPTH_SIZE, attr=attr_key, value=val - ) - ) - else: - raise ValueError( - f"An anexpected attribute '{attr_key}' of link has found." - f"At this moment the alloed keys are {possible_link_attrs}" - ) - - def handel_choice(self, depth, node, file_out): - """ - Handle choice element which is a parent node of group. - """ - - possible_attr = [] - - node_attr = node.attrib - # Handle special casees - if "name" in node_attr: - file_out.write( - "{indent}{attr}(choice): \n".format( - indent=depth * DEPTH_SIZE, attr=node_attr["name"] - ) - ) - del node_attr["name"] - - depth_ = depth + 1 - # Taking care of general attrinutes. Though, still no attrinutes have found, - # but could be used for future - for attr in node_attr.items(): - if attr in possible_attr: - file_out.write( - "{indent}{attr}: {value}\n".format( - indent=depth_ * DEPTH_SIZE, attr=attr, value=node_attr[attr] - ) - ) - else: - raise ValueError( - f"An unexpected attribute '{attr}' of 'choice' has been found." 
- f"At this moment attributes for choice {possible_attr}" - ) - - def handel_comment(self, depth, node, file_out): - """ - Collect comment element and pass to write_out function - """ - indent = depth * DEPTH_SIZE - if self.is_last_element_comment: - text = self.comvert_to_ymal_comment(indent, node.text) - self.write_out(indent, text, file_out) - else: - text = self.comvert_to_ymal_comment(indent, node.text) - self.write_out(indent, text, file_out) - self.is_last_element_comment = True - - def recursion_in_xml_tree(self, depth, xml_tree, output_yml, verbose): - """ - Descend lower level in xml tree. If we are in the symbols branch, the recursive - behaviour is not triggered as we already handled the symbols' childs. - """ - - tree = xml_tree["tree"] - node = xml_tree["node"] - for child in list(node): - xml_tree_children = {"tree": tree, "node": child} - self.xmlparse(output_yml, xml_tree_children, depth, verbose) - - # pylint: disable=too-many-branches, too-many-statements - def xmlparse(self, output_yml, xml_tree, depth, verbose): - """ - Main of the nxdl2yaml converter. 
- It parses XML tree, then prints recursively each level of the tree - """ - tree = xml_tree["tree"] - node = xml_tree["node"] - if verbose: - sys.stdout.write(f"Node tag: {remove_namespace_from_tag(node.tag)}\n") - sys.stdout.write(f"Attributes: {node.attrib}\n") - with open(output_yml, "a", encoding="utf-8") as file_out: - tag = remove_namespace_from_tag(node.tag) - if tag == "definition": - self.found_definition = True - self.handle_definition(node) - # Taking care of root level doc and symbols - remove_cmnt_n = None - last_comment = "" - for child in list(node): - tag_tmp = remove_namespace_from_tag(child.tag) - if tag_tmp == CMNT_TAG and self.include_comment: - last_comment = self.comvert_to_ymal_comment( - depth * DEPTH_SIZE, child.text - ) - remove_cmnt_n = child - if tag_tmp == "doc": - self.store_root_level_comments("root_doc", last_comment) - last_comment = "" - self.handle_root_level_doc(child) - node.remove(child) - if remove_cmnt_n is not None: - node.remove(remove_cmnt_n) - remove_cmnt_n = None - if tag_tmp == "symbols": - self.store_root_level_comments("symbols", last_comment) - last_comment = "" - self.handle_symbols(depth, child) - node.remove(child) - if remove_cmnt_n is not None: - node.remove(remove_cmnt_n) - remove_cmnt_n = None - - if tag == ("doc") and depth != 1: - parent = get_node_parent_info(tree, node)[0] - doc_parent = remove_namespace_from_tag(parent.tag) - if doc_parent != "item": - self.handle_not_root_level_doc( - depth, text=node.text, tag=node.tag, file_out=file_out - ) - - if self.found_definition is True and self.root_level_doc: - self.print_root_level_info(depth, file_out) - # End of print root-level definitions in file - if tag in ("field", "group") and depth != 0: - self.handle_group_or_field(depth, node, file_out) - if tag == ("enumeration"): - self.handle_enumeration(depth, node, file_out) - if tag == ("attribute"): - self.handle_attributes(depth, node, file_out) - if tag == ("dimensions"): - self.handle_dimension(depth, 
node, file_out) - if tag == ("link"): - self.handel_link(depth, node, file_out) - if tag == ("choice"): - self.handel_choice(depth, node, file_out) - if tag == CMNT_TAG and self.include_comment: - self.handel_comment(depth, node, file_out) - depth += 1 - # Write nested nodes - self.recursion_in_xml_tree(depth, xml_tree, output_yml, verbose) - - -def compare_niac_and_my(tree, tree2, verbose, node, root_no_duplicates): - """This function creates two trees with Niac XML file and My XML file. - The main aim is to compare the two trees and create a new one that is the - union of the two initial trees. - """ - root = tree.getroot() - root2 = tree2.getroot() - attrs_list_niac = [] - for nodo in root.iter(node): - attrs_list_niac.append(nodo.attrib) - if verbose: - sys.stdout.write("Attributes found in Niac file: \n") - sys.stdout.write(str(attrs_list_niac) + "\n") - sys.stdout.write(" \n") - sys.stdout.write("Started merging of Niac and My file... \n") - for elem in root.iter(node): - if verbose: - sys.stdout.write("- Niac element inserted: \n") - sys.stdout.write(str(elem.attrib) + "\n") - index = get_node_parent_info(tree, elem)[1] - root_no_duplicates.insert(index, elem) - - for elem2 in root2.iter(node): - index = get_node_parent_info(tree2, elem2)[1] - if elem2.attrib not in attrs_list_niac: - if verbose: - sys.stdout.write("- My element inserted: \n") - sys.stdout.write(str(elem2.attrib) + "\n") - root_no_duplicates.insert(index, elem2) - - if verbose: - sys.stdout.write(" \n") - return root_no_duplicates diff --git a/pynxtools/nyaml2nxdl/nyaml2nxdl_forward_tools.py b/pynxtools/nyaml2nxdl/nyaml2nxdl_forward_tools.py deleted file mode 100644 index 66cd0dfa6..000000000 --- a/pynxtools/nyaml2nxdl/nyaml2nxdl_forward_tools.py +++ /dev/null @@ -1,1236 +0,0 @@ -#!/usr/bin/env python3 -"""Creates an instantiated NXDL schema XML tree by walking the dictionary nest - -""" -# -*- coding: utf-8 -*- -# -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. 
See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import sys -import xml.etree.ElementTree as ET -from xml.dom import minidom -import os -import textwrap - -import yaml - -from pynxtools.nexus import nexus -from pynxtools.nyaml2nxdl.comment_collector import CommentCollector -from pynxtools.dataconverter.helpers import remove_namespace_from_tag -from pynxtools.nyaml2nxdl.nyaml2nxdl_helper import ( - get_yaml_escape_char_reverter_dict, - nx_name_type_resolving, - cleaning_empty_lines, - LineLoader, -) - - -# pylint: disable=too-many-lines, global-statement, invalid-name -DOM_COMMENT = ( - "\n" - "# NeXus - Neutron and X-ray Common Data Format\n" - "# \n" - "# Copyright (C) 2014-2022 NeXus International Advisory Committee (NIAC)\n" - "# \n" - "# This library is free software; you can redistribute it and/or\n" - "# modify it under the terms of the GNU Lesser General Public\n" - "# License as published by the Free Software Foundation; either\n" - "# version 3 of the License, or (at your option) any later version.\n" - "#\n" - "# This library is distributed in the hope that it will be useful,\n" - "# but WITHOUT ANY WARRANTY; without even the implied warranty of\n" - "# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU\n" - "# Lesser General Public License for more details.\n" - "#\n" - "# You should have received a copy of the GNU Lesser General Public\n" - "# License along with this library; if not, write to the Free Software\n" - "# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA\n" - "#\n" - "# For further information, see http://www.nexusformat.org\n" -) -NX_CLSS = nexus.get_nx_classes() -NX_NEW_DEFINED_CLASSES = ["NX_COMPLEX"] -NX_TYPE_KEYS = nexus.get_nx_attribute_type() -NX_ATTR_IDNT = "\\@" -NX_UNIT_IDNT = "unit" -DEPTH_SIZE = " " -NX_UNIT_TYPES = nexus.get_nx_units() -COMMENT_BLOCKS: CommentCollector -CATEGORY = "" # Definition would be either 'base' or 'application' - - -def check_for_dom_comment_in_yaml(): - """Check the yaml file has dom comment or dom comment needed to be hard coded.""" - dignature_keyword_list = [ - "NeXus", - "GNU Lesser General Public", - "Free Software Foundation", - "Copyright (C)", - "WITHOUT ANY WARRANTY", - ] - - # Check for dom comments in first three comments - dom_comment = "" - dom_comment_ind = 1 - for ind, comnt in enumerate(COMMENT_BLOCKS[0:5]): - cmnt_list = comnt.get_comment_text() - if len(cmnt_list) == 1: - text = cmnt_list[0] - else: - continue - dom_comment = text - dom_comment_ind = ind - for keyword in dignature_keyword_list: - if keyword not in text: - dom_comment = "" - break - if dom_comment: - break - - # deactivate the root dom_comment, So that the corresponding comment would not be - # considered as comment for definition xml element. - if dom_comment: - COMMENT_BLOCKS.remove_comment(dom_comment_ind) - - return dom_comment - - -def yml_reader(inputfile): - """ - This function launches the LineLoader class. - It parses the yaml in a dict and extends it with line tag keys for each key of the dict. 
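`yml_reader` below relies on `LineLoader` to tag every parsed key with its source line (the `__line__<key>` entries consumed elsewhere in the converter). A hedged sketch of how such a loader can be built on PyYAML — the class name here is illustrative; the real `LineLoader` lives in `nyaml2nxdl_helper`:

```python
import yaml

# Illustrative line-tagging loader: while constructing each mapping, record
# the source line of every key as a sibling '__line__<key>' entry.
class LineTrackingLoader(yaml.SafeLoader):
    def construct_mapping(self, node, deep=False):
        mapping = super().construct_mapping(node, deep=deep)
        for key_node, _ in node.value:
            # PyYAML marks are zero-based; store one-based line numbers.
            mapping[f"__line__{key_node.value}"] = key_node.start_mark.line + 1
        return mapping

data = yaml.load("category: base\ndoc: example\n", Loader=LineTrackingLoader)
```

This is the standard PyYAML recipe of overriding `construct_mapping` on a `SafeLoader` subclass; the node's `start_mark` carries the position information.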
-    """
-    global COMMENT_BLOCKS
-    with open(inputfile, "r", encoding="utf-8") as plain_text_yaml:
-        loader = LineLoader(plain_text_yaml)
-        loaded_yaml = loader.get_single_data()
-    COMMENT_BLOCKS = CommentCollector(inputfile, loaded_yaml)
-    COMMENT_BLOCKS.extract_all_comment_blocks()
-    dom_cmnt_frm_yaml = check_for_dom_comment_in_yaml()
-    global DOM_COMMENT
-    if dom_cmnt_frm_yaml:
-        DOM_COMMENT = dom_cmnt_frm_yaml
-
-    if "category" not in loaded_yaml.keys():
-        raise ValueError(
-            "All definitions should be either 'base' or 'application' category. "
-            "No category has been found."
-        )
-    global CATEGORY
-    CATEGORY = loaded_yaml["category"]
-    return loaded_yaml
-
-
-def check_for_default_attribute_and_value(xml_element):
-    """NeXus groups, fields and attributes might have default xml attributes and values
-    that must be present. For example: 'optional', which is 'true' by default for base
-    classes and 'false' otherwise.
-    """
-
-    # base: default attributes and value for all elements of base class except dimension element
-    base_attr_to_val = {"optional": "true"}
-
-    # application: default attributes and value for all elements of application class except
-    # dimension element
-    application_attr_to_val = {"optional": "false"}
-
-    # Default attributes and value for dimension element
-    base_dim_attr_to_val = {"required": "false"}
-    application_dim_attr_to_val = {"required": "true"}
-
-    # Eligible tags for default attr and value
-    elegible_tag = ["group", "field", "attribute"]
-
-    def set_default_attribute(xml_elem, default_attr_to_val):
-        for deflt_attr, deflt_val in default_attr_to_val.items():
-            if (
-                deflt_attr not in xml_elem.attrib
-                and "maxOccurs" not in xml_elem.attrib
-                and "minOccurs" not in xml_elem.attrib
-                and "recommended" not in xml_elem.attrib
-            ):
-                xml_elem.set(deflt_attr, deflt_val)
-
-    for child in list(xml_element):
-        # Skip comment 'functions' that mainly collect comments from the yaml file.
-        if not isinstance(child.tag, str):
-            continue
-        tag = remove_namespace_from_tag(child.tag)
-
-        if tag == "dim" and CATEGORY == "base":
-            set_default_attribute(child, base_dim_attr_to_val)
-        if tag == "dim" and CATEGORY == "application":
-            set_default_attribute(child, application_dim_attr_to_val)
-        if tag in elegible_tag and CATEGORY == "base":
-            set_default_attribute(child, base_attr_to_val)
-        if tag in elegible_tag and CATEGORY == "application":
-            set_default_attribute(child, application_attr_to_val)
-        check_for_default_attribute_and_value(child)
-
-
-def yml_reader_nolinetag(inputfile):
-    """
-    pyyaml-based parsing of a yaml file into a python dict
-    """
-    with open(inputfile, "r", encoding="utf-8") as stream:
-        parsed_yaml = yaml.safe_load(stream)
-    return parsed_yaml
-
-
-def check_for_skiped_attributes(component, value, allowed_attr=None, verbose=False):
-    """
-    Check whether any attributes have been skipped.
-    NOTE: 'doc' must be kept in mind here.
-    """
-    block_tag = ["enumeration"]
-    if value:
-        for attr, val in value.items():
-            if attr in ["doc"]:
-                continue
-            if "__line__" in attr or attr in block_tag:
-                continue
-            line_number = f"__line__{attr}"
-            if verbose:
-                print(f"__line__ : {value[line_number]}")
-            if (
-                not isinstance(val, dict)
-                and "\\@" not in attr
-                and attr not in allowed_attr
-                and "NX" not in attr
-                and val
-            ):
-                raise ValueError(
-                    f"An attribute '{attr}' in part '{component}' has been found. "
-                    f"Please check around line '{value[line_number]}'. At this "
-                    f"moment the allowed attributes are {allowed_attr}"
-                )
-
-
-def format_nxdl_doc(string):
-    """NeXus format for doc string"""
-    string = check_for_mapping_char_other(string)
-    formatted_doc = ""
-    if "\n" not in string:
-        if len(string) > 80:
-            wrapped = textwrap.TextWrapper(
-                width=80, break_long_words=False, replace_whitespace=False
-            )
-            string = "\n".join(wrapped.wrap(string))
-        formatted_doc = "\n" + f"{string}"
-    else:
-        text_lines = string.split("\n")
-        text_lines = cleaning_empty_lines(text_lines)
-        formatted_doc += "\n" + "\n".join(text_lines)
-    if not formatted_doc.endswith("\n"):
-        formatted_doc += "\n"
-    return formatted_doc
-
-
-def check_for_mapping_char_other(text):
-    """
-    Check for the mapping char ':' which cannot be passed through the yaml library,
-    and replace it by ':'.
-    """
-    if not text:
-        text = ""
-    text = str(text)
-    if text == "True":
-        text = "true"
-    if text == "False":
-        text = "false"
-    # Some escape chars are not valid in the yaml library and are written out while
-    # writing the yaml file. When writing the nxdl, revert to those escape chars.
-    escape_reverter = get_yaml_escape_char_reverter_dict()
-    for key, val in escape_reverter.items():
-        if key in text:
-            text = text.replace(key, val)
-    return str(text).strip()
-
-
-def xml_handle_doc(obj, value: str, line_number=None, line_loc=None):
-    """This function creates a 'doc' element instance, and appends it to an existing element"""
-    # global comment_blocks
-    doc_elemt = ET.SubElement(obj, "doc")
-    text = format_nxdl_doc(check_for_mapping_char_other(value)).strip()
-    # Keep the doc text in the middle of the doc tag.
-    doc_elemt.text = f"\n{text}\n"
-    if line_loc is not None and line_number is not None:
-        xml_handle_comment(obj, line_number, line_loc, doc_elemt)
-
-
-def xml_handle_units(obj, value):
-    """This function creates a 'units' element instance, and appends it to an existing element"""
-    obj.set("units", str(value))
-
-
-# pylint: disable=too-many-branches
-def xml_handle_exists(dct, obj, keyword, value):
-    """
-    This function creates an 'exists' element instance, and appends it to an existing element
-    """
-    line_number = f"__line__{keyword}"
-    assert (
-        value is not None
-    ), f"Line {dct[line_number]}: exists argument must not be None !"
-    if isinstance(value, list):
-        if len(value) == 4 and value[0] == "min" and value[2] == "max":
-            obj.set("minOccurs", str(value[1]))
-            if str(value[3]) != "infty":
-                obj.set("maxOccurs", str(value[3]))
-            else:
-                obj.set("maxOccurs", "unbounded")
-        elif len(value) == 2 and value[0] == "min":
-            obj.set("minOccurs", str(value[1]))
-        elif len(value) == 2 and value[0] == "max":
-            obj.set("maxOccurs", str(value[1]))
-        elif len(value) == 4 and value[0] == "max" and value[2] == "min":
-            obj.set("minOccurs", str(value[3]))
-            if str(value[1]) != "infty":
-                obj.set("maxOccurs", str(value[1]))
-            else:
-                obj.set("maxOccurs", "unbounded")
-        elif len(value) == 4 and (value[0] != "min" or value[2] != "max"):
-            raise ValueError(
-                f"Line {dct[line_number]}: exists keyword "
-                f"needs to go either with an optional [recommended] list with two "
-                f"entries either [min, <uint>] or [max, <uint>], or a list of four "
-                f"entries [min, <uint>, max, <uint>] !"
-            )
-        else:
-            raise ValueError(
-                f"Line {dct[line_number]}: exists keyword "
-                f"needs to go either with optional, recommended, a list with two "
-                f"entries either [min, <uint>] or [max, <uint>], or a list of four "
-                f"entries [min, <uint>, max, <uint>] !"
-            )
-    else:
-        # This clause takes 'optional' for all concepts except dimension, where the
-        # 'required' key is allowed, not the 'optional' key.
- if value == "optional": - obj.set("optional", "true") - elif value == "recommended": - obj.set("recommended", "true") - elif value == "required": - obj.set("optional", "false") - else: - obj.set("minOccurs", "0") - - -# pylint: disable=too-many-branches, too-many-locals, too-many-statements -def xml_handle_group(dct, obj, keyword, value, verbose=False): - """ - The function deals with group instances - """ - line_number = f"__line__{keyword}" - line_loc = dct[line_number] - xml_handle_comment(obj, line_number, line_loc) - list_of_attr = [ - "name", - "type", - "nameType", - "deprecated", - "optional", - "recommended", - "exists", - "unit", - ] - l_bracket = -1 - r_bracket = -1 - if keyword.count("(") == 1: - l_bracket = keyword.index("(") - if keyword.count(")") == 1: - r_bracket = keyword.index(")") - - keyword_name, keyword_type = nx_name_type_resolving(keyword) - if not keyword_name and not keyword_type: - raise ValueError("A group must have both value and name. Check for group.") - grp = ET.SubElement(obj, "group") - - if l_bracket == 0 and r_bracket > 0: - grp.set("type", keyword_type) - if keyword_name: - grp.set("name", keyword_name) - elif l_bracket > 0: - grp.set("name", keyword_name) - if keyword_type: - grp.set("type", keyword_type) - else: - grp.set("name", keyword_name) - - if value: - rm_key_list = [] - for attr, vval in value.items(): - if "__line__" in attr: - continue - line_number = f"__line__{attr}" - line_loc = value[line_number] - if attr == "doc": - xml_handle_doc(grp, vval, line_number, line_loc) - rm_key_list.append(attr) - rm_key_list.append(line_number) - elif attr == "exists" and vval: - xml_handle_exists(value, grp, attr, vval) - rm_key_list.append(attr) - rm_key_list.append(line_number) - xml_handle_comment(obj, line_number, line_loc, grp) - elif attr == "unit": - xml_handle_units(grp, vval) - xml_handle_comment(obj, line_number, line_loc, grp) - elif attr in list_of_attr and not isinstance(vval, dict) and vval: - 
validate_field_attribute_and_value(attr, vval, list_of_attr, value)
-                grp.set(attr, check_for_mapping_char_other(vval))
-                rm_key_list.append(attr)
-                rm_key_list.append(line_number)
-                xml_handle_comment(obj, line_number, line_loc, grp)
-
-        for key in rm_key_list:
-            del value[key]
-        # Check for skipped attributes
-        check_for_skiped_attributes("group", value, list_of_attr, verbose)
-    if isinstance(value, dict) and value != {}:
-        recursive_build(grp, value, verbose)
-
-
-def xml_handle_dimensions(dct, obj, keyword, value: dict):
-    """
-    This function creates a 'dimensions' element instance, and appends it to an existing element
-
-    NOTE: we could create an xml_handle_dim() function.
-    But the dim element is defined in the yaml file as 'dim = [[index, value]]',
-    while dim also has other attributes such as 'ref' and might have a doc as child,
-    so in that sense 'dim' should have come as a dict keeping attributes and children
-    as members of the dict.
-    For this reason all the attributes of 'dimensions' and the child 'doc' are
-    handled here.
-
-    Other attributes of 'dim', except 'index' and 'value', come under a nested dict named
-    'dim_parameters:
-        incr:[...]'
-    """
-
-    possible_dimension_attrs = ["rank"]  # nxdl attributes
-    line_number = f"__line__{keyword}"
-    line_loc = dct[line_number]
-    assert "dim" in value.keys(), (
-        f"Line {line_loc}: No dim as child of dimension has " f"been found."
-    )
-    xml_handle_comment(obj, line_number, line_loc)
-    dims = ET.SubElement(obj, "dimensions")
-    # Consider all the children under dimension as dim elements and
-    # their attributes
-
-    rm_key_list = []
-    rank = ""
-    for key, val in value.items():
-        if "__line__" in key:
-            continue
-        line_number = f"__line__{key}"
-        line_loc = value[line_number]
-        if key == "rank":
-            rank = val or ""
-            if isinstance(rank, int) and rank < 0:
-                raise ValueError(
-                    f"Dimension must have some info about rank which is not "
-                    f"available. Please check around line: {dct[line_number]}"
-                )
-            dims.set(key, str(val))
-            rm_key_list.append(key)
-            rm_key_list.append(line_number)
-            xml_handle_comment(obj, line_number, line_loc, dims)
-        # Check dimension doc and handle it
-        elif key == "doc" and isinstance(val, str):
-            xml_handle_doc(dims, val, line_number, line_loc)
-            rm_key_list.append(key)
-            rm_key_list.append(line_number)
-        elif key in possible_dimension_attrs and not isinstance(val, dict):
-            dims.set(key, str(val))
-            rm_key_list.append(key)
-            rm_key_list.append(line_number)
-            xml_handle_comment(obj, line_number, line_loc, dims)
-
-    for key in rm_key_list:
-        del value[key]
-
-    xml_handle_dim_from_dimension_dict(dct, dims, keyword, value, rank=False)
-
-    if isinstance(value, dict) and value != {}:
-        recursive_build(dims, value, verbose=None)
-
-
-# pylint: disable=too-many-locals, too-many-arguments
-def xml_handle_dim_from_dimension_dict(
-    dct, dims_obj, keyword, value, rank, verbose=False
-):
-    """
-    Handle the dim element.
-    NOTE: The inputs 'keyword' and 'value' are the same as for the xml_handle_dimensions
-    function; please also read the note in xml_handle_dimensions.
-    """
-
-    possible_dim_attrs = ["ref", "incr", "refindex", "required"]
-
-    # Some attributes might have an equivalent name, e.g. 'required' is the correct one
-    # and 'optional' could be another name. Then change the attribute to the correct one.
-    wrong_to_correct_attr = [("optional", "required")]
-    header_line_number = f"__line__{keyword}"
-    dim_list = []
-    rm_key_list = []
-    # NOTE: dim doc and other attributes, except 'index' and 'value', come as lists of
-    # values under dim_parameters
-    if not value:
-        return
-    rank = ""
-    # pylint: disable=too-many-nested-blocks
-    for attr, vvalue in value.items():
-        if "__line__" in attr:
-            continue
-        line_number = f"__line__{attr}"
-        line_loc = value[line_number]
-        # dim comes in precedence
-        if attr == "dim":
-            # dim consists of a list of [index, value]
-            llist_ind_value = vvalue
-            assert isinstance(llist_ind_value, list), (
-                f"Line {value[line_number]}: dim " f"argument is not a list !"
-            )
-            xml_handle_comment(dims_obj, line_number, line_loc)
-            if isinstance(rank, int) and rank > 0:
-                assert rank == len(llist_ind_value), (
-                    f"Wrong dimension rank check around Line {dct[header_line_number]}.\n"
-                    f"Line {[dct[header_line_number]]} rank value {rank} "
-                    f"is not the same as dim array = "
-                    f"{len(llist_ind_value)}."
-                )
-            # Taking care of ind and value that come as a list of lists
-            for dim_ind_val in llist_ind_value:
-                dim = ET.SubElement(dims_obj, "dim")
-
-                # Taking care of multiple dimensions or rank
-                if len(dim_ind_val) >= 1 and dim_ind_val[0]:
-                    dim.set("index", str(dim_ind_val[0]))
-                if len(dim_ind_val) == 2 and dim_ind_val[1]:
-                    dim.set("value", str(dim_ind_val[1]))
-                dim_list.append(dim)
-            rm_key_list.append(attr)
-            rm_key_list.append(line_number)
-        elif attr == "dim_parameters" and isinstance(vvalue, dict):
-            xml_handle_comment(dims_obj, line_number, line_loc)
-            for kkkey, vvval in vvalue.items():
-                if "__line__" in kkkey:
-                    continue
-                cmnt_number = f"__line__{kkkey}"
-                cmnt_loc = vvalue[cmnt_number]
-                # Check whether any prohibited attributes have been added
-                for tuple_wng_crt in wrong_to_correct_attr:
-                    if kkkey == tuple_wng_crt[0]:
-                        raise ValueError(
-                            f"{cmnt_loc}: Attribute '{kkkey}' is prohibited, use "
-                            f"'{tuple_wng_crt[1]}'"
-                        )
-                if kkkey == "doc" and dim_list:
-                    # doc comes as a list of docs
-                    for i, dim in enumerate(dim_list):
-                        if isinstance(vvval, list) and i < len(vvval):
-                            tmp_val = vvval[i]
-                            xml_handle_doc(dim, vvval[i], cmnt_number, cmnt_loc)
-                        # If not all dims have a doc, skip the rest
-                        elif isinstance(vvval, list) and i >= len(vvval):
-                            pass
-                else:
-                    for i, dim in enumerate(dim_list):
-                        # All attributes of dims come as lists
-                        if isinstance(vvval, list) and i < len(vvval):
-                            tmp_val = vvval[i]
-                            dim.set(kkkey, str(tmp_val))
-
-                        # If not all dims have the attribute, skip the rest
-                        elif isinstance(vvval, list) and i >= len(vvval):
-                            pass
-                        # All dims might have the same value for the same attribute
-                        elif not isinstance(vvval, list):
-                            tmp_val = vvval
-                            dim.set(kkkey, str(tmp_val))
-            rm_key_list.append(attr)
-            rm_key_list.append(line_number)
-        else:
-            raise ValueError(
-                f"Got an unexpected block other than 'dim' and 'dim_parameters'. "
-                f"Please check around line {line_loc}"
-            )
-
-    for key in rm_key_list:
-        del value[key]
-
-    check_for_skiped_attributes("dim", value, possible_dim_attrs, verbose)
-
-
-def xml_handle_enumeration(dct, obj, keyword, value, verbose):
-    """This function creates an 'enumeration' element instance.
-
-    Two cases are handled:
-    1) the items are in a list
-    2) the items are dictionaries and may contain a nested doc
-    """
-    line_number = f"__line__{keyword}"
-    line_loc = dct[line_number]
-    xml_handle_comment(obj, line_number, line_loc)
-    enum = ET.SubElement(obj, "enumeration")
-
-    assert value is not None, f"Line {line_loc}: enumeration must \
-bear at least an argument !"
-    assert (
-        len(value) >= 1
-    ), f"Line {dct[line_number]}: enumeration must not be an empty list!"
-    if isinstance(value, list):
-        for element in value:
-            itm = ET.SubElement(enum, "item")
-            itm.set("value", str(element))
-    if isinstance(value, dict) and value != {}:
-        for element in value.keys():
-            if "__line__" not in element:
-                itm = ET.SubElement(enum, "item")
-                itm.set("value", str(element))
-                if isinstance(value[element], dict):
-                    recursive_build(itm, value[element], verbose)
-
-
-# pylint: disable=unused-argument
-def xml_handle_link(dct, obj, keyword, value, verbose):
-    """
-    If we have an NXDL link, we decode the name attribute from keyword[:-6],
-    i.e. with the trailing '(link)' stripped.
-    """
-
-    line_number = f"__line__{keyword}"
-    line_loc = dct[line_number]
-    xml_handle_comment(obj, line_number, line_loc)
-    possible_attrs = ["name", "target", "napimount"]
-    name = keyword[:-6]
-    link_obj = ET.SubElement(obj, "link")
-    link_obj.set("name", str(name))
-
-    if value:
-        rm_key_list = []
-        for attr, vval in value.items():
-            if "__line__" in attr:
-                continue
-            line_number = f"__line__{attr}"
-            line_loc = value[line_number]
-            if attr == "doc":
-                xml_handle_doc(link_obj, vval, line_number, line_loc)
-                rm_key_list.append(attr)
-                rm_key_list.append(line_number)
-            elif attr in possible_attrs and not isinstance(vval, dict):
-                if vval:
-                    link_obj.set(attr, str(vval))
-                rm_key_list.append(attr)
-                rm_key_list.append(line_number)
-                xml_handle_comment(obj, line_number, line_loc, link_obj)
-
-        for key in rm_key_list:
-            del value[key]
-        # Check for skipped attributes
-        check_for_skiped_attributes("link", value, possible_attrs, verbose)
-
-    if isinstance(value, dict) and value != {}:
-        recursive_build(link_obj, value, verbose=None)
-
-
-def xml_handle_choice(dct, obj, keyword, value, verbose=False):
-    """
-    Build choice xml elements, which consist of groups.
-    """
-    line_number = f"__line__{keyword}"
-    line_loc = dct[line_number]
-    xml_handle_comment(obj, line_number, line_loc)
-    # Add attributes to this list if new attributes are added to the NeXus definition.
-    possible_attr = []
-    choice_obj = ET.SubElement(obj, "choice")
-    # take care of special attributes
-    name = keyword[:-8]
-    choice_obj.set("name", name)
-
-    if value:
-        rm_key_list = []
-        for attr, vval in value.items():
-            if "__line__" in attr:
-                continue
-            line_number = f"__line__{attr}"
-            line_loc = value[line_number]
-            if attr == "doc":
-                xml_handle_doc(choice_obj, vval, line_number, line_loc)
-                rm_key_list.append(attr)
-                rm_key_list.append(line_number)
-            elif attr in possible_attr and not isinstance(vval, dict):
-                if vval:
-                    choice_obj.set(attr, str(vval))
-                    rm_key_list.append(attr)
-                    rm_key_list.append(line_number)
-                    xml_handle_comment(obj, line_number, line_loc, choice_obj)
-
-        for key in rm_key_list:
-            del value[key]
-        # Check for skipped attributes
-        check_for_skiped_attributes("choice", value, possible_attr, verbose)
-
-    if isinstance(value, dict) and value != {}:
-        recursive_build(choice_obj, value, verbose=None)
-
-
-def xml_handle_symbols(dct, obj, keyword, value: dict):
-    """Handle a set of NXDL symbols as a child to obj"""
-    line_number = f"__line__{keyword}"
-    line_loc = dct[line_number]
-    assert (
-        len(list(value.keys())) >= 1
-    ), f"Line {line_loc}: symbols table must not be empty !"
- xml_handle_comment(obj, line_number, line_loc) - syms = ET.SubElement(obj, "symbols") - if "doc" in value.keys(): - line_number = "__line__doc" - line_loc = value[line_number] - xml_handle_comment(syms, line_number, line_loc) - doctag = ET.SubElement(syms, "doc") - doctag.text = "\n" + textwrap.fill(value["doc"], width=70) + "\n" - rm_key_list = [] - for kkeyword, vvalue in value.items(): - if "__line__" in kkeyword: - continue - if kkeyword != "doc": - line_number = f"__line__{kkeyword}" - line_loc = value[line_number] - xml_handle_comment(syms, line_number, line_loc) - assert vvalue is not None and isinstance( - vvalue, str - ), f"Line {line_loc}: put a comment in doc string !" - sym = ET.SubElement(syms, "symbol") - sym.set("name", str(kkeyword)) - # sym_doc = ET.SubElement(sym, 'doc') - xml_handle_doc(sym, vvalue) - rm_key_list.append(kkeyword) - rm_key_list.append(line_number) - # sym_doc.text = '\n' + textwrap.fill(vvalue, width=70) + '\n' - for key in rm_key_list: - del value[key] - - -def check_keyword_variable(verbose, dct, keyword, value): - """ - Check whether both keyword_name and keyword_type are empty, - and complains if it is the case - """ - keyword_name, keyword_type = nx_name_type_resolving(keyword) - if verbose: - sys.stdout.write( - f"{keyword_name}({keyword_type}): value type is {type(value)}\n" - ) - if keyword_name == "" and keyword_type == "": - line_number = f"__line__{keyword}" - raise ValueError(f"Line {dct[line_number]}: found an improper yaml key !") - - -def helper_keyword_type(kkeyword_type): - """ - This function is returning a value of keyword_type if it belong to NX_TYPE_KEYS - """ - if kkeyword_type in NX_TYPE_KEYS: - return kkeyword_type - return None - - -def verbose_flag(verbose, keyword, value): - """ - Verbose stdout printing for nested levels of yaml file, if verbose flag is active - """ - if verbose: - sys.stdout.write(f" key:{keyword}; value type is {type(value)}\n") - - -def xml_handle_attributes(dct, obj, keyword, 
value, verbose):
-    """Handle the attributes found connected to the attribute field"""
-
-    line_number = f"__line__{keyword}"
-    line_loc = dct[line_number]
-    xml_handle_comment(obj, line_number, line_loc)
-    # List of possible attributes of xml attribute elements
-    attr_attr_list = [
-        "name",
-        "type",
-        "unit",
-        "nameType",
-        "optional",
-        "recommended",
-        "minOccurs",
-        "maxOccurs",
-        "deprecated",
-        "exists",
-    ]
-    # as an attribute identifier
-    keyword_name, keyword_typ = nx_name_type_resolving(keyword)
-    line_number = f"__line__{keyword}"
-    if verbose:
-        print(f"__line__ : {dct[line_number]}")
-    if keyword_name == "" and keyword_typ == "":
-        raise ValueError(f"Line {dct[line_number]}: found an improper yaml key !")
-    elemt_obj = ET.SubElement(obj, "attribute")
-    elemt_obj.set("name", keyword_name[2:])
-    if keyword_typ:
-        elemt_obj.set("type", keyword_typ)
-
-    rm_key_list = []
-    if value:
-        # taking care of attributes of attributes
-        for attr, attr_val in value.items():
-            if "__line__" in attr:
-                continue
-            line_number = f"__line__{attr}"
-            line_loc = value[line_number]
-            if attr in ["doc", *attr_attr_list] and not isinstance(attr_val, dict):
-                if attr == "unit":
-                    elemt_obj.set(f"{attr}s", str(value[attr]))
-                    rm_key_list.append(attr)
-                    rm_key_list.append(line_number)
-                    xml_handle_comment(obj, line_number, line_loc, elemt_obj)
-                elif attr == "exists" and attr_val:
-                    xml_handle_exists(value, elemt_obj, attr, attr_val)
-                    rm_key_list.append(attr)
-                    rm_key_list.append(line_number)
-                    xml_handle_comment(obj, line_number, line_loc, elemt_obj)
-                elif attr == "doc":
-                    xml_handle_doc(
-                        elemt_obj, format_nxdl_doc(attr_val), line_number, line_loc
-                    )
-                    rm_key_list.append(attr)
-                    rm_key_list.append(line_number)
-                else:
-                    elemt_obj.set(attr, check_for_mapping_char_other(attr_val))
-                    rm_key_list.append(attr)
-                    rm_key_list.append(line_number)
-                    xml_handle_comment(obj, line_number, line_loc, elemt_obj)
-
-        for key in rm_key_list:
-            del value[key]
-        # Check for skipped attributes
-        check_for_skiped_attributes("Attribute", value, attr_attr_list, verbose)
-    if value:
-        recursive_build(elemt_obj, value, verbose)
-
-
-def validate_field_attribute_and_value(v_attr, vval, allowed_attribute, value):
-    """
-    Check for any attribute that comes with an invalid name or an invalid value.
-    """
-
-    # check for empty val
-    if not isinstance(vval, dict) and not str(vval):  # check for empty value
-        line_number = f"__line__{v_attr}"
-        raise ValueError(
-            f"In a field a valid attribute ('{v_attr}') was found that is not stored."
-            f" Please check around line {value[line_number]}"
-        )
-
-    # The below elements might come as child elements
-    skipped_child_name = ["doc", "dimension", "enumeration", "choice", "exists"]
-    # check for invalid key or attributes
-    if (
-        v_attr not in [*skipped_child_name, *allowed_attribute]
-        and "__line__" not in v_attr
-        and not isinstance(vval, dict)
-        and "(" not in v_attr  # skip only groups and fields that have name and type
-        and "\\@" not in v_attr
-    ):  # skip nexus attributes
-        line_number = f"__line__{v_attr}"
-        raise ValueError(
-            f"In a field or group an invalid attribute ('{v_attr}') or child has been found."
-            f" Please check around line {value[line_number]}."
-        )
-
-
-def xml_handle_fields(obj, keyword, value, line_annot, line_loc, verbose=False):
-    """
-    Handle a field in the yaml file.
-    When a keyword is NOT:
-        symbol,
-        NX baseclass member,
-        attribute (\\@),
-        doc,
-        enumerations,
-        dimension,
-        exists,
-    then the non-empty keyword_name is a field!
- This simple function will define a new node of xml tree - """ - # List of possible attributes of xml elements - allowed_attr = [ - "name", - "type", - "nameType", - "unit", - "minOccurs", - "long_name", - "axis", - "signal", - "deprecated", - "axes", - "exists", - "data_offset", - "interpretation", - "maxOccurs", - "primary", - "recommended", - "optional", - "stride", - ] - - xml_handle_comment(obj, line_annot, line_loc) - l_bracket = -1 - r_bracket = -1 - if keyword.count("(") == 1: - l_bracket = keyword.index("(") - if keyword.count(")") == 1: - r_bracket = keyword.index(")") - - keyword_name, keyword_type = nx_name_type_resolving(keyword) - if not keyword_type and not keyword_name: - raise ValueError("Check for name or type in field.") - elemt_obj = ET.SubElement(obj, "field") - - # type come first - if l_bracket == 0 and r_bracket > 0: - elemt_obj.set("type", keyword_type) - if keyword_name: - elemt_obj.set("name", keyword_name) - elif l_bracket > 0: - elemt_obj.set("name", keyword_name) - if keyword_type: - elemt_obj.set("type", keyword_type) - else: - elemt_obj.set("name", keyword_name) - - if value: - rm_key_list = [] - # In each each if clause apply xml_handle_comment(), to collect - # comments on that yaml line. 
-        for attr, vval in value.items():
-            if "__line__" in attr:
-                continue
-            line_number = f"__line__{attr}"
-            line_loc = value[line_number]
-            if attr == "doc":
-                xml_handle_doc(
-                    elemt_obj,
-                    vval,
-                    line_number,
-                    line_loc,
-                )
-                rm_key_list.append(attr)
-                rm_key_list.append(line_number)
-            elif attr == "exists" and vval:
-                xml_handle_exists(value, elemt_obj, attr, vval)
-                rm_key_list.append(attr)
-                rm_key_list.append(line_number)
-                xml_handle_comment(obj, line_number, line_loc, elemt_obj)
-            elif attr == "unit":
-                xml_handle_units(elemt_obj, vval)
-                xml_handle_comment(obj, line_number, line_loc, elemt_obj)
-            elif attr in allowed_attr and not isinstance(vval, dict) and vval:
-                validate_field_attribute_and_value(attr, vval, allowed_attr, value)
-                elemt_obj.set(attr, check_for_mapping_char_other(vval))
-                rm_key_list.append(attr)
-                rm_key_list.append(line_number)
-                xml_handle_comment(obj, line_number, line_loc, elemt_obj)
-
-        for key in rm_key_list:
-            del value[key]
-        # Check for skipped attributes
-        check_for_skiped_attributes("field", value, allowed_attr, verbose)
-
-    if isinstance(value, dict) and value != {}:
-        recursive_build(elemt_obj, value, verbose)
-
-
-def xml_handle_comment(
-    obj: ET.Element,
-    line_annotation: str,
-    line_loc_no: int,
-    xml_ele: ET.Element = None,
-    is_def_cmnt: bool = False,
-):
-    """
-    Add an xml comment: check for comments that have the same 'line_annotation'
-    (e.g. __line__data) and the same line_loc_no (e.g. 30). After that, it
-    does one of three tasks:
-    1. Returns a list of comment texts (multiple members if the element has multiple comments)
-    2. Rearranges the comment element and xml_ele so that the comment comes first.
-    3. Appends the comment element when no xml_ele is provided.
-    """
-
-    line_info = (line_annotation, int(line_loc_no))
-    if line_info in COMMENT_BLOCKS:
-        cmnt = COMMENT_BLOCKS.get_coment_by_line_info(line_info)
-        cmnt_text = cmnt.get_comment_text()
-
-        if is_def_cmnt:
-            return cmnt_text
-        if xml_ele is not None:
-            obj.remove(xml_ele)
-            for string in cmnt_text:
-                si_comnt = ET.Comment(string)
-                obj.append(si_comnt)
-            obj.append(xml_ele)
-        elif not is_def_cmnt and xml_ele is None:
-            for string in cmnt_text:
-                si_comnt = ET.Comment(string)
-                obj.append(si_comnt)
-        else:
-            raise ValueError("Provide correct parameter values.")
-    return ""
-
-
-def recursive_build(obj, dct, verbose):
-    """obj is the current node of the XML tree to which we want to append;
-    dct is a dictionary object which represents the content of a child of obj.
-    dct may contain further dictionary nests, representing NXDL groups,
-    which trigger recursive processing.
-    NXDL fields may contain attributes but trigger no recursion, so attributes are leafs.
-
-    """
-    for keyword, value in iter(dct.items()):
-        if "__line__" in keyword:
-            continue
-        line_number = f"__line__{keyword}"
-        line_loc = dct[line_number]
-        keyword_name, keyword_type = nx_name_type_resolving(keyword)
-        check_keyword_variable(verbose, dct, keyword, value)
-        if verbose:
-            sys.stdout.write(
-                f"keyword_name:{keyword_name} keyword_type {keyword_type}\n"
-            )
-
-        if keyword[-6:] == "(link)":
-            xml_handle_link(dct, obj, keyword, value, verbose)
-        elif keyword[-8:] == "(choice)":
-            xml_handle_choice(dct, obj, keyword, value)
-        # The below xml_handle_symbols clause is for symbols that come under fields or
-        # attributes. Root-level symbols are handled inside nyaml2nxdl().
-        elif keyword_type == "" and keyword_name == "symbols":
-            xml_handle_symbols(dct, obj, keyword, value)
-
-        elif (keyword_type in NX_CLSS) or (
-            keyword_type not in [*NX_TYPE_KEYS, "", *NX_NEW_DEFINED_CLASSES]
-        ):
-            # we can be sure we need to instantiate a new group
-            xml_handle_group(dct, obj, keyword, value, verbose)
-
-        elif keyword_name[0:2] == NX_ATTR_IDNT:  # check if obj qualifies
-            xml_handle_attributes(dct, obj, keyword, value, verbose)
-        elif keyword == "doc":
-            xml_handle_doc(obj, value, line_number, line_loc)
-        elif keyword == NX_UNIT_IDNT:
-            xml_handle_units(obj, value)
-        elif keyword == "enumeration":
-            xml_handle_enumeration(dct, obj, keyword, value, verbose)
-
-        elif keyword == "dimensions":
-            xml_handle_dimensions(dct, obj, keyword, value)
-
-        elif keyword == "exists":
-            xml_handle_exists(dct, obj, keyword, value)
-        # Handles fields, e.g. AXISNAME
-        elif keyword_name != "" and "__line__" not in keyword_name:
-            xml_handle_fields(obj, keyword, value, line_number, line_loc, verbose)
-        else:
-            raise ValueError(
-                f"An unfamiliar type of element {keyword} has been found which "
-                f"could not be resolved. Check around line {dct[line_number]}"
-            )
-
-
-def pretty_print_xml(xml_root, output_xml, def_comments=None):
-    """
-    Print a more human-readable, indented and formatted xml file using
-    built-in libraries, preceded by an XML processing instruction
-    """
-    dom = minidom.parseString(ET.tostring(xml_root, encoding="utf-8", method="xml"))
-    proc_instruction = dom.createProcessingInstruction(
-        "xml-stylesheet", 'type="text/xsl" href="nxdlformat.xsl"'
-    )
-    dom_comment = dom.createComment(DOM_COMMENT)
-    root = dom.firstChild
-    dom.insertBefore(proc_instruction, root)
-    dom.insertBefore(dom_comment, root)
-
-    if def_comments:
-        for string in def_comments:
-            def_comt_ele = dom.createComment(string)
-            dom.insertBefore(def_comt_ele, root)
-
-    xml_string = dom.toprettyxml(indent=1 * DEPTH_SIZE, newl="\n", encoding="UTF-8")
-    with open("tmp.xml", "wb") as file_tmp:
-        file_tmp.write(xml_string)
-    flag = False
-    with open("tmp.xml", "r", encoding="utf-8") as file_out:
-        with open(output_xml, "w", encoding="utf-8") as file_out_mod:
-            for i in file_out.readlines():
-                if "<doc" not in i and "</doc" not in i and flag is False:
-                    file_out_mod.write(i)
-                elif "<doc" in i and "</doc" in i:
-                    file_out_mod.write(i)
-                elif "<doc" in i and "</doc" not in i:
-                    flag = True
-                    white_spaces = len(i) - len(i.lstrip())
-                    file_out_mod.write(i)
-                elif "<doc" not in i and "</doc" not in i and flag is True:
-                    file_out_mod.write((white_spaces + 5) * " " + i)
-                elif "<doc" not in i and "</doc" in i and flag is True:
-                    file_out_mod.write(white_spaces * " " + i)
-                    flag = False
-    os.remove("tmp.xml")
-
-
-# pylint: disable=too-many-statements
-def nyaml2nxdl(input_file: str, out_file, verbose: bool):
-    """
-    Main of the nyaml2nxdl converter: creates the XML tree, namespace and
-    schema, definitions, then evaluates a dictionary nest of groups recursively, and
-    fields or (their) attributes as children of the groups
-    """
-
-    def_attributes = [
-        "deprecated",
-        "ignoreExtraGroups",
-        "category",
-        "type",
-        "ignoreExtraFields",
-        "ignoreExtraAttributes",
-        "restricts",
-    ]
-    yml_appdef = yml_reader(input_file)
-    def_cmnt_text = []
-    if verbose:
-        sys.stdout.write(f"input-file: {input_file}\n")
-        sys.stdout.write(
-            "application/base contains the following root-level entries:\n"
-        )
-        sys.stdout.write(str(yml_appdef.keys()))
-    xml_root = ET.Element("definition", {})
-    assert (
-        "category" in yml_appdef.keys()
-    ), "Required root-level keyword category is missing!"
-    assert yml_appdef["category"] in ["application", "base"], "Only \
-application and base are valid categories!"
-    assert "doc" in yml_appdef.keys(), "Required root-level keyword doc is missing!"
-
-    name_extends = ""
-    yml_appdef_copy = yml_appdef.copy()
-    for kkey, vvalue in yml_appdef_copy.items():
-        if "__line__" in kkey:
-            continue
-        line_number = f"__line__{kkey}"
-        line_loc_no = yml_appdef[line_number]
-        if not isinstance(vvalue, dict) and kkey in def_attributes:
-            xml_root.set(kkey, str(vvalue) or "")
-            cmnt_text = xml_handle_comment(
-                xml_root, line_number, line_loc_no, is_def_cmnt=True
-            )
-            def_cmnt_text += cmnt_text if cmnt_text else []
-
-            del yml_appdef[line_number]
-            del yml_appdef[kkey]
-        # Taking care of name and extends
-        elif "NX" in kkey:
-            # Tracking the attribute order; the correct value will be stored later.
-            # Check whether name or type comes first: for (NXobject)NXname, type comes first.
-            l_bracket_ind = kkey.rfind("(")
-            r_bracket_ind = kkey.rfind(")")
-            if l_bracket_ind == 0:
-                extend = kkey[1:r_bracket_ind]
-                name = kkey[r_bracket_ind + 1 :]
-                xml_root.set("extends", extend)
-                xml_root.set("name", name)
-            elif l_bracket_ind > 0:
-                name = kkey[0:l_bracket_ind]
-                extend = kkey[l_bracket_ind + 1 : r_bracket_ind]
-                xml_root.set("name", name)
-                xml_root.set("extends", extend)
-            else:
-                name = kkey
-                xml_root.set("name", name)
-                xml_root.set("extends", "NXobject")
-            cmnt_text = xml_handle_comment(
-                xml_root, line_number, line_loc_no, is_def_cmnt=True
-            )
-            def_cmnt_text += cmnt_text if cmnt_text else []
-
-            name_extends = kkey
-
-    if "type" not in xml_root.attrib:
-        xml_root.set("type", "group")
-    # Taking care of namespaces
-    namespaces = {
-        "xmlns": "http://definition.nexusformat.org/nxdl/3.1",
-        "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
-        "xsi:schemaLocation": "http://definition.nexusformat.org/nxdl/3.1 ../nxdl.xsd",
-    }
-    for key, ns_ in namespaces.items():
-        xml_root.attrib[key] = ns_
-    # Taking care of symbols elements
-    if "symbols" in yml_appdef.keys():
-        xml_handle_symbols(yml_appdef, xml_root, "symbols", yml_appdef["symbols"])
-
-        del yml_appdef["symbols"]
-        del yml_appdef["__line__symbols"]
-
-    assert isinstance(yml_appdef["doc"], str) and yml_appdef["doc"] != "", "Doc \
-has to be a non-empty string!"
-
-    line_number = "__line__doc"
-    line_loc_no = yml_appdef[line_number]
-    xml_handle_doc(xml_root, yml_appdef["doc"], line_number, line_loc_no)
-
-    del yml_appdef["doc"]
-
-    root_keys = 0
-    for key in yml_appdef.keys():
-        if "__line__" not in key:
-            root_keys += 1
-            extra_key = key
-
-    assert root_keys == 1, (
-        f"Accepting at most the keywords category, doc, symbols, and NX... "
-        f"at root level! Check the key at root level: {extra_key}"
-    )
-
-    assert "NX" in name_extends and len(name_extends) > 2, "NX \
-keyword has an invalid pattern, or is too short!"
-    # Taking care if the definition has empty content
-    if yml_appdef[name_extends]:
-        recursive_build(xml_root, yml_appdef[name_extends], verbose)
-    # Taking care of comments that come at the end of the file and might not be
-    # intended for any nxdl element.
-    if COMMENT_BLOCKS[-1].has_post_comment:
-        post_comment = COMMENT_BLOCKS[-1]
-        (lin_annot, line_loc) = post_comment.get_line_info()
-        xml_handle_comment(xml_root, lin_annot, line_loc)
-
-    # Note: kept only in case we need this functionality later.
-    default_attr = False
-    if default_attr:
-        check_for_default_attribute_and_value(xml_root)
-    pretty_print_xml(xml_root, out_file, def_cmnt_text)
-    if verbose:
-        sys.stdout.write("Parsed YAML to NXDL successfully\n")
diff --git a/pynxtools/nyaml2nxdl/nyaml2nxdl_helper.py b/pynxtools/nyaml2nxdl/nyaml2nxdl_helper.py
deleted file mode 100644
index e9929ed51..000000000
--- a/pynxtools/nyaml2nxdl/nyaml2nxdl_helper.py
+++ /dev/null
@@ -1,223 +0,0 @@
-#!/usr/bin/env python3
-"""Helper functions for the yaml2nxdl tool.
-Users create NeXus instances by writing a YAML file
-which details a hierarchy of data/metadata elements
-
-"""
-# -*- coding: utf-8 -*-
-#
-# Copyright The NOMAD Authors.
-#
-# This file is part of NOMAD. See https://nomad-lab.eu for further info.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-# The YAML library does not accept certain keys (escape char "\t" and the YAML
-# separator ":"), so the corresponding values below are used to stand in for
-# them; also be careful about this order.
-import hashlib
-from yaml.composer import Composer
-from yaml.constructor import Constructor
-
-from yaml.nodes import ScalarNode
-from yaml.resolver import BaseResolver
-from yaml.loader import Loader
-
-# NOTE: If you change one of the dicts below, please change both of them.
-ESCAPE_CHAR_DICT_IN_YAML = {"\t": " ", "':'": ":"}
-
-ESCAPE_CHAR_DICT_IN_XML = {" ": "\t", "':'": ":"}
-
-
-class LineLoader(Loader):  # pylint: disable=too-many-ancestors
-    """
-    LineLoader parses a yaml into a python dictionary extended with extra items.
-    The new items have keys of the form __line__<key> and the corresponding
-    yaml file line numbers as values.
-    """
-
-    def compose_node(self, parent, index):
-        # the line number where the previous token has ended (plus empty lines)
-        node = Composer.compose_node(self, parent, index)
-        node.__line__ = self.line + 1
-        return node
-
-    def construct_mapping(self, node, deep=False):
-        node_pair_lst = node.value
-        node_pair_lst_for_appending = []
-
-        for key_node in node_pair_lst:
-            shadow_key_node = ScalarNode(
-                tag=BaseResolver.DEFAULT_SCALAR_TAG,
-                value="__line__" + key_node[0].value,
-            )
-            shadow_value_node = ScalarNode(
-                tag=BaseResolver.DEFAULT_SCALAR_TAG, value=key_node[0].__line__
-            )
-            node_pair_lst_for_appending.append((shadow_key_node, shadow_value_node))
-
-        node.value = node_pair_lst + node_pair_lst_for_appending
-        return Constructor.construct_mapping(self, node, deep=deep)
-
-
-def get_yaml_escape_char_dict():
-    """Get the escape chars and the way to skip them in yaml."""
-    return ESCAPE_CHAR_DICT_IN_YAML
-
-
-def get_yaml_escape_char_reverter_dict():
-    """Revert the yaml escape chars when constructing xml from yaml."""
-
-    return ESCAPE_CHAR_DICT_IN_XML
-
-
-def type_check(nx_type):
-    """
-    Check the NeXus type: if it is NX_CHAR (or empty), return '';
-    otherwise return it wrapped in parentheses.
-    """
-
-    if nx_type in ["NX_CHAR", ""]:
-        nx_type = ""
-    else:
-        nx_type = f"({nx_type})"
-    return nx_type
-
-
-def get_node_parent_info(tree, node):
-    """
-    Return a tuple of (parent, index) where:
-    parent = parent node of node within tree
-    index = index of node under parent
-    """
-
-    parent_map = {c: p for p in tree.iter() for c in p}
-    parent = parent_map[node]
-    return parent, list(parent).index(node)
-
-
-def cleaning_empty_lines(line_list):
-    """
-    Clean up empty lines at the top and bottom.
- """ - if not isinstance(line_list, list): - line_list = line_list.split("\n") if "\n" in line_list else [""] - - # Clining up top empty lines - while True: - if line_list[0].strip(): - break - line_list = line_list[1:] - if len(line_list) == 0: - line_list.append("") - return line_list - - # Clining bottom empty lines - while True: - if line_list[-1].strip(): - break - line_list = line_list[0:-1] - if len(line_list) == 0: - line_list.append("") - return line_list - - return line_list - - -def nx_name_type_resolving(tmp): - """ - extracts the eventually custom name {optional_string} - and type {nexus_type} from a YML section string. - YML section string syntax: optional_string(nexus_type) - """ - if tmp.count("(") == 1 and tmp.count(")") == 1: - # we can safely assume that every valid YML key resolves - # either an nx_ (type, base, candidate) class contains only 1 '(' and ')' - index_start = tmp.index("(") - index_end = tmp.index(")", index_start + 1) - typ = tmp[index_start + 1 : index_end] - nam = tmp.replace("(" + typ + ")", "") - return nam, typ - - # or a name for a member - typ = "" - nam = tmp - return nam, typ - - -def get_sha256_hash(file_name): - """Generate a sha256_hash for a given file.""" - sha_hash = hashlib.sha256() - - with open( - file=file_name, - mode="rb", - ) as file_obj: - # Update hash for each 4k block of bytes - for b_line in iter(lambda: file_obj.read(4096), b""): - sha_hash.update(b_line) - return sha_hash.hexdigest() - - -def extend_yamlfile_with_comment(yaml_file, file_to_be_appended, top_lines_list=None): - """Extend yaml file by the file_to_be_appended as comment.""" - - with open(yaml_file, mode="a+", encoding="utf-8") as f1_obj: - if top_lines_list: - for line in top_lines_list: - f1_obj.write(line) - - with open(file_to_be_appended, mode="r", encoding="utf-8") as f2_obj: - lines = f2_obj.readlines() - for line in lines: - f1_obj.write(f"# {line}") - - -def separate_hash_yaml_and_nxdl(yaml_file, sep_yaml, sep_xml): - """Separate 
the provided yaml file into yaml, nxdl and hash if yaml was extended with - nxdl at the end of yaml by - '\n# ++++++++++++++++++++++++++++++++++ SHA HASH \ - ++++++++++++++++++++++++++++++++++\n' - # ' - """ - sha_hash = "" - with open(yaml_file, "r", encoding="utf-8") as inp_file: - lines = inp_file.readlines() - # file to write yaml part - with open(sep_yaml, "w", encoding="utf-8") as yml_f_ob, open( - sep_xml, "w", encoding="utf-8" - ) as xml_f_ob: - last_line = "" - write_on_yaml = True - for ind, line in enumerate(lines): - if ind == 0: - last_line = line - # Write in file when ensured that the nest line is not with '++ SHA HASH ++' - elif "++ SHA HASH ++" not in line and write_on_yaml: - yml_f_ob.write(last_line) - last_line = line - elif "++ SHA HASH ++" in line: - write_on_yaml = False - last_line = "" - elif not write_on_yaml and not last_line: - # The first line of xml file has been found. Onward write lines directly - # into xml file. - if not sha_hash: - sha_hash = line.split("# ", 1)[-1].strip() - else: - xml_f_ob.write(line[2:]) - # If the yaml fiile does not contain any hash for nxdl then we may have last line. - if last_line: - yml_f_ob.write(last_line) - - return sha_hash
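The deleted `pretty_print_xml` relies on `minidom` to prepend the `xml-stylesheet` processing instruction before pretty-printing. A minimal standalone sketch of that trick, using only the standard library (the element names and doc text here are illustrative, not taken from any real NXDL file):

```python
import xml.etree.ElementTree as ET
from xml.dom import minidom

# Build a tiny tree; "NXexample" is a made-up name for illustration only.
root = ET.Element("definition", {"name": "NXexample"})
ET.SubElement(root, "doc").text = "A hypothetical example definition."

dom = minidom.parseString(ET.tostring(root, encoding="utf-8"))
pi = dom.createProcessingInstruction(
    "xml-stylesheet", 'type="text/xsl" href="nxdlformat.xsl"'
)
# The processing instruction must precede the document element.
dom.insertBefore(pi, dom.firstChild)
pretty = dom.toprettyxml(indent="    ", encoding="UTF-8").decode("utf-8")
```

`pretty` now starts with the XML declaration, followed by the stylesheet processing instruction, followed by the indented `<definition>` element, which is the layout the converter writes to disk.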
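The chunked hashing used by `get_sha256_hash` above can be exercised standalone: reading the file in fixed-size blocks yields the same digest as hashing the whole content at once, while keeping memory use constant. A sketch with an illustrative throwaway file (the payload is made up):

```python
import hashlib
import os
import tempfile


def sha256_of_file(file_name, block_size=4096):
    """Hash a file block by block so large files never load fully into memory."""
    sha = hashlib.sha256()
    with open(file_name, "rb") as file_obj:
        for block in iter(lambda: file_obj.read(block_size), b""):
            sha.update(block)
    return sha.hexdigest()


data = b"category: application\n" * 1000  # toy payload spanning several blocks
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(data)
    path = tmp.name

digest = sha256_of_file(path)
os.remove(path)
```

The converter uses such a digest to detect whether the NXDL appended to a yaml file (see `separate_hash_yaml_and_nxdl`) still matches its recorded hash.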