diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py
index 3d4d1d0..8e22d42 100644
--- a/PandaPkgInfo.py
+++ b/PandaPkgInfo.py
@@ -1 +1 @@
-release_version = "0.0.24"
+release_version = "0.0.25"
diff --git a/pandacommon/pandamsgbkr/msg_bkr_utils.py b/pandacommon/pandamsgbkr/msg_bkr_utils.py
index 5eb54fc..fd3ec51 100644
--- a/pandacommon/pandamsgbkr/msg_bkr_utils.py
+++ b/pandacommon/pandamsgbkr/msg_bkr_utils.py
@@ -33,10 +33,6 @@ def _get_connection_dict(host_port_list, use_ssl=False, cert_file=None, key_file
     get dict {conn_id: connection}
     """
     tmp_logger = logger_utils.make_logger(base_logger, method_name='_get_connection_dict')
-    ssl_opts = {'use_ssl' : use_ssl,
-                'ssl_version' : ssl.PROTOCOL_TLSv1,
-                'ssl_cert_file' : cert_file,
-                'ssl_key_file' : key_file}
     conn_dict = dict()
     # resolve all distinct hosts behind hostname
     resolved_host_port_set = set()
@@ -52,7 +48,24 @@
         host_port = '{0}:{1}'.format(host, port)
         conn_id = host_port
         if conn_id not in conn_dict:
-            conn = stomp.Connection12(host_and_ports = [(host, port)], vhost=vhost, **ssl_opts)
+            try:
+                conn = stomp.Connection12(host_and_ports=[(host, port)], vhost=vhost)
+                if use_ssl:
+                    ssl_opts = {
+                        'ssl_version' : ssl.PROTOCOL_TLSv1,
+                        'cert_file' : cert_file,
+                        'key_file' : key_file
+                    }
+                    conn.set_ssl(for_hosts=[(host, port)], **ssl_opts)
+            except AttributeError:
+                # Older version of stomp.py
+                ssl_opts = {
+                    'use_ssl' : use_ssl,
+                    'ssl_version' : ssl.PROTOCOL_TLSv1,
+                    'ssl_cert_file' : cert_file,
+                    'ssl_key_file' : key_file
+                }
+                conn = stomp.Connection12(host_and_ports=[(host, port)], vhost=vhost, **ssl_opts)
             conn_dict[conn_id] = conn
     tmp_logger.debug('got {0} connections to {1}'.format(len(conn_dict), ' , '.join(conn_dict.keys())))
     return conn_dict
@@ -169,39 +182,55 @@ def __init__(self, mb_proxy, conn_id, *args, **kwargs):
         # whether log verbosely
         self.verbose = kwargs.get('verbose', False)

-    def on_error(self, headers, message):
-        self.logger.error('on_error start: {h} "{m}"'.format(h=headers, m=message))
-        self.logger.error('on_error done: {h}'.format(h=headers))
+    def _parse_args(self, args):
+        """
+        Parse the args for different versions of stomp.py
+        return (cmd, headers, body)
+        """
+        if len(args) == 1:
+            # [frame] : in newer version
+            frame = args[0]
+            return (frame.cmd, frame.headers, frame.body)
+        elif len(args) == 2:
+            # [headers, message] : in older version
+            headers, message = args
+            return (None, headers, message)
+
+    def on_error(self, *args):
+        cmd, headers, body = self._parse_args(args)
+        self.logger.error('on_error : {h} | {b}'.format(h=headers, b=body))

     def on_disconnected(self):
         self.logger.info('on_disconnected start')
         self.mb_proxy._on_disconnected(conn_id=self.conn_id)
         self.logger.info('on_disconnected done')

-    def on_send(self, frame):
-        obscured_headers = frame.headers
-        if 'passcode' in frame.headers:
-            obscured_headers = copy.deepcopy(frame.headers)
+    def on_send(self, *args):
+        cmd, headers, body = self._parse_args(args)
+        obscured_headers = headers
+        if 'passcode' in headers:
+            obscured_headers = copy.deepcopy(headers)
             obscured_headers['passcode'] = '********'
         if self.verbose:
-            self.logger.debug('on_send frame: {0} {1} "{2}"'.format(frame.cmd, obscured_headers, frame.body))
+            self.logger.debug('on_send frame: {0} {1} | {2}'.format(cmd, obscured_headers, body))

-    def on_message(self, headers, message):
+    def on_message(self, *args):
+        cmd, headers, body = self._parse_args(args)
         if self.verbose:
-            self.logger.debug('on_message start: {h} "{m}"'.format(h=headers, m=message))
+            self.logger.debug('on_message start: {h} | {b}'.format(h=headers, b=body))
-        self.mb_proxy._on_message(headers, message, conn_id=self.conn_id)
+        self.mb_proxy._on_message(headers, body, conn_id=self.conn_id)
         if self.verbose:
             self.logger.debug('on_message done: {h}'.format(h=headers))


 # message broker proxy for receiver
-class MBProxy(object):
+class MBListenerProxy(object):

     def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=None, key_file=None, vhost=None,
                  username=None, passcode=None, wait=True, ack_mode='client-individual', skip_buffer=False, conn_mode='all',
-                 verbose=False):
+                 prefetch_size=None, verbose=False, **kwargs):
         # logger
-        self.logger = logger_utils.make_logger(base_logger, token=name, method_name='MBProxy')
+        self.logger = logger_utils.make_logger(base_logger, token=name, method_name='MBListenerProxy')
         # name of message queue
         self.name = name
         # connection parameters
@@ -213,7 +242,7 @@ def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=N
         # destination queue to subscribe
         self.destination = destination
         # subscription ID
-        self.sub_id = 'panda-MBProxy_{0}_r{1:06}'.format(socket.getfqdn(), random.randrange(10**6))
+        self.sub_id = 'panda-MBListenerProxy_{0}_r{1:06}'.format(socket.getfqdn(), random.randrange(10**6))
         # client ID
         self.client_id = 'client_{0}_{1}'.format(self.sub_id, hex(id(self)))
         # connect parameters
@@ -241,6 +270,10 @@
         self.to_disconnect = False
         # whether to log verbosely
         self.verbose = verbose
+        # prefetch count of the MB (max number of un-acknowledged messages allowed)
+        self.prefetch_size = prefetch_size
+        # evaluate subscription headers
+        self._evaluate_subscription_headers()
         # get connections
         self._get_connections()

@@ -264,6 +297,14 @@ def _get_connections(self):
             self.logger.debug('got connection about {0}'.format(conn_id))
         self.logger.debug('done')

+    def _evaluate_subscription_headers(self):
+        self.subscription_headers = {}
+        if self.prefetch_size is not None:
+            self.subscription_headers.update({
+                'activemq.prefetchSize': self.prefetch_size,  # for ActiveMQ
+                'prefetch-count': self.prefetch_size,  # for RabbitMQ
+            })
+
     def _begin(self, conn_id):
         conn = self.connection_dict[conn_id]
         txs_id = conn.begin()
@@ -295,8 +336,8 @@ def _nack(self, conn_id, msg_id, ack_id):
         conn.nack(ack_id)
         self.logger.warning('{conid} {mid} {ackid} NACK'.format(conid=conn_id, mid=msg_id, ackid=ack_id))

-    def _on_message(self, headers, message, conn_id):
-        msg_obj = MsgObj(mb_proxy=self, conn_id=conn_id, msg_id=headers['message-id'], ack_id=headers['ack'], data=message)
+    def _on_message(self, headers, body, conn_id):
+        msg_obj = MsgObj(mb_proxy=self, conn_id=conn_id, msg_id=headers['message-id'], ack_id=headers['ack'], data=body)
         if self.verbose:
             self.logger.debug('_on_message from {c} made message object: {h}'.format(c=conn_id, h=headers))
         if self.skip_buffer:
@@ -323,7 +364,8 @@ def go(self):
                 self.got_disconnected = False
                 conn.set_listener(listener.__class__.__name__, listener)
                 conn.connect(**self.connect_params)
-                conn.subscribe(destination=self.destination, id=self.sub_id, ack='client-individual')
+                conn.subscribe(destination=self.destination, id=self.sub_id, ack='client-individual',
+                               headers=self.subscription_headers)
                 self.logger.info('connected to {0} {1}'.format(conn_id, self.destination))
             else:
                 self.logger.info('connection to {0} {1} already exists. Skipped...'.format(
@@ -352,12 +394,30 @@ def restart(self):
         self.go()
         self.logger.info('the {0}th restart ended'.format(self.n_restart))

+    def get_messages(self, limit=100):
+        """
+        get some messages capped by limit from local buffer
+        return list of message objects
+        """
+        if self.verbose:
+            self.logger.debug('get_messages called')
+        # get messages from local buffer
+        msg_list = []
+        for j in range(limit):
+            msg_obj = self.msg_buffer.get()
+            if msg_obj is None:
+                break
+            msg_list.append(msg_obj)
+        if self.verbose:
+            self.logger.debug('got {n} messages'.format(n=len(msg_list)))
+        return msg_list
+

 # message broker proxy for sender, waster...
 class MBSenderProxy(object):

     def __init__(self, name, host_port_list, destination, use_ssl=False, cert_file=None, key_file=None, vhost=None,
-                 username=None, passcode=None, wait=True, verbose=False):
+                 username=None, passcode=None, wait=True, verbose=False, **kwargs):
         # logger
         self.logger = logger_utils.make_logger(base_logger, token=name, method_name='MBSenderProxy')
         # name of message queue
@@ -393,16 +453,16 @@ def _get_connection(self):
         get a connection and a listener
         """
         conn_dict = _get_connection_dict(self.host_port_list, self.use_ssl, self.cert_file, self.key_file, self.vhost)
-        self.conn_id, self.conn = random.choice([conn_dict.items()])
-        self.listener = MsgListener(mb_proxy=self, conn_id=self.conn, verbose=self.verbose)
+        self.conn_id, self.conn = random.choice(list(conn_dict.items()))
+        self.listener = MsgListener(mb_proxy=self, conn_id=self.conn_id, verbose=self.verbose)
         self.logger.debug('got connection about {0}'.format(self.conn_id))

-    def _on_message(self, headers, message):
+    def _on_message(self, headers, body, conn_id):
         if self.verbose:
-            self.logger.debug('_on_message drop message: {h} "{m}"'.format(h=headers, m=message))
+            self.logger.debug('_on_message from {c} drop message: {h} | {b}'.format(c=conn_id, h=headers, b=body))

-    def _on_disconnected(self):
-        self.logger.debug('_on_disconnected called')
+    def _on_disconnected(self, conn_id):
+        self.logger.debug('_on_disconnected from {c} called'.format(c=conn_id))
         self.got_disconnected = True

     def send(self, data):
         """
@@ -411,7 +471,7 @@
         """
         self.conn.send(destination=self.destination, body=data)
         if self.verbose:
-            self.logger.debug('send to {dest} "{data}"'.format(dest=self.destination, data=data))
+            self.logger.debug('send to {dest} | {data}'.format(dest=self.destination, data=data))

     def waste(self, duration=3):
         """
diff --git a/pandacommon/pandamsgbkr/msg_processor.py b/pandacommon/pandamsgbkr/msg_processor.py
index f274efb..d343a54 100644
--- a/pandacommon/pandamsgbkr/msg_processor.py
+++ b/pandacommon/pandamsgbkr/msg_processor.py
@@ -4,7 +4,7 @@
 import json
 import logging

-from .msg_bkr_utils import MsgBuffer, MBProxy, MBSenderProxy
+from .msg_bkr_utils import MsgBuffer, MBListenerProxy, MBSenderProxy
 from pandacommon.pandautils.thread_utils import GenericThread
 from pandacommon.pandautils.plugin_factory import PluginFactory
 from pandacommon.pandalogger import logger_utils
@@ -13,6 +13,33 @@
 base_logger = logger_utils.setup_logger('msg_processor')


+# get mb proxy instance
+def get_mb_proxy(name, sconf, qconf, mode='listener', **kwargs):
+    """
+    get MBListenerProxy or MBSenderProxy instance according to config dict
+    """
+    # class of mb proxy
+    the_class = MBListenerProxy
+    if mode == 'sender':
+        the_class = MBSenderProxy
+    # instantiate
+    mb_proxy = the_class(
+        name=name,
+        host_port_list=sconf['host_port_list'],
+        destination=qconf['destination'],
+        use_ssl=sconf.get('use_ssl', False),
+        cert_file=sconf.get('cert_file'),
+        key_file=sconf.get('key_file'),
+        username=sconf.get('username'),
+        passcode=sconf.get('passcode'),
+        vhost=sconf.get('vhost'),
+        wait=True,
+        verbose=sconf.get('verbose', False),
+        **kwargs
+    )
+    return mb_proxy
+
+
 # simple message processor plugin Base
 class SimpleMsgProcPluginBase(object):
     """
@@ -156,21 +183,24 @@ def __init__(self, config_file, process_sleep_time=0.0001, **kwargs):
         self.__to_run = True
         self.config_file = config_file
         self.process_sleep_time = process_sleep_time
-        self.init_mb_proxy_list = []
+        self.init_mb_listener_proxy_list = []
         self.init_mb_sender_proxy_list = []
         self.init_processor_list = []
         self.processor_attr_map = dict()
         self.processor_thread_map = dict()
         self.guard_period = 300
         self._last_guard_timestamp = 0
+        self.prefetch_count = None
         # log
         tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='__init__')
