diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index c42d536c70..daabd9c191 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -451,8 +451,8 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
         if isinstance(joinable, Future):
             je = joinable.exception()
             if je is not None:
-                if hasattr(joinable, 'task_def'):
-                    tid = joinable.task_def['id']
+                if hasattr(joinable, 'task_record'):
+                    tid = joinable.task_record['id']
                 else:
                     tid = None
                 exceptions_tids = [(je, tid)]
@@ -460,8 +460,8 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
             for future in joinable:
                 je = future.exception()
                 if je is not None:
-                    if hasattr(joinable, 'task_def'):
-                        tid = joinable.task_def['id']
+                    if hasattr(joinable, 'task_record'):
+                        tid = joinable.task_record['id']
                     else:
                         tid = None
                     exceptions_tids.append((je, tid))
@@ -854,8 +854,8 @@ def _unwrap_futures(self, args, kwargs):
                 try:
                     new_args.extend([dep.result()])
                 except Exception as e:
-                    if hasattr(dep, 'task_def'):
-                        tid = dep.task_def['id']
+                    if hasattr(dep, 'task_record'):
+                        tid = dep.task_record['id']
                     else:
                         tid = None
                     dep_failures.extend([(e, tid)])
@@ -869,8 +869,8 @@ def _unwrap_futures(self, args, kwargs):
                 try:
                     kwargs[key] = dep.result()
                 except Exception as e:
-                    if hasattr(dep, 'task_def'):
-                        tid = dep.task_def['id']
+                    if hasattr(dep, 'task_record'):
+                        tid = dep.task_record['id']
                     else:
                         tid = None
                     dep_failures.extend([(e, tid)])
@@ -883,8 +883,8 @@ def _unwrap_futures(self, args, kwargs):
                     try:
                         new_inputs.extend([dep.result()])
                     except Exception as e:
-                        if hasattr(dep, 'task_def'):
-                            tid = dep.task_def['id']
+                        if hasattr(dep, 'task_record'):
+                            tid = dep.task_record['id']
                         else:
                             tid = None
                         dep_failures.extend([(e, tid)])
@@ -967,38 +967,38 @@ def submit(self,

         resource_specification = app_kwargs.get('parsl_resource_specification', {})

-        task_def: TaskRecord
-        task_def = {'depends': [],
-                    'executor': executor,
-                    'func_name': func.__name__,
-                    'memoize': cache,
-                    'hashsum': None,
-                    'exec_fu': None,
-                    'fail_count': 0,
-                    'fail_cost': 0,
-                    'fail_history': [],
-                    'from_memo': None,
-                    'ignore_for_cache': ignore_for_cache,
-                    'join': join,
-                    'joins': None,
-                    'try_id': 0,
-                    'id': task_id,
-                    'time_invoked': datetime.datetime.now(),
-                    'time_returned': None,
-                    'try_time_launched': None,
-                    'try_time_returned': None,
-                    'resource_specification': resource_specification}
-
-        self.update_task_state(task_def, States.unsched)
-
-        app_fu = AppFuture(task_def)
+        task_record: TaskRecord
+        task_record = {'depends': [],
+                       'executor': executor,
+                       'func_name': func.__name__,
+                       'memoize': cache,
+                       'hashsum': None,
+                       'exec_fu': None,
+                       'fail_count': 0,
+                       'fail_cost': 0,
+                       'fail_history': [],
+                       'from_memo': None,
+                       'ignore_for_cache': ignore_for_cache,
+                       'join': join,
+                       'joins': None,
+                       'try_id': 0,
+                       'id': task_id,
+                       'time_invoked': datetime.datetime.now(),
+                       'time_returned': None,
+                       'try_time_launched': None,
+                       'try_time_returned': None,
+                       'resource_specification': resource_specification}
+
+        self.update_task_state(task_record, States.unsched)
+
+        app_fu = AppFuture(task_record)

         # Transform remote input files to data futures
         app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func)

         func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func)

