Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Launch tasks in from a shallow Python stack to avoid recursion errors #3478

Merged
merged 11 commits into from
Jun 10, 2024
25 changes: 18 additions & 7 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import atexit
import concurrent.futures as cf
import datetime
import inspect
import logging
Expand Down Expand Up @@ -209,6 +210,8 @@ def __init__(self, config: Config) -> None:
self.tasks: Dict[int, TaskRecord] = {}
self.submitter_lock = threading.Lock()

self.dependency_launch_pool = cf.ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dependency-Launch")

self.dependency_resolver = self.config.dependency_resolver if self.config.dependency_resolver is not None \
else SHALLOW_DEPENDENCY_RESOLVER

Expand Down Expand Up @@ -626,9 +629,9 @@ def check_staging_inhibited(kwargs: Dict[str, Any]) -> bool:
return kwargs.get('_parsl_staging_inhibit', False)

def launch_if_ready(self, task_record: TaskRecord) -> None:
"""
launch_if_ready will launch the specified task, if it is ready
to run (for example, without dependencies, and in pending state).
"""Schedules a task record for re-inspection to see if it is ready
for launch and for launch if it is ready. The call will return
immediately.

This should be called by any piece of the DataFlowKernel that
thinks a task may have become ready to run.
Expand All @@ -637,13 +640,17 @@ def launch_if_ready(self, task_record: TaskRecord) -> None:
ready to run - launch_if_ready will not incorrectly launch that
task.

It is also not an error to call launch_if_ready on a task that has
already been launched - launch_if_ready will not re-launch that
task.

launch_if_ready is thread safe, so may be called from any thread
or callback.
"""
self.dependency_launch_pool.submit(self._launch_if_ready_async, task_record)

@wrap_with_logs
def _launch_if_ready_async(self, task_record: TaskRecord) -> None:
"""
_launch_if_ready will launch the specified task, if it is ready
to run (for example, without dependencies, and in pending state).
"""
exec_fu = None

task_id = task_record['id']
Expand Down Expand Up @@ -1286,6 +1293,10 @@ def cleanup(self) -> None:
self.monitoring.close()
logger.info("Terminated monitoring")

logger.info("Terminating dependency launch pool")
self.dependency_launch_pool.shutdown()
logger.info("Terminated dependency launch pool")

logger.info("Unregistering atexit hook")
atexit.unregister(self.atexit_cleanup)
logger.info("Unregistered atexit hook")
Expand Down
59 changes: 59 additions & 0 deletions parsl/tests/test_python_apps/test_dependencies_deep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import inspect
from concurrent.futures import Future
from typing import Any, Callable, Dict

import pytest

import parsl
from parsl.executors.base import ParslExecutor

# N is the number of tasks to chain
# With mid-2024 Parsl, N>140 causes Parsl to hang
N = 100

# MAX_STACK is the maximum Python stack depth allowed for either
# task submission to an executor or execution of a task.
# With mid-2024 Parsl, 2-3 stack entries will be used per
# recursively launched parsl task. So this should be smaller than
# 2*N, but big enough to allow regular pytest+parsl stuff to
# happen.
MAX_STACK = 50


def local_config():
return parsl.Config(executors=[ImmediateExecutor()])


class ImmediateExecutor(ParslExecutor):
def start(self):
pass

def shutdown(self):
pass

def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
stack_depth = len(inspect.stack())
assert stack_depth < MAX_STACK, "tasks should not be launched deep in the Python stack"
fut: Future[None] = Future()
res = func(*args, **kwargs)
fut.set_result(res)
return fut


@parsl.python_app
def chain(upstream):
stack_depth = len(inspect.stack())
assert stack_depth < MAX_STACK, "chained dependencies should not be launched deep in the Python stack"


@pytest.mark.local
def test_deep_dependency_stack_depth():

fut = Future()
here = fut

for _ in range(N):
here = chain(here)

fut.set_result(None)
here.result()
Loading