From 4783a409b735baba26bb51b3fdff624c092fc19f Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Thu, 28 Nov 2024 00:27:17 +0100 Subject: [PATCH 1/5] add callbacks to pulling end of pipe --- src/radical/utils/zmq/pipe.py | 73 ++++++++++++++++++++++++++------ tests/unittests/test_zmq_pipe.py | 43 ++++++++++++++----- 2 files changed, 93 insertions(+), 23 deletions(-) diff --git a/src/radical/utils/zmq/pipe.py b/src/radical/utils/zmq/pipe.py index 7c36dcf1..977ecc88 100644 --- a/src/radical/utils/zmq/pipe.py +++ b/src/radical/utils/zmq/pipe.py @@ -1,7 +1,9 @@ import zmq +import threading as mt from ..serialize import to_msgpack, from_msgpack +from ..logger import Logger from .utils import zmq_bind @@ -32,7 +34,7 @@ class Pipe(object): # -------------------------------------------------------------------------- # - def __init__(self, mode, url=None) -> None: + def __init__(self, mode, url=None, log=None) -> None: ''' Create a `Pipe` instance which can be used for either sending (`put()`) or receiving (`get()` / `get_nowait()`) data. according to the specified @@ -43,11 +45,14 @@ def __init__(self, mode, url=None) -> None: URL provided by the listening end (`Pipe.url`). ''' - self._context = zmq.Context.instance() - self._mode = mode - self._url = None - self._sock = None - self._poller = zmq.Poller() + self._context = zmq.Context.instance() + self._mode = mode + self._url = None + self._log = log + self._sock = None + self._poller = zmq.Poller() + self._cbs = list() + self._listener = None if mode == MODE_PUSH: self._connect_push(url) @@ -58,6 +63,10 @@ def __init__(self, mode, url=None) -> None: else: raise ValueError('unsupported pipe mode [%s]' % mode) + if not self._log: + + self._log = Logger('radical.utils.pipe') + # -------------------------------------------------------------------------- # @@ -97,12 +106,6 @@ def _connect_pull(self, url): if self._sock: raise RuntimeError('already connected at %s' % self._url) - if url: - bind = False - else: - bind = True - url = 'tcp://*:*' - self._sock = self._context.socket(zmq.PULL) if url: @@ -114,6 +117,49 @@ def _connect_pull(self, url): self._poller.register(self._sock, zmq.POLLIN) + # -------------------------------------------------------------------------- + # + def register_cb(self, cb): + ''' + Register a callback for incoming messages. The callback will be called + with the message as argument. + + Only a pipe in pull mode can have callbacks registered. Note that once + a callback is registered, the `get()` and `get_nowait()` methods must + not be used anymore. + ''' + + assert self._mode == MODE_PULL + + self._cbs.append(cb) + + if not self._listener: + self._listener = mt.Thread(target=self._listen) + self._listener.daemon = True + self._listener.start() + + + # -------------------------------------------------------------------------- + # + def _listen(self): + ''' + Listen for incoming messages, and call registered callbacks. + ''' + + while True: + + socks = dict(self._poller.poll(timeout=10)) + + if self._sock in socks: + msg = from_msgpack(self._sock.recv()) + + for cb in self._cbs: + try: + cb(msg) + except: + self._log.exception('callback failed') + + # -------------------------------------------------------------------------- # def put(self, msg): @@ -134,6 +180,8 @@ def get(self): ''' assert self._mode == MODE_PULL + assert not self._cbs + return from_msgpack(self._sock.recv()) @@ -147,6 +195,7 @@ def get_nowait(self, timeout: float = 0): ''' assert self._mode == MODE_PULL + assert not self._cbs # zmq timeouts are in milliseconds socks = dict(self._poller.poll(timeout=int(timeout * 1000))) diff --git a/tests/unittests/test_zmq_pipe.py b/tests/unittests/test_zmq_pipe.py index a4f18520..29cb8c1e 100755 --- a/tests/unittests/test_zmq_pipe.py +++ b/tests/unittests/test_zmq_pipe.py @@ -14,14 +14,10 @@ def test_zmq_pipe(): pipe_1 = ru.zmq.Pipe(ru.zmq.MODE_PUSH) + pipe_2 = ru.zmq.Pipe(ru.zmq.MODE_PULL, pipe_1.url) + pipe_3 = ru.zmq.Pipe(ru.zmq.MODE_PULL, pipe_1.url) - url = pipe_1.url - - pipe_2 = ru.zmq.Pipe(ru.zmq.MODE_PULL, url) - pipe_3 = ru.zmq.Pipe(ru.zmq.MODE_PULL, url) - - # let ZMQ settle - time.sleep(0.1) + time.sleep(0.01) for i in range(1000): pipe_1.put('foo %d' % i) @@ -34,24 +30,49 @@ def test_zmq_pipe(): result_3.append(pipe_3.get()) for i in range(100): - result_2.append(pipe_2.get_nowait(timeout=1.0)) - result_3.append(pipe_3.get_nowait(timeout=1.0)) + result_2.append(pipe_2.get_nowait(timeout=0.01)) + result_3.append(pipe_3.get_nowait(timeout=0.01)) assert len(result_2) == 500 assert len(result_3) == 500 - test_2 = result_2.append(pipe_2.get_nowait(timeout=1.0)) - test_3 = result_3.append(pipe_3.get_nowait(timeout=1.0)) + test_2 = result_2.append(pipe_2.get_nowait(timeout=0.01)) + test_3 = result_3.append(pipe_3.get_nowait(timeout=0.01)) assert test_2 is None assert test_3 is None +# ------------------------------------------------------------------------------ +# +def test_zmq_pipe_cb(): + + pipe_1 = ru.zmq.Pipe(ru.zmq.MODE_PUSH) + pipe_2 = ru.zmq.Pipe(ru.zmq.MODE_PULL, pipe_1.url) + results = list() + + time.sleep(0.01) + + def cb(msg): + results.append(msg) + + pipe_2.register_cb(cb) + + n = 1000 + for i in range(n): + pipe_1.put('foo %d' % i) + + time.sleep(0.01) + + assert len(results) == n, results + + # ------------------------------------------------------------------------------ # run tests if called directly if __name__ == '__main__': test_zmq_pipe() + test_zmq_pipe_cb() # ------------------------------------------------------------------------------ From dd49905a15e9af2113230633ed1fbaec6601dff8 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 3 Dec 2024 17:21:21 +0100 Subject: [PATCH 2/5] term --- src/radical/utils/zmq/pipe.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/radical/utils/zmq/pipe.py b/src/radical/utils/zmq/pipe.py index 977ecc88..c6bc6188 100644 --- a/src/radical/utils/zmq/pipe.py +++ b/src/radical/utils/zmq/pipe.py @@ -53,6 +53,7 @@ def __init__(self, mode, url=None, log=None) -> None: self._poller = zmq.Poller() self._cbs = list() self._listener = None + self._term = mt.Event() if mode == MODE_PUSH: self._connect_push(url) @@ -146,7 +147,7 @@ def _listen(self): Listen for incoming messages, and call registered callbacks. ''' - while True: + while not self._term.is_set(): socks = dict(self._poller.poll(timeout=10)) @@ -204,5 +205,12 @@ def get_nowait(self, timeout: float = 0): return from_msgpack(self._sock.recv()) + # -------------------------------------------------------------------------- + # + def stop(self): + + self._term.set() + + # ------------------------------------------------------------------------------ From 5623da8d3b211da4b3ea8699c27727f7db1fddd1 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 3 Dec 2024 23:44:34 +0100 Subject: [PATCH 3/5] snap --- src/radical/utils/zmq/pipe.py | 111 ++++++++++++++++++++------------ src/radical/utils/zmq/pubsub.py | 3 +- src/radical/utils/zmq/queue.py | 5 +- src/radical/utils/zmq/utils.py | 2 +- 4 files changed, 74 insertions(+), 47 deletions(-) diff --git a/src/radical/utils/zmq/pipe.py b/src/radical/utils/zmq/pipe.py index c6bc6188..31f4e2cf 100644 --- a/src/radical/utils/zmq/pipe.py +++ b/src/radical/utils/zmq/pipe.py @@ -45,15 +45,15 @@ def __init__(self, mode, url=None, log=None) -> None: URL provided by the listening end (`Pipe.url`). ''' - self._context = zmq.Context.instance() - self._mode = mode - self._url = None - self._log = log - self._sock = None - self._poller = zmq.Poller() - self._cbs = list() - self._listener = None - self._term = mt.Event() + self._context = zmq.Context.instance() + self._mode = mode + self._url = None + self._log = log + self._sock = None + self._poller = zmq.Poller() + self._cbs = list() + self._thread = None + self._term = mt.Event() if mode == MODE_PUSH: self._connect_push(url) @@ -120,29 +120,39 @@ def _connect_pull(self, url): # -------------------------------------------------------------------------- # - def register_cb(self, cb): + def get(self): + ''' + Receive a message. This call blocks until a message is available. ''' - Register a callback for incoming messages. The callback will be called - with the message as argument. - Only a pipe in pull mode can have callbacks registered. Note that once - a callback is registered, the `get()` and `get_nowait()` methods must - not be used anymore. + assert self._mode == MODE_PULL + assert not self._cbs + + return from_msgpack(self._sock.recv()) + + + # -------------------------------------------------------------------------- + # + def get_nowait(self, timeout: float = 0): + ''' + Receive a message. This call blocks for `timeout` seconds + until a message is available. If no message is available after timeout, + `None` is returned. ''' assert self._mode == MODE_PULL + assert not self._cbs - self._cbs.append(cb) + # zmq timeouts are in milliseconds + socks = dict(self._poller.poll(timeout=int(timeout * 1000))) - if not self._listener: - self._listener = mt.Thread(target=self._listen) - self._listener.daemon = True - self._listener.start() + if self._sock in socks: + return from_msgpack(self._sock.recv()) # -------------------------------------------------------------------------- # - def _listen(self): + def _listener(self): ''' Listen for incoming messages, and call registered callbacks. ''' @@ -163,53 +173,72 @@ def _listen(self): # -------------------------------------------------------------------------- # - def put(self, msg): + def register_cb(self, cb): ''' - Send a message - if receiving endpoints are connected, exactly one of - them will be able to receive that message. + Register a callback for incoming messages. The callback will be called + with the message as argument. + + Only a pipe in pull mode can have callbacks registered. Note that once + a callback is registered, the `get()` and `get_nowait()` methods must + not be used anymore. ''' - assert self._mode == MODE_PUSH - self._sock.send(to_msgpack(msg)) + assert self._mode == MODE_PULL + + self._cbs.append(cb) + + if not self._thread: + self._thread = mt.Thread(target=self._listener) + self._thread.daemon = True + self._thread.start() # -------------------------------------------------------------------------- # - def get(self): + def unregister_cb(self, cb): ''' - Receive a message. This call blocks until a message is available. + Unregister a callback. If no callback remains registered, the listener + thread will be stopped. ''' assert self._mode == MODE_PULL - assert not self._cbs + assert cb in self._cbs - return from_msgpack(self._sock.recv()) + self._cbs.remove(cb) + + if not self._cbs: + self._stop_listener() # -------------------------------------------------------------------------- # - def get_nowait(self, timeout: float = 0): + def put(self, msg): ''' - Receive a message. This call blocks for `timeout` seconds - until a message is available. If no message is available after timeout, - `None` is returned. + Send a message - if receiving endpoints are connected, exactly one of + them will be able to receive that message. ''' - assert self._mode == MODE_PULL - assert not self._cbs + assert self._mode == MODE_PUSH + self._sock.send(to_msgpack(msg)) - # zmq timeouts are in milliseconds - socks = dict(self._poller.poll(timeout=int(timeout * 1000))) - if self._sock in socks: - return from_msgpack(self._sock.recv()) + # -------------------------------------------------------------------------- + # + def _stop_listener(self): + + if self._thread: + self._term.set() + self._thread.join() + self._term.clear() + self._thread = None # -------------------------------------------------------------------------- # def stop(self): - self._term.set() + self._stop_listener() + # ------------------------------------------------------------------------------ diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 9faf0157..9ec45ea6 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -10,9 +10,7 @@ from ..atfork import atfork from ..config import Config from ..ids import generate_id, ID_CUSTOM -from ..url import Url from ..misc import as_string, as_bytes, as_list, noop -from ..host import get_hostip from ..logger import Logger from ..profile import Profiler from ..serialize import to_msgpack, from_msgpack @@ -410,6 +408,7 @@ def _start_listener(self): args=[self._sock, lock, term, callbacks, self._log, self._prof]) t.daemon = True + print('=== create pubsub listener %s' % t) t.start() self._thread = t diff --git a/src/radical/utils/zmq/queue.py b/src/radical/utils/zmq/queue.py index d255f8f0..8630c789 100644 --- a/src/radical/utils/zmq/queue.py +++ b/src/radical/utils/zmq/queue.py @@ -11,9 +11,7 @@ from ..atfork import atfork from ..config import Config from ..ids import generate_id, ID_CUSTOM -from ..url import Url -from ..misc import as_string, as_bytes, as_list, noop, find_port -from ..host import get_hostip +from ..misc import as_string, as_bytes, as_list, noop from ..logger import Logger from ..profile import Profiler from ..debug import print_exception_trace @@ -501,6 +499,7 @@ def _start_listener(self, qname=None): t = mt.Thread(target=Getter._listener, args=[self._url, qname, self._uid]) t.daemon = True + print('=== create queue listener %s' % t) t.start() Getter._callbacks[self._url]['thread'] = t diff --git a/src/radical/utils/zmq/utils.py b/src/radical/utils/zmq/utils.py index 75752b88..ab8a1613 100644 --- a/src/radical/utils/zmq/utils.py +++ b/src/radical/utils/zmq/utils.py @@ -129,7 +129,7 @@ def get_channel_url(ep_type, channel=None, url=None): # def log_bulk(log, token, msgs): - if log._num_level > 1: + if log.num_level > 1: # log level `debug_9` disabled return From b60dc0bfde00d8d68b86193c669841065c0797a6 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 4 Dec 2024 00:03:55 +0100 Subject: [PATCH 4/5] snap --- src/radical/utils/zmq/pubsub.py | 36 ++++++++++++++++----------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 9ec45ea6..1985f4e6 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -395,23 +395,22 @@ def channel(self): def _start_listener(self): # only start if needed - if self._thread: - return + with self._lock: - lock = self._lock - term = self._term - callbacks = self._callbacks + if self._thread: + return - self._log.info('start listener for %s', self._channel) + lock = self._lock + term = self._term + callbacks = self._callbacks - t = mt.Thread(target=Subscriber._listener, - args=[self._sock, lock, term, callbacks, - self._log, self._prof]) - t.daemon = True - print('=== create pubsub listener %s' % t) - t.start() + self._log.info('start listener for %s', self._channel) - self._thread = t + self._thread = mt.Thread(target=Subscriber._listener, + args=[self._sock, lock, term, callbacks, + self._log, self._prof]) + self._thread.daemon = True + self._thread.start() # -------------------------------------------------------------------------- @@ -420,11 +419,12 @@ def _stop_listener(self, force=False): # only stop listener if no callbacks remain registered (unless forced) if force or not self._callbacks: - if self._thread: - self._term.set() - self._thread.join() - self._term.clear() - self._thread = None + with self._lock: + if self._thread: + self._term.set() + self._thread.join() + self._term.clear() + self._thread = None # -------------------------------------------------------------------------- From c405616ef41328fdbc3a8e0bd32585df23890356 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 4 Dec 2024 10:47:39 +0100 Subject: [PATCH 5/5] log cleanup --- src/radical/utils/debug.py | 8 +++++--- src/radical/utils/zmq/pubsub.py | 6 +++--- src/radical/utils/zmq/queue.py | 7 +++---- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/radical/utils/debug.py b/src/radical/utils/debug.py index 01397baf..8ef880b6 100644 --- a/src/radical/utils/debug.py +++ b/src/radical/utils/debug.py @@ -339,8 +339,8 @@ def attach_pudb(log=None): if log: log.info('debugger open: telnet %s %d', host, port) - else: - print('debugger open: telnet %s %d' % (host, port)) + + print('debugger open: telnet %s %d' % (host, port)) try: import pudb # pylint: disable=E0401 @@ -352,7 +352,9 @@ def attach_pudb(log=None): except Exception as e: if log: - log.warning('failed to attach pudb (%s)', e) + log.exception('failed to attach pudb') + else: + print('failed to attach pudb (%s)' % repr(e)) # ------------------------------------------------------------------------------ diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 1985f4e6..d98509da 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -228,8 +228,8 @@ def put(self, topic, msg): assert isinstance(topic, str), 'invalid topic type' - self._log.debug_9('=== put %s : %s: %s', topic, self.channel, msg) - # self._log.debug_9('=== put %s: %s', msg, get_stacktrace()) + self._log.debug_9('put %s : %s: %s', topic, self.channel, msg) + # self._log.debug_9('put %s: %s', msg, get_stacktrace()) # self._prof.prof('put', uid=self._uid, msg=msg) log_bulk(self._log, '-> %s' % topic, [msg]) @@ -450,7 +450,7 @@ def subscribe(self, topic, cb=None, lock=None): log_bulk(self._log, '~~2 %s' % topic, [topic]) with self._lock: - self._log.debug_9('==== subscribe for %s', topic) + self._log.debug_9('subscribe for %s', topic) no_intr(self._sock.setsockopt, zmq.SUBSCRIBE, as_bytes(topic)) if topic not in self._topics: diff --git a/src/radical/utils/zmq/queue.py b/src/radical/utils/zmq/queue.py index 8630c789..ce61a3f8 100644 --- a/src/radical/utils/zmq/queue.py +++ b/src/radical/utils/zmq/queue.py @@ -403,7 +403,7 @@ def _get_nowait(url, qname=None, timeout=None, uid=None): # timeout in ms # send the request *once* per recieval (got lock above) # FIXME: why is this sent repeatedly? - logger.debug_9('=== => from %s[%s]', uid, qname) + logger.debug_9('=> from %s[%s]', uid, qname) no_intr(info['socket'].send, as_bytes(qname)) info['requested'] = True @@ -499,7 +499,6 @@ def _start_listener(self, qname=None): t = mt.Thread(target=Getter._listener, args=[self._url, qname, self._uid]) t.daemon = True - print('=== create queue listener %s' % t) t.start() Getter._callbacks[self._url]['thread'] = t @@ -673,7 +672,7 @@ def get(self, qname=None): if not self._requested: with self._lock: if not self._requested: - self._log.debug_9('=== => from %s[%s]', self._channel, qname) + self._log.debug_9('=> from %s[%s]', self._channel, qname) no_intr(self._q.send, as_bytes(qname)) self._requested = True @@ -709,7 +708,7 @@ def get_nowait(self, qname=None, timeout=None): # timeout in ms if not self._requested: with self._lock: # need to protect self._requested if not self._requested: - self._log.debug_9('=== => from %s[%s]', self._channel, qname) + self._log.debug_9('=> from %s[%s]', self._channel, qname) no_intr(self._q.send_multipart, [as_bytes(qname)]) self._requested = True