Skip to content

Commit

Permalink
chore(sarif-rendering): run black and fix dependencies for install
Browse files Browse the repository at this point in the history
  • Loading branch information
ReversingWithMe committed Mar 24, 2024
1 parent 819a340 commit 41ea0c4
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 103 deletions.
207 changes: 104 additions & 103 deletions capa/render/sarif.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import Optional, List


def render(meta, rules: RuleSet, capabilities: MatchResults, ghidra_compat = False) -> str:
def render(meta, rules: RuleSet, capabilities: MatchResults, ghidra_compat=False) -> str:
# Dump to JSON
data: str = rd.ResultDocument.from_capa(meta, rules, capabilities).model_dump_json(exclude_none=True)
try:
Expand All @@ -31,14 +31,14 @@ def render(meta, rules: RuleSet, capabilities: MatchResults, ghidra_compat = Fal

# Marshall json into Sarif
# Create baseline sarif structure to be populated from json data
sarif_structure: Optional[dict] = _sarif_boilerplate(json_data['meta'], json_data['rules'])
sarif_structure: Optional[dict] = _sarif_boilerplate(json_data["meta"], json_data["rules"])
if sarif_structure is None:
print('An Error has occured.')
print("An Error has occured.")
return ""

_populate_artifact(sarif_structure, json_data['meta'])
_populate_invoations(sarif_structure, json_data['meta'])
_populate_results(sarif_structure, json_data['rules'], ghidra_compat)
_populate_artifact(sarif_structure, json_data["meta"])
_populate_invoations(sarif_structure, json_data["meta"])
_populate_results(sarif_structure, json_data["rules"], ghidra_compat)

return json.dumps(sarif_structure, indent=4)

Expand All @@ -51,38 +51,46 @@ def _sarif_boilerplate(data_meta: dict, data_rules: dict) -> Optional[dict]:

# Use attack as default, if both exist then only use attack, if neither exist use the name of rule for ruleID
# FIXME:: this is not good practice to use long name for ruleID, expect this to yell at me.
attack_length = len(data_rules[key]['meta']['attack'])
mbc_length = len(data_rules[key]['meta']['mbc'])
attack_length = len(data_rules[key]["meta"]["attack"])
mbc_length = len(data_rules[key]["meta"]["mbc"])
if attack_length or mbc_length:
id = data_rules[key]['meta']['attack'][0]['id'] if attack_length > 0 else data_rules[key]['meta']['mbc'][0]['id']
id = (
data_rules[key]["meta"]["attack"][0]["id"]
if attack_length > 0
else data_rules[key]["meta"]["mbc"][0]["id"]
)
else:
id = data_rules[key]['meta']['name']
id = data_rules[key]["meta"]["name"]

# Append current rule
rules.append({
# Default to attack identifier, fall back to MBC, mainly relevant if both are present
'id': id,
'name': data_rules[key]['meta']['name'],
'shortDescription': {'text': data_rules[key]['meta']['name']},
'messageStrings': {'default': {'text': data_rules[key]['meta']['name']}},
'properties': {
'namespace': data_rules[key]['meta']['namespace'] if 'namespace' in data_rules[key]['meta'] else [],
'scopes': data_rules[key]['meta']['scopes'],
'references': data_rules[key]['meta']['references'],
'lib': data_rules[key]['meta']['lib']
}
})
rules.append(
{
# Default to attack identifier, fall back to MBC, mainly relevant if both are present
"id": id,
"name": data_rules[key]["meta"]["name"],
"shortDescription": {"text": data_rules[key]["meta"]["name"]},
"messageStrings": {"default": {"text": data_rules[key]["meta"]["name"]}},
"properties": {
"namespace": data_rules[key]["meta"]["namespace"] if "namespace" in data_rules[key]["meta"] else [],
"scopes": data_rules[key]["meta"]["scopes"],
"references": data_rules[key]["meta"]["references"],
"lib": data_rules[key]["meta"]["lib"],
},
}
)

tool = Tool(driver=ToolComponent(
name="Capa",
version=__version__,
information_uri="https://github.com/mandiant/capa",
rules=rules
)
)
tool = Tool(
driver=ToolComponent(
name="Capa", version=__version__, information_uri="https://github.com/mandiant/capa", rules=rules
)
)

# Create a SARIF Log object, populate with a single run
sarif_log = SarifLog(version="2.1.0", schema_uri="https://docs.oasis-open.org/sarif/sarif/v2.1.0/cos02/schemas/sarif-schema-2.1.0.json", runs=[Run(tool=tool, results=[], artifacts=[], invocations=[])])
sarif_log = SarifLog(
version="2.1.0",
schema_uri="https://docs.oasis-open.org/sarif/sarif/v2.1.0/cos02/schemas/sarif-schema-2.1.0.json",
runs=[Run(tool=tool, results=[], artifacts=[], invocations=[])],
)

# Convert the SARIF log to a dictionary and then to a JSON string
try:
Expand All @@ -96,71 +104,67 @@ def _sarif_boilerplate(data_meta: dict, data_rules: dict) -> Optional[dict]:

def _populate_artifact(sarif_log: dict, meta_data: dict) -> None:
"""
@param sarif_log: dict - sarif data structure including runs
@param meta_data: dict - Capa meta output
@param sarif_log: dict - sarif data structure including runs
@param meta_data: dict - Capa meta output
@returns None, updates sarif_log via side-effects
@returns None, updates sarif_log via side-effects
"""
sample = meta_data['sample']
sample = meta_data["sample"]
artifact = {
"location": {"uri": sample['path']},
"location": {"uri": sample["path"]},
"roles": ["analysisTarget"],
"hashes": {
"md5": sample["md5"],
"sha-1": sample["sha1"],
"sha-256": sample["sha256"]
}
"hashes": {"md5": sample["md5"], "sha-1": sample["sha1"], "sha-256": sample["sha256"]},
}
sarif_log['runs'][0]['artifacts'].append(artifact)
sarif_log["runs"][0]["artifacts"].append(artifact)


def _populate_invoations(sarif_log: dict, meta_data: dict) -> None:
"""
@param sarif_log: dict - sarif data structure including runs
@param meta_data: dict - Capa meta output
@param sarif_log: dict - sarif data structure including runs
@param meta_data: dict - Capa meta output
@returns None, updates sarif_log via side-effects
@returns None, updates sarif_log via side-effects
"""
analysis_time = meta_data['timestamp']
argv = meta_data['argv']
analysis = meta_data['analysis']
analysis_time = meta_data["timestamp"]
argv = meta_data["argv"]
analysis = meta_data["analysis"]
invoke = {
"commandLine": 'capa ' + ' '.join(argv),
"commandLine": "capa " + " ".join(argv),
"arguments": argv if len(argv) > 0 else [],
"endTimeUtc": analysis_time,
"executionSuccessful": True,
"properties": {
'format': analysis['format'],
'arch': analysis['arch'],
'os': analysis['os'],
'extractor': analysis['extractor'],
'rule_location': analysis['rules'],
'base_address': analysis['base_address'],
}
"format": analysis["format"],
"arch": analysis["arch"],
"os": analysis["os"],
"extractor": analysis["extractor"],
"rule_location": analysis["rules"],
"base_address": analysis["base_address"],
},
}
sarif_log['runs'][0]['invocations'].append(invoke)
sarif_log["runs"][0]["invocations"].append(invoke)


