Skip to content

Commit

Permalink
Implement translation into Python generators
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 20, 2024
1 parent 4b20889 commit 43a00fe
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/daft-plan/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ impl RandomClusteringConfig {
Self { num_partitions }
}

pub fn num_partitions(&self) -> usize {
self.num_partitions
}

pub fn multiline_display(&self) -> Vec<String> {
vec![format!("Num partitions = {}", self.num_partitions)]
}
Expand Down
70 changes: 68 additions & 2 deletions src/daft-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ fn physical_plan_to_partition_tasks(
py: Python,
psets: &HashMap<String, Vec<PyObject>>,
) -> PyResult<PyObject> {
use daft_plan::physical_ops::{ShuffleExchange, ShuffleExchangeStrategy};

match physical_plan {
PhysicalPlan::InMemoryScan(InMemoryScan {
in_memory_info: InMemoryInfo { cache_key, .. },
Expand Down Expand Up @@ -469,8 +471,72 @@ fn physical_plan_to_partition_tasks(
))?;
Ok(py_iter.into())
}
PhysicalPlan::ShuffleExchange(_) => {
todo!("Translate shuffle exchange to Python generator thinggies");
PhysicalPlan::ShuffleExchange(ShuffleExchange { input, strategy }) => {
let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
let input_num_partitions = input.clustering_spec().num_partitions();
match strategy {
ShuffleExchangeStrategy::NaiveFullyMaterializingMapReduce { target_spec } => {
let mapped = match target_spec.as_ref() {
daft_plan::ClusteringSpec::Hash(hash_clustering_config) => {
let partition_by_pyexprs: Vec<PyExpr> = hash_clustering_config
.by
.iter()
.map(|expr| PyExpr::from(expr.clone()))
.collect();
py.import_bound(pyo3::intern!(
py,
"daft.execution.rust_physical_plan_shim"
))?
.getattr(pyo3::intern!(py, "split_by_hash"))?
.call1((
upstream_iter,
hash_clustering_config.num_partitions,
partition_by_pyexprs,
))?
}
daft_plan::ClusteringSpec::Random(random_clustering_config) => py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "fanout_random"))?
.call1((upstream_iter, random_clustering_config.num_partitions()))?,
daft_plan::ClusteringSpec::Range(_) => {
unimplemented!("FanoutByRange not implemented, since only use case (sorting) doesn't need it yet.");
}
daft_plan::ClusteringSpec::Unknown(_) => {
unreachable!("Cannot use NaiveFullyMaterializingMapReduce ShuffleExchange to map to an Unknown ClusteringSpec");
}
};
let flattened = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "flatten_plan"))?
.call1((mapped,))?;
let reduced = py
.import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
.getattr(pyo3::intern!(py, "reduce_merge"))?
.call1((flattened,))?;
Ok(reduced.into())
}
ShuffleExchangeStrategy::SplitOrCoalesceToTargetNum {
target_num_partitions,
} => {
if target_num_partitions >= &input_num_partitions {
let split = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "split"))?
.call1((upstream_iter, input_num_partitions, *target_num_partitions))?;
let flattened = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "flatten_plan"))?
.call1((split,))?;
Ok(flattened.into())
} else {
let coalesced = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "coalesce"))?
.call1((upstream_iter, input_num_partitions, *target_num_partitions))?;
Ok(coalesced.into())
}
}
}
}
PhysicalPlan::Aggregate(Aggregate {
aggregations,
Expand Down

0 comments on commit 43a00fe

Please sign in to comment.