Skip to content

Commit

Permalink
Improved readability and formatting; modification to handle_choice_element.
Browse files Browse the repository at this point in the history
  • Loading branch information
GersonEsquivel committed Mar 31, 2024
1 parent bc2879d commit 06353d2
Showing 1 changed file with 102 additions and 120 deletions.
222 changes: 102 additions & 120 deletions XML_to_SON/grammar_parser_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,70 @@
import os.path
import argparse

processed_facilities = {}
processed_regions = {}
processed_institutions = {}

facilities = []
regions = []
institutions = []

# Maps the `type` attribute of a "data" child to the `ValType` name written
# into the generated schema. Kept at module level so it is built only once.
_XML_TYPE_TO_VALTYPE = {
    'string': 'String',
    'nonNegativeInteger': 'Int',
    'boolean': 'Int',
    'double': 'Real',
    'positiveInteger': 'Int',
    'float': 'Real',
    'duration': 'Int',
    'integer': 'Int',
    'nonPositiveInteger': 'Int',
    'negativeInteger': 'Int',
    'long': 'Real',
    'int': 'Int',
    'token': 'String',
}


def get_element_type(node):
    """Return the value type declared by the first typed "data" child.

    Args:
        node: An iterable of element-like children exposing ``tag`` and
            ``attrib`` (e.g. an ``xml.etree.ElementTree.Element``).

    Returns:
        The mapped type name for the first child whose tag ends with
        ``"data"`` and that carries a ``type`` attribute; ``'Unknown'`` when
        that attribute value has no mapping; ``None`` when no such child
        exists.
    """
    for child in node:
        # Tags may carry a "{namespace}" prefix, hence the suffix match.
        if child.tag.endswith("data") and "type" in child.attrib:
            return _XML_TYPE_TO_VALTYPE.get(child.attrib['type'], 'Unknown')
    return None

# process_element: builds a one-entry dict for `node`, keyed by its `name`
# attribute, holding occurrence bounds, a ValType and the processed children.
# NOTE(review): this span is a rendered diff with indentation stripped; what
# appear to be removed and added lines sit back to back (e.g. both `def`
# lines below), so it is not runnable as shown and nesting must be confirmed
# against the real file.
def process_element(node, parent_attrib=None):
if parent_attrib is None:
parent_attrib = {}
# NOTE(review): a `{}` default is created once and shared by every call that
# omits the argument; the `None` variant above avoids that sharing.
def process_element(node, parent_attrib={}):
ele_dict = {}
name = node.attrib.get('name')
ele_dict[name] = {}
name = node.attrib.get('name')
# Names containing "-" are wrapped in double quotes before being used as key.
# NOTE(review): `"-" in name` raises TypeError when the node has no `name`.
formatted_name = f'"{name}"' if "-" in name else name
ele_dict[formatted_name] = {}

# No attributes supplied by the caller: default to exactly one occurrence.
if not parent_attrib:
ele_dict[name]['minOccurs'] = 1
ele_dict[name]['maxOccurs'] = 1
ele_dict[formatted_name]['MinOccurs'] = 1
ele_dict[formatted_name]['MaxOccurs'] = 1

# ValType comes from get_element_type(); otherwise it falls back to "String"
# when some child tag ends with "text".
element_type = get_element_type(node)
if element_type:
ele_dict[name]['ValType'] = element_type
ele_dict[formatted_name]['ValType'] = element_type
else:
for child in node:
if child.tag.endswith("text"):
ele_dict[name]['ValType'] = "String"
ele_dict[formatted_name]['ValType'] = "String"
break

