
Commit

Add comments on handling files and streams with pyarrow for future reference
f-galland committed Feb 6, 2024
1 parent 4ad01c2 commit 1638b17
Showing 1 changed file with 12 additions and 5 deletions.
integrations/stdin_to_securitylake.py: 17 changes (12 additions & 5 deletions)
@@ -15,13 +15,14 @@ def map_to_ocsf():
     ## Code that translates fields to OCSF

 def encode_parquet(list):
+    ### We can write directly to S3 from pyarrow:
+    ### https://arrow.apache.org/docs/python/filesystems.html#s3
+    ###
+    ### Credentials can be stored in /root/.aws/credentials
+    ### https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html
     table = Table.from_pylist(list)
     parquet.write_table(table, '/tmp/{}.parquet'.format(clockstr))

 def push_to_s3(parquet):
     ## Fill with AWS S3 code
     pass

 def read_block(fileobject,length):
     output=[]
     for i in range(0,length):
@@ -62,12 +63,18 @@ def parse_arguments():

 try:
     while True:
+        ### We can possibly replace all the custom code here
+        ### and just use Arrow's built-in input and output facilities:
+        ### * https://arrow.apache.org/docs/python/memory.html#input-and-output
+        ### * https://arrow.apache.org/docs/python/ipc.html#reading-from-stream-and-file-format-for-pandas
+        ### * https://stackoverflow.com/questions/52945609/pandas-dataframe-to-parquet-buffer-in-memory
+
         current_block = read_block(stdin,args.linebuffer)
         if current_block[-1] == block_ending :
             output_buffer += current_block[0:current_block.index(block_ending)]
         time.sleep(args.sleeptime)
         if len(output_buffer) > args.maxlength or get_elapsedseconds(starttimestamp) > args.pushinterval:
-            push_to_s3(encode_parquet(output_buffer))
+            encode_parquet(output_buffer)
             logging.debug(json.dumps(output_buffer))
             starttimestamp = datetime.now(tz='UTC')
             output_buffer = []
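
The comments added to encode_parquet() point at pyarrow's native S3 support. A minimal sketch of that route, assuming a hypothetical bucket and key and the standard AWS credential chain the comments mention:

# Sketch only: write the Arrow table straight to S3 instead of /tmp.
# The bucket and key names are hypothetical placeholders.
from pyarrow import Table, fs, parquet

def encode_parquet_to_s3(events, bucket='security-lake-bucket', key='events.parquet'):
    table = Table.from_pylist(events)  # list of dicts -> Arrow table
    s3 = fs.S3FileSystem()             # resolves credentials from ~/.aws/credentials or the environment
    parquet.write_table(table, '{}/{}'.format(bucket, key), filesystem=s3)

Writing through S3FileSystem would collapse encode_parquet() and push_to_s3() into a single step, which appears to be the motivation for the filesystems link.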
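The loop comments likewise suggest replacing read_block() with Arrow's built-in input facilities. A sketch under that assumption, wrapping stdin in an Arrow stream and reading fixed-size chunks (the 64 KiB chunk size is an arbitrary stand-in for args.linebuffer):

# Sketch only: let pyarrow do the buffered reading from stdin.
import sys
import pyarrow as pa

stream = pa.input_stream(sys.stdin.buffer)  # Arrow NativeFile over the raw stdin bytes

def read_chunk(chunk_size=64 * 1024):
    return stream.read(chunk_size)          # returns bytes; empty at EOF

Note that this reads bytes rather than lines, so the block_ending check would have to operate on the byte stream instead of a list of lines.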

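The Stack Overflow link covers the remaining piece: serializing Parquet to an in-memory buffer instead of a file under /tmp, which pairs naturally with the empty push_to_s3() stub. A sketch of that combination; boto3 and the bucket/key names are assumptions, not part of this commit:

# Sketch only: Parquet into memory, then upload the raw bytes.
import boto3
import pyarrow as pa
from pyarrow import Table, parquet

def encode_parquet_in_memory(events):
    table = Table.from_pylist(events)
    sink = pa.BufferOutputStream()   # in-memory sink instead of a /tmp file
    parquet.write_table(table, sink)
    return sink.getvalue()           # pyarrow Buffer holding the Parquet file

def push_to_s3(buffer, bucket='security-lake-bucket', key='events.parquet'):
    boto3.client('s3').put_object(Bucket=bucket, Key=key, Body=buffer.to_pybytes())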