From 1e618fa1e12030673470c587ea8588c3f6da686e Mon Sep 17 00:00:00 2001 From: Christine Simpson <48525133+cms21@users.noreply.github.com> Date: Mon, 15 Jul 2024 16:44:16 -0500 Subject: [PATCH 01/20] Allow multiple workers to share a CUDA device, intended for use with MPS mode (#3509) For CUDA devices, this change allows the same value of CUDA_VISIBLE_DEVICES to be set for multiple Parsl workers on a node when using the high throughput executor. This lets the user make use of Nvidia's MPS mode to partition a GPU and run multiple processes per GPU. To use MPS mode with this functionality, several settings must be set by the user in their config. * available_accelerators should be set to the total number of GPU processes to be run on the node. For example, for a node with 4 Nvidia GPUs, if you wish to run 4 processes per GPU, available_accelerators should be set to 16. * worker_init should include commands to start the MPS service and set any associated environment variables. For example, on the ALCF machine Polaris, it is recommended that the user use a bash script, enable_mps_polaris.sh, that starts the MPS service on each node. worker_init should then contain: worker_init='export NNODES=`wc -l < $PBS_NODEFILE`; mpiexec -n ${NNODES} --ppn 1 /path/to/mps/script/enable_mps_polaris.sh' --- docs/userguide/configuring.rst | 3 ++- .../high_throughput/process_worker_pool.py | 21 ++++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index b4165411dd..24ce0ca938 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -346,7 +346,8 @@ Provide either the number of executors (Parsl will assume they are named in inte strategy='none', ) - +For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUs. This is intended to make use of Nvidia's `Multi-Process Service (MPS) `_ available on many of their GPUs, which allows users to run multiple concurrent processes on a single GPU. The user needs to include in ``worker_init`` the commands to start MPS on every node in the block (this is machine dependent). The ``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3.
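A minimal sketch of a configuration using this oversubscription, assuming a Polaris-style PBS system with 4 GPUs per node and 8 workers per GPU; the provider choice, the label, and the script path are illustrative, taken from the commit message above rather than guaranteed by this patch:

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.providers import PBSProProvider

    # 4 GPUs per node x 8 workers per GPU = 32 accelerator slots; workers 0-7
    # land on GPU 0, workers 8-15 on GPU 1, and so on, as described above.
    config = Config(
        executors=[
            HighThroughputExecutor(
                label="htex_mps",
                available_accelerators=32,
                provider=PBSProProvider(
                    # Start the MPS daemon on every node in the block before workers start.
                    worker_init=("export NNODES=`wc -l < $PBS_NODEFILE`; "
                                 "mpiexec -n ${NNODES} --ppn 1 /path/to/mps/script/enable_mps_polaris.sh"),
                ),
            )
        ]
    )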
+ Multi-Threaded Applications --------------------------- diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 5a3b383dad..2d1e2dc720 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -9,6 +9,7 @@ import pickle import platform import queue +import subprocess import sys import threading import time @@ -731,9 +732,27 @@ def worker( os.sched_setaffinity(0, my_cores) # type: ignore[attr-defined, unused-ignore] logger.info("Set worker CPU affinity to {}".format(my_cores)) + # If CUDA devices, find total number of devices to allow for MPS + # See: https://developer.nvidia.com/system-management-interface + nvidia_smi_cmd = "nvidia-smi -L > /dev/null && nvidia-smi -L | wc -l" + nvidia_smi_ret = subprocess.run(nvidia_smi_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if nvidia_smi_ret.returncode == 0: + num_cuda_devices = int(nvidia_smi_ret.stdout.split()[0]) + else: + num_cuda_devices = None + # If desired, pin to accelerator if accelerator is not None: - os.environ["CUDA_VISIBLE_DEVICES"] = accelerator + try: + if num_cuda_devices is not None: + procs_per_cuda_device = pool_size // num_cuda_devices + partitioned_accelerator = str(int(accelerator) // procs_per_cuda_device) # multiple workers will share a GPU + os.environ["CUDA_VISIBLE_DEVICES"] = partitioned_accelerator + logger.info(f'Pinned worker to partitioned cuda device: {partitioned_accelerator}') + else: + os.environ["CUDA_VISIBLE_DEVICES"] = accelerator + except (TypeError, ValueError, ZeroDivisionError): + os.environ["CUDA_VISIBLE_DEVICES"] = accelerator os.environ["ROCR_VISIBLE_DEVICES"] = accelerator os.environ["ZE_AFFINITY_MASK"] = accelerator os.environ["ZE_ENABLE_PCI_ID_DEVICE_ORDER"] = '1' From c3df044b862bd93cd492332217a6e7d9b493a87a Mon Sep 17 00:00:00 2001 From: Christine Simpson <48525133+cms21@users.noreply.github.com> Date: Tue, 16 Jul 2024 12:56:27 -0500 Subject: [PATCH 02/20] Only query cuda devices if available_accelerators is set (#3531) --- .../high_throughput/process_worker_pool.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 2d1e2dc720..5c766123d7 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -732,17 +732,18 @@ def worker( os.sched_setaffinity(0, my_cores) # type: ignore[attr-defined, unused-ignore] logger.info("Set worker CPU affinity to {}".format(my_cores)) - # If CUDA devices, find total number of devices to allow for MPS - # See: https://developer.nvidia.com/system-management-interface - nvidia_smi_cmd = "nvidia-smi -L > /dev/null && nvidia-smi -L | wc -l" - nvidia_smi_ret = subprocess.run(nvidia_smi_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if nvidia_smi_ret.returncode == 0: - num_cuda_devices = int(nvidia_smi_ret.stdout.split()[0]) - else: - num_cuda_devices = None - # If desired, pin to accelerator if accelerator is not None: + + # If CUDA devices, find total number of devices to allow for MPS + # See: https://developer.nvidia.com/system-management-interface + nvidia_smi_cmd = "nvidia-smi -L > /dev/null && nvidia-smi -L | wc -l" + nvidia_smi_ret = subprocess.run(nvidia_smi_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if nvidia_smi_ret.returncode 
== 0: + num_cuda_devices = int(nvidia_smi_ret.stdout.split()[0]) + else: + num_cuda_devices = None + try: if num_cuda_devices is not None: procs_per_cuda_device = pool_size // num_cuda_devices From 2c19a8fca72681298a3ac71fefb7c325d873f883 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 18 Jul 2024 09:49:42 +0200 Subject: [PATCH 03/20] Bring status() next to poll_facade() which is the only user (#3530) This should not change any behaviour --- parsl/executors/status_handling.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 4d29439670..7956992f2e 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -113,20 +113,6 @@ def outstanding(self) -> int: raise NotImplementedError("Classes inheriting from BlockProviderExecutor must implement " "outstanding()") - def status(self) -> Dict[str, JobStatus]: - """Return the status of all jobs/blocks currently known to this executor. - - :return: a dictionary mapping block ids (in string) to job status - """ - if self._provider: - block_ids, job_ids = self._get_block_and_job_ids() - status = self._make_status_dict(block_ids, self._provider.status(job_ids)) - else: - status = {} - status.update(self._simulated_status) - - return status - def set_bad_state_and_fail_all(self, exception: Exception): """Allows external error handlers to mark this executor as irrecoverably bad and cause all tasks submitted to it now and in the future to fail. The executor is responsible @@ -276,6 +262,20 @@ def poll_facade(self) -> None: if delta_status: self.send_monitoring_info(delta_status) + def status(self) -> Dict[str, JobStatus]: + """Return the status of all jobs/blocks currently known to this executor. + + :return: a dictionary mapping block ids (in string) to job status + """ + if self._provider: + block_ids, job_ids = self._get_block_and_job_ids() + status = self._make_status_dict(block_ids, self._provider.status(job_ids)) + else: + status = {} + status.update(self._simulated_status) + + return status + @property def status_facade(self) -> Dict[str, JobStatus]: """Return the status of all jobs/blocks of the executor of this poller. From 13ae8e502371daf82c9cf4054da1360fe0e5c546 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 18 Jul 2024 10:18:27 +0200 Subject: [PATCH 04/20] Gather four block/job status structures together (#3528) This is part of work to make it easier to understand the four structures and how they relate to each other. This should not change any behaviour. 
--- parsl/executors/status_handling.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 7956992f2e..e22c199521 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -59,20 +59,28 @@ def __init__(self, *, else: self.block_error_handler = block_error_handler - # errors can happen during the submit call to the provider; this is used - # to keep track of such errors so that they can be handled in one place - # together with errors reported by status() - self._simulated_status: Dict[str, JobStatus] = {} self._executor_bad_state = threading.Event() self._executor_exception: Optional[Exception] = None self._block_id_counter = AtomicIDCounter() self._tasks = {} # type: Dict[object, Future] + + self._last_poll_time = 0.0 + + # these four structures track, in loosely coordinated fashion, the + # existence of blocks and jobs and how to map between their + # identifiers. self.blocks_to_job_id = {} # type: Dict[str, str] self.job_ids_to_block = {} # type: Dict[str, str] - self._last_poll_time = 0.0 + # errors can happen during the submit call to the provider; this is used + # to keep track of such errors so that they can be handled in one place + # together with errors reported by status() + self._simulated_status: Dict[str, JobStatus] = {} + + # this stores an approximation (sometimes delayed) of the latest status + # of pending, active and recently terminated blocks self._status = {} # type: Dict[str, JobStatus] def _make_status_dict(self, block_ids: List[str], status_list: List[JobStatus]) -> Dict[str, JobStatus]: From 2b1594c7ec80ce609708cd4ea4c9f7f157be53b3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 19 Jul 2024 20:37:45 +0200 Subject: [PATCH 05/20] Mark scale_out method as internal to BlockProviderExecutor (#3529) See PEP-8 https://peps.python.org/pep-0008/#descriptive-naming-styles This should not change any behaviour. --- parsl/executors/status_handling.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index e22c199521..f6d92e2af7 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -174,7 +174,7 @@ def _filter_scale_in_ids(self, to_kill, killed): # Filters first iterable by bool values in second return list(compress(to_kill, killed)) - def scale_out(self, blocks: int = 1) -> List[str]: + def _scale_out(self, blocks: int = 1) -> List[str]: """Scales out the number of blocks by "blocks" """ if not self.provider: @@ -312,7 +312,7 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ return block_ids def scale_out_facade(self, n: int) -> List[str]: - block_ids = self.scale_out(n) + block_ids = self._scale_out(n) if block_ids is not None: new_status = {} for block_id in block_ids: From 9798260c06da16f7d1a75dc2859c513d13992dc3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 Jul 2024 19:28:44 +0200 Subject: [PATCH 06/20] Move FluxExecutor ZMQ into thread and explicitly clean it up (#3517) Prior to this PR, there were frequent hangs in CI at cleanup of the ZMQ objects used by the FluxExecutor. See issue #3484 for some more information. 
This PR attempts to remove some dangerous behaviour there: i) creation of ZMQ context and socket is moved into the thread which makes use of them - before this PR, the socket was created on the main thread and passed into the submission thread which uses it. This removes some thread safety issues where a socket cannot be safely moved between threads. ii) ZMQ context and socket are more explicitly closed (using with-blocks) rather than leaving that to the garbage collector. In the hung tests, the ZMQ context was being garbage collected in the main thread, which is documented as being unsafe when sockets are open belonging to another thread (the submission thread) On my laptop I could see a hang around 50% of test runs before this PR. After this PR, I have run about 100 iterations of the flux tests without seeing any hangs. --- parsl/executors/flux/executor.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/parsl/executors/flux/executor.py b/parsl/executors/flux/executor.py index c4926abb68..f1b981f7e0 100644 --- a/parsl/executors/flux/executor.py +++ b/parsl/executors/flux/executor.py @@ -200,7 +200,6 @@ def __init__( raise EnvironmentError("Cannot find Flux installation in PATH") self.flux_path = os.path.abspath(flux_path) self._task_id_counter = itertools.count() - self._socket = zmq.Context().socket(zmq.REP) # Assumes a launch command cannot be None or empty self.launch_cmd = launch_cmd or self.DEFAULT_LAUNCH_CMD self._submission_queue: queue.Queue = queue.Queue() @@ -213,7 +212,6 @@ def __init__( args=( self._submission_queue, self._stop_event, - self._socket, self.working_dir, self.flux_executor_kwargs, self.provider, @@ -306,11 +304,13 @@ def _submit_wrapper( If an exception is thrown, error out all submitted tasks. """ - try: - _submit_flux_jobs(submission_queue, stop_event, *args, **kwargs) - except Exception as exc: - _error_out_jobs(submission_queue, stop_event, exc) - raise + with zmq.Context() as ctx: + with ctx.socket(zmq.REP) as socket: + try: + _submit_flux_jobs(submission_queue, stop_event, socket, *args, **kwargs) + except Exception as exc: + _error_out_jobs(submission_queue, stop_event, exc) + raise def _error_out_jobs( From 449d25e3a10cb31784b454e42edfb8f8d175310f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 Jul 2024 20:12:28 +0200 Subject: [PATCH 07/20] Remove unused dfk.memo_lookup_table attributed. (#3536) This attribute is initialised sometimes, but not always, and is never read from. There's a similarly named attribute in Memoizer, so I think this was mistakenly introduced in commit 307b419dbcc847aeaf021f04c45b5149aa81d190. 
--- parsl/dataflow/dflow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 3ecabd11fe..ebb4d2a31c 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1460,8 +1460,6 @@ def load_checkpoints(self, checkpointDirs: Optional[Sequence[str]]) -> Dict[str, Returns: - dict containing, hashed -> future mappings """ - self.memo_lookup_table = None - if checkpointDirs: return self._load_checkpoints(checkpointDirs) else: From b225c715f1bb48a4e714d2987bb528e30d017103 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 Jul 2024 21:00:01 +0200 Subject: [PATCH 08/20] Remove unused None message codepath from htex queue management thread (#3523) This code path looks like it was originally intended to cause the thread to exit, but is never used - this PR removes the entire if statement and re-indents so that the else case is the only code path that happens now. --- parsl/executors/high_throughput/executor.py | 91 ++++++++++----------- 1 file changed, 42 insertions(+), 49 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index ee6cb5a105..69183364f7 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -456,8 +456,6 @@ def _result_queue_worker(self): "task_id" : "exception" : serialized exception object, on failure } - - The `None` message is a die request. """ logger.debug("Result queue worker starting") @@ -475,58 +473,53 @@ def _result_queue_worker(self): else: - if msgs is None: - logger.debug("Got None, exiting") - return + for serialized_msg in msgs: + try: + msg = pickle.loads(serialized_msg) + except pickle.UnpicklingError: + raise BadMessage("Message received could not be unpickled") - else: - for serialized_msg in msgs: + if msg['type'] == 'heartbeat': + continue + elif msg['type'] == 'result': try: - msg = pickle.loads(serialized_msg) - except pickle.UnpicklingError: - raise BadMessage("Message received could not be unpickled") + tid = msg['task_id'] + except Exception: + raise BadMessage("Message received does not contain 'task_id' field") + + if tid == -1 and 'exception' in msg: + logger.warning("Executor shutting down due to exception from interchange") + exception = deserialize(msg['exception']) + self.set_bad_state_and_fail_all(exception) + break + + task_fut = self.tasks.pop(tid) + + if 'result' in msg: + result = deserialize(msg['result']) + task_fut.set_result(result) - if msg['type'] == 'heartbeat': - continue - elif msg['type'] == 'result': + elif 'exception' in msg: try: - tid = msg['task_id'] - except Exception: - raise BadMessage("Message received does not contain 'task_id' field") - - if tid == -1 and 'exception' in msg: - logger.warning("Executor shutting down due to exception from interchange") - exception = deserialize(msg['exception']) - self.set_bad_state_and_fail_all(exception) - break - - task_fut = self.tasks.pop(tid) - - if 'result' in msg: - result = deserialize(msg['result']) - task_fut.set_result(result) - - elif 'exception' in msg: - try: - s = deserialize(msg['exception']) - # s should be a RemoteExceptionWrapper... so we can reraise it - if isinstance(s, RemoteExceptionWrapper): - try: - s.reraise() - except Exception as e: - task_fut.set_exception(e) - elif isinstance(s, Exception): - task_fut.set_exception(s) - else: - raise ValueError("Unknown exception-like type received: {}".format(type(s))) - except Exception as e: - # TODO could be a proper wrapped exception? 
- task_fut.set_exception( - DeserializationError("Received exception, but handling also threw an exception: {}".format(e))) - else: - raise BadMessage("Message received is neither result or exception") + s = deserialize(msg['exception']) + # s should be a RemoteExceptionWrapper... so we can reraise it + if isinstance(s, RemoteExceptionWrapper): + try: + s.reraise() + except Exception as e: + task_fut.set_exception(e) + elif isinstance(s, Exception): + task_fut.set_exception(s) + else: + raise ValueError("Unknown exception-like type received: {}".format(type(s))) + except Exception as e: + # TODO could be a proper wrapped exception? + task_fut.set_exception( + DeserializationError("Received exception, but handling also threw an exception: {}".format(e))) else: - raise BadMessage("Message received with unknown type {}".format(msg['type'])) + raise BadMessage("Message received is neither result or exception") + else: + raise BadMessage("Message received with unknown type {}".format(msg['type'])) logger.info("Result queue worker finished") From 74fe660db8b285e462a86a94ae31be7c0b4e504c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 24 Jul 2024 09:40:40 +0200 Subject: [PATCH 09/20] Remove explicit pytest flux start, because FluxExecutor does the real flux start (#3511) Prior to this PR, flux pytests were run inside a flux start command; but inside that, FluxExecutor does its owns flux start - see around line 177 in parsl/parsl/executors/flux/executor.py This second, inner flux is what is used to execute tasks as if it was a batch allocation on a cluster. So the outer pytest flux is only used to run the coordinating test workflow and launch that inner flux, as if it were on a submitting/login node. This is unnecessary. --- .github/workflows/parsl+flux.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/parsl+flux.yaml b/.github/workflows/parsl+flux.yaml index e733f14199..8b8c43d8b2 100644 --- a/.github/workflows/parsl+flux.yaml +++ b/.github/workflows/parsl+flux.yaml @@ -31,12 +31,12 @@ jobs: run: | pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/local_threads.py --random-order --durations 10 - - name: Start Flux and Test Parsl with Flux + - name: Test Parsl with Flux run: | - flux start pytest parsl/tests/test_flux.py --config local --random-order + pytest parsl/tests/test_flux.py --config local --random-order - name: Test Parsl with Flux Config run: | - flux start pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/flux_local.py --random-order --durations 10 + pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/flux_local.py --random-order --durations 10 From 03ce73c2ee58145e86e0685d089786e37c198a4d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 24 Jul 2024 10:35:28 +0200 Subject: [PATCH 10/20] Free up the *Radio namespace for future config structures (#3520) Ongoing monitoring radio work (see PR #3315) introduces per-radio configuration classes using *Radio names. This PR frees up the *Radio namespace for that use, by renaming non-user-exposed internal classes out of the way. 
--- parsl/executors/base.py | 8 ++++---- parsl/monitoring/monitoring.py | 4 ++-- parsl/monitoring/radios.py | 14 +++++++------- parsl/monitoring/remote.py | 24 ++++++++++++------------ 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/parsl/executors/base.py b/parsl/executors/base.py index b00aa55680..941f392e9f 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -5,7 +5,7 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadio +from parsl.monitoring.radios import MonitoringRadioSender class ParslExecutor(metaclass=ABCMeta): @@ -52,7 +52,7 @@ def __init__( *, hub_address: Optional[str] = None, hub_zmq_port: Optional[int] = None, - monitoring_radio: Optional[MonitoringRadio] = None, + monitoring_radio: Optional[MonitoringRadioSender] = None, run_dir: str = ".", run_id: Optional[str] = None, ): @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None: self._hub_zmq_port = value @property - def monitoring_radio(self) -> Optional[MonitoringRadio]: + def monitoring_radio(self) -> Optional[MonitoringRadioSender]: """Local radio for sending monitoring messages """ return self._monitoring_radio @monitoring_radio.setter - def monitoring_radio(self, value: Optional[MonitoringRadio]) -> None: + def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None: self._monitoring_radio = value diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 8e4770a32a..14b0506b17 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -13,7 +13,7 @@ from parsl.log_utils import set_file_logger from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MultiprocessingQueueRadio +from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import AddressedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue @@ -187,7 +187,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") - self.radio = MultiprocessingQueueRadio(self.block_msgs) + self.radio = MultiprocessingQueueRadioSender(self.block_msgs) try: comm_q_result = comm_q.get(block=True, timeout=120) diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 070869bdba..6c77fd37b1 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -15,14 +15,14 @@ logger = logging.getLogger(__name__) -class MonitoringRadio(metaclass=ABCMeta): +class MonitoringRadioSender(metaclass=ABCMeta): @abstractmethod def send(self, message: object) -> None: pass -class FilesystemRadio(MonitoringRadio): - """A MonitoringRadio that sends messages over a shared filesystem. +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. The messsage directory structure is based on maildir, https://en.wikipedia.org/wiki/Maildir @@ -36,7 +36,7 @@ class FilesystemRadio(MonitoringRadio): This avoids a race condition of reading partially written messages. This radio is likely to give higher shared filesystem load compared to - the UDPRadio, but should be much more reliable. + the UDP radio, but should be much more reliable. 
""" def __init__(self, *, monitoring_url: str, source_id: int, timeout: int = 10, run_dir: str): @@ -66,7 +66,7 @@ def send(self, message: object) -> None: os.rename(tmp_filename, new_filename) -class HTEXRadio(MonitoringRadio): +class HTEXRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): """ @@ -120,7 +120,7 @@ def send(self, message: object) -> None: return -class UDPRadio(MonitoringRadio): +class UDPRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): """ @@ -174,7 +174,7 @@ def send(self, message: object) -> None: return -class MultiprocessingQueueRadio(MonitoringRadio): +class MultiprocessingQueueRadioSender(MonitoringRadioSender): """A monitoring radio which connects over a multiprocessing Queue. This radio is intended to be used on the submit side, where components in the submit process, or processes launched by multiprocessing, will have diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 98168aa858..055a013627 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -8,10 +8,10 @@ from parsl.monitoring.message_type import MessageType from parsl.monitoring.radios import ( - FilesystemRadio, - HTEXRadio, - MonitoringRadio, - UDPRadio, + FilesystemRadioSender, + HTEXRadioSender, + MonitoringRadioSender, + UDPRadioSender, ) from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs @@ -100,17 +100,17 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: return (wrapped, args, new_kwargs) -def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadio: - radio: MonitoringRadio +def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender: + radio: MonitoringRadioSender if radio_mode == "udp": - radio = UDPRadio(monitoring_hub_url, - source_id=task_id) + radio = UDPRadioSender(monitoring_hub_url, + source_id=task_id) elif radio_mode == "htex": - radio = HTEXRadio(monitoring_hub_url, - source_id=task_id) + radio = HTEXRadioSender(monitoring_hub_url, + source_id=task_id) elif radio_mode == "filesystem": - radio = FilesystemRadio(monitoring_url=monitoring_hub_url, - source_id=task_id, run_dir=run_dir) + radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url, + source_id=task_id, run_dir=run_dir) else: raise RuntimeError(f"Unknown radio mode: {radio_mode}") return radio From 16305d13209374dea1056cb74d38fc689464e1cd Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 24 Jul 2024 11:40:25 +0200 Subject: [PATCH 11/20] Update checkpoint docs to follow #1945 and #2667 (#3537) --- docs/userguide/checkpoints.rst | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/userguide/checkpoints.rst b/docs/userguide/checkpoints.rst index dbcfcfc760..0f71b019ff 100644 --- a/docs/userguide/checkpoints.rst +++ b/docs/userguide/checkpoints.rst @@ -49,15 +49,17 @@ during development. Using app caching will ensure that only modified apps are re App equivalence ^^^^^^^^^^^^^^^ -Parsl determines app equivalence by storing the hash -of the app function. Thus, any changes to the app code (e.g., -its signature, its body, or even the docstring within the body) -will invalidate cached values. +Parsl determines app equivalence using the name of the app function: +if two apps have the same name, then they are equivalent under this +relation. 
-However, Parsl does not traverse the call graph of the app function, -so changes inside functions called by an app will not invalidate +Changes inside the app, or by functions called by an app will not invalidate cached values. +There are lots of other ways functions might be compared for equivalence, +and `parsl.dataflow.memoization.id_for_memo` provides a hook to plug in +alternate application-specific implementations. + Invocation equivalence ^^^^^^^^^^^^^^^^^^^^^^ From f6d288936ce5152c4db0d6ca25b9113758a32702 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 24 Jul 2024 13:35:59 +0200 Subject: [PATCH 12/20] Update block monitoring log message (#3527) This monitoring message is not coming from the job status poller - this moved in PR #3349. This monitoring message is not being sent to the hub, but rather to the monitoring router on the far end of the monitoring radio. Debug messages should be formatted with deferred logger formatting. --- parsl/executors/status_handling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index f6d92e2af7..652ba09a1c 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -237,7 +237,7 @@ def send_monitoring_info(self, status: Dict) -> None: # Send monitoring info for HTEX when monitoring enabled if self.monitoring_radio: msg = self.create_monitoring_info(status) - logger.debug("Sending message {} to hub from job status poller".format(msg)) + logger.debug("Sending block monitoring message: %r", msg) self.monitoring_radio.send((MessageType.BLOCK_INFO, msg)) def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]: From a2af30ce57b7a840d565da4414a9dbcf91018b1f Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Wed, 24 Jul 2024 10:50:29 -0500 Subject: [PATCH 13/20] Adding warning about provider options in MPI context (#3516) Warns users about per-task and per-node options to the provider conflicting with MPIExecutor --- docs/userguide/mpi_apps.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/userguide/mpi_apps.rst b/docs/userguide/mpi_apps.rst index a40c03e004..82123123b6 100644 --- a/docs/userguide/mpi_apps.rst +++ b/docs/userguide/mpi_apps.rst @@ -60,6 +60,13 @@ An example for ALCF's Polaris supercomputer that will run 3 MPI tasks of 2 nodes ) +.. warning:: + Please note that ``Provider`` options that specify per-task or per-node resources, for example, + ``SlurmProvider(cores_per_node=N, ...)`` should not be used with :class:`~parsl.executors.high_throughput.MPIExecutor`. + Parsl primarily uses a pilot job model and assumptions from that context do not translate to the MPI context. For + more info refer to : + `github issue #3006 `_ + Writing an MPI App ------------------ From ec8dd620cae9bf01bb3492cd139945bca9fcf7e0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 25 Jul 2024 00:11:00 +0200 Subject: [PATCH 14/20] Refactor naive scale in behaviour for Work Queue and Task Vine (#3526) The intended behaviour of this scale in code, which is only for scaling in all blocks (for example at the end of a workflow) makes sense as a default for all BlockProviderExecutors. This PR makes that refactor. This code is buggy (before and after) - see issue #3471. This PR does not attempt to fix that, but moves code into a better place for bugfixing, and a subsequent PR will fix it. 
--- parsl/executors/status_handling.py | 21 +++++++++++++++++++-- parsl/executors/taskvine/executor.py | 18 ------------------ parsl/executors/workqueue/executor.py | 18 ------------------ 3 files changed, 19 insertions(+), 38 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 652ba09a1c..13ddef1256 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -193,15 +193,32 @@ def _scale_out(self, blocks: int = 1) -> List[str]: self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) return block_ids - @abstractmethod def scale_in(self, blocks: int) -> List[str]: """Scale in method. Cause the executor to reduce the number of blocks by count. + The default implementation will kill blocks without regard to their + status or whether they are executing tasks. Executors with more + nuanced scaling strategies might overload this method to work with + that strategy - see the HighThroughputExecutor for an example of that. + :return: A list of block ids corresponding to the blocks that were removed. """ - pass + # Obtain list of blocks to kill + to_kill = list(self.blocks_to_job_id.keys())[:blocks] + kill_ids = [self.blocks_to_job_id[block] for block in to_kill] + + # Cancel the blocks provisioned + if self.provider: + logger.info(f"Scaling in jobs: {kill_ids}") + r = self.provider.cancel(kill_ids) + job_ids = self._filter_scale_in_ids(kill_ids, r) + block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] + return block_ids_killed + else: + logger.error("No execution provider available to scale in") + return [] def _launch_block(self, block_id: str) -> Any: launch_cmd = self._get_launch_command(block_id) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 6cfedf92bb..bebed1a51b 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -573,24 +573,6 @@ def outstanding(self) -> int: def workers_per_node(self) -> Union[int, float]: return 1 - def scale_in(self, count: int) -> List[str]: - """Scale in method. Cancel a given number of blocks - """ - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:count] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] - - # Cancel the blocks provisioned - if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] - return block_ids_killed - else: - logger.error("No execution provider available to scale") - return [] - def shutdown(self, *args, **kwargs): """Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the TaskVine system submission. diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index e715c23891..a1ad49bca9 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -689,24 +689,6 @@ def outstanding(self) -> int: def workers_per_node(self) -> Union[int, float]: return self.scaling_cores_per_worker - def scale_in(self, count: int) -> List[str]: - """Scale in method. 
- """ - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:count] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] - - # Cancel the blocks provisioned - if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] - return block_ids_killed - else: - logger.error("No execution provider available to scale in") - return [] - def shutdown(self, *args, **kwargs): """Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the Work Queue system submission. From 878889bb8baadc16dccd9589020ee31708bd8db3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 25 Jul 2024 00:48:37 +0200 Subject: [PATCH 15/20] Fix broken markup for hyperlink (#3539) --- docs/userguide/checkpoints.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/userguide/checkpoints.rst b/docs/userguide/checkpoints.rst index 0f71b019ff..8867107b7a 100644 --- a/docs/userguide/checkpoints.rst +++ b/docs/userguide/checkpoints.rst @@ -94,7 +94,7 @@ Attempting to cache apps invoked with other, non-hashable, data types will lead to an exception at invocation. In that case, mechanisms to hash new types can be registered by a program by -implementing the ``parsl.dataflow.memoization.id_for_memo`` function for +implementing the `parsl.dataflow.memoization.id_for_memo` function for the new type. Ignoring arguments From 71d9c711cee30211aaadb1725490d1ff0c7f194a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 25 Jul 2024 08:30:04 +0200 Subject: [PATCH 16/20] Don't copy monitoring address/port parameters into the DFK. (#3522) Prior to this PR, monitoring hub address and ZMQ port were stored as attributes of the DFK. The address also existed as an attribute on dfk.monitoring, and the ZMQ port was returned by dfk.monitoring.start Afte this PR, those values are not added to the DFK, but instead are accessed via dfk.monitoring. These two attributes are now only set on a new executor when monitoring is enabled, rather than always being intialised by the DFK. Default values now come from the executor __init__ method, which is a more usual style in Python for providing default values. 
See PR #3361 This is part of ongoing work to introduce more pluggable monitoring network connectivity - see PR #3315 --- parsl/dataflow/dflow.py | 10 +++------- parsl/monitoring/monitoring.py | 4 ++-- parsl/tests/test_monitoring/test_fuzz_zmq.py | 4 ++-- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index ebb4d2a31c..a62a2261d0 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -113,14 +113,10 @@ def __init__(self, config: Config) -> None: self.monitoring: Optional[MonitoringHub] self.monitoring = config.monitoring - # hub address and port for interchange to connect - self.hub_address = None # type: Optional[str] - self.hub_zmq_port = None # type: Optional[int] if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.hub_address = self.monitoring.hub_address - self.hub_zmq_port = self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None @@ -1181,9 +1177,9 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: for executor in executors: executor.run_id = self.run_id executor.run_dir = self.run_dir - executor.hub_address = self.hub_address - executor.hub_zmq_port = self.hub_zmq_port if self.monitoring: + executor.hub_address = self.monitoring.hub_address + executor.hub_zmq_port = self.monitoring.hub_zmq_port executor.monitoring_radio = self.monitoring.radio if hasattr(executor, 'provider'): if hasattr(executor.provider, 'script_dir'): diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 14b0506b17..f86bf81e87 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -105,7 +105,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> int: + def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -207,7 +207,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat logger.info("Monitoring Hub initialized") - return zmq_port + self.hub_zmq_port = zmq_port # TODO: tighten the Any message format def send(self, mtype: MessageType, message: Any) -> None: diff --git a/parsl/tests/test_monitoring/test_fuzz_zmq.py b/parsl/tests/test_monitoring/test_fuzz_zmq.py index 36f048efb3..3f50385564 100644 --- a/parsl/tests/test_monitoring/test_fuzz_zmq.py +++ b/parsl/tests/test_monitoring/test_fuzz_zmq.py @@ -44,8 +44,8 @@ def test_row_counts(): # the latter is what i'm most suspicious of in my present investigation # dig out the interchange port... 
- hub_address = parsl.dfk().hub_address - hub_zmq_port = parsl.dfk().hub_zmq_port + hub_address = parsl.dfk().monitoring.hub_address + hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port # this will send a string to a new socket connection with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: From 0c24d7b0b84ebeac5d91a216c5cea6d7a86e607c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 26 Jul 2024 08:10:31 +0100 Subject: [PATCH 17/20] Clarify dev instructions in README.rst (#3545) If one tries to follow step 3 after step 2, which I think is something somewhat reasonable to expect, they end up inside directory `parsl/parsl`, where there's no `setup.py` script. Instead, the script is in the top-level directory, so if you already entered `parsl`, you don't need to go into `parsl/parsl`. This adds a comment to clarify this possible point of confusion. --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index fb1070e7d7..72048d39f4 100644 --- a/README.rst +++ b/README.rst @@ -109,7 +109,7 @@ For Developers 3. Install:: - $ cd parsl + $ cd parsl # only if you didn't enter the top-level directory in step 2 above $ python3 setup.py install 4. Use Parsl! From b96a2dd98ffd2abd00ecc5c217b73a8315ea89f6 Mon Sep 17 00:00:00 2001 From: matthewc2003 Date: Fri, 26 Jul 2024 10:54:38 -0700 Subject: [PATCH 18/20] Make htex managers track start_time (#3546) Managers now record their start time and forward this information to the interchange during registration. The ManagerRecord was updated to support this functionality. Adding this will allow for better manager selection by the interchange in the future. --- parsl/executors/high_throughput/interchange.py | 1 + parsl/executors/high_throughput/manager_record.py | 1 + parsl/executors/high_throughput/process_worker_pool.py | 2 ++ 3 files changed, 4 insertions(+) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 819836e95f..18bdc65610 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -410,6 +410,7 @@ def process_task_outgoing_incoming( self._ready_managers[manager_id] = {'last_heartbeat': time.time(), 'idle_since': time.time(), 'block_id': None, + 'start_time': msg['start_time'], 'max_capacity': 0, 'worker_count': 0, 'active': True, diff --git a/parsl/executors/high_throughput/manager_record.py b/parsl/executors/high_throughput/manager_record.py index 7e58b53954..a48c18cbd9 100644 --- a/parsl/executors/high_throughput/manager_record.py +++ b/parsl/executors/high_throughput/manager_record.py @@ -6,6 +6,7 @@ class ManagerRecord(TypedDict, total=False): block_id: Optional[str] + start_time: float tasks: List[Any] worker_count: int max_capacity: int diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 5c766123d7..59efe501f1 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -184,6 +184,7 @@ def __init__(self, *, self.uid = uid self.block_id = block_id + self.start_time = time.time() self.enable_mpi_mode = enable_mpi_mode self.mpi_launcher = mpi_launcher @@ -263,6 +264,7 @@ def create_reg_message(self): 'worker_count': self.worker_count, 'uid': self.uid, 'block_id': self.block_id, + 'start_time': self.start_time, 'prefetch_capacity': self.prefetch_capacity, 'max_capacity': self.worker_count + 
self.prefetch_capacity, 'os': platform.system(), From 1652304959face86933921116ae571d472800b31 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 27 Jul 2024 17:45:11 +0200 Subject: [PATCH 19/20] Move scale_out_facade next to scale_out (#3550) See PR #3530 which does this for status() This is a buildup to some simplification and eventual merge of scale_out and scale_out_facade in upcoming PRs. This PR should not change any behaviour --- parsl/executors/status_handling.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 13ddef1256..90773591b6 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -174,6 +174,16 @@ def _filter_scale_in_ids(self, to_kill, killed): # Filters first iterable by bool values in second return list(compress(to_kill, killed)) + def scale_out_facade(self, n: int) -> List[str]: + block_ids = self._scale_out(n) + if block_ids is not None: + new_status = {} + for block_id in block_ids: + new_status[block_id] = JobStatus(JobState.PENDING) + self.send_monitoring_info(new_status) + self._status.update(new_status) + return block_ids + def _scale_out(self, blocks: int = 1) -> List[str]: """Scales out the number of blocks by "blocks" """ @@ -327,13 +337,3 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ del self._status[block_id] self.send_monitoring_info(new_status) return block_ids - - def scale_out_facade(self, n: int) -> List[str]: - block_ids = self._scale_out(n) - if block_ids is not None: - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) - return block_ids From 64e163ceaf4b43746909f30c9659738f29dd84e1 Mon Sep 17 00:00:00 2001 From: rjmello <30907815+rjmello@users.noreply.github.com> Date: Tue, 30 Jul 2024 11:50:40 -0400 Subject: [PATCH 20/20] Accept multi-token interchange launch commands (#3543) --- parsl/executors/high_throughput/executor.py | 12 ++++---- parsl/tests/test_htex/test_htex.py | 31 +++++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 69183364f7..7c7dea82ac 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -56,7 +56,7 @@ "--mpi-launcher={mpi_launcher} " "--available-accelerators {accelerators}") -DEFAULT_INTERCHANGE_LAUNCH_CMD = "interchange.py" +DEFAULT_INTERCHANGE_LAUNCH_CMD = ["interchange.py"] GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider` Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`, @@ -78,9 +78,9 @@ cores_per_worker, nodes_per_block, heartbeat_period ,heartbeat_threshold, logdir). For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}" - interchange_launch_cmd : str - Custom command line string to launch the interchange process from the executor. If undefined, - the executor will use the default "interchange.py" command. + interchange_launch_cmd : Sequence[str] + Custom sequence of command line tokens to launch the interchange process from the executor. If + undefined, the executor will use the default "interchange.py" command. 
address : string An address to connect to the main Parsl process which is reachable from the network in which @@ -238,7 +238,7 @@ def __init__(self, label: str = 'HighThroughputExecutor', provider: ExecutionProvider = LocalProvider(), launch_cmd: Optional[str] = None, - interchange_launch_cmd: Optional[str] = None, + interchange_launch_cmd: Optional[Sequence[str]] = None, address: Optional[str] = None, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), @@ -548,7 +548,7 @@ def _start_local_interchange_process(self) -> None: config_pickle = pickle.dumps(interchange_config) - self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd.encode("utf-8"), stdin=subprocess.PIPE) + self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd, stdin=subprocess.PIPE) stdin = self.interchange_proc.stdin assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode" diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 2d1aafda85..fca68c3c2f 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,6 +1,6 @@ import pathlib -import warnings from subprocess import Popen, TimeoutExpired +from typing import Optional, Sequence from unittest import mock import pytest @@ -139,13 +139,22 @@ def test_max_workers_per_node(): @pytest.mark.local -def test_htex_launch_cmd(): - htex = HighThroughputExecutor() - assert htex.launch_cmd.startswith("process_worker_pool.py") - assert htex.interchange_launch_cmd == "interchange.py" - - launch_cmd = "custom-launch-cmd" - ix_launch_cmd = "custom-ix-launch-cmd" - htex = HighThroughputExecutor(launch_cmd=launch_cmd, interchange_launch_cmd=ix_launch_cmd) - assert htex.launch_cmd == launch_cmd - assert htex.interchange_launch_cmd == ix_launch_cmd +@pytest.mark.parametrize("cmd", (None, "custom-launch-cmd")) +def test_htex_worker_pool_launch_cmd(cmd: Optional[str]): + if cmd: + htex = HighThroughputExecutor(launch_cmd=cmd) + assert htex.launch_cmd == cmd + else: + htex = HighThroughputExecutor() + assert htex.launch_cmd.startswith("process_worker_pool.py") + + +@pytest.mark.local +@pytest.mark.parametrize("cmd", (None, ["custom", "launch", "cmd"])) +def test_htex_interchange_launch_cmd(cmd: Optional[Sequence[str]]): + if cmd: + htex = HighThroughputExecutor(interchange_launch_cmd=cmd) + assert htex.interchange_launch_cmd == cmd + else: + htex = HighThroughputExecutor() + assert htex.interchange_launch_cmd == ["interchange.py"]
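A short usage sketch of the multi-token form added in this patch; the two-token command below simply runs the default interchange through /usr/bin/env and is only meant to illustrate passing a list of tokens:

    from parsl.executors import HighThroughputExecutor

    # The sequence of tokens is passed to subprocess.Popen unchanged; here the
    # interchange is launched via env as a minimal two-token example.
    htex = HighThroughputExecutor(
        label="htex",
        interchange_launch_cmd=["/usr/bin/env", "interchange.py"],
    )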