From c788de6d06325713d4631a206a3a9866e3b9cc30 Mon Sep 17 00:00:00 2001
From: "Andrew S. Rosen"
Date: Fri, 20 Oct 2023 10:59:11 -0700
Subject: [PATCH 1/4] Remove unused import (#2917)

---
 parsl/executors/workqueue/parsl_coprocess.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/parsl/executors/workqueue/parsl_coprocess.py b/parsl/executors/workqueue/parsl_coprocess.py
index 009e91a991..89741302d4 100755
--- a/parsl/executors/workqueue/parsl_coprocess.py
+++ b/parsl/executors/workqueue/parsl_coprocess.py
@@ -2,7 +2,6 @@
 
 import sys
 from parsl.app.errors import RemoteExceptionWrapper
-import parsl.executors.workqueue.exec_parsl_function as epf
 import socket
 import json
 

From c66ca1398478d93f090a733ac80f3ee7b2ca7deb Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Fri, 20 Oct 2023 13:37:33 -0500
Subject: [PATCH 2/4] Use the term "task record" instead of "task
 def(inition)" more consistently (#2913)

This will be a breaking change for any user that references the (now
renamed) task_def attribute of an AppFuture.
---
 parsl/dataflow/dflow.py                | 92 +++++++++----------
 parsl/dataflow/futures.py              | 16 ++--
 .../test_python_walltime.py            |  4 +-
 .../test_memoization_representation.py | 10 +-
 4 files changed, 61 insertions(+), 61 deletions(-)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index c42d536c70..daabd9c191 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -451,8 +451,8 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
         if isinstance(joinable, Future):
             je = joinable.exception()
             if je is not None:
-                if hasattr(joinable, 'task_def'):
-                    tid = joinable.task_def['id']
+                if hasattr(joinable, 'task_record'):
+                    tid = joinable.task_record['id']
                 else:
                     tid = None
                 exceptions_tids = [(je, tid)]
@@ -460,8 +460,8 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
             for future in joinable:
                 je = future.exception()
                 if je is not None:
-                    if hasattr(joinable, 'task_def'):
-                        tid = joinable.task_def['id']
+                    if hasattr(joinable, 'task_record'):
+                        tid = joinable.task_record['id']
                     else:
                         tid = None
                     exceptions_tids.append((je, tid))
@@ -854,8 +854,8 @@ def _unwrap_futures(self, args, kwargs):
                 try:
                     new_args.extend([dep.result()])
                 except Exception as e:
-                    if hasattr(dep, 'task_def'):
-                        tid = dep.task_def['id']
+                    if hasattr(dep, 'task_record'):
+                        tid = dep.task_record['id']
                     else:
                         tid = None
                     dep_failures.extend([(e, tid)])
@@ -869,8 +869,8 @@ def _unwrap_futures(self, args, kwargs):
                 try:
                     kwargs[key] = dep.result()
                 except Exception as e:
-                    if hasattr(dep, 'task_def'):
-                        tid = dep.task_def['id']
+                    if hasattr(dep, 'task_record'):
+                        tid = dep.task_record['id']
                     else:
                         tid = None
                     dep_failures.extend([(e, tid)])
@@ -883,8 +883,8 @@ def _unwrap_futures(self, args, kwargs):
                     try:
                         new_inputs.extend([dep.result()])
                     except Exception as e:
-                        if hasattr(dep, 'task_def'):
-                            tid = dep.task_def['id']
+                        if hasattr(dep, 'task_record'):
+                            tid = dep.task_record['id']
                         else:
                             tid = None
                         dep_failures.extend([(e, tid)])
@@ -967,38 +967,38 @@ def submit(self,
 
         resource_specification = app_kwargs.get('parsl_resource_specification', {})
 
-        task_def: TaskRecord
-        task_def = {'depends': [],
-                    'executor': executor,
-                    'func_name': func.__name__,
-                    'memoize': cache,
-                    'hashsum': None,
-                    'exec_fu': None,
-                    'fail_count': 0,
-                    'fail_cost': 0,
-                    'fail_history': [],
-                    'from_memo': None,
-                    'ignore_for_cache': ignore_for_cache,
-                    'join': join,
-                    'joins': None,
-                    'try_id': 0,
-                    'id': task_id,
-                    'time_invoked': datetime.datetime.now(),
-                    'time_returned': None,
-                    'try_time_launched': None,
-                    'try_time_returned': None,
-                    'resource_specification': resource_specification}
-
-        self.update_task_state(task_def, States.unsched)
-
-        app_fu = AppFuture(task_def)
+        task_record: TaskRecord
+        task_record = {'depends': [],
+                       'executor': executor,
+                       'func_name': func.__name__,
+                       'memoize': cache,
+                       'hashsum': None,
+                       'exec_fu': None,
+                       'fail_count': 0,
+                       'fail_cost': 0,
+                       'fail_history': [],
+                       'from_memo': None,
+                       'ignore_for_cache': ignore_for_cache,
+                       'join': join,
+                       'joins': None,
+                       'try_id': 0,
+                       'id': task_id,
+                       'time_invoked': datetime.datetime.now(),
+                       'time_returned': None,
+                       'try_time_launched': None,
+                       'try_time_returned': None,
+                       'resource_specification': resource_specification}
+
+        self.update_task_state(task_record, States.unsched)
+
+        app_fu = AppFuture(task_record)
 
         # Transform remote input files to data futures
         app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func)
 
         func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func)
 
-        task_def.update({
+        task_record.update({
                     'args': app_args,
                     'func': func,
                     'kwargs': app_kwargs,
@@ -1006,11 +1006,11 @@ def submit(self,
 
         assert task_id not in self.tasks
 
-        self.tasks[task_id] = task_def
+        self.tasks[task_id] = task_record
 
         # Get the list of dependencies for the task
         depends = self._gather_all_deps(app_args, app_kwargs)
-        task_def['depends'] = depends
+        task_record['depends'] = depends
 
         depend_descs = []
         for d in depends:
@@ -1025,16 +1025,16 @@ def submit(self,
             waiting_message = "not waiting on any dependency"
 
         logger.info("Task {} submitted for App {}, {}".format(task_id,
-                                                              task_def['func_name'],
+                                                              task_record['func_name'],
                                                               waiting_message))
 
-        task_def['task_launch_lock'] = threading.Lock()
+        task_record['task_launch_lock'] = threading.Lock()
 
-        app_fu.add_done_callback(partial(self.handle_app_update, task_def))
-        self.update_task_state(task_def, States.pending)
-        logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_def['app_fu']))
+        app_fu.add_done_callback(partial(self.handle_app_update, task_record))
+        self.update_task_state(task_record, States.pending)
+        logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))
 
-        self._send_task_log_info(task_def)
+        self._send_task_log_info(task_record)
 
         # at this point add callbacks to all dependencies to do a launch_if_ready
         # call whenever a dependency completes.
@@ -1051,14 +1051,14 @@ def submit(self,
 
         for d in depends:
 
             def callback_adapter(dep_fut: Future) -> None:
-                self.launch_if_ready(task_def)
+                self.launch_if_ready(task_record)
 
             try:
                 d.add_done_callback(callback_adapter)
             except Exception as e:
                 logger.error("add_done_callback got an exception {} which will be ignored".format(e))
 
-        self.launch_if_ready(task_def)
+        self.launch_if_ready(task_record)
 
         return app_fu
 
diff --git a/parsl/dataflow/futures.py b/parsl/dataflow/futures.py
index 610932af4c..e41db45648 100644
--- a/parsl/dataflow/futures.py
+++ b/parsl/dataflow/futures.py
@@ -59,32 +59,32 @@ class AppFuture(Future):
 
     """
 
-    def __init__(self, task_def: TaskRecord) -> None:
+    def __init__(self, task_record: TaskRecord) -> None:
         """Initialize the AppFuture.
 
         Args:
             KWargs:
-                - task_def : The DFK task definition dictionary for the task represented
-                  by this future.
+                - task_record : The TaskRecord for the task represented by
+                  this future.
 
        """
        super().__init__()
        self._update_lock = threading.Lock()
        self._outputs: Sequence[DataFuture]
        self._outputs = []
-       self.task_def = task_def
+       self.task_record = task_record
 
     @property
     def stdout(self) -> Optional[str]:
-        return self.task_def['kwargs'].get('stdout')
+        return self.task_record['kwargs'].get('stdout')
 
     @property
     def stderr(self) -> Optional[str]:
-        return self.task_def['kwargs'].get('stderr')
+        return self.task_record['kwargs'].get('stderr')
 
     @property
     def tid(self) -> int:
-        return self.task_def['id']
+        return self.task_record['id']
 
     def cancel(self) -> bool:
         raise NotImplementedError("Cancel not implemented")
@@ -113,7 +113,7 @@ def task_status(self) -> str:
         Returns:
             str
         """
-        return self.task_def['status'].name
+        return self.task_record['status'].name
 
     @property
     def outputs(self) -> Sequence[DataFuture]:
diff --git a/parsl/tests/test_error_handling/test_python_walltime.py b/parsl/tests/test_error_handling/test_python_walltime.py
index aa07111f05..7e570c8c2d 100644
--- a/parsl/tests/test_error_handling/test_python_walltime.py
+++ b/parsl/tests/test_error_handling/test_python_walltime.py
@@ -27,8 +27,8 @@ def test_python_longer_walltime_at_invocation():
 def test_python_walltime_wrapped_names():
     f = my_app(0.01, walltime=6)
     assert f.result() == 7
-    assert f.task_def['func'].__name__ == "my_app"
-    assert f.task_def['func_name'] == "my_app"
+    assert f.task_record['func'].__name__ == "my_app"
+    assert f.task_record['func_name'] == "my_app"
 
 
 def test_python_bad_decorator_args():
diff --git a/parsl/tests/test_monitoring/test_memoization_representation.py b/parsl/tests/test_monitoring/test_memoization_representation.py
index 04e4f99691..c8bc72c0b0 100644
--- a/parsl/tests/test_monitoring/test_memoization_representation.py
+++ b/parsl/tests/test_monitoring/test_memoization_representation.py
@@ -41,9 +41,9 @@ def test_hashsum():
     f4 = this_app(4)
     assert f4.result() == 5
 
-    assert f1.task_def['hashsum'] == f3.task_def['hashsum']
-    assert f1.task_def['hashsum'] == f4.task_def['hashsum']
-    assert f1.task_def['hashsum'] != f2.task_def['hashsum']
+    assert f1.task_record['hashsum'] == f3.task_record['hashsum']
+    assert f1.task_record['hashsum'] == f4.task_record['hashsum']
+    assert f1.task_record['hashsum'] != f2.task_record['hashsum']
 
     logger.info("cleaning up parsl")
     parsl.dfk().cleanup()
@@ -62,11 +62,11 @@ def test_hashsum():
         assert task_count == 4
 
         # this will check that the number of task rows for each hashsum matches the above app invocations
-        result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_def['hashsum']}'"))
+        result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_record['hashsum']}'"))
         (hashsum_count, ) = result.first()
         assert hashsum_count == 3
 
-        result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_def['hashsum']}'"))
+        result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_record['hashsum']}'"))
         (hashsum_count, ) = result.first()
         assert hashsum_count == 1
 

From 84377789e931caef1d4f46e7344189856eb96ddb Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Sat, 21 Oct 2023 16:21:21 -0500
Subject: [PATCH 3/4] Separate monitoring core and visualization dependencies
 (#2911)

This reflects a difference in code maturity and maintenance between the
two: monitoring core is much more maintained and stable than
visualization at the moment.

Work with LSST has repeatedly encountered dependency problems caused by
the visualization dependencies, even though those dependencies are not
needed by LSST, and this separation allows the visualization
dependencies to be skipped.

Breaking change: If you are using visualization you will now need to:

  pip install parsl[monitoring,visualization]

to get all of the dependencies previously installed by [monitoring].
---
 Makefile                      | 2 +-
 docs/userguide/monitoring.rst | 2 +-
 setup.py                      | 4 +++-
 3 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/Makefile b/Makefile
index 0739954e9a..bac23fbddc 100644
--- a/Makefile
+++ b/Makefile
@@ -74,7 +74,7 @@ wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with workqueue_ex config
 
 .PHONY: config_local_test
 config_local_test:
-	pip3 install ".[monitoring,proxystore]"
+	pip3 install ".[monitoring,visualization,proxystore]"
 	pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10
 
 .PHONY: site_test
diff --git a/docs/userguide/monitoring.rst b/docs/userguide/monitoring.rst
index 5f26885099..b83994498b 100644
--- a/docs/userguide/monitoring.rst
+++ b/docs/userguide/monitoring.rst
@@ -56,7 +56,7 @@ Visualization
 To run the web dashboard utility ``parsl-visualize`` you first need to install
 its dependencies:
 
-   $ pip install 'parsl[monitoring]'
+   $ pip install 'parsl[monitoring,visualization]'
 
 To view the web dashboard while or after a Parsl program has executed, run
 the ``parsl-visualize`` utility::
diff --git a/setup.py b/setup.py
index aec30ccd7d..e06ca085db 100755
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,9 @@
 
 extras_require = {
     'monitoring' : [
-        'sqlalchemy>=1.4,<2',
+        'sqlalchemy>=1.4,<2'
+    ],
+    'visualization' : [
         'pydot',
         'networkx>=2.5,<2.6',
         'Flask>=1.0.2',

From 65cdfb1c18fc538f856e6440551a4421fd42e1bb Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Sat, 21 Oct 2023 16:56:29 -0500
Subject: [PATCH 4/4] Lift `[]` and `.` operators into AppFutures (#2904)

The aim is to allow more python syntax to "work" within parsl's DAG
framework. This same style of implementation could be used for many
other dunder methods defined on objects but not defined on Future, I
think - for example to allow some_app()+1 or
some_app() + some_other_app() - if the approach in this PR works for
[], then I think it would be mostly mechanical work to implement the
same approach for some other dunder methods.

The dunder methods where this makes sense are the methods where any
type can be returned: for example, af[x] returns a future that will
itself contain the value of af.result()[x] when the result is
available.

The dunder methods where this does not make sense are roughly the
methods where there is a stronger meaning to the return value: for
example, len(af) (that is, af.__len__()) must always return an int,
and that does not make sense in the world of deferring to the future:
the length is not known at the time of invoking len(af) and so no
meaningful integer can be returned.
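
For illustration, a minimal sketch of what the lifted [] operator
allows (fetch_config here is a hypothetical app, not part of this
patch):

    @python_app
    def fetch_config():
        return {"workers": 4}

    n_fut = fetch_config()["workers"]  # an AppFuture; does not block
    assert n_fut.result() == 4  # same as fetch_config().result()["workers"]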
---
 docs/userguide/index.rst                    |   1 +
 docs/userguide/lifted_ops.rst               |  56 ++++++++
 parsl/dataflow/dflow.py                     |   1 +
 parsl/dataflow/futures.py                   |  32 ++++-
 parsl/dataflow/taskrecord.py                |   7 +
 parsl/tests/test_python_apps/test_lifted.py | 137 ++++++++++++++++++++
 6 files changed, 233 insertions(+), 1 deletion(-)
 create mode 100644 docs/userguide/lifted_ops.rst
 create mode 100644 parsl/tests/test_python_apps/test_lifted.py

diff --git a/docs/userguide/index.rst b/docs/userguide/index.rst
index 21de9eb704..3d667816a5 100644
--- a/docs/userguide/index.rst
+++ b/docs/userguide/index.rst
@@ -16,6 +16,7 @@ User guide
    monitoring
    workflow
    modularizing
+   lifted_ops
    joins
    usage_tracking
    plugins
diff --git a/docs/userguide/lifted_ops.rst b/docs/userguide/lifted_ops.rst
new file mode 100644
index 0000000000..6e258b9b62
--- /dev/null
+++ b/docs/userguide/lifted_ops.rst
@@ -0,0 +1,56 @@
+.. _label-liftedops:
+
+Lifted operators
+================
+
+Parsl allows some operators (``[]`` and ``.``) to be used on an AppFuture in
+a way that makes sense with those operators on the eventually returned
+result.
+
+Lifted [] operator
+------------------
+
+When an app returns a complex structure such as a ``dict`` or a ``list``,
+it is sometimes useful to pass an element of that structure to a subsequent
+task, without waiting for the task returning the structure to complete.
+
+To help with this, Parsl allows the ``[]`` operator to be used on an
+`AppFuture`. This operator will return another `AppFuture` that will
+complete after the initial future, with the result of ``[]`` on the value
+of the initial future.
+
+The end result is that this assertion will hold:
+
+.. code-block:: python
+
+    fut = my_app()
+    assert fut['x'].result() == fut.result()['x']
+
+but more concurrency will be available, as execution of the main workflow
+code will not stop to wait for ``result()`` to complete on the initial
+future.
+
+`AppFuture` does not implement other methods commonly associated with
+dicts and lists, such as ``len``, because those methods should return a
+specific type of result immediately, and that is not possible when the
+result is not available until some time in the future.
+
+If a key does not exist in the returned result, then the exception will
+appear in the Future returned by ``[]``, rather than at the point that
+the ``[]`` operator is applied. This is because the set of valid keys is
+not known until the underlying result is available.
+
+Lifted . operator
+-----------------
+
+The ``.`` operator works similarly to ``[]`` described above:
+
+.. code-block:: python
+
+    fut = my_app()
+    assert fut.x.result() == fut.result().x
+
+Attributes beginning with ``_`` are not lifted: such names usually
+indicate attributes used for internal purposes, and not lifting them
+avoids mixing protocols (such as iteration in for loops) defined on
+AppFutures with protocols defined on the underlying result object.
diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index daabd9c191..c494346533 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -969,6 +969,7 @@ def submit(self,
 
         task_record: TaskRecord
         task_record = {'depends': [],
+                       'dfk': self,
                        'executor': executor,
                        'func_name': func.__name__,
                        'memoize': cache,
diff --git a/parsl/dataflow/futures.py b/parsl/dataflow/futures.py
index e41db45648..7beac4e599 100644
--- a/parsl/dataflow/futures.py
+++ b/parsl/dataflow/futures.py
@@ -5,11 +5,14 @@ 2. AppFutures which represent the futures on App/Leaf tasks.
""" +from __future__ import annotations from concurrent.futures import Future import logging import threading -from typing import Optional, Sequence +from typing import Any, Optional, Sequence + +import parsl.app.app as app from parsl.app.futures import DataFuture from parsl.dataflow.taskrecord import TaskRecord @@ -118,3 +121,30 @@ def task_status(self) -> str: @property def outputs(self) -> Sequence[DataFuture]: return self._outputs + + def __getitem__(self, key: Any) -> AppFuture: + # This is decorated on each invocation because the getitem task + # should be bound to the same DFK as the task associated with this + # Future. + deferred_getitem_app = app.python_app(deferred_getitem, executors=['_parsl_internal'], data_flow_kernel=self.task_record['dfk']) + + return deferred_getitem_app(self, key) + + def __getattr__(self, name: str) -> AppFuture: + # this will avoid lifting behaviour on private methods and attributes, + # including __double_underscore__ methods which implement other + # Python syntax (such as iterators in for loops) + if name.startswith("_"): + raise AttributeError() + + deferred_getattr_app = app.python_app(deferred_getattr, executors=['_parsl_internal'], data_flow_kernel=self.task_record['dfk']) + + return deferred_getattr_app(self, name) + + +def deferred_getitem(o: Any, k: Any) -> Any: + return o[k] + + +def deferred_getattr(o: Any, name: str) -> Any: + return getattr(o, name) diff --git a/parsl/dataflow/taskrecord.py b/parsl/dataflow/taskrecord.py index a5df6f144d..34d5ef4ca5 100644 --- a/parsl/dataflow/taskrecord.py +++ b/parsl/dataflow/taskrecord.py @@ -5,18 +5,25 @@ from typing_extensions import TypedDict from concurrent.futures import Future + # only for type checking: from typing import Any, Callable, Dict, Optional, List, Sequence, TYPE_CHECKING, Union if TYPE_CHECKING: from parsl.dataflow.futures import AppFuture +import parsl.dataflow.dflow as dflow + from parsl.dataflow.states import States class TaskRecord(TypedDict, total=False): """This stores most information about a Parsl task""" + dfk: dflow.DataFlowKernel + """The DataFlowKernel which is managing this task. 
+ """ + func_name: str status: States diff --git a/parsl/tests/test_python_apps/test_lifted.py b/parsl/tests/test_python_apps/test_lifted.py new file mode 100644 index 0000000000..644792205a --- /dev/null +++ b/parsl/tests/test_python_apps/test_lifted.py @@ -0,0 +1,137 @@ +import pytest + +from concurrent.futures import Future +from parsl import python_app + + +@python_app +def returns_a_dict(): + return {"a": "X", "b": "Y"} + + +@python_app +def returns_a_list(): + return ["X", "Y"] + + +@python_app +def returns_a_tuple(): + return ("X", "Y") + + +@python_app +def returns_a_class(): + from dataclasses import dataclass + + @dataclass + class MyClass: + a: str = "X" + b: str = "Y" + + return MyClass + + +class MyOuterClass(): + def __init__(self): + self.q = "A" + self.r = "B" + + +@python_app +def returns_a_class_instance(): + return MyOuterClass() + + +def test_returns_a_dict(): + + # precondition that returns_a_dict behaves + # correctly + assert returns_a_dict().result()["a"] == "X" + + # check that the deferred __getitem__ functionality works, + # allowing [] to be used on an AppFuture + assert returns_a_dict()["a"].result() == "X" + + +def test_returns_a_list(): + + # precondition that returns_a_list behaves + # correctly + assert returns_a_list().result()[0] == "X" + + # check that the deferred __getitem__ functionality works, + # allowing [] to be used on an AppFuture + assert returns_a_list()[0].result() == "X" + + +def test_returns_a_tuple(): + + # precondition that returns_a_tuple behaves + # correctly + assert returns_a_tuple().result()[0] == "X" + + # check that the deferred __getitem__ functionality works, + # allowing [] to be used on an AppFuture + assert returns_a_tuple()[0].result() == "X" + + +def test_lifted_getitem_on_dict_bad_key(): + assert isinstance(returns_a_dict()["invalid"].exception(), KeyError) + + +def test_returns_a_class_instance(): + # precondition + assert returns_a_class_instance().result().q == "A" + + # test of commuting . and result() + assert returns_a_class_instance().q.result() == "A" + + +def test_returns_a_class_instance_no_underscores(): + # test that _underscore attribute references are not lifted + with pytest.raises(AttributeError): + returns_a_class_instance()._nosuchattribute.result() + + +@pytest.mark.skip("returning classes is not supported in WorkQueue or Task Vine - see issue #2908") +def test_returns_a_class(): + + # precondition that returns_a_class behaves + # correctly + assert returns_a_class().result().a == "X" + + # check that the deferred __getattr__ functionality works, + # allowing [] to be used on an AppFuture + assert returns_a_class().a.result() == "X" + + # when the result is not indexable, a sensible error should + # appear in the appropriate future + + +@python_app +def passthrough(v): + return v + + +def test_lifted_getitem_ordering(): + # this should test that lifting getitem has the correct execution + # order: that it does not defer the execution of following code + + f_prereq = Future() + + f_top = passthrough(f_prereq) + + f_a = f_top['a'] + + # lifted ['a'] should not defer execution here (so it should not + # implicitly call result() on f_top). If it does, this test will + # hang at this point, waiting for f_top to get a value, which + # will not happen until f_prereq gets a value.. + # which doesn't happen until: + + f_prereq.set_result({"a": "X"}) + + # now at this point it should be safe to wait for f_a to get a result + # while passthrough and lifted getitem run... + + assert f_a.result() == "X"