Skip to content

Commit

Permalink
Merge branch 'master' into bug/3427
Browse files Browse the repository at this point in the history
  • Loading branch information
R1j1t authored May 23, 2024
2 parents 6434f15 + 23afcb1 commit 68b612e
Show file tree
Hide file tree
Showing 45 changed files with 342 additions and 217 deletions.
93 changes: 0 additions & 93 deletions CODE_OF_CONDUCT.md

This file was deleted.

2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ The `Parsl development team <https://github.com/orgs/Parsl/teams>`_ has the addi
Parsl development follows a common pull request-based workflow similar to `GitHub flow <http://scottchacon.com/2011/08/31/github-flow.html>`_. That is:

* every development activity (except very minor changes, which can be discussed in the PR) should have a related GitHub issue
* all development occurs in branches (named with a short descriptive name which includes the associated issue number, for example, `add-globus-transfer-#1`)
* all development occurs in branches (named with a short descriptive name, for example, `add-globus-transfer-#1`)
* the master branch is always stable
* development branches should include tests for added features
* development branches should be tested after being brought up-to-date with the master (in this way, what is being tested is what is actually going into the code; otherwise unexpected issues from merging may come up)
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ Parsl is supported in Python 3.8+. Requirements can be found `here <requirements
Code of Conduct
===============

Parsl seeks to foster an open and welcoming environment - Please see the `Parsl Code of Conduct <https://github.com/Parsl/parsl/blob/master/CODE_OF_CONDUCT.md>`_ for more details.
Parsl seeks to foster an open and welcoming environment - Please see the `Parsl Code of Conduct <https://github.com/Parsl/parsl?tab=coc-ov-file#parsl-code-of-conduct>`_ for more details.

Contributing
============
Expand Down
3 changes: 3 additions & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ Core
parsl.dataflow.futures.AppFuture
parsl.dataflow.dflow.DataFlowKernelLoader
parsl.monitoring.MonitoringHub
parsl.dataflow.dependency_resolvers.DependencyResolver
parsl.dataflow.dependency_resolvers.DEEP_DEPENDENCY_RESOLVER
parsl.dataflow.dependency_resolvers.SHALLOW_DEPENDENCY_RESOLVER

Configuration
=============
Expand Down
16 changes: 16 additions & 0 deletions docs/userguide/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,19 @@ from invoking a Parsl app. This includes as the return value of a

An specific example of this is integrating Globus Compute tasks into a Parsl
task graph. See :ref:`label-join-globus-compute`

Dependency resolution
---------------------

When Parsl examines the arguments to an app, it uses a `DependencyResolver`.
The default `DependencyResolver` will cause Parsl to wait for
``concurrent.futures.Future`` instances (including `AppFuture` and
`DataFuture`), and pass through other arguments without waiting.

This behaviour is pluggable: Parsl comes with another dependency resolver,
`DEEP_DEPENDENCY_RESOLVER` which knows about futures contained with structures
such as tuples, lists, sets and dicts.

This plugin interface might be used to interface other task-like or future-like
objects to the Parsl dependency mechanism, by describing how they can be
interpreted as a Future.
8 changes: 7 additions & 1 deletion parsl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing_extensions import Literal

from parsl.utils import RepresentationMixin
from parsl.dataflow.dependency_resolvers import DependencyResolver
from parsl.executors.base import ParslExecutor
from parsl.executors.threads import ThreadPoolExecutor
from parsl.errors import ConfigurationError
Expand Down Expand Up @@ -35,6 +36,8 @@ class Config(RepresentationMixin, UsageInformation):
checkpoint_period : str, optional
Time interval (in "HH:MM:SS") at which to checkpoint completed tasks. Only has an effect if
``checkpoint_mode='periodic'``.
dependency_resolver: plugin point for custom dependency resolvers. Default: only resolve Futures,
using the `SHALLOW_DEPENDENCY_RESOLVER`.
garbage_collect : bool. optional.
Delete task records from DFK when tasks have completed. Default: True
internal_tasks_max_threads : int, optional
Expand Down Expand Up @@ -88,6 +91,7 @@ def __init__(self,
Literal['dfk_exit'],
Literal['manual']] = None,
checkpoint_period: Optional[str] = None,
dependency_resolver: Optional[DependencyResolver] = None,
garbage_collect: bool = True,
internal_tasks_max_threads: int = 10,
retries: int = 0,
Expand Down Expand Up @@ -123,6 +127,7 @@ def __init__(self,
if checkpoint_mode == 'periodic' and checkpoint_period is None:
checkpoint_period = "00:30:00"
self.checkpoint_period = checkpoint_period
self.dependency_resolver = dependency_resolver
self.garbage_collect = garbage_collect
self.internal_tasks_max_threads = internal_tasks_max_threads
self.retries = retries
Expand Down Expand Up @@ -152,4 +157,5 @@ def _validate_executors(self) -> None:
', '.join(['label={}'.format(repr(d)) for d in duplicates])))

