Skip to content

Commit

Permalink
Unbreak task packing
Browse files Browse the repository at this point in the history
When submitting a task group, only attempt to upload task inputs
corresponding to nodes external to the task group since only those
will have been resolved.
  • Loading branch information
cjao committed Dec 13, 2023
1 parent 6e331d4 commit d79c751
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,10 @@ jobs:
if: env.BUILD_AND_RUN_ALL
id: covalent_start
run: |
export COVALENT_ENABLE_TASK_PACKING=1
covalent db migrate
if [ "${{ matrix.backend }}" = 'dask' ] ; then
COVALENT_ENABLE_TASK_PACKING=1 covalent start -d
covalent start -d
elif [ "${{ matrix.backend }}" = 'local' ] ; then
covalent start --no-cluster -d
else
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Reduced number of assets to upload when submitting a dispatch.
- Fixed inaccuracies in task packing exposed by no longer uploading null attributes upon dispatch.

### Operations

Expand Down
15 changes: 13 additions & 2 deletions covalent_dispatcher/_core/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ async def _submit_task_group(dispatch_id: str, sorted_nodes: List[int], task_gro
app_log.debug("8A: Update node success (run_planned_workflow).")

else:
# Nodes whose values have already been resolved
known_nodes = []

# Skip the group if all task outputs can be reused from a
Expand All @@ -196,6 +197,8 @@ async def _submit_task_group(dispatch_id: str, sorted_nodes: List[int], task_gro
# Gather inputs for each task and send the task spec sequence to the runner
task_specs = []

sorted_nodes_set = set(sorted_nodes)

for node_id in sorted_nodes:
app_log.debug(f"Gathering inputs for task {node_id} (run_planned_workflow).")

Expand All @@ -214,8 +217,16 @@ async def _submit_task_group(dispatch_id: str, sorted_nodes: List[int], task_gro
"args_ids": abs_task_input["args"],
"kwargs_ids": abs_task_input["kwargs"],
}
known_nodes += abs_task_input["args"]
known_nodes += list(abs_task_input["kwargs"].values())
# Task inputs that don't belong to the task group have already beeen resolved
external_task_args = filter(
lambda x: x not in sorted_nodes_set, abs_task_input["args"]
)
known_nodes.extend(external_task_args)
external_task_kwargs = filter(
lambda x: x not in sorted_nodes_set, abs_task_input["kwargs"].values()
)
known_nodes.extend(external_task_kwargs)

task_specs.append(task_spec)

app_log.debug(
Expand Down

0 comments on commit d79c751

Please sign in to comment.