+        # parse config
+        self._parse_config()
         # done
         tmp_logger.info('done')

-    def _set_from_config(self):
+    def _parse_config(self):
         """
-        parse message processor configuration json file and set attributes accordingly
+        parse message processor configuration json file
         Typical example dict from config json:
             mb_servers_dict = {
                 'Server_1': {
@@ -187,6 +217,7 @@
                 }
             queues_dict = {
                 'Queue_1': {
+                    'enable': True,
                     'server': 'Server_1',
                     'destination': '/queue/some_queue',
                 },
@@ -205,20 +236,32 @@
             }
         """
         # logger
-        tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_set_from_config')
+        tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_parse_config')
         tmp_logger.debug('start')
         # parse config json
         with open(self.config_file, 'r') as _f:
             raw_dict = json.load(_f)
-        mb_servers_dict = raw_dict['mb_servers']
-        queues_dict = raw_dict['queues']
-        processors_dict = raw_dict['processors']
+        self._mb_servers_dict = raw_dict['mb_servers']
+        self._queues_dict = raw_dict['queues']
+        self._processors_dict = raw_dict.get('processors', {})
+        # set self optional attributes
+        if raw_dict.get('guard_period') is not None:
+            self.guard_period = raw_dict['guard_period']
+        tmp_logger.debug('done')
+
+    def _setup_instances(self):
+        """
+        set up attributes and MBListenerProxy/plugin instances accordingly
+        """
+        # logger
+        tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_setup_instances')
+        tmp_logger.debug('start')
         # processor thread attribute dict
         processor_attr_map = dict()
         # inward/outward queues and plugin instances
         in_q_set = set()
         out_q_set = set()
