diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index e52a60d99..eeff1abeb 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -23,7 +23,10 @@ # The number of calls that make up a sequence. -SEQUENCE_SIZE = 5 +# +# The larger this is, the more calls are grouped together to match rule logic. +# This means a longer chain can be recognized; however, its a bit more expensive. +SEQUENCE_SIZE = 20 @dataclass @@ -69,7 +72,8 @@ def find_thread_capabilities( ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle ) -> ThreadCapabilities: """ - find matches for the given rules within the given thread. + find matches for the given rules within the given thread, + which includes matches for all the sequences and calls within it. """ # all features found within this thread, # includes features found within calls. @@ -82,8 +86,18 @@ def find_thread_capabilities( # matches found at the sequence scope. sequence_matches: MatchResults = collections.defaultdict(list) + # We matches sequences as the sliding window of calls with size SEQUENCE_SIZE. + # + # For each call, we consider the window of SEQUENCE_SIZE calls leading up to it, + # merging all their features and doing a match. + # Here's the primary data structure: a deque of those features found in the prior calls. + # We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed. sequence: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE) + # the names of rules matched at the last sequence, + # so that we can deduplicate long strings of the same matche. + last_sequence_matches: set[str] = set() + for ch in extractor.get_calls(ph, th): call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch) for feature, vas in call_capabilities.features.items(): @@ -92,7 +106,13 @@ def find_thread_capabilities( for rule_name, res in call_capabilities.matches.items(): call_matches[rule_name].extend(res) + # + # sequence scope matching + # + # as we add items to the end of the deque, the oldest items will overflow and get dropped. sequence.append(call_capabilities.features) + # collect all the features seen across the last SEQUENCE_SIZE calls, + # and match against them. sequence_features: FeatureSet = collections.defaultdict(set) for call in sequence: for feature, vas in call.items(): @@ -100,8 +120,21 @@ def find_thread_capabilities( _, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, ch.address) for rule_name, res in smatches.items(): + if rule_name in last_sequence_matches: + # don't emit match results for rules seen during the immediately preceeding sequence. + # + # This means that we won't emit duplicate matches when there are multiple sequences + # that overlap a single matching event. + # It also handles the case of a tight loop containing matched logic; + # only the first match will be recorded. + # + # In theory, this means the result document doesn't have *every* possible match location, + # but in practice, humans will only be interested in the first handful anyways. + continue sequence_matches[rule_name].extend(res) + last_sequence_matches = set(smatches.keys()) + for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): features[feature].add(va) diff --git a/tests/test_dynamic_sequence_scope.py b/tests/test_dynamic_sequence_scope.py index d1f42f338..5e43227bc 100644 --- a/tests/test_dynamic_sequence_scope.py +++ b/tests/test_dynamic_sequence_scope.py @@ -134,7 +134,7 @@ def test_dynamic_sequence_scope(): assert 12 in get_call_ids(capabilities.matches[r.name]) -# show the sequence is only 5 calls long, and doesn't match beyond that 5-tuple. +# show that when the sequence is only 5 calls long (for example), it doesn't match beyond that 5-tuple. # # proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) # thread: 3064 @@ -168,7 +168,13 @@ def test_dynamic_sequence_scope2(): r = capa.rules.Rule.from_yaml(rule) ruleset = capa.rules.RuleSet([r]) - capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + # patch SEQUENCE_SIZE since we may use a much larger value in the real world. + from pytest import MonkeyPatch + + with MonkeyPatch.context() as m: + m.setattr(capa.capabilities.dynamic, "SEQUENCE_SIZE", 5) + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name not in capabilities.matches @@ -215,7 +221,6 @@ def test_dynamic_sequence_example(): # show how sequences that overlap a single event are handled. -# TODO(williballenthin): but I think we really just want one match for this, not copies of the same thing. # # proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) # thread: 3064 @@ -252,4 +257,5 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event(): capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) assert r.name in capabilities.matches - assert [11, 12, 13, 14, 15] == list(get_call_ids(capabilities.matches[r.name])) + # we only match the first overlapping sequence + assert [11] == list(get_call_ids(capabilities.matches[r.name]))