Skip to content

Commit

Permalink
Add another streaming push variant
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Sep 29, 2024
1 parent a9bfb4f commit cef8843
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 66 deletions.
6 changes: 6 additions & 0 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,12 @@ def fanout_random(child_plan: InProgressPhysicalPlan[PartitionT], num_partitions
seed += 1


def streaming_push_exchange_op(
    child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
) -> InProgressPhysicalPlan[PartitionT]:
    """Exchange data between partitions using a streaming push strategy.

    Intended behavior (not yet implemented): stand up reducer tasks and then
    eagerly push hash-partitioned data from the mappers into those reducers,
    rather than fully materializing the map output first.

    Args:
        child_plan: Upstream plan producing the partitions to exchange.
        partition_by: Expressions to hash-partition the data by.
        num_partitions: Number of output partitions to produce.

    Raises:
        NotImplementedError: Always — this strategy is a stub pending implementation.
    """
    raise NotImplementedError("streaming_push_exchange_op is not yet implemented")

Check warning on line 1621 in daft/execution/physical_plan.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/physical_plan.py#L1621

Added line #L1621 was not covered by tests


def fully_materializing_exchange_op(
child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
) -> InProgressPhysicalPlan[PartitionT]:
Expand Down
7 changes: 7 additions & 0 deletions src/daft-plan/src/physical_ops/exchange_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ pub struct ExchangeOp {

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]

Check warning on line 13 in src/daft-plan/src/physical_ops/exchange_op.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_ops/exchange_op.rs#L13

Added line #L13 was not covered by tests
/// How an `ExchangeOp` moves (shuffles) data from map-side partitions to
/// reduce-side partitions. Each variant carries the `ClusteringSpec` that
/// describes the target partitioning of the exchanged data.
pub enum ExchangeOpStrategy {
    /// Fully materialize the data after the Map, and then pull results from the Reduce.
    FullyMaterializing { target_spec: Arc<ClusteringSpec> },
    /// Stand up Reducers and then send data from the mappers into the reducers eagerly.
    StreamingPush { target_spec: Arc<ClusteringSpec> },
}

impl ExchangeOp {
Expand All @@ -24,6 +27,10 @@ impl ExchangeOp {
res.push(" Strategy: FullyMaterializing".to_string());
res.push(format!(" Target Spec: {:?}", target_spec));
}
ExchangeOpStrategy::StreamingPush { target_spec } => {
res.push(" Strategy: StreamingPush".to_string());
res.push(format!(" Target Spec: {:?}", target_spec));
}

Check warning on line 33 in src/daft-plan/src/physical_ops/exchange_op.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_ops/exchange_op.rs#L22-L33

Added lines #L22 - L33 were not covered by tests
}
res
}

Check warning on line 36 in src/daft-plan/src/physical_ops/exchange_op.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_ops/exchange_op.rs#L35-L36

Added lines #L35 - L36 were not covered by tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ impl PhysicalOptimizerRule for ReorderPartitionKeys {
});
Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate()))

Check warning on line 141 in src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs#L136-L141

Added lines #L136 - L141 were not covered by tests
}
PhysicalPlan::ExchangeOp(ExchangeOp{input, strategy: ExchangeOpStrategy::StreamingPush { .. }}) => {
let new_plan = PhysicalPlan::ExchangeOp(ExchangeOp {
input: input.clone(),
strategy: ExchangeOpStrategy::StreamingPush { target_spec: new_spec.into() }
});
Ok(Transformed::yes(c.with_plan(new_plan.into()).propagate()))

Check warning on line 148 in src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_optimization/rules/reorder_partition_keys.rs#L143-L148

Added lines #L143 - L148 were not covered by tests
}

