diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py index 8f9511a..9c8a7ff 100644 --- a/PandaPkgInfo.py +++ b/PandaPkgInfo.py @@ -1 +1 @@ -release_version = "0.0.29" +release_version = "0.0.30" diff --git a/README.txt b/README.txt index 8f7c642..c5c3063 100644 --- a/README.txt +++ b/README.txt @@ -7,6 +7,9 @@ Includes all libraries used by both server and monitor (and others). Release Note ------------ +* 0.0.30 (1/7/2022) + * added message removal functions to MBSenderProxy + * 0.0.29 (27/6/2022) * env vars in msg config json diff --git a/pandacommon/pandamsgbkr/msg_bkr_utils.py b/pandacommon/pandamsgbkr/msg_bkr_utils.py index fd3ec51..feedead 100644 --- a/pandacommon/pandamsgbkr/msg_bkr_utils.py +++ b/pandacommon/pandamsgbkr/msg_bkr_utils.py @@ -1,7 +1,9 @@ +import datetime import threading import socket import ssl import random +import uuid import collections import time import copy @@ -445,6 +447,10 @@ def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=N self.to_disconnect = False # whether to log verbosely self.verbose = verbose + # instance lock for removers + self.remover_lock = threading.Lock() + # removers + self.removers = {} # get connection self._get_connection() @@ -465,11 +471,11 @@ def _on_disconnected(self, conn_id): self.logger.debug('_on_disconnected from {c} called'.format(c=conn_id)) self.got_disconnected = True - def send(self, data): + def send(self, data, headers=None): """ send a message to queue """ - self.conn.send(destination=self.destination, body=data) + self.conn.send(destination=self.destination, body=data, headers=headers) if self.verbose: self.logger.debug('send to {dest} | {data}'.format(dest=self.destination, data=data)) @@ -490,6 +496,11 @@ def go(self): self.got_disconnected = False self.conn.set_listener(self.listener.__class__.__name__, self.listener) self.conn.connect(**self.connect_params) + # add removers + with self.remover_lock: + for r_id in self.removers: + headers = self.removers[r_id]['headers'] + self.conn.subscribe(destination=self.destination, headers=headers, id=r_id, ack='auto') self.logger.info('connected to {0} {1}'.format(self.conn_id, self.destination)) else: self.logger.info('connection to {0} {1} already exists. Skipped...'.format( @@ -514,3 +525,40 @@ def restart(self): self._get_connection() self.go() self.logger.info('the {0}th restart done'.format(self.n_restart)) + + def add_remover(self, headers, timeout): + """ + add a message remover relevant to the selector specified in the headers + :param headers: a dictionary to specify the selector + :param timeout: lifetime of the subscription + """ + self.logger.debug('adding remover with headers={}'.format(headers)) + # unique id for each remover + r_id = self.sub_id + '.' + str(uuid.uuid4()) + with self.remover_lock: + self.removers[r_id] = {'timeout': datetime.datetime.utcnow() + datetime.timedelta(seconds=timeout), + 'headers': copy.copy(headers)} + # reconnect if necessary + if self.got_disconnected: + self.restart() + # subscribe to remove the messages + self.conn.subscribe(destination=self.destination, headers=headers, id=r_id, ack='auto') + self.logger.debug('added remover id={}'.format(r_id)) + + def purge_removers(self): + """ + purge old message removers + """ + self.logger.debug('purging old removers') + with self.remover_lock: + time_now = datetime.datetime.utcnow() + n_old = len(self.removers) + for r_id in list(self.removers): + timeout = self.removers[r_id]['timeout'] + # unsubscribe if old + if timeout < time_now: + self.conn.unsubscribe(id=r_id) + del self.removers[r_id] + self.logger.debug('purged remover id={}'.format(r_id)) + n_new = len(self.removers) + self.logger.debug('purged {} removers in total among {} removers'.format(n_old-n_new, n_old))