Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Use the term "task record" instead of "task def(inition)" more consistently #2913

Merged
merged 2 commits
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,17 +451,17 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional
if isinstance(joinable, Future):
je = joinable.exception()
if je is not None:
if hasattr(joinable, 'task_def'):
tid = joinable.task_def['id']
if hasattr(joinable, 'task_record'):
tid = joinable.task_record['id']
else:
tid = None
exceptions_tids = [(je, tid)]
elif isinstance(joinable, list):
for future in joinable:
je = future.exception()
if je is not None:
if hasattr(joinable, 'task_def'):
tid = joinable.task_def['id']
if hasattr(joinable, 'task_record'):
tid = joinable.task_record['id']
else:
tid = None
exceptions_tids.append((je, tid))
Expand Down Expand Up @@ -854,8 +854,8 @@ def _unwrap_futures(self, args, kwargs):
try:
new_args.extend([dep.result()])
except Exception as e:
if hasattr(dep, 'task_def'):
tid = dep.task_def['id']
if hasattr(dep, 'task_record'):
tid = dep.task_record['id']
else:
tid = None
dep_failures.extend([(e, tid)])
Expand All @@ -869,8 +869,8 @@ def _unwrap_futures(self, args, kwargs):
try:
kwargs[key] = dep.result()
except Exception as e:
if hasattr(dep, 'task_def'):
tid = dep.task_def['id']
if hasattr(dep, 'task_record'):
tid = dep.task_record['id']
else:
tid = None
dep_failures.extend([(e, tid)])
Expand All @@ -883,8 +883,8 @@ def _unwrap_futures(self, args, kwargs):
try:
new_inputs.extend([dep.result()])
except Exception as e:
if hasattr(dep, 'task_def'):
tid = dep.task_def['id']
if hasattr(dep, 'task_record'):
tid = dep.task_record['id']
else:
tid = None
dep_failures.extend([(e, tid)])
Expand Down Expand Up @@ -967,50 +967,50 @@ def submit(self,

resource_specification = app_kwargs.get('parsl_resource_specification', {})

task_def: TaskRecord
task_def = {'depends': [],
'executor': executor,
'func_name': func.__name__,
'memoize': cache,
'hashsum': None,
'exec_fu': None,
'fail_count': 0,
'fail_cost': 0,
'fail_history': [],
'from_memo': None,
'ignore_for_cache': ignore_for_cache,
'join': join,
'joins': None,
'try_id': 0,
'id': task_id,
'time_invoked': datetime.datetime.now(),
'time_returned': None,
'try_time_launched': None,
'try_time_returned': None,
'resource_specification': resource_specification}

self.update_task_state(task_def, States.unsched)

app_fu = AppFuture(task_def)
task_record: TaskRecord
task_record = {'depends': [],
'executor': executor,
'func_name': func.__name__,
'memoize': cache,
'hashsum': None,
'exec_fu': None,
'fail_count': 0,
'fail_cost': 0,
'fail_history': [],
'from_memo': None,
'ignore_for_cache': ignore_for_cache,
'join': join,
'joins': None,
'try_id': 0,
'id': task_id,
'time_invoked': datetime.datetime.now(),
'time_returned': None,
'try_time_launched': None,
'try_time_returned': None,
'resource_specification': resource_specification}

self.update_task_state(task_record, States.unsched)

app_fu = AppFuture(task_record)

# Transform remote input files to data futures
app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func)

func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func)

task_def.update({
task_record.update({
'args': app_args,
'func': func,
'kwargs': app_kwargs,
'app_fu': app_fu})

assert task_id not in self.tasks

self.tasks[task_id] = task_def
self.tasks[task_id] = task_record

# Get the list of dependencies for the task
depends = self._gather_all_deps(app_args, app_kwargs)
task_def['depends'] = depends
task_record['depends'] = depends

depend_descs = []
for d in depends:
Expand All @@ -1025,16 +1025,16 @@ def submit(self,
waiting_message = "not waiting on any dependency"

logger.info("Task {} submitted for App {}, {}".format(task_id,
task_def['func_name'],
task_record['func_name'],
waiting_message))

task_def['task_launch_lock'] = threading.Lock()
task_record['task_launch_lock'] = threading.Lock()

app_fu.add_done_callback(partial(self.handle_app_update, task_def))
self.update_task_state(task_def, States.pending)
logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_def['app_fu']))
app_fu.add_done_callback(partial(self.handle_app_update, task_record))
self.update_task_state(task_record, States.pending)
logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))

