From 77b3fcac87e238ad4be0606c50788695b09124bf Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 8 Apr 2024 19:59:32 +0000 Subject: [PATCH] Replace DFK and JobStatusPoller monitoring zmq channels with Queue plugin Before this PR, the DataFlowKernel and each JobStatusPoller PolledExecutorFacade each opened a ZMQ connection to the monitoring router. These connections are not threadsafe, but (especially in the DFK case) no reasoning or checking stops the DFK's connection being used in multiple threads. Before this PR, the MonitoringHub presented a 'send' method and stored the DFK's ZMQ connection in self._dfk_channel, and each PolledExecutorFacade contained a copy of the ZMQ channel code to open its own channel, configured using parameters from the DFK passed in at construction. This PR: * moves the above uses to use the MonitoringRadio interface (which was originally designed for remote workers to send monitoring information, but seems ok here too) * has MonitoringHub construct an appropriate MonitoringRadio instance for use on the submit side, exposed as self.radio; * replaces the implementation of send with a new MultiprocessingQueueRadio which is thread safe but only works in the same multiprocessing environment as the launched monitoring database manager process (which is true on the submit side, but for example means this radio cannot be used on most remote workers) This work aligns with the prototype in #3315 (which pushes on monitoring radio configuration for remote workers) and pushes in the direction (without getting there) of allowing other submit-side hooks. This work removes some monitoring specific code from the JobStatusPoller, replacing it with a dependency injection style. This is part of work to move JobStatusPoller facade state into other classes, as part of job handling rearrangements in PR #3293 This PR will change how monitoring messages are delivered from the submitting process, and the most obvious thing I can think of that will change is how this will behave under load: heavily loaded messaging causing full buffers and other heavy-load symptoms will now behave as multiprocessing Queues do, rather than as ZMQ connections do. I have not attempted to characterise either of those modes. --- parsl/dataflow/dflow.py | 3 +- parsl/jobs/job_status_poller.py | 29 +++++--------------- parsl/monitoring/monitoring.py | 27 ++++-------------- parsl/monitoring/radios.py | 15 ++++++++++ parsl/tests/test_monitoring/test_fuzz_zmq.py | 11 +++++++- 5 files changed, 39 insertions(+), 46 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index a0954441ea..73c70975cc 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -177,10 +177,11 @@ def __init__(self, config: Config) -> None: # this must be set before executors are added since add_executors calls # job_status_poller.add_executors. + radio = self.monitoring.radio if self.monitoring else None self.job_status_poller = JobStatusPoller(strategy=self.config.strategy, strategy_period=self.config.strategy_period, max_idletime=self.config.max_idletime, - dfk=self) + monitoring=radio) self.executors: Dict[str, ParslExecutor] = {} diff --git a/parsl/jobs/job_status_poller.py b/parsl/jobs/job_status_poller.py index a05a2c1181..aeba8ca425 100644 --- a/parsl/jobs/job_status_poller.py +++ b/parsl/jobs/job_status_poller.py @@ -1,7 +1,6 @@ import logging import parsl import time -import zmq from typing import Dict, List, Sequence, Optional, Union from parsl.jobs.states import JobStatus, JobState @@ -17,25 +16,11 @@ class PolledExecutorFacade: - def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None): + def __init__(self, executor: BlockProviderExecutor, monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None): self._executor = executor self._last_poll_time = 0.0 self._status = {} # type: Dict[str, JobStatus] - - # Create a ZMQ channel to send poll status to monitoring - - self.hub_channel: Optional[zmq.Socket] - - if dfk and dfk.monitoring is not None: - hub_address = dfk.hub_address - hub_port = dfk.hub_zmq_port - context = zmq.Context() - self.hub_channel = context.socket(zmq.DEALER) - self.hub_channel.set_hwm(0) - self.hub_channel.connect("tcp://{}:{}".format(hub_address, hub_port)) - logger.info("Monitoring enabled on job status poller") - else: - self.hub_channel = None + self._monitoring = monitoring def poll(self) -> None: now = time.time() @@ -54,10 +39,10 @@ def poll(self) -> None: def send_monitoring_info(self, status: Dict) -> None: # Send monitoring info for HTEX when monitoring enabled - if self.hub_channel: + if self._monitoring: msg = self._executor.create_monitoring_info(status) logger.debug("Sending message {} to hub from job status poller".format(msg)) - self.hub_channel.send_pyobj((MessageType.BLOCK_INFO, msg)) + self._monitoring.send((MessageType.BLOCK_INFO, msg)) @property def status(self) -> Dict[str, JobStatus]: @@ -107,9 +92,9 @@ def __repr__(self) -> str: class JobStatusPoller(Timer): def __init__(self, *, strategy: Optional[str], max_idletime: float, strategy_period: Union[float, int], - dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None: + monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None) -> None: self._executor_facades = [] # type: List[PolledExecutorFacade] - self.dfk = dfk + self.monitoring = monitoring self._strategy = Strategy(strategy=strategy, max_idletime=max_idletime) super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller") @@ -131,7 +116,7 @@ def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None: for executor in executors: if executor.status_polling_interval > 0: logger.debug("Adding executor {}".format(executor.label)) - self._executor_facades.append(PolledExecutorFacade(executor, self.dfk)) + self._executor_facades.append(PolledExecutorFacade(executor, self.monitoring)) self._strategy.add_executors(executors) def close(self): diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index feda60cc2c..ba13729564 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -5,7 +5,6 @@ import logging import multiprocessing.synchronize as ms import typeguard -import zmq import queue @@ -20,6 +19,7 @@ from parsl.serialize import deserialize +from parsl.monitoring.radios import MultiprocessingQueueRadio from parsl.monitoring.router import router_starter from parsl.monitoring.message_type import MessageType from parsl.monitoring.types import AddressedMonitoringMessage @@ -92,12 +92,6 @@ def __init__(self, Default: 30 seconds """ - # Any is used to disable typechecking on uses of _dfk_channel, - # because it is used in the code as if it points to a channel, but - # the static type is that it can also be None. The code relies on - # .start() being called and initialising this to a real channel. - self._dfk_channel = None # type: Any - if _db_manager_excepts: raise _db_manager_excepts @@ -197,6 +191,8 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") + self.radio = MultiprocessingQueueRadio(self.block_msgs) + try: comm_q_result = comm_q.get(block=True, timeout=120) except queue.Empty: @@ -211,14 +207,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port) - context = zmq.Context() - self.dfk_channel_timeout = 10000 # in milliseconds - self._dfk_channel = context.socket(zmq.DEALER) - self._dfk_channel.setsockopt(zmq.LINGER, 0) - self._dfk_channel.set_hwm(0) - self._dfk_channel.setsockopt(zmq.SNDTIMEO, self.dfk_channel_timeout) - self._dfk_channel.connect("tcp://{}:{}".format(self.hub_address, zmq_port)) - logger.info("Monitoring Hub initialized") return zmq_port @@ -226,11 +214,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat # TODO: tighten the Any message format def send(self, mtype: MessageType, message: Any) -> None: logger.debug("Sending message type {}".format(mtype)) - try: - self._dfk_channel.send_pyobj((mtype, message)) - except zmq.Again: - logger.exception( - "The monitoring message sent from DFK to router timed-out after {}ms".format(self.dfk_channel_timeout)) + self.radio.send((mtype, message)) def close(self) -> None: logger.info("Terminating Monitoring Hub") @@ -241,9 +225,8 @@ def close(self) -> None: logger.error("There was a queued exception (Either router or DBM process got exception much earlier?)") except queue.Empty: break - if self._dfk_channel and self.monitoring_hub_active: + if self.monitoring_hub_active: self.monitoring_hub_active = False - self._dfk_channel.close() if exception_msgs: for exception_msg in exception_msgs: logger.error( diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 5c815ffd12..52de9f8469 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -6,6 +6,7 @@ from abc import ABCMeta, abstractmethod +from multiprocessing.queues import Queue from typing import Optional from parsl.serialize import serialize @@ -173,3 +174,17 @@ def send(self, message: object) -> None: logging.error("Could not send message within timeout limit") return return + + +class MultiprocessingQueueRadio(MonitoringRadio): + """A monitoring radio intended which connects over a multiprocessing Queue. + This radio is intended to be used on the submit side, where components + in the submit process, or processes launched by multiprocessing, will have + access to a Queue shared with the monitoring database code (bypassing the + monitoring router). + """ + def __init__(self, queue: Queue) -> None: + self.queue = queue + + def send(self, message: object) -> None: + self.queue.put((message, 0)) diff --git a/parsl/tests/test_monitoring/test_fuzz_zmq.py b/parsl/tests/test_monitoring/test_fuzz_zmq.py index d9be378cda..3aaff7d803 100644 --- a/parsl/tests/test_monitoring/test_fuzz_zmq.py +++ b/parsl/tests/test_monitoring/test_fuzz_zmq.py @@ -4,6 +4,7 @@ import pytest import socket import time +import zmq logger = logging.getLogger(__name__) @@ -48,8 +49,16 @@ def test_row_counts(): s.connect((hub_address, hub_zmq_port)) s.sendall(b'fuzzing\r') + context = zmq.Context() + channel_timeout = 10000 # in milliseconds + hub_channel = context.socket(zmq.DEALER) + hub_channel.setsockopt(zmq.LINGER, 0) + hub_channel.set_hwm(0) + hub_channel.setsockopt(zmq.SNDTIMEO, channel_timeout) + hub_channel.connect("tcp://{}:{}".format(hub_address, hub_zmq_port)) + # this will send a non-object down the DFK's existing ZMQ connection - parsl.dfk().monitoring._dfk_channel.send(b'FuzzyByte\rSTREAM') + hub_channel.send(b'FuzzyByte\rSTREAM') # This following attack is commented out, because monitoring is not resilient # to this.