From 6b9a05b8419f8e9d4c925baa2b72c8f021c5e795 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Mon, 30 Oct 2023 13:17:12 -0400 Subject: [PATCH] Moved python2 checks to utils --- CHANGELOG.md | 1 + eql/functions.py | 42 +++++++++--------------------------------- eql/utils.py | 17 +++++++++++++++++ 3 files changed, 27 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b77f503..c434214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Added * Added IPv6 support for CidrMatch +* Removed the regex support for testing CidrMatch in favor of the native ipaddress module testing # Version 0.9.18 diff --git a/eql/functions.py b/eql/functions.py index 0c89fed..4d6a6d7 100644 --- a/eql/functions.py +++ b/eql/functions.py @@ -1,12 +1,11 @@ """EQL functions.""" -import ipaddress import re -import sys from .errors import EqlError from .signatures import SignatureMixin from .types import TypeHint -from .utils import fold_case, is_insensitive, is_number, is_string, to_unicode +from .utils import (fold_case, get_ipaddress, get_subnet, is_insensitive, + is_number, is_string, to_unicode) _registry = {} REGEX_FLAGS = re.UNICODE | re.DOTALL @@ -194,18 +193,11 @@ class CidrMatch(FunctionSignature): @classmethod def get_callback(cls, _, *cidr_matches): """Get the callback function with all the masks converted.""" - # Python 2 support - if sys.version_info.major == 2: - cidr_networks = [ipaddress.ip_network(cidr.value.decode("utf-8"), strict=False) for cidr in cidr_matches] # noqa: F821 - else: - cidr_networks = [ipaddress.ip_network(cidr.value, strict=False) for cidr in cidr_matches] + cidr_networks = [get_subnet(cidr.value) for cidr in cidr_matches] def callback(source, *_): if is_string(source): - # Python 2 support - if sys.version_info.major == 2: - source = source.decode("utf-8") # noqa: F821 - ip_address = ipaddress.ip_address(source) + ip_address = get_ipaddress(source) for subnet in cidr_networks: if ip_address in subnet: @@ -219,18 +211,11 @@ def callback(source, *_): def run(cls, ip_address, *cidr_matches): """Compare an IP address against a list of cidr blocks.""" if is_string(ip_address): - # Python 2 support - if sys.version_info.major == 2: - ip_address = ip_address.decode("utf-8") # noqa: F821 - ip_address = ipaddress.ip_address(ip_address) + ip_address = get_ipaddress(ip_address) for cidr in cidr_matches: if is_string(cidr): - # Python 2 support - if sys.version_info.major == 2: - subnet = ipaddress.ip_network(cidr.decode("utf-8"), strict=False) # noqa: F821 - else: - subnet = ipaddress.ip_network(cidr, strict=False) + subnet = get_subnet(cidr) if ip_address in subnet: return True @@ -243,11 +228,7 @@ def is_cidr(cls, cidr): if "/" not in cidr: return False try: - # Python 2 support - if sys.version_info.major == 2: - ipaddress.ip_network(cidr.decode("utf-8"), strict=False) # noqa: F821 - else: - ipaddress.ip_network(cidr, strict=False) + get_subnet(cidr) return True except ValueError: return False @@ -272,13 +253,8 @@ def validate(cls, arguments): return pos # Since it does match, we should also rewrite the string to align to the base of the subnet - - ip_address, size = text.split("/") - # Python 2 support - if sys.version_info.major == 2: - subnet = ipaddress.ip_network(text.decode("utf-8"), strict=False) # noqa: F821 - else: - subnet = ipaddress.ip_network(text, strict=False) + _, size = text.split("/") + subnet = get_subnet(text) subnet_base = subnet.network_address # overwrite the original argument so it becomes the subnet diff --git a/eql/utils.py b/eql/utils.py index 3066520..4d5cb6d 100644 --- a/eql/utils.py +++ b/eql/utils.py @@ -2,6 +2,7 @@ import codecs import gzip import io +import ipaddress import json import os import sys @@ -11,6 +12,9 @@ CASE_INSENSITIVE = True _loaded_plugins = False +# Var to check if Python2 or Python3 +py_version = sys.version_info.major + # Python2 and Python3 compatible type checking unicode_t = type(u"") long_t = type(int(1e100)) @@ -78,6 +82,19 @@ def str_presenter(dumper, data): return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|') return dumper.represent_scalar('tag:yaml.org,2002:str', data) +def get_ipaddress(ipaddr_string): + """Get an ipaddress ip_address object from a string containing an ipaddress""" + if py_version == 2: + ipaddr_string = ipaddr_string.decode("utf-8") # noqa: F821 + return ipaddress.ip_address(ipaddr_string) + + +def get_subnet(cidr_string): + """Get an ipaddress ip_network object from a string containing an cidr range""" + if py_version == 2: + cidr_string = cidr_string.decode("utf-8") # noqa: F821 + return ipaddress.ip_network(cidr_string, strict=False) + def get_type_converter(items): """Get a python callback function that can convert None to observed typed values."""