
Commit

Who knows if this works
Jay Chia committed Sep 29, 2024
1 parent 7e0f88d commit 0bc057d
Showing 7 changed files with 209 additions and 13 deletions.
36 changes: 34 additions & 2 deletions daft/execution/physical_plan.py
@@ -1616,10 +1616,42 @@ def fanout_random(child_plan: InProgressPhysicalPlan[PartitionT], num_partitions
        seed += 1


-def streaming_push_exchange_op(
def fully_materializing_push_exchange_op(
    child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
) -> InProgressPhysicalPlan[PartitionT]:
-    raise NotImplementedError("TODO: jay")
    from daft.expressions import Expression

    # Step 1: Naively materialize all child partitions
    stage_id_children = next(stage_id_counter)
    materialized_partitions: list[SingleOutputPartitionTask] = []
    for step in child_plan:
        if isinstance(step, PartitionTaskBuilder):
            task = step.finalize_partition_task_single_output(stage_id=stage_id_children)
            materialized_partitions.append(task)
            yield task
        elif isinstance(step, PartitionTask):
            yield step
        elif step is None:
            yield None
        else:
            yield step

    # Step 2: Wait for all partitions to be done
    while any(not p.done() for p in materialized_partitions):
        yield None

    # Step 3: Run the push-based shuffle over the materialized partitions, then emit reduce tasks
    with get_context().shuffle_service_factory().push_based_shuffle_service_context(
        num_partitions, partition_by=ExpressionsProjection([Expression._from_pyexpr(e) for e in partition_by])
    ) as shuffle_service:
        results = shuffle_service.run([p.partition() for p in materialized_partitions])

    for reduced_data in results:
        reduce_task = PartitionTaskBuilder(
            inputs=[reduced_data],
            partial_metadatas=None,
            resource_request=ResourceRequest(),
        )
        yield reduce_task


def fully_materializing_exchange_op(
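
The new `fully_materializing_push_exchange_op` follows the generator protocol used by Daft's physical-plan functions: yield partition tasks for the scheduler to launch, yield `None` while waiting on materialization, and only emit downstream work once every child partition is done. Below is a minimal, self-contained sketch of that protocol; `ToyTask` and the driver loop are hypothetical stand-ins for Daft's `PartitionTask` and scheduler, not the real classes.

# Illustrative sketch only: ToyTask and the driver loop are hypothetical stand-ins for
# Daft's PartitionTask and scheduler, used to show the yield-task / yield-None protocol.
from typing import Iterator, Optional


class ToyTask:
    """A fake partition task that becomes 'done' after the driver polls it a few times."""

    def __init__(self, name: str, ticks_until_done: int) -> None:
        self.name = name
        self._ticks = ticks_until_done

    def poll(self) -> None:
        self._ticks = max(0, self._ticks - 1)

    def done(self) -> bool:
        return self._ticks == 0


def toy_exchange_op(child_tasks: list[ToyTask]) -> Iterator[Optional[str]]:
    # Step 1: "launch" every child task (analogous to yielding a PartitionTask)
    materialized = []
    for task in child_tasks:
        materialized.append(task)
        yield f"launch:{task.name}"

    # Step 2: nothing to schedule until all children are done, so yield None
    while any(not t.done() for t in materialized):
        yield None

    # Step 3: hand the materialized partitions off (analogous to calling the shuffle service)
    yield "shuffle:all-children-materialized"


if __name__ == "__main__":
    tasks = [ToyTask("a", 2), ToyTask("b", 3)]
    for step in toy_exchange_op(tasks):
        if step is None:
            for t in tasks:  # the driver "waits": here we just tick the fake tasks forward
                t.poll()
        else:
            print(step)

Running it prints the two launch steps, spins on `None` while the fake tasks finish, then prints the hand-off step, mirroring Steps 1-3 in the function above.
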
164 changes: 164 additions & 0 deletions daft/runners/ray_runner.py
@@ -1492,3 +1492,167 @@ def fully_materializing_shuffle_service_context(
        )
        yield shuffle_service
        shuffle_service.teardown()

    @contextlib.contextmanager
    def push_based_shuffle_service_context(
        self,
        num_partitions: int,
        partition_by: ExpressionsProjection,
    ) -> Iterator[RayPushBasedShuffle]:
        num_cpus = int(ray.cluster_resources()["CPU"])

        # Number of mergers is ~2x the number of mappers
        num_map_tasks = num_cpus // 3
        num_merge_tasks = num_map_tasks * 2

        yield RayPushBasedShuffle(num_map_tasks, num_merge_tasks, num_partitions, partition_by)


@ray.remote
def map_fn(
    map_input: MicroPartition, num_mergers: int, partition_by: ExpressionsProjection, num_partitions: int
) -> list[list[MicroPartition]]:
    """Returns `N` outputs, where `N` is the number of mergers"""
    # Partition the input data based on the partitioning spec
    partitioned_data = map_input.partition_by_hash(partition_by, num_partitions)

    outputs: list[list[MicroPartition]] = [[] for _ in range(num_mergers)]

    # Distribute the partitioned data across the mergers, using the same split as
    # RayPushBasedShuffle._num_reducers_for_merger: the first (num_partitions % num_mergers)
    # mergers each take one extra partition
    base = num_partitions // num_mergers
    extra = num_partitions % num_mergers
    partition_idx = 0
    for merger_idx in range(num_mergers):
        for _ in range(base + (1 if merger_idx < extra else 0)):
            outputs[merger_idx].append(partitioned_data[partition_idx])
            partition_idx += 1

    return outputs


@ray.remote
def merge_fn(*merger_inputs_across_mappers: list[MicroPartition]) -> list[MicroPartition]:
    """Returns `P` outputs, where `P` is the number of reducers assigned to this merger"""
    num_partitions_for_this_merger = len(merger_inputs_across_mappers[0])
    merged_partitions = []
    for partition_idx in range(num_partitions_for_this_merger):
        partitions_to_merge = [data[partition_idx] for data in merger_inputs_across_mappers]
        merged_partition = MicroPartition.concat(partitions_to_merge)
        merged_partitions.append(merged_partition)
    return merged_partitions


@ray.remote
def reduce_fn(*reduce_inputs_across_rounds: MicroPartition) -> MicroPartition:
    """Returns 1 output, which is the reduced data across rounds"""
    # Concatenate all input MicroPartitions across rounds
    reduced_partition = MicroPartition.concat(list(reduce_inputs_across_rounds))

    # Return the single reduced MicroPartition
    return reduced_partition


class RayPushBasedShuffle:
    def __init__(self, num_mappers: int, num_mergers: int, num_reducers: int, partition_by: ExpressionsProjection):
        self._num_mappers = num_mappers
        self._num_mergers = num_mergers
        self._num_reducers = num_reducers
        self._partition_by = partition_by

    def _num_reducers_for_merger(self, merger_idx: int) -> int:
        base_num = self._num_reducers // self._num_mergers
        if merger_idx < (self._num_reducers % self._num_mergers):
            return base_num + 1
        return base_num

    def _get_reducer_inputs_location(self, reducer_idx: int) -> tuple[int, int]:
        """Returns the (merger_idx, offset) of where the inputs to a given reducer should live"""
        for merger_idx in range(self._num_mergers):
            num_reducers = self._num_reducers_for_merger(merger_idx)
            if num_reducers > reducer_idx:
                return merger_idx, reducer_idx
            else:
                reducer_idx -= num_reducers
        raise ValueError(f"Cannot find merger for reducer_idx: {reducer_idx}")

    def _merger_options(self, merger_idx: int) -> dict[str, Any]:
        # TODO: populate the nth merger's options. Place the nth merge task on the (n % NUM_NODES)th node
        #
        # node_strategies = {
        #     node_id: {
        #         "scheduling_strategy": NodeAffinitySchedulingStrategy(
        #             node_id, soft=True
        #         )
        #     }
        #     for node_id in set(merge_task_placement)
        # }
        # self._merge_task_options = [
        #     node_strategies[node_id] for node_id in merge_task_placement
        # ]
        return {}

    def _reduce_options(self, reducer_idx: int) -> dict[str, Any]:
        # TODO: populate the nth reducer's options. Place the nth reduce task on the (n % NUM_NODES)th node
        #
        # node_strategies = {
        #     node_id: {
        #         "scheduling_strategy": NodeAffinitySchedulingStrategy(
        #             node_id, soft=True
        #         )
        #     }
        #     for node_id in set(merge_task_placement)
        # }
        # self._merge_task_options = [
        #     node_strategies[node_id] for node_id in merge_task_placement
        # ]
        return {}

    def run(self, materialized_inputs: list[ray.ObjectRef]) -> list[ray.ObjectRef]:
        """Runs the Mappers and Mergers in a 2-stage pipeline until all merge results are materialized.

        There are `R` reducers.
        There are `N` mergers. Each merger is "responsible" for `R / N` reducers.
        Each Mapper partitions its input into `N` chunks, one per merger.
        """
        # [N_ROUNDS, N_MERGERS, N_REDUCERS_PER_MERGER] list of outputs
        merge_results: list[list[list[ray.ObjectRef]]] = []
        # Each buffered entry is one mapper's output: a list of `N` refs, one per merger
        map_results_buffer: list[list[ray.ObjectRef]] = []

        # Keep running the pipeline while there is still work to do
        while materialized_inputs or map_results_buffer:
            # Drain the map_results_buffer, running one merge task per merger over this round's mappers
            per_round_merge_results = []
            if map_results_buffer:
                round_map_results, map_results_buffer = map_results_buffer, []
                for merger_idx in range(self._num_mergers):
                    merger_inputs = [mapper_outputs[merger_idx] for mapper_outputs in round_map_results]
                    num_returns = self._num_reducers_for_merger(merger_idx)
                    merged = merge_fn.options(
                        **self._merger_options(merger_idx), num_returns=num_returns
                    ).remote(*merger_inputs)
                    # Normalize to a list of refs so the [round][merger][offset] indexing below is uniform
                    per_round_merge_results.append([merged] if num_returns == 1 else merged)
            if per_round_merge_results:
                merge_results.append(per_round_merge_results)

            # Run map tasks:
            for _ in range(self._num_mappers):
                if len(materialized_inputs) == 0:
                    break
                map_input = materialized_inputs.pop(0)
                map_results = map_fn.options(num_returns=self._num_mergers).remote(
                    map_input, self._num_mergers, self._partition_by, self._num_reducers
                )
                map_results_buffer.append(map_results)

            # Wait for all merge and map tasks in this wave to complete
            merge_refs = [ref for refs in per_round_merge_results for ref in refs]
            if merge_refs:
                ray.wait(merge_refs, num_returns=len(merge_refs))
            map_refs = [ref for refs in map_results_buffer for ref in refs]
            if map_refs:
                ray.wait(map_refs, num_returns=len(map_refs))

        # INVARIANT: At this point, the map/merge step is done
        # Start running all the reduce functions
        # TODO: we could stagger this by num CPUs as well, but here we just YOLO run all
        reduce_results = []
        for reducer_idx in range(self._num_reducers):
            assigned_merger_idx, offset = self._get_reducer_inputs_location(reducer_idx)
            reducer_inputs = [
                merge_results[round_idx][assigned_merger_idx][offset] for round_idx in range(len(merge_results))
            ]
            res = reduce_fn.options(**self._reduce_options(reducer_idx)).remote(*reducer_inputs)
            reduce_results.append(res)
        return reduce_results
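
The merger-to-reducer assignment above splits `R` reducers across `N` mergers, giving the first `R % N` mergers one extra reducer, and `_get_reducer_inputs_location` walks that split to find the `(merger_idx, offset)` slot holding a reducer's inputs. A small standalone recreation of that arithmetic (no Ray needed) makes the mapping concrete; the function names here are illustrative, not part of the commit.

# Standalone recreation of the merger/reducer assignment arithmetic (illustrative names,
# no Ray required): mirrors _num_reducers_for_merger and _get_reducer_inputs_location.

def num_reducers_for_merger(merger_idx: int, num_reducers: int, num_mergers: int) -> int:
    # The first (num_reducers % num_mergers) mergers take one extra reducer
    base = num_reducers // num_mergers
    return base + 1 if merger_idx < (num_reducers % num_mergers) else base


def reducer_inputs_location(reducer_idx: int, num_reducers: int, num_mergers: int) -> tuple[int, int]:
    # Walk the mergers, subtracting each merger's share, until the reducer falls inside one
    for merger_idx in range(num_mergers):
        share = num_reducers_for_merger(merger_idx, num_reducers, num_mergers)
        if reducer_idx < share:
            return merger_idx, reducer_idx
        reducer_idx -= share
    raise ValueError(f"Cannot find merger for reducer_idx: {reducer_idx}")


R, N = 5, 2  # 5 reducers across 2 mergers -> merger 0 owns 3 reducers, merger 1 owns 2
print([num_reducers_for_merger(m, R, N) for m in range(N)])   # [3, 2]
print([reducer_inputs_location(r, R, N) for r in range(R)])   # [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1)]

This is the indexing that `run()` relies on when it gathers `merge_results[round_idx][assigned_merger_idx][offset]` for each reducer.
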
6 changes: 3 additions & 3 deletions src/daft-plan/src/physical_ops/exchange_op.rs
@@ -15,7 +15,7 @@ pub enum ExchangeOpStrategy {
    /// Fully materialize the data after the Map, and then pull results from the Reduce.
    FullyMaterializing { target_spec: Arc<ClusteringSpec> },
    /// Stand up Reducers and then send data from the mappers into the reducers eagerly
-    StreamingPush { target_spec: Arc<ClusteringSpec> },
    FullyMaterializingPush { target_spec: Arc<ClusteringSpec> },
}

impl ExchangeOp {
@@ -27,8 +27,8 @@ impl ExchangeOp {
                res.push(" Strategy: FullyMaterializing".to_string());
                res.push(format!(" Target Spec: {:?}", target_spec));
            }
-            ExchangeOpStrategy::StreamingPush { target_spec } => {
-                res.push(" Strategy: StreamingPush".to_string());
            ExchangeOpStrategy::FullyMaterializingPush { target_spec } => {
                res.push(" Strategy: FullyMaterializingPush".to_string());
                res.push(format!(" Target Spec: {:?}", target_spec));
            }
        }
@@ -140,10 +140,10 @@ impl PhysicalOptimizerRule for ReorderPartitionKeys {
                });
                Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate()))
            }
-            PhysicalPlan::ExchangeOp(ExchangeOp{input, strategy: ExchangeOpStrategy::StreamingPush { .. }}) => {
            PhysicalPlan::ExchangeOp(ExchangeOp{input, strategy: ExchangeOpStrategy::FullyMaterializingPush { .. }}) => {
                let new_plan = PhysicalPlan::ExchangeOp(ExchangeOp {
                    input: input.clone(),
-                    strategy: ExchangeOpStrategy::StreamingPush { target_spec: new_spec.into() }
                    strategy: ExchangeOpStrategy::FullyMaterializingPush { target_spec: new_spec.into() }
                });
                Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate()))
            }
6 changes: 3 additions & 3 deletions src/daft-plan/src/physical_plan.rs
@@ -246,7 +246,7 @@ impl PhysicalPlan {
                ..
            })
            | Self::ExchangeOp(ExchangeOp {
-                strategy: ExchangeOpStrategy::StreamingPush { target_spec },
                strategy: ExchangeOpStrategy::FullyMaterializingPush { target_spec },
                ..
            }) => target_spec.clone(),
        }
@@ -569,9 +569,9 @@ impl PhysicalPlan {
                ..
            }) => "ExchangeOp[FullyMaterializing]",
            Self::ExchangeOp(ExchangeOp {
-                strategy: ExchangeOpStrategy::StreamingPush { .. },
                strategy: ExchangeOpStrategy::FullyMaterializingPush { .. },
                ..
-            }) => "ExchangeOp[StreamingPush]",
            }) => "ExchangeOp[FullyMaterializingPush]",
        };
        name.to_string()
    }
2 changes: 1 addition & 1 deletion src/daft-plan/src/physical_planner/translate.rs
@@ -55,7 +55,7 @@ fn build_exchange_op(
        }),
        Ok("streaming_push") => PhysicalPlan::ExchangeOp(ExchangeOp {
            input,
-            strategy: ExchangeOpStrategy::StreamingPush {
            strategy: ExchangeOpStrategy::FullyMaterializingPush {
                target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new(
                    num_partitions,
                    partition_by,
4 changes: 2 additions & 2 deletions src/daft-scheduler/src/scheduler.rs
@@ -806,7 +806,7 @@ fn physical_plan_to_partition_tasks(
        }
        PhysicalPlan::ExchangeOp(ExchangeOp {
            input,
-            strategy: ExchangeOpStrategy::StreamingPush { target_spec },
            strategy: ExchangeOpStrategy::FullyMaterializingPush { target_spec },
        }) => {
            let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
            let partition_by_pyexprs: Vec<PyExpr> = target_spec
@@ -816,7 +816,7 @@
                .collect();
            let py_iter = py
                .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
-                .getattr(pyo3::intern!(py, "streaming_push_exchange_op"))?
                .getattr(pyo3::intern!(py, "fully_materializing_push_exchange_op"))?
                .call1((
                    upstream_iter,
                    partition_by_pyexprs,
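
End to end, the commit routes a `streaming_push` exchange through the scheduler into `fully_materializing_push_exchange_op`, which hands materialized partitions to `RayPushBasedShuffle.run`. The Ray-free toy below walks the same map → merge → reduce dataflow on plain lists to show how the `[round][merger][offset]` indexing delivers each hash partition to exactly one reducer; every name in it is made up for illustration, and `row % R` stands in for Daft's hash partitioning.

# Ray-free toy of the push-based shuffle dataflow. Only the [round][merger][offset]
# indexing mirrors RayPushBasedShuffle; everything else is illustrative.

R, N = 5, 2   # 5 reducers (hash partitions), 2 mergers
ROUNDS = [    # two rounds of mapper inputs; each inner list is one mapper's input rows
    [[1, 2, 3, 4, 5, 6], [7, 8, 9]],
    [[10, 11, 12, 13, 14]],
]


def shares(num_reducers: int, num_mergers: int) -> list[int]:
    # First (num_reducers % num_mergers) mergers own one extra reducer
    base, extra = divmod(num_reducers, num_mergers)
    return [base + (1 if m < extra else 0) for m in range(num_mergers)]


def map_task(rows: list[int]) -> list[list[list[int]]]:
    # Hash-partition rows into R buckets (row % R stands in for partition_by_hash),
    # then chunk the buckets across the N mergers using the same split as shares()
    buckets = [[] for _ in range(R)]
    for row in rows:
        buckets[row % R].append(row)
    out, start = [], 0
    for share in shares(R, N):
        out.append(buckets[start : start + share])
        start += share
    return out  # indexed as [merger][offset] -> bucket


def merge_task(*per_mapper_buckets: list[list[int]]) -> list[list[int]]:
    # Concatenate the same bucket offset across all mappers in a round
    num_offsets = len(per_mapper_buckets[0])
    return [sum((b[o] for b in per_mapper_buckets), []) for o in range(num_offsets)]


# Map + merge round by round; merge_results is indexed as [round][merger][offset]
merge_results = []
for round_inputs in ROUNDS:
    mapper_outputs = [map_task(rows) for rows in round_inputs]
    merge_results.append(
        [merge_task(*(m[merger_idx] for m in mapper_outputs)) for merger_idx in range(N)]
    )

# Reduce: each reducer pulls its (merger, offset) slot from every round and concatenates
locations = []  # reducer_idx -> (merger_idx, offset)
for merger_idx, share in enumerate(shares(R, N)):
    locations.extend((merger_idx, offset) for offset in range(share))

for reducer_idx, (merger_idx, offset) in enumerate(locations):
    rows = sum((merge_results[rd][merger_idx][offset] for rd in range(len(ROUNDS))), [])
    assert all(row % R == reducer_idx for row in rows)
    print(f"reducer {reducer_idx}: {rows}")

Each printed reducer ends up with only the rows whose hash bucket it owns, gathered across both rounds.
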
