From e6edf434e3aa082a7f722284f8cbf462e20035bf Mon Sep 17 00:00:00 2001 From: jconnor0426 Date: Tue, 22 Aug 2023 15:15:24 -0400 Subject: [PATCH] Updating to be compliant with code style of project --- scripts/match-2-yar.py | 326 ++++++++++++++++++++++------------------- tests/test_scripts.py | 83 +++++------ 2 files changed, 220 insertions(+), 189 deletions(-) diff --git a/scripts/match-2-yar.py b/scripts/match-2-yar.py index 8dcffed9d..08a095e8f 100644 --- a/scripts/match-2-yar.py +++ b/scripts/match-2-yar.py @@ -1,8 +1,16 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python3 """ +Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. +You may obtain a copy of the License at: [package root]/LICENSE.txt +Unless required by applicable law or agreed to in writing, software distributed under the License + is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. + match-2-yar -Invoke capa to extract the capabilities of the given sample or list of samples, +Invoke capa to extract the capabilities of the given sample or list of samples, and emit the matches as yara rules. When providing multiple samples or directories the tool will attempt to create @@ -27,8 +35,14 @@ import collections import multiprocessing import multiprocessing.pool -from datetime import date +from typing import Set, Dict, List from pathlib import Path +from datetime import date + +import dnfile +from envi.memcanvas import MemoryCanvas +from dncil.clr.token import Token +from vivisect.renderers import WorkspaceRenderer import capa.main import capa.rules @@ -43,39 +57,37 @@ from capa.features.common import OS_AUTO from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor -import dnfile -from dncil.clr.token import Token - -from envi.memcanvas import MemoryCanvas -from vivisect.renderers import WorkspaceRenderer - try: - from capstone import Cs, CS_ARCH_X86, CS_MODE_32, CS_MODE_64, CS_OPT_SYNTAX_INTEL - from mkyara import YaraGenerator import yaramod + from mkyara import YaraGenerator + from capstone import CS_MODE_32, CS_MODE_64, CS_ARCH_X86, CS_OPT_SYNTAX_INTEL, Cs except ImportError: - print("""\nFailed to import a module try installing required Python libraries with the following: + print( + """\nFailed to import a module try installing required Python libraries with the following: pip install mkyara yaramod -""" ) +""" + ) sys.exit(1) logger = logging.getLogger("capa.match-2-yar") -######## Vivisect Related Classes and Functions ######## +# Vivisect Related Classes and Functions + class BufferCanvas(MemoryCanvas): """Subclass of Vivisect Memory canvas that captures disassemlby output as a string rather than printing to std.out """ + output = "" def addText(self, text, tag=None): - """Overwriting the method responsible for writing to std.out - """ + """Overwriting the method responsible for writing to std.out""" self.output += text + def get_disassembly_output(vw, va, size): """Get Vivisect's disassembly view for a given virtual addresss and size @@ -83,7 +95,7 @@ def get_disassembly_output(vw, va, size): vw: Vivisect Workspace va: Virtual Address to start disassembling from size: size in bytes to disassemble - + Returns: str: String containing vivisect's disassembly output """ @@ -95,28 +107,29 @@ def get_disassembly_output(vw, va, size): def get_comment_for_func(vw, funcva): """Get a CodeFeature comment for a function - - This function gets the size of a function and - uses that to get a dump of the function disassembly + + This function gets the size of a function and + uses that to get a dump of the function disassembly with get_dissasembly_output Args: vw: Vivisect Workspace funcva: Virtual Address of function to analyze - + Returns: str: String containing disassembly output for a function """ funcsize = get_function_size(vw, funcva) return get_disassembly_output(vw, funcva, funcsize) + def get_comment_for_cb(vw, va): """Get a CodeFeature comment for a Code Block - - This function gets the size of a code block and - uses that to get a dump of the code block disassembly + + This function gets the size of a code block and + uses that to get a dump of the code block disassembly with get_dissasembly_output - + Args: vw: Vivisect Workspace va: Virtual Address of Codeblock to analyze @@ -128,13 +141,14 @@ def get_comment_for_cb(vw, va): cbva, cbsize, cbfunc = cb return get_disassembly_output(vw, cbva, cbsize) + def get_function_size(vw, funcva): """Return the size of a function based on vivisect analysis Args: vw: Vivisect Workspace funcva: Virtual Address of function to analyze - + Returns: int: size of the function """ @@ -142,7 +156,7 @@ def get_function_size(vw, funcva): if funcva not in vw.getFunctions(): funcva = vw.getFunction(funcva) if funcva is None: - raise Exception('Given funcva not a function or within a known function') + raise Exception("Given funcva not a function or within a known function") func_blocks = [cbva for cbva, _, _ in vw.getFunctionBlocks(funcva)] # Figure out the size of the first linear chunk # in this function... @@ -154,33 +168,35 @@ def get_function_size(vw, funcva): if cbfunc != funcva: break fsize += cbsize - cb = vw.getCodeBlock(cbva+cbsize) + cb = vw.getCodeBlock(cbva + cbsize) if fsize == 0: raise Exception("0 length function??!?1") - + return fsize + def get_function_bytes(vw, funcva): """Return the bytes from a function - + Args: vw: Vivisect Workspace funcva: Virtual Address of function to analyze - + Returns: bytes: bytes of a function """ fsize = get_function_size(vw, funcva) return vw.readMemory(funcva, fsize) + def get_cb_bytes(vw, va): """Return the bytes from a code block - + Args: vw: Vivisect Workspace va: Virtual Address to analyze - + Returns: int: size of the function """ @@ -189,12 +205,10 @@ def get_cb_bytes(vw, va): return vw.readMemory(cbva, cbsize) -######## Capstone Related Classes and Functions ######## +# Capstone Related Classes and Functions + +VIVI_ARCH_TO_CAPSTONE = {"i386": (CS_ARCH_X86, CS_MODE_32), "amd64": (CS_ARCH_X86, CS_MODE_64)} -VIVI_ARCH_TO_CAPSTONE = { - 'i386': (CS_ARCH_X86, CS_MODE_32), - 'amd64': (CS_ARCH_X86, CS_MODE_64) -} def mkyara_sig_generation(start_va, bytez, arch, mode): """Mask x86/x64 instructions and generate a signature @@ -207,7 +221,7 @@ def mkyara_sig_generation(start_va, bytez, arch, mode): bytez: byte string containing raw bytes of the function arch: Capstone Architecture to use (CS_ARCH_X86 covers 32 and 64bit x86) mode: Capstone mode to choose between 32 and 64 bit - + Returns: str: signature string in the form of "AA BB CC DD" """ @@ -226,9 +240,9 @@ def mkyara_sig_generation(start_va, bytez, arch, mode): sig += rule_part + " " return sig - -def genSigAndMask(start_va, bytez, vivi_arch='i386'): + +def genSigAndMask(start_va, bytez, vivi_arch="i386"): """Generate a signature and masked signature for a fuction virtual address This function performs the translation from vivisect arch @@ -238,24 +252,26 @@ def genSigAndMask(start_va, bytez, vivi_arch='i386'): start_va: virtual address of first instruction bytez: byte string containing raw bytes of the function vivi_arch: Vivisect architecture - + Returns: str: signature string in the form of "AA BB CC DD" """ - + arch, mode = VIVI_ARCH_TO_CAPSTONE[vivi_arch] # Other option for normal is loose, but we won't use those here return mkyara_sig_generation(start_va, bytez, arch, mode) -######## .NET Related Classes and Functions ######## + +# .NET Related Classes and Functions + def format_operand(pe, op): """Return a string representation of a .NET operand - + Use a dnfile object to reference .NET tables to understand methods, classes, and strings - + Args: pe: dnfile object for a .NET PE op: dncil operand from an instruction @@ -272,19 +288,27 @@ def format_operand(pe, op): elif isinstance(op, list): return f"[{', '.join(['({:04X})'.format(x) for x in op])}]" elif isinstance(op, dnfile.mdtable.MemberRefRow) and not isinstance(op.Class.row, dnfile.mdtable.TypeSpecRow): - return f"{str(op.Class.row.TypeNamespace)}.{op.Class.row.TypeName}::{op.Name}" + retstr = getattr(op.Class.row, "TypeNamespace", "") + if retstr != "": + retstr += "." + retstr += getattr(op.Class.row, "TypeName", "") + if retstr != "": + retstr += "::" + retstr += op.Name + return retstr elif isinstance(op, (dnfile.mdtable.FieldRow, dnfile.mdtable.MethodDefRow, dnfile.mdtable.MemberRefRow)): return f"{op.Name}" elif isinstance(op, (dnfile.mdtable.TypeDefRow, dnfile.mdtable.TypeRefRow)): - return f"{op.TypeNamespace}.{op.TypeName}" + return f"{op.TypeNamespace}.{op.TypeName}" elif isinstance(op, (dnfile.mdtable.TypeSpecRow, dnfile.mdtable.MethodSpecRow)): return f"{str(op.struct)}" else: return "" if op is None else str(op) + def get_sig_and_mask_for_dotnet_func(dnpe, body): """Return the comment, sig, and bytes of a .NET Method - + Iterate a method body to get IL bytes and mask the operand values to create a more flexible signature @@ -302,45 +326,47 @@ def get_sig_and_mask_for_dotnet_func(dnpe, body): func_bytes = "" for insn in body.instructions: comment += ( - "{:04X}".format(insn.offset) - + " " - + f"{' '.join('{:02x}'.format(b) for b in insn.get_bytes()) : <20}" - + f"{str(insn.opcode) : <15}" - + format_operand(dnpe, insn.operand) - + "\n" - ) + "{:04X}".format(insn.offset) + + " " + + f"{' '.join('{:02x}'.format(b) for b in insn.get_bytes()) : <20}" + + f"{str(insn.opcode) : <15}" + + format_operand(dnpe, insn.operand) + + "\n" + ) sig += insn.get_opcode_bytes().hex() func_bytes += insn.get_opcode_bytes().hex() if insn.operand: - sig += '??' * len(insn.get_operand_bytes()) + sig += "??" * len(insn.get_operand_bytes()) func_bytes += insn.get_operand_bytes().hex() # Format the sig to be in the same style as the vivi portion (bytes seperated by spaces) formatted_sig = "" for idx, val in enumerate(sig): - if idx > 0 and idx % 2 == 0: + if idx > 0 and idx % 2 == 0: formatted_sig += " " formatted_sig += val - - + return comment, formatted_sig, func_bytes -######## CodeFeature Extractor Related Classes and Functions ######## -class CodeFeature(): - """Basic object that that will be used to create yara rules - """ - def __init__(self, sig: str, comment: str, bytez: bytes, filemd5:str): +# CodeFeature Extractor Related Classes and Functions + + +class CodeFeature: + """Basic object that that will be used to create yara rules""" + + def __init__(self, sig: str, comment: str, bytez: bytes, filemd5: str): self.sig = sig self.comment = comment self.bytez = bytez self.filemd5 = filemd5 + def get_code_features_for_capa_doc(doc: rd.ResultDocument, extractor): - """Returns a dictionary mapping a filemd5 to a list of CodeFeatures - + """Returns a dictionary mapping a filemd5 to a list of CodeFeatures + This function operates on x86/x64 PE files and creates CodeFeatures based on basic block and function CAPA matches @@ -353,25 +379,24 @@ def get_code_features_for_capa_doc(doc: rd.ResultDocument, extractor): # Grab the vivisect workspace object try: file_vw = extractor.vw - except: + except AttributeError: print("No extractor workspace") file_vw = None raise - # Get the filemd5 + # Get the filemd5 filemd5 = doc.meta.sample.md5 - cb_matches = collections.defaultdict(set) func_matches = collections.defaultdict(set) for rule in rutils.capability_rules(doc): if rule.meta.scope == capa.rules.FUNCTION_SCOPE: - for addr, _ in rule.matches: - func_matches[addr.value].add(rule.meta.name) + for addr_object, _ in rule.matches: + func_matches[addr_object.value].add(rule.meta.name) elif rule.meta.scope == capa.rules.BASIC_BLOCK_SCOPE: - for addr, _ in rule.matches: - cb_matches[addr.value].add(rule.meta.name) + for addr_object, _ in rule.matches: + cb_matches[addr_object.value].add(rule.meta.name) else: # file scope pass @@ -386,7 +411,7 @@ def get_code_features_for_capa_doc(doc: rd.ResultDocument, extractor): bytez = get_cb_bytes(file_vw, addr) sig = genSigAndMask(addr, bytez, doc.meta.analysis.arch) - code_features.append(CodeFeature(sig,comment,bytez,filemd5)) + code_features.append(CodeFeature(sig, comment, bytez, filemd5)) for addr, rules in func_matches.items(): comment = f"function at 0x{addr:08x}@{filemd5} with {len(rules)} features:\n" @@ -396,16 +421,16 @@ def get_code_features_for_capa_doc(doc: rd.ResultDocument, extractor): bytez = get_function_bytes(file_vw, addr) sig = genSigAndMask(addr, bytez, doc.meta.analysis.arch) - code_features.append(CodeFeature(sig,comment,bytez,filemd5)) - + code_features.append(CodeFeature(sig, comment, bytez, filemd5)) if len(code_features) == 0: logger.warning("No code features found for %s", filemd5) return {filemd5: code_features} + def get_code_features_for_dotnet_doc(doc: rd.ResultDocument, extractor): - """Returns a dictionary mapping a filemd5 to a list of CodeFeatures - + """Returns a dictionary mapping a filemd5 to a list of CodeFeatures + This function operates on .NET PE files and creates CodeFeatures based on .NET method CAPA matches @@ -418,7 +443,7 @@ def get_code_features_for_dotnet_doc(doc: rd.ResultDocument, extractor): # Grab the vivisect workspace object try: dnpe = extractor.pe - except: + except AttributeError: print("No dnpe file found") raise @@ -428,20 +453,20 @@ def get_code_features_for_dotnet_doc(doc: rd.ResultDocument, extractor): for rule in rutils.capability_rules(doc): if rule.meta.scope == capa.rules.FUNCTION_SCOPE: - for addr, _ in rule.matches: - func_matches[addr.value].add(rule.meta.name) + for addr_object, _ in rule.matches: + func_matches[addr_object.value].add(rule.meta.name) else: # file scope pass - # Funcs is the cache of functions we need to reference to get + # Funcs is the cache of functions we need to reference to get # the underlying dnfile object funcs = list(extractor.get_functions()) # Return list of CodeFeature objects code_features = [] - logger.debug(f"Building CodeFeatures for {len(func_matches.keys())} functions in {filemd5}") + logger.debug("Building CodeFeatures for %s functions in %s", len(func_matches.keys()), filemd5) for addr, rules in func_matches.items(): func_name = extractor.token_cache.get_method(addr) comment = f"function {func_name} 0x{addr:08x}@{filemd5} with {len(rules)} features:\n" @@ -454,23 +479,24 @@ def get_code_features_for_dotnet_doc(doc: rd.ResultDocument, extractor): func_comment, sig, bytez = get_sig_and_mask_for_dotnet_func(dnpe, f.inner) comment += func_comment - code_features.append(CodeFeature(sig,comment,bytez,filemd5)) - + code_features.append(CodeFeature(sig, comment, bytez, filemd5)) if len(code_features) == 0: logger.warning("No code features found for %s", filemd5) return {filemd5: code_features} -######## CAPA Entrypoints ######## + +# CAPA Entrypoints + def run_capa_and_get_features(args): """Main CAPA analysis entrypoint - - This function kicks off CAPA analysis and builds CodeFeatures that + + This function kicks off CAPA analysis and builds CodeFeatures that will be used to build yara rules in the main thread. Args: - args: Tuple containing the following + args: Tuple containing the following - rules: CAPA rules loaded from a repo - sig_paths: Path to signatures used for library identification - format: Format for processing (dotnet or auto are the expected values) @@ -516,7 +542,7 @@ def run_capa_and_get_features(args): } meta = capa.main.collect_metadata([], path, format, os_, [], extractor) - logger.info(f"Collecting capabilities for {path}") + logger.info("Collecting capabilities for %s", path) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) meta.analysis.feature_counts = counts["feature_counts"] @@ -526,15 +552,15 @@ def run_capa_and_get_features(args): if capa.main.has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. - return { - "path": path, - "status": "error", - "error": f"Encountered file limitation", - } + return { + "path": path, + "status": "error", + "error": "Encountered file limitation", + } try: doc = rd.ResultDocument.from_capa(meta, rules, capabilities) - logger.info(f"Building code features for {path}") + logger.info("Building code features for %s", path) if type(extractor) == DnfileFeatureExtractor: # Handle .NET files features = get_code_features_for_dotnet_doc(doc, extractor) @@ -552,13 +578,13 @@ def run_capa_and_get_features(args): def multi_process_capa(argv=None): """CAPA argument handler and multiprocessing manager - + This function processes CLI arguments and kicks of capa analysis and extacts CodeFeatures into a dictionary that maps filemd5s to a list of CodeFeatures that will be used to build yara rules Args: - argv: + argv: Returns: dict: dictionary mapping filemd5's processed to a list of CodeFeatures """ @@ -568,9 +594,7 @@ def multi_process_capa(argv=None): parser = argparse.ArgumentParser(description="Build YARA rules for CAPA matches") capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os"}) parser.add_argument("input", type=str, nargs="+", help="Path to directory or files to analyze") - parser.add_argument( - "-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor" - ) + parser.add_argument("-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor") parser.add_argument("--no-mp", action="store_true", help="disable subprocesses") args = parser.parse_args(args=argv) capa.main.handle_common_args(args) @@ -594,11 +618,11 @@ def multi_process_capa(argv=None): if not path.exists(): raise ValueError(f"Invalid path {p}") if path.is_dir(): - samples.extend([x for x in path.rglob("*")]) + for subpath in path.rglob("*"): + samples.append(subpath) elif path.is_file(): samples.append(path) logger.info("Starting to process %s files", len(samples)) - cpu_count = multiprocessing.cpu_count() @@ -633,23 +657,25 @@ def map(f, args, parallelism=None): parallelism=args.parallelism, ): if result["status"] == "error": - logger.warning(f'{result["path"]}: {result["error"]}') + logger.warning("%s: %s", result["path"], result["error"]) elif result["status"] == "ok": results.update(result["ok"]) else: - raise ValueError(f"unexpected status: {result['status']}") + raise ValueError("unexpected status: %s", result["status"]) - logger.info(f"Done processing {len(samples)} samples") + logger.info("Done processing %s samples", len(samples)) return results -######## YARA related functions ######## -CODE_FEATURES_REFERENCED = [] +# YARA related functions + +CODE_FEATURES_REFERENCED: List[CodeFeature] = [] + def build_rule_from_combo(combo_dict: dict, **kwargs): """Build a yaramod yara rule using a combination dictionary - + Args: combo_dict: Dictionary of features that all matched on a group of files Returns: @@ -659,15 +685,14 @@ def build_rule_from_combo(combo_dict: dict, **kwargs): # we're going to use this to create unique code features to insert the comment strings global CODE_FEATURES_REFERENCED - # Build metadata for the rule rule_name = "super_rule_" + "_".join([x[:5] for x in sorted(combo_dict["files"])]) - metadict = dict( - author=kwargs.get("author", "CAPA Matches"), - date_created=kwargs.get("date_created", date.today().isoformat()), - date_modified=kwargs.get("date_modified", date.today().isoformat()), - description=kwargs.get("description", ""), - ) + metadict = { + "author": kwargs.get("author", "CAPA Matches"), + "date_created": kwargs.get("date_created", date.today().isoformat()), + "date_modified": kwargs.get("date_modified", date.today().isoformat()), + "description": kwargs.get("description", ""), + } rule = yaramod.YaraRuleBuilder().with_name(rule_name) for metakey, metavalue in metadict.items(): @@ -680,30 +705,29 @@ def build_rule_from_combo(combo_dict: dict, **kwargs): rule = rule.with_string_meta("md5", hsh) conditions = [yaramod.of(yaramod.all(), yaramod.them())] - for codefeature in combo_dict['features']: + for codefeature in combo_dict["features"]: idx = len(CODE_FEATURES_REFERENCED) hexstr = yaramod.YaraHexStringBuilder() for byte in codefeature.sig.split(" "): if byte == "??": hexstr = hexstr.add(yaramod.wildcard()) - elif byte == '': + elif byte == "": continue else: hexstr = hexstr.add(yaramod.YaraHexStringBuilder(int(byte, 16))) rule = rule.with_hex_string(f"$c{idx}", hexstr.get()) CODE_FEATURES_REFERENCED.append(codefeature) - if len(conditions) == 1: # No fancy expression needed rule = rule.with_condition(conditions[0].get()) else: - rule = rule.with_condition( - yaramod.conjunction(conditions, linebreaks=True).get() - ) + rule = rule.with_condition(yaramod.conjunction(conditions, linebreaks=True).get()) return rule.get() -TAB_CHAR = " "*4 + +TAB_CHAR = " " * 4 + def replace_tabs_with_spaces(yara_text): """Replacing tabs with spaces in yara rule @@ -715,6 +739,7 @@ def replace_tabs_with_spaces(yara_text): """ return yara_text.replace("\t", TAB_CHAR) + def add_comments_to_yara_file(yara_text): """Add comments to yara file text @@ -729,11 +754,23 @@ def add_comments_to_yara_file(yara_text): # replace it with the comment search_str = f"$c{idx} =" comment_str = "/*\n" - comment_str += ("\n"+2*TAB_CHAR).join(feature.comment.split("\n")) - comment_str += "*/\n" + 2*TAB_CHAR + search_str + comment_str += ("\n" + 2 * TAB_CHAR).join(feature.comment.split("\n")) + comment_str += "*/\n" + 2 * TAB_CHAR + search_str yara_text = yara_text.replace(search_str, comment_str) return yara_text + +class SimilarityDictEntry: + """Simple object to hold information about a feature in a similarity dictionary""" + + values: List[CodeFeature] + files: Set[str] + + def __init__(self) -> None: + self.values = [] + self.files = set() + + def build_yara_ruleset(files_dict, **kwargs): """Build a YARA ruleset string based on CodeFeatures @@ -749,35 +786,30 @@ def build_yara_ruleset(files_dict, **kwargs): for filemd5, features in files_dict.items(): for value in features: if value.sig not in similarity_dict: - similarity_dict[value.sig] = { - "values":[value], - "files":set([value.filemd5]) - } - else: - similarity_dict[value.sig]['values'].append(value) - similarity_dict[value.sig]['files'].add(value.filemd5) + similarity_dict[value.sig] = SimilarityDictEntry() + similarity_dict[value.sig].values.append(value) + similarity_dict[value.sig].files.add(filemd5) # Next we build out a combodict and track which files have which combos of features - file_combinations = {} + file_combinations: Dict[str, dict] = {} for feature, result_dict in similarity_dict.items(): - sample_combo_key = ":".join(list(sorted(result_dict["files"]))) + logger.debug("Processing feature: %s", feature) + sample_combo_key = ":".join(sorted(result_dict.files)) + # logger.debug("Combo Key: %s", sample_combo_key) if sample_combo_key not in file_combinations: - file_combinations[sample_combo_key] = dict() - file_combinations[sample_combo_key]["files"] = sorted( - result_dict["files"] - ) + file_combinations[sample_combo_key] = {} + file_combinations[sample_combo_key]["files"] = sorted(result_dict.files) file_combinations[sample_combo_key]["feature_count"] = 0 file_combinations[sample_combo_key]["features"] = [] - # Use the full code feature from the alphabetical match - chosen_code_version = sorted(result_dict['values'], key=lambda x: x.filemd5)[0] - file_combinations[sample_combo_key]["features"].append( - chosen_code_version - ) + chosen_code_version = sorted(result_dict.values, key=lambda x: x.filemd5)[0] + file_combinations[sample_combo_key]["features"].append(chosen_code_version) + logger.warning("Adding %s to %s", chosen_code_version.sig, sample_combo_key) file_combinations[sample_combo_key]["feature_count"] += 1 # Create a list of combo keys and sort them so we get deterministic output combo_keys = sorted(file_combinations.keys(), key=lambda x: (len(x), x)) + logger.warning("Combo Keys: %s", combo_keys) # Build the YARA rule set based on the grouping yara_file = yaramod.YaraFileBuilder() @@ -785,9 +817,7 @@ def build_yara_ruleset(files_dict, **kwargs): for key in combo_keys: combo_dict = file_combinations[key] - rule = build_rule_from_combo( - combo_dict, **kwargs - ) + rule = build_rule_from_combo(combo_dict, **kwargs) if rule is not None: observed_files.extend(combo_dict["files"]) yara_file = yara_file.with_rule(rule) @@ -803,10 +833,10 @@ def build_yara_ruleset(files_dict, **kwargs): return yara_text - def main(argv=None): all_features = multi_process_capa(argv) print(build_yara_ruleset(all_features)) + if __name__ == "__main__": sys.exit(main()) diff --git a/tests/test_scripts.py b/tests/test_scripts.py index 91a1ae340..6b4e392d4 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -10,8 +10,8 @@ import logging import textwrap import subprocess -from datetime import date from pathlib import Path +from datetime import date import pytest @@ -27,8 +27,10 @@ def get_script_path(s: str): def get_file_path(): return str(CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_") + def get_data_path(p: str): - return str(CD / "data" / p ) + return str(CD / "data" / p) + def get_rules_path(): return str(CD / ".." / "rules") @@ -71,87 +73,80 @@ def test_bulk_process(tmp_path): p = run_program(get_script_path("bulk-process.py"), [str(t.parent)]) assert p.returncode == 0 + @pytest.mark.parametrize( "script,args,expected_output_path", [ # Test match-2-yar x86 EXE pytest.param( - "match-2-yar.py", - [ - get_data_path("9324d1a8ae37a36ae560c37448c9705a.exe_") - ], - "yara/expected_9324d1a8ae37a36ae560c37448c9705a.exe_.yar" + "match-2-yar.py", + [get_data_path("9324d1a8ae37a36ae560c37448c9705a.exe_")], + "yara/expected_9324d1a8ae37a36ae560c37448c9705a.exe_.yar", ), # Test match-2-yar x64 EXE pytest.param( - "match-2-yar.py", - [ - get_data_path("c2bb17c12975ea61ff43a71afd9c3ff111d018af161859abae0bdb0b3dae98f9.exe_") - ], - "yara/expected_c2bb17c12975e.yar" + "match-2-yar.py", + [get_data_path("c2bb17c12975ea61ff43a71afd9c3ff111d018af161859abae0bdb0b3dae98f9.exe_")], + "yara/expected_c2bb17c12975e.yar", ), # Test match-2-yar x86 .NET EXE pytest.param( - "match-2-yar.py", + "match-2-yar.py", [ - "-f", - "dotnet", + "-f", + "dotnet", get_data_path("dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_"), - - ], - "yara/expected_1c444ebeba24dcba8628b7dfe5fec7c6.exe_.yar" + ], + "yara/expected_1c444ebeba24dcba8628b7dfe5fec7c6.exe_.yar", ), # Test match-2-yar files with multiple X86 PEs pytest.param( - "match-2-yar.py", - [ + "match-2-yar.py", + [ get_data_path("Practical Malware Analysis Lab 03-04.exe_"), get_data_path("Practical Malware Analysis Lab 11-03.exe_"), - get_data_path("Practical Malware Analysis Lab 16-01.exe_") - ], - "yara/expected_pma_03-04.exe_11-03.exe_16-01.exe" + get_data_path("Practical Malware Analysis Lab 16-01.exe_"), + ], + "yara/expected_pma_03-04.exe_11-03.exe_16-01.exe", ), # Test match-2-yar files with CAPA file limitations are filtered out of multi sample pytest.param( - "match-2-yar.py", - [ + "match-2-yar.py", + [ get_data_path("Practical Malware Analysis Lab 01-01.exe_"), - get_data_path("Practical Malware Analysis Lab 01-02.exe_") - ], - "yara/expected_pma_01-01.exe_01-02.exe" + get_data_path("Practical Malware Analysis Lab 01-02.exe_"), + ], + "yara/expected_pma_01-01.exe_01-02.exe", ), - # Test match-2-yar multiple x86 .NET PE pytest.param( - "match-2-yar.py", + "match-2-yar.py", [ - "-f", - "dotnet", + "-f", + "dotnet", get_data_path("dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_"), get_data_path("dotnet/692f7fd6d198e804d6af98eb9e390d61.exe_"), - - ], - "yara/expected_1c444ebe_692f7fd6.yar" + ], + "yara/expected_1c444ebe_692f7fd6.yar", ), ], ) def test_script_expected_output(script, args, expected_output_path): script_path = get_script_path(script) - with open(get_data_path(expected_output_path), 'rb') as f: - expected_output = f.read() - + + expected_output = Path(get_data_path(expected_output_path)).read_bytes() # Update dates in expected output to be todays date dates_to_replace = [ b"2023-08-10", ] for dt in dates_to_replace: - expected_output = expected_output.replace(dt, date.today().isoformat().encode('utf8')) + expected_output = expected_output.replace(dt, date.today().isoformat().encode("utf8")) p = run_program(script_path, args) assert p.returncode == 0 - assert p.stdout.decode('utf8') == expected_output.decode('utf8') - + assert p.stdout.decode("utf8") == expected_output.decode("utf8") + def run_program(script_path, args): args = [sys.executable] + [script_path] + args @@ -283,3 +278,9 @@ def test_detect_duplicate_features(tmpdir): args = [rule_dir.strpath, rule_path] overlaps_found = run_program(script_path, args) assert overlaps_found.returncode == expected_overlaps + + +# Rough outline for function extract bytes, function length, function masking +# Use importlib to import the script +# Use fixtures vivisect to get a vivisect workspace for a given path +# We can use known functions from the yara matches to extract out length, bytes, and masked sig