Skip to content

Commit

Permalink
Move monitoring router parameters into object attributes (#3521)
Browse files Browse the repository at this point in the history
This is to support rearrangement of the structure of the router code into
multiple threads and methods, without having to manually wire all of the
multiprocessing objects between the new methods and threads.

These objects are part of the context of the router object, rather than
parameters to individual methods which might change, and they are all
multiprocessing objects which are thread-safe.
  • Loading branch information
benclifford authored Jul 31, 2024
1 parent 5eb30f1 commit 2981a28
Showing 1 changed file with 32 additions and 17 deletions.
49 changes: 32 additions & 17 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ def __init__(self,
logdir: str = ".",
run_id: str,
logging_level: int = logging.INFO,
atexit_timeout: int = 3 # in seconds
atexit_timeout: int = 3, # in seconds
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
exit_event: Event,
):
""" Initializes a monitoring configuration class.
Expand All @@ -51,7 +56,11 @@ def __init__(self,
Logging level as defined in the logging module. Default: logging.INFO
atexit_timeout : float, optional
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
*_msgs : Queue
Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag.
exit_event : Event
An event that the main Parsl process will set to signal that the monitoring router should shut down.
"""
os.makedirs(logdir, exist_ok=True)
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
Expand Down Expand Up @@ -93,19 +102,20 @@ def __init__(self,
min_port=zmq_port_range[0],
max_port=zmq_port_range[1])

def start(self,
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
exit_event: Event) -> None:
self.priority_msgs = priority_msgs
self.node_msgs = node_msgs
self.block_msgs = block_msgs
self.resource_msgs = resource_msgs
self.exit_event = exit_event

def start(self) -> None:
try:
while not exit_event.is_set():
while not self.exit_event.is_set():
try:
data, addr = self.udp_sock.recvfrom(2048)
resource_msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
resource_msgs.put((resource_msg, addr))
self.resource_msgs.put((resource_msg, addr))
except socket.timeout:
pass

Expand All @@ -125,15 +135,15 @@ def start(self,

if msg[0] == MessageType.NODE_INFO:
msg[1]['run_id'] = self.run_id
node_msgs.put(msg_0)
self.node_msgs.put(msg_0)
elif msg[0] == MessageType.RESOURCE_INFO:
resource_msgs.put(msg_0)
self.resource_msgs.put(msg_0)
elif msg[0] == MessageType.BLOCK_INFO:
block_msgs.put(msg_0)
self.block_msgs.put(msg_0)
elif msg[0] == MessageType.TASK_INFO:
priority_msgs.put(msg_0)
self.priority_msgs.put(msg_0)
elif msg[0] == MessageType.WORKFLOW_INFO:
priority_msgs.put(msg_0)
self.priority_msgs.put(msg_0)
else:
# There is a type: ignore here because if msg[0]
# is of the correct type, this code is unreachable,
Expand All @@ -158,7 +168,7 @@ def start(self,
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
resource_msgs.put((msg, addr))
self.resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass
Expand Down Expand Up @@ -191,7 +201,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
zmq_port_range=zmq_port_range,
logdir=logdir,
logging_level=logging_level,
run_id=run_id)
run_id=run_id,
priority_msgs=priority_msgs,
node_msgs=node_msgs,
block_msgs=block_msgs,
resource_msgs=resource_msgs,
exit_event=exit_event)
except Exception as e:
logger.error("MonitoringRouter construction failed.", exc_info=True)
comm_q.put(f"Monitoring router construction failed: {e}")
Expand All @@ -200,7 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",

router.logger.info("Starting MonitoringRouter in router_starter")
try:
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event)
router.start()
except Exception as e:
router.logger.exception("router.start exception")
exception_q.put(('Hub', str(e)))

0 comments on commit 2981a28

Please sign in to comment.