for child in node:
child_attrib = {}
# A "choice" child is summarised through handle_choice_element().
# NOTE(review): whether `break` belongs to the outer or the inner `if`
# cannot be told from this flattened view — confirm.
if child.tag.endswith("choice"):
choice_str = handle_choice_element(child)
choice_str, processed_choices = handle_choice_element(child)
if choice_str:
ele_dict[name]['ChildExactlyOne'] = f"{choice_str}"
ele_dict[formatted_name]['ChildExactlyOne'] = f"{choice_str}"
ele_dict[formatted_name].update(processed_choices)
break
# `ns` is not a parameter; in this file it is only assigned inside the
# `__main__` block, from the root tag's "{...}" prefix.
if child.tag not in {f"{ns}zeroOrMore", f"{ns}oneOrMore", f"{ns}optional"}:
child_attrib = {'minOccurs': 1, 'maxOccurs': 1}
ele_dict[name].update(process_node(child, child_attrib))
child_attrib = {'MinOccurs': 1, 'MaxOccurs': 1}
ele_dict[formatted_name].update(process_node(child, child_attrib))

# Caller-supplied attributes are applied last, so they override values set
# earlier under the same keys.
ele_dict[name].update(parent_attrib)
ele_dict[formatted_name].update(parent_attrib)

return ele_dict

Expand All @@ -54,59 +79,60 @@ def process_node(node, add_attrib={}):
node_dict.update(process_element(node, add_attrib))
elif node.tag.endswith("optional"):
for child in node:
node_dict.update(process_node(child, {'minOccurs': 0, 'maxOccurs': 1}))
node_dict.update(process_node(child, {'MinOccurs': 0, 'MaxOccurs': 1}))
elif node.tag.endswith("zeroOrMore"):
for child in node:
node_dict.update(process_node(child, {'minOccurs': 0}))
node_dict.update(process_node(child, {'MinOccurs': 0}))
elif node.tag.endswith("oneOrMore"):
for child in node:
node_dict.update(process_node(child, {'minOccurs': 1}))
node_dict.update(process_node(child, {'MinOccurs': 1}))
else:
for child in node:
node_dict.update(process_node(child))

return node_dict

def generate_child_exactly_one_line(entity_type):
    """Build the ``ChildExactlyOne=[a|b|...]`` line for one entity kind.

    ``entity_type`` selects which module-level name list is used:
    ``"facility"``, ``"region"`` or ``"institution"``. Any other value yields
    an empty option list, i.e. ``ChildExactlyOne=[]``.
    """
    names_by_entity = (
        ("facility", facilities),
        ("region", regions),
        ("institution", institutions),
    )
    # First matching label wins; unknown labels fall back to no options.
    options = next(
        (names for label, names in names_by_entity if entity_type == label),
        [],
    )
    joined = '|'.join(options)
    return f"ChildExactlyOne=[{joined}]"

def handle_choice_element(node):
    """Summarise a "choice" node as a bracketed name list plus child schemas.

    Args:
        node: Element-like object whose children are inspected and whose
            ``text`` may contain one of the placeholders ``@Facility_REFS@``,
            ``@Region_REFS@`` or ``@Inst_REFS@``.

    Returns:
        A ``(choice_str, processed_choices)`` tuple.

        * If ``node.text`` contains a placeholder, the placeholder is replaced
          in ``node.text`` (the node is mutated) and the result is the
          space-separated, bracketed list of the matching module-level names
          together with an empty dict.
        * Otherwise ``choice_str`` is the bracketed list of child element
          names (``""`` when there are none) and ``processed_choices`` holds
          the ``process_node`` output of those children.
    """
    choices = []
    processed_choices = {}
    for child in node:
        if child.tag.endswith("element"):
            name = child.attrib.get('name', 'Unknown')
            # Names containing a hyphen are wrapped in double quotes.
            formatted_name = f'"{name}"' if "-" in name else name
            choices.append(formatted_name)
            processed_choices.update(process_node(child, {}))

    if node.text:
        # (placeholder, entity kind, module-level list of names); checked in
        # this order, first match wins.
        placeholders = (
            ('@Facility_REFS@', 'facility', facilities),
            ('@Region_REFS@', 'region', regions),
            ('@Inst_REFS@', 'institution', institutions),
        )
        for marker, entity_type, names in placeholders:
            if marker in node.text:
                replacement_text = generate_child_exactly_one_line(entity_type)
                node.text = node.text.replace(marker, replacement_text)
                return f"[{' '.join(names)}]", {}

    choice_str = f"[{' '.join(choices)}]" if choices else ""
    return choice_str, processed_choices

