Skip to content

Commit

Permalink
added message removal functions to MBSenderProxy
Browse files Browse the repository at this point in the history
  • Loading branch information
tmaeno committed Jul 1, 2022
1 parent 5c97e8f commit 05123c2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 3 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.0.29"
release_version = "0.0.30"
3 changes: 3 additions & 0 deletions README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ Includes all libraries used by both server and monitor (and others).
Release Note
------------

* 0.0.30 (1/7/2022)
* added message removal functions to MBSenderProxy

* 0.0.29 (27/6/2022)
* env vars in msg config json

Expand Down
52 changes: 50 additions & 2 deletions pandacommon/pandamsgbkr/msg_bkr_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime
import threading
import socket
import ssl
import random
import uuid
import collections
import time
import copy
Expand Down Expand Up @@ -445,6 +447,10 @@ def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=N
self.to_disconnect = False
# whether to log verbosely
self.verbose = verbose
# instance lock for removers
self.remover_lock = threading.Lock()
# removers
self.removers = {}
# get connection
self._get_connection()

Expand All @@ -465,11 +471,11 @@ def _on_disconnected(self, conn_id):
self.logger.debug('_on_disconnected from {c} called'.format(c=conn_id))
self.got_disconnected = True

def send(self, data):
def send(self, data, headers=None):
"""
send a message to queue
"""
self.conn.send(destination=self.destination, body=data)
self.conn.send(destination=self.destination, body=data, headers=headers)
if self.verbose:
self.logger.debug('send to {dest} | {data}'.format(dest=self.destination, data=data))

Expand All @@ -490,6 +496,11 @@ def go(self):
self.got_disconnected = False
self.conn.set_listener(self.listener.__class__.__name__, self.listener)
self.conn.connect(**self.connect_params)
# add removers
with self.remover_lock:
for r_id in self.removers:
headers = self.removers[r_id]['headers']
self.conn.subscribe(destination=self.destination, headers=headers, id=r_id, ack='auto')
self.logger.info('connected to {0} {1}'.format(self.conn_id, self.destination))
else:
self.logger.info('connection to {0} {1} already exists. Skipped...'.format(
Expand All @@ -514,3 +525,40 @@ def restart(self):
self._get_connection()
self.go()
self.logger.info('the {0}th restart done'.format(self.n_restart))

def add_remover(self, headers, timeout):
"""
add a message remover relevant to the selector specified in the headers
:param headers: a dictionary to specify the selector
:param timeout: lifetime of the subscription
"""
self.logger.debug('adding remover with headers={}'.format(headers))
# unique id for each remover
r_id = self.sub_id + '.' + str(uuid.uuid4())
with self.remover_lock:
self.removers[r_id] = {'timeout': datetime.datetime.utcnow() + datetime.timedelta(seconds=timeout),
'headers': copy.copy(headers)}
# reconnect if necessary
if self.got_disconnected:
self.restart()
# subscribe to remove the messages
self.conn.subscribe(destination=self.destination, headers=headers, id=r_id, ack='auto')
self.logger.debug('added remover id={}'.format(r_id))

def purge_removers(self):
"""
purge old message removers
"""
self.logger.debug('purging old removers')
with self.remover_lock:
time_now = datetime.datetime.utcnow()
n_old = len(self.removers)
for r_id in list(self.removers):
timeout = self.removers[r_id]['timeout']
# unsubscribe if old
if timeout < time_now:
self.conn.unsubscribe(id=r_id)
del self.removers[r_id]
self.logger.debug('purged remover id={}'.format(r_id))
n_new = len(self.removers)
self.logger.debug('purged {} removers in total among {} removers'.format(n_old-n_new, n_old))

0 comments on commit 05123c2

Please sign in to comment.