Skip to content

Commit

Permalink
Merge branches 'benc-tmp-sander' and 'benc-3472-recursion' into benc-…
Browse files Browse the repository at this point in the history
…sander-paper
  • Loading branch information
benclifford committed Jun 7, 2024
2 parents f5b27f2 + 3764b93 commit b342639
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
14 changes: 14 additions & 0 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sys
import threading
import time
import concurrent.futures as cf
from concurrent.futures import Future
from functools import partial
from getpass import getuser
Expand Down Expand Up @@ -209,6 +210,8 @@ def __init__(self, config: Config) -> None:
self.tasks: Dict[int, TaskRecord] = {}
self.submitter_lock = threading.Lock()

self.dependency_launch_pool = cf.ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dependency-Launch")

self.dependency_resolver = self.config.dependency_resolver if self.config.dependency_resolver is not None \
else SHALLOW_DEPENDENCY_RESOLVER

Expand Down Expand Up @@ -611,6 +614,13 @@ def check_staging_inhibited(kwargs: Dict[str, Any]) -> bool:
return kwargs.get('_parsl_staging_inhibit', False)

def launch_if_ready(self, task_record: TaskRecord) -> None:
"""schedules a task record for re-inspection to see if it is ready
for launch. The call will return immediately, asynchronous to
whether than check and launch has happened or not.
"""
self.dependency_launch_pool.submit(self._launch_if_ready_async, task_record)

def _launch_if_ready_async(self, task_record: TaskRecord) -> None:
"""
launch_if_ready will launch the specified task, if it is ready
to run (for example, without dependencies, and in pending state).
Expand Down Expand Up @@ -1271,6 +1281,10 @@ def cleanup(self) -> None:
self.monitoring.close()
logger.info("Terminated monitoring")

logger.info("Terminating dependency launch pool")
self.dependency_launch_pool.shutdown(cancel_futures=True)
logger.info("Terminated dependency launch pool")

logger.info("Unregistering atexit hook")
atexit.unregister(self.atexit_cleanup)
logger.info("Unregistered atexit hook")
Expand Down
59 changes: 59 additions & 0 deletions parsl/tests/test_python_apps/test_dependency_deep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import inspect
from concurrent.futures import Future
from typing import Any, Callable, Dict

import pytest

import parsl
from parsl.executors.base import ParslExecutor

# N is the number of tasks to chain
# With mid-2024 Parsl, N>140 causes Parsl to hang
N = 100

# MAX_STACK is the maximum Python stack depth allowed for either
# task submission to an executor or execution of a task.
# With mid-2024 Parsl, 2-3 stack entries will be used per
# recursively launched parsl task. So this should be smaller than
# 2*N, but big enough to allow regular pytest+parsl stuff to
# happen.
MAX_STACK = 50


def local_config():
return parsl.Config(executors=[ImmediateExecutor()])


class ImmediateExecutor(ParslExecutor):
def start(self):
pass

def shutdown(self):
pass

def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
stack_depth = len(inspect.stack())
assert stack_depth < MAX_STACK, "tasks should not be launched deep in the Python stack"
fut: Future[None] = Future()
res = func(*args, **kwargs)
fut.set_result(res)
return fut


@parsl.python_app
def chain(upstream):
stack_depth = len(inspect.stack())
assert stack_depth < MAX_STACK, "chained dependencies should not be launched deep in the Python stack"


@pytest.mark.local
def test_deep_dependency_stack_depth():

fut = Future()
here = fut

for _ in range(N):
here = chain(here)

fut.set_result(None)
here.result()

0 comments on commit b342639

Please sign in to comment.