Skip to content

Commit

Permalink
Add simple pull-based shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Sep 29, 2024
1 parent 005c804 commit 7e0f88d
Showing 1 changed file with 66 additions and 2 deletions.
68 changes: 66 additions & 2 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

from daft.daft import FileFormat, IOConfig, JoinType, PyExpr

Check warning on line 60 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L60

Added line #L60 was not covered by tests
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata

Check warning on line 62 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L62

Added line #L62 was not covered by tests


# A PhysicalPlan that is still being built - may yield both PartitionTaskBuilders and PartitionTasks.
Expand Down Expand Up @@ -1624,8 +1625,71 @@ def streaming_push_exchange_op(
def fully_materializing_exchange_op(
child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
) -> InProgressPhysicalPlan[PartitionT]:
# 1. Materialize everything
raise NotImplementedError("TODO: Sammy")
from daft.expressions import Expression

Check warning on line 1628 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1628

Added line #L1628 was not covered by tests

# Step 1: Naively materialize all child partitions
stage_id_children = next(stage_id_counter)
materialized_partitions: list[SingleOutputPartitionTask] = []
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
task = step.finalize_partition_task_single_output(stage_id=stage_id_children)
materialized_partitions.append(task)
yield task
elif isinstance(step, PartitionTask):
yield step
elif step is None:
yield None

Check warning on line 1641 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1631-L1641

Added lines #L1631 - L1641 were not covered by tests
else:
yield step

Check warning on line 1643 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1643

Added line #L1643 was not covered by tests

# Step 2: Wait for all partitions to be done
while any(not p.done() for p in materialized_partitions):
yield None

Check warning on line 1647 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1646-L1647

Added lines #L1646 - L1647 were not covered by tests

# Step 3: Yield the map tasks
stage_id_map_tasks = next(stage_id_counter)
materialized_map_partitions: list[MultiOutputPartitionTask] = []
while materialized_partitions:
materialized_child_partition = materialized_partitions.pop(0)
map_task = (

Check warning on line 1654 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1650-L1654

Added lines #L1650 - L1654 were not covered by tests
PartitionTaskBuilder(
inputs=[materialized_child_partition.partition()],
partial_metadatas=materialized_child_partition.partial_metadatas,
resource_request=ResourceRequest(),
)
.add_instruction(
execution_step.FanoutHash(
_num_outputs=num_partitions,
partition_by=ExpressionsProjection([Expression._from_pyexpr(expr) for expr in partition_by]),
),
ResourceRequest(),
)
.finalize_partition_task_multi_output(stage_id=stage_id_map_tasks)
)
materialized_map_partitions.append(map_task)
yield map_task

Check warning on line 1670 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1669-L1670

Added lines #L1669 - L1670 were not covered by tests

# Step 4: Wait on all the map tasks to complete
while any(not p.done() for p in materialized_map_partitions):
yield None

Check warning on line 1674 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1673-L1674

Added lines #L1673 - L1674 were not covered by tests

# Step 5: "Transpose the results" and run reduce tasks
transposed_results: list[list[tuple[PartitionT, PartialPartitionMetadata]]] = [[] for _ in range(num_partitions)]
for map_task in materialized_map_partitions:
partitions = map_task.partitions()
partition_metadatas = map_task.partial_metadatas
for i, (partition, meta) in enumerate(zip(partitions, partition_metadatas)):
transposed_results[i].append((partition, meta))

Check warning on line 1682 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1677-L1682

Added lines #L1677 - L1682 were not covered by tests

for i, partitions in enumerate(transposed_results):
reduce_task = PartitionTaskBuilder(

Check warning on line 1685 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1684-L1685

Added lines #L1684 - L1685 were not covered by tests
inputs=[p for p, _ in partitions],
partial_metadatas=[m for _, m in partitions],
resource_request=ResourceRequest(),
).add_instruction(
instruction=execution_step.ReduceMerge(),
)
yield reduce_task

Check warning on line 1692 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1692

Added line #L1692 was not covered by tests


# This was the complicated one...
Expand Down

0 comments on commit 7e0f88d

Please sign in to comment.