-        for proc, pconf in processors_dict.items():
+        for proc, pconf in self._processors_dict.items():
             # skip if not enabled
             if not pconf.get('enable', True):
                 continue
@@ -237,97 +280,75 @@
             processor_attr_map[proc]['in_queue'] = in_queue
             processor_attr_map[proc]['out_queue'] = out_queue
             processor_attr_map[proc]['plugin'] = plugin
-
-        # mb_proxy instances
-        mb_proxy_dict = dict()
+        # mb_listener_proxy instances
+        mb_listener_proxy_dict = dict()
         for in_queue in in_q_set:
-            qconf = queues_dict[in_queue]
-            sconf = mb_servers_dict[qconf['server']]
-            mb_proxy = MBProxy(name=in_queue,
-                               host_port_list=sconf['host_port_list'],
-                               destination=qconf['destination'],
-                               use_ssl=sconf.get('use_ssl', False),
-                               cert_file=sconf.get('cert_file'),
-                               key_file=sconf.get('key_file'),
-                               username=sconf.get('username'),
-                               passcode=sconf.get('passcode'),
-                               vhost=sconf.get('vhost'),
-                               wait=True,
-                               verbose=sconf.get('verbose', False),
-                               )
-            mb_proxy_dict[in_queue] = mb_proxy
+            qconf = self._queues_dict[in_queue]
+            if not qconf.get('enable', True):
+                continue
+            sconf = self._mb_servers_dict[qconf['server']]
+            mb_listener_proxy = get_mb_proxy(name=in_queue, sconf=sconf, qconf=qconf, mode='listener')
+            mb_listener_proxy_dict[in_queue] = mb_listener_proxy
         # mb_sender_proxy instances
         mb_sender_proxy_dict = dict()
         for out_queue in out_q_set:
