Skip to content

Commit

Permalink
Merge pull request #34 from DMTF/http-parser-remove
Browse files Browse the repository at this point in the history
Rewrite program to remove dependency
  • Loading branch information
mraineri authored Jun 28, 2024
2 parents 4c23b80 + e30b92c commit aeb2e75
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 52 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ The Redfish Event Listener is a lightweight HTTP(S) server that can be deployed

The Redfish Event Listener requires Python3 on the user's system. Additionally, the following Python packages are required:

* http_parser
* redfish
* redfish_utilities

Expand Down
125 changes: 76 additions & 49 deletions RedfishEventListener_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
import logging
import json
import ssl
import sys, signal
import sys
import signal
import re
import socket
from datetime import datetime
import argparse

import threading
from http_parser.http import HttpStream
from http_parser.reader import SocketReader

from redfish import redfish_client, AuthMethod
from redfish import redfish_client
import redfish_utilities

PACKET_SIZE = 1024

my_logger = logging.getLogger()
my_logger.setLevel(logging.DEBUG)
standard_out = logging.StreamHandler(sys.stdout)
Expand Down Expand Up @@ -46,44 +48,73 @@
'registries': None
}

### Function to read data in json format using HTTP Stream reader, parse Headers and Body data, Response status OK to service and Update the output into file

def read_my_socket(active_socket):
# Read our socket data in blocks of PACKET_SIZE
# then return our HTTP data
data = b''

my_logger.info('Getting data from server...')
data = active_socket.recv(PACKET_SIZE)

# Check if our packet fit in PACKET_SIZE bytes
# Ordinarily Content-Length is more valid, but this is satisfactory
if len(data) >= PACKET_SIZE:
my_logger.info("Gathering more data...")
while True:
more_data = active_socket.recv(PACKET_SIZE)
data = b''.join([data, more_data])
if more_data < PACKET_SIZE:
break

response_array = data.decode("utf-8").split("\r\n")
r_type, r_headers, r_payload = response_array[0], response_array[1:-1], json.loads(response_array[-1])
r_method = r_type.split(' ')[0]

headers_dict = {}
for item in r_headers:
if ': ' in item:
key, value = tuple(item.split(': ', 1))
headers_dict[key] = value
return r_method, headers_dict, r_payload


# Function to read data in json format using HTTP Stream reader, parse Headers and Body data, Response status OK to service and Update the output into file
def process_data(newsocketconn, fromaddr):
if useSSL:
connstreamout = context.wrap_socket(newsocketconn, server_side=True)
else:
connstreamout = newsocketconn
### Output File Name
# Output File Name
outputfile = "Events_" + str(fromaddr[0]) + ".txt"
logfile = "TimeStamp.log"
global event_count, data_buffer
outdata = headers = HostDetails = ""
payload = headers = HostDetails = ""
try:
try:
### Read the json response using Socket Reader and split header and body
r = SocketReader(connstreamout)
p = HttpStream(r)
headers = p.headers()
my_logger.info("headers: %s", headers)

if p.method() == 'POST':
bodydata = p.body_file().read()
bodydata = bodydata.decode("utf-8")
# Read the json response using Socket Reader and split header and body
stream_output = read_my_socket(connstreamout)
method, headers, payload = stream_output

my_logger.info("response_type: {}".format(method))
my_logger.info("headers: {}".format(headers))

if method == 'POST':
my_logger.info("\n")
my_logger.info("bodydata: %s", bodydata)
data_buffer.append(bodydata)
my_logger.info("bodydata: %s", payload)
data_buffer.append(payload)
for eachHeader in headers.items():
if eachHeader[0] == 'Host' or eachHeader[0] == 'host':
HostDetails = eachHeader[1]

### Read the json response and print the output
# Read the json response and print the output
my_logger.info("\n")
my_logger.info("Server IP Address is %s", fromaddr[0])
my_logger.info("Server PORT number is %s", fromaddr[1])
my_logger.info("Listener IP is %s", HostDetails)
my_logger.info("\n")
outdata = json.loads(bodydata)
if 'Events' in outdata and config['verbose']:
event_array = outdata['Events']
if 'Events' in payload and config['verbose']:
event_array = payload['Events']
for event in event_array:
my_logger.info("EventType is %s", event['EventType'])
my_logger.info("MessageId is %s", event['MessageId'])
Expand All @@ -101,12 +132,12 @@ def process_data(newsocketconn, fromaddr):
my_logger.info("Message is %s", event['Message'])
if 'MessageArgs' in event:
my_logger.info("MessageArgs is %s", event['MessageArgs'])
if 'Context' in outdata:
my_logger.info("Context is %s", outdata['Context'])
if 'Context' in payload:
my_logger.info("Context is %s", payload['Context'])
my_logger.info("\n")
if 'MetricValues' in outdata and config['verbose']:
metric_array = outdata['MetricValues']
my_logger.info("Metric Report Name is: %s", outdata.get('Name'))
if 'MetricValues' in payload and config['verbose']:
metric_array = payload['MetricValues']
my_logger.info("Metric Report Name is: %s", payload.get('Name'))
for metric in metric_array:
my_logger.info("Member ID is: %s", metric.get('MetricId'))
my_logger.info("Metric Value is: %s", metric.get('MetricValue'))
Expand All @@ -115,18 +146,18 @@ def process_data(newsocketconn, fromaddr):
my_logger.info("Metric Property is: %s", metric['MetricProperty'])
my_logger.info("\n")

