Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into benc-remove-channels
Browse files Browse the repository at this point in the history
Conflicts:
parsl/channels/base.py
parsl/channels/local/local.py
parsl/tests/test_channels/test_large_output.py
parsl/tests/test_channels/test_local_channel.py
parsl/utils.py
  • Loading branch information
benclifford committed Dec 3, 2024
2 parents 46d0b9b + 3a18a4a commit e045a6f
Show file tree
Hide file tree
Showing 24 changed files with 266 additions and 234 deletions.
1 change: 0 additions & 1 deletion docs/userguide/monitoring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por
],
monitoring=MonitoringHub(
hub_address=address_by_hostname(),
hub_port=55055,
monitoring_debug=False,
resource_monitoring_interval=10,
),
Expand Down
1 change: 0 additions & 1 deletion parsl/configs/ASPIRE1.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
],
monitoring=MonitoringHub(
hub_address=address_by_interface('ib0'),
hub_port=55055,
resource_monitoring_interval=10,
),
strategy='simple',
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from typing_extensions import Literal, Self

from parsl.monitoring.radios import MonitoringRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
Expand Down
17 changes: 15 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def __init__(self,
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
self.interchange_launch_cmd = interchange_launch_cmd

self._result_queue_thread_exit = threading.Event()
self._result_queue_thread: Optional[threading.Thread] = None

radio_mode = "htex"
enable_mpi_mode: bool = False
mpi_launcher: str = "mpiexec"
Expand Down Expand Up @@ -455,9 +458,11 @@ def _result_queue_worker(self):
"""
logger.debug("Result queue worker starting")

while not self.bad_state_is_set:
while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
try:
msgs = self.incoming_q.get()
msgs = self.incoming_q.get(timeout_ms=self.poll_period)
if msgs is None: # timeout
continue

except IOError as e:
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
Expand Down Expand Up @@ -515,6 +520,8 @@ def _result_queue_worker(self):
else:
raise BadMessage("Message received with unknown type {}".format(msg['type']))

logger.info("Closing result ZMQ pipe")
self.incoming_q.close()
logger.info("Result queue worker finished")

def _start_local_interchange_process(self) -> None:
Expand Down Expand Up @@ -817,6 +824,8 @@ def shutdown(self, timeout: float = 10.0):

logger.info("Attempting HighThroughputExecutor shutdown")

logger.info("Terminating interchange and result queue thread")
self._result_queue_thread_exit.set()
self.interchange_proc.terminate()
try:
self.interchange_proc.wait(timeout=timeout)
Expand All @@ -841,6 +850,10 @@ def shutdown(self, timeout: float = 10.0):
logger.info("Closing command client")
self.command_client.close()

logger.info("Waiting for result queue thread exit")
if self._result_queue_thread:
self._result_queue_thread.join()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.radios.zmq import ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
Expand Down
17 changes: 13 additions & 4 deletions parsl/executors/high_throughput/zmq_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address),
min_port=port_range[0],
max_port=port_range[1])
self.poller = zmq.Poller()
self.poller.register(self.results_receiver, zmq.POLLIN)

def get(self):
def get(self, timeout_ms=None):
    """Get a message from the results queue.

    Polls the underlying ZMQ socket for up to ``timeout_ms`` milliseconds
    (``None`` blocks indefinitely, matching ``zmq.Poller.poll`` semantics)
    and returns the received multipart message, or ``None`` if the timeout
    expires without a message arriving.
    """
    logger.debug("Waiting for ResultsIncoming message")
    # poll() returns a list of (socket, event) pairs; convert to a dict so
    # the readiness of our receiver socket can be checked directly.
    socks = dict(self.poller.poll(timeout=timeout_ms))
    if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
        m = self.results_receiver.recv_multipart()
        logger.debug("Received ResultsIncoming message")
        return m
    else:
        # Timeout expired with no message ready: signal that to the caller
        # so it can re-check its exit condition and poll again.
        return None

def close(self):
self.results_receiver.close()
Expand Down
2 changes: 1 addition & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from parsl.log_utils import set_file_logger
from parsl.monitoring.errors import MonitoringHubStartError
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import ForkProcess, SizedQueue
Expand Down
191 changes: 0 additions & 191 deletions parsl/monitoring/radios.py

This file was deleted.

File renamed without changes.
13 changes: 13 additions & 0 deletions parsl/monitoring/radios/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Optional

# NOTE(review): module-level annotation with no assignment — nothing in this
# module assigns or reads it, so it appears to be a leftover (presumably
# carried over from the database-manager code during the radios/ split).
# Confirm it is unused elsewhere and consider removing.
_db_manager_excepts: Optional[Exception]

logger = logging.getLogger(__name__)


class MonitoringRadioSender(metaclass=ABCMeta):
    """Abstract interface for a monitoring "radio" sender.

    Concrete implementations deliver a single monitoring message to a
    receiver over some transport (for example a shared filesystem, a
    multiprocessing queue, or ZMQ).
    """
    @abstractmethod
    def send(self, message: object) -> None:
        """Send *message* to the monitoring receiver.

        Implementations choose the transport and serialization; this base
        class imposes no behavior beyond the method signature.
        """
        pass
52 changes: 52 additions & 0 deletions parsl/monitoring/radios/filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
import os
import pickle
import uuid

from parsl.monitoring.radios.base import MonitoringRadioSender

logger = logging.getLogger(__name__)


class FilesystemRadioSender(MonitoringRadioSender):
    """A MonitoringRadioSender that sends messages over a shared filesystem.

    The message directory structure is based on maildir,
    https://en.wikipedia.org/wiki/Maildir

    The writer creates a message in tmp/ and then, when it is fully
    written, moves it atomically into new/.

    The reader ignores tmp/ and only reads and deletes messages from
    new/.

    This avoids a race condition of reading partially written messages.

    This radio is likely to give higher shared filesystem load compared to
    the UDP radio, but should be much more reliable.
    """

    def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str) -> None:
        """Create the maildir-style directory structure under *run_dir*.

        ``monitoring_url`` and ``timeout`` are accepted but not used by this
        sender — presumably kept for signature compatibility with the other
        radio senders; TODO(review): confirm against the radio construction
        sites and drop if truly unused.
        """
        logger.info("filesystem based monitoring channel initializing")
        self.base_path = f"{run_dir}/monitor-fs-radio/"
        self.tmp_path = f"{self.base_path}/tmp"
        self.new_path = f"{self.base_path}/new"

        os.makedirs(self.tmp_path, exist_ok=True)
        os.makedirs(self.new_path, exist_ok=True)

    def send(self, message: object) -> None:
        """Pickle *message* to a uniquely-named file in new/."""
        logger.info("Sending a monitoring message via filesystem")

        unique_id = str(uuid.uuid4())

        tmp_filename = f"{self.tmp_path}/{unique_id}"
        new_filename = f"{self.new_path}/{unique_id}"

        # Write the message out fully under tmp/, then atomically move it
        # into new/, so that a partially written file will never be
        # observed by a reader scanning new/.
        with open(tmp_filename, "wb") as f:
            pickle.dump(message, f)
        os.rename(tmp_filename, new_filename)
Loading

0 comments on commit e045a6f

Please sign in to comment.