-            qconf = queues_dict[out_queue]
-            sconf = mb_servers_dict[qconf['server']]
-            mb_sender_proxy = MBSenderProxy(name=out_queue,
-                                            host_port_list=sconf['host_port_list'],
-                                            destination=qconf['destination'],
-                                            use_ssl=sconf.get('use_ssl', False),
-                                            cert_file=sconf.get('cert_file'),
-                                            key_file=sconf.get('key_file'),
-                                            username=sconf.get('username'),
-                                            passcode=sconf.get('passcode'),
-                                            vhost=sconf.get('vhost'),
-                                            wait=True,
-                                            verbose=sconf.get('verbose', False),
-                                            )
+            qconf = self._queues_dict[out_queue]
+            if not qconf.get('enable', True):
+                continue
+            sconf = self._mb_servers_dict[qconf['server']]
+            mb_sender_proxy = get_mb_proxy(name=out_queue, sconf=sconf, qconf=qconf, mode='sender')
             mb_sender_proxy_dict[out_queue] = mb_sender_proxy
         # keep filling in thread attribute dict
         for proc in processor_attr_map.keys():
             in_queue = processor_attr_map[proc]['in_queue']
             if in_queue:
-                processor_attr_map[proc]['mb_proxy'] = mb_proxy_dict[in_queue]
+                processor_attr_map[proc]['mb_listener_proxy'] = mb_listener_proxy_dict[in_queue]
             out_queue = processor_attr_map[proc]['out_queue']
             if out_queue:
                 processor_attr_map[proc]['mb_sender_proxy'] = mb_sender_proxy_dict[out_queue]
         # set self attributes
         self.init_processor_list = list(processor_attr_map.keys())
