From b099086f173c452941f318a9f860132f29f1c870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Ruiz?= Date: Wed, 24 Apr 2024 09:32:05 +0200 Subject: [PATCH] Add Lambda function for the Amazon Security Lake integration (#189) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Migrate from #147 * Update amazon-security-lake integration - Improved documentation. - Python code has been moved to `wazuh-indexer/integrations/amazon-security-lake/src`. - Development environment now uses OpenSearch 2.12.0. - The `wazuh.integration.security.lake` container now displays logs, by watching logstash's log file. - [**NEEDS FIX**] As a temporary solution, the `INDEXER_USERNAME` and `INDEXER_PASSWORD` values have been added as an environment variable to the `wazuh.integration.security.lake` container. These values should be set at Dockerfile level, but isn't working, probably due to permission denied on invocation of the `setup.sh` script. - [**NEEDS FIX**] As a temporary solution, the output file of the `indexer-to-file` pipeline as been moved to `/var/log/logstash/indexer-to-file`. Previous path `/usr/share/logstash/pipeline/indexer-to-file.json` results in permission denied. - [**NEEDS FIX**] As a temporary solution, the input.opensearch.query has been replaced with `match_all`, as the previous one does not return any data, probably to the use of time filters `gt: now-1m`. - Standard output enable for `/usr/share/logstash/pipeline/indexer-to-file.json`. - [**NEEDS FIX**] ECS compatibility disabled: `echo "pipeline.ecs_compatibility: disabled" >> /etc/logstash/logstash.yml` -- to be included automatically - Python3 environment path added to the `indexer-to-integrator` pipeline. * Disable ECS compatibility (auto) - Adds pipeline.ecs_compatibility: disabled at Dockerfile level. - Removes `INDEXER_USERNAME` and `INDEXER_PASSWORD` as environment variables on the `wazuh.integration.security.lake` container. * Add @timestamp field to sample alerts * Fix Logstash pipelines * Add working indexer-to-s3 pipeline * Add working Python script up to S3 upload * Add latest changes * Remove duplicated line * Add working environment with minimal AWS lambda function * Mount src folder to Lambda's workdir * Add first functional lambda function Tested on local environment, using S3 Ninja and a Lambda container * Working state * Add documentation * Improve code * Improve code * Clean up * Add instructions to build a deployment package * Make zip file lighter * Use default name for aws_region * Add destination bucket validation * Add env var validation and full destination S3 path * Add AWS_ENDPOINT environment variable * Rename AWS_DEFAULT_REGION * Remove unused env vars * Remove unused file and improve documentation a bit. * Makefile improvements * Use dummy env variables --------- Signed-off-by: Álex Ruiz --- .gitignore | 6 + docker/dev/dev.yml | 2 +- integrations/README.md | 81 ++++++--- integrations/amazon-security-lake/Makefile | 28 +++ .../aws-lambda.dockerfile | 17 ++ .../amazon-security-lake/invoke-lambda.sh | 42 +++++ .../pipeline/indexer-to-integrator.conf | 33 ---- .../logstash/pipeline/indexer-to-s3.conf | 16 +- .../amazon-security-lake/requirements.aws.txt | 2 + .../amazon-security-lake/requirements.txt | 2 +- .../src/lambda_function.py | 169 ++++++++++++++++++ .../src/models/__init__.py | 2 + .../src/{transform => }/models/ocsf.py | 0 .../src/{transform => }/models/wazuh.py | 0 .../src/parquet/__init__.py | 1 - .../src/parquet/parquet.py | 20 --- .../amazon-security-lake/src/parquet/test.py | 10 -- integrations/amazon-security-lake/src/run.py | 122 ------------- .../src/transform/__init__.py | 1 - .../src/transform/converter.py | 105 ----------- .../src/transform/models/__init__.py | 2 - .../src/wazuh-event.sample.json | 1 - .../src/wazuh_ocsf_converter.py | 124 +++++++++++++ integrations/docker/amazon-security-lake.yml | 24 ++- 24 files changed, 481 insertions(+), 329 deletions(-) create mode 100644 integrations/amazon-security-lake/Makefile create mode 100644 integrations/amazon-security-lake/aws-lambda.dockerfile create mode 100644 integrations/amazon-security-lake/invoke-lambda.sh delete mode 100644 integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf create mode 100644 integrations/amazon-security-lake/requirements.aws.txt create mode 100644 integrations/amazon-security-lake/src/lambda_function.py create mode 100644 integrations/amazon-security-lake/src/models/__init__.py rename integrations/amazon-security-lake/src/{transform => }/models/ocsf.py (100%) rename integrations/amazon-security-lake/src/{transform => }/models/wazuh.py (100%) delete mode 100644 integrations/amazon-security-lake/src/parquet/__init__.py delete mode 100644 integrations/amazon-security-lake/src/parquet/parquet.py delete mode 100644 integrations/amazon-security-lake/src/parquet/test.py delete mode 100644 integrations/amazon-security-lake/src/run.py delete mode 100644 integrations/amazon-security-lake/src/transform/__init__.py delete mode 100644 integrations/amazon-security-lake/src/transform/converter.py delete mode 100644 integrations/amazon-security-lake/src/transform/models/__init__.py delete mode 100644 integrations/amazon-security-lake/src/wazuh-event.sample.json create mode 100644 integrations/amazon-security-lake/src/wazuh_ocsf_converter.py diff --git a/.gitignore b/.gitignore index b0d5249dd325f..2790d2cfb49d1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,11 @@ # build files artifacts/ +*.deb +*.rpm +*.zip +*.tar.gz + +integrations/amazon-security-lake/package .java .m2 diff --git a/docker/dev/dev.yml b/docker/dev/dev.yml index 7176b044df5ba..cc55b0737dc84 100644 --- a/docker/dev/dev.yml +++ b/docker/dev/dev.yml @@ -5,7 +5,7 @@ services: image: wi-dev:${VERSION} container_name: wi-dev_${VERSION} build: - context: ./../.. + context: ${REPO_PATH} dockerfile: ${REPO_PATH}/docker/dev/images/Dockerfile ports: # OpenSearch REST API diff --git a/integrations/README.md b/integrations/README.md index c6183013ab5a0..e141452d7a8b5 100644 --- a/integrations/README.md +++ b/integrations/README.md @@ -1,18 +1,18 @@ ## Wazuh indexer integrations -This folder contains integrations with third-party XDR, SIEM and cybersecurity software. +This folder contains integrations with third-party XDR, SIEM and cybersecurity software. The goal is to transport Wazuh's analysis to the platform that suits your needs. ### Amazon Security Lake -Amazon Security Lake automatically centralizes security data from AWS environments, SaaS providers, -on premises, and cloud sources into a purpose-built data lake stored in your account. With Security Lake, -you can get a more complete understanding of your security data across your entire organization. You can -also improve the protection of your workloads, applications, and data. Security Lake has adopted the -Open Cybersecurity Schema Framework (OCSF), an open standard. With OCSF support, the service normalizes +Amazon Security Lake automatically centralizes security data from AWS environments, SaaS providers, +on premises, and cloud sources into a purpose-built data lake stored in your account. With Security Lake, +you can get a more complete understanding of your security data across your entire organization. You can +also improve the protection of your workloads, applications, and data. Security Lake has adopted the +Open Cybersecurity Schema Framework (OCSF), an open standard. With OCSF support, the service normalizes and combines security data from AWS and a broad range of enterprise security data sources. -##### Usage +#### Development guide A demo of the integration can be started using the content of this folder and Docker. @@ -20,32 +20,44 @@ A demo of the integration can be started using the content of this folder and Do docker compose -f ./docker/amazon-security-lake.yml up -d ``` -This docker compose project will bring a *wazuh-indexer* node, a *wazuh-dashboard* node, -a *logstash* node and our event generator. On the one hand, the event generator will push events -constantly to the indexer, on the `wazuh-alerts-4.x-sample` index by default (refer to the [events -generator](./tools/events-generator/README.md) documentation for customization options). -On the other hand, logstash will constantly query for new data and deliver it to the integration -Python program, also present in that node. Finally, the integration module will prepare and send the -data to the Amazon Security Lake's S3 bucket. +This docker compose project will bring a _wazuh-indexer_ node, a _wazuh-dashboard_ node, +a _logstash_ node, our event generator and an AWS Lambda Python container. On the one hand, the event generator will push events +constantly to the indexer, to the `wazuh-alerts-4.x-sample` index by default (refer to the [events +generator](./tools/events-generator/README.md) documentation for customization options). +On the other hand, logstash will constantly query for new data and deliver it to output configured in the +pipeline, which can be one of `indexer-to-s3` or `indexer-to-file`. + +The `indexer-to-s3` pipeline is the method used by the integration. This pipeline delivers +the data to an S3 bucket, from which the data is processed using a Lambda function, to finally +be sent to the Amazon Security Lake bucket in Parquet format. + Attach a terminal to the container and start the integration by starting logstash, as follows: ```console -/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-integrator.conf --path.settings /etc/logstash +/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-s3.conf --path.settings /etc/logstash ``` -Unprocessed data can be sent to a file or to an S3 bucket. -```console -/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-file.conf --path.settings /etc/logstash -/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-s3.conf --path.settings /etc/logstash +After 5 minutes, the first batch of data will show up in http://localhost:9444/ui/wazuh-indexer-aux-bucket. +You'll need to invoke the Lambda function manually, selecting the log file to process. + +```bash +export AUX_BUCKET=wazuh-indexer-aux-bucket + +bash amazon-security-lake/src/invoke-lambda.sh ``` -All three pipelines are configured to fetch the latest data from the *wazuh-indexer* every minute. In -the case of `indexer-to-file`, the data is written at the same pace, whereas `indexer-to-s3`, data -is uploaded every 5 minutes. +Processed data will be uploaded to http://localhost:9444/ui/wazuh-indexer-amazon-security-lake-bucket. Click on any file to download it, +and check it's content using `parquet-tools`. Just make sure of installing the virtual environment first, through [requirements.txt](./amazon-security-lake/). -For development or debugging purposes, you may want to enable hot-reload, test or debug on these files, +```bash +parquet-tools show +``` + +Bucket names can be configured editing the [amazon-security-lake.yml](./docker/amazon-security-lake.yml) file. + +For development or debugging purposes, you may want to enable hot-reload, test or debug on these files, by using the `--config.reload.automatic`, `--config.test_and_exit` or `--debug` flags, respectively. For production usage, follow the instructions in our documentation page about this matter. @@ -53,6 +65,29 @@ For production usage, follow the instructions in our documentation page about th As a last note, we would like to point out that we also use this Docker environment for development. +#### Deployment guide + +- Create one S3 bucket to store the raw events, for example: `wazuh-security-lake-integration` +- Create a new AWS Lambda function + - Create an IAM role with access to the S3 bucket created above. + - Select Python 3.12 as the runtime + - Configure the runtime to have 512 MB of memory and 30 seconds timeout + - Configure an S3 trigger so every created object in the bucket with `.txt` extension invokes the Lambda. + - Run `make` to generate a zip deployment package, or create it manually as per the [AWS Lambda documentation](https://docs.aws.amazon.com/lambda/latest/dg/python-package.html#python-package-create-dependencies). + - Upload the zip package to the bucket. Then, upload it to the Lambda from the S3 as per these instructions: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-package.html#gettingstarted-package-zip +- Create a Custom Source within Security Lake for the Wazuh Parquet files as per the following guide: https://docs.aws.amazon.com/security-lake/latest/userguide/custom-sources.html +- Set the **AWS account ID** for the Custom Source **AWS account with permission to write data**. + + + + +The instructions on this section have been based on the following AWS tutorials and documentation. + +- [Tutorial: Using an Amazon S3 trigger to create thumbnail images](https://docs.aws.amazon.com/lambda/latest/dg/with-s3-tutorial.html) +- [Tutorial: Using an Amazon S3 trigger to invoke a Lambda function](https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html) +- [Working with .zip file archives for Python Lambda functions](https://docs.aws.amazon.com/lambda/latest/dg/python-package.html) +- [Best practices for working with AWS Lambda functions](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html) + ### Other integrations TBD diff --git a/integrations/amazon-security-lake/Makefile b/integrations/amazon-security-lake/Makefile new file mode 100644 index 0000000000000..9a6dd674b37e7 --- /dev/null +++ b/integrations/amazon-security-lake/Makefile @@ -0,0 +1,28 @@ + +ZIP_NAME = wazuh_to_amazon_security_lake +TARGET = package +SRC = src + +# Main target +.ONESHELL: +$(ZIP_NAME).zip: $(TARGET) $(SRC)/lambda_function.py $(SRC)/wazuh_ocsf_converter.py + @cd $(TARGET) + @zip -r ../$(ZIP_NAME).zip . + @cd ../$(SRC) + @zip ../$@ lambda_function.py wazuh_ocsf_converter.py + @zip ../$@ models -r + +$(TARGET): + docker run -v `pwd`:/src -w /src \ + python:3.12 \ + pip install \ + --platform manylinux2014_x86_64 \ + --target=$(TARGET) \ + --implementation cp \ + --python-version 3.12 \ + --only-binary=:all: --upgrade \ + -r requirements.aws.txt + +clean: + @rm -rf $(TARGET) + @py3clean . \ No newline at end of file diff --git a/integrations/amazon-security-lake/aws-lambda.dockerfile b/integrations/amazon-security-lake/aws-lambda.dockerfile new file mode 100644 index 0000000000000..7039c2b935de8 --- /dev/null +++ b/integrations/amazon-security-lake/aws-lambda.dockerfile @@ -0,0 +1,17 @@ +# docker build --platform linux/amd64 --no-cache -f aws-lambda.dockerfile -t docker-image:test . +# docker run --platform linux/amd64 -p 9000:8080 docker-image:test + +# FROM public.ecr.aws/lambda/python:3.9 +FROM amazon/aws-lambda-python:3.12 + +# Copy requirements.txt +COPY requirements.aws.txt ${LAMBDA_TASK_ROOT} + +# Install the specified packages +RUN pip install -r requirements.aws.txt + +# Copy function code +COPY src ${LAMBDA_TASK_ROOT} + +# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile) +CMD [ "lambda_function.lambda_handler" ] \ No newline at end of file diff --git a/integrations/amazon-security-lake/invoke-lambda.sh b/integrations/amazon-security-lake/invoke-lambda.sh new file mode 100644 index 0000000000000..c4545ef12874b --- /dev/null +++ b/integrations/amazon-security-lake/invoke-lambda.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +export AUX_BUCKET=wazuh-indexer-aux-bucket + +curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" -d '{ + "Records": [ + { + "eventVersion": "2.0", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "1970-01-01T00:00:00.000Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "AIDAJDPLRKLG7UEXAMPLE" + }, + "requestParameters":{ + "sourceIPAddress":"127.0.0.1" + }, + "responseElements":{ + "x-amz-request-id":"C3D13FE58DE4C810", + "x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "testConfigRule", + "bucket": { + "name": "'"${AUX_BUCKET}"'", + "ownerIdentity": { + "principalId":"A3NL1KOZZKExample" + }, + "arn": "'"arn:aws:s3:::${AUX_BUCKET}"'" + }, + "object": { + "key": "'"${1}"'", + "size": 1024, + "eTag":"d41d8cd98f00b204e9800998ecf8427e", + "versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko" + } + } + } + ] +}' \ No newline at end of file diff --git a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf deleted file mode 100644 index afd7712413ddf..0000000000000 --- a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf +++ /dev/null @@ -1,33 +0,0 @@ -input { - opensearch { - hosts => ["wazuh.indexer:9200"] - user => "${INDEXER_USERNAME}" - password => "${INDEXER_PASSWORD}" - ssl => true - ca_file => "/usr/share/logstash/root-ca.pem" - index => "wazuh-alerts-4.x-*" - query => '{ - "query": { - "range": { - "@timestamp": { - "gt": "now-1m" - } - } - } - }' - schedule => "* * * * *" - } -} - -output { - stdout { - id => "output.stdout" - codec => json_lines - } - pipe { - id => "output.integrator" - ttl => "10" - command => "/env/bin/python3 /usr/share/logstash/amazon-security-lake/run.py" - # command => "/usr/share/logstash/amazon-security-lake/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default" - } -} diff --git a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf index 35aed294cc794..831ced288edf5 100644 --- a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf +++ b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf @@ -10,12 +10,12 @@ input { "query": { "range": { "@timestamp": { - "gt": "now-1m" + "gt": "now-5m" } } } }' - schedule => "5/* * * * *" + schedule => "*/5 * * * *" } } @@ -26,15 +26,15 @@ output { } s3 { id => "output.s3" - access_key_id => "${AWS_KEY}" - secret_access_key => "${AWS_SECRET}" + access_key_id => "${AWS_ACCESS_KEY_ID}" + secret_access_key => "${AWS_SECRET_ACCESS_KEY}" region => "${AWS_REGION}" - endpoint => "http://s3.ninja:9000" - bucket => "${AWS_BUCKET}" - codec => "json" + endpoint => "${AWS_ENDPOINT}" + bucket => "${AUX_BUCKET}" + codec => "json_lines" retry_count => 0 validate_credentials_on_root_bucket => false - prefix => "%{+YYYY}/%{+MM}/%{+dd}" + prefix => "%{+YYYY}%{+MM}%{+dd}" server_side_encryption => true server_side_encryption_algorithm => "AES256" additional_settings => { diff --git a/integrations/amazon-security-lake/requirements.aws.txt b/integrations/amazon-security-lake/requirements.aws.txt new file mode 100644 index 0000000000000..ea911617dede4 --- /dev/null +++ b/integrations/amazon-security-lake/requirements.aws.txt @@ -0,0 +1,2 @@ +pyarrow>=10.0.1 +pydantic>=2.6.1 \ No newline at end of file diff --git a/integrations/amazon-security-lake/requirements.txt b/integrations/amazon-security-lake/requirements.txt index 69acf806ed3fe..7d14ea9fb1b10 100644 --- a/integrations/amazon-security-lake/requirements.txt +++ b/integrations/amazon-security-lake/requirements.txt @@ -1,4 +1,4 @@ pyarrow>=10.0.1 parquet-tools>=0.2.15 -pydantic==2.6.1 +pydantic>=2.6.1 boto3==1.34.46 \ No newline at end of file diff --git a/integrations/amazon-security-lake/src/lambda_function.py b/integrations/amazon-security-lake/src/lambda_function.py new file mode 100644 index 0000000000000..57b4e5d3d92bd --- /dev/null +++ b/integrations/amazon-security-lake/src/lambda_function.py @@ -0,0 +1,169 @@ +import logging +import os +import urllib.parse +import json +import boto3 +import pyarrow as pa +import pyarrow.parquet as pq +from botocore.exceptions import ClientError +import wazuh_ocsf_converter + +# Initialize boto3 client outside the handler +if os.environ.get('IS_DEV'): + s3_client = boto3.client( + service_name='s3', + aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'), + aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'), + region_name=os.environ.get('AWS_REGION'), + endpoint_url=os.environ.get('AWS_ENDPOINT'), + ) +else: + s3_client = boto3.client('s3') + + +def get_events(bucket: str, key: str) -> list: + """ + Retrieve events from S3 object. + """ + logging.info(f"Reading {key}.") + try: + response = s3_client.get_object(Bucket=bucket, Key=key) + data = response['Body'].read().decode('utf-8') + return data.splitlines() + except ClientError as e: + logging.error( + f"Failed to read S3 object {key} from bucket {bucket}: {e}") + return [] + + +def write_parquet_file(ocsf_events: list, filename: str) -> None: + """ + Write OCSF events to a Parquet file. + """ + table = pa.Table.from_pydict({'events': ocsf_events}) + pq.write_table(table, filename, compression='ZSTD') + + +def upload_to_s3(bucket: str, key: str, filename: str) -> bool: + """ + Upload a file to S3 bucket. + """ + logging.info(f"Uploading data to {bucket}.") + try: + with open(filename, 'rb') as data: + s3_client.put_object(Bucket=bucket, Key=key, Body=data) + return True + except ClientError as e: + logging.error( + f"Failed to upload file {filename} to bucket {bucket}: {e}") + return False + + +def exit_on_error(error_message): + """ + Print error message and exit with non-zero status code. + Args: + error_message (str): Error message to display. + """ + print(f"Error: {error_message}") + exit(1) + + +def check_environment_variables(variables): + """ + Check if required environment variables are set. + Args: + variables (list): List of required environment variable names. + Returns: + bool: True if all required environment variables are set, False otherwise. + """ + missing_variables = [var for var in variables if not os.environ.get(var)] + if missing_variables: + error_message = f"The following environment variables are not set: {', '.join(missing_variables)}" + exit_on_error(error_message) + return False + return True + + +def get_full_key(src_location: str, account_id: str, region: str, key: str) -> str: + """ + Constructs a full S3 key path for storing a Parquet file based on event metadata. + + Args: + src_location (str): Source location identifier. + account_id (str): AWS account ID associated with the event. + region (str): AWS region where the event occurred. + key (str): Event key containing metadata information. + + Returns: + str: Full S3 key path for storing the Parquet file. + + Example: + If key is '20240417_ls.s3.0055f22e-200e-4259-b865-8ccea05812be.2024-04-17T15.45.part29.txt', + this function will return: + 'ext/src_location/region=region/accountId=account_id/eventDay=20240417/0055f22e200e4259b8658ccea05812be.parquet' + """ + # Extract event day from the key (first 8 characters) + event_day = key[:8] + + # Extract filename (UUID) from the key and remove hyphens + filename_parts = key.split('.') + filename = ''.join(filename_parts[2].split('-')) + + # Construct the full S3 key path for storing the Parquet file + parquet_key = ( + f'ext/{src_location}/region={region}/accountId={account_id}/eventDay={event_day}/{filename}.parquet' + ) + + return parquet_key + + +def lambda_handler(event, context): + logging.basicConfig(filename='/tmp/lambda.log', + encoding='utf-8', level=logging.DEBUG) + + # Define required environment variables + required_variables = ['AWS_BUCKET', + 'SOURCE_LOCATION', 'ACCOUNT_ID', 'AWS_REGION'] + + # Check if all required environment variables are set + if not check_environment_variables(required_variables): + return + + # Retrieve environment variables + dst_bucket = os.environ['AWS_BUCKET'] + src_location = os.environ['SOURCE_LOCATION'] + account_id = os.environ['ACCOUNT_ID'] + region = os.environ['AWS_REGION'] + + # Extract bucket and key from S3 event + src_bucket = event['Records'][0]['s3']['bucket']['name'] + key = urllib.parse.unquote_plus( + event['Records'][0]['s3']['object']['key'], encoding='utf-8') + logging.info(f"Lambda function invoked due to {key}.") + logging.info( + f"Source bucket name is {src_bucket}. Destination bucket is {dst_bucket}.") + + # Read events from source S3 bucket + raw_events = get_events(src_bucket, key) + + # Transform events to OCSF format + ocsf_events = wazuh_ocsf_converter.transform_events(raw_events) + + # Write OCSF events to Parquet file + tmp_filename = '/tmp/tmp.parquet' + write_parquet_file(ocsf_events, tmp_filename) + + # Upload Parquet file to destination S3 bucket + parquet_key = get_full_key(src_location, account_id, region, key) + upload_success = upload_to_s3(dst_bucket, parquet_key, tmp_filename) + + # Clean up temporary file + os.remove(tmp_filename) + + # Prepare response + response = { + 'size': len(raw_events), + 'upload_success': upload_success + } + return json.dumps(response) diff --git a/integrations/amazon-security-lake/src/models/__init__.py b/integrations/amazon-security-lake/src/models/__init__.py new file mode 100644 index 0000000000000..8dc7d9f3af00b --- /dev/null +++ b/integrations/amazon-security-lake/src/models/__init__.py @@ -0,0 +1,2 @@ +import models.wazuh +import models.ocsf diff --git a/integrations/amazon-security-lake/src/transform/models/ocsf.py b/integrations/amazon-security-lake/src/models/ocsf.py similarity index 100% rename from integrations/amazon-security-lake/src/transform/models/ocsf.py rename to integrations/amazon-security-lake/src/models/ocsf.py diff --git a/integrations/amazon-security-lake/src/transform/models/wazuh.py b/integrations/amazon-security-lake/src/models/wazuh.py similarity index 100% rename from integrations/amazon-security-lake/src/transform/models/wazuh.py rename to integrations/amazon-security-lake/src/models/wazuh.py diff --git a/integrations/amazon-security-lake/src/parquet/__init__.py b/integrations/amazon-security-lake/src/parquet/__init__.py deleted file mode 100644 index c434a5d9acc27..0000000000000 --- a/integrations/amazon-security-lake/src/parquet/__init__.py +++ /dev/null @@ -1 +0,0 @@ -import parquet.parquet diff --git a/integrations/amazon-security-lake/src/parquet/parquet.py b/integrations/amazon-security-lake/src/parquet/parquet.py deleted file mode 100644 index 79a146f0993a2..0000000000000 --- a/integrations/amazon-security-lake/src/parquet/parquet.py +++ /dev/null @@ -1,20 +0,0 @@ - -import pyarrow as pa -import pyarrow.parquet as pq -import pyarrow.fs as pafs - - -class Parquet: - - @staticmethod - def encode(data: dict): - return pa.Table.from_pydict(data) - - @staticmethod - def to_s3(data: pa.Table, s3: pafs.S3FileSystem): - pass - - @staticmethod - def to_file(data: pa.Table, path: str): - # pq.write_to_dataset(table=data, root_path=path) - pq.write_table(data, path) diff --git a/integrations/amazon-security-lake/src/parquet/test.py b/integrations/amazon-security-lake/src/parquet/test.py deleted file mode 100644 index 318da6ebe4740..0000000000000 --- a/integrations/amazon-security-lake/src/parquet/test.py +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/python - -import pyarrow as pa -from parquet import Parquet -import json - -with open("wazuh-event.ocsf.json", "r") as fd: - events = [json.load(fd)] - table = pa.Table.from_pylist(events) - Parquet.to_file(table, "output/wazuh-event.ocsf.parquet") diff --git a/integrations/amazon-security-lake/src/run.py b/integrations/amazon-security-lake/src/run.py deleted file mode 100644 index 30e2fd5af553c..0000000000000 --- a/integrations/amazon-security-lake/src/run.py +++ /dev/null @@ -1,122 +0,0 @@ -#!/env/bin/python3 -# vim: bkc=yes bk wb - -import sys -import os -import datetime -import transform -import pyarrow as pa -import pyarrow.parquet as pq -import logging -import boto3 -from botocore.exceptions import ClientError - -# NOTE work in progress -def upload_file(table, file_name, bucket, object_name=None): - """Upload a file to an S3 bucket - - :param table: PyArrow table with events data - :param file_name: File to upload - :param bucket: Bucket to upload to - :param object_name: S3 object name. If not specified then file_name is used - :return: True if file was uploaded, else False - """ - - client = boto3.client( - service_name='s3', - aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], - aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'], - region_name=os.environ['AWS_REGION'], - endpoint_url='http://s3.ninja:9000', - ) - - # If S3 object_name was not specified, use file_name - if object_name is None: - object_name = os.path.basename(file_name) - - # Upload the file - try: - client.put_object(Bucket=bucket, Key=file_name, Body=open(file_name, 'rb')) - except ClientError as e: - logging.error(e) - return False - return True - - -def main(): - '''Main function''' - # Get current timestamp - timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat() - - # Generate filenames - filename_raw = f"/tmp/integrator-raw-{timestamp}.json" - filename_ocsf = f"/tmp/integrator-ocsf-{timestamp}.json" - filename_parquet = f"/tmp/integrator-ocsf-{timestamp}.parquet" - - # 1. Extract data - # ================ - raw_data = [] - for line in sys.stdin: - raw_data.append(line) - - # Echo piped data - with open(filename_raw, "a") as fd: - fd.write(line) - - # 2. Transform data - # ================ - # a. Transform to OCSF - ocsf_data = [] - for line in raw_data: - try: - event = transform.converter.from_json(line) - ocsf_event = transform.converter.to_detection_finding(event) - ocsf_data.append(ocsf_event.model_dump()) - - # Temporal disk storage - with open(filename_ocsf, "a") as fd: - fd.write(str(ocsf_event) + "\n") - except AttributeError as e: - print("Error transforming line to OCSF") - print(event) - print(e) - - # b. Encode as Parquet - try: - table = pa.Table.from_pylist(ocsf_data) - pq.write_table(table, filename_parquet) - except AttributeError as e: - print("Error encoding data to parquet") - print(e) - - # 3. Load data (upload to S3) - # ================ - if upload_file(table, filename_parquet, os.environ['AWS_BUCKET']): - # Remove /tmp files - pass - - -def _test(): - ocsf_event = {} - with open("./wazuh-event.sample.json", "r") as fd: - # Load from file descriptor - for raw_event in fd: - try: - event = transform.converter.from_json(raw_event) - print("") - print("-- Wazuh event Pydantic model") - print("") - print(event.model_dump()) - ocsf_event = transform.converter.to_detection_finding(event) - print("") - print("-- Converted to OCSF") - print("") - print(ocsf_event.model_dump()) - - except KeyError as e: - raise (e) - - -if __name__ == '__main__': - main() - # _test() diff --git a/integrations/amazon-security-lake/src/transform/__init__.py b/integrations/amazon-security-lake/src/transform/__init__.py deleted file mode 100644 index 6e8733a32b85d..0000000000000 --- a/integrations/amazon-security-lake/src/transform/__init__.py +++ /dev/null @@ -1 +0,0 @@ -import transform.converter diff --git a/integrations/amazon-security-lake/src/transform/converter.py b/integrations/amazon-security-lake/src/transform/converter.py deleted file mode 100644 index 88477b0dae814..0000000000000 --- a/integrations/amazon-security-lake/src/transform/converter.py +++ /dev/null @@ -1,105 +0,0 @@ -import json - -import pydantic -import transform.models as models - - -def normalize(level: int) -> int: - """ - Normalizes rule level into the 0-6 range, required by OCSF. - """ - if level >= 15: # (5) Critical - severity = 5 - elif level >= 11: # (4) High - severity = 4 - elif level >= 8: # (3) Medium - severity = 3 - elif level >= 4: # (2) Low - severity = 2 - elif level >= 0: # (1) Informational - severity = 1 - else: - severity = 0 # (0) Unknown - - return severity - - -def join(iterable, separator=","): - return (separator.join(iterable)) - - -def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFinding: - finding_info = models.ocsf.FindingInfo( - analytic=models.ocsf.AnalyticInfo( - category=", ".join(event.rule.groups), - name=event.decoder.name, - type_id=1, - uid=event.rule.id - ), - attacks=models.ocsf.AttackInfo( - tactic=models.ocsf.TechniqueInfo( - name=", ".join(event.rule.mitre.tactic), - uid=", ".join(event.rule.mitre.id) - ), - technique=models.ocsf.TechniqueInfo( - name=", ".join(event.rule.mitre.technique), - uid=", ".join(event.rule.mitre.id) - ), - version="v13.1" - ), - title=event.rule.description, - types=[event.input.type], - uid=event.id - ) - - metadata = models.ocsf.Metadata( - log_name="Security events", - log_provider="Wazuh", - product=models.ocsf.ProductInfo( - name="Wazuh", - lang="en", - vendor_name="Wazuh, Inc,." - ), - version="1.1.0" - ) - - resources = [models.ocsf.Resource( - name=event.agent.name, uid=event.agent.id)] - - severity_id = normalize(event.rule.level) - - unmapped = { - "data_sources": [ - event.location, - event.manager.name - ], - "nist": event.rule.nist_800_53 # Array - } - - return models.ocsf.DetectionFinding( - count=event.rule.firedtimes, - message=event.rule.description, - finding_info=finding_info, - metadata=metadata, - raw_data=event.full_log, - resources=resources, - risk_score=event.rule.level, - severity_id=severity_id, - time=event.timestamp, - unmapped=unmapped - ) - - -# def from_json(event: dict) -> models.wazuh.Event: -# # Needs to a string, bytes or bytearray -# try: -# return models.wazuh.Event.model_validate_json(json.dumps(event)) -# except pydantic.ValidationError as e: -# print(e) - -def from_json(event: str) -> models.wazuh.Event: - # Needs to a string, bytes or bytearray - try: - return models.wazuh.Event.model_validate_json(event) - except pydantic.ValidationError as e: - print(e) \ No newline at end of file diff --git a/integrations/amazon-security-lake/src/transform/models/__init__.py b/integrations/amazon-security-lake/src/transform/models/__init__.py deleted file mode 100644 index 2fdec7bc648af..0000000000000 --- a/integrations/amazon-security-lake/src/transform/models/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -import transform.models.wazuh -import transform.models.ocsf diff --git a/integrations/amazon-security-lake/src/wazuh-event.sample.json b/integrations/amazon-security-lake/src/wazuh-event.sample.json deleted file mode 100644 index b161bf24db419..0000000000000 --- a/integrations/amazon-security-lake/src/wazuh-event.sample.json +++ /dev/null @@ -1 +0,0 @@ -{"cluster":{"name":"wazuh-cluster","node":"wazuh-manager"},"agent":{"id":"003","ip":"10.0.0.180","name":"ip-10-0-0-180.us-west-1.compute.internal"},"@timestamp":"2024-03-14T12:57:05.730Z","data":{"audit":{"exe":"/usr/sbin/sshd","type":"NORMAL","cwd":"/home/wazuh","file":{"name":"/var/sample"},"success":"yes","command":"ssh"}},"@version":"1","manager":{"name":"wazuh-manager"},"location":"","decoder":{},"id":"1580123327.49031","predecoder":{},"timestamp":"2024-03-14T12:57:05.730+0000","rule":{"description":"Audit: Command: /usr/sbin/ssh","firedtimes":3,"level":3,"id":"80791","mail":false,"groups":["audit","audit_command"],"gdpr":["IV_30.1.g"]}} \ No newline at end of file diff --git a/integrations/amazon-security-lake/src/wazuh_ocsf_converter.py b/integrations/amazon-security-lake/src/wazuh_ocsf_converter.py new file mode 100644 index 0000000000000..e16147f398255 --- /dev/null +++ b/integrations/amazon-security-lake/src/wazuh_ocsf_converter.py @@ -0,0 +1,124 @@ +import pydantic +import models +import logging + + +def normalize(level: int) -> int: + """ + Normalizes rule level into the 0-6 range, required by OCSF. + """ + if level >= 15: # (5) Critical + severity = 5 + elif level >= 11: # (4) High + severity = 4 + elif level >= 8: # (3) Medium + severity = 3 + elif level >= 4: # (2) Low + severity = 2 + elif level >= 0: # (1) Informational + severity = 1 + else: + severity = 0 # (0) Unknown + + return severity + + +def join(iterable, separator=","): + return (separator.join(iterable)) + + +def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFinding: + """ + Convert Wazuh security event to OCSF detection finding. + """ + try: + + finding_info = models.ocsf.FindingInfo( + analytic=models.ocsf.AnalyticInfo( + category=", ".join(event.rule.groups), + name=event.decoder.name, + type_id=1, + uid=event.rule.id + ), + attacks=models.ocsf.AttackInfo( + tactic=models.ocsf.TechniqueInfo( + name=", ".join(event.rule.mitre.tactic), + uid=", ".join(event.rule.mitre.id) + ), + technique=models.ocsf.TechniqueInfo( + name=", ".join(event.rule.mitre.technique), + uid=", ".join(event.rule.mitre.id) + ), + version="v13.1" + ), + title=event.rule.description, + types=[event.input.type], + uid=event.id + ) + + metadata = models.ocsf.Metadata( + log_name="Security events", + log_provider="Wazuh", + product=models.ocsf.ProductInfo( + name="Wazuh", + lang="en", + vendor_name="Wazuh, Inc,." + ), + version="1.1.0" + ) + + resources = [models.ocsf.Resource( + name=event.agent.name, uid=event.agent.id)] + + severity_id = normalize(event.rule.level) + + unmapped = { + "data_sources": [ + event.location, + event.manager.name + ], + "nist": event.rule.nist_800_53 # Array + } + + return models.ocsf.DetectionFinding( + count=event.rule.firedtimes, + message=event.rule.description, + finding_info=finding_info, + metadata=metadata, + raw_data=event.full_log, + resources=resources, + risk_score=event.rule.level, + severity_id=severity_id, + time=event.timestamp, + unmapped=unmapped + ) + except AttributeError as e: + logging.error(f"Error transforming event: {e}") + return {} + + +def from_json(json_line: str) -> models.wazuh.Event: + """ + Parse the JSON string representation of a Wazuh security event into a dictionary (model). + """ + # Needs to a string, bytes or bytearray + try: + return models.wazuh.Event.model_validate_json(json_line) + except pydantic.ValidationError as e: + print(e) + + +def transform_events(events: list) -> list: + """ + Transform a list of Wazuh security events (json string) to OCSF format. + """ + logging.info("Transforming Wazuh security events to OCSF.") + ocsf_events = [] + for event in events: + try: + wazuh_event = from_json(event) + ocsf_event = to_detection_finding(wazuh_event).model_dump() + ocsf_events.append(ocsf_event) + except Exception as e: + logging.error(f"Error transforming line to OCSF: {e}") + return ocsf_events diff --git a/integrations/docker/amazon-security-lake.yml b/integrations/docker/amazon-security-lake.yml index 0a1465d2e6d81..2e48961d20b2b 100644 --- a/integrations/docker/amazon-security-lake.yml +++ b/integrations/docker/amazon-security-lake.yml @@ -83,7 +83,8 @@ services: AWS_ACCESS_KEY_ID: "AKIAIOSFODNN7EXAMPLE" AWS_SECRET_ACCESS_KEY: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" AWS_REGION: "us-east-1" - AWS_BUCKET: "wazuh-indexer-amazon-security-lake-bucket" + AUX_BUCKET: "indexer-amazon-security-lake-bucket" + AWS_ENDPOINT: "http://s3.ninja:9000" ports: - "5000:5000/tcp" - "5000:5000/udp" @@ -105,6 +106,27 @@ services: volumes: - s3-data:/home/sirius/data + aws.lambda: + image: wazuh/indexer-security-lake-integration:lambda + build: + context: ../amazon-security-lake + dockerfile: ../amazon-security-lake/aws-lambda.dockerfile + container_name: wazuh.integration.security.lake.aws.lambda + hostname: wazuh.integration.security.lake.aws.lambda + environment: + AWS_ACCESS_KEY_ID: "AKIAIOSFODNN7EXAMPLE" + AWS_SECRET_ACCESS_KEY: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + AWS_REGION: "us-east-1" + AWS_BUCKET: "wazuh-indexer-amazon-security-lake-bucket" + AWS_ENDPOINT: "http://s3.ninja:9000" + SOURCE_LOCATION: "wazuh" + ACCOUNT_ID: "111111111111" + IS_DEV: true + volumes: + - ../amazon-security-lake/src:/var/task + ports: + - "9000:8080" + wazuh-certs-generator: image: wazuh/wazuh-certs-generator:0.0.1 hostname: wazuh-certs-generator