// these depend solely on their input
PhysicalPlan::Filter(..) |
Expand Down
10 changes: 9 additions & 1 deletion src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ impl PhysicalPlan {
Self::ExchangeOp(ExchangeOp {
strategy: ExchangeOpStrategy::FullyMaterializing { target_spec },

Check warning on line 245 in src/daft-plan/src/physical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_plan.rs#L245

Added line #L245 was not covered by tests
..
})
| Self::ExchangeOp(ExchangeOp {
strategy: ExchangeOpStrategy::StreamingPush { target_spec },

Check warning on line 249 in src/daft-plan/src/physical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_plan.rs#L249

Added line #L249 was not covered by tests
..
}) => target_spec.clone(),

Check warning on line 251 in src/daft-plan/src/physical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_plan.rs#L251

Added line #L251 was not covered by tests
}
}
Expand Down Expand Up @@ -500,7 +504,7 @@ impl PhysicalPlan {
Self::DeltaLakeWrite(DeltaLakeWrite {schema, delta_lake_info, .. }) => Self::DeltaLakeWrite(DeltaLakeWrite::new(schema.clone(), delta_lake_info.clone(), input.clone())),
#[cfg(feature = "python")]
Self::LanceWrite(LanceWrite { schema, lance_info, .. }) => Self::LanceWrite(LanceWrite::new(schema.clone(), lance_info.clone(), input.clone())),
Self::ExchangeOp(ExchangeOp{strategy: ExchangeOpStrategy::FullyMaterializing{target_spec, .. }, ..}) => Self::ExchangeOp(ExchangeOp{ input: input.clone(), strategy: ExchangeOpStrategy::FullyMaterializing { target_spec: target_spec.clone() }}),
Self::ExchangeOp(ExchangeOp{strategy, ..}) => Self::ExchangeOp(ExchangeOp{ input: input.clone(), strategy: strategy.clone()}),

Check warning on line 507 in src/daft-plan/src/physical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_plan.rs#L507

Added line #L507 was not covered by tests
Self::Concat(_) | Self::HashJoin(_) | Self::SortMergeJoin(_) | Self::BroadcastJoin(_) => panic!("{} requires more than 1 input, but received: {}", self, children.len()),
},
[input1, input2] => match self {
Expand Down Expand Up @@ -564,6 +568,10 @@ impl PhysicalPlan {
strategy: ExchangeOpStrategy::FullyMaterializing { .. },
..
}) => "ExchangeOp[FullyMaterializing]",

Check warning on line 570 in src/daft-plan/src/physical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_plan.rs#L570

Added line #L570 was not covered by tests
Self::ExchangeOp(ExchangeOp {
strategy: ExchangeOpStrategy::StreamingPush { .. },
..
}) => "ExchangeOp[StreamingPush]",

Check warning on line 574 in src/daft-plan/src/physical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_plan.rs#L574

Added line #L574 was not covered by tests
};
name.to_string()
}
Expand Down
123 changes: 58 additions & 65 deletions src/daft-plan/src/physical_planner/translate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use common_file_formats::FileFormat;
use daft_core::prelude::*;
use daft_dsl::{col, is_partition_compatible, ApproxPercentileParams, ExprRef, SketchType};
use daft_dsl::{col, is_partition_compatible, ApproxPercentileParams, Expr, ExprRef, SketchType};
use daft_scan::PhysicalScanInfo;

use crate::{
Expand All @@ -30,6 +30,42 @@ use crate::{
source_info::{PlaceHolderInfo, SourceInfo},
};

/// Builds an exchange op (PhysicalPlan node(s) that shuffles data)
/// Builds an exchange op (PhysicalPlan node(s) that shuffles data).
///
/// The shuffle implementation is selected via the `DAFT_EXCHANGE_OP`
/// environment variable:
/// * unset — legacy fanout-by-hash followed by a reduce-merge
/// * `"fully_materialize"` — `ExchangeOp` with the `FullyMaterializing` strategy
/// * `"streaming_push"` — `ExchangeOp` with the `StreamingPush` strategy
///
/// # Panics
/// Panics if `DAFT_EXCHANGE_OP` is set to any other value.
fn build_exchange_op(
    input: PhysicalPlanRef,
    num_partitions: usize,
    partition_by: Vec<Arc<Expr>>,
) -> PhysicalPlan {
    match std::env::var("DAFT_EXCHANGE_OP").as_deref() {
        // Default (env var unset): legacy map-side fanout + reduce-merge.
        Err(_) => {
            let split_op =
                PhysicalPlan::FanoutByHash(FanoutByHash::new(input, num_partitions, partition_by))
                    .arced();
            PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op))
        }
        Ok(variant @ ("fully_materialize" | "streaming_push")) => {
            // Both ExchangeOp strategies hash-partition onto the same target spec,
            // so construct it once.
            let target_spec = Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new(
                num_partitions,
                partition_by,
            )));
            let strategy = match variant {
                "fully_materialize" => ExchangeOpStrategy::FullyMaterializing { target_spec },
                _ => ExchangeOpStrategy::StreamingPush { target_spec },
            };
            PhysicalPlan::ExchangeOp(ExchangeOp { input, strategy })
        }
        Ok(exo) => panic!("Unsupported DAFT_EXCHANGE_OP={exo}"),
    }
}

