diff --git a/src/daft-plan/src/physical_ops/exchange_op.rs b/src/daft-plan/src/physical_ops/exchange_op.rs index 5cb684793f..1ea5372484 100644 --- a/src/daft-plan/src/physical_ops/exchange_op.rs +++ b/src/daft-plan/src/physical_ops/exchange_op.rs @@ -13,7 +13,7 @@ pub struct ExchangeOp { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum ExchangeOpStrategy { /// Fully materialize the data after the Map, and then pull results from the Reduce. - FullyMaterializing { target_spec: Arc }, + FullyMaterializingPull { target_spec: Arc }, /// Stand up Reducers and then send data from the mappers into the reducers eagerly FullyMaterializingPush { target_spec: Arc }, } @@ -23,8 +23,8 @@ impl ExchangeOp { let mut res = vec![]; res.push("ExchangeOp:".to_string()); match &self.strategy { - ExchangeOpStrategy::FullyMaterializing { target_spec } => { - res.push(" Strategy: FullyMaterializing".to_string()); + ExchangeOpStrategy::FullyMaterializingPull { target_spec } => { + res.push(" Strategy: FullyMaterializingPull".to_string()); res.push(format!(" Target Spec: {:?}", target_spec)); } ExchangeOpStrategy::FullyMaterializingPush { target_spec } => { diff --git a/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs b/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs index bf883293d7..01fd1e5f19 100644 --- a/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs +++ b/src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs @@ -133,10 +133,10 @@ impl PhysicalOptimizerRule for ReorderPartitionKeys { }); Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate())) } - PhysicalPlan::ExchangeOp(ExchangeOp{input, strategy: ExchangeOpStrategy::FullyMaterializing { .. }}) => { + PhysicalPlan::ExchangeOp(ExchangeOp{input, strategy: ExchangeOpStrategy::FullyMaterializingPull { .. }}) => { let new_plan = PhysicalPlan::ExchangeOp(ExchangeOp { input: input.clone(), - strategy: ExchangeOpStrategy::FullyMaterializing { target_spec: new_spec.into() } + strategy: ExchangeOpStrategy::FullyMaterializingPull { target_spec: new_spec.into() } }); Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate())) } diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index df4d6057b3..813cccf61e 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -242,7 +242,7 @@ impl PhysicalPlan { ClusteringSpec::Unknown(UnknownClusteringConfig::new(1)).into() } Self::ExchangeOp(ExchangeOp { - strategy: ExchangeOpStrategy::FullyMaterializing { target_spec }, + strategy: ExchangeOpStrategy::FullyMaterializingPull { target_spec }, .. }) | Self::ExchangeOp(ExchangeOp { @@ -565,7 +565,7 @@ impl PhysicalPlan { #[cfg(feature = "python")] Self::LanceWrite(..) => "LanceWrite", Self::ExchangeOp(ExchangeOp { - strategy: ExchangeOpStrategy::FullyMaterializing { .. }, + strategy: ExchangeOpStrategy::FullyMaterializingPull { .. }, .. }) => "ExchangeOp[FullyMaterializing]", Self::ExchangeOp(ExchangeOp { diff --git a/src/daft-plan/src/physical_planner/translate.rs b/src/daft-plan/src/physical_planner/translate.rs index bca686e1ca..52197c5c2b 100644 --- a/src/daft-plan/src/physical_planner/translate.rs +++ b/src/daft-plan/src/physical_planner/translate.rs @@ -44,16 +44,16 @@ fn build_exchange_op( .arced(); PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op)) } - Ok("fully_materialize") => PhysicalPlan::ExchangeOp(ExchangeOp { + Ok("pull") => PhysicalPlan::ExchangeOp(ExchangeOp { input, - strategy: ExchangeOpStrategy::FullyMaterializing { + strategy: ExchangeOpStrategy::FullyMaterializingPull { target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new( num_partitions, partition_by, ))), }, }), - Ok("streaming_push") => PhysicalPlan::ExchangeOp(ExchangeOp { + Ok("push") => PhysicalPlan::ExchangeOp(ExchangeOp { input, strategy: ExchangeOpStrategy::FullyMaterializingPush { target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new( diff --git a/src/daft-scheduler/src/scheduler.rs b/src/daft-scheduler/src/scheduler.rs index a563525c1b..04fa87c2d3 100644 --- a/src/daft-scheduler/src/scheduler.rs +++ b/src/daft-scheduler/src/scheduler.rs @@ -786,7 +786,7 @@ fn physical_plan_to_partition_tasks( ), PhysicalPlan::ExchangeOp(ExchangeOp { input, - strategy: ExchangeOpStrategy::FullyMaterializing { target_spec }, + strategy: ExchangeOpStrategy::FullyMaterializingPull { target_spec }, }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let partition_by_pyexprs: Vec = target_spec