Skip to content

Commit

Permalink
[FR] Support missing events (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikaayenson authored Oct 31, 2023
1 parent 9b4f682 commit 49d8d64
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 24 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

# Version 0.9.19

_Released 2023-10-10_
_Released 2023-10-31_

### Added

* Support for [missing events](https://www.elastic.co/guide/en/elasticsearch/reference/current/eql-syntax.html#eql-missing-events) feature used in Elasticsearch sequence queries
* Added IPv6 support for CidrMatch
* Removed the regex support for testing CidrMatch in favor of the native ipaddress module testing

Expand Down
19 changes: 12 additions & 7 deletions eql/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,30 +890,35 @@ def _render(self):
class SubqueryBy(EqlNode):
"""Node for holding the :class:`~EventQuery` and parameters to join on."""

__slots__ = 'query', 'join_values', 'data',

def __init__(self, query, join_values=None, data=None):
    """Init.

    :param EventQuery query: The event query enclosed in the term
    :param list[Expression] join_values: The field to join values on
    :param dict data: Optional term parameters. Recognized keys:
        "fork" (bool) -- copy instead of moving a sequence on match;
        "is_negated" (bool) -- term is a missing-event (``![...]``) subquery
    """
    self.query = query
    # default to an empty list so callers can always iterate join_values
    self.join_values = join_values or []
    self.data = data

@property
def params(self):
    """Keep params for backwards compatibility.

    Builds a :class:`NamedParams` view of ``self.data``, exposing the
    "fork" and "is_negated" entries (when present) wrapped in Boolean
    nodes, matching the layout of the old ``fork`` attribute.
    """
    params = {}
    if self.data is not None:
        # only surface keys that were actually provided
        for key in ("fork", "is_negated"):
            if key in self.data:
                params[key] = Boolean(self.data[key])
    return NamedParams(params)

def _render(self):
    """Render the subquery term, hiding the internal is_negated param."""
    text = "[{}]".format(self.query.render())
    # self.params builds a fresh NamedParams on every access, so mutating
    # param_copy cannot affect self.data.
    param_copy = self.params
    # Use pop() with a default: `del param_copy.kv["is_negated"]` raises
    # KeyError whenever data is None or only contains "fork".
    param_copy.kv.pop("is_negated", None)
    # NOTE(review): a negated term renders as "[...]" rather than "![...]",
    # so render() does not round-trip missing-event queries -- confirm intent.
    params = param_copy.render()
    if len(params):
        text += ' ' + params
    return text

Expand Down
35 changes: 25 additions & 10 deletions eql/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,42 +983,57 @@ def _convert_sequence_term(self, subquery, position, size, lookups, next_pipe=No
get_join_value = self._convert_key(subquery.join_values, scoped=True)
last_position = size - 1
fork = bool(subquery.params.kv.get('fork', Boolean(False)).value)
is_negated = bool(subquery.params.kv.get('is_negated', Boolean(False)).value)

if position == 0:
@self.event_callback(subquery.query.event_type)
def start_sequence_callback(event): # type: (Event) -> None
if check_event(event):
event_check = check_event(event)
if event_check or is_negated:
join_value = get_join_value(event)
sequence = [event]
lookups[1][join_value] = sequence
if event_check and not is_negated:
sequence = [event]
lookups[1][join_value] = sequence
elif is_negated and not event_check:
sequence = []
lookups[1][join_value] = sequence

elif position < last_position:
next_position = position + 1

@self.event_callback(subquery.query.event_type)
def continue_sequence_callback(event): # type: (Event) -> None
if len(lookups[position]) and check_event(event):
event_check = check_event(event)
if len(lookups[position]) and (check_event(event) or is_negated):
join_value = get_join_value(event)
if join_value in lookups[position]:
if fork:
sequence = list(lookups[position].get(join_value))
else:
sequence = lookups[position].pop(join_value)
sequence.append(event)
lookups[next_position][join_value] = sequence

if is_negated and not event_check:
lookups[next_position][join_value] = sequence
elif not is_negated:
sequence.append(event)
lookups[next_position][join_value] = sequence

else:
@self.event_callback(subquery.query.event_type)
def finish_sequence(event): # type: (Event) -> None
if len(lookups[position]) and check_event(event):
event_check = check_event(event)
if len(lookups[position]) and (check_event(event) or is_negated):
join_value = get_join_value(event)
if join_value in lookups[position]:
if fork:
sequence = list(lookups[position].get(join_value))
else:
sequence = lookups[position].pop(join_value)
sequence.append(event)
next_pipe(sequence)

if not is_negated or (is_negated and not event_check):
if not is_negated:
sequence.append(event)
next_pipe(sequence)

def _convert_sequence(self, node, next_pipe): # type: (Sequence, callable) -> callable
# Two lookups can help avoid unnecessary calls
Expand All @@ -1034,7 +1049,7 @@ def check_timeout(event): # type: (Event) -> None
minimum_start = event.time - max_span
for sub_lookup in lookups:
for join_key, sequence in list(sub_lookup.items()):
if sequence[0].time < minimum_start:
if sequence and sequence[0].time < minimum_start:
sub_lookup.pop(join_key)

if node.close:
Expand Down
3 changes: 2 additions & 1 deletion eql/etc/eql.g
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ time_range: number name?


subquery_by: subquery fork_param? join_values? repeated_sequence? sequence_alias?
subquery: "[" event_query "]"
subquery: ( "[" | MISSING_EVENT_OPEN ) event_query "]"
fork_param: "fork" (EQUALS boolean)?

// Expressions
Expand Down Expand Up @@ -107,6 +107,7 @@ escaped_name: ESCAPED_NAME
// sequence by pid [1] [true] looks identical to:
// sequence by pid[1] [true]
FIELD: FIELD_IDENT (ATTR | INDEX)+
MISSING_EVENT_OPEN: "!["
OPTIONAL_FIELD: "?" FIELD_IDENT (ATTR | INDEX)*
ATTR: "." WHITESPACE? FIELD_IDENT
INDEX: "[" WHITESPACE? UNSIGNED_INTEGER WHITESPACE? "]"
Expand Down
32 changes: 31 additions & 1 deletion eql/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
nullable_fields = ParserConfig(strict_fields=False)
non_nullable_fields = ParserConfig(strict_fields=True)
allow_enum_fields = ParserConfig(enable_enum=True)
allow_negation = ParserConfig(allow_negation=True)
allow_sample = ParserConfig(allow_sample=True)
allow_runs = ParserConfig(allow_runs=True)
elasticsearch_syntax = ParserConfig(elasticsearch_syntax=True)
Expand Down Expand Up @@ -151,6 +152,7 @@ def __init__(self, text):
self._allow_runs = ParserConfig.read_stack("allow_runs", False)
self._in_variable = False
self._allow_sample = ParserConfig.read_stack("allow_sample", False)
self._allow_negation = ParserConfig.read_stack("allow_negation", False)

@property
def lines(self):
Expand Down Expand Up @@ -1088,7 +1090,9 @@ def subquery_by(self, node, num_values=None, position=None, close=None, allow_fo
else:
join_values = []

node_info = NodeInfo(ast.SubqueryBy(query, [v.node for v in join_values], **kwargs), source=node)
node0 = self.visit(node["subquery"])[0]
kwargs["is_negated"] = True if hasattr(node0, "type") and node0.type == "MISSING_EVENT_OPEN" else False
node_info = NodeInfo(ast.SubqueryBy(query, [v.node for v in join_values], kwargs), source=node)

alias = node["sequence_alias"]
if alias is not None:
Expand All @@ -1108,10 +1112,15 @@ def join_values(self, node):
def join(self, node):
    """Callback function to walk the AST.

    Builds an :class:`ast.Join` from the parsed node; join queries may not
    contain negated (missing-event) subqueries.
    """
    queries, close = self._get_subqueries_and_close(node)
    if not self.negative_subquery_used:
        return ast.Join(queries, close)
    raise self._error(node, "Negative subquery not permitted in join",
                      cls=EqlSemanticError)

def _get_subqueries_and_close(self, node, allow_fork=False, allow_runs=False):
"""Helper function used by join and sequence to avoid duplicate code."""
self.negative_subquery_used = False

if not self._subqueries_enabled:
# Raise the error earlier (instead of waiting until subquery_by) so that it's more meaningful
raise self._error(node, "Subqueries not supported")
Expand All @@ -1120,6 +1129,8 @@ def _get_subqueries_and_close(self, node, allow_fork=False, allow_runs=False):
subquery_nodes = node.get_list("subquery_by")
first, first_info, first_runs_count = self.subquery_by(subquery_nodes[0], allow_fork=allow_fork,
position=0, allow_runs=allow_runs)
if first.node.data["is_negated"]:
self.negative_subquery_used = True

num_values = len(first_info)
subqueries = [(first, first_info)] * first_runs_count
Expand All @@ -1137,6 +1148,9 @@ def _get_subqueries_and_close(self, node, allow_fork=False, allow_runs=False):
for pos, subquery in enumerate(subquery_nodes[1:], 1):
subquery, join_values, runs_count = self.subquery_by(subquery, num_values=num_values, allow_fork=allow_fork,
position=pos, allow_runs=allow_runs)
if subquery.node.data["is_negated"]:
self.negative_subquery_used = True

multiple_subqueries = [(subquery, join_values)] * runs_count
subqueries.extend(multiple_subqueries)

Expand Down Expand Up @@ -1228,6 +1242,10 @@ def sample(self, node):
if len(queries) <= 1:
raise self._error(node, "Only one item in the sample",
cls=EqlSemanticError)

if self.negative_subquery_used:
raise self._error(node, "Negative subquery not permitted in sample",
cls=EqlSemanticError)
return ast.Sample(queries)

def sequence(self, node):
Expand All @@ -1241,8 +1259,20 @@ def sequence(self, node):
params = self.time_range(node['with_params']['time_range'])

allow_runs = self._elasticsearch_syntax and self._allow_runs
allow_negation = self._elasticsearch_syntax and self._allow_negation

queries, close = self._get_subqueries_and_close(node, allow_fork=True, allow_runs=allow_runs)

# Fail if negative operator used without exposing in elasticsearch syntax
if self.negative_subquery_used and not allow_negation:
raise self._error(node, "Negative subquery used ",
cls=EqlSemanticError if self._elasticsearch_syntax else EqlSyntaxError)

# Fail if any subquery uses the negative operator without maxspan
if not params and self.negative_subquery_used:
raise self._error(node, "Negative subquery used without maxspan",
cls=EqlSemanticError if self._elasticsearch_syntax else EqlSyntaxError)

if len(queries) <= 1 and not self._elasticsearch_syntax:
raise self._error(node, "Only one item in the sequence",
cls=EqlSemanticError if self._elasticsearch_syntax else EqlSyntaxError)
Expand Down
30 changes: 27 additions & 3 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from eql.ast import * # noqa: F403
from eql.errors import EqlSchemaError, EqlSyntaxError, EqlSemanticError, EqlTypeMismatchError, EqlParseError
from eql.parser import (
allow_sample, allow_runs, parse_query, parse_expression, parse_definitions, ignore_missing_functions, parse_field,
parse_literal, extract_query_terms, keywords, elasticsearch_syntax, elastic_endpoint_syntax,
elasticsearch_validate_optional_fields
allow_negation, allow_sample, allow_runs, parse_query, parse_expression, parse_definitions,
ignore_missing_functions, parse_field, parse_literal, extract_query_terms, keywords, elasticsearch_syntax,
elastic_endpoint_syntax, elasticsearch_validate_optional_fields
)
from eql.walkers import DepthFirstWalker
from eql.pipes import * # noqa: F403
Expand Down Expand Up @@ -324,6 +324,12 @@ def test_invalid_queries(self):

# bad sequence alias, without endpoint syntax
'sequence [process where process.name == "cmd.exe"] as a0 [network where a0.process.id == process.id]',

# sequence with negative missing events without maxspan
'sequence [process where true] ![file where true]',

# sequence with negative missing events without elasticsearch flag
'sequence with maxspan [process where true] ![file where true]',
]
for query in invalid:
self.assertRaises(EqlParseError, parse_query, query)
Expand Down Expand Up @@ -637,6 +643,20 @@ def test_elasticsearch_flag(self):
# invalid sample base query usage
self.assertRaises(EqlSemanticError, parse_query,
'sample by user [process where opcode == 1] [process where opcode == 1]')
self.assertRaises(EqlSemanticError, parse_query,
'sample by user [process where opcode == 1] ![process where opcode == 1]')

with elasticsearch_syntax, allow_negation:
parse_query('sequence with maxspan=2s [process where true] ![file where true]')
parse_query('sequence with maxspan=2s ![process where true] [file where true]')
parse_query('sequence with maxspan=2s [process where true] ![file where true] [file where true]')

self.assertRaises(EqlSemanticError, parse_query,
'sequence [process where true] [file where true] ![file where true]')
self.assertRaises(EqlSemanticError, parse_query,
'join ![process where true] [file where true] [file where true]')
self.assertRaises(EqlSemanticError, parse_query,
'sample ![process where true] [file where true] [file where true]')

with schema:
parse_query("process where process_name == 'cmd.exe'")
Expand Down Expand Up @@ -695,6 +715,7 @@ def test_elasticsearch_flag(self):
event1 = '[network where p0.process.name == process.name]'
event2 = '[network where p0.pid == 0]'
event3 = '[network where p0.badfield == 0]'
event4 = '!%s' % (event0)
parse_query('sequence %s as p0 %s' % (event0, event1))
parse_query('sequence by user.name %s as p0 %s' % (event0, event1))
parse_query('sequence with maxspan=1m %s by user.name as p0 %s by user.name' % (event0, event1))
Expand All @@ -703,6 +724,9 @@ def test_elasticsearch_flag(self):
self.assertRaises(EqlSchemaError, parse_query, 'sequence by user.name %s as p1 %s' % (event0, event3))
self.assertRaises(EqlSyntaxError, parse_query, "process where process_name == 'cmd.exe'")

# negated (missing-event) subqueries not supported on the endpoint
self.assertRaises(EqlSemanticError, parse_query, 'sequence %s %s' % (event0, event4))

# as fields not emitted by the endpoint
self.assertRaises(EqlSyntaxError, parse_query, 'process where client.as.organization.name == "string"')
self.assertRaises(EqlSyntaxError, parse_query, 'process where destination.as.organization.name == "string"')
Expand Down
Loading

0 comments on commit 49d8d64

Please sign in to comment.