diff --git a/integrations/amazon-security-lake/stdin_to_securitylake.py b/integrations/amazon-security-lake/stdin_to_securitylake.py index 51f1ab81cb9c6..f6931bcc51eb1 100755 --- a/integrations/amazon-security-lake/stdin_to_securitylake.py +++ b/integrations/amazon-security-lake/stdin_to_securitylake.py @@ -7,15 +7,36 @@ import time import json import datetime -from pyarrow import parquet, Table +import boto3 +from pyarrow import parquet, Table, fs from ocsf import converter block_ending = { "block_ending": True } + +def s3authenticate(profile, endpoint=None, scheme='https'): + session = boto3.session.Session(profile_name=profile) + credentials = session.get_credentials() + + if endpoint != None: + scheme='http' + + s3fs = fs.S3FileSystem( + endpoint_override=endpoint, + access_key=credentials.access_key, + secret_key=credentials.secret_key, + scheme=scheme) + + return s3fs + + + + + def encode_parquet(list,foldername,filename): try: table = Table.from_pylist(list) - parquet.write_table(table, '{}/{}.parquet'.format(foldername,filename)) + parquet.write_table(table, '{}.parquet'.format(filename), filesystem=localS3) except Exception as e: logging.error(e) raise @@ -41,11 +62,13 @@ def get_elapsedseconds(reference_timestamp): date = datetime.datetime.now(datetime.timezone.utc).strftime('%F_%H.%M.%S') parser = argparse.ArgumentParser(description='STDIN to Security Lake pipeline') parser.add_argument('-d','--debug', action='store_true', help='Activate debugging') + parser.add_argument('-e','--s3endpoint', type=str, action='store', default=None, help='Hostname and port of the S3 destination (defaults to AWS\')') parser.add_argument('-i','--pushinterval', type=int, action='store', default=299, help='Time interval in seconds for pushing data to Security Lake') parser.add_argument('-l','--logoutput', type=str, default="/tmp/stdintosecuritylake.txt", help='File path of the destination file to write to') parser.add_argument('-m','--maxlength', type=int, action='store', default=2000, help='Event number threshold for submission to Security Lake') parser.add_argument('-n','--linebuffer', type=int, action='store', default=100, help='stdin line buffer length') parser.add_argument('-o','--outputfolder', type=str, action='store', help='Folder or S3 bucket URL to dump parquet files to') + parser.add_argument('-p','--s3profile', type=str, action='store', default='default', help='AWS profile as stored in credentials file') parser.add_argument('-s','--sleeptime', type=int, action='store', default=5, help='Input buffer polling interval') args = parser.parse_args() #logging.basicConfig(format='%(asctime)s %(message)s', filename=args.logoutput, encoding='utf-8', level=logging.DEBUG) @@ -74,6 +97,7 @@ def get_elapsedseconds(reference_timestamp): if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval: logging.info('Writing data to parquet file') + s3fs = s3authenticate(args.s3profile,args.s3endpoint) encode_parquet(output_buffer,args.outputfolder,'wazuh-{}'.format(date)) starttimestamp = datetime.datetime.now(datetime.timezone.utc) output_buffer = [] @@ -88,3 +112,5 @@ def get_elapsedseconds(reference_timestamp): logging.error("Error running script") logging.error(e) raise + +