### Check the context and send the status OK if context matches
if config['contextdetail'] is not None and outdata.get('Context', None) != config['contextdetail']:
# Check the context and send the status OK if context matches
if config['contextdetail'] is not None and payload.get('Context', None) != config['contextdetail']:
my_logger.info("Context ({}) does not match with the server ({})."
.format(outdata.get('Context', None), config['contextdetail']))
.format(payload.get('Context', None), config['contextdetail']))
res = "HTTP/1.1 204 No Content\r\n" \
"Connection: close\r\n" \
"\r\n"
connstreamout.send(res.encode())
with open(logfile, 'a') as f:
if 'EventTimestamp' in outdata:
if 'EventTimestamp' in payload:
receTime = datetime.now()
sentTime = datetime.strptime(outdata['EventTimestamp'], "%Y-%m-%d %H:%M:%S.%f")
sentTime = datetime.strptime(payload['EventTimestamp'], "%Y-%m-%d %H:%M:%S.%f")
f.write("%s %s %sms\n" % (
sentTime.strftime("%Y-%m-%d %H:%M:%S.%f"), receTime, (receTime - sentTime).microseconds / 1000))
else:
Expand All @@ -142,12 +173,12 @@ def process_data(newsocketconn, fromaddr):
my_logger.info("\n")
fd = open(outputfile, "a")
fd.write("Time:%s Count:%s\nHost IP:%s\nEvent Details:%s\n" % (
datetime.now(), event_count[str(fromaddr[0])], str(fromaddr), json.dumps(outdata)))
datetime.now(), event_count[str(fromaddr[0])], str(fromaddr), json.dumps(payload)))
fd.close()
except Exception as err:
except Exception:
my_logger.info(traceback.print_exc())

if p.method() == 'GET':
if method == 'GET':
# for x in data_buffer:
# my_logger.info(x)
res = "HTTP/1.1 200 OK\n" \
Expand All @@ -156,17 +187,13 @@ def process_data(newsocketconn, fromaddr):
connstreamout.send(res.encode())
data_buffer.clear()

except Exception as err:
outdata = connstreamout.read()
my_logger.info("Data needs to read in normal Text format.")
my_logger.info(outdata)
except Exception:
my_logger.info(traceback.print_exc())

finally:
connstreamout.shutdown(socket.SHUT_RDWR)
connstreamout.close()

import argparse

if __name__ == '__main__':
"""
Main program
Expand Down Expand Up @@ -276,15 +303,15 @@ def parse_list(string: str):
try:
# Response bodies are expected to have the event destination
unsub_id = response.dict['Id']
except:
except Exception:
# Fallback to determining the Id from the Location header
if my_location is not None:
unsub_id = my_location.strip('/').split('/')[-1]
if unsub_id is None:
my_logger.error('{} did not provide a location for the subscription; cannot unsubscribe'.format(dest))
else:
target_contexts.append((dest, my_ctx, unsub_id))
except Exception as e:
except Exception:
my_logger.info('Unable to subscribe for events with {}'.format(dest))
my_logger.info(traceback.print_exc())

Expand All @@ -307,7 +334,7 @@ def parse_list(string: str):
# Check if the listener is IPv4 or IPv6; defaults to IPv4 if the lookup fails
try:
family = socket.getaddrinfo(config['listenerip'], config['listenerport'])[0][0]
except:
except Exception:
family = socket.AF_INET
socket_server = socket.create_server(my_host, family=family)
socket_server.listen(5)
Expand All @@ -327,7 +354,7 @@ def handler(sig, frame):
try:
redfish_utilities.delete_event_subscription(ctx, unsub_id)
ctx.logout()
except:
except Exception:
my_logger.info('Unable to unsubscribe for events with {}'.format(ctx.get_base_url()))
my_logger.info(traceback.print_exc())

Expand All @@ -337,16 +364,16 @@ def handler(sig, frame):
while True:
newsocketconn = None
try:
### Socket Binding
# Socket Binding
newsocketconn, fromaddr = socket_server.accept()
try:
### Multiple Threads to handle different request from different servers
# Multiple Threads to handle different request from different servers
my_logger.info('\nSocket connected::')
threading.Thread(target=process_data, args=(newsocketconn, fromaddr)).start()
except Exception as err:
except Exception:
my_logger.info(traceback.print_exc())
except socket.timeout:
print('.', end='', flush=True)
except Exception as err:
except Exception:
my_logger.info("Exception occurred in socket binding.")
my_logger.info(traceback.print_exc())
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
redfish
redfish_utilities>=1.2.0
http_parser
redfish_utilities>=1.2.0

0 comments on commit aeb2e75

Please sign in to comment.