Skip to content

Commit

Permalink
Replace DFK and JobStatusPoller monitoring zmq channels with Queue pl…
Browse files Browse the repository at this point in the history
…ugin

Before this PR, the DataFlowKernel and each JobStatusPoller
PolledExecutorFacade each opened a ZMQ connection to the monitoring router.
These connections are not threadsafe, but (especially in the DFK case) no
reasoning or checking stops the DFK's connection being used in multiple
threads.

Before this PR, the MonitoringHub presented a 'send' method and stored the
DFK's ZMQ connection in self._dfk_channel, and each PolledExecutorFacade
contained a copy of the ZMQ channel code to open its own channel, configured
using parameters from the DFK passed in at construction.

This PR:

* switches the above uses over to the MonitoringRadio interface (which was
originally designed for remote workers to send monitoring information, but
seems ok here too)

* has MonitoringHub construct an appropriate MonitoringRadio instance for
use on the submit side, exposed as self.radio;

* replaces the implementation of send with a new MultiprocessingQueueRadio
which is thread safe but only works in the same multiprocessing environment
as the launched monitoring database manager process (which is true on the
submit side, but for example means this radio cannot be used on most
remote workers)

This work aligns with the prototype in #3315 (which pushes on monitoring
radio configuration for remote workers) and pushes in the direction
(without getting there) of allowing other submit-side hooks.

This work removes some monitoring specific code from the JobStatusPoller,
replacing it with a dependency injection style.
This is part of work to move JobStatusPoller facade state into other classes,
as part of job handling rearrangements in PR #3293

This PR will change how monitoring messages are delivered from the submitting
process, and the most obvious thing I can think of that will change is how
this will behave under load: heavily loaded messaging causing full buffers and
other heavy-load symptoms will now behave as multiprocessing Queues do, rather
than as ZMQ connections do. I have not attempted to characterise either of
those modes.
  • Loading branch information
benclifford committed Apr 10, 2024
1 parent eb25bfc commit 77b3fca
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 46 deletions.
3 changes: 2 additions & 1 deletion parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ def __init__(self, config: Config) -> None:

# this must be set before executors are added since add_executors calls
# job_status_poller.add_executors.
radio = self.monitoring.radio if self.monitoring else None
self.job_status_poller = JobStatusPoller(strategy=self.config.strategy,
strategy_period=self.config.strategy_period,
max_idletime=self.config.max_idletime,
dfk=self)
monitoring=radio)

self.executors: Dict[str, ParslExecutor] = {}

Expand Down
29 changes: 7 additions & 22 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import parsl
import time
import zmq
from typing import Dict, List, Sequence, Optional, Union

from parsl.jobs.states import JobStatus, JobState
Expand All @@ -17,25 +16,11 @@


class PolledExecutorFacade:
def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None):
def __init__(self, executor: BlockProviderExecutor, monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None):
self._executor = executor
self._last_poll_time = 0.0
self._status = {} # type: Dict[str, JobStatus]

# Create a ZMQ channel to send poll status to monitoring

self.hub_channel: Optional[zmq.Socket]

if dfk and dfk.monitoring is not None:
hub_address = dfk.hub_address
hub_port = dfk.hub_zmq_port
context = zmq.Context()
self.hub_channel = context.socket(zmq.DEALER)
self.hub_channel.set_hwm(0)
self.hub_channel.connect("tcp://{}:{}".format(hub_address, hub_port))
logger.info("Monitoring enabled on job status poller")
else:
self.hub_channel = None
self._monitoring = monitoring

def poll(self) -> None:
now = time.time()
Expand All @@ -54,10 +39,10 @@ def poll(self) -> None:

def send_monitoring_info(self, status: Dict) -> None:
# Send monitoring info for HTEX when monitoring enabled
if self.hub_channel:
if self._monitoring:
msg = self._executor.create_monitoring_info(status)
logger.debug("Sending message {} to hub from job status poller".format(msg))
self.hub_channel.send_pyobj((MessageType.BLOCK_INFO, msg))
self._monitoring.send((MessageType.BLOCK_INFO, msg))

@property
def status(self) -> Dict[str, JobStatus]:
Expand Down Expand Up @@ -107,9 +92,9 @@ def __repr__(self) -> str:
class JobStatusPoller(Timer):
def __init__(self, *, strategy: Optional[str], max_idletime: float,
strategy_period: Union[float, int],
dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None:
monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None) -> None:
self._executor_facades = [] # type: List[PolledExecutorFacade]
self.dfk = dfk
self.monitoring = monitoring
self._strategy = Strategy(strategy=strategy,
max_idletime=max_idletime)
super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller")
Expand All @@ -131,7 +116,7 @@ def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
for executor in executors:
if executor.status_polling_interval > 0:
logger.debug("Adding executor {}".format(executor.label))
self._executor_facades.append(PolledExecutorFacade(executor, self.dfk))
self._executor_facades.append(PolledExecutorFacade(executor, self.monitoring))
self._strategy.add_executors(executors)

