diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index dffa7e52fd..5b2502ceec 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -11,6 +11,7 @@ import sys import threading import time +import concurrent.futures as cf from concurrent.futures import Future from functools import partial from getpass import getuser @@ -209,6 +210,8 @@ def __init__(self, config: Config) -> None: self.tasks: Dict[int, TaskRecord] = {} self.submitter_lock = threading.Lock() + self.dependency_launch_pool = cf.ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dependency-Launch") + self.dependency_resolver = self.config.dependency_resolver if self.config.dependency_resolver is not None \ else SHALLOW_DEPENDENCY_RESOLVER @@ -611,6 +614,13 @@ def check_staging_inhibited(kwargs: Dict[str, Any]) -> bool: return kwargs.get('_parsl_staging_inhibit', False) def launch_if_ready(self, task_record: TaskRecord) -> None: + """schedules a task record for re-inspection to see if it is ready + for launch. The call will return immediately, asynchronous to + whether than check and launch has happened or not. + """ + self.dependency_launch_pool.submit(self._launch_if_ready_async, task_record) + + def _launch_if_ready_async(self, task_record: TaskRecord) -> None: """ launch_if_ready will launch the specified task, if it is ready to run (for example, without dependencies, and in pending state). @@ -1271,6 +1281,10 @@ def cleanup(self) -> None: self.monitoring.close() logger.info("Terminated monitoring") + logger.info("Terminating dependency launch pool") + self.dependency_launch_pool.shutdown(cancel_futures=True) + logger.info("Terminated dependency launch pool") + logger.info("Unregistering atexit hook") atexit.unregister(self.atexit_cleanup) logger.info("Unregistered atexit hook") diff --git a/parsl/tests/test_python_apps/test_dependency_deep.py b/parsl/tests/test_python_apps/test_dependency_deep.py new file mode 100644 index 0000000000..c728e1246e --- /dev/null +++ b/parsl/tests/test_python_apps/test_dependency_deep.py @@ -0,0 +1,59 @@ +import inspect +from concurrent.futures import Future +from typing import Any, Callable, Dict + +import pytest + +import parsl +from parsl.executors.base import ParslExecutor + +# N is the number of tasks to chain +# With mid-2024 Parsl, N>140 causes Parsl to hang +N = 100 + +# MAX_STACK is the maximum Python stack depth allowed for either +# task submission to an executor or execution of a task. +# With mid-2024 Parsl, 2-3 stack entries will be used per +# recursively launched parsl task. So this should be smaller than +# 2*N, but big enough to allow regular pytest+parsl stuff to +# happen. +MAX_STACK = 50 + + +def local_config(): + return parsl.Config(executors=[ImmediateExecutor()]) + + +class ImmediateExecutor(ParslExecutor): + def start(self): + pass + + def shutdown(self): + pass + + def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: + stack_depth = len(inspect.stack()) + assert stack_depth < MAX_STACK, "tasks should not be launched deep in the Python stack" + fut: Future[None] = Future() + res = func(*args, **kwargs) + fut.set_result(res) + return fut + + +@parsl.python_app +def chain(upstream): + stack_depth = len(inspect.stack()) + assert stack_depth < MAX_STACK, "chained dependencies should not be launched deep in the Python stack" + + +@pytest.mark.local +def test_deep_dependency_stack_depth(): + + fut = Future() + here = fut + + for _ in range(N): + here = chain(here) + + fut.set_result(None) + here.result()