From 7622caada72429c63df3c73546c1d711cbb8e078 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 2 Dec 2024 18:43:16 +0000
Subject: [PATCH 1/4] Make result queue poll for shutdown, and tidy up at shutdown (#3709)

This poll happens at the configured htex poll period, which defaults to
10ms. Under heavy result load, this shouldn't result in much additional
load: the poll loop will already be looping a lot to process the
results. Under lower result load, there is a slight observable increase
in CPU usage. A 30-second sleep task shows this.

Before this PR:

real 0m37.451s
user 0m2.160s
sys  0m0.376s

run 2: user 2.160s
run 3: user 2.116s

After this PR:

real 0m37.473s
user 0m2.400s
sys  0m0.557s

run 2: user 2.457s
run 3: user 2.452s

At shutdown, the ZMQ socket for incoming results is closed. This
reduces both the number of threads and the number of file descriptors
left behind by the `--config local` tests. For example:

$ pytest parsl/tests/test_monitoring/ --config local

Before this PR, at the end of the tests: 32 threads, 451 fds open.
After this PR, at the end of the tests: 1 thread, 48 fds open.

This is part of the PR #3397 shutdown tidyup.

# Changed Behaviour

Nothing should be visible to normal users, apart from the increased CPU
usage in the situations documented above.

## Type of change

- New feature
- Code maintenance/cleanup
---
 parsl/executors/high_throughput/executor.py  | 17 +++++++++++++++--
 parsl/executors/high_throughput/zmq_pipes.py | 17 +++++++++++++----
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 882776cf5c..d3fa522619 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -331,6 +331,9 @@ def __init__(self,
             interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
         self.interchange_launch_cmd = interchange_launch_cmd
 
+        self._result_queue_thread_exit = threading.Event()
+        self._result_queue_thread: Optional[threading.Thread] = None
+
     radio_mode = "htex"
     enable_mpi_mode: bool = False
     mpi_launcher: str = "mpiexec"
@@ -455,9 +458,11 @@ def _result_queue_worker(self):
         """
         logger.debug("Result queue worker starting")
 
-        while not self.bad_state_is_set:
+        while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
             try:
-                msgs = self.incoming_q.get()
+                msgs = self.incoming_q.get(timeout_ms=self.poll_period)
+                if msgs is None:  # timeout
+                    continue
 
             except IOError as e:
                 logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
@@ -515,6 +520,8 @@ def _result_queue_worker(self):
                     else:
                         raise BadMessage("Message received with unknown type {}".format(msg['type']))
 
+        logger.info("Closing result ZMQ pipe")
+        self.incoming_q.close()
         logger.info("Result queue worker finished")
 
     def _start_local_interchange_process(self) -> None:
@@ -817,6 +824,8 @@ def shutdown(self, timeout: float = 10.0):
 
         logger.info("Attempting HighThroughputExecutor shutdown")
 
+        logger.info("Terminating interchange and result queue thread")
+        self._result_queue_thread_exit.set()
         self.interchange_proc.terminate()
         try:
             self.interchange_proc.wait(timeout=timeout)
@@ -841,6 +850,10 @@ def shutdown(self, timeout: float = 10.0):
         logger.info("Closing command client")
         self.command_client.close()
 
+        logger.info("Waiting for result queue thread exit")
+        if self._result_queue_thread:
+            self._result_queue_thread.join()
+
         logger.info("Finished HighThroughputExecutor shutdown attempt")
 
     def get_usage_information(self):
diff --git a/parsl/executors/high_throughput/zmq_pipes.py b/parsl/executors/high_throughput/zmq_pipes.py
index 54ed8c1da9..a7278cf067 100644
--- a/parsl/executors/high_throughput/zmq_pipes.py
+++ b/parsl/executors/high_throughput/zmq_pipes.py
@@ -206,12 +206,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
         self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address),
                                                               min_port=port_range[0],
                                                               max_port=port_range[1])
+        self.poller = zmq.Poller()
+        self.poller.register(self.results_receiver, zmq.POLLIN)
 
-    def get(self):
+    def get(self, timeout_ms=None):
+        """Get a message from the queue, returning None if timeout expires
+        without a message. timeout is measured in milliseconds.
+        """
         logger.debug("Waiting for ResultsIncoming message")
-        m = self.results_receiver.recv_multipart()
-        logger.debug("Received ResultsIncoming message")
-        return m
+        socks = dict(self.poller.poll(timeout=timeout_ms))
+        if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
+            m = self.results_receiver.recv_multipart()
+            logger.debug("Received ResultsIncoming message")
+            return m
+        else:
+            return None
 
     def close(self):
         self.results_receiver.close()

From 1f583af76370eebd9640eff059cfd674ea1d0325 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 2 Dec 2024 18:44:40 +0000
Subject: [PATCH 2/4] Remove monitoring UDP hub_port from example/test configs (#3710)

This port will be chosen dynamically, and that is fine in test
situations. This is probably also better in environments where users
run multiple Parsl instances at once: they cannot all use the same
fixed port 55055 without interfering with each other.

Ongoing work to refactor monitoring radios will remove this hub_port
parameter, making it either unnecessary (when non-UDP monitoring radios
are used) or specified as part of the radio selection. This PR removes
hub_port from tests and examples as preparation for that change.

# Changed Behaviour

The monitoring UDP port will be chosen dynamically in tests and
examples.

## Type of change

- Code maintenance/cleanup
---
 docs/userguide/monitoring.rst                                    | 1 -
 parsl/configs/ASPIRE1.py                                         | 1 -
 parsl/tests/configs/htex_local_alternate.py                      | 1 -
 parsl/tests/configs/local_threads_monitoring.py                  | 1 -
 parsl/tests/manual_tests/test_udp_simple.py                      | 1 -
 .../tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py | 1 -
 parsl/tests/test_monitoring/test_stdouterr.py                    | 1 -
 7 files changed, 7 deletions(-)

diff --git a/docs/userguide/monitoring.rst b/docs/userguide/monitoring.rst
index 3404862450..02b3177ca7 100644
--- a/docs/userguide/monitoring.rst
+++ b/docs/userguide/monitoring.rst
@@ -42,7 +42,6 @@ configuration.
Here the `parsl.monitoring.MonitoringHub` is specified to use por ], monitoring=MonitoringHub( hub_address=address_by_hostname(), - hub_port=55055, monitoring_debug=False, resource_monitoring_interval=10, ), diff --git a/parsl/configs/ASPIRE1.py b/parsl/configs/ASPIRE1.py index 7792f15dba..017e1061d7 100644 --- a/parsl/configs/ASPIRE1.py +++ b/parsl/configs/ASPIRE1.py @@ -34,7 +34,6 @@ ], monitoring=MonitoringHub( hub_address=address_by_interface('ib0'), - hub_port=55055, resource_monitoring_interval=10, ), strategy='simple', diff --git a/parsl/tests/configs/htex_local_alternate.py b/parsl/tests/configs/htex_local_alternate.py index d84a07ad84..5667f3ff8c 100644 --- a/parsl/tests/configs/htex_local_alternate.py +++ b/parsl/tests/configs/htex_local_alternate.py @@ -62,7 +62,6 @@ def fresh_config(): retries=2, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, monitoring_debug=False, resource_monitoring_interval=1, ), diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py index 81b9095285..9f105af25d 100644 --- a/parsl/tests/configs/local_threads_monitoring.py +++ b/parsl/tests/configs/local_threads_monitoring.py @@ -5,7 +5,6 @@ config = Config(executors=[ThreadPoolExecutor(label='threads', max_threads=4)], monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, resource_monitoring_interval=3, ) ) diff --git a/parsl/tests/manual_tests/test_udp_simple.py b/parsl/tests/manual_tests/test_udp_simple.py index 8de257d8fa..847f8803bf 100644 --- a/parsl/tests/manual_tests/test_udp_simple.py +++ b/parsl/tests/manual_tests/test_udp_simple.py @@ -15,7 +15,6 @@ def local_setup(): ], monitoring=MonitoringHub( hub_address="127.0.0.1", - hub_port=55055, logging_level=logging.INFO, resource_monitoring_interval=10)) diff --git a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py index ada972e747..c54486f011 100644 --- a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py +++ b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py @@ -37,7 +37,6 @@ def fresh_config(run_dir, strategy, db_url): strategy_period=0.1, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, logging_endpoint=db_url ) ) diff --git a/parsl/tests/test_monitoring/test_stdouterr.py b/parsl/tests/test_monitoring/test_stdouterr.py index d1817164c0..8e1935045f 100644 --- a/parsl/tests/test_monitoring/test_stdouterr.py +++ b/parsl/tests/test_monitoring/test_stdouterr.py @@ -37,7 +37,6 @@ def fresh_config(run_dir): strategy_period=0.1, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, ) ) From 90d4a8683527af24c8432debe02d85b7a6622e87 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 3 Dec 2024 08:03:09 +0000 Subject: [PATCH 3/4] Move execute_wait from vestigial LocalChannel into parsl.utils (#3705) # Description This moves the execute_wait functionality previously provided by LocalChannel into parsl.utils, as part of #3515. LocalChannel.execute_wait did not reference `self` so it already basically behaved as a function rather than a method. This leaves LocalChannel as solely a place for script_directory, which will be untangled in a subsequent PR. 
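For illustration, a minimal sketch of calling the relocated function directly. The signature is taken from the `parsl/utils.py` hunk below; the command here is an arbitrary example:

```python
# Sketch: execute_wait has moved from LocalChannel into parsl.utils.
# Signature per the diff below:
#   execute_wait(cmd: str, walltime: Optional[int] = None) -> Tuple[int, str, str]
from parsl.utils import execute_wait

# Synchronously run a shell command with a 10-second walltime;
# subprocess.TimeoutExpired propagates if the walltime is exceeded.
retcode, stdout, stderr = execute_wait("echo hello", walltime=10)
assert retcode == 0
assert stdout.strip() == "hello"
```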
# Changed Behaviour none ## Type of change - Code maintenance/cleanup --- parsl/channels/base.py | 34 +----------------- parsl/channels/local/local.py | 35 ------------------- parsl/providers/cluster_provider.py | 3 +- parsl/providers/local/local.py | 10 +++--- parsl/tests/test_channels/__init__.py | 0 .../tests/test_channels/test_large_output.py | 22 ------------ .../tests/test_channels/test_local_channel.py | 19 ---------- parsl/tests/test_utils/test_execute_wait.py | 35 +++++++++++++++++++ parsl/utils.py | 35 +++++++++++++++++++ 9 files changed, 78 insertions(+), 115 deletions(-) delete mode 100644 parsl/tests/test_channels/__init__.py delete mode 100644 parsl/tests/test_channels/test_large_output.py delete mode 100644 parsl/tests/test_channels/test_local_channel.py create mode 100644 parsl/tests/test_utils/test_execute_wait.py diff --git a/parsl/channels/base.py b/parsl/channels/base.py index 05241b878d..0024d45a6b 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -1,39 +1,7 @@ -from abc import ABCMeta, abstractmethod, abstractproperty -from typing import Tuple +from abc import ABCMeta, abstractproperty class Channel(metaclass=ABCMeta): - """Channels are abstractions that enable ExecutionProviders to talk to - resource managers of remote compute facilities. - - For certain resources such as campus clusters or supercomputers at - research laboratories, resource requirements may require authentication. - - The only remaining Channel, *LocalChannel*, executes commands locally in a - shell. - - Channels provide the ability to execute commands remotely, using the - execute_wait method, and manipulate the remote file system using methods - such as push_file, pull_file and makedirs. - - Channels should ensure that each launched command runs in a new process - group, so that providers (such as LocalProvider) which terminate long - running commands using process groups can do so. - """ - - @abstractmethod - def execute_wait(self, cmd: str, walltime: int = 0) -> Tuple[int, str, str]: - ''' Executes the cmd, with a defined walltime. - - Args: - - cmd (string): Command string to execute over the channel - - walltime (int) : Timeout in seconds - - Returns: - - (exit_code, stdout, stderr) (int, string, string) - ''' - pass - @abstractproperty def script_dir(self) -> str: ''' This is a property. Returns the directory assigned for storing all internal scripts such as diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py index 6ef014ac19..4c1712ef30 100644 --- a/parsl/channels/local/local.py +++ b/parsl/channels/local/local.py @@ -1,6 +1,5 @@ import logging import os -import subprocess from parsl.channels.base import Channel from parsl.utils import RepresentationMixin @@ -21,40 +20,6 @@ def __init__(self): ''' self.script_dir = None - def execute_wait(self, cmd, walltime=None): - ''' Synchronously execute a commandline string on the shell. - - Args: - - cmd (string) : Commandline string to execute - - walltime (int) : walltime in seconds - - Returns: - - retcode : Return code from the execution - - stdout : stdout string - - stderr : stderr string - ''' - try: - logger.debug("Creating process with command '%s'", cmd) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - preexec_fn=os.setpgrp - ) - logger.debug("Created process with pid %s. 
Performing communicate", proc.pid) - (stdout, stderr) = proc.communicate(timeout=walltime) - retcode = proc.returncode - logger.debug("Process %s returned %s", proc.pid, proc.returncode) - - except Exception: - logger.exception(f"Execution of command failed:\n{cmd}") - raise - else: - logger.debug("Execution of command in process %s completed normally", proc.pid) - - return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) - @property def script_dir(self): return self._script_dir diff --git a/parsl/providers/cluster_provider.py b/parsl/providers/cluster_provider.py index 6bc76bdf22..f2feb2ddf3 100644 --- a/parsl/providers/cluster_provider.py +++ b/parsl/providers/cluster_provider.py @@ -6,6 +6,7 @@ from parsl.launchers.errors import BadLauncher from parsl.providers.base import ExecutionProvider from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError +from parsl.utils import execute_wait logger = logging.getLogger(__name__) @@ -76,7 +77,7 @@ def execute_wait(self, cmd, timeout=None): t = self.cmd_timeout if timeout is not None: t = timeout - return self.channel.execute_wait(cmd, t) + return execute_wait(cmd, t) def _write_submit_script(self, template, script_filename, job_name, configs): """Generate submit script and write it to a file. diff --git a/parsl/providers/local/local.py b/parsl/providers/local/local.py index 5ecf174df2..6357c85cba 100644 --- a/parsl/providers/local/local.py +++ b/parsl/providers/local/local.py @@ -11,7 +11,7 @@ ScriptPathError, SubmitException, ) -from parsl.utils import RepresentationMixin +from parsl.utils import RepresentationMixin, execute_wait logger = logging.getLogger(__name__) @@ -118,7 +118,7 @@ def status(self, job_ids): return [self.resources[jid]['status'] for jid in job_ids] def _is_alive(self, job_dict): - retcode, stdout, stderr = self.channel.execute_wait( + retcode, stdout, stderr = execute_wait( 'ps -p {} > /dev/null 2> /dev/null; echo "STATUS:$?" '.format( job_dict['remote_pid']), self.cmd_timeout) for line in stdout.split('\n'): @@ -223,11 +223,11 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"): # cancel the task later. # # We need to do the >/dev/null 2>&1 so that bash closes stdout, otherwise - # channel.execute_wait hangs reading the process stdout until all the + # execute_wait hangs reading the process stdout until all the # background commands complete. cmd = '/bin/bash -c \'echo - >{0}.ec && {{ {{ bash {0} 1>{0}.out 2>{0}.err ; ' \ 'echo $? > {0}.ec ; }} >/dev/null 2>&1 & echo "PID:$!" 
; }}\''.format(script_path) - retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout) + retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout) if retcode != 0: raise SubmitException(job_name, "Launch command exited with code {0}".format(retcode), stdout, stderr) @@ -258,7 +258,7 @@ def cancel(self, job_ids): job_dict['cancelled'] = True logger.debug("Terminating job/process ID: {0}".format(job)) cmd = "kill -- -$(ps -o pgid= {} | grep -o '[0-9]*')".format(job_dict['remote_pid']) - retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout) + retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout) if retcode != 0: logger.warning("Failed to kill PID: {} and child processes on {}".format(job_dict['remote_pid'], self.label)) diff --git a/parsl/tests/test_channels/__init__.py b/parsl/tests/test_channels/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/tests/test_channels/test_large_output.py b/parsl/tests/test_channels/test_large_output.py deleted file mode 100644 index bfc96f38bc..0000000000 --- a/parsl/tests/test_channels/test_large_output.py +++ /dev/null @@ -1,22 +0,0 @@ -import pytest - -from parsl.channels.local.local import LocalChannel - - -@pytest.mark.local -def test_local_large_output_2210(): - """Regression test for #2210. - The local channel was hanging if the specified command gave too - much output, due to a race condition between process exiting and - pipes filling up. - """ - - c = LocalChannel() - - # this will output 128kb of stdout - c.execute_wait("yes | dd count=128 bs=1024", walltime=60) - - # if this test fails, execute_wait should raise a timeout - # exception. - - # The contents out the output is not verified by this test diff --git a/parsl/tests/test_channels/test_local_channel.py b/parsl/tests/test_channels/test_local_channel.py deleted file mode 100644 index a3f55d096d..0000000000 --- a/parsl/tests/test_channels/test_local_channel.py +++ /dev/null @@ -1,19 +0,0 @@ -import pytest - -from parsl.channels.local.local import LocalChannel - - -@pytest.mark.local -def test_env(): - ''' Regression testing for issue #27 - ''' - - lc = LocalChannel() - rc, stdout, stderr = lc.execute_wait("env", 1) - - stdout = stdout.split('\n') - x = [s for s in stdout if s.startswith("PATH=")] - assert x, "PATH not found" - - x = [s for s in stdout if s.startswith("HOME=")] - assert x, "HOME not found" diff --git a/parsl/tests/test_utils/test_execute_wait.py b/parsl/tests/test_utils/test_execute_wait.py new file mode 100644 index 0000000000..44488c239c --- /dev/null +++ b/parsl/tests/test_utils/test_execute_wait.py @@ -0,0 +1,35 @@ +import pytest + +from parsl.utils import execute_wait + + +@pytest.mark.local +def test_env(): + ''' Regression testing for issue #27 + ''' + + rc, stdout, stderr = execute_wait("env", 1) + + stdout = stdout.split('\n') + x = [s for s in stdout if s.startswith("PATH=")] + assert x, "PATH not found" + + x = [s for s in stdout if s.startswith("HOME=")] + assert x, "HOME not found" + + +@pytest.mark.local +def test_large_output_2210(): + """Regression test for #2210. + execute_wait was hanging if the specified command gave too + much output, due to a race condition between process exiting and + pipes filling up. + """ + + # this will output 128kb of stdout + execute_wait("yes | dd count=128 bs=1024", walltime=60) + + # if this test fails, execute_wait should raise a timeout + # exception. 
+ + # The contents out the output is not verified by this test diff --git a/parsl/utils.py b/parsl/utils.py index 0ea5d7d9eb..b6544d63d2 100644 --- a/parsl/utils.py +++ b/parsl/utils.py @@ -458,3 +458,38 @@ def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str: raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'") return sanitized + + +def execute_wait(cmd: str, walltime: Optional[int] = None) -> Tuple[int, str, str]: + ''' Synchronously execute a commandline string on the shell. + + Args: + - cmd (string) : Commandline string to execute + - walltime (int) : walltime in seconds + + Returns: + - retcode : Return code from the execution + - stdout : stdout string + - stderr : stderr string + ''' + try: + logger.debug("Creating process with command '%s'", cmd) + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + preexec_fn=os.setpgrp + ) + logger.debug("Created process with pid %s. Performing communicate", proc.pid) + (stdout, stderr) = proc.communicate(timeout=walltime) + retcode = proc.returncode + logger.debug("Process %s returned %s", proc.pid, proc.returncode) + + except Exception: + logger.exception(f"Execution of command failed:\n{cmd}") + raise + else: + logger.debug("Execution of command in process %s completed normally", proc.pid) + + return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) From 3a18a4a98435a22551ac61977b242b055cf71efd Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 3 Dec 2024 08:09:13 +0000 Subject: [PATCH 4/4] Move monitoring radios to own modules (#3707) In subsequent PRs, these modules will get more radio-specific code, as part of PR #3315 monitoring radio plugin work: for example, the receiving code for each radio should move here too. I used `git show --color-moved` to check that the moved RadioSender definitions were not changed. 
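As a before/after sketch of what this means for callers, with the module paths exactly as they appear in the diffs below:

```python
# Before this PR (single flat module, now deleted):
#   from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender

# After this PR (abstract base and concrete senders in separate modules):
from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.radios.zmq import ZMQRadioSender
```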
# Changed Behaviour none ## Type of change - Code maintenance/cleanup --- parsl/executors/base.py | 2 +- .../executors/high_throughput/interchange.py | 3 +- parsl/monitoring/monitoring.py | 2 +- parsl/monitoring/radios.py | 191 ------------------ parsl/monitoring/radios/__init__.py | 0 parsl/monitoring/radios/base.py | 13 ++ parsl/monitoring/radios/filesystem.py | 52 +++++ parsl/monitoring/radios/htex.py | 57 ++++++ parsl/monitoring/radios/multiprocessing.py | 17 ++ parsl/monitoring/radios/udp.py | 56 +++++ parsl/monitoring/radios/zmq.py | 17 ++ parsl/monitoring/remote.py | 10 +- parsl/monitoring/router.py | 2 +- 13 files changed, 221 insertions(+), 201 deletions(-) delete mode 100644 parsl/monitoring/radios.py create mode 100644 parsl/monitoring/radios/__init__.py create mode 100644 parsl/monitoring/radios/base.py create mode 100644 parsl/monitoring/radios/filesystem.py create mode 100644 parsl/monitoring/radios/htex.py create mode 100644 parsl/monitoring/radios/multiprocessing.py create mode 100644 parsl/monitoring/radios/udp.py create mode 100644 parsl/monitoring/radios/zmq.py diff --git a/parsl/executors/base.py b/parsl/executors/base.py index a112b9eb00..fc97db89d3 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -5,7 +5,7 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender class ParslExecutor(metaclass=ABCMeta): diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 88bb6c7156..12d3e07f31 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -20,7 +20,8 @@ from parsl.executors.high_throughput.manager_record import ManagerRecord from parsl.executors.high_throughput.manager_selector import ManagerSelector from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.zmq import ZMQRadioSender from parsl.process_loggers import wrap_with_logs from parsl.serialize import serialize as serialize_object from parsl.utils import setproctitle diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index e82c8fb688..3fbe5736ba 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -14,7 +14,7 @@ from parsl.log_utils import set_file_logger from parsl.monitoring.errors import MonitoringHubStartError -from parsl.monitoring.radios import MultiprocessingQueueRadioSender +from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import TaggedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py deleted file mode 100644 index 14dc046557..0000000000 --- a/parsl/monitoring/radios.py +++ /dev/null @@ -1,191 +0,0 @@ -import logging -import os -import pickle -import socket -import uuid -from abc import ABCMeta, abstractmethod -from multiprocessing.queues import Queue - -import zmq - -logger = logging.getLogger(__name__) - - -class MonitoringRadioSender(metaclass=ABCMeta): - @abstractmethod - def send(self, message: object) -> None: - pass - - -class FilesystemRadioSender(MonitoringRadioSender): - """A MonitoringRadioSender that sends messages over a 
shared filesystem. - - The messsage directory structure is based on maildir, - https://en.wikipedia.org/wiki/Maildir - - The writer creates a message in tmp/ and then when it is fully - written, moves it atomically into new/ - - The reader ignores tmp/ and only reads and deletes messages from - new/ - - This avoids a race condition of reading partially written messages. - - This radio is likely to give higher shared filesystem load compared to - the UDP radio, but should be much more reliable. - """ - - def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): - logger.info("filesystem based monitoring channel initializing") - self.base_path = f"{run_dir}/monitor-fs-radio/" - self.tmp_path = f"{self.base_path}/tmp" - self.new_path = f"{self.base_path}/new" - - os.makedirs(self.tmp_path, exist_ok=True) - os.makedirs(self.new_path, exist_ok=True) - - def send(self, message: object) -> None: - logger.info("Sending a monitoring message via filesystem") - - unique_id = str(uuid.uuid4()) - - tmp_filename = f"{self.tmp_path}/{unique_id}" - new_filename = f"{self.new_path}/{unique_id}" - buffer = message - - # this will write the message out then atomically - # move it into new/, so that a partially written - # file will never be observed in new/ - with open(tmp_filename, "wb") as f: - pickle.dump(buffer, f) - os.rename(tmp_filename, new_filename) - - -class HTEXRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - timeout : int - timeout, default=10s - """ - logger.info("htex-based monitoring channel initialising") - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - - import parsl.executors.high_throughput.monitoring_info - - result_queue = parsl.executors.high_throughput.monitoring_info.result_queue - - # this message needs to go in the result queue tagged so that it is treated - # i) as a monitoring message by the interchange, and then further more treated - # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO - # which is the implicit default for messages from the interchange) - - # for the interchange, the outer wrapper, this needs to be a dict: - - interchange_msg = { - 'type': 'monitoring', - 'payload': message - } - - if result_queue: - result_queue.put(pickle.dumps(interchange_msg)) - else: - logger.error("result_queue is uninitialized - cannot put monitoring message") - - return - - -class UDPRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - timeout : int - timeout, default=10s - """ - self.monitoring_url = monitoring_url - self.sock_timeout = timeout - try: - self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) - self.port = int(port) - except Exception: - raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) - - self.sock = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) # UDP - self.sock.settimeout(self.sock_timeout) - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - try: - buffer = 
pickle.dumps(message) - except Exception: - logging.exception("Exception during pickling", exc_info=True) - return - - try: - self.sock.sendto(buffer, (self.ip, self.port)) - except socket.timeout: - logging.error("Could not send message within timeout limit") - return - return - - -class MultiprocessingQueueRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over a multiprocessing Queue. - This radio is intended to be used on the submit side, where components - in the submit process, or processes launched by multiprocessing, will have - access to a Queue shared with the monitoring database code (bypassing the - monitoring router). - """ - def __init__(self, queue: Queue) -> None: - self.queue = queue - - def send(self, message: object) -> None: - self.queue.put(message) - - -class ZMQRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over ZMQ. This radio is not - thread-safe, because its use of ZMQ is not thread-safe. - """ - - def __init__(self, hub_address: str, hub_zmq_port: int) -> None: - self._hub_channel = zmq.Context().socket(zmq.DEALER) - self._hub_channel.set_hwm(0) - self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") - - def send(self, message: object) -> None: - self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/radios/__init__.py b/parsl/monitoring/radios/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/parsl/monitoring/radios/base.py b/parsl/monitoring/radios/base.py new file mode 100644 index 0000000000..2bb799f256 --- /dev/null +++ b/parsl/monitoring/radios/base.py @@ -0,0 +1,13 @@ +import logging +from abc import ABCMeta, abstractmethod +from typing import Optional + +_db_manager_excepts: Optional[Exception] + +logger = logging.getLogger(__name__) + + +class MonitoringRadioSender(metaclass=ABCMeta): + @abstractmethod + def send(self, message: object) -> None: + pass diff --git a/parsl/monitoring/radios/filesystem.py b/parsl/monitoring/radios/filesystem.py new file mode 100644 index 0000000000..accff87d36 --- /dev/null +++ b/parsl/monitoring/radios/filesystem.py @@ -0,0 +1,52 @@ +import logging +import os +import pickle +import uuid + +from parsl.monitoring.radios.base import MonitoringRadioSender + +logger = logging.getLogger(__name__) + + +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. + + The messsage directory structure is based on maildir, + https://en.wikipedia.org/wiki/Maildir + + The writer creates a message in tmp/ and then when it is fully + written, moves it atomically into new/ + + The reader ignores tmp/ and only reads and deletes messages from + new/ + + This avoids a race condition of reading partially written messages. + + This radio is likely to give higher shared filesystem load compared to + the UDP radio, but should be much more reliable. 
+ """ + + def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): + logger.info("filesystem based monitoring channel initializing") + self.base_path = f"{run_dir}/monitor-fs-radio/" + self.tmp_path = f"{self.base_path}/tmp" + self.new_path = f"{self.base_path}/new" + + os.makedirs(self.tmp_path, exist_ok=True) + os.makedirs(self.new_path, exist_ok=True) + + def send(self, message: object) -> None: + logger.info("Sending a monitoring message via filesystem") + + unique_id = str(uuid.uuid4()) + + tmp_filename = f"{self.tmp_path}/{unique_id}" + new_filename = f"{self.new_path}/{unique_id}" + buffer = message + + # this will write the message out then atomically + # move it into new/, so that a partially written + # file will never be observed in new/ + with open(tmp_filename, "wb") as f: + pickle.dump(buffer, f) + os.rename(tmp_filename, new_filename) diff --git a/parsl/monitoring/radios/htex.py b/parsl/monitoring/radios/htex.py new file mode 100644 index 0000000000..bdb893b303 --- /dev/null +++ b/parsl/monitoring/radios/htex.py @@ -0,0 +1,57 @@ +import logging +import pickle + +from parsl.monitoring.radios.base import MonitoringRadioSender + +logger = logging.getLogger(__name__) + + +class HTEXRadioSender(MonitoringRadioSender): + + def __init__(self, monitoring_url: str, timeout: int = 10): + """ + Parameters + ---------- + + monitoring_url : str + URL of the form ://: + timeout : int + timeout, default=10s + """ + logger.info("htex-based monitoring channel initialising") + + def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + + import parsl.executors.high_throughput.monitoring_info + + result_queue = parsl.executors.high_throughput.monitoring_info.result_queue + + # this message needs to go in the result queue tagged so that it is treated + # i) as a monitoring message by the interchange, and then further more treated + # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO + # which is the implicit default for messages from the interchange) + + # for the interchange, the outer wrapper, this needs to be a dict: + + interchange_msg = { + 'type': 'monitoring', + 'payload': message + } + + if result_queue: + result_queue.put(pickle.dumps(interchange_msg)) + else: + logger.error("result_queue is uninitialized - cannot put monitoring message") + + return diff --git a/parsl/monitoring/radios/multiprocessing.py b/parsl/monitoring/radios/multiprocessing.py new file mode 100644 index 0000000000..6274bbfca8 --- /dev/null +++ b/parsl/monitoring/radios/multiprocessing.py @@ -0,0 +1,17 @@ +from multiprocessing.queues import Queue + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class MultiprocessingQueueRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over a multiprocessing Queue. + This radio is intended to be used on the submit side, where components + in the submit process, or processes launched by multiprocessing, will have + access to a Queue shared with the monitoring database code (bypassing the + monitoring router). 
+ """ + def __init__(self, queue: Queue) -> None: + self.queue = queue + + def send(self, message: object) -> None: + self.queue.put(message) diff --git a/parsl/monitoring/radios/udp.py b/parsl/monitoring/radios/udp.py new file mode 100644 index 0000000000..f2a652e9ac --- /dev/null +++ b/parsl/monitoring/radios/udp.py @@ -0,0 +1,56 @@ +import logging +import pickle +import socket + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class UDPRadioSender(MonitoringRadioSender): + + def __init__(self, monitoring_url: str, timeout: int = 10): + """ + Parameters + ---------- + + monitoring_url : str + URL of the form ://: + timeout : int + timeout, default=10s + """ + self.monitoring_url = monitoring_url + self.sock_timeout = timeout + try: + self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) + self.port = int(port) + except Exception: + raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) + + self.sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP) # UDP + self.sock.settimeout(self.sock_timeout) + + def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + try: + buffer = pickle.dumps(message) + except Exception: + logging.exception("Exception during pickling", exc_info=True) + return + + try: + self.sock.sendto(buffer, (self.ip, self.port)) + except socket.timeout: + logging.error("Could not send message within timeout limit") + return + return diff --git a/parsl/monitoring/radios/zmq.py b/parsl/monitoring/radios/zmq.py new file mode 100644 index 0000000000..397c943568 --- /dev/null +++ b/parsl/monitoring/radios/zmq.py @@ -0,0 +1,17 @@ +import zmq + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class ZMQRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over ZMQ. This radio is not + thread-safe, because its use of ZMQ is not thread-safe. 
+ """ + + def __init__(self, hub_address: str, hub_zmq_port: int) -> None: + self._hub_channel = zmq.Context().socket(zmq.DEALER) + self._hub_channel.set_hwm(0) + self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") + + def send(self, message: object) -> None: + self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index d72b54dc3c..530b39f935 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -7,12 +7,10 @@ from typing import Any, Callable, Dict, List, Sequence, Tuple from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import ( - FilesystemRadioSender, - HTEXRadioSender, - MonitoringRadioSender, - UDPRadioSender, -) +from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.filesystem import FilesystemRadioSender +from parsl.monitoring.radios.htex import HTEXRadioSender +from parsl.monitoring.radios.udp import UDPRadioSender from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 04e7480a7a..0926712c36 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,7 +14,7 @@ import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.radios import MultiprocessingQueueRadioSender +from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle
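
With the abstract base isolated in `parsl.monitoring.radios.base`, a new radio reduces to subclassing `MonitoringRadioSender` and providing `send`, which is the direction the PR #3315 plugin work points at. A hypothetical minimal sender, assuming only the interface shown in `parsl/monitoring/radios/base.py` above (the class itself is invented for illustration and is not part of this PR):

```python
import logging

from parsl.monitoring.radios.base import MonitoringRadioSender

logger = logging.getLogger(__name__)


class LogRadioSender(MonitoringRadioSender):
    """Hypothetical radio: writes each monitoring message to a Python
    logger instead of transmitting it anywhere. Shown only to illustrate
    the single-method interface in radios/base.py."""

    def send(self, message: object) -> None:
        logger.info("monitoring message: %r", message)
```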