def close(self):
Expand Down
27 changes: 5 additions & 22 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import multiprocessing.synchronize as ms
import typeguard
import zmq

import queue

Expand All @@ -20,6 +19,7 @@

from parsl.serialize import deserialize

from parsl.monitoring.radios import MultiprocessingQueueRadio
from parsl.monitoring.router import router_starter
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.types import AddressedMonitoringMessage
Expand Down Expand Up @@ -92,12 +92,6 @@ def __init__(self,
Default: 30 seconds
"""

# Any is used to disable typechecking on uses of _dfk_channel,
# because it is used in the code as if it points to a channel, but
# the static type is that it can also be None. The code relies on
# .start() being called and initialising this to a real channel.
self._dfk_channel = None # type: Any

if _db_manager_excepts:
raise _db_manager_excepts

Expand Down Expand Up @@ -197,6 +191,8 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.filesystem_proc.start()
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

self.radio = MultiprocessingQueueRadio(self.block_msgs)

try:
comm_q_result = comm_q.get(block=True, timeout=120)
except queue.Empty:
Expand All @@ -211,26 +207,14 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat

self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)

context = zmq.Context()
self.dfk_channel_timeout = 10000 # in milliseconds
self._dfk_channel = context.socket(zmq.DEALER)
self._dfk_channel.setsockopt(zmq.LINGER, 0)
self._dfk_channel.set_hwm(0)
self._dfk_channel.setsockopt(zmq.SNDTIMEO, self.dfk_channel_timeout)
self._dfk_channel.connect("tcp://{}:{}".format(self.hub_address, zmq_port))

logger.info("Monitoring Hub initialized")

return zmq_port

# TODO: tighten the Any message format
def send(self, mtype: MessageType, message: Any) -> None:
logger.debug("Sending message type {}".format(mtype))
try:
self._dfk_channel.send_pyobj((mtype, message))
except zmq.Again:
logger.exception(
"The monitoring message sent from DFK to router timed-out after {}ms".format(self.dfk_channel_timeout))
self.radio.send((mtype, message))

def close(self) -> None:
logger.info("Terminating Monitoring Hub")
Expand All @@ -241,9 +225,8 @@ def close(self) -> None:
logger.error("There was a queued exception (Either router or DBM process got exception much earlier?)")
except queue.Empty:
break
if self._dfk_channel and self.monitoring_hub_active:
if self.monitoring_hub_active:
self.monitoring_hub_active = False
self._dfk_channel.close()
if exception_msgs:
for exception_msg in exception_msgs:
logger.error(
Expand Down
15 changes: 15 additions & 0 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from abc import ABCMeta, abstractmethod

from multiprocessing.queues import Queue
from typing import Optional

from parsl.serialize import serialize
Expand Down Expand Up @@ -173,3 +174,17 @@ def send(self, message: object) -> None:
logging.error("Could not send message within timeout limit")
return
return


class MultiprocessingQueueRadio(MonitoringRadio):
    """Monitoring radio that delivers messages through a multiprocessing Queue.

    Meant for the submit side: code in the submit process, or in processes
    launched from it by multiprocessing, has access to a Queue shared with
    the monitoring database code, so messages bypass the monitoring router.
    """

    def __init__(self, queue: Queue) -> None:
        # Queue shared with the monitoring database code.
        self.queue = queue

    def send(self, message: object) -> None:
        # Every message is enqueued as a (message, 0) pair.
        envelope = (message, 0)
        self.queue.put(envelope)
11 changes: 10 additions & 1 deletion parsl/tests/test_monitoring/test_fuzz_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
import socket
import time
import zmq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,8 +49,16 @@ def test_row_counts():
s.connect((hub_address, hub_zmq_port))
s.sendall(b'fuzzing\r')

context = zmq.Context()
channel_timeout = 10000 # in milliseconds
hub_channel = context.socket(zmq.DEALER)
hub_channel.setsockopt(zmq.LINGER, 0)
hub_channel.set_hwm(0)
hub_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
hub_channel.connect("tcp://{}:{}".format(hub_address, hub_zmq_port))

# this will send a non-object down a freshly opened ZMQ connection to the hub
parsl.dfk().monitoring._dfk_channel.send(b'FuzzyByte\rSTREAM')
hub_channel.send(b'FuzzyByte\rSTREAM')

# This following attack is commented out, because monitoring is not resilient
# to this.
Expand Down

0 comments on commit 77b3fca

Please sign in to comment.