def process_schema_from_mjson(xml_string, element_name):
root = ET.fromstring(xml_string)
Expand Down Expand Up @@ -141,17 +167,17 @@ def custom_format(value):
val_str = val_str.replace('\\"', '"')
return val_str

def custom_serialize(obj, key_name="simulation", indent=0):
def custom_serialize(obj, key_name="simulation", indent_size=0):
lines = []
base_indent = " " * indent
child_indent = " " * (indent + 4)
child_indent_size = indent_size + 4
base_indent = " " * indent_size
child_indent = " " * (child_indent_size)

lines.append(f"{base_indent}{key_name}={{")

for key, value in obj.items():
if isinstance(value, dict):
nested = custom_serialize(value, key, indent + 4)
lines.append(nested)
lines.append(custom_serialize(value, key, child_indent_size + 4))
else:
val_str = custom_format(value)
lines.append(f"{child_indent}{key}={val_str}")
Expand All @@ -160,59 +186,38 @@ def custom_serialize(obj, key_name="simulation", indent=0):

return "\n".join(lines)

def custom_serialize_for_template(obj, annotations, key_name, indent=0):
def generate_doc_lines_for_key(key, value, annotations, annotation_key, doc_indent, child_indent):
    """Return the template lines documenting one variable of an archetype.

    Args:
        key: Variable name as it appears in the processed schema.
        value: Schema entry for ``key``; ``MinOccurs`` and ``ValType`` are
            read from it.
        annotations: Mapping of annotation keys to annotation dicts.
        annotation_key: Key into ``annotations`` for the current archetype.
        doc_indent: Prefix for the ``%`` documentation lines.
        child_indent: Prefix for the ``key = default`` line.

    Returns:
        One ``%`` comment line per line of the variable's doc text, followed
        by a ``key = default`` line that ends with a newline.

    Raises:
        KeyError: If ``annotation_key`` is not present in ``annotations``.
    """
    var_table = annotations[annotation_key].get('vars', {})
    var_info = var_table.get(key, {})
    if isinstance(var_info, str):
        # A string entry is an alias naming another variable: follow it once.
        var_info = var_table.get(var_info, {})
    if not isinstance(var_info, dict):
        # Anything else carries no usable doc/default; fall back to defaults
        # instead of failing on `.get`.
        var_info = {}

    # MinOccurs is stored as an int by the schema processing in this file, so
    # normalise to text before comparing; string values keep working too.
    optional = "(optional)" if str(value.get("MinOccurs", "1")) == "0" else ""
    type_str = f"[{value.get('ValType', 'Unknown')}]"
    var_doc = var_info.get('doc', 'No documentation available')

    lines = [
        f"{doc_indent}%{optional} {type_str} {line}"
        for line in var_doc.split('\n')
    ]

    default_val = var_info.get('default', '')
    default_str = " = " + (str(default_val) if default_val != '' else "")
    lines.append(f"{child_indent}{key}{default_str}\n")
    return lines

def custom_serialize_for_template(obj, annotations, key_name):
lines = []
indent_size = 0
tab = 4 * " "
base_indent = indent_size * " "
doc_indent = base_indent + 2 * tab
child_indent = base_indent + tab

matching_keys = [key for key in annotations.keys() if key.endswith(key_name)]
annotation_key = matching_keys[0]

doc_string = annotations.get(annotation_key, {}).get('doc', 'No documentation available') #PENDING: work around streams_ in Mixer not being a dictionary.
doc_lines = [f'%{line}' for line in doc_string.split('\n')] + ['']
doc_lines = [f'% {line}' for line in doc_string.split('\n')] + ['']
lines.extend(doc_lines)
lines.append(f'{base_indent}{key_name} {{')

for key, value in obj.items():
optional = "(optional)" if value.get("minOccurs", "1") == "0" else ""
type_str = f"[{value.get('ValType', 'Unknown')}]"
var_doc = annotations[annotation_key].get('vars', {}).get(key, {}).get('doc', 'No documentation')
doc_lines = [f'%{optional} {type_str} {line}' for line in var_doc.split('\n')]

lines.extend([f"{doc_indent}{line}" for line in doc_lines])