self._send_task_log_info(task_def)
self._send_task_log_info(task_record)

# at this point add callbacks to all dependencies to do a launch_if_ready
# call whenever a dependency completes.
Expand All @@ -1051,14 +1051,14 @@ def submit(self,
for d in depends:

def callback_adapter(dep_fut: Future) -> None:
self.launch_if_ready(task_def)
self.launch_if_ready(task_record)

try:
d.add_done_callback(callback_adapter)
except Exception as e:
logger.error("add_done_callback got an exception {} which will be ignored".format(e))

self.launch_if_ready(task_def)
self.launch_if_ready(task_record)

return app_fu

Expand Down
16 changes: 8 additions & 8 deletions parsl/dataflow/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,32 +59,32 @@ class AppFuture(Future):

"""

def __init__(self, task_def: TaskRecord) -> None:
def __init__(self, task_record: TaskRecord) -> None:
"""Initialize the AppFuture.

Args:

KWargs:
- task_def : The DFK task definition dictionary for the task represented
by this future.
- task_record : The TaskRecord for the task represented by
this future.
"""
super().__init__()
self._update_lock = threading.Lock()
self._outputs: Sequence[DataFuture]
self._outputs = []
self.task_def = task_def
self.task_record = task_record

@property
def stdout(self) -> Optional[str]:
return self.task_def['kwargs'].get('stdout')
return self.task_record['kwargs'].get('stdout')

@property
def stderr(self) -> Optional[str]:
return self.task_def['kwargs'].get('stderr')
return self.task_record['kwargs'].get('stderr')

@property
def tid(self) -> int:
return self.task_def['id']
return self.task_record['id']

def cancel(self) -> bool:
raise NotImplementedError("Cancel not implemented")
Expand Down Expand Up @@ -113,7 +113,7 @@ def task_status(self) -> str:

Returns: str
"""
return self.task_def['status'].name
return self.task_record['status'].name

@property
def outputs(self) -> Sequence[DataFuture]:
Expand Down
4 changes: 2 additions & 2 deletions parsl/tests/test_error_handling/test_python_walltime.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def test_python_longer_walltime_at_invocation():
def test_python_walltime_wrapped_names():
f = my_app(0.01, walltime=6)
assert f.result() == 7
assert f.task_def['func'].__name__ == "my_app"
assert f.task_def['func_name'] == "my_app"
assert f.task_record['func'].__name__ == "my_app"
assert f.task_record['func_name'] == "my_app"


def test_python_bad_decorator_args():
Expand Down
10 changes: 5 additions & 5 deletions parsl/tests/test_monitoring/test_memoization_representation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def test_hashsum():
f4 = this_app(4)
assert f4.result() == 5

assert f1.task_def['hashsum'] == f3.task_def['hashsum']
assert f1.task_def['hashsum'] == f4.task_def['hashsum']
assert f1.task_def['hashsum'] != f2.task_def['hashsum']
assert f1.task_record['hashsum'] == f3.task_record['hashsum']
assert f1.task_record['hashsum'] == f4.task_record['hashsum']
assert f1.task_record['hashsum'] != f2.task_record['hashsum']

logger.info("cleaning up parsl")
parsl.dfk().cleanup()
Expand All @@ -62,11 +62,11 @@ def test_hashsum():
assert task_count == 4

# this will check that the number of task rows for each hashsum matches the above app invocations
result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_def['hashsum']}'"))
result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_record['hashsum']}'"))
(hashsum_count, ) = result.first()
assert hashsum_count == 3

result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_def['hashsum']}'"))
result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_record['hashsum']}'"))
(hashsum_count, ) = result.first()
assert hashsum_count == 1

Expand Down
Loading