-        task_def.update({
+        task_record.update({
                     'args': app_args,
                     'func': func,
                     'kwargs': app_kwargs,
@@ -1006,11 +1006,11 @@ def submit(self,

         assert task_id not in self.tasks

-        self.tasks[task_id] = task_def
+        self.tasks[task_id] = task_record

         # Get the list of dependencies for the task
         depends = self._gather_all_deps(app_args, app_kwargs)
-        task_def['depends'] = depends
+        task_record['depends'] = depends

         depend_descs = []
         for d in depends:
@@ -1025,16 +1025,16 @@ def submit(self,
             waiting_message = "not waiting on any dependency"

         logger.info("Task {} submitted for App {}, {}".format(task_id,
-                                                              task_def['func_name'],
+                                                              task_record['func_name'],
                                                               waiting_message))

-        task_def['task_launch_lock'] = threading.Lock()
+        task_record['task_launch_lock'] = threading.Lock()

-        app_fu.add_done_callback(partial(self.handle_app_update, task_def))
-        self.update_task_state(task_def, States.pending)
-        logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_def['app_fu']))
+        app_fu.add_done_callback(partial(self.handle_app_update, task_record))
+        self.update_task_state(task_record, States.pending)
+        logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))

-        self._send_task_log_info(task_def)
+        self._send_task_log_info(task_record)

         # at this point add callbacks to all dependencies to do a launch_if_ready
         # call whenever a dependency completes.
@@ -1051,14 +1051,14 @@ def submit(self,
         for d in depends:

             def callback_adapter(dep_fut: Future) -> None:
-                self.launch_if_ready(task_def)
+                self.launch_if_ready(task_record)

             try:
                 d.add_done_callback(callback_adapter)
             except Exception as e:
                 logger.error("add_done_callback got an exception {} which will be ignored".format(e))

-        self.launch_if_ready(task_def)
+        self.launch_if_ready(task_record)

         return app_fu

diff --git a/parsl/dataflow/futures.py b/parsl/dataflow/futures.py
index 610932af4c..e41db45648 100644
--- a/parsl/dataflow/futures.py
+++ b/parsl/dataflow/futures.py
@@ -59,32 +59,32 @@ class AppFuture(Future):

    """

-    def __init__(self, task_def: TaskRecord) -> None:
+    def __init__(self, task_record: TaskRecord) -> None:
        """Initialize the AppFuture.

        Args:
            KWargs:
-             - task_def : The DFK task definition dictionary for the task represented
-                   by this future.
+             - task_record : The TaskRecord for the task represented by
+                   this future.
        """
        super().__init__()
        self._update_lock = threading.Lock()
        self._outputs: Sequence[DataFuture]
        self._outputs = []
-        self.task_def = task_def
+        self.task_record = task_record

    @property
    def stdout(self) -> Optional[str]:
-        return self.task_def['kwargs'].get('stdout')
+        return self.task_record['kwargs'].get('stdout')

    @property
    def stderr(self) -> Optional[str]:
-        return self.task_def['kwargs'].get('stderr')
+        return self.task_record['kwargs'].get('stderr')

    @property
    def tid(self) -> int:
-        return self.task_def['id']
+        return self.task_record['id']

    def cancel(self) -> bool:
        raise NotImplementedError("Cancel not implemented")

@@ -113,7 +113,7 @@ def task_status(self) -> str:
        Returns:
            str
        """
-        return self.task_def['status'].name
+        return self.task_record['status'].name

    @property
    def outputs(self) -> Sequence[DataFuture]:
diff --git a/parsl/tests/test_error_handling/test_python_walltime.py b/parsl/tests/test_error_handling/test_python_walltime.py
index aa07111f05..7e570c8c2d 100644
--- a/parsl/tests/test_error_handling/test_python_walltime.py
+++ b/parsl/tests/test_error_handling/test_python_walltime.py
@@ -27,8 +27,8 @@ def test_python_longer_walltime_at_invocation():
 def test_python_walltime_wrapped_names():
     f = my_app(0.01, walltime=6)
     assert f.result() == 7
-    assert f.task_def['func'].__name__ == "my_app"
-    assert f.task_def['func_name'] == "my_app"
+    assert f.task_record['func'].__name__ == "my_app"
+    assert f.task_record['func_name'] == "my_app"


 def test_python_bad_decorator_args():
diff --git a/parsl/tests/test_monitoring/test_memoization_representation.py b/parsl/tests/test_monitoring/test_memoization_representation.py
index 04e4f99691..c8bc72c0b0 100644
--- a/parsl/tests/test_monitoring/test_memoization_representation.py
+++ b/parsl/tests/test_monitoring/test_memoization_representation.py
@@ -41,9 +41,9 @@ def test_hashsum():
     f4 = this_app(4)
     assert f4.result() == 5

-    assert f1.task_def['hashsum'] == f3.task_def['hashsum']
-    assert f1.task_def['hashsum'] == f4.task_def['hashsum']
-    assert f1.task_def['hashsum'] != f2.task_def['hashsum']
+    assert f1.task_record['hashsum'] == f3.task_record['hashsum']
+    assert f1.task_record['hashsum'] == f4.task_record['hashsum']
+    assert f1.task_record['hashsum'] != f2.task_record['hashsum']

     logger.info("cleaning up parsl")
     parsl.dfk().cleanup()
@@ -62,11 +62,11 @@ def test_hashsum():

     assert task_count == 4

     # this will check that the number of task rows for each hashsum matches the above app invocations
-    result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_def['hashsum']}'"))
+    result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_record['hashsum']}'"))
     (hashsum_count, ) = result.first()
     assert hashsum_count == 3

-    result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_def['hashsum']}'"))
+    result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_record['hashsum']}'"))
     (hashsum_count, ) = result.first()
     assert hashsum_count == 1