diff --git a/XML_to_SON/grammar_parser_test.py b/XML_to_SON/grammar_parser_test.py index 1d8b044..3928574 100644 --- a/XML_to_SON/grammar_parser_test.py +++ b/XML_to_SON/grammar_parser_test.py @@ -5,45 +5,70 @@ import os.path import argparse +processed_facilities = {} +processed_regions = {} +processed_institutions = {} + +facilities = [] +regions = [] +institutions = [] + def get_element_type(node): + + conversion_dict = { + 'string': 'String', + 'nonNegativeInteger': 'Int', + 'boolean': 'Int', + 'double': 'Real', + 'positiveInteger': 'Int', + 'float': 'Real', + 'duration': 'Int', + 'integer': 'Int', + 'nonPositiveInteger': 'Int', + 'negativeInteger': 'Int', + 'long': 'Real', + 'int': 'Int', + 'token': 'String' + } + for child in node: if child.tag.endswith("data") and "type" in child.attrib: xml_type = child.attrib['type'] return conversion_dict.get(xml_type, 'Unknown') return None -def process_element(node, parent_attrib=None): - if parent_attrib is None: - parent_attrib = {} +def process_element(node, parent_attrib={}): ele_dict = {} - name = node.attrib.get('name') - ele_dict[name] = {} + name = node.attrib.get('name') + formatted_name = f'"{name}"' if "-" in name else name + ele_dict[formatted_name] = {} if not parent_attrib: - ele_dict[name]['minOccurs'] = 1 - ele_dict[name]['maxOccurs'] = 1 + ele_dict[formatted_name]['MinOccurs'] = 1 + ele_dict[formatted_name]['MaxOccurs'] = 1 element_type = get_element_type(node) if element_type: - ele_dict[name]['ValType'] = element_type + ele_dict[formatted_name]['ValType'] = element_type else: for child in node: if child.tag.endswith("text"): - ele_dict[name]['ValType'] = "String" + ele_dict[formatted_name]['ValType'] = "String" break for child in node: child_attrib = {} if child.tag.endswith("choice"): - choice_str = handle_choice_element(child) + choice_str, processed_choices = handle_choice_element(child) if choice_str: - ele_dict[name]['ChildExactlyOne'] = f"{choice_str}" + ele_dict[formatted_name]['ChildExactlyOne'] = f"{choice_str}" + ele_dict[formatted_name].update(processed_choices) break if child.tag not in {f"{ns}zeroOrMore", f"{ns}oneOrMore", f"{ns}optional"}: - child_attrib = {'minOccurs': 1, 'maxOccurs': 1} - ele_dict[name].update(process_node(child, child_attrib)) + child_attrib = {'MinOccurs': 1, 'MaxOccurs': 1} + ele_dict[formatted_name].update(process_node(child, child_attrib)) - ele_dict[name].update(parent_attrib) + ele_dict[formatted_name].update(parent_attrib) return ele_dict @@ -54,13 +79,13 @@ def process_node(node, add_attrib={}): node_dict.update(process_element(node, add_attrib)) elif node.tag.endswith("optional"): for child in node: - node_dict.update(process_node(child, {'minOccurs': 0, 'maxOccurs': 1})) + node_dict.update(process_node(child, {'MinOccurs': 0, 'MaxOccurs': 1})) elif node.tag.endswith("zeroOrMore"): for child in node: - node_dict.update(process_node(child, {'minOccurs': 0})) + node_dict.update(process_node(child, {'MinOccurs': 0})) elif node.tag.endswith("oneOrMore"): for child in node: - node_dict.update(process_node(child, {'minOccurs': 1})) + node_dict.update(process_node(child, {'MinOccurs': 1})) else: for child in node: node_dict.update(process_node(child)) @@ -68,23 +93,25 @@ def process_node(node, add_attrib={}): return node_dict def generate_child_exactly_one_line(entity_type): - if entity_type == "facility": - options = facilities - elif entity_type == "region": - options = regions - elif entity_type == "institution": - options = institutions - else: - options = [] - return f"ChildExactlyOne=[{'|'.join(options)}]" + if entity_type == "facility": + options = facilities + elif entity_type == "region": + options = regions + elif entity_type == "institution": + options = institutions + else: + options = [] + return f"ChildExactlyOne=[{'|'.join(options)}]" def handle_choice_element(node): choices = [] + processed_choices = {} for child in node: - if child.tag == f"{ns}element": + if child.tag.endswith("element"): name = child.attrib.get('name', 'Unknown') - choices.append(name) - + formatted_name = f'"{name}"' if "-" in name else name + choices.append(formatted_name) + processed_choices.update(process_node(child, {})) if node.text: processed_schemas = None @@ -92,21 +119,20 @@ def handle_choice_element(node): processed_schemas = processed_facilities replacement_text = generate_child_exactly_one_line('facility') node.text = node.text.replace('@Facility_REFS@', replacement_text) - return f"[{' '.join(facilities)}]" + return f"[{' '.join(facilities)}]", {} elif '@Region_REFS@' in node.text: processed_schemas = processed_regions replacement_text = generate_child_exactly_one_line('region') node.text = node.text.replace('@Region_REFS@', replacement_text) - return f"[{' '.join(regions)}]" + return f"[{' '.join(regions)}]", {} elif '@Inst_REFS@' in node.text: processed_schemas = processed_institutions replacement_text = generate_child_exactly_one_line('institution') node.text = node.text.replace('@Inst_REFS@', replacement_text) - return f"[{' '.join(institutions)}]" + return f"[{' '.join(institutions)}]", {} - if choices: - return f"[{' '.join(choices)}]" - return "" + choice_str = f"[{' '.join(choices)}]" if choices else "" + return choice_str, processed_choices def process_schema_from_mjson(xml_string, element_name): root = ET.fromstring(xml_string) @@ -141,17 +167,17 @@ def custom_format(value): val_str = val_str.replace('\\"', '"') return val_str -def custom_serialize(obj, key_name="simulation", indent=0): +def custom_serialize(obj, key_name="simulation", indent_size=0): lines = [] - base_indent = " " * indent - child_indent = " " * (indent + 4) + child_indent_size = indent_size + 4 + base_indent = " " * indent_size + child_indent = " " * (child_indent_size) lines.append(f"{base_indent}{key_name}={{") for key, value in obj.items(): if isinstance(value, dict): - nested = custom_serialize(value, key, indent + 4) - lines.append(nested) + lines.append(custom_serialize(value, key, child_indent_size + 4)) else: val_str = custom_format(value) lines.append(f"{child_indent}{key}={val_str}") @@ -160,59 +186,38 @@ def custom_serialize(obj, key_name="simulation", indent=0): return "\n".join(lines) -def custom_serialize_for_template(obj, annotations, key_name, indent=0): +def generate_doc_lines_for_key(key, value, annotations, annotation_key, doc_indent, child_indent): lines = [] - base_indent = " " * indent - doc_indent = " " * (indent + 8) - child_indent = " " * (indent + 4) - - annotation_key = f":agents:{key_name}" - if annotation_key not in annotations: - matching_keys = [key for key in annotations.keys() if key.endswith(key_name)] - if matching_keys: - annotation_key = matching_keys[0] - else: - print(f"No annotations for {key_name}.") - return "" - - annotation = annotations[annotation_key] + optional = "(optional)" if value.get("MinOccurs", "1") == "0" else "" + type_str = f"[{value.get('ValType', 'Unknown')}]" + var_doc = annotations[annotation_key].get('vars', {}).get(key, {}).get('doc', 'No documentation available') + doc_lines = [f'%{optional} {type_str} {line}' for line in var_doc.split('\n')] - if 'vars' not in annotation or not isinstance(annotation['vars'], dict): - print(f"'vars' missing or not a dictionary in annotations for {key_name}.") - return "" + lines.extend([f"{doc_indent}{line}" for line in doc_lines]) - for key, value in obj.items(): - var_info = annotation['vars'].get(key, {}) - - if isinstance(var_info, str): - alias_info = annotation['vars'].get(var_info, {}) - if not isinstance(alias_info, dict): - print(f"Alias {var_info} for {key} in {key_name} does not point to a valid dictionary.") - continue - var_doc = alias_info.get('doc', 'No documentation available') - elif isinstance(var_info, dict): - var_doc = var_info.get('doc', 'No documentation available') - else: - print(f"Unexpected type {type(var_info)} for {key} in {key_name}.") - continue + default_val = annotations[annotation_key].get('vars', {}).get(key, {}).get('default', '') + default_str = " = " + (str(default_val) if default_val != '' else "") + lines.append(f"{child_indent}{key}{default_str}\n") + return lines +def custom_serialize_for_template(obj, annotations, key_name): + lines = [] + indent_size = 0 + tab = 4 * " " + base_indent = indent_size * " " + doc_indent = base_indent + 2 * tab + child_indent = base_indent + tab + + matching_keys = [key for key in annotations.keys() if key.endswith(key_name)] + annotation_key = matching_keys[0] doc_string = annotations.get(annotation_key, {}).get('doc', 'No documentation available') #PENDING: work around streams_ in Mixer not being a dictionary. - doc_lines = [f'%{line}' for line in doc_string.split('\n')] + [''] + doc_lines = [f'% {line}' for line in doc_string.split('\n')] + [''] lines.extend(doc_lines) lines.append(f'{base_indent}{key_name} {{') for key, value in obj.items(): - optional = "(optional)" if value.get("minOccurs", "1") == "0" else "" - type_str = f"[{value.get('ValType', 'Unknown')}]" - var_doc = annotations[annotation_key].get('vars', {}).get(key, {}).get('doc', 'No documentation') - doc_lines = [f'%{optional} {type_str} {line}' for line in var_doc.split('\n')] - - lines.extend([f"{doc_indent}{line}" for line in doc_lines]) - - default_val = annotations[annotation_key].get('vars', {}).get(key, {}).get('default', '') - default_str = " = " + (str(default_val) if default_val != '' else "") - lines.append(f"{child_indent}{key}{default_str}\n") + lines.extend(generate_doc_lines_for_key(key, value, annotations, annotation_key, doc_indent, child_indent)) lines.append(f'{base_indent}}}') lines.append('') @@ -228,49 +233,28 @@ def save_template_for_all_schemas(processed_schemas, annotations, folder_name = file.write(template_string) print(f"Template for {key_name} saved as {filename}") -if __name__ == "__main__": +def parse_arguments(): parser = argparse.ArgumentParser(description='Process an XML schema file and a corresponding JSON file, and output to a specified file.') parser.add_argument('--xml', type=str, required=True, help='The path to the XML schema file.') parser.add_argument('--json', type=str, required=True, help='The path to the JSON file.') parser.add_argument('--output', type=str, required=True, help='The path for the output file.') - args = parser.parse_args() + return parser.parse_args() + +if __name__ == "__main__": + args = parse_arguments() tree = ET.parse(args.xml) root = tree.getroot() ns = re.match(r'\{.*\}', root.tag).group(0) simulation = root[0][0] - - conversion_dict = { - 'string': 'String', - 'nonNegativeInteger': 'Int', - 'boolean': 'Boolean', - 'double': 'double', - 'positiveInteger': 'Int', - 'float': 'Real', - 'duration': 'Int', - 'integer': 'Int', - 'nonPositiveInteger': 'Int', - 'negativeInteger': 'Int', - 'long': 'Real', - 'int': 'Int', - 'token': 'String' - } with open(args.json, 'r') as file: - m_json = json.load(file) - - processed_facilities = {} - processed_regions = {} - processed_institutions = {} - - facilities = [] - regions = [] - institutions = [] + cyclus_metadata = json.load(file) - for spec in m_json["schema"]: + for spec in cyclus_metadata["schema"]: element_name = spec.split(":")[-1] - xml_content = m_json["schema"][spec] - entity_type = m_json["annotations"][spec]["entity"] + xml_content = cyclus_metadata["schema"][spec] + entity_type = cyclus_metadata["annotations"][spec]["entity"] entity_name = spec.split(":")[-1] processed_and_wrapped = process_schema_from_mjson(xml_content, element_name) if entity_type == "facility": @@ -286,12 +270,10 @@ def save_template_for_all_schemas(processed_schemas, annotations, folder_name = result = process_node(simulation) final_result = {"simulation": result["simulation"]} + input_tmpl_entry = {"InputTmpl": "init_template"} + final_result['simulation'] = {**input_tmpl_entry, **final_result['simulation']} final_result_detailed_schemas = integrate_detailed_schemas(final_result, processed_facilities, processed_regions, processed_institutions) - - # Intermediate file created to track parsing progress. - # with open("sample_schema_product.json", "w") as outfile: - # json.dump(final_result_detailed_schemas, outfile, indent=4) serialized_string = custom_serialize(final_result_detailed_schemas["simulation"]) @@ -300,6 +282,6 @@ def save_template_for_all_schemas(processed_schemas, annotations, folder_name = sch_file.write(serialized_string) # These following lines create the templates and save them in a folder named "Templates" -# save_template_for_all_schemas(processed_facilities, m_json["annotations"]) -# save_template_for_all_schemas(processed_regions, m_json["annotations"]) -# save_template_for_all_schemas(processed_institutions, m_json["annotations"]) \ No newline at end of file +save_template_for_all_schemas(processed_facilities, cyclus_metadata["annotations"]) +save_template_for_all_schemas(processed_regions, cyclus_metadata["annotations"]) +save_template_for_all_schemas(processed_institutions, cyclus_metadata["annotations"]) \ No newline at end of file