diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index a0954441ea..73c70975cc 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -177,10 +177,11 @@ def __init__(self, config: Config) -> None: # this must be set before executors are added since add_executors calls # job_status_poller.add_executors. + radio = self.monitoring.radio if self.monitoring else None self.job_status_poller = JobStatusPoller(strategy=self.config.strategy, strategy_period=self.config.strategy_period, max_idletime=self.config.max_idletime, - dfk=self) + monitoring=radio) self.executors: Dict[str, ParslExecutor] = {} diff --git a/parsl/jobs/job_status_poller.py b/parsl/jobs/job_status_poller.py index a05a2c1181..aeba8ca425 100644 --- a/parsl/jobs/job_status_poller.py +++ b/parsl/jobs/job_status_poller.py @@ -1,7 +1,6 @@ import logging import parsl import time -import zmq from typing import Dict, List, Sequence, Optional, Union from parsl.jobs.states import JobStatus, JobState @@ -17,25 +16,11 @@ class PolledExecutorFacade: - def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None): + def __init__(self, executor: BlockProviderExecutor, monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None): self._executor = executor self._last_poll_time = 0.0 self._status = {} # type: Dict[str, JobStatus] - - # Create a ZMQ channel to send poll status to monitoring - - self.hub_channel: Optional[zmq.Socket] - - if dfk and dfk.monitoring is not None: - hub_address = dfk.hub_address - hub_port = dfk.hub_zmq_port - context = zmq.Context() - self.hub_channel = context.socket(zmq.DEALER) - self.hub_channel.set_hwm(0) - self.hub_channel.connect("tcp://{}:{}".format(hub_address, hub_port)) - logger.info("Monitoring enabled on job status poller") - else: - self.hub_channel = None + self._monitoring = monitoring def poll(self) -> None: now = time.time() @@ -54,10 +39,10 @@ def poll(self) -> None: def 
send_monitoring_info(self, status: Dict) -> None: # Send monitoring info for HTEX when monitoring enabled - if self.hub_channel: + if self._monitoring: msg = self._executor.create_monitoring_info(status) logger.debug("Sending message {} to hub from job status poller".format(msg)) - self.hub_channel.send_pyobj((MessageType.BLOCK_INFO, msg)) + self._monitoring.send((MessageType.BLOCK_INFO, msg)) @property def status(self) -> Dict[str, JobStatus]: @@ -107,9 +92,9 @@ def __repr__(self) -> str: class JobStatusPoller(Timer): def __init__(self, *, strategy: Optional[str], max_idletime: float, strategy_period: Union[float, int], - dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None: + monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None) -> None: self._executor_facades = [] # type: List[PolledExecutorFacade] - self.dfk = dfk + self.monitoring = monitoring self._strategy = Strategy(strategy=strategy, max_idletime=max_idletime) super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller") @@ -131,7 +116,7 @@ def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None: for executor in executors: if executor.status_polling_interval > 0: logger.debug("Adding executor {}".format(executor.label)) - self._executor_facades.append(PolledExecutorFacade(executor, self.dfk)) + self._executor_facades.append(PolledExecutorFacade(executor, self.monitoring)) self._strategy.add_executors(executors) def close(self): diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index feda60cc2c..ba13729564 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -5,7 +5,6 @@ import logging import multiprocessing.synchronize as ms import typeguard -import zmq import queue @@ -20,6 +19,7 @@ from parsl.serialize import deserialize +from parsl.monitoring.radios import MultiprocessingQueueRadio from parsl.monitoring.router import router_starter from parsl.monitoring.message_type import 
MessageType from parsl.monitoring.types import AddressedMonitoringMessage @@ -92,12 +92,6 @@ def __init__(self, Default: 30 seconds """ - # Any is used to disable typechecking on uses of _dfk_channel, - # because it is used in the code as if it points to a channel, but - # the static type is that it can also be None. The code relies on - # .start() being called and initialising this to a real channel. - self._dfk_channel = None # type: Any - if _db_manager_excepts: raise _db_manager_excepts @@ -197,6 +191,8 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") + self.radio = MultiprocessingQueueRadio(self.block_msgs) + try: comm_q_result = comm_q.get(block=True, timeout=120) except queue.Empty: @@ -211,14 +207,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port) - context = zmq.Context() - self.dfk_channel_timeout = 10000 # in milliseconds - self._dfk_channel = context.socket(zmq.DEALER) - self._dfk_channel.setsockopt(zmq.LINGER, 0) - self._dfk_channel.set_hwm(0) - self._dfk_channel.setsockopt(zmq.SNDTIMEO, self.dfk_channel_timeout) - self._dfk_channel.connect("tcp://{}:{}".format(self.hub_address, zmq_port)) - logger.info("Monitoring Hub initialized") return zmq_port @@ -226,11 +214,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat # TODO: tighten the Any message format def send(self, mtype: MessageType, message: Any) -> None: logger.debug("Sending message type {}".format(mtype)) - try: - self._dfk_channel.send_pyobj((mtype, message)) - except zmq.Again: - logger.exception( - "The monitoring message sent from DFK to router timed-out after {}ms".format(self.dfk_channel_timeout)) + self.radio.send((mtype, message)) def close(self) -> None: logger.info("Terminating Monitoring Hub") @@ 
-241,9 +225,8 @@ def close(self) -> None: logger.error("There was a queued exception (Either router or DBM process got exception much earlier?)") except queue.Empty: break - if self._dfk_channel and self.monitoring_hub_active: + if self.monitoring_hub_active: self.monitoring_hub_active = False - self._dfk_channel.close() if exception_msgs: for exception_msg in exception_msgs: logger.error( diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 5c815ffd12..52de9f8469 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -6,6 +6,7 @@ from abc import ABCMeta, abstractmethod +from multiprocessing.queues import Queue from typing import Optional from parsl.serialize import serialize @@ -173,3 +174,17 @@ def send(self, message: object) -> None: logging.error("Could not send message within timeout limit") return return + + +class MultiprocessingQueueRadio(MonitoringRadio): + """A monitoring radio which connects over a multiprocessing Queue. + This radio is intended to be used on the submit side, where components + in the submit process, or processes launched by multiprocessing, will have + access to a Queue shared with the monitoring database code (bypassing the + monitoring router). 
+ """ + def __init__(self, queue: Queue) -> None: + self.queue = queue + + def send(self, message: object) -> None: + self.queue.put((message, 0)) diff --git a/parsl/tests/test_monitoring/test_fuzz_zmq.py b/parsl/tests/test_monitoring/test_fuzz_zmq.py index d9be378cda..3aaff7d803 100644 --- a/parsl/tests/test_monitoring/test_fuzz_zmq.py +++ b/parsl/tests/test_monitoring/test_fuzz_zmq.py @@ -4,6 +4,7 @@ import pytest import socket import time +import zmq logger = logging.getLogger(__name__) @@ -48,8 +49,16 @@ def test_row_counts(): s.connect((hub_address, hub_zmq_port)) s.sendall(b'fuzzing\r') + context = zmq.Context() + channel_timeout = 10000 # in milliseconds + hub_channel = context.socket(zmq.DEALER) + hub_channel.setsockopt(zmq.LINGER, 0) + hub_channel.set_hwm(0) + hub_channel.setsockopt(zmq.SNDTIMEO, channel_timeout) + hub_channel.connect("tcp://{}:{}".format(hub_address, hub_zmq_port)) + # this will send a non-object down the DFK's existing ZMQ connection - parsl.dfk().monitoring._dfk_channel.send(b'FuzzyByte\rSTREAM') + hub_channel.send(b'FuzzyByte\rSTREAM') # This following attack is commented out, because monitoring is not resilient # to this.