default_val = annotations[annotation_key].get('vars', {}).get(key, {}).get('default', '')
default_str = " = " + (str(default_val) if default_val != '' else "")
lines.append(f"{child_indent}{key}{default_str}\n")
lines.extend(generate_doc_lines_for_key(key, value, annotations, annotation_key, doc_indent, child_indent))

lines.append(f'{base_indent}}}')
lines.append('')
Expand All @@ -228,49 +233,28 @@ def save_template_for_all_schemas(processed_schemas, annotations, folder_name =
file.write(template_string)
print(f"Template for {key_name} saved as {filename}")

if __name__ == "__main__":
def parse_arguments(argv=None):
    """Parse the command-line options of the schema converter.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            ``argparse`` reads ``sys.argv[1:]``, as before.

    Returns:
        An ``argparse.Namespace`` with the required ``xml``, ``json`` and
        ``output`` path strings.
    """
    parser = argparse.ArgumentParser(description='Process an XML schema file and a corresponding JSON file, and output to a specified file.')
    parser.add_argument('--xml', type=str, required=True, help='The path to the XML schema file.')
    parser.add_argument('--json', type=str, required=True, help='The path to the JSON file.')
    parser.add_argument('--output', type=str, required=True, help='The path for the output file.')
    return parser.parse_args(argv)

if __name__ == "__main__":
args = parse_arguments()

tree = ET.parse(args.xml)
root = tree.getroot()
ns = re.match(r'\{.*\}', root.tag).group(0)
simulation = root[0][0]

conversion_dict = {
'string': 'String',
'nonNegativeInteger': 'Int',
'boolean': 'Boolean',
'double': 'double',
'positiveInteger': 'Int',
'float': 'Real',
'duration': 'Int',
'integer': 'Int',
'nonPositiveInteger': 'Int',
'negativeInteger': 'Int',
'long': 'Real',
'int': 'Int',
'token': 'String'
}

with open(args.json, 'r') as file:
m_json = json.load(file)

processed_facilities = {}
processed_regions = {}
processed_institutions = {}

facilities = []
regions = []
institutions = []
cyclus_metadata = json.load(file)

for spec in m_json["schema"]:
for spec in cyclus_metadata["schema"]:
element_name = spec.split(":")[-1]
xml_content = m_json["schema"][spec]
entity_type = m_json["annotations"][spec]["entity"]
xml_content = cyclus_metadata["schema"][spec]
entity_type = cyclus_metadata["annotations"][spec]["entity"]
entity_name = spec.split(":")[-1]
processed_and_wrapped = process_schema_from_mjson(xml_content, element_name)
if entity_type == "facility":
Expand All @@ -286,12 +270,10 @@ def save_template_for_all_schemas(processed_schemas, annotations, folder_name =
result = process_node(simulation)

final_result = {"simulation": result["simulation"]}
input_tmpl_entry = {"InputTmpl": "init_template"}
final_result['simulation'] = {**input_tmpl_entry, **final_result['simulation']}

final_result_detailed_schemas = integrate_detailed_schemas(final_result, processed_facilities, processed_regions, processed_institutions)

# Intermediate file created to track parsing progress.
# with open("sample_schema_product.json", "w") as outfile:
# json.dump(final_result_detailed_schemas, outfile, indent=4)

serialized_string = custom_serialize(final_result_detailed_schemas["simulation"])

Expand All @@ -300,6 +282,6 @@ def save_template_for_all_schemas(processed_schemas, annotations, folder_name =
sch_file.write(serialized_string)

# These following lines create the templates and save them in a folder named "Templates"
# save_template_for_all_schemas(processed_facilities, m_json["annotations"])
# save_template_for_all_schemas(processed_regions, m_json["annotations"])
# save_template_for_all_schemas(processed_institutions, m_json["annotations"])
save_template_for_all_schemas(processed_facilities, cyclus_metadata["annotations"])
save_template_for_all_schemas(processed_regions, cyclus_metadata["annotations"])
save_template_for_all_schemas(processed_institutions, cyclus_metadata["annotations"])

0 comments on commit 06353d2

Please sign in to comment.