From 2981a287bbffc4aead9f70e2a427c1def3fa9f36 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 31 Jul 2024 13:17:07 +0200 Subject: [PATCH] Move monitoring router parameters into object attributes (#3521) This is to support rearrangement of the structure of the router code into multiple threads and methods, without having to manually wire all of the multiprocessing objects between the new methods and threads. These objects are part of the context of the router object, rather than parameters to individual methods which might change, and they are all multiprocessing objects which are thread-safe. --- parsl/monitoring/router.py | 49 +++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 70b4862295..9a422027c1 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -32,7 +32,12 @@ def __init__(self, logdir: str = ".", run_id: str, logging_level: int = logging.INFO, - atexit_timeout: int = 3 # in seconds + atexit_timeout: int = 3, # in seconds + priority_msgs: "queue.Queue[AddressedMonitoringMessage]", + node_msgs: "queue.Queue[AddressedMonitoringMessage]", + block_msgs: "queue.Queue[AddressedMonitoringMessage]", + resource_msgs: "queue.Queue[AddressedMonitoringMessage]", + exit_event: Event, ): """ Initializes a monitoring configuration class. @@ -51,7 +56,11 @@ def __init__(self, Logging level as defined in the logging module. Default: logging.INFO atexit_timeout : float, optional The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. + *_msgs : Queue + Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag. + exit_event : Event + An event that the main Parsl process will set to signal that the monitoring router should shut down. """ os.makedirs(logdir, exist_ok=True) self.logger = set_file_logger("{}/monitoring_router.log".format(logdir), @@ -93,19 +102,20 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - def start(self, - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", - exit_event: Event) -> None: + self.priority_msgs = priority_msgs + self.node_msgs = node_msgs + self.block_msgs = block_msgs + self.resource_msgs = resource_msgs + self.exit_event = exit_event + + def start(self) -> None: try: - while not exit_event.is_set(): + while not self.exit_event.is_set(): try: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - resource_msgs.put((resource_msg, addr)) + self.resource_msgs.put((resource_msg, addr)) except socket.timeout: pass @@ -125,15 +135,15 @@ def start(self, if msg[0] == MessageType.NODE_INFO: msg[1]['run_id'] = self.run_id - node_msgs.put(msg_0) + self.node_msgs.put(msg_0) elif msg[0] == MessageType.RESOURCE_INFO: - resource_msgs.put(msg_0) + self.resource_msgs.put(msg_0) elif msg[0] == MessageType.BLOCK_INFO: - block_msgs.put(msg_0) + self.block_msgs.put(msg_0) elif msg[0] == MessageType.TASK_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) elif msg[0] == MessageType.WORKFLOW_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) else: # There is a type: ignore here because if msg[0] # is of the correct type, this code is unreachable, @@ -158,7 +168,7 @@ def start(self, data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - resource_msgs.put((msg, addr)) + self.resource_msgs.put((msg, addr)) last_msg_received_time = time.time() except socket.timeout: pass @@ -191,7 +201,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id) + run_id=run_id, + priority_msgs=priority_msgs, + node_msgs=node_msgs, + block_msgs=block_msgs, + resource_msgs=resource_msgs, + exit_event=exit_event) except Exception as e: logger.error("MonitoringRouter construction failed.", exc_info=True) comm_q.put(f"Monitoring router construction failed: {e}") @@ -200,7 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", router.logger.info("Starting MonitoringRouter in router_starter") try: - router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event) + router.start() except Exception as e: router.logger.exception("router.start exception") exception_q.put(('Hub', str(e)))