From 0f2c2c3db96a91897bba846ed9de32c53e85a39a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lex=20Ruiz?=
Date: Fri, 26 Apr 2024 17:39:18 +0200
Subject: [PATCH] Save intermediate OCSF files to an S3 bucket (#218)

---
 integrations/README.md                        |  2 --
 integrations/amazon-security-lake/Dockerfile  |  2 +-
 .../amazon-security-lake/invoke-lambda.sh     |  6 ++---
 .../logstash/pipeline/indexer-to-s3.conf      |  2 +-
 .../src/lambda_function.py                    | 26 ++++++++++++++-----
 integrations/docker/amazon-security-lake.yml  |  5 ++--
 6 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/integrations/README.md b/integrations/README.md
index e141452d7a8b5..8e2c9b968c4e9 100644
--- a/integrations/README.md
+++ b/integrations/README.md
@@ -43,8 +43,6 @@ After 5 minutes, the first batch of data will show up in http://localhost:9444/u
 You'll need to invoke the Lambda function manually, selecting the log file to process.
 
 ```bash
-export AUX_BUCKET=wazuh-indexer-aux-bucket
-
 bash amazon-security-lake/src/invoke-lambda.sh
 ```
 
diff --git a/integrations/amazon-security-lake/Dockerfile b/integrations/amazon-security-lake/Dockerfile
index 566434ef76798..41fc87679734b 100644
--- a/integrations/amazon-security-lake/Dockerfile
+++ b/integrations/amazon-security-lake/Dockerfile
@@ -19,7 +19,7 @@ ENV LOGSTASH_KEYSTORE_PASS="SecretPassword"
 # Add the application source code.
 COPY --chown=logstash:logstash ./src /home/app
 # Add execution persmissions.
-RUN chmod a+x /home/app/run.py
+RUN chmod a+x /home/app/lambda_function.py
 # Copy the application's dependencies.
 COPY --from=builder /env /env
 
diff --git a/integrations/amazon-security-lake/invoke-lambda.sh b/integrations/amazon-security-lake/invoke-lambda.sh
index c4545ef12874b..7d2c379faae12 100644
--- a/integrations/amazon-security-lake/invoke-lambda.sh
+++ b/integrations/amazon-security-lake/invoke-lambda.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-export AUX_BUCKET=wazuh-indexer-aux-bucket
+export S3_BUCKET_RAW=wazuh-aws-security-lake-raw
 
 curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" -d '{
   "Records": [
@@ -24,11 +24,11 @@ curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" -
         "s3SchemaVersion": "1.0",
         "configurationId": "testConfigRule",
         "bucket": {
-          "name": "'"${AUX_BUCKET}"'",
+          "name": "'"${S3_BUCKET_RAW}"'",
           "ownerIdentity": {
             "principalId":"A3NL1KOZZKExample"
          },
-          "arn": "'"arn:aws:s3:::${AUX_BUCKET}"'"
+          "arn": "'"arn:aws:s3:::${S3_BUCKET_RAW}"'"
         },
         "object": {
           "key": "'"${1}"'",
diff --git a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf
index 831ced288edf5..a2446b4d9406e 100644
--- a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf
+++ b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf
@@ -30,7 +30,7 @@ output {
       secret_access_key => "${AWS_SECRET_ACCESS_KEY}"
       region => "${AWS_REGION}"
       endpoint => "${AWS_ENDPOINT}"
-      bucket => "${AUX_BUCKET}"
+      bucket => "${S3_BUCKET_RAW}"
       codec => "json_lines"
       retry_count => 0
       validate_credentials_on_root_bucket => false
diff --git a/integrations/amazon-security-lake/src/lambda_function.py b/integrations/amazon-security-lake/src/lambda_function.py
index 57b4e5d3d92bd..5313418a4580b 100644
--- a/integrations/amazon-security-lake/src/lambda_function.py
+++ b/integrations/amazon-security-lake/src/lambda_function.py
@@ -85,7 +85,7 @@ def check_environment_variables(variables):
     return True
 
 
-def get_full_key(src_location: str, account_id: str, region: str, key: str) -> str:
+def get_full_key(src_location: str, account_id: str, region: str, key: str, format: str) -> str:
     """
     Constructs a full S3 key path for storing a Parquet file based on event metadata.
 
@@ -94,6 +94,7 @@ def get_full_key(src_location: str, account_id: str, region: str, key: str) -> s
         account_id (str): AWS account ID associated with the event.
         region (str): AWS region where the event occurred.
         key (str): Event key containing metadata information.
+        format (str): File extension.
 
     Returns:
         str: Full S3 key path for storing the Parquet file.
@@ -110,12 +111,12 @@ def get_full_key(src_location: str, account_id: str, region: str, key: str) -> s
     filename_parts = key.split('.')
     filename = ''.join(filename_parts[2].split('-'))
 
-    # Construct the full S3 key path for storing the Parquet file
-    parquet_key = (
-        f'ext/{src_location}/region={region}/accountId={account_id}/eventDay={event_day}/{filename}.parquet'
+    # Construct the full S3 key path for storing the file
+    key = (
+        f'ext/{src_location}/region={region}/accountId={account_id}/eventDay={event_day}/{filename}.{format}'
     )
 
-    return parquet_key
+    return key
 
 
 def lambda_handler(event, context):
@@ -135,6 +136,7 @@ def lambda_handler(event, context):
     src_location = os.environ['SOURCE_LOCATION']
     account_id = os.environ['ACCOUNT_ID']
     region = os.environ['AWS_REGION']
+    ocsf_bucket = os.environ.get('S3_BUCKET_OCSF')
 
     # Extract bucket and key from S3 event
     src_bucket = event['Records'][0]['s3']['bucket']['name']
@@ -150,12 +152,21 @@ def lambda_handler(event, context):
     # Transform events to OCSF format
     ocsf_events = wazuh_ocsf_converter.transform_events(raw_events)
 
+    # Upload event in OCSF format
+    ocsf_upload_success = False
+    if ocsf_bucket is not None:
+        tmp_filename = '/tmp/tmp.json'
+        with open(tmp_filename, "w") as fd:
+            fd.write(json.dumps(ocsf_events))
+        ocsf_key = get_full_key(src_location, account_id, region, key, 'json')
+        ocsf_upload_success = upload_to_s3(ocsf_bucket, ocsf_key, tmp_filename)
+
     # Write OCSF events to Parquet file
     tmp_filename = '/tmp/tmp.parquet'
     write_parquet_file(ocsf_events, tmp_filename)
 
     # Upload Parquet file to destination S3 bucket
-    parquet_key = get_full_key(src_location, account_id, region, key)
+    parquet_key = get_full_key(src_location, account_id, region, key, 'parquet')
     upload_success = upload_to_s3(dst_bucket, parquet_key, tmp_filename)
 
     # Clean up temporary file
@@ -164,6 +175,7 @@ def lambda_handler(event, context):
     # Prepare response
     response = {
         'size': len(raw_events),
-        'upload_success': upload_success
+        'upload_success': upload_success,
+        'ocsf_upload_success': ocsf_upload_success
     }
     return json.dumps(response)
diff --git a/integrations/docker/amazon-security-lake.yml b/integrations/docker/amazon-security-lake.yml
index 2e48961d20b2b..0645829df12c7 100644
--- a/integrations/docker/amazon-security-lake.yml
+++ b/integrations/docker/amazon-security-lake.yml
@@ -83,7 +83,7 @@ services:
       AWS_ACCESS_KEY_ID: "AKIAIOSFODNN7EXAMPLE"
       AWS_SECRET_ACCESS_KEY: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
       AWS_REGION: "us-east-1"
-      AUX_BUCKET: "indexer-amazon-security-lake-bucket"
+      S3_BUCKET_RAW: "wazuh-aws-security-lake-raw"
       AWS_ENDPOINT: "http://s3.ninja:9000"
     ports:
       - "5000:5000/tcp"
@@ -117,7 +117,8 @@ services:
       AWS_ACCESS_KEY_ID: "AKIAIOSFODNN7EXAMPLE"
       AWS_SECRET_ACCESS_KEY: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
       AWS_REGION: "us-east-1"
-      AWS_BUCKET: "wazuh-indexer-amazon-security-lake-bucket"
+      AWS_BUCKET: "wazuh-aws-security-lake-parquet"
+      S3_BUCKET_OCSF: "wazuh-aws-security-lake-ocsf"
       AWS_ENDPOINT: "http://s3.ninja:9000"
       SOURCE_LOCATION: "wazuh"
       ACCOUNT_ID: "111111111111"
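
In short, when the optional `S3_BUCKET_OCSF` variable is set, the Lambda now stores the transformed events twice: an intermediate plain-JSON copy of the OCSF events, and the Parquet file destined for Amazon Security Lake, both under the same `ext/{src_location}/region=.../accountId=.../eventDay=.../` key layout produced by `get_full_key()`. The sketch below condenses that flow, reusing the helpers visible in `lambda_function.py` above (`get_full_key`, `write_parquet_file`, `upload_to_s3`); the wrapper function and its argument list are illustrative only, not part of the patch.

```python
import json
import os

# Helpers shown in the diff above; importing them from the Lambda module here is
# an assumption made only to keep this sketch self-contained.
from lambda_function import get_full_key, upload_to_s3, write_parquet_file


def store_ocsf_and_parquet(ocsf_events, src_location, account_id, region, key, dst_bucket):
    """Illustrative wrapper around the two upload steps added by this patch."""
    # Optional intermediate copy: the OCSF events as JSON, only when
    # S3_BUCKET_OCSF is defined in the Lambda environment.
    ocsf_bucket = os.environ.get('S3_BUCKET_OCSF')
    ocsf_upload_success = False
    if ocsf_bucket is not None:
        tmp_json = '/tmp/tmp.json'
        with open(tmp_json, 'w') as fd:
            fd.write(json.dumps(ocsf_events))
        ocsf_key = get_full_key(src_location, account_id, region, key, 'json')
        ocsf_upload_success = upload_to_s3(ocsf_bucket, ocsf_key, tmp_json)

    # Final copy: the Parquet file Security Lake expects, written unconditionally.
    tmp_parquet = '/tmp/tmp.parquet'
    write_parquet_file(ocsf_events, tmp_parquet)
    parquet_key = get_full_key(src_location, account_id, region, key, 'parquet')
    upload_success = upload_to_s3(dst_bucket, parquet_key, tmp_parquet)

    return {'upload_success': upload_success, 'ocsf_upload_success': ocsf_upload_success}
```

Because the JSON upload is gated on `os.environ.get('S3_BUCKET_OCSF')`, deployments that only define `AWS_BUCKET` keep their previous behaviour and simply report `ocsf_upload_success` as false in the Lambda response.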