def _enumerate_evidence(node: dict, related_count: int) -> List[dict]:
related_locations = []
if node.get('success') and node.get('node').get('type') != 'statement':
label = ''
if node.get('node').get('type') == 'feature':
if node.get('node').get('feature').get('type') == 'api':
label = 'api: ' + node.get('node').get('feature').get('api')
elif node.get('node').get('feature').get('type') == 'match':
label = 'match: ' + node.get('node').get('feature').get('match')
elif node.get('node').get('feature').get('type') == 'number':
if node.get("success") and node.get("node").get("type") != "statement":
label = ""
if node.get("node").get("type") == "feature":
if node.get("node").get("feature").get("type") == "api":
label = "api: " + node.get("node").get("feature").get("api")
elif node.get("node").get("feature").get("type") == "match":
label = "match: " + node.get("node").get("feature").get("match")
elif node.get("node").get("feature").get("type") == "number":
label = f"number: {node.get('node').get('feature').get('description')} ({node.get('node').get('feature').get('number')})"
elif node.get('node').get('feature').get('type') == 'offset':
elif node.get("node").get("feature").get("type") == "offset":
label = f"offset: {node.get('node').get('feature').get('description')} ({node.get('node').get('feature').get('offset')})"
elif node.get('node').get('feature').get('type') == 'mnemonic':
elif node.get("node").get("feature").get("type") == "mnemonic":
label = f"mnemonic: {node.get('node').get('feature').get('mnemonic')}"
elif node.get('node').get('feature').get('type') == 'characteristic':
elif node.get("node").get("feature").get("type") == "characteristic":
label = f"characteristic: {node.get('node').get('feature').get('characteristic')}"
elif node.get('node').get('feature').get('type') == 'os':
elif node.get("node").get("feature").get("type") == "os":
label = f"os: {node.get('node').get('feature').get('os')}"
elif node.get('node').get('feature').get('type') == 'operand number':
elif node.get("node").get("feature").get("type") == "operand number":
label = f"operand: ({node.get('node').get('feature').get('index')} ) {node.get('node').get('feature').get('description')} ({node.get('node').get('feature').get('operand_number')})"
else:
print(f"Not implemented {node.get('node').get('feature').get('type')}", file=sys.stderr)
Expand All @@ -169,74 +173,71 @@ def _enumerate_evidence(node: dict, related_count: int) -> List[dict]:
print(f"Not implemented {node.get('node').get('type')}", file=sys.stderr)
return []