pub(super) fn translate_single_logical_node(
logical_plan: &LogicalPlan,
physical_children: &mut Vec<PhysicalPlanRef>,
Expand Down Expand Up @@ -206,29 +242,15 @@ pub(super) fn translate_single_logical_node(
}
}
ClusteringSpec::Random(_) => {
// TODO: Support Random clustering spec for ExchangeOps
let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new(
input_physical,
num_partitions,
));
PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
}
ClusteringSpec::Hash(HashClusteringConfig { by, .. }) => {
// YOLOSWAG: use the new exchange op instead
// let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
// input_physical,
// num_partitions,
// by.clone(),
// ));
// PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
PhysicalPlan::ExchangeOp(ExchangeOp {
input: input_physical,
strategy: ExchangeOpStrategy::FullyMaterializing {
target_spec: Arc::new(ClusteringSpec::Hash(HashClusteringConfig::new(
num_partitions,
by.clone(),
))),
},
})
build_exchange_op(input_physical, num_partitions, by.clone())
}
ClusteringSpec::Range(_) => {
unreachable!("Repartitioning by range is not supported")
Expand All @@ -248,14 +270,10 @@ pub(super) fn translate_single_logical_node(
PhysicalPlan::Aggregate(Aggregate::new(input_physical, vec![], col_exprs.clone()));
let num_partitions = agg_op.clustering_spec().num_partitions();
if num_partitions > 1 {
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
agg_op.into(),
num_partitions,
col_exprs.clone(),
));
let reduce_op = PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()));
let exchange_op =
build_exchange_op(agg_op.into(), num_partitions, col_exprs.clone());
Ok(
PhysicalPlan::Aggregate(Aggregate::new(reduce_op.into(), vec![], col_exprs))
PhysicalPlan::Aggregate(Aggregate::new(exchange_op.into(), vec![], col_exprs))
.arced(),
)
} else {
Expand Down Expand Up @@ -319,31 +337,15 @@ pub(super) fn translate_single_logical_node(
))
.arced()
} else {
// let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
// first_stage_agg,
// min(
// num_input_partitions,
// cfg.shuffle_aggregation_default_partitions,
// ),
// groupby.clone(),
// ))
// .arced();
// PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op)).arced()
PhysicalPlan::ExchangeOp(ExchangeOp {
input: first_stage_agg,
strategy: ExchangeOpStrategy::FullyMaterializing {
target_spec: Arc::new(ClusteringSpec::Hash(
HashClusteringConfig::new(
min(
num_input_partitions,
cfg.shuffle_aggregation_default_partitions,
),
groupby.clone(),
),
)),
},
})
.into()
build_exchange_op(
first_stage_agg,
min(
num_input_partitions,
cfg.shuffle_aggregation_default_partitions,
),
groupby.clone(),
)
.arced()
};

let second_stage_agg = PhysicalPlan::Aggregate(Aggregate::new(
Expand Down Expand Up @@ -403,7 +405,7 @@ pub(super) fn translate_single_logical_node(
))
.arced()
} else {
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
build_exchange_op(
first_stage_agg,
min(
num_input_partitions,
Expand All @@ -412,9 +414,8 @@ pub(super) fn translate_single_logical_node(
// NOTE: For the shuffle of a pivot operation, we don't include the pivot column for the hashing as we need
// to ensure that all rows with the same group_by column values are hashed to the same partition.
group_by.clone(),
))
.arced();
PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op)).arced()
)
.arced()
};

let second_stage_agg = PhysicalPlan::Aggregate(Aggregate::new(
Expand Down Expand Up @@ -666,24 +667,16 @@ pub(super) fn translate_single_logical_node(
if num_left_partitions != num_partitions
|| (num_partitions > 1 && !is_left_hash_partitioned)
{
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
left_physical,
num_partitions,
left_on.clone(),
));
left_physical =
PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())).arced();
build_exchange_op(left_physical, num_partitions, left_on.clone())
.arced();
}
if num_right_partitions != num_partitions
|| (num_partitions > 1 && !is_right_hash_partitioned)
{
let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
right_physical,
num_partitions,
right_on.clone(),
));
right_physical =
PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())).arced();
build_exchange_op(right_physical, num_partitions, right_on.clone())
.arced();
}
Ok(PhysicalPlan::HashJoin(HashJoin::new(
left_physical,
Expand Down
20 changes: 20 additions & 0 deletions src/daft-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,5 +804,25 @@ fn physical_plan_to_partition_tasks(
))?;
Ok(py_iter.into())

Check warning on line 805 in src/daft-scheduler/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-scheduler/src/scheduler.rs#L791-L805

Added lines #L791 - L805 were not covered by tests
}
PhysicalPlan::ExchangeOp(ExchangeOp {
input,
strategy: ExchangeOpStrategy::StreamingPush { target_spec },

Check warning on line 809 in src/daft-scheduler/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-scheduler/src/scheduler.rs#L808-L809

Added lines #L808 - L809 were not covered by tests
}) => {
let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
let partition_by_pyexprs: Vec<PyExpr> = target_spec
.partition_by()
.iter()
.map(|expr| PyExpr::from(expr.clone()))
.collect();
let py_iter = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "streaming_push_exchange_op"))?
.call1((
upstream_iter,
partition_by_pyexprs,
target_spec.num_partitions(),
))?;
Ok(py_iter.into())

Check warning on line 825 in src/daft-scheduler/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-scheduler/src/scheduler.rs#L811-L825

Added lines #L811 - L825 were not covered by tests
}
}
}

0 comments on commit cef8843

Please sign in to comment.