def get_usage_information(self):
return {"executors_len": len(self.executors)}
return {"executors_len": len(self.executors),
"dependency_resolver": self.dependency_resolver is not None}
115 changes: 115 additions & 0 deletions parsl/dataflow/dependency_resolvers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from concurrent.futures import Future
from dataclasses import dataclass
from functools import singledispatch
from typing import Callable


@dataclass
class DependencyResolver:
"""A DependencyResolver describes how app dependencies can be resolved.
It is specified as two functions: `traverse_to_gather` which turns an
app parameter into a list of futures which must be waited for before
the task can be executed (for example, in the case of
`DEEP_DEPENDENCY_RESOLVER` this traverses structures such as lists to
find every contained ``Future``), and `traverse_to_unwrap` which turns an
app parameter into its value to be passed to the app on execution
(for example in the case of `DEEP_DEPENDENCY_RESOLVER` this replaces a
list containing futures with a new list containing the values of those
resolved futures).
By default, Parsl will use `SHALLOW_DEPENDENCY_RESOLVER` which only
resolves Futures passed directly as arguments.
"""
traverse_to_gather: Callable
traverse_to_unwrap: Callable


@singledispatch
def shallow_traverse_to_gather(o):
# objects in general do not expose futures that we can see
return []


@singledispatch
def shallow_traverse_to_unwrap(o):
# objects in general unwrap to themselves
return o


@shallow_traverse_to_gather.register
def _(fut: Future):
return [fut]


@shallow_traverse_to_unwrap.register
@singledispatch
def _(fut: Future):
return fut.result()


@singledispatch
def deep_traverse_to_gather(o):
# objects in general do not expose futures that we can see
return []


@singledispatch
def deep_traverse_to_unwrap(o):
# objects in general unwrap to themselves
return o


@deep_traverse_to_gather.register
def _(fut: Future):
return [fut]


@deep_traverse_to_unwrap.register
@singledispatch
def _(fut: Future):
return fut.result()


@deep_traverse_to_gather.register(tuple)
@deep_traverse_to_gather.register(list)
@deep_traverse_to_gather.register(set)
def _(iterable):
return [e for v in iterable for e in deep_traverse_to_gather(v) if isinstance(e, Future)]


@deep_traverse_to_unwrap.register(tuple)
@deep_traverse_to_unwrap.register(list)
@deep_traverse_to_unwrap.register(set)
@singledispatch
def _(iterable):

type_ = type(iterable)
return type_(map(deep_traverse_to_unwrap, iterable))


@deep_traverse_to_gather.register(dict)
def _(dictionary):
futures = []
for key, value in dictionary.items():
if isinstance(key, Future):
futures.append(key)
if isinstance(value, Future):
futures.append(value)
return futures


@deep_traverse_to_unwrap.register(dict)
def _(dictionary):
unwrapped_dict = {}
for key, value in dictionary.items():
key = deep_traverse_to_unwrap(key)
value = deep_traverse_to_unwrap(value)
unwrapped_dict[key] = value
return unwrapped_dict


DEEP_DEPENDENCY_RESOLVER = DependencyResolver(traverse_to_gather=deep_traverse_to_gather,
traverse_to_unwrap=deep_traverse_to_unwrap)

SHALLOW_DEPENDENCY_RESOLVER = DependencyResolver(traverse_to_gather=shallow_traverse_to_gather,
traverse_to_unwrap=shallow_traverse_to_unwrap)
Loading

0 comments on commit 68b612e

Please sign in to comment.