From eb3e5fc53b14c3133c82376006bcf79def18593c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 5 May 2024 13:18:03 +0000 Subject: [PATCH 01/16] sketch of core-aware scaling for wq --- parsl/executors/workqueue/executor.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 67628dacb2..920d13f431 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -245,12 +245,14 @@ def __init__(self, full_debug: bool = True, worker_executable: str = 'work_queue_worker', function_dir: Optional[str] = None, - coprocess: bool = False): + coprocess: bool = False, + scaling_assume_core_slots_per_worker: int = 1): BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=True) if not _work_queue_enabled: raise OptionalModuleMissing(['work_queue'], "WorkQueueExecutor requires the work_queue module.") + self.scaling_assume_core_slots_per_worker = scaling_assume_core_slots_per_worker self.label = label self.task_queue = multiprocessing.Queue() # type: multiprocessing.Queue self.collector_queue = multiprocessing.Queue() # type: multiprocessing.Queue @@ -470,6 +472,8 @@ def submit(self, func, resource_specification, *args, **kwargs): # Create a Future object and have it be mapped from the task ID in the tasks dictionary fu = Future() fu.parsl_executor_task_id = executor_task_id + assert isinstance(resource_specification, dict) + fu.resource_specification = resource_specification logger.debug("Getting tasks_lock to set WQ-level task entry") with self.tasks_lock: logger.debug("Got tasks_lock to set WQ-level task entry") @@ -655,20 +659,31 @@ def initialize_scaling(self): @property def outstanding(self) -> int: - """Count the number of outstanding tasks. This is inefficiently + """Count the number of outstanding slots required. This is inefficiently implemented and probably could be replaced with a counter. """ + logger.debug("Calculating outstanding task slot load") outstanding = 0 + tasks = 0 # only for log message... with self.tasks_lock: for fut in self.tasks.values(): if not fut.done(): - outstanding += 1 - logger.debug(f"Counted {outstanding} outstanding tasks") + # if a task defines a resource spec with a core count, use the number + # of cores as required task slot count, instead of 1 slot. + assert isinstance(fut.resource_specification, dict) # type: ignore[attr-defined] + + # TODO: functional style defaulting to 1...? + if 'cores' in fut.resource_specification: # type: ignore[attr-defined] + outstanding += fut.resource_specification['cores'] # type: ignore[attr-defined] + else: + outstanding += 1 + tasks += 1 + logger.debug(f"Counted {tasks} outstanding tasks with {outstanding} outstanding slots") return outstanding @property def workers_per_node(self) -> Union[int, float]: - return 1 + return self.scaling_assume_core_slots_per_worker def scale_in(self, count: int) -> List[str]: """Scale in method. 
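[A minimal standalone sketch, not part of the patch series, of the slot arithmetic that the new outstanding property in PATCH 01/16 performs. It is plain Python with hypothetical resource specifications standing in for the resource_specification dicts attached to not-yet-done task futures in submit() above:

    # Three unfinished tasks: two declare a core count, one does not.
    unfinished_specs = [{'cores': 4}, {'cores': 2}, {}]

    outstanding = 0
    tasks = 0
    for spec in unfinished_specs:
        # A task that declares 'cores' occupies that many slots;
        # otherwise it occupies a single slot. (PATCH 02/16 below changes
        # this default to align with actual Work Queue behaviour.)
        if 'cores' in spec:
            outstanding += spec['cores']
        else:
            outstanding += 1
        tasks += 1

    assert tasks == 3
    assert outstanding == 7  # 4 + 2 + 1 slots

With workers_per_node now returning scaling_assume_core_slots_per_worker, the scaling strategy compares this slot demand against core slots per block, rather than comparing task count against one worker per node.]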
From dfaba5812fd4328d8e9aa18af777eff5613e959f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 4 Jun 2024 07:57:43 +0000 Subject: [PATCH 02/16] Use whole worker when cores not defined, to align with actual WQ behaviour --- parsl/executors/workqueue/executor.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index f15c47e8d0..e87e0262cd 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -671,11 +671,8 @@ def outstanding(self) -> int: # of cores as required task slot count, instead of 1 slot. assert isinstance(fut.resource_specification, dict) # type: ignore[attr-defined] - # TODO: functional style defaulting to 1...? - if 'cores' in fut.resource_specification: # type: ignore[attr-defined] - outstanding += fut.resource_specification['cores'] # type: ignore[attr-defined] - else: - outstanding += 1 + outstanding += fut.resource_specification.get('cores', # type: ignore[attr-defined] + self.scaling_assume_core_slots_per_worker) tasks += 1 logger.debug(f"Counted {tasks} outstanding tasks with {outstanding} outstanding slots") return outstanding From 34f3af44f1c11e9a5b9e55b14bcbf929b8891609 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 5 Jun 2024 09:42:18 +0000 Subject: [PATCH 03/16] Add a fairly obvious assert on block length --- parsl/executors/status_handling.py | 1 + 1 file changed, 1 insertion(+) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 4d29439670..3d4ec93f44 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -233,6 +233,7 @@ def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]: job_ids = [] # types: List[Any] for bid in block_ids: job_ids.append(self.blocks_to_job_id[bid]) + assert len(block_ids) == len(job_ids) return block_ids, job_ids @abstractproperty From 6ea226678e30dfc7a2fef5c6c007cab9c2278b8b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 5 Jun 2024 09:42:35 +0000 Subject: [PATCH 04/16] add some debugging around block choice for wq scale in --- parsl/executors/workqueue/executor.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 0b931bbc31..d2dbd93906 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -672,13 +672,25 @@ def workers_per_node(self) -> Union[int, float]: def scale_in(self, count: int) -> List[str]: """Scale in method. 
""" + logger.debug("Number of jobs requested for scale in: %s", count) + logger.debug("Number of jobs in blocks_to_job_id map: %s", len(self.blocks_to_job_id)) + # Obtain list of blocks to kill to_kill = list(self.blocks_to_job_id.keys())[:count] + + logger.debug("List of blocks to scale in: %s", to_kill) + for block_id in to_kill: + if block_id in self._status: + logger.debug("status of block %s is %s", block_id, self._status[block_id]) + else: + logger.debug("block %s has no recorded status", block_id) + kill_ids = [self.blocks_to_job_id[block] for block in to_kill] # Cancel the blocks provisioned if self.provider: logger.info(f"Scaling in jobs: {kill_ids}") + r = self.provider.cancel(kill_ids) job_ids = self._filter_scale_in_ids(kill_ids, r) block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] From 77fbb4ca8b81e12a6fb267524c9a6b6c3e920b59 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 5 Jun 2024 09:48:02 +0000 Subject: [PATCH 05/16] Add a bit of simulated status info --- parsl/executors/status_handling.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 3d4ec93f44..d5c053b364 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -123,6 +123,8 @@ def status(self) -> Dict[str, JobStatus]: status = self._make_status_dict(block_ids, self._provider.status(job_ids)) else: status = {} + + logger.debug("Adding these simulated status entries onto provider-returned status: %s", self._simulated_status) status.update(self._simulated_status) return status @@ -301,6 +303,8 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ for block_id in block_ids: new_status[block_id] = JobStatus(JobState.CANCELLED) del self._status[block_id] + if block_id in self._simulated_status: + logger.warning("block %s is in simulated status: not properly deleted", block_id) self.send_monitoring_info(new_status) return block_ids From 49203a9dc5d04ccee4f1ebf34920be80ed07ea4f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 5 Jun 2024 14:03:13 +0000 Subject: [PATCH 06/16] Add a stack-depth test --- .../test_python_apps/test_dependency_deep.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 parsl/tests/test_python_apps/test_dependency_deep.py diff --git a/parsl/tests/test_python_apps/test_dependency_deep.py b/parsl/tests/test_python_apps/test_dependency_deep.py new file mode 100644 index 0000000000..87d05096c4 --- /dev/null +++ b/parsl/tests/test_python_apps/test_dependency_deep.py @@ -0,0 +1,47 @@ +import inspect +from concurrent.futures import Future +from typing import Any, Callable, Dict + +import pytest + +import parsl +from parsl.executors.base import ParslExecutor + +N = 100 +MAX_STACK = 50 + +def local_config(): + return parsl.Config(executors=[ImmediateExecutor()]) + +class ImmediateExecutor(ParslExecutor): + def start(self): + pass + + def shutdown(self): + pass + + def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: + stack_depth = len(inspect.stack()) + assert stack_depth < MAX_STACK, "tasks should not be launched deep in the Python stack" + fut = Future() + res = func(*args, **kwargs) + fut.set_result(res) + return fut + +@parsl.python_app +def chain(upstream): + stack_depth = len(inspect.stack()) + assert stack_depth < MAX_STACK, "chained dependencies should not be launched deep in the Python stack" + + +@pytest.mark.local +def 
test_deep_dependency_stack_depth(): + + fut = Future() + here = fut + + for _ in range(N): + here = chain(here) + + fut.set_result(None) + here.result() From b9eba7a4d609cd3d1cfde9808d7dd26e4580cf35 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 5 Jun 2024 14:24:09 +0000 Subject: [PATCH 07/16] Add notes on test constant --- parsl/tests/test_python_apps/test_dependency_deep.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/parsl/tests/test_python_apps/test_dependency_deep.py b/parsl/tests/test_python_apps/test_dependency_deep.py index 87d05096c4..8c3a0a2a1d 100644 --- a/parsl/tests/test_python_apps/test_dependency_deep.py +++ b/parsl/tests/test_python_apps/test_dependency_deep.py @@ -7,7 +7,16 @@ import parsl from parsl.executors.base import ParslExecutor +# N is the number of tasks to chain +# With mid-2024 Parsl, N>140 causes Parsl to hang N = 100 + +# MAX_STACK is the maximum Python stack depth allowed for either +# task submission to an executor or execution of a task. +# With mid-2024 Parsl, 2-3 stack entries will be used per +# recursively launched parsl task. So this should be smaller than +# 2*N, but big enough to allow regular pytest+parsl stuff to +# happen. MAX_STACK = 50 def local_config(): From d0b369b3471b3be0cfc1f6938e08ed4ba541b791 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 5 Jun 2024 14:24:57 +0000 Subject: [PATCH 08/16] fix flake8, mypy --- parsl/tests/test_python_apps/test_dependency_deep.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parsl/tests/test_python_apps/test_dependency_deep.py b/parsl/tests/test_python_apps/test_dependency_deep.py index 8c3a0a2a1d..c728e1246e 100644 --- a/parsl/tests/test_python_apps/test_dependency_deep.py +++ b/parsl/tests/test_python_apps/test_dependency_deep.py @@ -17,11 +17,13 @@ # recursively launched parsl task. So this should be smaller than # 2*N, but big enough to allow regular pytest+parsl stuff to # happen. 
-MAX_STACK = 50
+MAX_STACK = 50
+

 def local_config():
     return parsl.Config(executors=[ImmediateExecutor()])

+
 class ImmediateExecutor(ParslExecutor):
     def start(self):
         pass
@@ -32,11 +34,12 @@ def shutdown(self):
     def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
         stack_depth = len(inspect.stack())
         assert stack_depth < MAX_STACK, "tasks should not be launched deep in the Python stack"
-        fut = Future()
+        fut: Future[None] = Future()
         res = func(*args, **kwargs)
         fut.set_result(res)
         return fut

+
 @parsl.python_app
 def chain(upstream):
     stack_depth = len(inspect.stack())

From 0e978eb1b2386fa97b6bfee11f39b22f2d2a47c5 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Wed, 5 Jun 2024 16:35:45 +0000
Subject: [PATCH 09/16] WIP: dependency launcher pool

---
 parsl/dataflow/dflow.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index dffa7e52fd..86d91c0d21 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -11,6 +11,7 @@
 import sys
 import threading
 import time
+import concurrent.futures as cf
 from concurrent.futures import Future
 from functools import partial
 from getpass import getuser
@@ -209,6 +210,8 @@ def __init__(self, config: Config) -> None:
         self.tasks: Dict[int, TaskRecord] = {}
         self.submitter_lock = threading.Lock()

+        self.dependency_launch_pool = cf.ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dependency-Launch")
+
         self.dependency_resolver = self.config.dependency_resolver if self.config.dependency_resolver is not None \
             else SHALLOW_DEPENDENCY_RESOLVER
@@ -1271,6 +1274,10 @@ def cleanup(self) -> None:
             self.monitoring.close()
             logger.info("Terminated monitoring")

+        logger.info("Terminating dependency launch pool")
+        self.dependency_launch_pool.shutdown(cancel_futures=True)
+        logger.info("Terminated dependency launch pool")
+
         logger.info("Unregistering atexit hook")
         atexit.unregister(self.atexit_cleanup)
         logger.info("Unregistered atexit hook")

From 3764b931a8891b6679a946b177b919dddd523818 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Fri, 7 Jun 2024 08:26:21 +0000
Subject: [PATCH 10/16] perform launch_if_ready asynchronously, at start of a new stack

... rather than recursively, deep in a dependency processing chain.
There is probably some change in performance from this.

launch_if_ready was already intended to be called multiple times from
multiple threads. This PR might make the invocation of launch_if_ready
happen a bit later, but correctness-wise that should be fine: a task
can only become *more* ready to run, not less ready.

---
 parsl/dataflow/dflow.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index 86d91c0d21..5b2502ceec 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -614,6 +614,13 @@ def check_staging_inhibited(kwargs: Dict[str, Any]) -> bool:
         return kwargs.get('_parsl_staging_inhibit', False)

     def launch_if_ready(self, task_record: TaskRecord) -> None:
+        """schedules a task record for re-inspection to see if it is ready
+        for launch. The call will return immediately, asynchronous to
+        whether that check and launch has happened or not.
+        """
+        self.dependency_launch_pool.submit(self._launch_if_ready_async, task_record)
+
+    def _launch_if_ready_async(self, task_record: TaskRecord) -> None:
         """
         launch_if_ready will launch the specified task, if it is ready
         to run (for example, without dependencies, and in pending state).
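[A self-contained sketch of the pattern PATCH 10/16 applies, illustrative only and not Parsl code: instead of each dependency completion calling the next launch recursively, each launch is handed to a single-worker thread pool, so every launch starts on a fresh, shallow stack. The function name and depth limit here are hypothetical:

    import concurrent.futures as cf
    import inspect
    import threading

    done = threading.Event()

    # One worker, as with the Dependency-Launch pool above: launches are
    # serialised and each runs as a fresh work item on the worker thread.
    pool = cf.ThreadPoolExecutor(max_workers=1)

    def launch_step(n: int) -> None:
        # Stack stays shallow however long the chain is, because the next
        # step is queued rather than called directly.
        assert len(inspect.stack()) < 20
        if n > 0:
            pool.submit(launch_step, n - 1)
        else:
            done.set()

    pool.submit(launch_step, 1000)
    done.wait()
    pool.shutdown()

A directly recursive version of launch_step would instead grow the stack by at least one frame per step; that is the behaviour behind the "N>140 causes Parsl to hang" note in PATCH 07/16.]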
From 7ad35bb039ad8ecfc7a43f397a0ec05dace12b58 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 08:36:19 +0000 Subject: [PATCH 11/16] use _status not block ID table --- parsl/executors/workqueue/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 5652ff0e6e..23969803a1 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -688,7 +688,7 @@ def scale_in(self, count: int) -> List[str]: logger.debug("Number of jobs in blocks_to_job_id map: %s", len(self.blocks_to_job_id)) # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:count] + to_kill = list(self._status.keys())[:count] logger.debug("List of blocks to scale in: %s", to_kill) for block_id in to_kill: From f5b27f2ceaea934b722139112fe7ac9ae3a3396e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 02:09:53 -0700 Subject: [PATCH 12/16] Filter on non-terminal blocks, like htex does --- parsl/executors/workqueue/executor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 23969803a1..4612128310 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -31,6 +31,7 @@ from parsl.executors.errors import ExecutorError from parsl.executors.status_handling import BlockProviderExecutor from parsl.executors.workqueue import exec_parsl_function +from parsl.jobs.states import TERMINAL_STATES from parsl.process_loggers import wrap_with_logs from parsl.providers import CondorProvider, LocalProvider from parsl.providers.base import ExecutionProvider @@ -685,10 +686,12 @@ def scale_in(self, count: int) -> List[str]: """Scale in method. 
""" logger.debug("Number of jobs requested for scale in: %s", count) - logger.debug("Number of jobs in blocks_to_job_id map: %s", len(self.blocks_to_job_id)) + + candidate_blocks = [block_id for block_id, job_status in self._status.items() if job_status.state not in TERMINAL_STATES] + logger.debug("scale in candidate blocks: %s", candidate_blocks) # Obtain list of blocks to kill - to_kill = list(self._status.keys())[:count] + to_kill = candidate_blocks[:count] logger.debug("List of blocks to scale in: %s", to_kill) for block_id in to_kill: From 324465a89e2b7279f73cbbc9e9389808f73d3103 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 09:19:03 +0000 Subject: [PATCH 13/16] isort --- parsl/dataflow/dflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 5b2502ceec..ad14094c23 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1,6 +1,7 @@ from __future__ import annotations import atexit +import concurrent.futures as cf import datetime import inspect import logging @@ -11,7 +12,6 @@ import sys import threading import time -import concurrent.futures as cf from concurrent.futures import Future from functools import partial from getpass import getuser From dce6eb3dc76a8a12607edaa12c726b28d8d3f7f8 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 09:27:12 +0000 Subject: [PATCH 14/16] Remove cancel futures because it needs Python 3.9 --- parsl/dataflow/dflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index ad14094c23..a81f8ac967 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1282,7 +1282,7 @@ def cleanup(self) -> None: logger.info("Terminated monitoring") logger.info("Terminating dependency launch pool") - self.dependency_launch_pool.shutdown(cancel_futures=True) + self.dependency_launch_pool.shutdown() logger.info("Terminated dependency launch pool") logger.info("Unregistering atexit hook") From 464c71ee2d968c98cb1c15c2e229df1fed371000 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 09:31:50 +0000 Subject: [PATCH 15/16] rename for consistency with other test --- .../{test_dependency_deep.py => test_dependencies_deep.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename parsl/tests/test_python_apps/{test_dependency_deep.py => test_dependencies_deep.py} (100%) diff --git a/parsl/tests/test_python_apps/test_dependency_deep.py b/parsl/tests/test_python_apps/test_dependencies_deep.py similarity index 100% rename from parsl/tests/test_python_apps/test_dependency_deep.py rename to parsl/tests/test_python_apps/test_dependencies_deep.py From d6930a4602a5ee9596d2104eebf9ea2d9a442c81 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 09:39:40 +0000 Subject: [PATCH 16/16] Logging and docstrings --- parsl/dataflow/dflow.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index a81f8ac967..3f9a2316ca 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -614,16 +614,9 @@ def check_staging_inhibited(kwargs: Dict[str, Any]) -> bool: return kwargs.get('_parsl_staging_inhibit', False) def launch_if_ready(self, task_record: TaskRecord) -> None: - """schedules a task record for re-inspection to see if it is ready - for launch. The call will return immediately, asynchronous to - whether than check and launch has happened or not. 
- """ - self.dependency_launch_pool.submit(self._launch_if_ready_async, task_record) - - def _launch_if_ready_async(self, task_record: TaskRecord) -> None: - """ - launch_if_ready will launch the specified task, if it is ready - to run (for example, without dependencies, and in pending state). + """Schedules a task record for re-inspection to see if it is ready + for launch and for launch if it is ready. The call will return + immediately. This should be called by any piece of the DataFlowKernel that thinks a task may have become ready to run. @@ -632,13 +625,17 @@ def _launch_if_ready_async(self, task_record: TaskRecord) -> None: ready to run - launch_if_ready will not incorrectly launch that task. - It is also not an error to call launch_if_ready on a task that has - already been launched - launch_if_ready will not re-launch that - task. - launch_if_ready is thread safe, so may be called from any thread or callback. """ + self.dependency_launch_pool.submit(self._launch_if_ready_async, task_record) + + @wrap_with_logs + def _launch_if_ready_async(self, task_record: TaskRecord) -> None: + """ + _launch_if_ready will launch the specified task, if it is ready + to run (for example, without dependencies, and in pending state). + """ exec_fu = None task_id = task_record['id']