diff --git a/src/mapex/write_parsed_mappings.py b/src/mapex/write_parsed_mappings.py index 1e2fcd36..6c277979 100644 --- a/src/mapex/write_parsed_mappings.py +++ b/src/mapex/write_parsed_mappings.py @@ -179,24 +179,28 @@ def write_parsed_mappings_navigator_layer(parsed_mappings, filepath): def write_parsed_mappings_stix(parsed_mappings, filepath): + created_date = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") # create bundle bundle_uuid = str(uuid.uuid4()) stix_bundle = { "type": "bundle", "id": f"bundle--{bundle_uuid}", "spec_version": "2.1", - "created": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), - "modified": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "created": created_date, + "modified": created_date, "objects": [], } technique_target_dict = load_attack_json(parsed_mappings) for mapping in parsed_mappings["mapping_objects"]: # create SDO for each capability - if not any( - stix_object.get("name") == mapping["capability_id"] - for stix_object in stix_bundle["objects"] + if ( + not any( + stix_object.get("name") == mapping["capability_id"] + for stix_object in stix_bundle["objects"] + ) + and mapping["capability_id"] ): - stix_object = get_stix_object(parsed_mappings, mapping) + stix_object = get_stix_object(parsed_mappings, mapping, created_date) stix_bundle["objects"].append(stix_object) # add attack pattern SDO for each technique/subtechnique @@ -205,7 +209,8 @@ def write_parsed_mappings_stix(parsed_mappings, filepath): stix_object["id"] for stix_object in stix_bundle["objects"] if stix_object.get("name") == mapping["capability_id"] - ][0] + ] + related_target_ref = technique_target_dict.get(mapping["attack_object_id"], "") # account for None mapping_type on not_mappable items mapping_type = ( @@ -214,25 +219,21 @@ def write_parsed_mappings_stix(parsed_mappings, filepath): else None ) # do not add a relationship node for a non-mappable technique - if not mapping_type == "non-mappable" and technique_target_dict.get( - mapping["attack_object_id"], "" + if ( + related_source_ref + and related_target_ref + and not mapping_type == "non-mappable" ): stix_bundle["objects"].append( { "type": "relationship", "id": f"relationship--{relationship_uuid}", "spec_version": "2.1", - "created": datetime.now(timezone.utc) - .isoformat() - .replace("+00:00", "Z"), - "modified": datetime.now(timezone.utc) - .isoformat() - .replace("+00:00", "Z"), + "created": created_date, + "modified": created_date, "relationship_type": mapping_type, - "source_ref": related_source_ref, - "target_ref": technique_target_dict.get( - mapping["attack_object_id"], "" - ), + "source_ref": related_source_ref[0], + "target_ref": related_target_ref, }, ) # only write to file if stix is valid @@ -261,29 +262,29 @@ def write_parsed_mappings_stix(parsed_mappings, filepath): ) -def get_stix_object(parsed_mappings, mapping): +def get_stix_object(parsed_mappings, mapping, created_date): mapping_framwork = parsed_mappings["metadata"]["mapping_framework"] infrastructure_frameworks = ["nist_800_53", "aws", "gcp", "azure", "m365"] if mapping_framwork == "cve": - return create_vulnerability_object(mapping) + return create_vulnerability_object(mapping, created_date) elif mapping_framwork in infrastructure_frameworks: - return create_infrastructure_object(mapping) + return create_infrastructure_object(mapping, created_date) elif mapping_framwork == "veris": - return create_attack_pattern_object(mapping) + return create_attack_pattern_object(mapping, created_date) else: logger.warning( "Cannot create STIX export for mappings with unrecognized mapping framework" ) -def create_vulnerability_object(mapping): +def create_vulnerability_object(mapping, created_date): vulnerability_uuid = str(uuid.uuid4()) return { "type": "vulnerability", "id": f"vulnerability--{vulnerability_uuid}", "spec_version": "2.1", - "created": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), - "modified": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "created": created_date, + "modified": created_date, "name": mapping["capability_id"], "description": mapping["capability_description"], "external_references": [ @@ -296,27 +297,27 @@ def create_vulnerability_object(mapping): } -def create_infrastructure_object(mapping): +def create_infrastructure_object(mapping, created_date): infrastructure_uuid = str(uuid.uuid4()) return { "type": "infrastructure", "spec_version": "2.1", "id": f"infrastructure--{infrastructure_uuid}", "name": mapping["capability_id"], - "created": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), - "modified": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "created": created_date, + "modified": created_date, } -def create_attack_pattern_object(mapping): +def create_attack_pattern_object(mapping, created_date): attack_pattern_uuid = str(uuid.uuid4()) return { "type": "attack-pattern", "spec_version": "2.1", "id": f"attack-pattern--{attack_pattern_uuid}", "name": mapping["capability_id"], - "created": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), - "modified": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "created": created_date, + "modified": created_date, }