Add working Python script up to S3 upload
AlexRuiz7 committed Mar 14, 2024
1 parent 522e6e8 commit 50a34c9
Showing 8 changed files with 143 additions and 305 deletions.
@@ -23,7 +23,7 @@ input {
 output {
   stdout {
     id => "output.stdout"
-    codec => rubydebug
+    codec => json_lines
   }
   file {
     id => "output.file"
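The codec change from rubydebug (a pretty-printed, multi-line debug format) to json_lines means each event is now written as a single JSON document per line, which downstream consumers can parse line by line. A minimal sketch of such a consumer, assuming events arrive on standard input:

    import json
    import sys

    # Parse one JSON document per line, as the json_lines codec emits them.
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        print(event)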
@@ -20,11 +20,14 @@ input {
 }

 output {
-  pipe {
-    id => "securityLake"
-    message_format => "%{_source}"
-    ttl => "10"
-    command => "/env/bin/python3 /usr/share/logstash/amazon-security-lake/run.py -d"
-    # command => "/usr/share/logstash/amazon-security-lake/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
-  }
+  stdout {
+    id => "output.stdout"
+    codec => json_lines
+  }
+  pipe {
+    id => "output.integrator"
+    ttl => "10"
+    command => "/env/bin/python3 /usr/share/logstash/amazon-security-lake/run.py"
+    # command => "/usr/share/logstash/amazon-security-lake/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
+  }
 }
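Logstash's pipe output spawns the configured command and writes each event as one line to the child process's standard input; ttl => "10" closes an idle pipe after ten seconds so the script can flush and exit. Outside Logstash, that interaction can be approximated with a sketch like this, assuming wazuh-event.sample.json (the sample file used by _test() below) holds one event per line:

    import subprocess

    # Feed newline-delimited JSON events to run.py the way the pipe
    # output would: one document per line on stdin, then EOF.
    with open("wazuh-event.sample.json") as events:
        subprocess.run(
            ["/env/bin/python3", "/usr/share/logstash/amazon-security-lake/run.py"],
            stdin=events,
            check=True,
        )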
@@ -22,15 +22,15 @@ input {
 output {
   stdout {
     id => "output.stdout"
-    codec => rubydebug
+    codec => json_lines
   }
   s3 {
     id => "output.s3"
     access_key_id => "${AWS_KEY}"
     secret_access_key => "${AWS_SECRET}"
     region => "${AWS_REGION}"
     endpoint => "http://s3.ninja:9000"
-    bucket => "wazuh-indexer-amazon-security-lake-bucket"
+    bucket => "${AWS_BUCKET}"
     codec => "json"
     retry_count => 0
     validate_credentials_on_root_bucket => false
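With the bucket name now read from ${AWS_BUCKET}, the same pipeline file runs unchanged against the local s3.ninja endpoint or real AWS; the environment must provide AWS_KEY, AWS_SECRET, AWS_REGION and AWS_BUCKET. A hedged sketch for pre-creating the bucket on the local test endpoint (endpoint URL taken from the pipeline above; adapt to your setup):

    import os
    import boto3

    # Create the destination bucket on the S3-compatible test endpoint
    # using the same variables the Logstash pipeline reads.
    s3 = boto3.client(
        "s3",
        endpoint_url="http://s3.ninja:9000",
        aws_access_key_id=os.environ["AWS_KEY"],
        aws_secret_access_key=os.environ["AWS_SECRET"],
        region_name=os.environ["AWS_REGION"],
    )
    s3.create_bucket(Bucket=os.environ["AWS_BUCKET"])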
308 changes: 105 additions & 203 deletions integrations/amazon-security-lake/src/run.py
@@ -1,227 +1,129 @@
-#!/env/bin/python3.9
+#!/env/bin/python3
+# vim: bkc=yes bk wb

-import os
 import sys
-import argparse
-import logging
-import time
-import json
+import os
 import datetime
-import boto3
 import transform
-from pyarrow import parquet, Table, fs


-logging.basicConfig(
-    format='%(asctime)s %(message)s',
-    encoding='utf-8',
-    level=logging.DEBUG
-)

-BLOCK_ENDING = {"block_ending": True}


-def create_arg_parser():
-    parser = argparse.ArgumentParser(
-        description='Wazuh\'s integrator module for Amazon Security Lake')
-    parser.add_argument(
-        '-d',
-        '--debug',
-        action='store_true',
-        help='Enable debug output'
-    )
-    parser.add_argument(
-        '-b',
-        '--bucketname',
-        type=str,
-        action='store',
-        help='S3 bucket name to write parquet files to'
-    )
-    parser.add_argument(
-        '-e',
-        '--s3endpoint',
-        type=str,
-        action='store',
-        default=None,
-        help='Hostname and port of the S3 destination (defaults to AWS\')'
-    )
-    parser.add_argument(
-        '-i',
-        '--pushinterval',
-        type=int,
-        action='store',
-        default=299,
-        help='Time interval in seconds for pushing data to Security Lake'
-    )
-    parser.add_argument(
-        '-l',
-        '--logoutput',
-        type=str,
-        default="/tmp/indexer-to-security-lake-data.txt",
-        help='File path of the destination file to write to'
-    )
-    parser.add_argument(
-        '-m',
-        '--maxlength',
-        type=int,
-        action='store',
-        default=2000,
-        help='Event number threshold for submission to Security Lake'
-    )
-    parser.add_argument(
-        '-n',
-        '--linebuffer',
-        type=int,
-        action='store',
-        default=100,
-        help='Standard input\'s line buffer length'
-    )
-    parser.add_argument(
-        '-p',
-        '--s3profile',
-        type=str,
-        action='store',
-        default='default',
-        help='AWS profile as stored in credentials file'
-    )
-    parser.add_argument(
-        '-s',
-        '--sleeptime',
-        type=int,
-        action='store',
-        default=5,
-        help='Input buffer polling interval'
-    )
-    return parser


-def s3authenticate(profile, endpoint=None, scheme='https'):
-    session = boto3.session.Session(profile_name=profile)
-    credentials = session.get_credentials()

-    if endpoint != None:
-        scheme = 'http'

-    s3fs = fs.S3FileSystem(
-        endpoint_override=endpoint,
-        access_key=credentials.access_key,
-        secret_key=credentials.secret_key,
-        scheme=scheme
+import pyarrow as pa
+import pyarrow.parquet as pq
+import logging
+import boto3
+from botocore.exceptions import ClientError

+# NOTE work in progress
+def upload_file(file_name, bucket, object_name=None):
+    """Upload a file to an S3 bucket
+    :param file_name: File to upload
+    :param bucket: Bucket to upload to
+    :param object_name: S3 object name. If not specified then file_name is used
+    :return: True if file was uploaded, else False
+    """

+    # session = boto3.Session(
+    #     aws_access_key_id=os.environ['AWS_KEY'],
+    #     aws_secret_access_key=os.environ['AWS_SECRET']
+    # )
+    s3 = boto3.resource(
+        's3',
+        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
+        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
+        region_name=os.environ['AWS_REGION']
     )

-    return s3fs

+    # If S3 object_name was not specified, use file_name
+    if object_name is None:
+        object_name = os.path.basename(file_name)

-def encode_parquet(list, bucketname, filename, filesystem):
+    # Upload the file
+    # s3_client = boto3.client('s3')
     try:
-        table = Table.from_pylist(list)
-        parquet.write_table(
-            table,
-            '{}/{}'.format(bucketname, filename), filesystem=filesystem
-        )
-    except Exception as e:
+        # s3_client.upload_file(file_name, bucket, object_name)
+        s3.Bucket(os.environ['AWS_BUCKET']).upload_file()
+    except ClientError as e:
         logging.error(e)
-        raise


-def map_block(fileobject, length):
-    output = []
-    ocsf_event = {}
-    for line in range(0, length):
-        line = fileobject.readline()
-        if line == '':
-            output.append(BLOCK_ENDING)
-            break
-        # alert = json.loads(line)
-        # ocsf_event = converter.convert(alert)

-        event = transform.converter.from_json(line)
-        print(event)
-        ocsf_event = transform.converter.to_detection_finding(event)

-        output.append(ocsf_event)
-    return output


-def timedelta(reference_timestamp):
-    current_time = datetime.datetime.now(datetime.timezone.utc)
-    return (current_time - reference_timestamp).total_seconds()


-def utctime():
-    return datetime.datetime.now(datetime.timezone.utc)
+        return False
+    return True
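Note that the new upload_file() calls s3.Bucket(...).upload_file() with no arguments, so the final upload cannot succeed yet; the commit title ("up to S3 upload") and the work-in-progress note acknowledge this. For reference, a completed version would presumably look like the following sketch, using the documented boto3 Bucket.upload_file(Filename, Key) parameters:

    import logging
    import os
    import boto3
    from botocore.exceptions import ClientError

    def upload_file_fixed(file_name, bucket, object_name=None):
        """Same shape as upload_file() above, with the upload call filled in."""
        if object_name is None:
            object_name = os.path.basename(file_name)
        s3 = boto3.resource(
            "s3",
            aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
            region_name=os.environ["AWS_REGION"],
        )
        try:
            s3.Bucket(bucket).upload_file(Filename=file_name, Key=object_name)
        except ClientError as e:
            logging.error(e)
            return False
        return True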


+def main():
+    '''Main function'''
+    # Get current timestamp
+    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()

+    # Generate filenames
+    filename_raw = f"/tmp/integrator-raw-{timestamp}.json"
+    filename_ocsf = f"/tmp/integrator-ocsf-{timestamp}.json"
+    filename_parquet = f"/tmp/integrator-ocsf-{timestamp}.parquet"

+    # 1. Extract data
+    # ================
+    raw_data = []
+    for line in sys.stdin:
+        raw_data.append(line)

+        # Echo piped data
+        with open(filename_raw, "a") as fd:
+            fd.write(line)

+    # 2. Transform data
+    # ================
+    # a. Transform to OCSF
+    ocsf_data = []
+    for line in raw_data:
+        try:
+            # raw_event = json.loads(line)
+            event = transform.converter.from_json(line)
+            # ocsf_event = transform.converter.to_detection_finding(event)
+            ocsf_event = transform.converter.to_detection_finding(event)
+            ocsf_data.append(ocsf_event.model_dump())

+            # Temporal disk storage
+            with open(filename_ocsf, "a") as fd:
+                fd.write(str(ocsf_event) + "\n")
+        except AttributeError as e:
+            print("Error transforming line to OCSF")
+            print(event)
+            print(e)

-if __name__ == "__main__":
+    # b. Encode as Parquet
     try:
-        args = create_arg_parser().parse_args()
-        logging.info('BUFFERING STDIN')

-        with os.fdopen(sys.stdin.fileno(), 'rt') as stdin:
-            output_buffer = []
-            loop_start_time = utctime()

-            try:
-                s3fs = s3authenticate(args.s3profile, args.s3endpoint)
-                while True:
+        table = pa.Table.from_pylist(ocsf_data)
+        pq.write_table(table, filename_parquet)
+        # parquet.Parquet.to_file(table, filename_parquet)
+    except AttributeError as e:
+        print("Error encoding data to parquet")
+        print(e)

-                    current_block = map_block(stdin, args.linebuffer)

-                    if current_block[-1] == BLOCK_ENDING:
-                        output_buffer += current_block[0:-1]
-                        time.sleep(args.sleeptime)
-                    else:
-                        output_buffer += current_block

-                    buffer_length = len(output_buffer)

-                    if buffer_length == 0:
-                        continue

-                    elapsed_seconds = timedelta(loop_start_time)

-                    if buffer_length > args.maxlength or elapsed_seconds > args.pushinterval:
-                        logging.info(
-                            'Elapsed seconds: {}'.format(elapsed_seconds))
-                        loop_start_time = utctime()
-                        timestamp = loop_start_time.strftime('%F_%H.%M.%S')
-                        filename = 'wazuh-{}.parquet'.format(timestamp)
-                        logging.info(
-                            'Writing data to s3://{}/{}'.format(args.bucketname, filename))
-                        encode_parquet(
-                            output_buffer, args.bucketname, filename, s3fs)
-                        output_buffer = []

-            except KeyboardInterrupt:
-                logging.info("Keyboard Interrupt issued")
-                exit(0)

-        logging.info('FINISHED RETRIEVING STDIN')

-    except Exception as e:
-        logging.error("Error running script")
-        logging.error(e)
-        raise
+    # 3. Load data
+    # ================
+    # parquet.parquet.Parquet.to_s3(table, filename_parquet)
+    upload_file(filename_parquet, os.environ['AWS_BUCKET'])
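The transform stage leaves ocsf_data as a list of plain dicts (via model_dump()), which pyarrow converts to a columnar table in one call. A self-contained sketch of the encode step with invented records (the field names are placeholders, not the OCSF schema):

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Stand-ins for the dicts produced by ocsf_event.model_dump().
    ocsf_data = [
        {"time": 1710412800, "severity_id": 3, "message": "sample finding"},
        {"time": 1710412860, "severity_id": 1, "message": "another finding"},
    ]

    # The same two calls main() uses: list of dicts -> Arrow table -> parquet.
    table = pa.Table.from_pylist(ocsf_data)
    pq.write_table(table, "/tmp/integrator-ocsf-example.parquet")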


 def _test():
     ocsf_event = {}
     with open("./wazuh-event.sample.json", "r") as fd:
         # Load from file descriptor
-        raw_event = json.load(fd)
-        try:
-            event = transform.converter.from_json(raw_event)
-            print(event)
-            ocsf_event = transform.converter.to_detection_finding(event)
-            print("")
-            print("--")
-            print("")
-            print(ocsf_event)
+        for raw_event in fd:
+            try:
+                event = transform.converter.from_json(raw_event)
+                print("")
+                print("-- Wazuh event Pydantic model")
+                print("")
+                print(event.model_dump())
+                ocsf_event = transform.converter.to_detection_finding(event)
+                print("")
+                print("-- Converted to OCSF")
+                print("")
+                print(ocsf_event.model_dump())

-        except KeyError as e:
-            raise (e)
+            except KeyError as e:
+                raise (e)


 if __name__ == '__main__':
-    _test()
+    main()
+    # _test()
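The entry point switches from _test() to main(), keeping the test harness around as a comment. If both should stay callable without editing the file, one hypothetical convention is an environment toggle (RUN_MODE is invented here, not part of this commit):

    import os

    if __name__ == '__main__':
        # Hypothetical: export RUN_MODE=test to run the sample-file harness.
        if os.environ.get("RUN_MODE") == "test":
            _test()
        else:
            main()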
