Merge branch 'master' into issue_2908
benclifford authored Oct 22, 2023
2 parents 8d64801 + 65cdfb1 commit bc38eb6
Showing 12 changed files with 299 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -74,7 +74,7 @@ wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with workqueue_ex config

.PHONY: config_local_test
config_local_test:
-pip3 install ".[monitoring,proxystore]"
+pip3 install ".[monitoring,visualization,proxystore]"
pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10

.PHONY: site_test
1 change: 1 addition & 0 deletions docs/userguide/index.rst
@@ -16,6 +16,7 @@ User guide
   monitoring
   workflow
   modularizing
+  lifted_ops
   joins
   usage_tracking
   plugins
56 changes: 56 additions & 0 deletions docs/userguide/lifted_ops.rst
@@ -0,0 +1,56 @@
.. _label-liftedops:

Lifted operators
================

Parsl allows some operators (``[]`` and ``.``) to be used on an AppFuture,
with behaviour that mirrors applying those operators to the eventually
returned result.

Lifted [] operator
------------------

When an app returns a complex structure such as a ``dict`` or a ``list``,
it is sometimes useful to pass an element of that structure to a subsequent
task, without first waiting for the returning app to complete.

To help with this, Parsl allows the ``[]`` operator to be used on an
`AppFuture`. This operator will return another `AppFuture` that will
complete after the initial future, with the result of ``[]`` on the value
of the initial future.

The end result is that this assertion will hold:

.. code-block:: python

    fut = my_app()
    assert fut['x'].result() == fut.result()['x']

but more concurrency will be available, because execution of the main
workflow code will not stop to wait for ``result()`` to complete on the
initial future.

`AppFuture` does not implement other methods commonly associated with
dicts and lists, such as ``len``, because those methods must return a
value of a specific type immediately, and that is not possible while the
underlying result remains unavailable until the future completes.

If a key does not exist in the returned result, then the exception will
appear in the Future returned by ``[]``, rather than at the point that
the ``[]`` operator is applied. This is because the valid values that can
be used are not known until the underlying result is available.
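
The behaviour described above can be sketched with plain ``concurrent.futures``
primitives. This is an illustrative sketch only, not Parsl's implementation
(which routes the operation through an internal app, as this commit's
``deferred_getitem`` shows); ``lifted_getitem`` is a hypothetical helper name:

```python
from concurrent.futures import Future, ThreadPoolExecutor

def lifted_getitem(fut, key):
    # Sketch of the lifted [] idea: return a derived Future that
    # completes with fut.result()[key] once fut completes.
    out = Future()

    def on_done(f):
        try:
            # A missing key raises here and surfaces in the derived
            # future, matching the error behaviour described above.
            out.set_result(f.result()[key])
        except Exception as e:
            out.set_exception(e)

    fut.add_done_callback(on_done)
    return out

with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(lambda: {'x': 7})
    assert lifted_getitem(fut, 'x').result() == fut.result()['x']
```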

Lifted . operator
-----------------

The ``.`` operator works similarly to ``[]`` described above:

.. code-block:: python

    fut = my_app()
    assert fut.x.result() == fut.result().x

Attributes beginning with ``_`` are not lifted, both because such names
usually indicate attributes intended for internal use, and to avoid mixing
protocols (such as iteration in for loops) defined on AppFutures with
protocols defined on the underlying result object.
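
The same sketch works for attribute access, including the underscore guard
described above. Again this is only an illustration with a hypothetical
``lifted_getattr`` helper, not Parsl's ``AppFuture.__getattr__``:

```python
from concurrent.futures import Future, ThreadPoolExecutor
from types import SimpleNamespace

def lifted_getattr(fut, name):
    # Underscore names are never lifted, mirroring the rule above.
    if name.startswith('_'):
        raise AttributeError(name)
    out = Future()

    def on_done(f):
        try:
            out.set_result(getattr(f.result(), name))
        except Exception as e:
            out.set_exception(e)

    fut.add_done_callback(on_done)
    return out

with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(lambda: SimpleNamespace(x=3))
    assert lifted_getattr(fut, 'x').result() == 3
```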
2 changes: 1 addition & 1 deletion docs/userguide/monitoring.rst
@@ -56,7 +56,7 @@ Visualization
To run the web dashboard utility ``parsl-visualize`` you first need to install
its dependencies:

-$ pip install 'parsl[monitoring]'
+$ pip install 'parsl[monitoring,visualization]'

To view the web dashboard while or after a Parsl program has executed, run
the ``parsl-visualize`` utility::
93 changes: 47 additions & 46 deletions parsl/dataflow/dflow.py
@@ -451,17 +451,17 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
if isinstance(joinable, Future):
je = joinable.exception()
if je is not None:
-if hasattr(joinable, 'task_def'):
-    tid = joinable.task_def['id']
+if hasattr(joinable, 'task_record'):
+    tid = joinable.task_record['id']
else:
tid = None
exceptions_tids = [(je, tid)]
elif isinstance(joinable, list):
for future in joinable:
je = future.exception()
if je is not None:
-if hasattr(joinable, 'task_def'):
-    tid = joinable.task_def['id']
+if hasattr(joinable, 'task_record'):
+    tid = joinable.task_record['id']
else:
tid = None
exceptions_tids.append((je, tid))
@@ -854,8 +854,8 @@ def _unwrap_futures(self, args, kwargs):
try:
new_args.extend([dep.result()])
except Exception as e:
-if hasattr(dep, 'task_def'):
-    tid = dep.task_def['id']
+if hasattr(dep, 'task_record'):
+    tid = dep.task_record['id']
else:
tid = None
dep_failures.extend([(e, tid)])
@@ -869,8 +869,8 @@ def _unwrap_futures(self, args, kwargs):
try:
kwargs[key] = dep.result()
except Exception as e:
-if hasattr(dep, 'task_def'):
-    tid = dep.task_def['id']
+if hasattr(dep, 'task_record'):
+    tid = dep.task_record['id']
else:
tid = None
dep_failures.extend([(e, tid)])
@@ -883,8 +883,8 @@ def _unwrap_futures(self, args, kwargs):
try:
new_inputs.extend([dep.result()])
except Exception as e:
-if hasattr(dep, 'task_def'):
-    tid = dep.task_def['id']
+if hasattr(dep, 'task_record'):
+    tid = dep.task_record['id']
else:
tid = None
dep_failures.extend([(e, tid)])
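The repeated ``hasattr`` checks in the hunks above implement a small duck-typing
rule: only Parsl-managed futures carry a ``task_record``, so only they
contribute a task id to the failure tuple. A condensed sketch of that rule
(``FakeAppFuture`` is a hypothetical stand-in, not a Parsl class):

```python
from concurrent.futures import Future

def task_id_of(dep):
    # Mirrors the pattern in _unwrap_futures: plain Futures carry no
    # task_record, so they yield no task id.
    if hasattr(dep, 'task_record'):
        return dep.task_record['id']
    return None

class FakeAppFuture(Future):
    # Hypothetical stand-in for an AppFuture-like object.
    def __init__(self, tid):
        super().__init__()
        self.task_record = {'id': tid}

assert task_id_of(Future()) is None
assert task_id_of(FakeAppFuture(3)) == 3
```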
@@ -967,50 +967,51 @@ def submit(self,

resource_specification = app_kwargs.get('parsl_resource_specification', {})

-task_def: TaskRecord
-task_def = {'depends': [],
-            'executor': executor,
-            'func_name': func.__name__,
-            'memoize': cache,
-            'hashsum': None,
-            'exec_fu': None,
-            'fail_count': 0,
-            'fail_cost': 0,
-            'fail_history': [],
-            'from_memo': None,
-            'ignore_for_cache': ignore_for_cache,
-            'join': join,
-            'joins': None,
-            'try_id': 0,
-            'id': task_id,
-            'time_invoked': datetime.datetime.now(),
-            'time_returned': None,
-            'try_time_launched': None,
-            'try_time_returned': None,
-            'resource_specification': resource_specification}
-
-self.update_task_state(task_def, States.unsched)
-
-app_fu = AppFuture(task_def)
+task_record: TaskRecord
+task_record = {'depends': [],
+               'dfk': self,
+               'executor': executor,
+               'func_name': func.__name__,
+               'memoize': cache,
+               'hashsum': None,
+               'exec_fu': None,
+               'fail_count': 0,
+               'fail_cost': 0,
+               'fail_history': [],
+               'from_memo': None,
+               'ignore_for_cache': ignore_for_cache,
+               'join': join,
+               'joins': None,
+               'try_id': 0,
+               'id': task_id,
+               'time_invoked': datetime.datetime.now(),
+               'time_returned': None,
+               'try_time_launched': None,
+               'try_time_returned': None,
+               'resource_specification': resource_specification}
+
+self.update_task_state(task_record, States.unsched)
+
+app_fu = AppFuture(task_record)

# Transform remote input files to data futures
app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func)

func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func)

-task_def.update({
+task_record.update({
'args': app_args,
'func': func,
'kwargs': app_kwargs,
'app_fu': app_fu})

assert task_id not in self.tasks

-self.tasks[task_id] = task_def
+self.tasks[task_id] = task_record

# Get the list of dependencies for the task
depends = self._gather_all_deps(app_args, app_kwargs)
-task_def['depends'] = depends
+task_record['depends'] = depends

depend_descs = []
for d in depends:
@@ -1025,16 +1026,16 @@ def submit(self,
waiting_message = "not waiting on any dependency"

logger.info("Task {} submitted for App {}, {}".format(task_id,
-task_def['func_name'],
+task_record['func_name'],
waiting_message))

-task_def['task_launch_lock'] = threading.Lock()
+task_record['task_launch_lock'] = threading.Lock()

-app_fu.add_done_callback(partial(self.handle_app_update, task_def))
-self.update_task_state(task_def, States.pending)
-logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_def['app_fu']))
+app_fu.add_done_callback(partial(self.handle_app_update, task_record))
+self.update_task_state(task_record, States.pending)
+logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))

-self._send_task_log_info(task_def)
+self._send_task_log_info(task_record)

# at this point add callbacks to all dependencies to do a launch_if_ready
# call whenever a dependency completes.
@@ -1051,14 +1052,14 @@ def submit(self,
for d in depends:

def callback_adapter(dep_fut: Future) -> None:
-self.launch_if_ready(task_def)
+self.launch_if_ready(task_record)

try:
d.add_done_callback(callback_adapter)
except Exception as e:
logger.error("add_done_callback got an exception {} which will be ignored".format(e))

-self.launch_if_ready(task_def)
+self.launch_if_ready(task_record)

return app_fu
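The dependency callbacks registered above re-check launch readiness whenever
any dependency completes. This can be sketched with plain futures; the
``Task`` class here is a hypothetical toy model, not Parsl's
``DataFlowKernel``, which additionally handles ``task_launch_lock`` and task
states:

```python
from concurrent.futures import Future

class Task:
    # Toy model: a task becomes launchable once every dependency is done.
    def __init__(self, depends):
        self.depends = depends
        self.launched = False

    def launch_if_ready(self):
        if not self.launched and all(d.done() for d in self.depends):
            self.launched = True

deps = [Future(), Future()]
task = Task(deps)
for d in deps:
    # As in submit(): re-check readiness whenever a dependency completes.
    d.add_done_callback(lambda dep_fut: task.launch_if_ready())

task.launch_if_ready()  # initial check, mirroring the end of submit()
deps[0].set_result(1)
deps[1].set_result(2)
assert task.launched
```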

48 changes: 39 additions & 9 deletions parsl/dataflow/futures.py
@@ -5,11 +5,14 @@
2. AppFutures which represent the futures on App/Leaf tasks.
"""
from __future__ import annotations

from concurrent.futures import Future
import logging
import threading
-from typing import Optional, Sequence
+from typing import Any, Optional, Sequence

import parsl.app.app as app

from parsl.app.futures import DataFuture
from parsl.dataflow.taskrecord import TaskRecord
@@ -59,32 +62,32 @@ class AppFuture(Future):
"""

-def __init__(self, task_def: TaskRecord) -> None:
+def __init__(self, task_record: TaskRecord) -> None:
"""Initialize the AppFuture.
Args:
KWargs:
-    - task_def : The DFK task definition dictionary for the task represented
-      by this future.
+    - task_record : The TaskRecord for the task represented by
+      this future.
"""
super().__init__()
self._update_lock = threading.Lock()
self._outputs: Sequence[DataFuture]
self._outputs = []
-self.task_def = task_def
+self.task_record = task_record

@property
def stdout(self) -> Optional[str]:
-return self.task_def['kwargs'].get('stdout')
+return self.task_record['kwargs'].get('stdout')

@property
def stderr(self) -> Optional[str]:
-return self.task_def['kwargs'].get('stderr')
+return self.task_record['kwargs'].get('stderr')

@property
def tid(self) -> int:
-return self.task_def['id']
+return self.task_record['id']

def cancel(self) -> bool:
raise NotImplementedError("Cancel not implemented")
@@ -113,8 +116,35 @@ def task_status(self) -> str:
Returns: str
"""
-return self.task_def['status'].name
+return self.task_record['status'].name

@property
def outputs(self) -> Sequence[DataFuture]:
return self._outputs

def __getitem__(self, key: Any) -> AppFuture:
# This is decorated on each invocation because the getitem task
# should be bound to the same DFK as the task associated with this
# Future.
deferred_getitem_app = app.python_app(deferred_getitem, executors=['_parsl_internal'], data_flow_kernel=self.task_record['dfk'])

return deferred_getitem_app(self, key)

def __getattr__(self, name: str) -> AppFuture:
# this will avoid lifting behaviour on private methods and attributes,
# including __double_underscore__ methods which implement other
# Python syntax (such as iterators in for loops)
if name.startswith("_"):
raise AttributeError()

deferred_getattr_app = app.python_app(deferred_getattr, executors=['_parsl_internal'], data_flow_kernel=self.task_record['dfk'])

return deferred_getattr_app(self, name)


def deferred_getitem(o: Any, k: Any) -> Any:
return o[k]


def deferred_getattr(o: Any, name: str) -> Any:
return getattr(o, name)
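
Because ``deferred_getitem`` and ``deferred_getattr`` are ordinary
module-level functions, the internal ``_parsl_internal`` apps wrapped around
them behave exactly like plain subscript and attribute access on the resolved
value. Their behaviour can be checked directly (``Result`` is a hypothetical
class for illustration):

```python
def deferred_getitem(o, k):
    # Same body as the helper added in this commit: plain subscript.
    return o[k]

def deferred_getattr(o, name):
    # Same body as the helper added in this commit: plain getattr.
    return getattr(o, name)

class Result:
    # Hypothetical result object for illustration.
    x = 5

assert deferred_getitem({'a': 1}, 'a') == 1
assert deferred_getitem([10, 20, 30], 1) == 20
assert deferred_getattr(Result(), 'x') == 5
```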
7 changes: 7 additions & 0 deletions parsl/dataflow/taskrecord.py
@@ -5,18 +5,25 @@
from typing_extensions import TypedDict
from concurrent.futures import Future


# only for type checking:
from typing import Any, Callable, Dict, Optional, List, Sequence, TYPE_CHECKING, Union

if TYPE_CHECKING:
from parsl.dataflow.futures import AppFuture

import parsl.dataflow.dflow as dflow

from parsl.dataflow.states import States


class TaskRecord(TypedDict, total=False):
"""This stores most information about a Parsl task"""

dfk: dflow.DataFlowKernel
"""The DataFlowKernel which is managing this task.
"""

func_name: str

status: States
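``TaskRecord`` above is a ``TypedDict`` with ``total=False``, which is what
lets ``submit`` build the record incrementally (the new ``dfk`` key included)
without every key being present at once. A minimal sketch of that behaviour,
using hypothetical keys rather than the real ``TaskRecord``:

```python
from typing import TypedDict  # typing_extensions on older Pythons

class Rec(TypedDict, total=False):
    # Hypothetical stand-in: with total=False every key is optional,
    # so instances can be built up field by field.
    func_name: str
    id: int

r: Rec = {'func_name': 'my_app'}
r['id'] = 0  # added later, as submit() does with task_record.update()
assert r == {'func_name': 'my_app', 'id': 0}
```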
1 change: 0 additions & 1 deletion parsl/executors/workqueue/parsl_coprocess.py
@@ -2,7 +2,6 @@

import sys
from parsl.app.errors import RemoteExceptionWrapper
import parsl.executors.workqueue.exec_parsl_function as epf

import socket
import json
4 changes: 2 additions & 2 deletions parsl/tests/test_error_handling/test_python_walltime.py
@@ -27,8 +27,8 @@ def test_python_longer_walltime_at_invocation():
def test_python_walltime_wrapped_names():
f = my_app(0.01, walltime=6)
assert f.result() == 7
-assert f.task_def['func'].__name__ == "my_app"
-assert f.task_def['func_name'] == "my_app"
+assert f.task_record['func'].__name__ == "my_app"
+assert f.task_record['func_name'] == "my_app"


def test_python_bad_decorator_args():