-        self.init_mb_proxy_list = list(mb_proxy_dict.values())
+        self.init_mb_listener_proxy_list = list(mb_listener_proxy_dict.values())
         self.init_mb_sender_proxy_list = list(mb_sender_proxy_dict.values())
         self.processor_attr_map = dict(processor_attr_map)
-        # set self optional attributes
-        if raw_dict.get('guard_period') is not None:
-            self.guard_period = raw_dict['guard_period']
         # tear down
-        del in_q_set, out_q_set, mb_proxy_dict, mb_sender_proxy_dict, processor_attr_map
+        del in_q_set, out_q_set, mb_listener_proxy_dict, mb_sender_proxy_dict, processor_attr_map
         tmp_logger.debug('done')

-    def _spawn_listeners(self, mb_proxy_list):
+    def _spawn_listeners(self, mb_listener_proxy_list):
         """
-        spawn connection/listener threads of certain message broker proxy
+        spawn connection/listener threads of certain message broker listener proxy
         """
         tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_spawn_listeners')
         tmp_logger.debug('start')
-        for mb_proxy in mb_proxy_list:
+        for mb_proxy in mb_listener_proxy_list:
             mb_proxy.go()
             tmp_logger.info('spawned listener {0}'.format(mb_proxy.name))
         tmp_logger.debug('done')

