diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index a1b20f2705..08d771036a 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -270,6 +270,8 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: new_dir = f"{base_path}/new/" logger.debug("Creating new and tmp paths under %s", base_path) + target_radio = MultiprocessingQueueRadioSender(q) + os.makedirs(tmp_dir, exist_ok=True) os.makedirs(new_dir, exist_ok=True) @@ -285,7 +287,7 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: message = pickle.load(f) logger.debug("Message received is: %s", message) assert isinstance(message, tuple) - q.put(cast(TaggedMonitoringMessage, message)) + target_radio.send(cast(TaggedMonitoringMessage, message)) os.remove(full_path_filename) except Exception: logger.exception("Exception processing %s - probably will be retried next iteration", filename) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 1d4b522e82..a45500fc23 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,6 +14,7 @@ import zmq from parsl.log_utils import set_file_logger +from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -55,7 +56,6 @@ def __init__(self, The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. resource_msgs : multiprocessing.Queue A multiprocessing queue to receive messages to be routed onwards to the database process - exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. """ @@ -98,7 +98,7 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - self.resource_msgs = resource_msgs + self.target_radio = MultiprocessingQueueRadioSender(resource_msgs) self.exit_event = exit_event @wrap_with_logs(target="monitoring_router") @@ -125,7 +125,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - self.resource_msgs.put(resource_msg) + self.target_radio.send(resource_msg) except socket.timeout: pass @@ -136,7 +136,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put(msg) + self.target_radio.send(msg) last_msg_received_time = time.time() except socket.timeout: pass @@ -160,7 +160,7 @@ def start_zmq_listener(self) -> None: assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg) assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg) - self.resource_msgs.put(msg) + self.target_radio.send(msg) except zmq.Again: pass except Exception: