diff --git a/integrations/amazon-security-lake/src/invoke-lambda.sh b/integrations/amazon-security-lake/src/invoke-lambda.sh index b07e26b370536..340025dbf207e 100644 --- a/integrations/amazon-security-lake/src/invoke-lambda.sh +++ b/integrations/amazon-security-lake/src/invoke-lambda.sh @@ -22,14 +22,14 @@ curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" - "s3SchemaVersion": "1.0", "configurationId": "testConfigRule", "bucket": { - "name": "wazuh-indexer-amazon-security-lake-bucket", + "name": "wazuh-indexer-aux-bucket", "ownerIdentity": { "principalId":"A3NL1KOZZKExample" }, - "arn": "arn:aws:s3:::wazuh-indexer-amazon-security-lake-bucket" + "arn": "arn:aws:s3:::wazuh-indexer-aux-bucket" }, "object": { - "key": "2024/04/11/ls.s3.f6e2a1b2-4ea5-47b6-be32-3a6746a48187.2024-04-11T17.10.part14.txt", + "key": "2024/04/16/ls.s3.0906d6d6-e4ca-4db8-b445-b3c572425ee1.2024-04-16T09.15.part3.txt", "size": 1024, "eTag":"d41d8cd98f00b204e9800998ecf8427e", "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko" diff --git a/integrations/amazon-security-lake/src/run.py b/integrations/amazon-security-lake/src/run.py index ec516754af496..8043afb6335e6 100644 --- a/integrations/amazon-security-lake/src/run.py +++ b/integrations/amazon-security-lake/src/run.py @@ -123,23 +123,14 @@ def _test(): main() # _test() -def lambda_handler(event, context): - logging.basicConfig(filename='lambda.log', encoding='utf-8', level=logging.DEBUG) - print(event) - print(context) - - # Constants : bucket names - # src_bucket = "wazuh-indexer-amazon-security-lake-bucket" - dst_bucket = "final-bucket" - - # Variables : object name -> uploaded log file - # - From https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html#with-s3-example-create-function - src_bucket = event['Records'][0]['s3']['bucket']['name'] - key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') - logging.info(f"Lambda function invoked due to {key}.") - 
logging.info(f"Source bucket name is {src_bucket}. Destination bucket is {dst_bucket}.") - - # boto3 client setup +# ================================================ # +# AWS LAMBDA method +# ================================================ # + +def init_aws_client(): + ''' + boto3 client setup + ''' logging.info("Initializing boto3 client.") client = boto3.client( service_name='s3', @@ -149,37 +140,82 @@ def lambda_handler(event, context): endpoint_url='http://s3.ninja:9000', ) logging.info("boto3 client initialized.") + return client - # Read from bucket A + +def get_events(bucket: str, key: str): + ''' + ''' logging.info(f"Reading {key}.") response = client.get_object( - Bucket=src_bucket, + Bucket=bucket, Key=key ) data = response['Body'].read().decode('utf-8') - raw_events = data.splitlines() + return data.splitlines() - # Transform data - logging.info("Transforming data.") + +def transform_events_to_ocsf(events): + ''' + ''' + logging.info("Transforming Wazuh security events to OCSF.") ocsf_events = [] - for line in raw_events: + for line in events: try: + # Validate event using a model event = transform.converter.from_json(line) - ocsf_event = transform.converter.to_detection_finding(event) - ocsf_events.append(ocsf_event.model_dump()) - - # Temporal disk storage - with open('tmp.json', "a") as fd: - fd.write(str(ocsf_event) + "\n") + # Transform to OCSF + ocsf_event = transform.converter.to_detection_finding(event).model_dump() + # Append + ocsf_events.append(ocsf_event) except AttributeError as e: - print("Error transforming line to OCSF") - print(event) - print(e) + logging.error("Error transforming line to OCSF") + logging.error(event) + logging.error(e) + return ocsf_events + + +def to_parquet(ocsf_events): + ''' + ''' table = pa.Table.from_pylist(ocsf_events) - pq.write_table(table, 'tmp.parquet') - # Upload to bucket B + # Write to file. + to_parquet_file(table) + + +def to_parquet_file(pa_table, filename='tmp'): + ''' + Write data to file. 
+ ''' + pq.write_table(pa_table, f'{filename}.parquet', compression='ZSTD') + + +# "Initialize SDK clients and database connections outside of the function handler" +client = init_aws_client() + +def lambda_handler(event, context): + logging.basicConfig(filename='lambda.log', encoding='utf-8', level=logging.DEBUG) + + # Variables: get the object name and bucket name from the event + # - From https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html#with-s3-example-create-function + src_bucket = event['Records'][0]['s3']['bucket']['name'] + key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') + dst_bucket = os.environ['AWS_BUCKET'] + logging.info(f"Lambda function invoked due to {key}.") + logging.info(f"Source bucket name is {src_bucket}. Destination bucket is {dst_bucket}.") + + # Read events from the source (aux) bucket + raw_events = get_events(src_bucket, key) + + # Transform data + ocsf_events = transform_events_to_ocsf(raw_events) + + # Encode events as parquet + to_parquet(ocsf_events) + + # Upload to destination bucket B logging.info(f"Uploading data to {dst_bucket}.") response = client.put_object( Bucket=dst_bucket, diff --git a/integrations/docker/amazon-security-lake.yml b/integrations/docker/amazon-security-lake.yml index 1c4affd1e1de6..34b36a148cce8 100644 --- a/integrations/docker/amazon-security-lake.yml +++ b/integrations/docker/amazon-security-lake.yml @@ -83,7 +83,8 @@ services: AWS_ACCESS_KEY_ID: "AKIAIOSFODNN7EXAMPLE" AWS_SECRET_ACCESS_KEY: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" AWS_REGION: "us-east-1" - AWS_BUCKET: "wazuh-indexer-amazon-security-lake-bucket" + AWS_BUCKET: "wazuh-indexer-aux-bucket" + ASL_BUCKET: "wazuh-indexer-amazon-security-lake-bucket" ports: - "5000:5000/tcp" - "5000:5000/udp"