diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py
index 9bcce71408..fc4a5d889f 100644
--- a/daft/execution/physical_plan.py
+++ b/daft/execution/physical_plan.py
@@ -1615,6 +1615,12 @@ def fanout_random(child_plan: InProgressPhysicalPlan[PartitionT], num_partitions
         seed += 1
 
 
+def streaming_push_exchange_op(
+    child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
+) -> InProgressPhysicalPlan[PartitionT]:
+    raise NotImplementedError("TODO: jay")
+
+
 def fully_materializing_exchange_op(
     child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
 ) -> InProgressPhysicalPlan[PartitionT]:
diff --git a/src/daft-plan/src/physical_ops/exchange_op.rs b/src/daft-plan/src/physical_ops/exchange_op.rs
index 412b117846..301f0368c6 100644
--- a/src/daft-plan/src/physical_ops/exchange_op.rs
+++ b/src/daft-plan/src/physical_ops/exchange_op.rs
@@ -12,7 +12,10 @@ pub struct ExchangeOp {
 
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub enum ExchangeOpStrategy {
+    /// Fully materialize the data after the Map, and then pull results from the Reduce.
     FullyMaterializing { target_spec: Arc<ClusteringSpec> },
+    /// Stand up Reducers and then send data from the mappers into the reducers eagerly
+    StreamingPush { target_spec: Arc<ClusteringSpec> },
 }
 
 impl ExchangeOp {
@@ -24,6 +27,10 @@ impl ExchangeOp {
                 res.push(" Strategy: FullyMaterializing".to_string());
                 res.push(format!(" Target Spec: {:?}", target_spec));
             }
+            ExchangeOpStrategy::StreamingPush { target_spec } => {
+                res.push(" Strategy: StreamingPush".to_string());
+                res.push(format!(" Target Spec: {:?}", target_spec));
+            }
         }
         res
     }
diff --git a/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs b/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs
index 5c32105e72..4ae21d441f 100644
--- a/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs
+++ b/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs
@@ -140,6 +140,13 @@ impl PhysicalOptimizerRule for ReorderPartitionKeys {
                 });
                 Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate()))
             }
+            PhysicalPlan::ExchangeOp(ExchangeOp{input, strategy: ExchangeOpStrategy::StreamingPush { .. }}) => {
+                let new_plan = PhysicalPlan::ExchangeOp(ExchangeOp {
+                    input: input.clone(),
+                    strategy: ExchangeOpStrategy::StreamingPush { target_spec: new_spec.into() }
+                });
+                Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate()))
+            }
 
             // these depend solely on their input
             PhysicalPlan::Filter(..) |
diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs
index cbda2de5fa..e5fdb1ad28 100644
--- a/src/daft-plan/src/physical_plan.rs
+++ b/src/daft-plan/src/physical_plan.rs
@@ -244,6 +244,10 @@ impl PhysicalPlan {
             Self::ExchangeOp(ExchangeOp {
                 strategy: ExchangeOpStrategy::FullyMaterializing { target_spec },
                 ..
+            })
+            | Self::ExchangeOp(ExchangeOp {
+                strategy: ExchangeOpStrategy::StreamingPush { target_spec },
+                ..
             }) => target_spec.clone(),
         }
     }
@@ -500,7 +504,7 @@ impl PhysicalPlan {
                 Self::DeltaLakeWrite(DeltaLakeWrite {schema, delta_lake_info, .. }) => Self::DeltaLakeWrite(DeltaLakeWrite::new(schema.clone(), delta_lake_info.clone(), input.clone())),
                 #[cfg(feature = "python")]
                 Self::LanceWrite(LanceWrite { schema, lance_info, .. }) => Self::LanceWrite(LanceWrite::new(schema.clone(), lance_info.clone(), input.clone())),
-                Self::ExchangeOp(ExchangeOp{strategy: ExchangeOpStrategy::FullyMaterializing{target_spec, ..}, ..}) => Self::ExchangeOp(ExchangeOp{ input: input.clone(), strategy: ExchangeOpStrategy::FullyMaterializing { target_spec: target_spec.clone() }}),
+                Self::ExchangeOp(ExchangeOp{strategy, ..}) => Self::ExchangeOp(ExchangeOp{ input: input.clone(), strategy: strategy.clone()}),
                 Self::Concat(_) | Self::HashJoin(_) | Self::SortMergeJoin(_) | Self::BroadcastJoin(_) => panic!("{} requires more than 1 input, but received: {}", self, children.len()),
             },
             [input1, input2] => match self {
@@ -564,6 +568,10 @@ impl PhysicalPlan {
                 strategy: ExchangeOpStrategy::FullyMaterializing { .. },
                 ..
             }) => "ExchangeOp[FullyMaterializing]",
+            Self::ExchangeOp(ExchangeOp {
+                strategy: ExchangeOpStrategy::StreamingPush { .. },
+                ..
+            }) => "ExchangeOp[StreamingPush]",
         };
         name.to_string()
     }
diff --git a/src/daft-plan/src/physical_planner/translate.rs b/src/daft-plan/src/physical_planner/translate.rs
index 4f8f250c82..c5629e1c85 100644
--- a/src/daft-plan/src/physical_planner/translate.rs
+++ b/src/daft-plan/src/physical_planner/translate.rs
@@ -8,7 +8,7 @@ use common_daft_config::DaftExecutionConfig;
 use common_error::DaftResult;
 use common_file_formats::FileFormat;
 use daft_core::prelude::*;
-use daft_dsl::{col, is_partition_compatible, ApproxPercentileParams, ExprRef, SketchType};
+use daft_dsl::{col, is_partition_compatible, ApproxPercentileParams, Expr, ExprRef, SketchType};
 use daft_scan::PhysicalScanInfo;
 
 use crate::{
@@ -30,6 +30,42 @@ use crate::{
     source_info::{PlaceHolderInfo, SourceInfo},
 };
 
+/// Builds an exchange op (PhysicalPlan node(s) that shuffles data)
+fn build_exchange_op(
+    input: PhysicalPlanRef,
+    num_partitions: usize,
+    partition_by: Vec<Arc<Expr>>,
+) -> PhysicalPlan {
+    let exchange_op = std::env::var("DAFT_EXCHANGE_OP");
+    match exchange_op.as_deref() {
+        Err(_) => {
+            let split_op =
+                PhysicalPlan::FanoutByHash(FanoutByHash::new(input, num_partitions, partition_by))
+                    .arced();
+            PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op))
+        }
+        Ok("fully_materialize") => PhysicalPlan::ExchangeOp(ExchangeOp {
+            input,
+            strategy: ExchangeOpStrategy::FullyMaterializing {
+                target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new(
+                    num_partitions,
+                    partition_by,
+                ))),
+            },
+        }),
+        Ok("streaming_push") => PhysicalPlan::ExchangeOp(ExchangeOp {
+            input,
+            strategy: ExchangeOpStrategy::StreamingPush {
+                target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new(
+                    num_partitions,
+                    partition_by,
+                ))),
+            },
+        }),
+        Ok(exo) => panic!("Unsupported DAFT_EXCHANGE_OP={exo}"),
+    }
+}
+
 pub(super) fn translate_single_logical_node(
     logical_plan: &LogicalPlan,
     physical_children: &mut Vec<PhysicalPlanRef>,
@@ -206,6 +242,7 @@ pub(super) fn translate_single_logical_node(
                     }
                 }
                 ClusteringSpec::Random(_) => {
+                    // TODO: Support Random clustering spec for ExchangeOps
                     let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new(
                         input_physical,
                         num_partitions,
@@ -213,22 +250,7 @@ pub(super) fn translate_single_logical_node(
                     PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
                 }
                 ClusteringSpec::Hash(HashClusteringConfig { by, .. }) => {
-                    // YOLOSWAG: use the new exchange op instead
-                    // let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
-                    //     input_physical,
-                    //     num_partitions,
-                    //     by.clone(),
-                    // ));
-                    // PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
-                    PhysicalPlan::ExchangeOp(ExchangeOp {
-                        input: input_physical,
-                        strategy: ExchangeOpStrategy::FullyMaterializing {
-                            target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new(
-                                num_partitions,
-                                by.clone(),
-                            ))),
-                        },
-                    })
+                    build_exchange_op(input_physical, num_partitions, by.clone())
                 }
                 ClusteringSpec::Range(_) => {
                     unreachable!("Repartitioning by range is not supported")
@@ -248,14 +270,10 @@ pub(super) fn translate_single_logical_node(
                 PhysicalPlan::Aggregate(Aggregate::new(input_physical, vec![], col_exprs.clone()));
             let num_partitions = agg_op.clustering_spec().num_partitions();
             if num_partitions > 1 {
-                let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
-                    agg_op.into(),
-                    num_partitions,
-                    col_exprs.clone(),
-                ));
-                let reduce_op = PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()));
+                let exchange_op =
+                    build_exchange_op(agg_op.into(), num_partitions, col_exprs.clone());
                 Ok(
-                    PhysicalPlan::Aggregate(Aggregate::new(reduce_op.into(), vec![], col_exprs))
+                    PhysicalPlan::Aggregate(Aggregate::new(exchange_op.into(), vec![], col_exprs))
                         .arced(),
                 )
             } else {
@@ -319,31 +337,15 @@ pub(super) fn translate_single_logical_node(
                     ))
                     .arced()
                 } else {
-                    // let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
-                    //     first_stage_agg,
-                    //     min(
-                    //         num_input_partitions,
-                    //         cfg.shuffle_aggregation_default_partitions,
-                    //     ),
-                    //     groupby.clone(),
-                    // ))
-                    // .arced();
-                    // PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op)).arced()
-                    PhysicalPlan::ExchangeOp(ExchangeOp {
-                        input: first_stage_agg,
-                        strategy: ExchangeOpStrategy::FullyMaterializing {
-                            target_spec: Arc::new(ClusteringSpec::Hash(
-                                HashClusteringConfig::new(
-                                    min(
-                                        num_input_partitions,
-                                        cfg.shuffle_aggregation_default_partitions,
-                                    ),
-                                    groupby.clone(),
-                                ),
-                            )),
-                        },
-                    })
-                    .into()
+                    build_exchange_op(
+                        first_stage_agg,
+                        min(
+                            num_input_partitions,
+                            cfg.shuffle_aggregation_default_partitions,
+                        ),
+                        groupby.clone(),
+                    )
+                    .arced()
                 };
 
                 let second_stage_agg = PhysicalPlan::Aggregate(Aggregate::new(
@@ -403,7 +405,7 @@ pub(super) fn translate_single_logical_node(
                     ))
                     .arced()
                 } else {
-                    let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
+                    build_exchange_op(
                         first_stage_agg,
                         min(
                             num_input_partitions,
@@ -412,9 +414,8 @@ pub(super) fn translate_single_logical_node(
                         // NOTE: For the shuffle of a pivot operation, we don't include the pivot column for the hashing as we need
                        // to ensure that all rows with the same group_by column values are hashed to the same partition.
                         group_by.clone(),
-                    ))
-                    .arced();
-                    PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op)).arced()
+                    )
+                    .arced()
                 };
 
                 let second_stage_agg = PhysicalPlan::Aggregate(Aggregate::new(
@@ -666,24 +667,16 @@ pub(super) fn translate_single_logical_node(
                     if num_left_partitions != num_partitions
                         || (num_partitions > 1 && !is_left_hash_partitioned)
                     {
-                        let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
-                            left_physical,
-                            num_partitions,
-                            left_on.clone(),
-                        ));
                         left_physical =
-                            PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())).arced();
+                            build_exchange_op(left_physical, num_partitions, left_on.clone())
+                                .arced();
                     }
                     if num_right_partitions != num_partitions
                         || (num_partitions > 1 && !is_right_hash_partitioned)
                     {
-                        let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
-                            right_physical,
-                            num_partitions,
-                            right_on.clone(),
-                        ));
                         right_physical =
-                            PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())).arced();
+                            build_exchange_op(right_physical, num_partitions, right_on.clone())
+                                .arced();
                     }
                     Ok(PhysicalPlan::HashJoin(HashJoin::new(
                         left_physical,
diff --git a/src/daft-scheduler/src/scheduler.rs b/src/daft-scheduler/src/scheduler.rs
index b7811121ba..6446c5f7e6 100644
--- a/src/daft-scheduler/src/scheduler.rs
+++ b/src/daft-scheduler/src/scheduler.rs
@@ -804,5 +804,25 @@ fn physical_plan_to_partition_tasks(
                 ))?;
             Ok(py_iter.into())
         }
+        PhysicalPlan::ExchangeOp(ExchangeOp {
+            input,
+            strategy: ExchangeOpStrategy::StreamingPush { target_spec },
+        }) => {
+            let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+            let partition_by_pyexprs: Vec<PyExpr> = target_spec
+                .partition_by()
+                .iter()
+                .map(|expr| PyExpr::from(expr.clone()))
+                .collect();
+            let py_iter = py
+                .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
+                .getattr(pyo3::intern!(py, "streaming_push_exchange_op"))?
+                .call1((
+                    upstream_iter,
+                    partition_by_pyexprs,
+                    target_spec.num_partitions(),
+                ))?;
+            Ok(py_iter.into())
+        }
     }
 }
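Reviewer note on trying this out: `build_exchange_op` selects the strategy from the `DAFT_EXCHANGE_OP` environment variable at plan-translation time, and falls back to the existing `FanoutByHash` + `ReduceMerge` path when it is unset. Below is a minimal sketch of how the toggle could be exercised locally; the DataFrame contents, column name, and partition count are illustrative only, and it assumes a runner that executes the generators in `daft.execution.physical_plan`:

```python
import os

# Must be set before the plan is translated (i.e. before .collect()), since
# build_exchange_op reads it via std::env::var during physical planning.
#   unset               -> existing FanoutByHash + ReduceMerge path
#   "fully_materialize" -> ExchangeOp[FullyMaterializing]
#   "streaming_push"    -> ExchangeOp[StreamingPush] (stubbed; raises NotImplementedError)
os.environ["DAFT_EXCHANGE_OP"] = "fully_materialize"

import daft

df = daft.from_pydict({"group": ["a", "b", "a", "b"], "value": [1, 2, 3, 4]})
df = df.repartition(4, "group")  # hash repartition routes through build_exchange_op
print(df.collect())
```

Any other value panics with `Unsupported DAFT_EXCHANGE_OP=...`, and `streaming_push` will currently surface the `NotImplementedError` from the `streaming_push_exchange_op` stub once the scheduler calls into it.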