Skip to content

Commit

Permalink
Merge pull request #21 from PanDAWMS/flin
Browse files Browse the repository at this point in the history
msg processor: add passive mode, support newer stomp version
  • Loading branch information
mightqxc authored Apr 8, 2022
2 parents 0617167 + 2489828 commit 81e3295
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 102 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.0.24"
release_version = "0.0.25"
122 changes: 91 additions & 31 deletions pandacommon/pandamsgbkr/msg_bkr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ def _get_connection_dict(host_port_list, use_ssl=False, cert_file=None, key_file
get dict {conn_id: connection}
"""
tmp_logger = logger_utils.make_logger(base_logger, method_name='_get_connection_dict')
ssl_opts = {'use_ssl' : use_ssl,
'ssl_version' : ssl.PROTOCOL_TLSv1,
'ssl_cert_file' : cert_file,
'ssl_key_file' : key_file}
conn_dict = dict()
# resolve all distinct hosts behind hostname
resolved_host_port_set = set()
Expand All @@ -52,7 +48,24 @@ def _get_connection_dict(host_port_list, use_ssl=False, cert_file=None, key_file
host_port = '{0}:{1}'.format(host, port)
conn_id = host_port
if conn_id not in conn_dict:
conn = stomp.Connection12(host_and_ports = [(host, port)], vhost=vhost, **ssl_opts)
try:
conn = stomp.Connection12(host_and_ports=[(host, port)], vhost=vhost)
if use_ssl:
ssl_opts = {
'ssl_version' : ssl.PROTOCOL_TLSv1,
'cert_file' : cert_file,
'key_file' : key_file
}
conn.set_ssl(for_hosts=[(host, port)], **ssl_opts)
except AttributeError:
# Older version of stomp.py
ssl_opts = {
'use_ssl' : use_ssl,
'ssl_version' : ssl.PROTOCOL_TLSv1,
'ssl_cert_file' : cert_file,
'ssl_key_file' : key_file
}
conn = stomp.Connection12(host_and_ports=[(host, port)], vhost=vhost, **ssl_opts)
conn_dict[conn_id] = conn
tmp_logger.debug('got {0} connections to {1}'.format(len(conn_dict), ' , '.join(conn_dict.keys())))
return conn_dict
Expand Down Expand Up @@ -169,39 +182,55 @@ def __init__(self, mb_proxy, conn_id, *args, **kwargs):
# whether log verbosely
self.verbose = kwargs.get('verbose', False)

def on_error(self, headers, message):
self.logger.error('on_error start: {h} "{m}"'.format(h=headers, m=message))
self.logger.error('on_error done: {h}'.format(h=headers))
def _parse_args(self, args):
"""
Parse the args for different versions of stomp.py
return (cmd, headers, body)
"""
if len(args) == 1:
# [frame] : in newer version
frame = args[0]
return (frame.cmd, frame.headers, frame.body)
elif len(args) == 2:
# [headers, message] : in older version
headers, message = args
return (None, headers, message)

def on_error(self, *args):
cmd, headers, body = self._parse_args(args)
self.logger.error('on_error : {h} | {b}'.format(h=headers, b=body))

def on_disconnected(self):
self.logger.info('on_disconnected start')
self.mb_proxy._on_disconnected(conn_id=self.conn_id)
self.logger.info('on_disconnected done')

def on_send(self, frame):
obscured_headers = frame.headers
if 'passcode' in frame.headers:
obscured_headers = copy.deepcopy(frame.headers)
def on_send(self, *args):
cmd, headers, body = self._parse_args(args)
obscured_headers = headers
if 'passcode' in headers:
obscured_headers = copy.deepcopy(headers)
obscured_headers['passcode'] = '********'
if self.verbose:
self.logger.debug('on_send frame: {0} {1} "{2}"'.format(frame.cmd, obscured_headers, frame.body))
self.logger.debug('on_send frame: {0} {1} | {2}'.format(cmd, obscured_headers, body))

def on_message(self, headers, message):
def on_message(self, *args):
cmd, headers, body = self._parse_args(args)
if self.verbose:
self.logger.debug('on_message start: {h} "{m}"'.format(h=headers, m=message))
self.mb_proxy._on_message(headers, message, conn_id=self.conn_id)
self.logger.debug('on_message start: {h} | {b}'.format(h=headers, b=body))
self.mb_proxy._on_message(headers, body, conn_id=self.conn_id)
if self.verbose:
self.logger.debug('on_message done: {h}'.format(h=headers))


# message broker proxy for receiver
class MBProxy(object):
class MBListenerProxy(object):

def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=None, key_file=None, vhost=None,
username=None, passcode=None, wait=True, ack_mode='client-individual', skip_buffer=False, conn_mode='all',
verbose=False):
prefetch_size=None, verbose=False, **kwargs):
# logger
self.logger = logger_utils.make_logger(base_logger, token=name, method_name='MBProxy')
self.logger = logger_utils.make_logger(base_logger, token=name, method_name='MBListenerProxy')
# name of message queue
self.name = name
# connection parameters
Expand All @@ -213,7 +242,7 @@ def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=N
# destination queue to subscribe
self.destination = destination
# subscription ID
self.sub_id = 'panda-MBProxy_{0}_r{1:06}'.format(socket.getfqdn(), random.randrange(10**6))
self.sub_id = 'panda-MBListenerProxy_{0}_r{1:06}'.format(socket.getfqdn(), random.randrange(10**6))
# client ID
self.client_id = 'client_{0}_{1}'.format(self.sub_id, hex(id(self)))
# connect parameters
Expand Down Expand Up @@ -241,6 +270,10 @@ def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=N
self.to_disconnect = False
# whether to log verbosely
self.verbose = verbose
# prefetch count of the MB (max number of un-acknowledge messages allowed)
self.prefetch_size = prefetch_size
# evaluate subscription headers
self._evaluate_subscription_headers()
# get connections
self._get_connections()

Expand All @@ -264,6 +297,14 @@ def _get_connections(self):
self.logger.debug('got connection about {0}'.format(conn_id))
self.logger.debug('done')

def _evaluate_subscription_headers(self):
self.subscription_headers = {}
if self.prefetch_size is not None:
self.subscription_headers.update({
'activemq.prefetchSize': self.prefetch_size, # for ActiveMQ
'prefetch-count': self.prefetch_size, # for RabbitMQ
})

def _begin(self, conn_id):
conn = self.connection_dict[conn_id]
txs_id = conn.begin()
Expand Down Expand Up @@ -295,8 +336,8 @@ def _nack(self, conn_id, msg_id, ack_id):
conn.nack(ack_id)
self.logger.warning('{conid} {mid} {ackid} NACK'.format(conid=conn_id, mid=msg_id, ackid=ack_id))

def _on_message(self, headers, message, conn_id):
msg_obj = MsgObj(mb_proxy=self, conn_id=conn_id, msg_id=headers['message-id'], ack_id=headers['ack'], data=message)
def _on_message(self, headers, body, conn_id):
msg_obj = MsgObj(mb_proxy=self, conn_id=conn_id, msg_id=headers['message-id'], ack_id=headers['ack'], data=body)
if self.verbose:
self.logger.debug('_on_message from {c} made message object: {h}'.format(c=conn_id, h=headers))
if self.skip_buffer:
Expand All @@ -323,7 +364,8 @@ def go(self):
self.got_disconnected = False
conn.set_listener(listener.__class__.__name__, listener)
conn.connect(**self.connect_params)
conn.subscribe(destination=self.destination, id=self.sub_id, ack='client-individual')
conn.subscribe(destination=self.destination, id=self.sub_id, ack='client-individual',
headers=self.subscription_headers)
self.logger.info('connected to {0} {1}'.format(conn_id, self.destination))
else:
self.logger.info('connection to {0} {1} already exists. Skipped...'.format(
Expand Down Expand Up @@ -352,12 +394,30 @@ def restart(self):
self.go()
self.logger.info('the {0}th restart ended'.format(self.n_restart))

def get_messages(self, limit=100):
"""
get some messages capped by limit from local buffer
return list of message objects
"""
if self.verbose:
self.logger.debug('get_messages called')
# get messages from local buffer
msg_list = []
for j in range(limit):
msg_obj = self.msg_buffer.get()
if msg_obj is None:
break
msg_list.append(msg_obj)
if self.verbose:
self.logger.debug('got {n} messages'.format(n=len(msg_list)))
return msg_list


# message broker proxy for sender, waster...
class MBSenderProxy(object):

def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=None, key_file=None, vhost=None,
username=None, passcode=None, wait=True, verbose=False):
username=None, passcode=None, wait=True, verbose=False, **kwargs):
# logger
self.logger = logger_utils.make_logger(base_logger, token=name, method_name='MBSenderProxy')
# name of message queue
Expand Down Expand Up @@ -393,16 +453,16 @@ def _get_connection(self):
get a connection and a listener
"""
conn_dict = _get_connection_dict(self.host_port_list, self.use_ssl, self.cert_file, self.key_file, self.vhost)
self.conn_id, self.conn = random.choice([conn_dict.items()])
self.listener = MsgListener(mb_proxy=self, conn_id=self.conn, verbose=self.verbose)
self.conn_id, self.conn = random.choice(list(conn_dict.items()))
self.listener = MsgListener(mb_proxy=self, conn_id=self.conn_id, verbose=self.verbose)
self.logger.debug('got connection about {0}'.format(self.conn_id))

def _on_message(self, headers, message):
def _on_message(self, headers, body, conn_id):
if self.verbose:
self.logger.debug('_on_message drop message: {h} "{m}"'.format(h=headers, m=message))
self.logger.debug('_on_message from {c} drop message: {h} | {b}'.format(c=conn_id, h=headers, b=body))

def _on_disconnected(self):
self.logger.debug('_on_disconnected called')
def _on_disconnected(self, conn_id):
self.logger.debug('_on_disconnected from {c} called'.format(c=conn_id))
self.got_disconnected = True

def send(self, data):
Expand All @@ -411,7 +471,7 @@ def send(self, data):
"""
self.conn.send(destination=self.destination, body=data)
if self.verbose:
self.logger.debug('send to {dest} "{data}"'.format(dest=self.destination, data=data))
self.logger.debug('send to {dest} | {data}'.format(dest=self.destination, data=data))

def waste(self, duration=3):
"""
Expand Down
Loading

0 comments on commit 81e3295

Please sign in to comment.