Skip to content

Commit

Permalink
naming
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Sep 29, 2024
1 parent 0bc057d commit 37faaa0
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 11 deletions.
6 changes: 3 additions & 3 deletions src/daft-plan/src/physical_ops/exchange_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct ExchangeOp {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ExchangeOpStrategy {
/// Fully materialize the data after the Map, and then pull results from the Reduce.
FullyMaterializing { target_spec: Arc<ClusteringSpec> },
FullyMaterializingPull { target_spec: Arc<ClusteringSpec> },
/// Stand up Reducers and then send data from the mappers into the reducers eagerly
FullyMaterializingPush { target_spec: Arc<ClusteringSpec> },
}
Expand All @@ -23,8 +23,8 @@ impl ExchangeOp {
let mut res = vec![];
res.push("ExchangeOp:".to_string());
match &self.strategy {
ExchangeOpStrategy::FullyMaterializing { target_spec } => {
res.push(" Strategy: FullyMaterializing".to_string());
ExchangeOpStrategy::FullyMaterializingPull { target_spec } => {
res.push(" Strategy: FullyMaterializingPull".to_string());
res.push(format!(" Target Spec: {:?}", target_spec));
}
ExchangeOpStrategy::FullyMaterializingPush { target_spec } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ impl PhysicalOptimizerRule for ReorderPartitionKeys {
});
Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate()))
}
PhysicalPlan::ExchangeOp(ExchangeOp{input, strategy: ExchangeOpStrategy::FullyMaterializing { .. }}) => {
PhysicalPlan::ExchangeOp(ExchangeOp{input, strategy: ExchangeOpStrategy::FullyMaterializingPull { .. }}) => {
let new_plan = PhysicalPlan::ExchangeOp(ExchangeOp {
input: input.clone(),
strategy: ExchangeOpStrategy::FullyMaterializing { target_spec: new_spec.into() }
strategy: ExchangeOpStrategy::FullyMaterializingPull { target_spec: new_spec.into() }
});
Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate()))
}
Expand Down
4 changes: 2 additions & 2 deletions src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl PhysicalPlan {
ClusteringSpec::Unknown(UnknownClusteringConfig::new(1)).into()
}
Self::ExchangeOp(ExchangeOp {
strategy: ExchangeOpStrategy::FullyMaterializing { target_spec },
strategy: ExchangeOpStrategy::FullyMaterializingPull { target_spec },
..
})
| Self::ExchangeOp(ExchangeOp {
Expand Down Expand Up @@ -565,7 +565,7 @@ impl PhysicalPlan {
#[cfg(feature = "python")]
Self::LanceWrite(..) => "LanceWrite",
Self::ExchangeOp(ExchangeOp {
strategy: ExchangeOpStrategy::FullyMaterializing { .. },
strategy: ExchangeOpStrategy::FullyMaterializingPull { .. },
..
}) => "ExchangeOp[FullyMaterializing]",
Self::ExchangeOp(ExchangeOp {
Expand Down
6 changes: 3 additions & 3 deletions src/daft-plan/src/physical_planner/translate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ fn build_exchange_op(
.arced();
PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op))
}
Ok("fully_materialize") => PhysicalPlan::ExchangeOp(ExchangeOp {
Ok("pull") => PhysicalPlan::ExchangeOp(ExchangeOp {
input,
strategy: ExchangeOpStrategy::FullyMaterializing {
strategy: ExchangeOpStrategy::FullyMaterializingPull {
target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new(
num_partitions,
partition_by,
))),
},
}),
Ok("streaming_push") => PhysicalPlan::ExchangeOp(ExchangeOp {
Ok("push") => PhysicalPlan::ExchangeOp(ExchangeOp {
input,
strategy: ExchangeOpStrategy::FullyMaterializingPush {
target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new(
Expand Down
2 changes: 1 addition & 1 deletion src/daft-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ fn physical_plan_to_partition_tasks(
),
PhysicalPlan::ExchangeOp(ExchangeOp {
input,
strategy: ExchangeOpStrategy::FullyMaterializing { target_spec },
strategy: ExchangeOpStrategy::FullyMaterializingPull { target_spec },
}) => {
let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
let partition_by_pyexprs: Vec<PyExpr> = target_spec
Expand Down

0 comments on commit 37faaa0

Please sign in to comment.