From 13ae8e502371daf82c9cf4054da1360fe0e5c546 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 18 Jul 2024 10:18:27 +0200 Subject: [PATCH 01/38] Gather four block/job status structures together (#3528) This is part of work to make it easier to understand the four structures and how they relate to each other. This should not change any behaviour. --- parsl/executors/status_handling.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 7956992f2e..e22c199521 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -59,20 +59,28 @@ def __init__(self, *, else: self.block_error_handler = block_error_handler - # errors can happen during the submit call to the provider; this is used - # to keep track of such errors so that they can be handled in one place - # together with errors reported by status() - self._simulated_status: Dict[str, JobStatus] = {} self._executor_bad_state = threading.Event() self._executor_exception: Optional[Exception] = None self._block_id_counter = AtomicIDCounter() self._tasks = {} # type: Dict[object, Future] + + self._last_poll_time = 0.0 + + # these four structures track, in loosely coordinated fashion, the + # existence of blocks and jobs and how to map between their + # identifiers. self.blocks_to_job_id = {} # type: Dict[str, str] self.job_ids_to_block = {} # type: Dict[str, str] - self._last_poll_time = 0.0 + # errors can happen during the submit call to the provider; this is used + # to keep track of such errors so that they can be handled in one place + # together with errors reported by status() + self._simulated_status: Dict[str, JobStatus] = {} + + # this stores an approximation (sometimes delayed) of the latest status + # of pending, active and recently terminated blocks self._status = {} # type: Dict[str, JobStatus] def _make_status_dict(self, block_ids: List[str], status_list: List[JobStatus]) -> Dict[str, JobStatus]: From 2b1594c7ec80ce609708cd4ea4c9f7f157be53b3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 19 Jul 2024 20:37:45 +0200 Subject: [PATCH 02/38] Mark scale_out method as internal to BlockProviderExecutor (#3529) See PEP-8 https://peps.python.org/pep-0008/#descriptive-naming-styles This should not change any behaviour. 
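A hypothetical usage sketch (not code changed by this patch), where `htex` is assumed to be an already-started BlockProviderExecutor subclass such as the HighThroughputExecutor:

    new_block_ids = htex.scale_out_facade(2)   # public entry point: request two more blocks
    # the renamed _scale_out() is now reached only through this facade; per PEP-8,
    # the leading underscore marks it as internal to BlockProviderExecutor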
--- parsl/executors/status_handling.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index e22c199521..f6d92e2af7 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -174,7 +174,7 @@ def _filter_scale_in_ids(self, to_kill, killed): # Filters first iterable by bool values in second return list(compress(to_kill, killed)) - def scale_out(self, blocks: int = 1) -> List[str]: + def _scale_out(self, blocks: int = 1) -> List[str]: """Scales out the number of blocks by "blocks" """ if not self.provider: @@ -312,7 +312,7 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ return block_ids def scale_out_facade(self, n: int) -> List[str]: - block_ids = self.scale_out(n) + block_ids = self._scale_out(n) if block_ids is not None: new_status = {} for block_id in block_ids: From 9798260c06da16f7d1a75dc2859c513d13992dc3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 Jul 2024 19:28:44 +0200 Subject: [PATCH 03/38] Move FluxExecutor ZMQ into thread and explicitly clean it up (#3517) Prior to this PR, there were frequent hangs in CI at cleanup of the ZMQ objects used by the FluxExecutor. See issue #3484 for some more information. This PR attempts to remove some dangerous behaviour there: i) creation of ZMQ context and socket is moved into the thread which makes use of them - before this PR, the socket was created on the main thread and passed into the submission thread which uses it. This removes some thread safety issues where a socket cannot be safely moved between threads. ii) ZMQ context and socket are more explicitly closed (using with-blocks) rather than leaving that to the garbage collector. In the hung tests, the ZMQ context was being garbage collected in the main thread, which is documented as being unsafe when sockets are open belonging to another thread (the submission thread) On my laptop I could see a hang around 50% of test runs before this PR. After this PR, I have run about 100 iterations of the flux tests without seeing any hangs. --- parsl/executors/flux/executor.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/parsl/executors/flux/executor.py b/parsl/executors/flux/executor.py index c4926abb68..f1b981f7e0 100644 --- a/parsl/executors/flux/executor.py +++ b/parsl/executors/flux/executor.py @@ -200,7 +200,6 @@ def __init__( raise EnvironmentError("Cannot find Flux installation in PATH") self.flux_path = os.path.abspath(flux_path) self._task_id_counter = itertools.count() - self._socket = zmq.Context().socket(zmq.REP) # Assumes a launch command cannot be None or empty self.launch_cmd = launch_cmd or self.DEFAULT_LAUNCH_CMD self._submission_queue: queue.Queue = queue.Queue() @@ -213,7 +212,6 @@ def __init__( args=( self._submission_queue, self._stop_event, - self._socket, self.working_dir, self.flux_executor_kwargs, self.provider, @@ -306,11 +304,13 @@ def _submit_wrapper( If an exception is thrown, error out all submitted tasks. 
""" - try: - _submit_flux_jobs(submission_queue, stop_event, *args, **kwargs) - except Exception as exc: - _error_out_jobs(submission_queue, stop_event, exc) - raise + with zmq.Context() as ctx: + with ctx.socket(zmq.REP) as socket: + try: + _submit_flux_jobs(submission_queue, stop_event, socket, *args, **kwargs) + except Exception as exc: + _error_out_jobs(submission_queue, stop_event, exc) + raise def _error_out_jobs( From 449d25e3a10cb31784b454e42edfb8f8d175310f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 Jul 2024 20:12:28 +0200 Subject: [PATCH 04/38] Remove unused dfk.memo_lookup_table attributed. (#3536) This attribute is initialised sometimes, but not always, and is never read from. There's a similarly named attribute in Memoizer, so I think this was mistakenly introduced in commit 307b419dbcc847aeaf021f04c45b5149aa81d190. --- parsl/dataflow/dflow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 3ecabd11fe..ebb4d2a31c 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1460,8 +1460,6 @@ def load_checkpoints(self, checkpointDirs: Optional[Sequence[str]]) -> Dict[str, Returns: - dict containing, hashed -> future mappings """ - self.memo_lookup_table = None - if checkpointDirs: return self._load_checkpoints(checkpointDirs) else: From b225c715f1bb48a4e714d2987bb528e30d017103 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 Jul 2024 21:00:01 +0200 Subject: [PATCH 05/38] Remove unused None message codepath from htex queue management thread (#3523) This code path looks like it was originally intended to cause the thread to exit, but is never used - this PR removes the entire if statement and re-indents so that the else case is the only code path that happens now. --- parsl/executors/high_throughput/executor.py | 91 ++++++++++----------- 1 file changed, 42 insertions(+), 49 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index ee6cb5a105..69183364f7 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -456,8 +456,6 @@ def _result_queue_worker(self): "task_id" : "exception" : serialized exception object, on failure } - - The `None` message is a die request. 
""" logger.debug("Result queue worker starting") @@ -475,58 +473,53 @@ def _result_queue_worker(self): else: - if msgs is None: - logger.debug("Got None, exiting") - return + for serialized_msg in msgs: + try: + msg = pickle.loads(serialized_msg) + except pickle.UnpicklingError: + raise BadMessage("Message received could not be unpickled") - else: - for serialized_msg in msgs: + if msg['type'] == 'heartbeat': + continue + elif msg['type'] == 'result': try: - msg = pickle.loads(serialized_msg) - except pickle.UnpicklingError: - raise BadMessage("Message received could not be unpickled") + tid = msg['task_id'] + except Exception: + raise BadMessage("Message received does not contain 'task_id' field") + + if tid == -1 and 'exception' in msg: + logger.warning("Executor shutting down due to exception from interchange") + exception = deserialize(msg['exception']) + self.set_bad_state_and_fail_all(exception) + break + + task_fut = self.tasks.pop(tid) + + if 'result' in msg: + result = deserialize(msg['result']) + task_fut.set_result(result) - if msg['type'] == 'heartbeat': - continue - elif msg['type'] == 'result': + elif 'exception' in msg: try: - tid = msg['task_id'] - except Exception: - raise BadMessage("Message received does not contain 'task_id' field") - - if tid == -1 and 'exception' in msg: - logger.warning("Executor shutting down due to exception from interchange") - exception = deserialize(msg['exception']) - self.set_bad_state_and_fail_all(exception) - break - - task_fut = self.tasks.pop(tid) - - if 'result' in msg: - result = deserialize(msg['result']) - task_fut.set_result(result) - - elif 'exception' in msg: - try: - s = deserialize(msg['exception']) - # s should be a RemoteExceptionWrapper... so we can reraise it - if isinstance(s, RemoteExceptionWrapper): - try: - s.reraise() - except Exception as e: - task_fut.set_exception(e) - elif isinstance(s, Exception): - task_fut.set_exception(s) - else: - raise ValueError("Unknown exception-like type received: {}".format(type(s))) - except Exception as e: - # TODO could be a proper wrapped exception? - task_fut.set_exception( - DeserializationError("Received exception, but handling also threw an exception: {}".format(e))) - else: - raise BadMessage("Message received is neither result or exception") + s = deserialize(msg['exception']) + # s should be a RemoteExceptionWrapper... so we can reraise it + if isinstance(s, RemoteExceptionWrapper): + try: + s.reraise() + except Exception as e: + task_fut.set_exception(e) + elif isinstance(s, Exception): + task_fut.set_exception(s) + else: + raise ValueError("Unknown exception-like type received: {}".format(type(s))) + except Exception as e: + # TODO could be a proper wrapped exception? 
+ task_fut.set_exception( + DeserializationError("Received exception, but handling also threw an exception: {}".format(e))) else: - raise BadMessage("Message received with unknown type {}".format(msg['type'])) + raise BadMessage("Message received is neither result or exception") + else: + raise BadMessage("Message received with unknown type {}".format(msg['type'])) logger.info("Result queue worker finished") From 74fe660db8b285e462a86a94ae31be7c0b4e504c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 24 Jul 2024 09:40:40 +0200 Subject: [PATCH 06/38] Remove explicit pytest flux start, because FluxExecutor does the real flux start (#3511) Prior to this PR, flux pytests were run inside a flux start command; but inside that, FluxExecutor does its owns flux start - see around line 177 in parsl/parsl/executors/flux/executor.py This second, inner flux is what is used to execute tasks as if it was a batch allocation on a cluster. So the outer pytest flux is only used to run the coordinating test workflow and launch that inner flux, as if it were on a submitting/login node. This is unnecessary. --- .github/workflows/parsl+flux.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/parsl+flux.yaml b/.github/workflows/parsl+flux.yaml index e733f14199..8b8c43d8b2 100644 --- a/.github/workflows/parsl+flux.yaml +++ b/.github/workflows/parsl+flux.yaml @@ -31,12 +31,12 @@ jobs: run: | pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/local_threads.py --random-order --durations 10 - - name: Start Flux and Test Parsl with Flux + - name: Test Parsl with Flux run: | - flux start pytest parsl/tests/test_flux.py --config local --random-order + pytest parsl/tests/test_flux.py --config local --random-order - name: Test Parsl with Flux Config run: | - flux start pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/flux_local.py --random-order --durations 10 + pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/flux_local.py --random-order --durations 10 From 03ce73c2ee58145e86e0685d089786e37c198a4d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 24 Jul 2024 10:35:28 +0200 Subject: [PATCH 07/38] Free up the *Radio namespace for future config structures (#3520) Ongoing monitoring radio work (see PR #3315) introduces per-radio configuration classes using *Radio names. This PR frees up the *Radio namespace for that use, by renaming non-user-exposed internal classes out of the way. 
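An illustrative sketch of the renamed interface (the subclass below is hypothetical; MonitoringRadioSender and its send() signature are the names this patch introduces):

    from parsl.monitoring.radios import MonitoringRadioSender

    class PrintingRadioSender(MonitoringRadioSender):
        """Hypothetical sender that just prints each monitoring message."""

        def send(self, message: object) -> None:
            print(f"monitoring message: {message!r}")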
--- parsl/executors/base.py | 8 ++++---- parsl/monitoring/monitoring.py | 4 ++-- parsl/monitoring/radios.py | 14 +++++++------- parsl/monitoring/remote.py | 24 ++++++++++++------------ 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/parsl/executors/base.py b/parsl/executors/base.py index b00aa55680..941f392e9f 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -5,7 +5,7 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadio +from parsl.monitoring.radios import MonitoringRadioSender class ParslExecutor(metaclass=ABCMeta): @@ -52,7 +52,7 @@ def __init__( *, hub_address: Optional[str] = None, hub_zmq_port: Optional[int] = None, - monitoring_radio: Optional[MonitoringRadio] = None, + monitoring_radio: Optional[MonitoringRadioSender] = None, run_dir: str = ".", run_id: Optional[str] = None, ): @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None: self._hub_zmq_port = value @property - def monitoring_radio(self) -> Optional[MonitoringRadio]: + def monitoring_radio(self) -> Optional[MonitoringRadioSender]: """Local radio for sending monitoring messages """ return self._monitoring_radio @monitoring_radio.setter - def monitoring_radio(self, value: Optional[MonitoringRadio]) -> None: + def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None: self._monitoring_radio = value diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 8e4770a32a..14b0506b17 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -13,7 +13,7 @@ from parsl.log_utils import set_file_logger from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MultiprocessingQueueRadio +from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import AddressedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue @@ -187,7 +187,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") - self.radio = MultiprocessingQueueRadio(self.block_msgs) + self.radio = MultiprocessingQueueRadioSender(self.block_msgs) try: comm_q_result = comm_q.get(block=True, timeout=120) diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 070869bdba..6c77fd37b1 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -15,14 +15,14 @@ logger = logging.getLogger(__name__) -class MonitoringRadio(metaclass=ABCMeta): +class MonitoringRadioSender(metaclass=ABCMeta): @abstractmethod def send(self, message: object) -> None: pass -class FilesystemRadio(MonitoringRadio): - """A MonitoringRadio that sends messages over a shared filesystem. +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. The messsage directory structure is based on maildir, https://en.wikipedia.org/wiki/Maildir @@ -36,7 +36,7 @@ class FilesystemRadio(MonitoringRadio): This avoids a race condition of reading partially written messages. This radio is likely to give higher shared filesystem load compared to - the UDPRadio, but should be much more reliable. + the UDP radio, but should be much more reliable. 
""" def __init__(self, *, monitoring_url: str, source_id: int, timeout: int = 10, run_dir: str): @@ -66,7 +66,7 @@ def send(self, message: object) -> None: os.rename(tmp_filename, new_filename) -class HTEXRadio(MonitoringRadio): +class HTEXRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): """ @@ -120,7 +120,7 @@ def send(self, message: object) -> None: return -class UDPRadio(MonitoringRadio): +class UDPRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): """ @@ -174,7 +174,7 @@ def send(self, message: object) -> None: return -class MultiprocessingQueueRadio(MonitoringRadio): +class MultiprocessingQueueRadioSender(MonitoringRadioSender): """A monitoring radio which connects over a multiprocessing Queue. This radio is intended to be used on the submit side, where components in the submit process, or processes launched by multiprocessing, will have diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 98168aa858..055a013627 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -8,10 +8,10 @@ from parsl.monitoring.message_type import MessageType from parsl.monitoring.radios import ( - FilesystemRadio, - HTEXRadio, - MonitoringRadio, - UDPRadio, + FilesystemRadioSender, + HTEXRadioSender, + MonitoringRadioSender, + UDPRadioSender, ) from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs @@ -100,17 +100,17 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: return (wrapped, args, new_kwargs) -def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadio: - radio: MonitoringRadio +def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender: + radio: MonitoringRadioSender if radio_mode == "udp": - radio = UDPRadio(monitoring_hub_url, - source_id=task_id) + radio = UDPRadioSender(monitoring_hub_url, + source_id=task_id) elif radio_mode == "htex": - radio = HTEXRadio(monitoring_hub_url, - source_id=task_id) + radio = HTEXRadioSender(monitoring_hub_url, + source_id=task_id) elif radio_mode == "filesystem": - radio = FilesystemRadio(monitoring_url=monitoring_hub_url, - source_id=task_id, run_dir=run_dir) + radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url, + source_id=task_id, run_dir=run_dir) else: raise RuntimeError(f"Unknown radio mode: {radio_mode}") return radio From 16305d13209374dea1056cb74d38fc689464e1cd Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 24 Jul 2024 11:40:25 +0200 Subject: [PATCH 08/38] Update checkpoint docs to follow #1945 and #2667 (#3537) --- docs/userguide/checkpoints.rst | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/userguide/checkpoints.rst b/docs/userguide/checkpoints.rst index dbcfcfc760..0f71b019ff 100644 --- a/docs/userguide/checkpoints.rst +++ b/docs/userguide/checkpoints.rst @@ -49,15 +49,17 @@ during development. Using app caching will ensure that only modified apps are re App equivalence ^^^^^^^^^^^^^^^ -Parsl determines app equivalence by storing the hash -of the app function. Thus, any changes to the app code (e.g., -its signature, its body, or even the docstring within the body) -will invalidate cached values. +Parsl determines app equivalence using the name of the app function: +if two apps have the same name, then they are equivalent under this +relation. 
-However, Parsl does not traverse the call graph of the app function, -so changes inside functions called by an app will not invalidate +Changes inside the app, or by functions called by an app will not invalidate cached values. +There are lots of other ways functions might be compared for equivalence, +and `parsl.dataflow.memoization.id_for_memo` provides a hook to plug in +alternate application-specific implementations. + Invocation equivalence ^^^^^^^^^^^^^^^^^^^^^^ From f6d288936ce5152c4db0d6ca25b9113758a32702 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 24 Jul 2024 13:35:59 +0200 Subject: [PATCH 09/38] Update block monitoring log message (#3527) This monitoring message is not coming from the job status poller - this moved in PR #3349. This monitoring message is not being sent to the hub, but rather to the monitoring router on the far end of the monitoring radio. Debug messages should be formatted with deferred logger formatting. --- parsl/executors/status_handling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index f6d92e2af7..652ba09a1c 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -237,7 +237,7 @@ def send_monitoring_info(self, status: Dict) -> None: # Send monitoring info for HTEX when monitoring enabled if self.monitoring_radio: msg = self.create_monitoring_info(status) - logger.debug("Sending message {} to hub from job status poller".format(msg)) + logger.debug("Sending block monitoring message: %r", msg) self.monitoring_radio.send((MessageType.BLOCK_INFO, msg)) def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]: From a2af30ce57b7a840d565da4414a9dbcf91018b1f Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Wed, 24 Jul 2024 10:50:29 -0500 Subject: [PATCH 10/38] Adding warning about provider options in MPI context (#3516) Warns users about per-task and per-node options to the provider conflicting with MPIExecutor --- docs/userguide/mpi_apps.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/userguide/mpi_apps.rst b/docs/userguide/mpi_apps.rst index a40c03e004..82123123b6 100644 --- a/docs/userguide/mpi_apps.rst +++ b/docs/userguide/mpi_apps.rst @@ -60,6 +60,13 @@ An example for ALCF's Polaris supercomputer that will run 3 MPI tasks of 2 nodes ) +.. warning:: + Please note that ``Provider`` options that specify per-task or per-node resources, for example, + ``SlurmProvider(cores_per_node=N, ...)`` should not be used with :class:`~parsl.executors.high_throughput.MPIExecutor`. + Parsl primarily uses a pilot job model and assumptions from that context do not translate to the MPI context. For + more info refer to : + `github issue #3006 `_ + Writing an MPI App ------------------ From ec8dd620cae9bf01bb3492cd139945bca9fcf7e0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 25 Jul 2024 00:11:00 +0200 Subject: [PATCH 11/38] Refactor naive scale in behaviour for Work Queue and Task Vine (#3526) The intended behaviour of this scale in code, which is only for scaling in all blocks (for example at the end of a workflow) makes sense as a default for all BlockProviderExecutors. This PR makes that refactor. This code is buggy (before and after) - see issue #3471. This PR does not attempt to fix that, but moves code into a better place for bugfixing, and a subsequent PR will fix it. 
--- parsl/executors/status_handling.py | 21 +++++++++++++++++++-- parsl/executors/taskvine/executor.py | 18 ------------------ parsl/executors/workqueue/executor.py | 18 ------------------ 3 files changed, 19 insertions(+), 38 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 652ba09a1c..13ddef1256 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -193,15 +193,32 @@ def _scale_out(self, blocks: int = 1) -> List[str]: self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) return block_ids - @abstractmethod def scale_in(self, blocks: int) -> List[str]: """Scale in method. Cause the executor to reduce the number of blocks by count. + The default implementation will kill blocks without regard to their + status or whether they are executing tasks. Executors with more + nuanced scaling strategies might overload this method to work with + that strategy - see the HighThroughputExecutor for an example of that. + :return: A list of block ids corresponding to the blocks that were removed. """ - pass + # Obtain list of blocks to kill + to_kill = list(self.blocks_to_job_id.keys())[:blocks] + kill_ids = [self.blocks_to_job_id[block] for block in to_kill] + + # Cancel the blocks provisioned + if self.provider: + logger.info(f"Scaling in jobs: {kill_ids}") + r = self.provider.cancel(kill_ids) + job_ids = self._filter_scale_in_ids(kill_ids, r) + block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] + return block_ids_killed + else: + logger.error("No execution provider available to scale in") + return [] def _launch_block(self, block_id: str) -> Any: launch_cmd = self._get_launch_command(block_id) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 6cfedf92bb..bebed1a51b 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -573,24 +573,6 @@ def outstanding(self) -> int: def workers_per_node(self) -> Union[int, float]: return 1 - def scale_in(self, count: int) -> List[str]: - """Scale in method. Cancel a given number of blocks - """ - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:count] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] - - # Cancel the blocks provisioned - if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] - return block_ids_killed - else: - logger.error("No execution provider available to scale") - return [] - def shutdown(self, *args, **kwargs): """Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the TaskVine system submission. diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index e715c23891..a1ad49bca9 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -689,24 +689,6 @@ def outstanding(self) -> int: def workers_per_node(self) -> Union[int, float]: return self.scaling_cores_per_worker - def scale_in(self, count: int) -> List[str]: - """Scale in method. 
- """ - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:count] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] - - # Cancel the blocks provisioned - if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] - return block_ids_killed - else: - logger.error("No execution provider available to scale in") - return [] - def shutdown(self, *args, **kwargs): """Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the Work Queue system submission. From 878889bb8baadc16dccd9589020ee31708bd8db3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 25 Jul 2024 00:48:37 +0200 Subject: [PATCH 12/38] Fix broken markup for hyperlink (#3539) --- docs/userguide/checkpoints.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/userguide/checkpoints.rst b/docs/userguide/checkpoints.rst index 0f71b019ff..8867107b7a 100644 --- a/docs/userguide/checkpoints.rst +++ b/docs/userguide/checkpoints.rst @@ -94,7 +94,7 @@ Attempting to cache apps invoked with other, non-hashable, data types will lead to an exception at invocation. In that case, mechanisms to hash new types can be registered by a program by -implementing the ``parsl.dataflow.memoization.id_for_memo`` function for +implementing the `parsl.dataflow.memoization.id_for_memo` function for the new type. Ignoring arguments From 71d9c711cee30211aaadb1725490d1ff0c7f194a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 25 Jul 2024 08:30:04 +0200 Subject: [PATCH 13/38] Don't copy monitoring address/port parameters into the DFK. (#3522) Prior to this PR, monitoring hub address and ZMQ port were stored as attributes of the DFK. The address also existed as an attribute on dfk.monitoring, and the ZMQ port was returned by dfk.monitoring.start Afte this PR, those values are not added to the DFK, but instead are accessed via dfk.monitoring. These two attributes are now only set on a new executor when monitoring is enabled, rather than always being intialised by the DFK. Default values now come from the executor __init__ method, which is a more usual style in Python for providing default values. 
See PR #3361 This is part of ongoing work to introduce more pluggable monitoring network connectivity - see PR #3315 --- parsl/dataflow/dflow.py | 10 +++------- parsl/monitoring/monitoring.py | 4 ++-- parsl/tests/test_monitoring/test_fuzz_zmq.py | 4 ++-- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index ebb4d2a31c..a62a2261d0 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -113,14 +113,10 @@ def __init__(self, config: Config) -> None: self.monitoring: Optional[MonitoringHub] self.monitoring = config.monitoring - # hub address and port for interchange to connect - self.hub_address = None # type: Optional[str] - self.hub_zmq_port = None # type: Optional[int] if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.hub_address = self.monitoring.hub_address - self.hub_zmq_port = self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None @@ -1181,9 +1177,9 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: for executor in executors: executor.run_id = self.run_id executor.run_dir = self.run_dir - executor.hub_address = self.hub_address - executor.hub_zmq_port = self.hub_zmq_port if self.monitoring: + executor.hub_address = self.monitoring.hub_address + executor.hub_zmq_port = self.monitoring.hub_zmq_port executor.monitoring_radio = self.monitoring.radio if hasattr(executor, 'provider'): if hasattr(executor.provider, 'script_dir'): diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 14b0506b17..f86bf81e87 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -105,7 +105,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> int: + def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -207,7 +207,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat logger.info("Monitoring Hub initialized") - return zmq_port + self.hub_zmq_port = zmq_port # TODO: tighten the Any message format def send(self, mtype: MessageType, message: Any) -> None: diff --git a/parsl/tests/test_monitoring/test_fuzz_zmq.py b/parsl/tests/test_monitoring/test_fuzz_zmq.py index 36f048efb3..3f50385564 100644 --- a/parsl/tests/test_monitoring/test_fuzz_zmq.py +++ b/parsl/tests/test_monitoring/test_fuzz_zmq.py @@ -44,8 +44,8 @@ def test_row_counts(): # the latter is what i'm most suspicious of in my present investigation # dig out the interchange port... 
- hub_address = parsl.dfk().hub_address - hub_zmq_port = parsl.dfk().hub_zmq_port + hub_address = parsl.dfk().monitoring.hub_address + hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port # this will send a string to a new socket connection with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: From 0c24d7b0b84ebeac5d91a216c5cea6d7a86e607c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 26 Jul 2024 08:10:31 +0100 Subject: [PATCH 14/38] Clarify dev instructions in README.rst (#3545) If one tries to follow step 3 after step 2, which I think is something somewhat reasonable to expect, they end up inside directory `parsl/parsl`, where there's no `setup.py` script. Instead, the script is in the top-level directory, so if you already entered `parsl`, you don't need to go into `parsl/parsl`. This adds a comment to clarify this possible point of confusion. --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index fb1070e7d7..72048d39f4 100644 --- a/README.rst +++ b/README.rst @@ -109,7 +109,7 @@ For Developers 3. Install:: - $ cd parsl + $ cd parsl # only if you didn't enter the top-level directory in step 2 above $ python3 setup.py install 4. Use Parsl! From b96a2dd98ffd2abd00ecc5c217b73a8315ea89f6 Mon Sep 17 00:00:00 2001 From: matthewc2003 Date: Fri, 26 Jul 2024 10:54:38 -0700 Subject: [PATCH 15/38] Make htex managers track start_time (#3546) Managers now record their start time and forward this information to the interchange during registration. The ManagerRecord was updated to support this functionality. Adding this will allow for better manager selection by the interchange in the future. --- parsl/executors/high_throughput/interchange.py | 1 + parsl/executors/high_throughput/manager_record.py | 1 + parsl/executors/high_throughput/process_worker_pool.py | 2 ++ 3 files changed, 4 insertions(+) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 819836e95f..18bdc65610 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -410,6 +410,7 @@ def process_task_outgoing_incoming( self._ready_managers[manager_id] = {'last_heartbeat': time.time(), 'idle_since': time.time(), 'block_id': None, + 'start_time': msg['start_time'], 'max_capacity': 0, 'worker_count': 0, 'active': True, diff --git a/parsl/executors/high_throughput/manager_record.py b/parsl/executors/high_throughput/manager_record.py index 7e58b53954..a48c18cbd9 100644 --- a/parsl/executors/high_throughput/manager_record.py +++ b/parsl/executors/high_throughput/manager_record.py @@ -6,6 +6,7 @@ class ManagerRecord(TypedDict, total=False): block_id: Optional[str] + start_time: float tasks: List[Any] worker_count: int max_capacity: int diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 5c766123d7..59efe501f1 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -184,6 +184,7 @@ def __init__(self, *, self.uid = uid self.block_id = block_id + self.start_time = time.time() self.enable_mpi_mode = enable_mpi_mode self.mpi_launcher = mpi_launcher @@ -263,6 +264,7 @@ def create_reg_message(self): 'worker_count': self.worker_count, 'uid': self.uid, 'block_id': self.block_id, + 'start_time': self.start_time, 'prefetch_capacity': self.prefetch_capacity, 'max_capacity': self.worker_count + 
self.prefetch_capacity, 'os': platform.system(), From 1652304959face86933921116ae571d472800b31 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 27 Jul 2024 17:45:11 +0200 Subject: [PATCH 16/38] Move scale_out_facade next to scale_out (#3550) See PR #3530 which does this for status() This is a buildup to some simplification and eventual merge of scale_out and scale_out_facade in upcoming PRs. This PR should not change any behaviour --- parsl/executors/status_handling.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 13ddef1256..90773591b6 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -174,6 +174,16 @@ def _filter_scale_in_ids(self, to_kill, killed): # Filters first iterable by bool values in second return list(compress(to_kill, killed)) + def scale_out_facade(self, n: int) -> List[str]: + block_ids = self._scale_out(n) + if block_ids is not None: + new_status = {} + for block_id in block_ids: + new_status[block_id] = JobStatus(JobState.PENDING) + self.send_monitoring_info(new_status) + self._status.update(new_status) + return block_ids + def _scale_out(self, blocks: int = 1) -> List[str]: """Scales out the number of blocks by "blocks" """ @@ -327,13 +337,3 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ del self._status[block_id] self.send_monitoring_info(new_status) return block_ids - - def scale_out_facade(self, n: int) -> List[str]: - block_ids = self._scale_out(n) - if block_ids is not None: - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) - return block_ids From 64e163ceaf4b43746909f30c9659738f29dd84e1 Mon Sep 17 00:00:00 2001 From: rjmello <30907815+rjmello@users.noreply.github.com> Date: Tue, 30 Jul 2024 11:50:40 -0400 Subject: [PATCH 17/38] Accept multi-token interchange launch commands (#3543) --- parsl/executors/high_throughput/executor.py | 12 ++++---- parsl/tests/test_htex/test_htex.py | 31 +++++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 69183364f7..7c7dea82ac 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -56,7 +56,7 @@ "--mpi-launcher={mpi_launcher} " "--available-accelerators {accelerators}") -DEFAULT_INTERCHANGE_LAUNCH_CMD = "interchange.py" +DEFAULT_INTERCHANGE_LAUNCH_CMD = ["interchange.py"] GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider` Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`, @@ -78,9 +78,9 @@ cores_per_worker, nodes_per_block, heartbeat_period ,heartbeat_threshold, logdir). For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}" - interchange_launch_cmd : str - Custom command line string to launch the interchange process from the executor. If undefined, - the executor will use the default "interchange.py" command. + interchange_launch_cmd : Sequence[str] + Custom sequence of command line tokens to launch the interchange process from the executor. If + undefined, the executor will use the default "interchange.py" command. 
address : string An address to connect to the main Parsl process which is reachable from the network in which @@ -238,7 +238,7 @@ def __init__(self, label: str = 'HighThroughputExecutor', provider: ExecutionProvider = LocalProvider(), launch_cmd: Optional[str] = None, - interchange_launch_cmd: Optional[str] = None, + interchange_launch_cmd: Optional[Sequence[str]] = None, address: Optional[str] = None, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), @@ -548,7 +548,7 @@ def _start_local_interchange_process(self) -> None: config_pickle = pickle.dumps(interchange_config) - self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd.encode("utf-8"), stdin=subprocess.PIPE) + self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd, stdin=subprocess.PIPE) stdin = self.interchange_proc.stdin assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode" diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 2d1aafda85..fca68c3c2f 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,6 +1,6 @@ import pathlib -import warnings from subprocess import Popen, TimeoutExpired +from typing import Optional, Sequence from unittest import mock import pytest @@ -139,13 +139,22 @@ def test_max_workers_per_node(): @pytest.mark.local -def test_htex_launch_cmd(): - htex = HighThroughputExecutor() - assert htex.launch_cmd.startswith("process_worker_pool.py") - assert htex.interchange_launch_cmd == "interchange.py" - - launch_cmd = "custom-launch-cmd" - ix_launch_cmd = "custom-ix-launch-cmd" - htex = HighThroughputExecutor(launch_cmd=launch_cmd, interchange_launch_cmd=ix_launch_cmd) - assert htex.launch_cmd == launch_cmd - assert htex.interchange_launch_cmd == ix_launch_cmd +@pytest.mark.parametrize("cmd", (None, "custom-launch-cmd")) +def test_htex_worker_pool_launch_cmd(cmd: Optional[str]): + if cmd: + htex = HighThroughputExecutor(launch_cmd=cmd) + assert htex.launch_cmd == cmd + else: + htex = HighThroughputExecutor() + assert htex.launch_cmd.startswith("process_worker_pool.py") + + +@pytest.mark.local +@pytest.mark.parametrize("cmd", (None, ["custom", "launch", "cmd"])) +def test_htex_interchange_launch_cmd(cmd: Optional[Sequence[str]]): + if cmd: + htex = HighThroughputExecutor(interchange_launch_cmd=cmd) + assert htex.interchange_launch_cmd == cmd + else: + htex = HighThroughputExecutor() + assert htex.interchange_launch_cmd == ["interchange.py"] From 4da6657df88bbc96fbc4238d845150b25cca7fa0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 30 Jul 2024 22:25:25 +0200 Subject: [PATCH 18/38] Remove unused codepath from executor scale-out (#3551) block_ids is populatd by _scale_out which always returns a list, according to its type signature. So the `None` codepath should not ever be reached. mypy agrees: after adding an else clause (to act on the `None` case), mypy then marks the added clause as unreachable. 
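The reasoning above can be reproduced with a standalone toy sketch (hypothetical code, assuming mypy is invoked with its --warn-unreachable option, which this patch does not itself configure):

    from typing import List

    def _scale_out(blocks: int = 1) -> List[str]:
        # toy stand-in: always returns a list, never None
        return [str(i) for i in range(blocks)]

    block_ids = _scale_out(2)
    if block_ids is not None:
        print("always taken: the annotated return type excludes None")
    else:
        print("mypy --warn-unreachable marks this branch as unreachable")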
--- parsl/executors/status_handling.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 90773591b6..772fc9a69a 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -176,12 +176,11 @@ def _filter_scale_in_ids(self, to_kill, killed): def scale_out_facade(self, n: int) -> List[str]: block_ids = self._scale_out(n) - if block_ids is not None: - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) + new_status = {} + for block_id in block_ids: + new_status[block_id] = JobStatus(JobState.PENDING) + self.send_monitoring_info(new_status) + self._status.update(new_status) return block_ids def _scale_out(self, blocks: int = 1) -> List[str]: From 5eb30f17361cea67f6da433e33e34b0567c9144d Mon Sep 17 00:00:00 2001 From: matthewc2003 Date: Wed, 31 Jul 2024 02:56:48 -0700 Subject: [PATCH 19/38] Add Modular Manager Selector Interface (#3547) Added a Manager Selector interface which allows users to choose an algorithm to sort the interesting managers. This will allow for flexible testing and implementation of manager selection strategies to optimize efficiency of the interchange. --- parsl/executors/high_throughput/executor.py | 7 ++++++ .../executors/high_throughput/interchange.py | 8 +++--- .../high_throughput/manager_selector.py | 25 +++++++++++++++++++ parsl/tests/test_htex/test_zmq_binding.py | 2 ++ parsl/tests/test_mpi_apps/test_mpiex.py | 2 +- 5 files changed, 40 insertions(+), 4 deletions(-) create mode 100644 parsl/executors/high_throughput/manager_selector.py diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 7c7dea82ac..6c181cdee7 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -20,6 +20,10 @@ from parsl.executors.errors import BadMessage, ScalingFailed from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput.errors import CommandClientTimeoutError +from parsl.executors.high_throughput.manager_selector import ( + ManagerSelector, + RandomManagerSelector, +) from parsl.executors.high_throughput.mpi_prefix_composer import ( VALID_LAUNCHERS, validate_resource_spec, @@ -261,6 +265,7 @@ def __init__(self, worker_logdir_root: Optional[str] = None, enable_mpi_mode: bool = False, mpi_launcher: str = "mpiexec", + manager_selector: ManagerSelector = RandomManagerSelector(), block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, encrypted: bool = False): @@ -276,6 +281,7 @@ def __init__(self, self.prefetch_capacity = prefetch_capacity self.address = address self.address_probe_timeout = address_probe_timeout + self.manager_selector = manager_selector if self.address: self.all_addresses = address else: @@ -544,6 +550,7 @@ def _start_local_interchange_process(self) -> None: "poll_period": self.poll_period, "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, "cert_dir": self.cert_dir, + "manager_selector": self.manager_selector, } config_pickle = pickle.dumps(interchange_config) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 18bdc65610..9ebe6b95b9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -6,7 
+6,6 @@ import pickle import platform import queue -import random import signal import sys import threading @@ -19,6 +18,7 @@ from parsl.app.errors import RemoteExceptionWrapper from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch from parsl.executors.high_throughput.manager_record import ManagerRecord +from parsl.executors.high_throughput.manager_selector import ManagerSelector from parsl.monitoring.message_type import MessageType from parsl.process_loggers import wrap_with_logs from parsl.serialize import serialize as serialize_object @@ -53,6 +53,7 @@ def __init__(self, logging_level: int, poll_period: int, cert_dir: Optional[str], + manager_selector: ManagerSelector, ) -> None: """ Parameters @@ -160,6 +161,8 @@ def __init__(self, self.heartbeat_threshold = heartbeat_threshold + self.manager_selector = manager_selector + self.current_platform = {'parsl_v': PARSL_VERSION, 'python_v': "{}.{}.{}".format(sys.version_info.major, sys.version_info.minor, @@ -485,8 +488,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: interesting=len(interesting_managers))) if interesting_managers and not self.pending_task_queue.empty(): - shuffled_managers = list(interesting_managers) - random.shuffle(shuffled_managers) + shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers) while shuffled_managers and not self.pending_task_queue.empty(): # cf. the if statement above... manager_id = shuffled_managers.pop() diff --git a/parsl/executors/high_throughput/manager_selector.py b/parsl/executors/high_throughput/manager_selector.py new file mode 100644 index 0000000000..0ede28ee7d --- /dev/null +++ b/parsl/executors/high_throughput/manager_selector.py @@ -0,0 +1,25 @@ +import random +from abc import ABCMeta, abstractmethod +from typing import Dict, List, Set + +from parsl.executors.high_throughput.manager_record import ManagerRecord + + +class ManagerSelector(metaclass=ABCMeta): + + @abstractmethod + def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]: + """ Sort a given list of managers. + + Any operations pertaining to the sorting and rearrangement of the + interesting_managers Set should be performed here. 
+ """ + pass + + +class RandomManagerSelector(ManagerSelector): + + def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]: + c_manager_list = list(manager_list) + random.shuffle(c_manager_list) + return c_manager_list diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py index 1194e632d0..2273443b99 100644 --- a/parsl/tests/test_htex/test_zmq_binding.py +++ b/parsl/tests/test_htex/test_zmq_binding.py @@ -9,6 +9,7 @@ from parsl import curvezmq from parsl.executors.high_throughput.interchange import Interchange +from parsl.executors.high_throughput.manager_selector import RandomManagerSelector def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[str]) -> Interchange: @@ -23,6 +24,7 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s heartbeat_threshold=60, logdir=".", logging_level=logging.INFO, + manager_selector=RandomManagerSelector(), poll_period=10) diff --git a/parsl/tests/test_mpi_apps/test_mpiex.py b/parsl/tests/test_mpi_apps/test_mpiex.py index 1b3e86e0b8..a85547abea 100644 --- a/parsl/tests/test_mpi_apps/test_mpiex.py +++ b/parsl/tests/test_mpi_apps/test_mpiex.py @@ -44,7 +44,7 @@ def test_init(): new_kwargs = {'max_workers_per_block'} excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node', - 'mem_per_worker', 'cpu_affinity', 'max_workers'} + 'mem_per_worker', 'cpu_affinity', 'max_workers', 'manager_selector'} # Get the kwargs from both HTEx and MPIEx htex_kwargs = set(signature(HighThroughputExecutor.__init__).parameters) From 2981a287bbffc4aead9f70e2a427c1def3fa9f36 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 31 Jul 2024 13:17:07 +0200 Subject: [PATCH 20/38] Move monitoring router parameters into object attributes (#3521) This is to support rearrangement of the structure of the router code into multiple threads and methods, without having to manually wire all of the multiprocessing objects between the new methods and threads. These objects are part of the context of the router object, rather than parameters to individual methods which might change, and they are all multiprocessing objects which are thread-safe. --- parsl/monitoring/router.py | 49 +++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 70b4862295..9a422027c1 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -32,7 +32,12 @@ def __init__(self, logdir: str = ".", run_id: str, logging_level: int = logging.INFO, - atexit_timeout: int = 3 # in seconds + atexit_timeout: int = 3, # in seconds + priority_msgs: "queue.Queue[AddressedMonitoringMessage]", + node_msgs: "queue.Queue[AddressedMonitoringMessage]", + block_msgs: "queue.Queue[AddressedMonitoringMessage]", + resource_msgs: "queue.Queue[AddressedMonitoringMessage]", + exit_event: Event, ): """ Initializes a monitoring configuration class. @@ -51,7 +56,11 @@ def __init__(self, Logging level as defined in the logging module. Default: logging.INFO atexit_timeout : float, optional The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. + *_msgs : Queue + Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag. 
+ exit_event : Event + An event that the main Parsl process will set to signal that the monitoring router should shut down. """ os.makedirs(logdir, exist_ok=True) self.logger = set_file_logger("{}/monitoring_router.log".format(logdir), @@ -93,19 +102,20 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - def start(self, - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", - exit_event: Event) -> None: + self.priority_msgs = priority_msgs + self.node_msgs = node_msgs + self.block_msgs = block_msgs + self.resource_msgs = resource_msgs + self.exit_event = exit_event + + def start(self) -> None: try: - while not exit_event.is_set(): + while not self.exit_event.is_set(): try: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - resource_msgs.put((resource_msg, addr)) + self.resource_msgs.put((resource_msg, addr)) except socket.timeout: pass @@ -125,15 +135,15 @@ def start(self, if msg[0] == MessageType.NODE_INFO: msg[1]['run_id'] = self.run_id - node_msgs.put(msg_0) + self.node_msgs.put(msg_0) elif msg[0] == MessageType.RESOURCE_INFO: - resource_msgs.put(msg_0) + self.resource_msgs.put(msg_0) elif msg[0] == MessageType.BLOCK_INFO: - block_msgs.put(msg_0) + self.block_msgs.put(msg_0) elif msg[0] == MessageType.TASK_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) elif msg[0] == MessageType.WORKFLOW_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) else: # There is a type: ignore here because if msg[0] # is of the correct type, this code is unreachable, @@ -158,7 +168,7 @@ def start(self, data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - resource_msgs.put((msg, addr)) + self.resource_msgs.put((msg, addr)) last_msg_received_time = time.time() except socket.timeout: pass @@ -191,7 +201,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id) + run_id=run_id, + priority_msgs=priority_msgs, + node_msgs=node_msgs, + block_msgs=block_msgs, + resource_msgs=resource_msgs, + exit_event=exit_event) except Exception as e: logger.error("MonitoringRouter construction failed.", exc_info=True) comm_q.put(f"Monitoring router construction failed: {e}") @@ -200,7 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", router.logger.info("Starting MonitoringRouter in router_starter") try: - router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event) + router.start() except Exception as e: router.logger.exception("router.start exception") exception_q.put(('Hub', str(e))) From 9c982c5c3bca64205ac57fed93fae1a8ad365d3c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 31 Jul 2024 13:47:25 +0200 Subject: [PATCH 21/38] Add type annotation and error log to _filter_scale_in_ids (#3549) Especially this log message is intended to help user understanding when Parsl is not scaling in as they expected - before this PR, any blocks marked as not-scaled-in were not reported to the user (perhaps on the assumption that it might work next time round?) 
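For reference, a standalone illustration of the helper's core (itertools.compress) and of what the new warning reports; the job id strings here are invented for the example:

    from itertools import compress

    to_kill = ["job-1", "job-2", "job-3"]
    killed = [True, False, True]   # e.g. per-job results from provider.cancel()

    scaled_in = list(compress(to_kill, killed))
    not_killed = [jid for jid, k in zip(to_kill, killed) if not k]

    assert scaled_in == ["job-1", "job-3"]   # returned to the caller
    assert not_killed == ["job-2"]           # now reported in the warning log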
--- parsl/executors/status_handling.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 772fc9a69a..1e4ea3c0b4 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -167,10 +167,18 @@ def tasks(self) -> Dict[object, Future]: def provider(self): return self._provider - def _filter_scale_in_ids(self, to_kill, killed): + def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -> Sequence[Any]: """ Filter out job id's that were not killed """ assert len(to_kill) == len(killed) + + if False in killed: + killed_job_ids = [jid for jid, k in zip(to_kill, killed) if k] + not_killed_job_ids = [jid for jid, k in zip(to_kill, killed) if not k] + logger.warning("Some jobs were not killed successfully: " + f"killed jobs: {killed_job_ids}, " + f"not-killed jobs: {not_killed_job_ids}") + # Filters first iterable by bool values in second return list(compress(to_kill, killed)) From 7867b576365ff46ec2613ebe47bc4ad2778493b7 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 09:45:26 +0200 Subject: [PATCH 22/38] Use caplog fixure rather than mock logging when testing HTEX shutdown (#3559) This replaces mock-captured call sequences which are over-testing the exact position of tested log calls in relation to any other log calls, which makes this test fragile when working on logs in the affected area of the code. This is consistent with other parts of the test suite which test log messages using caplog. --- parsl/tests/test_htex/test_htex.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index fca68c3c2f..80a4e91bd5 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -71,12 +71,11 @@ def test_htex_start_encrypted( @pytest.mark.local @pytest.mark.parametrize("started", (True, False)) @pytest.mark.parametrize("timeout_expires", (True, False)) -@mock.patch(f"{_MOCK_BASE}.logger") def test_htex_shutdown( - mock_logger: mock.MagicMock, started: bool, timeout_expires: bool, htex: HighThroughputExecutor, + caplog ): mock_ix_proc = mock.Mock(spec=Popen) @@ -110,20 +109,19 @@ def kill_interchange(*args, **kwargs): htex.shutdown() - mock_logs = mock_logger.info.call_args_list if started: assert mock_ix_proc.terminate.called assert mock_ix_proc.wait.called assert {"timeout": 10} == mock_ix_proc.wait.call_args[1] if timeout_expires: - assert "Unable to terminate Interchange" in mock_logs[1][0][0] + assert "Unable to terminate Interchange" in caplog.text assert mock_ix_proc.kill.called - assert "Attempting" in mock_logs[0][0][0] - assert "Finished" in mock_logs[-1][0][0] + assert "Attempting HighThroughputExecutor shutdown" in caplog.text + assert "Finished HighThroughputExecutor shutdown" in caplog.text else: assert not mock_ix_proc.terminate.called assert not mock_ix_proc.wait.called - assert "has not started" in mock_logs[0][0][0] + assert "HighThroughputExecutor has not started" in caplog.text @pytest.mark.local From 8d606c93af6a7de4df8c60b99752aa4176e57096 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 10:49:31 +0200 Subject: [PATCH 23/38] Rename submit-side monitoring radio for clarification (#3557) A Parsl executor is configured with a submit-side monitoring radio sender, which is used by the BlockProviderExecutor to send block status messages to the monitoring subsystem. 
Parsl executors also have a notion of a remote monitoring radio, used
by remote workers to send monitoring messages. This can be confusing
when both of these radio senders are referred to in the same piece of
code, as happened in ongoing monitoring plugin development in draft PR
#3315.

This PR is intended to make this situation much less ambiguous by
avoiding the mention of a monitoring radio in executor code without
qualifying whether it is a submit-side or remote-worker-side radio
definition.

A future PR from the #3315 stack will introduce other monitoring radio
references with the remote prefix, replacing the current radio_mode and
related attributes.
---
 parsl/dataflow/dflow.py            |  2 +-
 parsl/executors/base.py            | 14 +++++++-------
 parsl/executors/status_handling.py |  4 ++--
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index a62a2261d0..88ef063230 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -1180,7 +1180,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
             if self.monitoring:
                 executor.hub_address = self.monitoring.hub_address
                 executor.hub_zmq_port = self.monitoring.hub_zmq_port
-                executor.monitoring_radio = self.monitoring.radio
+                executor.submit_monitoring_radio = self.monitoring.radio
             if hasattr(executor, 'provider'):
                 if hasattr(executor.provider, 'script_dir'):
                     executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
diff --git a/parsl/executors/base.py b/parsl/executors/base.py
index 941f392e9f..a112b9eb00 100644
--- a/parsl/executors/base.py
+++ b/parsl/executors/base.py
@@ -52,13 +52,13 @@ def __init__(
         *,
         hub_address: Optional[str] = None,
         hub_zmq_port: Optional[int] = None,
-        monitoring_radio: Optional[MonitoringRadioSender] = None,
+        submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
         run_dir: str = ".",
         run_id: Optional[str] = None,
     ):
         self.hub_address = hub_address
         self.hub_zmq_port = hub_zmq_port
-        self.monitoring_radio = monitoring_radio
+        self.submit_monitoring_radio = submit_monitoring_radio
         self.run_dir = os.path.abspath(run_dir)
         self.run_id = run_id

@@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None:
         self._hub_zmq_port = value

     @property
-    def monitoring_radio(self) -> Optional[MonitoringRadioSender]:
+    def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
         """Local radio for sending monitoring messages
         """
-        return self._monitoring_radio
+        return self._submit_monitoring_radio

-    @monitoring_radio.setter
-    def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
-        self._monitoring_radio = value
+    @submit_monitoring_radio.setter
+    def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
+        self._submit_monitoring_radio = value
diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 1e4ea3c0b4..0f7ed90592 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -269,10 +269,10 @@ def workers_per_node(self) -> Union[int, float]:

     def send_monitoring_info(self, status: Dict) -> None:
         # Send monitoring info for HTEX when monitoring enabled
-        if self.monitoring_radio:
+        if self.submit_monitoring_radio:
             msg = self.create_monitoring_info(status)
             logger.debug("Sending block monitoring message: %r", msg)
-            self.monitoring_radio.send((MessageType.BLOCK_INFO, msg))
+            self.submit_monitoring_radio.send((MessageType.BLOCK_INFO, msg))

     def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]:
        """Create a monitoring message for each block based on the poll status.

From 5ee584d26b1dcb0d22ab07de37df3cdd9be1248d Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 11:17:40 +0200
Subject: [PATCH 24/38] Only scale in blocks that are in _status and non-terminal (#3548)

In the BlockProviderExecutor, the block ID to job ID mapping structures
contain the full historical list of blocks.

Prior to this PR, the mapping was used as the source of current jobs that
should/could be scaled in.

This was incorrect, and resulted in the scaling-in code attempting to:

* scale in blocks that had already finished, because it continues to see
  those blocks as eligible for scale-in

* not scale in blocks that were active - because rather than choosing to
  scale in an alive block, the code would choose to attempt to scale in a
  non-alive block

After this PR, the _status structure, which should contain reasonably up
to date status information, is used instead of the block/job ID mapping
structures.

(As a more general principle, those block/job ID mapping structures
should never be examined as a whole but only used for mapping.)

Changed Behaviour:

Scaling in should work better in executors using the default scale-in
code refactored in PR #3526, which right now is Work Queue and Task
Vine.

Fixes #3471
---
 parsl/executors/status_handling.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 0f7ed90592..200b43cc41 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -12,7 +12,7 @@
 from parsl.executors.base import ParslExecutor
 from parsl.executors.errors import BadStateException, ScalingFailed
 from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler
-from parsl.jobs.states import JobState, JobStatus
+from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
 from parsl.monitoring.message_type import MessageType
 from parsl.providers.base import ExecutionProvider
 from parsl.utils import AtomicIDCounter
@@ -222,16 +222,20 @@ def scale_in(self, blocks: int) -> List[str]:
         :return: A list of block ids corresponding to the blocks that were removed.
""" - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:blocks] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] + + active_blocks = [block_id for block_id, status in self._status.items() + if status.state not in TERMINAL_STATES] + + block_ids_to_kill = active_blocks[:blocks] + + job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill] # Cancel the blocks provisioned if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] + logger.info(f"Scaling in jobs: {job_ids_to_kill}") + r = self.provider.cancel(job_ids_to_kill) + job_ids = self._filter_scale_in_ids(job_ids_to_kill, r) + block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids] return block_ids_killed else: logger.error("No execution provider available to scale in") From a24bc932117c1ba50701992b41efdda79f3e3eed Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 11:48:57 +0200 Subject: [PATCH 25/38] Test UDP monitoring radio (#3555) Before this PR, this radio was omitted because all of executors in the basic monitoring test had over time been switched to different defaults. --- parsl/tests/test_monitoring/test_basic.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py index c900670ec8..1c792a9d82 100644 --- a/parsl/tests/test_monitoring/test_basic.py +++ b/parsl/tests/test_monitoring/test_basic.py @@ -25,10 +25,23 @@ def this_app(): # a configuration that is suitably configured for monitoring. def htex_config(): + """This config will use htex's default htex-specific monitoring radio mode""" from parsl.tests.configs.htex_local_alternate import fresh_config return fresh_config() +def htex_udp_config(): + """This config will force UDP""" + from parsl.tests.configs.htex_local_alternate import fresh_config + c = fresh_config() + assert len(c.executors) == 1 + + assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio" + c.executors[0].radio_mode = "udp" + + return c + + def workqueue_config(): from parsl.tests.configs.workqueue_ex import fresh_config c = fresh_config() @@ -48,7 +61,7 @@ def taskvine_config(): @pytest.mark.local -@pytest.mark.parametrize("fresh_config", [htex_config, workqueue_config, taskvine_config]) +@pytest.mark.parametrize("fresh_config", [htex_config, htex_udp_config, workqueue_config, taskvine_config]) def test_row_counts(tmpd_cwd, fresh_config): # this is imported here rather than at module level because # it isn't available in a plain parsl install, so this module From 11d88db8aa63d7228681b3a08f384a8a0088f63c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 13:32:15 +0200 Subject: [PATCH 26/38] Inline _scale_out in BlockProviderExecutor (#3554) This brings two related pieces of code together into a single method, removing the possibility of the _scale_out code being called in any other way than via scale_out_facade. Future PRs will rearrange the now unified code and make a bugfix that will be more easily fixable now. This PR should not change behaviour as it is only a code movement. 
--- parsl/executors/status_handling.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 200b43cc41..5425094bca 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -183,22 +183,13 @@ def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) - return list(compress(to_kill, killed)) def scale_out_facade(self, n: int) -> List[str]: - block_ids = self._scale_out(n) - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) - return block_ids - - def _scale_out(self, blocks: int = 1) -> List[str]: """Scales out the number of blocks by "blocks" """ if not self.provider: raise ScalingFailed(self, "No execution provider available") block_ids = [] - logger.info(f"Scaling out by {blocks} blocks") - for _ in range(blocks): + logger.info(f"Scaling out by {n} blocks") + for _ in range(n): block_id = str(self._block_id_counter.get_id()) logger.info(f"Allocated block ID {block_id}") try: @@ -208,6 +199,12 @@ def _scale_out(self, blocks: int = 1) -> List[str]: block_ids.append(block_id) except Exception as ex: self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) + + new_status = {} + for block_id in block_ids: + new_status[block_id] = JobStatus(JobState.PENDING) + self.send_monitoring_info(new_status) + self._status.update(new_status) return block_ids def scale_in(self, blocks: int) -> List[str]: From 1fca73c92403255e1d6b58808e021836fcb2b2e0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 15:43:56 +0200 Subject: [PATCH 27/38] Bring block table updates together in scale out (#3562) There are multiple structures that contain information about blocks and the status of those blocks. This PR is part of ongoing work to make the information contained in those blocks more consistent. This PR brings updates to executor._status (before PR #3352, living in parsl.jobs.job_status_poller) together with updates to the block/job id mapping structures (which has existed in the executor layer under different names since commit a1963bf36fa6bac5bb5b28757a2c5d4a1fbe0462 introduced self.engines in 2018). This PR should not change behaviour: it moves code around in a way that should not affect how the various structures are left populated at the end of the method. This PR makes the cause of issue #3235 clearer, without attempting to fix it: in the successful code path touched by this PR, executor._status is updated immediately, but in the exception path, the status update only goes into _simulated_status and does not appear in executor._status until much later (when self.status() merges provider-provided status and simulated status driven by the job status poller). 
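
To illustrate the merge mentioned above, here is a simplified, hypothetical
sketch (not the real executor.status() implementation; plain strings stand in
for JobStatus objects) of how provider-reported status and locally simulated
status could be combined on a poll, with simulated entries taking precedence:

    def merged_status(provider_status: dict, simulated_status: dict) -> dict:
        # start from what the provider reported, then overlay simulated
        # entries such as submit-time failures
        status = dict(provider_status)
        status.update(simulated_status)
        return status

    print(merged_status({"0": "RUNNING"}, {"1": "FAILED"}))
    # {'0': 'RUNNING', '1': 'FAILED'}
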
A subsequent PR will address issue #3235 --- parsl/executors/status_handling.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 5425094bca..5ca70c5877 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -188,23 +188,27 @@ def scale_out_facade(self, n: int) -> List[str]: if not self.provider: raise ScalingFailed(self, "No execution provider available") block_ids = [] + monitoring_status_changes = {} logger.info(f"Scaling out by {n} blocks") for _ in range(n): block_id = str(self._block_id_counter.get_id()) logger.info(f"Allocated block ID {block_id}") try: job_id = self._launch_block(block_id) + + pending_status = JobStatus(JobState.PENDING) + self.blocks_to_job_id[block_id] = job_id self.job_ids_to_block[job_id] = block_id + self._status[block_id] = pending_status + + monitoring_status_changes[block_id] = pending_status block_ids.append(block_id) + except Exception as ex: self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) + self.send_monitoring_info(monitoring_status_changes) return block_ids def scale_in(self, blocks: int) -> List[str]: From 21362b5702316370aa1f68479e431b30ba1e8de5 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 16:40:38 +0200 Subject: [PATCH 28/38] Update _status cache on failed blocks (#3563) _status stores the status of blocks. This structure is periodically updated by executor.status(), but any changes to block status that happen before such an update happens need to be done explicitly, to make them appear in _status immediately. Before this PR, this was done for launched blocks, 6 lines before this change, but not for failed blocks. This is the cause of issue #3235 - scaling code does not become aware of failed blocks until executor.status() updates _status on a slow poll. This PR makes _status be updated with that failed block information immediately (in addition to the existing code which places it in _simulated_status for when executor.status() rebuilds the entire _status structure). This PR has a test case which fails before this PR's change to status_handling.py but passes afterwards. Changed Behaviour Earlier recognition of failed blocks by the scaling code, which should lead to earlier overall failure when the scaling code decides to abort. 
Fixes #3235 --- parsl/executors/status_handling.py | 4 +- ...st_disconnected_blocks_failing_provider.py | 71 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 5ca70c5877..34db2300f6 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -206,7 +206,9 @@ def scale_out_facade(self, n: int) -> List[str]: block_ids.append(block_id) except Exception as ex: - self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) + failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) + self._simulated_status[block_id] = failed_status + self._status[block_id] = failed_status self.send_monitoring_info(monitoring_status_changes) return block_ids diff --git a/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py new file mode 100644 index 0000000000..b2fa507aca --- /dev/null +++ b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py @@ -0,0 +1,71 @@ +import logging + +import pytest + +import parsl +from parsl import Config +from parsl.executors import HighThroughputExecutor +from parsl.executors.errors import BadStateException +from parsl.jobs.states import JobState, JobStatus +from parsl.providers import LocalProvider + + +class FailingProvider(LocalProvider): + def submit(*args, **kwargs): + raise RuntimeError("Deliberate failure of provider.submit") + + +def local_config(): + """Config to simulate failing blocks without connecting""" + return Config( + executors=[ + HighThroughputExecutor( + label="HTEX", + heartbeat_period=1, + heartbeat_threshold=2, + poll_period=100, + max_workers_per_node=1, + provider=FailingProvider( + init_blocks=0, + max_blocks=2, + min_blocks=0, + ), + ) + ], + max_idletime=0.5, + strategy='htex_auto_scale', + strategy_period=0.1 + # this strategy period needs to be a few times smaller than the + # status_polling_interval of FailingProvider, which is 5s at + # time of writing + ) + + +@parsl.python_app +def double(x): + return x * 2 + + +@pytest.mark.local +def test_disconnected_blocks(): + """Test reporting of blocks that fail to connect from HTEX""" + dfk = parsl.dfk() + executor = dfk.executors["HTEX"] + + connected_blocks = executor.connected_blocks() + assert not connected_blocks, "Expected 0 blocks" + + future = double(5) + with pytest.raises(BadStateException): + future.result() + + assert isinstance(future.exception(), BadStateException) + + status_dict = executor.status() + assert len(status_dict) == 1, "Expected exactly 1 block" + for status in status_dict.values(): + assert isinstance(status, JobStatus) + assert status.state == JobState.MISSING + + connected_blocks = executor.connected_blocks() + assert connected_blocks == [], "Expected exactly 0 connected blocks" From 2b01411bb24f2d05547f36033b83fe7790d431be Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 2 Aug 2024 12:45:48 +0200 Subject: [PATCH 29/38] Make monitoring hub start into its own Exception (#3561) This is in line with the principle that Parsl exceptions should all be subclasses of ParslError. 
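
As a usage sketch (assuming a Parsl checkout that includes this patch), the
new exception can be caught either specifically or via the common ParslError
base class:

    from parsl.errors import ParslError
    from parsl.monitoring.errors import MonitoringHubStartError

    try:
        raise MonitoringHubStartError()
    except ParslError as e:
        # prints "Monitoring failed to start: Hub failed to start"
        print(f"Monitoring failed to start: {e}")
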
--- parsl/monitoring/errors.py | 6 ++++++ parsl/monitoring/monitoring.py | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 parsl/monitoring/errors.py diff --git a/parsl/monitoring/errors.py b/parsl/monitoring/errors.py new file mode 100644 index 0000000000..f41225ff44 --- /dev/null +++ b/parsl/monitoring/errors.py @@ -0,0 +1,6 @@ +from parsl.errors import ParslError + + +class MonitoringHubStartError(ParslError): + def __str__(self) -> str: + return "Hub failed to start" diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index f86bf81e87..c9a2dc9ed7 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -12,6 +12,7 @@ import typeguard from parsl.log_utils import set_file_logger +from parsl.monitoring.errors import MonitoringHubStartError from parsl.monitoring.message_type import MessageType from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter @@ -195,7 +196,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat comm_q.join_thread() except queue.Empty: logger.error("Hub has not completed initialization in 120s. Aborting") - raise Exception("Hub failed to start") + raise MonitoringHubStartError() if isinstance(comm_q_result, str): logger.error(f"MonitoringRouter sent an error message: {comm_q_result}") From 4f139c28118d625c893f67417b0bc391f535ac8e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 2 Aug 2024 18:55:24 +0200 Subject: [PATCH 30/38] Factor interchange monitoring code into a ZMQRadioSender (#3556) From an interchange perspective: this is a refactoring intended to clarify that the interchange isn't doing anything special wrt. monitoring messages and that it can send monitoring messages in the same way that remote workers can. From a monitoring perspective: this pulls ZMQ sender code out of the interchange and puts it in a place that is more natural for ongoing development. For example, a potential future use with Work Queue and Task Vine is that workers would also benefit from using ZMQ to send monitoring messages. In some potential use cases, it might be desirable to configure the radio used by the interchange instead of the hard-coded ZMQRadio. On-going work in draft PR #3315 addresses configuration of different types of radio and that work should be relevant here too. 
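
For orientation, a small usage sketch of the ZMQRadioSender introduced by this
patch (the address and port are placeholders, and a monitoring hub must be
listening for the message to go anywhere):

    from parsl.monitoring.message_type import MessageType
    from parsl.monitoring.radios import ZMQRadioSender

    # Create the sender in the thread that will use it: as noted in the class
    # docstring, the underlying ZMQ socket is not thread-safe.
    radio = ZMQRadioSender("127.0.0.1", 55055)
    radio.send((MessageType.NODE_INFO, {"hostname": "manager-0"}))
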
--- .../executors/high_throughput/interchange.py | 65 +++++++++---------- parsl/monitoring/radios.py | 16 +++++ 2 files changed, 47 insertions(+), 34 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 9ebe6b95b9..5da83ae3ca 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -20,6 +20,7 @@ from parsl.executors.high_throughput.manager_record import ManagerRecord from parsl.executors.high_throughput.manager_selector import ManagerSelector from parsl.monitoring.message_type import MessageType +from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender from parsl.process_loggers import wrap_with_logs from parsl.serialize import serialize as serialize_object from parsl.utils import setproctitle @@ -219,27 +220,15 @@ def task_puller(self) -> NoReturn: task_counter += 1 logger.debug(f"Fetched {task_counter} tasks so far") - def _create_monitoring_channel(self) -> Optional[zmq.Socket]: - if self.hub_address and self.hub_zmq_port: - logger.info("Connecting to MonitoringHub") - # This is a one-off because monitoring is unencrypted - hub_channel = zmq.Context().socket(zmq.DEALER) - hub_channel.set_hwm(0) - hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port)) - logger.info("Connected to MonitoringHub") - return hub_channel - else: - return None - - def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None: - if hub_channel: + def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None: + if monitoring_radio: logger.info("Sending message {} to MonitoringHub".format(manager)) d: Dict = cast(Dict, manager.copy()) d['timestamp'] = datetime.datetime.now() d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat']) - hub_channel.send_pyobj((MessageType.NODE_INFO, d)) + monitoring_radio.send((MessageType.NODE_INFO, d)) @wrap_with_logs(target="interchange") def _command_server(self) -> NoReturn: @@ -247,8 +236,11 @@ def _command_server(self) -> NoReturn: """ logger.debug("Command Server Starting") - # Need to create a new ZMQ socket for command server thread - hub_channel = self._create_monitoring_channel() + if self.hub_address is not None and self.hub_zmq_port is not None: + logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port) + monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port) + else: + monitoring_radio = None reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...) @@ -298,7 +290,7 @@ def _command_server(self) -> NoReturn: if manager_id in self._ready_managers: m = self._ready_managers[manager_id] m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) else: logger.warning("Worker to hold was not in ready managers list") @@ -333,9 +325,14 @@ def start(self) -> None: # parent-process-inheritance problems. 
signal.signal(signal.SIGTERM, signal.SIG_DFL) - logger.info("Incoming ports bound") + logger.info("Starting main interchange method") - hub_channel = self._create_monitoring_channel() + if self.hub_address is not None and self.hub_zmq_port is not None: + logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port) + monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port) + logger.debug("Created monitoring radio") + else: + monitoring_radio = None poll_period = self.poll_period @@ -366,10 +363,10 @@ def start(self) -> None: while not kill_event.is_set(): self.socks = dict(poller.poll(timeout=poll_period)) - self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event) - self.process_results_incoming(interesting_managers, hub_channel) - self.expire_bad_managers(interesting_managers, hub_channel) - self.expire_drained_managers(interesting_managers, hub_channel) + self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event) + self.process_results_incoming(interesting_managers, monitoring_radio) + self.expire_bad_managers(interesting_managers, monitoring_radio) + self.expire_drained_managers(interesting_managers, monitoring_radio) self.process_tasks_to_send(interesting_managers) self.zmq_context.destroy() @@ -380,7 +377,7 @@ def start(self) -> None: def process_task_outgoing_incoming( self, interesting_managers: Set[bytes], - hub_channel: Optional[zmq.Socket], + monitoring_radio: Optional[MonitoringRadioSender], kill_event: threading.Event ) -> None: """Process one message from manager on the task_outgoing channel. @@ -434,7 +431,7 @@ def process_task_outgoing_incoming( m.update(msg) # type: ignore[typeddict-item] logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): @@ -465,7 +462,7 @@ def process_task_outgoing_incoming( logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") - def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: for manager_id in list(interesting_managers): # is it always true that a draining manager will be in interesting managers? 
@@ -478,7 +475,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: self._ready_managers.pop(manager_id) m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers @@ -521,7 +518,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") - def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: # Receive any results and forward to client if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN: logger.debug("entering results_incoming section") @@ -541,11 +538,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel elif r['type'] == 'monitoring': # the monitoring code makes the assumption that no # monitoring messages will be received if monitoring - # is not configured, and that hub_channel will only + # is not configured, and that monitoring_radio will only # be None when monitoring is not configurated. - assert hub_channel is not None + assert monitoring_radio is not None - hub_channel.send_pyobj(r['payload']) + monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") b_messages.append((p_message, r)) @@ -589,7 +586,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel interesting_managers.add(manager_id) logger.debug("leaving results_incoming section") - def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if time.time() - m['last_heartbeat'] > self.heartbeat_threshold] for (manager_id, m) in bad_managers: @@ -597,7 +594,7 @@ def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Opt logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager") if m['active']: m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager") for tid in m['tasks']: diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 6c77fd37b1..37bef0b06a 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -7,6 +7,8 @@ from multiprocessing.queues import Queue from typing import Optional +import zmq + from parsl.serialize import serialize _db_manager_excepts: Optional[Exception] @@ -186,3 +188,17 @@ def __init__(self, queue: Queue) -> None: def send(self, message: object) -> None: self.queue.put((message, 0)) + + +class ZMQRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over ZMQ. This radio is not + thread-safe, because its use of ZMQ is not thread-safe. 
+ """ + + def __init__(self, hub_address: str, hub_zmq_port: int) -> None: + self._hub_channel = zmq.Context().socket(zmq.DEALER) + self._hub_channel.set_hwm(0) + self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") + + def send(self, message: object) -> None: + self._hub_channel.send_pyobj(message) From 2f6a185c82f15800f335bd4c371ff93e2df1def9 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 3 Aug 2024 16:13:19 +0200 Subject: [PATCH 31/38] Fix pytest caplog heisenbug introduced in PR #3559 (#3565) Prior to this PR, this usage of caplog is dependent on the level of the root logger, which is not set by this test and so ends up being dependent on which tests have run before: sometimes the INFO logs output by htex.shutdown are not captured by caplog. This PR explicitly tells caplog to get at least INFO level logs, to capture the expected messages. --- parsl/tests/test_htex/test_htex.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 80a4e91bd5..810236c1b4 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,3 +1,4 @@ +import logging import pathlib from subprocess import Popen, TimeoutExpired from typing import Optional, Sequence @@ -107,7 +108,8 @@ def kill_interchange(*args, **kwargs): mock_ix_proc.terminate.side_effect = kill_interchange - htex.shutdown() + with caplog.at_level(logging.INFO): + htex.shutdown() if started: assert mock_ix_proc.terminate.called From d8e8d4b3d99b2f034b1d5e80f89f58e1278486c2 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 5 Aug 2024 16:29:13 +0200 Subject: [PATCH 32/38] Split monitoring router into two radio-specific receiver threads (#3558) This is part of two ongoing strands of development: * Preparation for arbitrary monitoring radio receivers, where UDP is not special compared to other methods. This code separates out the UDP specific code making it easier to move around later (see development in PR #3315) * Avoiding poll-one-thing, poll-another-thing tight polling loops in favour of multiple blocking loops. The two router receiver sections used to run in a single tight non-blocking loop, each component waiting loop_freq (= 10 ms) for something to happen. After this PR, the separated loops are more amenable to longer blocking times - they only need to discover when exit event is set which probably can be more on the order of 1 second. 
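
A minimal standalone sketch (not Parsl code) of the pattern this patch moves
towards: two blocking listener loops running in their own threads, each only
needing to notice a shared exit event:

    import threading
    import time

    exit_event = threading.Event()

    def listener(name: str) -> None:
        while not exit_event.is_set():
            # a real listener would block on a recv() with a timeout here
            time.sleep(0.1)
        print(f"{name} listener finished")

    threads = [threading.Thread(target=listener, args=(name,), daemon=True)
               for name in ("udp", "zmq")]
    for t in threads:
        t.start()
    time.sleep(0.5)
    exit_event.set()
    for t in threads:
        t.join()
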
--- parsl/monitoring/router.py | 54 ++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 9a422027c1..bf395e3662 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -5,6 +5,7 @@ import pickle import queue import socket +import threading import time from multiprocessing.synchronize import Event from typing import Optional, Tuple, Union @@ -108,7 +109,24 @@ def __init__(self, self.resource_msgs = resource_msgs self.exit_event = exit_event + @wrap_with_logs(target="monitoring_router") def start(self) -> None: + self.logger.info("Starting UDP listener thread") + udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True) + udp_radio_receiver_thread.start() + + self.logger.info("Starting ZMQ listener thread") + zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True) + zmq_radio_receiver_thread.start() + + self.logger.info("Joining on ZMQ listener thread") + zmq_radio_receiver_thread.join() + self.logger.info("Joining on UDP listener thread") + udp_radio_receiver_thread.join() + self.logger.info("Joined on both ZMQ and UDP listener threads") + + @wrap_with_logs(target="monitoring_router") + def start_udp_listener(self) -> None: try: while not self.exit_event.is_set(): try: @@ -119,6 +137,26 @@ def start(self) -> None: except socket.timeout: pass + self.logger.info("UDP listener draining") + last_msg_received_time = time.time() + while time.time() - last_msg_received_time < self.atexit_timeout: + try: + data, addr = self.udp_sock.recvfrom(2048) + msg = pickle.loads(data) + self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) + self.resource_msgs.put((msg, addr)) + last_msg_received_time = time.time() + except socket.timeout: + pass + + self.logger.info("UDP listener finishing normally") + finally: + self.logger.info("UDP listener finished") + + @wrap_with_logs(target="monitoring_router") + def start_zmq_listener(self) -> None: + try: + while not self.exit_event.is_set(): try: dfk_loop_start = time.time() while time.time() - dfk_loop_start < 1.0: # TODO make configurable @@ -161,21 +199,9 @@ def start(self) -> None: # thing to do. self.logger.warning("Failure processing a ZMQ message", exc_info=True) - self.logger.info("Monitoring router draining") - last_msg_received_time = time.time() - while time.time() - last_msg_received_time < self.atexit_timeout: - try: - data, addr = self.udp_sock.recvfrom(2048) - msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put((msg, addr)) - last_msg_received_time = time.time() - except socket.timeout: - pass - - self.logger.info("Monitoring router finishing normally") + self.logger.info("ZMQ listener finishing normally") finally: - self.logger.info("Monitoring router finished") + self.logger.info("ZMQ listener finished") @wrap_with_logs From 10a6a00144bbbcf12923e95b8f940370fcf76e9a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 6 Aug 2024 23:39:05 +0200 Subject: [PATCH 33/38] Remove monitoring router modification of node message (#3567) Prior to this PR, the monitoring router would add a run_id field to every NODE_INFO message that it received. These are messages from the interchange describing worker pools. The monitoring router does not modify any other messages. 
This PR sets the run_id at the point of message origination inside the interchange (in _send_monitoring_info), and makes the router leave NODE_INFO messages unmodified (like the other message types). This is part of work to make the router less aware of message types by removing a bunch of message-type specific handling. This PR brings in a bunch of rewiring to get the run id into the interchange rather than into the monitoring router. * Changed Behaviour This should not change any workflow-user-facing behaviour. Globus Compute (or anyone else building a fake Parsl environment) will maybe have to change how they fake their Parsl implementation to pass in a run id (the executor.run_id part of dfk.add_executors). --- parsl/dataflow/dflow.py | 2 +- parsl/executors/high_throughput/executor.py | 1 + parsl/executors/high_throughput/interchange.py | 4 ++++ parsl/monitoring/monitoring.py | 3 +-- parsl/monitoring/router.py | 7 +------ parsl/tests/test_htex/test_zmq_binding.py | 3 ++- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 88ef063230..344173c4b1 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -116,7 +116,7 @@ def __init__(self, config: Config) -> None: if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 6c181cdee7..1a56195c07 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -551,6 +551,7 @@ def _start_local_interchange_process(self) -> None: "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, "cert_dir": self.cert_dir, "manager_selector": self.manager_selector, + "run_id": self.run_id, } config_pickle = pickle.dumps(interchange_config) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 5da83ae3ca..fa0969d398 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -55,6 +55,7 @@ def __init__(self, poll_period: int, cert_dir: Optional[str], manager_selector: ManagerSelector, + run_id: str, ) -> None: """ Parameters @@ -125,6 +126,8 @@ def __init__(self, self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2])) logger.info("Connected to client") + self.run_id = run_id + self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port @@ -227,6 +230,7 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender d: Dict = cast(Dict, manager.copy()) d['timestamp'] = datetime.datetime.now() d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat']) + d['run_id'] = self.run_id monitoring_radio.send((MessageType.NODE_INFO, d)) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index c9a2dc9ed7..9dccbecd35 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -106,7 +106,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: + 
def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:

         logger.debug("Starting MonitoringHub")

@@ -161,7 +161,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
                                        "zmq_port_range": self.hub_port_range,
                                        "logdir": self.logdir,
                                        "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
-                                       "run_id": run_id
                                        },
                                name="Monitoring-Router-Process",
                                daemon=True,
diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py
index bf395e3662..4be454b797 100644
--- a/parsl/monitoring/router.py
+++ b/parsl/monitoring/router.py
@@ -31,7 +31,6 @@ def __init__(self,
                  monitoring_hub_address: str = "127.0.0.1",
                  logdir: str = ".",
-                 run_id: str,
                  logging_level: int = logging.INFO,
                  atexit_timeout: int = 3,    # in seconds
                  priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
@@ -71,7 +70,6 @@ def __init__(self,
         self.hub_address = hub_address
         self.atexit_timeout = atexit_timeout
-        self.run_id = run_id

         self.loop_freq = 10.0  # milliseconds

@@ -172,7 +170,6 @@ def start_zmq_listener(self) -> None:
                         msg_0 = (msg, 0)

                         if msg[0] == MessageType.NODE_INFO:
-                            msg[1]['run_id'] = self.run_id
                             self.node_msgs.put(msg_0)
                         elif msg[0] == MessageType.RESOURCE_INFO:
                             self.resource_msgs.put(msg_0)
@@ -218,8 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
                    zmq_port_range: Tuple[int, int],

                    logdir: str,
-                   logging_level: int,
-                   run_id: str) -> None:
+                   logging_level: int) -> None:
     setproctitle("parsl: monitoring router")
     try:
         router = MonitoringRouter(hub_address=hub_address,
@@ -227,7 +223,6 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
                                   zmq_port_range=zmq_port_range,
                                   logdir=logdir,
                                   logging_level=logging_level,
-                                  run_id=run_id,
                                   priority_msgs=priority_msgs,
                                   node_msgs=node_msgs,
                                   block_msgs=block_msgs,
diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py
index 2273443b99..e21c065d0d 100644
--- a/parsl/tests/test_htex/test_zmq_binding.py
+++ b/parsl/tests/test_htex/test_zmq_binding.py
@@ -25,7 +25,8 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s
                        logdir=".",
                        logging_level=logging.INFO,
                        manager_selector=RandomManagerSelector(),
-                       poll_period=10)
+                       poll_period=10,
+                       run_id="test_run_id")


 @pytest.fixture

From 1c7a0e40ed37b4ffe6c31633d4de4e1d9360e9f9 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 8 Aug 2024 10:40:22 +0200
Subject: [PATCH 34/38] Aggressively deprecate Channels and AdHocProvider (#3569)

This is described in issue #3515.

Issue #3515 has not received any comments arguing in favour of retaining
channels and the AdHocProvider, and several in support of removing them,
and so this PR takes a heavy-handed approach that is well on the way to
the end goal of #3515 of deleting channels and the AdHocProvider
entirely:

Channels except LocalChannel are renamed, so that any users of other
channels will have to make a change to their code and actively observe
the word "Deprecated" in the name. The AdHocProvider is renamed in the
same way.

Most documentation (but not docstrings) about channels and the ad-hoc
provider is removed or replaced with a link to issue #3515.

Tests which must be manually run, and so in effect are never run and
shouldn't be expected to work now - parsl/tests/manual_tests and
parsl/tests/integration/ - are deleted rather than fixed to follow the
above naming change.

The tests for SSH channels and the AdHocProvider that run in CI are
modified to continue passing.
Exposure of the deprecated components via top level parsl.providers and parsl.channels re-export is removed. To use this components, the deprecated modules must be imported directly. --- docs/historical/changelog.rst | 8 +-- docs/reference.rst | 13 ++--- docs/userguide/configuring.rst | 38 +++----------- docs/userguide/examples/config.py | 5 +- docs/userguide/execution.rst | 3 +- docs/userguide/plugins.rst | 11 ++--- parsl/channels/__init__.py | 5 +- parsl/channels/oauth_ssh/oauth_ssh.py | 4 +- parsl/channels/ssh/ssh.py | 2 +- parsl/channels/ssh_il/ssh_il.py | 4 +- parsl/configs/ad_hoc.py | 38 -------------- parsl/providers/__init__.py | 4 -- parsl/providers/ad_hoc/ad_hoc.py | 8 ++- parsl/tests/configs/ad_hoc_cluster_htex.py | 35 ------------- parsl/tests/configs/htex_ad_hoc_cluster.py | 26 ---------- parsl/tests/configs/local_adhoc.py | 4 +- parsl/tests/configs/swan_htex.py | 43 ---------------- .../integration/test_channels/test_scp_1.py | 45 ----------------- .../integration/test_channels/test_ssh_1.py | 40 --------------- .../test_channels/test_ssh_errors.py | 46 ----------------- .../test_channels/test_ssh_file_transport.py | 41 ---------------- .../test_channels/test_ssh_interactive.py | 24 --------- parsl/tests/manual_tests/test_ad_hoc_htex.py | 49 ------------------- parsl/tests/manual_tests/test_oauth_ssh.py | 13 ----- .../test_providers/test_local_provider.py | 11 +++-- 25 files changed, 40 insertions(+), 480 deletions(-) delete mode 100644 parsl/configs/ad_hoc.py delete mode 100644 parsl/tests/configs/ad_hoc_cluster_htex.py delete mode 100644 parsl/tests/configs/htex_ad_hoc_cluster.py delete mode 100644 parsl/tests/configs/swan_htex.py delete mode 100644 parsl/tests/integration/test_channels/test_scp_1.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_1.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_errors.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_file_transport.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_interactive.py delete mode 100644 parsl/tests/manual_tests/test_ad_hoc_htex.py delete mode 100644 parsl/tests/manual_tests/test_oauth_ssh.py diff --git a/docs/historical/changelog.rst b/docs/historical/changelog.rst index 18fe6ca5b1..931998f93d 100644 --- a/docs/historical/changelog.rst +++ b/docs/historical/changelog.rst @@ -334,7 +334,7 @@ New Functionality * New launcher: `parsl.launchers.WrappedLauncher` for launching tasks inside containers. -* `parsl.channels.SSHChannel` now supports a ``key_filename`` kwarg `issue#1639 `_ +* ``parsl.channels.SSHChannel`` now supports a ``key_filename`` kwarg `issue#1639 `_ * Newly added Makefile wraps several frequent developer operations such as: @@ -442,7 +442,7 @@ New Functionality module, parsl.data_provider.globus * `parsl.executors.WorkQueueExecutor`: a new executor that integrates functionality from `Work Queue `_ is now available. -* New provider to support for Ad-Hoc clusters `parsl.providers.AdHocProvider` +* New provider to support for Ad-Hoc clusters ``parsl.providers.AdHocProvider`` * New provider added to support LSF on Summit `parsl.providers.LSFProvider` * Support for CPU and Memory resource hints to providers `(github) `_. * The ``logging_level=logging.INFO`` in `parsl.monitoring.MonitoringHub` is replaced with ``monitoring_debug=False``: @@ -468,7 +468,7 @@ New Functionality * Several test-suite improvements that have dramatically reduced test duration. * Several improvements to the Monitoring interface. 
-* Configurable port on `parsl.channels.SSHChannel`. +* Configurable port on ``parsl.channels.SSHChannel``. * ``suppress_failure`` now defaults to True. * `parsl.executors.HighThroughputExecutor` is the recommended executor, and ``IPyParallelExecutor`` is deprecated. * `parsl.executors.HighThroughputExecutor` will expose worker information via environment variables: ``PARSL_WORKER_RANK`` and ``PARSL_WORKER_COUNT`` @@ -532,7 +532,7 @@ New Functionality * Cleaner user app file log management. * Updated configurations using `parsl.executors.HighThroughputExecutor` in the configuration section of the userguide. -* Support for OAuth based SSH with `parsl.channels.OAuthSSHChannel`. +* Support for OAuth based SSH with ``parsl.channels.OAuthSSHChannel``. Bug Fixes ^^^^^^^^^ diff --git a/docs/reference.rst b/docs/reference.rst index 1af850792c..d8e18bd244 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -38,15 +38,9 @@ Configuration Channels ======== -.. autosummary:: - :toctree: stubs - :nosignatures: - - parsl.channels.base.Channel - parsl.channels.LocalChannel - parsl.channels.SSHChannel - parsl.channels.OAuthSSHChannel - parsl.channels.SSHInteractiveLoginChannel +Channels are deprecated in Parsl. See +`issue 3515 `_ +for further discussion. Data management =============== @@ -109,7 +103,6 @@ Providers :toctree: stubs :nosignatures: - parsl.providers.AdHocProvider parsl.providers.AWSProvider parsl.providers.CobaltProvider parsl.providers.CondorProvider diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index 24ce0ca938..bb3a3949e3 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -15,7 +15,7 @@ queues, durations, and data management options. The following example shows a basic configuration object (:class:`~parsl.config.Config`) for the Frontera supercomputer at TACC. This config uses the `parsl.executors.HighThroughputExecutor` to submit -tasks from a login node (`parsl.channels.LocalChannel`). It requests an allocation of +tasks from a login node. It requests an allocation of 128 nodes, deploying 1 worker for each of the 56 cores per node, from the normal partition. To limit network connections to just the internal network the config specifies the address used by the infiniband interface with ``address_by_interface('ib0')`` @@ -23,7 +23,6 @@ used by the infiniband interface with ``address_by_interface('ib0')`` .. code-block:: python from parsl.config import Config - from parsl.channels import LocalChannel from parsl.providers import SlurmProvider from parsl.executors import HighThroughputExecutor from parsl.launchers import SrunLauncher @@ -36,7 +35,6 @@ used by the infiniband interface with ``address_by_interface('ib0')`` address=address_by_interface('ib0'), max_workers_per_node=56, provider=SlurmProvider( - channel=LocalChannel(), nodes_per_block=128, init_blocks=1, partition='normal', @@ -197,22 +195,6 @@ Stepping through the following question should help formulate a suitable configu are on a **native Slurm** system like :ref:`configuring_nersc_cori` -4) Where will the main Parsl program run and how will it communicate with the apps? 
- -+------------------------+--------------------------+---------------------------------------------------+ -| Parsl program location | App execution target | Suitable channel | -+========================+==========================+===================================================+ -| Laptop/Workstation | Laptop/Workstation | `parsl.channels.LocalChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Cloud Resources | No channel is needed | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Clusters with no 2FA | `parsl.channels.SSHChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Clusters with 2FA | `parsl.channels.SSHInteractiveLoginChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Login node | Cluster/Supercomputer | `parsl.channels.LocalChannel` | -+------------------------+--------------------------+---------------------------------------------------+ - Heterogeneous Resources ----------------------- @@ -337,7 +319,6 @@ Provide either the number of executors (Parsl will assume they are named in inte worker_debug=True, available_accelerators=2, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), @@ -372,7 +353,6 @@ Select the best blocking strategy for processor's cache hierarchy (choose ``alte worker_debug=True, cpu_affinity='alternating', provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), @@ -412,18 +392,12 @@ These include ``OMP_NUM_THREADS``, ``GOMP_COMP_AFFINITY``, and ``KMP_THREAD_AFFI Ad-Hoc Clusters --------------- -Any collection of compute nodes without a scheduler can be considered an -ad-hoc cluster. Often these machines have a shared file system such as NFS or Lustre. -In order to use these resources with Parsl, they need to set-up for password-less SSH access. - -To use these ssh-accessible collection of nodes as an ad-hoc cluster, we use -the `parsl.providers.AdHocProvider` with an `parsl.channels.SSHChannel` to each node. An example -configuration follows. +Parsl's support of ad-hoc clusters of compute nodes without a scheduler +is deprecated. -.. literalinclude:: ../../parsl/configs/ad_hoc.py - -.. note:: - Multiple blocks should not be assigned to each node when using the `parsl.executors.HighThroughputExecutor` +See +`issue #3515 `_ +for further discussion. Amazon Web Services ------------------- diff --git a/docs/userguide/examples/config.py b/docs/userguide/examples/config.py index 166faaf4ac..68057d2b01 100644 --- a/docs/userguide/examples/config.py +++ b/docs/userguide/examples/config.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider @@ -8,9 +7,7 @@ HighThroughputExecutor( label="htex_local", cores_per_worker=1, - provider=LocalProvider( - channel=LocalChannel(), - ), + provider=LocalProvider(), ) ], ) diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index 4168367f9d..df17dc458f 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -47,8 +47,7 @@ Parsl currently supports the following providers: 7. 
`parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services. 8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud. 9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster. -10. `parsl.providers.AdHocProvider`: This provider allows you manage execution over a collection of nodes to form an ad-hoc cluster. -11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. +10. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst index 4ecff86cfe..c3c38dea63 100644 --- a/docs/userguide/plugins.rst +++ b/docs/userguide/plugins.rst @@ -16,8 +16,8 @@ executor to run code on the local submitting host, while another executor can run the same code on a large supercomputer. -Providers, Launchers and Channels ---------------------------------- +Providers and Launchers +----------------------- Some executors are based on blocks of workers (for example the `parsl.executors.HighThroughputExecutor`: the submit side requires a batch system (eg slurm, kubernetes) to start worker processes, which then @@ -34,10 +34,9 @@ add on any wrappers that are needed to launch the command (eg srun inside slurm). Providers and launchers are usually paired together for a particular system type. -A `Channel` allows the commands used to interact with an `ExecutionProvider` to be -executed on a remote system. The default channel executes commands on the -local system, but a few variants of an `parsl.channels.SSHChannel` are provided. - +Parsl also has a deprecated ``Channel`` abstraction. See +`issue 3515 `_ +for further discussion. File staging ------------ diff --git a/parsl/channels/__init__.py b/parsl/channels/__init__.py index 5a45d15278..c81f6a8bf1 100644 --- a/parsl/channels/__init__.py +++ b/parsl/channels/__init__.py @@ -1,7 +1,4 @@ from parsl.channels.base import Channel from parsl.channels.local.local import LocalChannel -from parsl.channels.oauth_ssh.oauth_ssh import OAuthSSHChannel -from parsl.channels.ssh.ssh import SSHChannel -from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel -__all__ = ['Channel', 'SSHChannel', 'LocalChannel', 'SSHInteractiveLoginChannel', 'OAuthSSHChannel'] +__all__ = ['Channel', 'LocalChannel'] diff --git a/parsl/channels/oauth_ssh/oauth_ssh.py b/parsl/channels/oauth_ssh/oauth_ssh.py index c9efa27767..3173b163a8 100644 --- a/parsl/channels/oauth_ssh/oauth_ssh.py +++ b/parsl/channels/oauth_ssh/oauth_ssh.py @@ -3,7 +3,7 @@ import paramiko -from parsl.channels.ssh.ssh import SSHChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.errors import OptionalModuleMissing try: @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -class OAuthSSHChannel(SSHChannel): +class DeprecatedOAuthSSHChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel uses Globus based OAuth tokens for authentication. 
""" diff --git a/parsl/channels/ssh/ssh.py b/parsl/channels/ssh/ssh.py index bf33727e63..38b8afe47b 100644 --- a/parsl/channels/ssh/ssh.py +++ b/parsl/channels/ssh/ssh.py @@ -24,7 +24,7 @@ def _auth(self, username, *args): return -class SSHChannel(Channel, RepresentationMixin): +class DeprecatedSSHChannel(Channel, RepresentationMixin): ''' SSH persistent channel. This enables remote execution on sites accessible via ssh. It is assumed that the user has setup host keys so as to ssh to the remote host. Which goes to say that the following diff --git a/parsl/channels/ssh_il/ssh_il.py b/parsl/channels/ssh_il/ssh_il.py index 02e7a58cd4..3a5e0c5096 100644 --- a/parsl/channels/ssh_il/ssh_il.py +++ b/parsl/channels/ssh_il/ssh_il.py @@ -3,12 +3,12 @@ import paramiko -from parsl.channels.ssh.ssh import SSHChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel logger = logging.getLogger(__name__) -class SSHInteractiveLoginChannel(SSHChannel): +class DeprecatedSSHInteractiveLoginChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel supports interactive login and is appropriate when keys are not set up. diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py deleted file mode 100644 index 05b0e8190d..0000000000 --- a/parsl/configs/ad_hoc.py +++ /dev/null @@ -1,38 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.usage_tracking.levels import LEVEL_1 - -user_opts: Dict[str, Dict[str, Any]] -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } - - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. - strategy='none', - usage_tracking=LEVEL_1, -) diff --git a/parsl/providers/__init__.py b/parsl/providers/__init__.py index 475737f1f9..150f425f3d 100644 --- a/parsl/providers/__init__.py +++ b/parsl/providers/__init__.py @@ -1,6 +1,3 @@ -# Workstation Provider -from parsl.providers.ad_hoc.ad_hoc import AdHocProvider - # Cloud Providers from parsl.providers.aws.aws import AWSProvider from parsl.providers.azure.azure import AzureProvider @@ -24,7 +21,6 @@ 'SlurmProvider', 'TorqueProvider', 'LSFProvider', - 'AdHocProvider', 'PBSProProvider', 'AWSProvider', 'GoogleCloudProvider', diff --git a/parsl/providers/ad_hoc/ad_hoc.py b/parsl/providers/ad_hoc/ad_hoc.py index 207dd55738..9059648101 100644 --- a/parsl/providers/ad_hoc/ad_hoc.py +++ b/parsl/providers/ad_hoc/ad_hoc.py @@ -12,8 +12,12 @@ logger = logging.getLogger(__name__) -class AdHocProvider(ExecutionProvider, RepresentationMixin): - """ Ad-hoc execution provider +class DeprecatedAdHocProvider(ExecutionProvider, RepresentationMixin): + """ Deprecated ad-hoc execution provider + + The (former) AdHocProvider is deprecated. See + `issue #3515 `_ + for further discussion. 
This provider is used to provision execution resources over one or more ad hoc nodes that are each accessible over a Channel (say, ssh) but otherwise lack a cluster scheduler. diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py deleted file mode 100644 index 0949b82392..0000000000 --- a/parsl/tests/configs/ad_hoc_cluster_htex.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider - -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } # type: Dict[str, Dict[str, Any]] - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - encrypted=True, - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. - strategy='none', -) diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py deleted file mode 100644 index db24b42ab2..0000000000 --- a/parsl/tests/configs/htex_ad_hoc_cluster.py +++ /dev/null @@ -1,26 +0,0 @@ -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.tests.configs.user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - cores_per_worker=1, - worker_debug=False, - address=user_opts['public_ip'], - encrypted=True, - provider=AdHocProvider( - move_files=False, - parallelism=1, - worker_init=user_opts['adhoc']['worker_init'], - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], -) diff --git a/parsl/tests/configs/local_adhoc.py b/parsl/tests/configs/local_adhoc.py index 25b1f38d61..9b1f951842 100644 --- a/parsl/tests/configs/local_adhoc.py +++ b/parsl/tests/configs/local_adhoc.py @@ -1,7 +1,7 @@ from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider +from parsl.providers.ad_hoc.ad_hoc import DeprecatedAdHocProvider def fresh_config(): @@ -10,7 +10,7 @@ def fresh_config(): HighThroughputExecutor( label='AdHoc', encrypted=True, - provider=AdHocProvider( + provider=DeprecatedAdHocProvider( channels=[LocalChannel(), LocalChannel()] ) ) diff --git a/parsl/tests/configs/swan_htex.py b/parsl/tests/configs/swan_htex.py deleted file mode 100644 index 3b1b6785ab..0000000000 --- a/parsl/tests/configs/swan_htex.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -================== Block -| ++++++++++++++ | Node -| | | | -| | Task | | . . . 
-| | | | -| ++++++++++++++ | -================== -""" -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import AprunLauncher -from parsl.providers import TorqueProvider - -# If you are a developer running tests, make sure to update parsl/tests/configs/user_opts.py -# If you are a user copying-and-pasting this as an example, make sure to either -# 1) create a local `user_opts.py`, or -# 2) delete the user_opts import below and replace all appearances of `user_opts` with the literal value -# (i.e., user_opts['swan']['username'] -> 'your_username') -from .user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label='swan_htex', - encrypted=True, - provider=TorqueProvider( - channel=SSHChannel( - hostname='swan.cray.com', - username=user_opts['swan']['username'], - script_dir=user_opts['swan']['script_dir'], - ), - nodes_per_block=1, - init_blocks=1, - max_blocks=1, - launcher=AprunLauncher(), - scheduler_options=user_opts['swan']['scheduler_options'], - worker_init=user_opts['swan']['worker_init'], - ), - ) - ] -) diff --git a/parsl/tests/integration/test_channels/test_scp_1.py b/parsl/tests/integration/test_channels/test_scp_1.py deleted file mode 100644 index c11df3c663..0000000000 --- a/parsl/tests/integration/test_channels/test_scp_1.py +++ /dev/null @@ -1,45 +0,0 @@ -import os - -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - out = '' - conn = SSH(hostname, username=username) - conn.push_file(os.path.abspath('remote_run.sh'), '/home/davidk/') - # ec, out, err = conn.execute_wait("ls /tmp/remote_run.sh; bash /tmp/remote_run.sh") - conn.close() - return out - - -script = '''#!/bin/bash -echo "Hostname: $HOSTNAME" -echo "Cpu info -----" -cat /proc/cpuinfo -echo "Done----------" -''' - - -def test_connect_1(): - with open('remote_run.sh', 'w') as f: - f.write(script) - - sites = { - 'midway': { - 'url': 'midway.rcc.uchicago.edu', - 'uname': 'yadunand' - }, - 'swift': { - 'url': 'swift.rcc.uchicago.edu', - 'uname': 'yadunand' - } - } - - for site in sites.values(): - out = connect_and_list(site['url'], site['uname']) - print("Sitename :{0} hostname:{1}".format(site['url'], out)) - - -if __name__ == "__main__": - - test_connect_1() diff --git a/parsl/tests/integration/test_channels/test_ssh_1.py b/parsl/tests/integration/test_channels/test_ssh_1.py deleted file mode 100644 index 61ab3f2705..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_1.py +++ /dev/null @@ -1,40 +0,0 @@ -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_midway(): - ''' Test ssh channels to midway - ''' - url = 'midway.rcc.uchicago.edu' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -def test_beagle(): - ''' Test ssh channels to beagle - ''' - url = 'login04.beagle.ci.uchicago.edu' - uname = 'yadunandb' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -def test_osg(): - ''' Test ssh connectivity to osg - ''' - url = 'login.osgconnect.net' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -if __name__ == "__main__": - - pass diff --git 
a/parsl/tests/integration/test_channels/test_ssh_errors.py b/parsl/tests/integration/test_channels/test_ssh_errors.py deleted file mode 100644 index 7483e30a5c..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_errors.py +++ /dev/null @@ -1,46 +0,0 @@ -from parsl.channels.errors import BadHostKeyException, SSHException -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_error_1(): - try: - connect_and_list("bad.url.gov", "ubuntu") - except Exception as e: - assert type(e) is SSHException, "Expected SSException, got: {0}".format(e) - - -def test_error_2(): - try: - connect_and_list("swift.rcc.uchicago.edu", "mango") - except SSHException: - print("Caught the right exception") - else: - raise Exception("Expected SSException, got: {0}".format(e)) - - -def test_error_3(): - ''' This should work - ''' - try: - connect_and_list("edison.nersc.gov", "yadunand") - except BadHostKeyException as e: - print("Caught exception BadHostKeyException: ", e) - else: - assert False, "Expected SSException, got: {0}".format(e) - - -if __name__ == "__main__": - - tests = [test_error_1, test_error_2, test_error_3] - - for test in tests: - print("---------Running : {0}---------------".format(test)) - test() - print("----------------------DONE--------------------------") diff --git a/parsl/tests/integration/test_channels/test_ssh_file_transport.py b/parsl/tests/integration/test_channels/test_ssh_file_transport.py deleted file mode 100644 index 61672c3ff5..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_file_transport.py +++ /dev/null @@ -1,41 +0,0 @@ -import parsl -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_push(conn, fname="test001.txt"): - - with open(fname, 'w') as f: - f.write("Hello from parsl.ssh testing\n") - - conn.push_file(fname, "/tmp") - ec, out, err = conn.execute_wait("ls /tmp/{0}".format(fname)) - print(ec, out, err) - - -def test_pull(conn, fname="test001.txt"): - - local = "foo" - conn.pull_file("/tmp/{0}".format(fname), local) - - with open("{0}/{1}".format(local, fname), 'r') as f: - print(f.readlines()) - - -if __name__ == "__main__": - - parsl.set_stream_logger() - - # This is for testing - conn = SSH("midway.rcc.uchicago.edu", username="yadunand") - - test_push(conn) - test_pull(conn) - - conn.close() diff --git a/parsl/tests/integration/test_channels/test_ssh_interactive.py b/parsl/tests/integration/test_channels/test_ssh_interactive.py deleted file mode 100644 index c6f9b9dea9..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_interactive.py +++ /dev/null @@ -1,24 +0,0 @@ -import parsl -from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_cooley(): - ''' Test ssh channels to midway - ''' - url = 'cooley.alcf.anl.gov' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - return - - -if __name__ == "__main__": - parsl.set_stream_logger() - test_cooley() diff --git a/parsl/tests/manual_tests/test_ad_hoc_htex.py 
b/parsl/tests/manual_tests/test_ad_hoc_htex.py deleted file mode 100644 index dfa34ec0d1..0000000000 --- a/parsl/tests/manual_tests/test_ad_hoc_htex.py +++ /dev/null @@ -1,49 +0,0 @@ -import parsl -from parsl import python_app - -parsl.set_stream_logger() - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider - -remotes = ['midway2-login2.rcc.uchicago.edu', 'midway2-login1.rcc.uchicago.edu'] - -config = Config( - executors=[ - HighThroughputExecutor( - label='AdHoc', - max_workers_per_node=2, - worker_logdir_root="/scratch/midway2/yadunand/parsl_scripts", - encrypted=True, - provider=AdHocProvider( - worker_init="source /scratch/midway2/yadunand/parsl_env_setup.sh", - channels=[SSHChannel(hostname=m, - username="yadunand", - script_dir="/scratch/midway2/yadunand/parsl_cluster") - for m in remotes] - ) - ) - ] -) - - -@python_app -def platform(sleep=2, stdout=None): - import platform - import time - time.sleep(sleep) - return platform.uname() - - -def test_raw_provider(): - - parsl.load(config) - - x = [platform() for i in range(10)] - print([i.result() for i in x]) - - -if __name__ == "__main__": - test_raw_provider() diff --git a/parsl/tests/manual_tests/test_oauth_ssh.py b/parsl/tests/manual_tests/test_oauth_ssh.py deleted file mode 100644 index 3d464bcc0e..0000000000 --- a/parsl/tests/manual_tests/test_oauth_ssh.py +++ /dev/null @@ -1,13 +0,0 @@ -from parsl.channels import OAuthSSHChannel - - -def test_channel(): - channel = OAuthSSHChannel(hostname='ssh.demo.globus.org', username='yadunand') - x, stdout, stderr = channel.execute_wait('ls') - print(x, stdout, stderr) - assert x == 0, "Expected exit code 0, got {}".format(x) - - -if __name__ == '__main__': - - test_channel() diff --git a/parsl/tests/test_providers/test_local_provider.py b/parsl/tests/test_providers/test_local_provider.py index c6844b00c0..497c13370d 100644 --- a/parsl/tests/test_providers/test_local_provider.py +++ b/parsl/tests/test_providers/test_local_provider.py @@ -11,7 +11,8 @@ import pytest -from parsl.channels import LocalChannel, SSHChannel +from parsl.channels import LocalChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.jobs.states import JobState from parsl.launchers import SingleNodeLauncher from parsl.providers import LocalProvider @@ -92,10 +93,10 @@ def test_ssh_channel(): # already exist, so create it here. pathlib.Path('{}/known.hosts'.format(config_dir)).touch(mode=0o600) script_dir = tempfile.mkdtemp() - channel = SSHChannel('127.0.0.1', port=server_port, - script_dir=remote_script_dir, - host_keys_filename='{}/known.hosts'.format(config_dir), - key_filename=priv_key) + channel = DeprecatedSSHChannel('127.0.0.1', port=server_port, + script_dir=remote_script_dir, + host_keys_filename='{}/known.hosts'.format(config_dir), + key_filename=priv_key) try: p = LocalProvider(channel=channel, launcher=SingleNodeLauncher(debug=False)) From 114e701b81f1abbc71a4fd438896fece16784f4d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 8 Aug 2024 16:08:47 +0200 Subject: [PATCH 35/38] Close processes in Work Queue and Task Vine shutdown (#3576) This releases 2 file descriptors with work queue (from 21 to 19 at the end of CI Work Queue test) and 4 file descriptors with Task Vine (from 19 to 15 at the end of CI Task Vine test) This is part of work being merged from draft PR #3397 to shut down components more cleanly, rather than relying on process exit. 
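For readers unfamiliar with this pattern: each ``join()`` on a submit or factory process is now followed by a ``close()`` on the ``multiprocessing.Process`` handle. A minimal standalone sketch of that join-then-close pattern (the worker function below is a stand-in, not the executors' real submit/collector code):

    import multiprocessing


    def _work() -> None:
        pass  # stand-in for the executor's submit/collector loop


    if __name__ == "__main__":
        p = multiprocessing.Process(target=_work)
        p.start()
        # ... the executor would run here ...
        p.join()   # wait for the child process to exit
        p.close()  # release the Process handle's resources (including its
                   # sentinel file descriptor) now, rather than at GC time
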
--- parsl/executors/taskvine/executor.py | 2 ++ parsl/executors/workqueue/executor.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index bebed1a51b..2e1efb211f 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -589,11 +589,13 @@ def shutdown(self, *args, **kwargs): # Join all processes before exiting logger.debug("Joining on submit process") self._submit_process.join() + self._submit_process.close() logger.debug("Joining on collector thread") self._collector_thread.join() if self.worker_launch_method == 'factory': logger.debug("Joining on factory process") self._factory_process.join() + self._factory_process.close() # Shutdown multiprocessing queues self._ready_task_queue.close() diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index a1ad49bca9..ae39f8c118 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -704,6 +704,8 @@ def shutdown(self, *args, **kwargs): logger.debug("Joining on submit process") self.submit_process.join() + self.submit_process.close() + logger.debug("Joining on collector thread") self.collector_thread.join() From ec9bbf63807c2d55fea6a8fccbdfb9bec7077950 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 8 Aug 2024 17:00:10 +0200 Subject: [PATCH 36/38] Promote unable to terminate warning to logger.WARNING (#3574) Even if the subsequent SIGKILL works, this is an exceptional circumstance that should be logged. --- parsl/executors/high_throughput/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 1a56195c07..301052c4c5 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -832,7 +832,7 @@ def shutdown(self, timeout: float = 10.0): try: self.interchange_proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - logger.info("Unable to terminate Interchange process; sending SIGKILL") + logger.warning("Unable to terminate Interchange process; sending SIGKILL") self.interchange_proc.kill() logger.info("Closing ZMQ pipes") From 03e94c3619943db468feb25051f7b7e2c9933f09 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 9 Aug 2024 15:22:30 -0500 Subject: [PATCH 37/38] Adding notes on `available_accelerators` (#3577) * Adding notes on how to specify list of strings to available_accelerators * Clarify how to bind multiple GPUs to workers --- docs/userguide/configuring.rst | 43 ++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index bb3a3949e3..f3fe5cc407 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -306,9 +306,13 @@ and Work Queue does not require Python to run. Accelerators ------------ -Many modern clusters provide multiple accelerators per compute note, yet many applications are best suited to using a single accelerator per task. -Parsl supports pinning each worker to difference accelerators using ``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. -Provide either the number of executors (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators available on the node. 
+Many modern clusters provide multiple accelerators per compute node, yet many applications are best suited to using a
+single accelerator per task. Parsl supports pinning each worker to different accelerators using
+``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. Provide either the number of
+accelerators (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators
+available on the node. Parsl will limit the number of workers it launches to the number of accelerators specified;
+in other words, you cannot have more workers per node than there are accelerators. By default, Parsl will launch
+as many workers as there are accelerators specified via ``available_accelerators``.
 
 .. code-block:: python
 
@@ -327,7 +331,38 @@ Provide either the number of executors (Parsl will assume they are named in inte
     strategy='none',
 )
 
-For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUS. This is intended to make use of Nvidia's `Multi-Process Service (MPS) <https://docs.nvidia.com/deploy/mps/>`_ available on many of their GPUs that allows users to run multiple concurrent processes on a single GPU. The user needs to set in the ``worker_init`` commands to start MPS on every node in the block (this is machine dependent). The ``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3.
+It is possible to bind multiple/specific accelerators to each worker by specifying a list of comma-separated strings
+each specifying accelerators. In the context of binding to NVIDIA GPUs, this works by setting ``CUDA_VISIBLE_DEVICES``
+on each worker to a specific string in the list supplied to ``available_accelerators``.
+
+Here's an example:
+
+.. code-block:: python
+
+    # The following config is trimmed for clarity
+    local_config = Config(
+        executors=[
+            HighThroughputExecutor(
+                # Starts 2 workers per node, each bound to 2 GPUs
+                available_accelerators=["0,1", "2,3"],
+
+                # Start a single worker bound to all 4 GPUs
+                # available_accelerators=["0,1,2,3"]
+            )
+        ],
+    )
+
+GPU Oversubscription
+""""""""""""""""""""
+
+For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUs. This is intended to
+make use of Nvidia's `Multi-Process Service (MPS) <https://docs.nvidia.com/deploy/mps/>`_ available on many of their
+GPUs that allows users to run multiple concurrent processes on a single GPU. The user needs to use the
+``worker_init`` commands to start MPS on every node in the block (this is machine dependent). The
+``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the
+block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``.
+GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed
+on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3.
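As a concrete sketch of the oversubscription setup described above (a node with 4 GPUs and 8 workers per GPU; the provider, launcher and the MPS-starting ``worker_init`` are site specific and omitted, and the names here are illustrative only):

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor

    # Hypothetical trimmed config: 32 workers per node; workers 0-7 share
    # GPU 0, workers 8-15 share GPU 1, and so on, as described above.
    mps_config = Config(
        executors=[
            HighThroughputExecutor(
                label='htex_mps',
                # worker_init should also start the MPS daemon on each node
                # (machine dependent, not shown here).
                available_accelerators=32,
            )
        ],
    )
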
Multi-Threaded Applications --------------------------- From 2067b407bbf6e0d9d9ab66ab5b2393642907a1ae Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 10 Aug 2024 09:34:44 +0200 Subject: [PATCH 38/38] Convert monitoring type annotations to PEP-526 from comments (#3573) This is in preparation for future type work in the monitoring codebase (for example, see PR #3572). This PR does not claim that the types it is moving around are correct (and PR #3572 contains some instances where the types are incorrect). It is a purely syntactic PR. After this PR, $ git grep '# type:' parsl/monitoring/ returns two remaining comment style annotations, which are 'type: ignore' exclusions not specific types. --- parsl/monitoring/db_manager.py | 20 ++++++++++---------- parsl/monitoring/remote.py | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 9f19cd9f4d..8f9f302640 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -283,7 +283,7 @@ def __init__(self, ): self.workflow_end = False - self.workflow_start_message = None # type: Optional[MonitoringMessage] + self.workflow_start_message: Optional[MonitoringMessage] = None self.logdir = logdir os.makedirs(self.logdir, exist_ok=True) @@ -299,10 +299,10 @@ def __init__(self, self.batching_interval = batching_interval self.batching_threshold = batching_threshold - self.pending_priority_queue = queue.Queue() # type: queue.Queue[TaggedMonitoringMessage] - self.pending_node_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_block_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_resource_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] + self.pending_priority_queue: queue.Queue[TaggedMonitoringMessage] = queue.Queue() + self.pending_node_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_block_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, priority_queue: "queue.Queue[TaggedMonitoringMessage]", @@ -351,18 +351,18 @@ def start(self, If that happens, the message will be added to deferred_resource_messages and processed later. """ - inserted_tasks = set() # type: Set[object] + inserted_tasks: Set[object] = set() """ like inserted_tasks but for task,try tuples """ - inserted_tries = set() # type: Set[Any] + inserted_tries: Set[Any] = set() # for any task ID, we can defer exactly one message, which is the # assumed-to-be-unique first message (with first message flag set). # The code prior to this patch will discard previous message in # the case of multiple messages to defer. 
- deferred_resource_messages = {} # type: MonitoringMessage + deferred_resource_messages: MonitoringMessage = {} exception_happened = False @@ -505,7 +505,7 @@ def start(self, "Got {} messages from block queue".format(len(block_info_messages))) # block_info_messages is possibly a nested list of dict (at different polling times) # Each dict refers to the info of a job/block at one polling time - block_messages_to_insert = [] # type: List[Any] + block_messages_to_insert: List[Any] = [] for block_msg in block_info_messages: block_messages_to_insert.extend(block_msg) self._insert(table=BLOCK, messages=block_messages_to_insert) @@ -686,7 +686,7 @@ def _insert(self, table: str, messages: List[MonitoringMessage]) -> None: logger.exception("Rollback failed") def _get_messages_in_batch(self, msg_queue: "queue.Queue[X]") -> List[X]: - messages = [] # type: List[X] + messages: List[X] = [] start = time.time() while True: if time.time() - start >= self.batching_interval or len(messages) >= self.batching_threshold: diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 055a013627..d374338dee 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -199,10 +199,10 @@ def monitor(pid: int, pm = psutil.Process(pid) - children_user_time = {} # type: Dict[int, float] - children_system_time = {} # type: Dict[int, float] - children_num_ctx_switches_voluntary = {} # type: Dict[int, float] - children_num_ctx_switches_involuntary = {} # type: Dict[int, float] + children_user_time: Dict[int, float] = {} + children_system_time: Dict[int, float] = {} + children_num_ctx_switches_voluntary: Dict[int, float] = {} + children_num_ctx_switches_involuntary: Dict[int, float] = {} def accumulate_and_prepare() -> Dict[str, Any]: d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple}
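For reference, the conversion pattern applied throughout this patch, shown on a generic before/after pair (illustrative variables, not code from the Parsl tree):

    from typing import Dict, Optional

    # Before: comment-style type annotations
    counts = {}  # type: Dict[str, int]

    # After: PEP-526 variable annotations
    totals: Dict[str, int] = {}
    last_error: Optional[str] = None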