Skip to content

Commit

Permalink
merge from devel
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Aug 20, 2024
2 parents 79e6ce9 + db228c5 commit 02ef849
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 0 deletions.
86 changes: 86 additions & 0 deletions examples/zmq/pubsub/ru_zmq_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/env python3

import time

import threading as mt

import radical.utils as ru

CHANNEL = 'test'
TOPIC = 'test'


# ------------------------------------------------------------------------------
#
def put(channel, uid, url, n):

put = ru.zmq.Putter(channel, url)
for i in range(n):
put.put(ru.as_bytes('%s: message %d' % (uid, i)))
print('%s: message %d' % (uid, i))
time.sleep(0.1)

put.put(ru.as_bytes('%s: STOP' % uid))

print('%s: done' % uid)


# ------------------------------------------------------------------------------
#
def get(channel, uid, url):

cont = True
get = ru.zmq.Getter(channel, url)

while cont:

msgs = get.get()
if not msgs:
continue

print('%s: %s' % (uid, ru.as_string(msgs)))

for msg in msgs:
if 'STOP' in ru.as_string(msg):
cont = False

print('%s: done' % uid)


# ------------------------------------------------------------------------------
#
def main():

bridge = ru.zmq.Queue(CHANNEL)
bridge.start()

threads = list()

# start some putters and getters
# NOTE: start more putters than getters for clean termination
for i in range(4):
t = mt.Thread(target=put, args=(CHANNEL, 'put.%d' % i, bridge.addr_put, 5))
t.daemon = True
t.start()
threads.append(t)

for i in range(3):
t = mt.Thread(target=get, args=(CHANNEL, 'get.%d' % i, bridge.addr_get))
t.daemon = True
t.start()
threads.append(t)

# wait for completion
for t in threads:
t.join()


# ------------------------------------------------------------------------------
#
if __name__ == '__main__':

main()


# ------------------------------------------------------------------------------

83 changes: 83 additions & 0 deletions examples/zmq/queue/ru_zmq_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python3

import time

import threading as mt

import radical.utils as ru

CHANNEL = 'channel'
TOPIC = 'test'


# ------------------------------------------------------------------------------
#
def pub(uid, url, topic, n):

pub = ru.zmq.Publisher(topic, url)
for i in range(n):
pub.put(topic, ru.as_bytes('%s: message %d' % (uid, i)))
print('%s: message %d' % (uid, i))
time.sleep(0.1)

pub.put(topic, ru.as_bytes('%s: STOP' % uid))

print('%s: done' % uid)


# ------------------------------------------------------------------------------
#
def sub(uid, url, topic):

cont = True
sub = ru.zmq.Subscriber(topic, url)
sub.subscribe(topic)

while cont:
topic, msg = sub.get()
print('%s: %s' % (uid, ru.as_string(msg)))

if 'STOP' in ru.as_string(msg):
cont = False

print('%s: done' % uid)


# ------------------------------------------------------------------------------
#
def main():

bridge = ru.zmq.PubSub(CHANNEL)
bridge.start()

threads = list()


# start some subscribers and publishers
for i in range(3):
t = mt.Thread(target=sub, args=('sub.%d' % i, bridge.addr_sub, TOPIC))
t.daemon = True
t.start()
threads.append(t)

for i in range(4):
t = mt.Thread(target=pub, args=('pub.%d' % i, bridge.addr_pub, TOPIC, 5))
t.daemon = True
t.start()
threads.append(t)


# wait for completion
for t in threads:
t.join()


# ------------------------------------------------------------------------------
#
if __name__ == '__main__':

main()


# ------------------------------------------------------------------------------

0 comments on commit 02ef849

Please sign in to comment.