for loc in node.get('locations'):
if loc['type'] != 'absolute':
for loc in node.get("locations"):
if loc["type"] != "absolute":
continue

related_locations.append({
'id': related_count,
'message': {
'text': label
},
'physicalLocation': {
'address': {
'absoluteAddress': loc['value']
}
related_locations.append(
{
"id": related_count,
"message": {"text": label},
"physicalLocation": {"address": {"absoluteAddress": loc["value"]}},
}
})
)
related_count += 1

if node.get('success') and node.get('node').get('type') == 'statement':
for child in node.get('children'):
if node.get("success") and node.get("node").get("type") == "statement":
for child in node.get("children"):
related_locations += _enumerate_evidence(child, related_count)

return related_locations


def _populate_results(sarif_log: dict, data_rules: dict, ghidra_compat: bool) -> None:
    """
    Convert capa rule matches into SARIF result entries on the first run.

    @param sarif_log: dict - sarif data structure including runs
    @param data_rules: dict - Capa rules output (rule name -> rule data with meta/matches)
    @param ghidra_compat: bool - when True, emit upper-case level/kind values and omit
                                 relatedLocations for Ghidra's SARIF consumer
    @returns None, updates sarif_log via side-effects
    """
    results = sarif_log["runs"][0]["results"]

    # Parse rules from parsed sarif structure
    for key in data_rules:
        # Use attack as default, if both exist then only use attack, if neither exist use the name of rule for ruleID
        # FIXME:: this is not good practice to use long name for ruleID, expect this to yell at me.
        attack_length = len(data_rules[key]["meta"]["attack"])
        mbc_length = len(data_rules[key]["meta"]["mbc"])
        if attack_length or mbc_length:
            # renamed from `id` to avoid shadowing the builtin
            rule_id = (
                data_rules[key]["meta"]["attack"][0]["id"]
                if attack_length > 0
                else data_rules[key]["meta"]["mbc"][0]["id"]
            )
        else:
            rule_id = data_rules[key]["meta"]["name"]

        for address, details in data_rules[key]["matches"]:
            related_cnt = 0
            # Gather evidence locations supporting this match (recursive walk).
            related_locations = _enumerate_evidence(details, related_cnt)

            res = {
                "ruleId": rule_id,
                "level": "none" if not ghidra_compat else "NONE",
                "message": {"text": data_rules[key]["meta"]["name"]},
                "kind": "informational" if not ghidra_compat else "INFORMATIONAL",
                "locations": [
                    {
                        "physicalLocation": {
                            "address": {
                                "absoluteAddress": address["value"],
                            }
                        },
                    }
                ],
            }
            if not ghidra_compat:
                res["relatedLocations"] = related_locations

            results.append(res)
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ dependencies = [
"dncil==1.0.2",
"pydantic==2.4.0",
"protobuf==4.23.4",
"sarif_om==1.0.4",
"jschema_to_python==1.2.3"
]
dynamic = ["version"]

Expand Down

0 comments on commit 41ea0c4

Please sign in to comment.