-    def _guard_listeners(self, mb_proxy_list):
+    def _guard_listeners(self, mb_listener_proxy_list):
         """
-        guard connection/listener threads of certain message broker proxy, reconnect when disconnected
+        guard connection/listener threads of certain message broker listener proxy, reconnect when disconnected
         """
         tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_guard_listeners')
         tmp_logger.debug('start')
-        for mb_proxy in mb_proxy_list:
+        for mb_proxy in mb_listener_proxy_list:
             if mb_proxy.got_disconnected and not mb_proxy.to_disconnect:
-                tmp_logger.debug('found listner {0} disconnected unexpectedly; trigger restart...'.format(mb_proxy.name))
+                tmp_logger.debug('found listener {0} disconnected unexpectedly; trigger restart...'.format(mb_proxy.name))
                 mb_proxy.restart()
                 if mb_proxy.n_restart > 10:
-                    tmp_logger.warning('found listner {0} keep getting disconnected; already restarted {1} times'.format(
+                    tmp_logger.warning('found listener {0} keep getting disconnected; already restarted {1} times'.format(
                         mb_proxy.name, mb_proxy.n_restart))
                 tmp_logger.info('restarted listener {0}'.format(mb_proxy.name))
         tmp_logger.debug('done')

-    def _kill_listeners(self, mb_proxy_list):
+    def _kill_listeners(self, mb_listener_proxy_list):
         """
-        kill connection/listener threads of certain message broker proxy
+        kill connection/listener threads of certain message broker listener proxy
         """
         tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='_kill_listeners')
         tmp_logger.debug('start')
-        for mb_proxy in mb_proxy_list:
+        for mb_proxy in mb_listener_proxy_list:
             mb_proxy.stop()
             tmp_logger.info('signaled stop to listener {0}'.format(mb_proxy.name))
         tmp_logger.debug('done')
@@ -351,10 +372,10 @@ def _guard_senders(self, mb_sender_proxy_list):
         tmp_logger.debug('start')
         for mb_proxy in mb_sender_proxy_list:
             if mb_proxy.got_disconnected and not mb_proxy.to_disconnect:
-                tmp_logger.debug('found listner {0} disconnected unexpectedly; trigger restart...'.format(mb_proxy.name))
+                tmp_logger.debug('found listener {0} disconnected unexpectedly; trigger restart...'.format(mb_proxy.name))
                 mb_proxy.restart()
                 if mb_proxy.n_restart > 10:
-                    tmp_logger.warning('found listner {0} keep getting disconnected; already restarted {1} times'.format(
+                    tmp_logger.warning('found listener {0} keep getting disconnected; already restarted {1} times'.format(
                         mb_proxy.name, mb_proxy.n_restart))
                 tmp_logger.info('restarted listener {0}'.format(mb_proxy.name))
         tmp_logger.debug('done')
@@ -445,12 +466,12 @@ def run(self):
         """
         tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='run')
         tmp_logger.debug('start')
-        # set attributes from config
-        self._set_from_config()
+        # set up instances from config
+        self._setup_instances()
         # initialize
         self.initialize()
-        # spawn all message broker proxy threads
-        self._spawn_listeners(self.init_mb_proxy_list)
+        # spawn all message broker listener proxy threads
+        self._spawn_listeners(self.init_mb_listener_proxy_list)
         # spawn all message broker sender proxy threads
         self._spawn_senders(self.init_mb_sender_proxy_list)
         # spawn all processor threads according to config
@@ -460,17 +481,67 @@
         while self.__to_run:
             # guard listeners and senders
             if time.time() >= self._last_guard_timestamp + self.guard_period:
-                self._guard_listeners(self.init_mb_proxy_list)
+                self._guard_listeners(self.init_mb_listener_proxy_list)
                 self._guard_senders(self.init_mb_sender_proxy_list)
                 self._last_guard_timestamp = time.time()
             # sleep
             time.sleep(0.01)
         # tear down
         tmp_logger.debug('tearing down')
-        # kill all message broker proxy threads
-        self._kill_listeners(self.init_mb_proxy_list)
+        # kill all message broker listener proxy threads
+        self._kill_listeners(self.init_mb_listener_proxy_list)
         # kill all message broker sender proxy threads
         self._kill_senders(self.init_mb_sender_proxy_list)
         # kill all processor threads according to config
         self._kill_processors(self.init_processor_list)
         tmp_logger.debug('done')
+
+    def start_passive_mode(self, in_q_list=None, out_q_list=None, prefetch_size=100):
+        """
+        start passive mode: only spawn mb proxies (without spawning agent and plugin threads)
+        in_q_list: list of inward queue names
+        out_q_list: list of outward queue names
+        prefetch_size: prefetch size of the message broker (can control number of un-acknowledged messages stored in the local buffer)
+        returns dict of mb proxies
+        """
+        tmp_logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name='start_passive_mode')
+        tmp_logger.debug('start')
+        # initialize
+        # self.initialize()
+        all_queue_names = list(self._queues_dict.keys())
+        if in_q_list is None:
+            in_q_list = all_queue_names
+        if out_q_list is None:
+            out_q_list = all_queue_names
+        # mb_listener_proxy instances
+        mb_listener_proxy_dict = dict()
+        for in_queue in in_q_list:
+            qconf = self._queues_dict[in_queue]
+            if not qconf.get('enable', True):
+                continue
+            sconf = self._mb_servers_dict[qconf['server']]
+            mb_listener_proxy = get_mb_proxy(name=in_queue, sconf=sconf, qconf=qconf, mode='listener', prefetch_size=prefetch_size)
+            mb_listener_proxy_dict[in_queue] = mb_listener_proxy
+        # mb_sender_proxy instances
+        mb_sender_proxy_dict = dict()
+        for out_queue in out_q_list:
+            qconf = self._queues_dict[out_queue]
+            if not qconf.get('enable', True):
+                continue
+            sconf = self._mb_servers_dict[qconf['server']]
+            mb_sender_proxy = get_mb_proxy(name=out_queue, sconf=sconf, qconf=qconf, mode='sender')
+            mb_sender_proxy_dict[out_queue] = mb_sender_proxy
+        # spawn message broker listener proxy connections
+        for queue_name, mb_proxy in mb_listener_proxy_dict.items():
+            mb_proxy.go()
+            tmp_logger.debug('spawned listener proxy for {0}'.format(queue_name))
+        # spawn message broker sender proxy connections
+        for queue_name, mb_proxy in mb_sender_proxy_dict.items():
+            mb_proxy.go()
+            tmp_logger.debug('spawned sender proxy for {0}'.format(queue_name))
+        tmp_logger.debug('done')
+        # return
+        return {
+            'in': mb_listener_proxy_dict,
+            'out': mb_sender_proxy_dict,
+        }
diff --git a/pandacommon/test/mb_test.py b/pandacommon/test/mb_test.py
index d3d5d02..1a3a232 100644
--- a/pandacommon/test/mb_test.py
+++ b/pandacommon/test/mb_test.py
@@ -159,8 +159,8 @@ def main():
     # extra receiver
     sys.stderr.write('Start extra receiver ...')
     sys.stderr.flush()
-    receiver_3 = msg_bkr_utils.MBProxy(name='Q3', skip_buffer=True, **EXTRA_PROXY_INFO['Q3'])
-    receiver_4 = msg_bkr_utils.MBProxy(name='Q4', skip_buffer=True, **EXTRA_PROXY_INFO['Q4'])
+    receiver_3 = msg_bkr_utils.MBListenerProxy(name='Q3', skip_buffer=True, **EXTRA_PROXY_INFO['Q3'])
+    receiver_4 = msg_bkr_utils.MBListenerProxy(name='Q4', skip_buffer=True, **EXTRA_PROXY_INFO['Q4'])
     receiver_3.go()
     receiver_4.go()
     sys.stderr.write(' OK! \n')
diff --git a/setup.py b/setup.py
index f889411..9421db1 100644
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,7 @@ def finalize_options (self):
     packages=find_packages(),
     install_requires=['configparser',
                       'pytz',
-                      'stomp.py>=4.1.23',
+                      'stomp.py >=4.1.23, <=7.0.0',
                       'requests',
                       ],
     data_files=[