Skip to content

Commit

Permalink
Moved python2 checks to utils
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-forte-elastic committed Oct 30, 2023
1 parent df3fda4 commit 6b9a05b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Added

* Added IPv6 support for CidrMatch
* Removed the regex support for testing CidrMatch in favor of the native ipaddress module testing

# Version 0.9.18

Expand Down
42 changes: 9 additions & 33 deletions eql/functions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
"""EQL functions."""
import ipaddress
import re
import sys

from .errors import EqlError
from .signatures import SignatureMixin
from .types import TypeHint
from .utils import fold_case, is_insensitive, is_number, is_string, to_unicode
from .utils import (fold_case, get_ipaddress, get_subnet, is_insensitive,
is_number, is_string, to_unicode)

_registry = {}
REGEX_FLAGS = re.UNICODE | re.DOTALL
Expand Down Expand Up @@ -194,18 +193,11 @@ class CidrMatch(FunctionSignature):
@classmethod
def get_callback(cls, _, *cidr_matches):
"""Get the callback function with all the masks converted."""
# Python 2 support
if sys.version_info.major == 2:
cidr_networks = [ipaddress.ip_network(cidr.value.decode("utf-8"), strict=False) for cidr in cidr_matches] # noqa: F821
else:
cidr_networks = [ipaddress.ip_network(cidr.value, strict=False) for cidr in cidr_matches]
cidr_networks = [get_subnet(cidr.value) for cidr in cidr_matches]

def callback(source, *_):
if is_string(source):
# Python 2 support
if sys.version_info.major == 2:
source = source.decode("utf-8") # noqa: F821
ip_address = ipaddress.ip_address(source)
ip_address = get_ipaddress(source)

for subnet in cidr_networks:
if ip_address in subnet:
Expand All @@ -219,18 +211,11 @@ def callback(source, *_):
def run(cls, ip_address, *cidr_matches):
"""Compare an IP address against a list of cidr blocks."""
if is_string(ip_address):
# Python 2 support
if sys.version_info.major == 2:
ip_address = ip_address.decode("utf-8") # noqa: F821
ip_address = ipaddress.ip_address(ip_address)
ip_address = get_ipaddress(ip_address)

for cidr in cidr_matches:
if is_string(cidr):
# Python 2 support
if sys.version_info.major == 2:
subnet = ipaddress.ip_network(cidr.decode("utf-8"), strict=False) # noqa: F821
else:
subnet = ipaddress.ip_network(cidr, strict=False)
subnet = get_subnet(cidr)

if ip_address in subnet:
return True
Expand All @@ -243,11 +228,7 @@ def is_cidr(cls, cidr):
if "/" not in cidr:
return False
try:
# Python 2 support
if sys.version_info.major == 2:
ipaddress.ip_network(cidr.decode("utf-8"), strict=False) # noqa: F821
else:
ipaddress.ip_network(cidr, strict=False)
get_subnet(cidr)
return True
except ValueError:
return False
Expand All @@ -272,13 +253,8 @@ def validate(cls, arguments):
return pos

# Since it does match, we should also rewrite the string to align to the base of the subnet

ip_address, size = text.split("/")
# Python 2 support
if sys.version_info.major == 2:
subnet = ipaddress.ip_network(text.decode("utf-8"), strict=False) # noqa: F821
else:
subnet = ipaddress.ip_network(text, strict=False)
_, size = text.split("/")
subnet = get_subnet(text)
subnet_base = subnet.network_address

# overwrite the original argument so it becomes the subnet
Expand Down
17 changes: 17 additions & 0 deletions eql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import codecs
import gzip
import io
import ipaddress
import json
import os
import sys
Expand All @@ -11,6 +12,9 @@
CASE_INSENSITIVE = True
_loaded_plugins = False

# Var to check if Python2 or Python3
py_version = sys.version_info.major

# Python2 and Python3 compatible type checking
unicode_t = type(u"")
long_t = type(int(1e100))
Expand Down Expand Up @@ -78,6 +82,19 @@ def str_presenter(dumper, data):
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
return dumper.represent_scalar('tag:yaml.org,2002:str', data)

def get_ipaddress(ipaddr_string):
"""Get an ipaddress ip_address object from a string containing an ipaddress"""
if py_version == 2:
ipaddr_string = ipaddr_string.decode("utf-8") # noqa: F821
return ipaddress.ip_address(ipaddr_string)


def get_subnet(cidr_string):
"""Get an ipaddress ip_network object from a string containing an cidr range"""
if py_version == 2:
cidr_string = cidr_string.decode("utf-8") # noqa: F821
return ipaddress.ip_network(cidr_string, strict=False)


def get_type_converter(items):
"""Get a python callback function that can convert None to observed typed values."""
Expand Down

0 comments on commit 6b9a05b

Please sign in to comment.