Skip to content

Commit

Permalink
Add function to handle local s3 buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
f-galland committed Feb 27, 2024
1 parent b9b21a8 commit 81ecb7c
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions integrations/amazon-security-lake/stdin_to_securitylake.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,36 @@
import time
import json
import datetime
from pyarrow import parquet, Table
import boto3
from pyarrow import parquet, Table, fs
from ocsf import converter

block_ending = { "block_ending": True }


def s3authenticate(profile, endpoint=None, scheme='https'):
    """Build a pyarrow S3 filesystem authenticated with a boto3 profile.

    Args:
        profile: Name of the AWS profile that boto3 should load the
            credentials from.
        endpoint: Optional endpoint override passed to pyarrow (for
            example the host and port of a local S3-compatible service).
            When it is given, the connection is always made over plain
            HTTP, regardless of ``scheme``.
        scheme: URL scheme used when no ``endpoint`` is given.

    Returns:
        A ``pyarrow.fs.S3FileSystem`` bound to the profile's credentials.

    Raises:
        RuntimeError: If boto3 cannot find any credentials for ``profile``.
    """
    session = boto3.session.Session(profile_name=profile)
    credentials = session.get_credentials()

    # get_credentials() returns None when nothing could be resolved; fail
    # with an explicit message instead of an AttributeError further down.
    if credentials is None:
        raise RuntimeError(
            "No AWS credentials found for profile '{}'".format(profile)
        )

    # Take one consistent snapshot so the key, secret and token all belong
    # to the same credential set even if boto3 refreshes them meanwhile.
    frozen = credentials.get_frozen_credentials()

    # A custom endpoint is contacted over plain HTTP; this deliberately
    # overrides whatever ``scheme`` the caller passed.
    if endpoint is not None:
        scheme = 'http'

    return fs.S3FileSystem(
        endpoint_override=endpoint,
        access_key=frozen.access_key,
        secret_key=frozen.secret_key,
        # Temporary credentials (SSO, assumed roles) are rejected without
        # their session token; it is None for long-lived keys.
        session_token=frozen.token,
        scheme=scheme,
    )





def encode_parquet(list, foldername, filename, filesystem=None):
    """Serialize a list of dicts to ``<foldername>/<filename>.parquet``.

    Args:
        list: Rows to write, as accepted by ``pyarrow.Table.from_pylist``.
            The name shadows the builtin but is kept so existing callers
            keep working.
        foldername: Destination folder (or bucket path when ``filesystem``
            is an S3 filesystem).
        filename: File name without the ``.parquet`` extension.
        filesystem: Optional pyarrow filesystem to write through. When it
            is None, pyarrow infers the filesystem from the path.

    Raises:
        Exception: Any error raised while building or writing the table is
            logged and then re-raised unchanged.
    """
    try:
        table = Table.from_pylist(list)
        # Single write that keeps the folder in the path; the filesystem is
        # an explicit argument rather than an undefined module-level name.
        parquet.write_table(
            table,
            '{}/{}.parquet'.format(foldername, filename),
            filesystem=filesystem,
        )
    except Exception as e:
        logging.error(e)
        raise
Expand All @@ -41,11 +62,13 @@ def get_elapsedseconds(reference_timestamp):
date = datetime.datetime.now(datetime.timezone.utc).strftime('%F_%H.%M.%S')
parser = argparse.ArgumentParser(description='STDIN to Security Lake pipeline')
parser.add_argument('-d','--debug', action='store_true', help='Activate debugging')
parser.add_argument('-e','--s3endpoint', type=str, action='store', default=None, help='Hostname and port of the S3 destination (defaults to AWS\')')
parser.add_argument('-i','--pushinterval', type=int, action='store', default=299, help='Time interval in seconds for pushing data to Security Lake')
parser.add_argument('-l','--logoutput', type=str, default="/tmp/stdintosecuritylake.txt", help='File path of the destination file to write to')
parser.add_argument('-m','--maxlength', type=int, action='store', default=2000, help='Event number threshold for submission to Security Lake')
parser.add_argument('-n','--linebuffer', type=int, action='store', default=100, help='stdin line buffer length')
parser.add_argument('-o','--outputfolder', type=str, action='store', help='Folder or S3 bucket URL to dump parquet files to')
parser.add_argument('-p','--s3profile', type=str, action='store', default='default', help='AWS profile as stored in credentials file')
parser.add_argument('-s','--sleeptime', type=int, action='store', default=5, help='Input buffer polling interval')
args = parser.parse_args()
#logging.basicConfig(format='%(asctime)s %(message)s', filename=args.logoutput, encoding='utf-8', level=logging.DEBUG)
Expand Down Expand Up @@ -74,6 +97,7 @@ def get_elapsedseconds(reference_timestamp):

if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval:
logging.info('Writing data to parquet file')
s3fs = s3authenticate(args.s3profile,args.s3endpoint)
encode_parquet(output_buffer,args.outputfolder,'wazuh-{}'.format(date))
starttimestamp = datetime.datetime.now(datetime.timezone.utc)
output_buffer = []
Expand All @@ -88,3 +112,5 @@ def get_elapsedseconds(reference_timestamp):
logging.error("Error running script")
logging.error(e)
raise


0 comments on commit 81ecb7c

Please sign in to comment.