Skip to content

Commit

Permalink
Exit monitoring router on multiprocessing event, not exit message (
Browse files Browse the repository at this point in the history
#3330)

Prior to this PR, the monitoring router exited due to receiving a
WORKFLOW_INFO message with an exit_now field set to True, but only if that
message was received through a specific path.

This PR removes that exit_now field, and makes the monitoring router exit
on a multiprocessing event. This removes the need for the exit message to
arrive through that specific path into the router, which makes message
handling more consistent, and opens up opportunities to feed messages into
monitoring through different paths.

Ongoing (if slow) work has been trying to make all the different monitoring
message paths behave the same, with the goal of eliminating some of them;
this change also works towards that.

Co-authored-by: Kevin Hunter Kesling <[email protected]>
  • Loading branch information
benclifford and khk-globus authored Apr 10, 2024
1 parent 76855fd commit eb25bfc
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
3 changes: 1 addition & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1227,8 +1227,7 @@ def cleanup(self) -> None:
'tasks_completed_count': self.task_state_counts[States.exec_done],
"time_began": self.time_began,
'time_completed': self.time_completed,
'run_id': self.run_id, 'rundir': self.run_dir,
'exit_now': True})
'run_id': self.run_id, 'rundir': self.run_dir})

logger.info("Terminating monitoring")
self.monitoring.close()
Expand Down
10 changes: 9 additions & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import os
import time
import logging
import multiprocessing.synchronize as ms
import typeguard
import zmq

import queue

from parsl.multiprocessing import ForkProcess, SizedQueue
from multiprocessing import Process
from multiprocessing import Event
from multiprocessing.queues import Queue
from parsl.log_utils import set_file_logger
from parsl.utils import RepresentationMixin
Expand Down Expand Up @@ -157,8 +159,12 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.block_msgs: Queue[AddressedMonitoringMessage]
self.block_msgs = SizedQueue()

self.router_exit_event: ms.Event
self.router_exit_event = Event()

self.router_proc = ForkProcess(target=router_starter,
args=(comm_q, self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs),
args=(comm_q, self.exception_q, self.priority_msgs, self.node_msgs,
self.block_msgs, self.resource_msgs, self.router_exit_event),
kwargs={"hub_address": self.hub_address,
"udp_port": self.hub_port,
"zmq_port_range": self.hub_port_range,
Expand Down Expand Up @@ -249,6 +255,8 @@ def close(self) -> None:
self.router_proc.terminate()
self.dbm_proc.terminate()
self.filesystem_proc.terminate()
logger.info("Setting router termination event")
self.router_exit_event.set()
logger.info("Waiting for router to terminate")
self.router_proc.join()
logger.debug("Finished waiting for router termination")
Expand Down
13 changes: 7 additions & 6 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage

from multiprocessing.synchronize import Event
from typing import Optional, Tuple, Union


Expand Down Expand Up @@ -98,10 +100,10 @@ def start(self,
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]") -> None:
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
exit_event: Event) -> None:
try:
router_keep_going = True
while router_keep_going:
while not exit_event.is_set():
try:
data, addr = self.udp_sock.recvfrom(2048)
resource_msg = pickle.loads(data)
Expand Down Expand Up @@ -135,8 +137,6 @@ def start(self,
priority_msgs.put(msg_0)
elif msg[0] == MessageType.WORKFLOW_INFO:
priority_msgs.put(msg_0)
if 'exit_now' in msg[1] and msg[1]['exit_now']:
router_keep_going = False
else:
# There is a type: ignore here because if msg[0]
# is of the correct type, this code is unreachable,
Expand Down Expand Up @@ -178,6 +178,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
exit_event: Event,

hub_address: str,
udp_port: Optional[int],
Expand All @@ -202,7 +203,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",

router.logger.info("Starting MonitoringRouter in router_starter")
try:
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs)
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event)
except Exception as e:
router.logger.exception("router.start exception")
exception_q.put(('Hub', str(e)))

0 comments on commit eb25bfc

Please sign in to comment.