Skip to content

Commit

Permalink
Change limit pushdown to keep logical limit node around after pushing…
Browse files Browse the repository at this point in the history
… limit into source node
  • Loading branch information
Jay Chia committed Oct 17, 2023
1 parent bdd2128 commit f439404
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 73 deletions.
78 changes: 41 additions & 37 deletions src/daft-plan/src/optimization/rules/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use common_error::DaftResult;

use crate::{source_info::SourceInfo, LogicalPlan};
use crate::{logical_ops::Limit as LogicalLimit, source_info::SourceInfo, LogicalPlan};

use super::{ApplyOrder, OptimizerRule, Transformed};

Expand All @@ -22,44 +22,48 @@ impl OptimizerRule for PushDownLimit {
}

fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
let limit = match plan.as_ref() {
LogicalPlan::Limit(limit) => limit,
_ => return Ok(Transformed::No(plan)),
};
let child_plan = limit.input.as_ref();
let new_plan = match child_plan {
LogicalPlan::Repartition(_) | LogicalPlan::Coalesce(_) | LogicalPlan::Project(_) => {
// Naive commuting with unary ops.
//
// Limit-UnaryOp -> UnaryOp-Limit
let new_limit = plan.with_new_children(&[child_plan.children()[0].clone()]);
child_plan.with_new_children(&[new_limit])
}
LogicalPlan::Source(source) => {
// Push limit into source.
//
// Limit-Source -> Source[with_limit]

// Limit pushdown is only supported for external sources.
if !matches!(source.source_info.as_ref(), SourceInfo::ExternalInfo(_)) {
return Ok(Transformed::No(plan));
}
let row_limit = limit.limit as usize;
// If source already has a limit and the existing limit is less than or equal to the new limit,
// unlink the Limit node from the plan and leave the Source node untouched.
if let Some(existing_source_limit) = source.limit && existing_source_limit <= row_limit {
// We directly clone the Limit child rather than creating a new Arc on child_plan to elide
// an extra Arc.
limit.input.clone()
} else {
// Push limit into Source.
let new_source: LogicalPlan = source.with_limit(Some(row_limit)).into();
new_source.into()
match plan.as_ref() {
LogicalPlan::Limit(LogicalLimit { input, limit }) => {
let limit = *limit as usize;
match input.as_ref() {
// Naive commuting with unary ops.
//
// Limit-UnaryOp -> UnaryOp-Limit
LogicalPlan::Repartition(_)
| LogicalPlan::Coalesce(_)
| LogicalPlan::Project(_) => {
let new_limit = plan.with_new_children(&[input.children()[0].clone()]);
Ok(Transformed::Yes(input.with_new_children(&[new_limit])))
}
// Push limit into source as a "local" limit.
//
// Limit-Source -> Limit-Source[with_limit]
LogicalPlan::Source(source) => {
match (source.source_info.as_ref(), source.limit) {
// Limit pushdown is not supported for in-memory sources.
#[cfg(feature = "python")]
(SourceInfo::InMemoryInfo(_), _) => Ok(Transformed::No(plan)),
// Do not push down if the Source node is already at least as limited as `limit`
(SourceInfo::ExternalInfo(_), Some(existing_source_limit))
if (existing_source_limit <= limit) =>
{
Ok(Transformed::No(plan))
}
// Pushdown limit into the Source node as a "local" limit
(SourceInfo::ExternalInfo(_), _) => {
let new_source =
LogicalPlan::Source(source.with_limit(Some(limit))).into();
let limit_with_local_limited_source =
plan.with_new_children(&[new_source]);
Ok(Transformed::Yes(limit_with_local_limited_source))
}
}
}
_ => Ok(Transformed::No(plan)),
}
}
_ => return Ok(Transformed::No(plan)),
};
Ok(Transformed::Yes(new_plan))
_ => Ok(Transformed::No(plan)),
}
}
}

Expand Down
46 changes: 10 additions & 36 deletions src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use {
ExternalInfo, FileFormat, FileFormatConfig, FileInfos, InMemoryInfo,
PyFileFormatConfig, PyStorageConfig, StorageConfig,
},
PartitionSpec,
},
daft_core::python::schema::PySchema,
daft_core::schema::SchemaRef,
Expand Down Expand Up @@ -111,7 +110,6 @@ fn tabular_scan(
file_infos: &Arc<FileInfos>,
file_format_config: &Arc<FileFormatConfig>,
storage_config: &Arc<StorageConfig>,
partition_spec: &PartitionSpec,
limit: &Option<usize>,
is_ray_runner: bool,
) -> PyResult<PyObject> {
Expand Down Expand Up @@ -140,16 +138,7 @@ fn tabular_scan(
is_ray_runner,
))?;

if let Some(limit) = limit {
apply_limit(
py,
py_iter.into(),
*limit as i64,
partition_spec.num_partitions,
)
} else {
Ok(py_iter.into())
}
Ok(py_iter.into())
}

#[cfg(feature = "python")]
Expand Down Expand Up @@ -181,23 +170,6 @@ fn tabular_write(
Ok(py_iter.into())
}

#[cfg(feature = "python")]
fn apply_limit(
py: Python<'_>,
upstream_iter: PyObject,
limit: i64,
num_partitions: usize,
) -> PyResult<PyObject> {
let py_physical_plan = py.import(pyo3::intern!(py, "daft.execution.physical_plan"))?;
let local_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "local_limit"))?
.call1((upstream_iter, limit))?;
let global_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "global_limit"))?
.call1((local_limit_iter, limit, num_partitions))?;
Ok(global_limit_iter.into())
}

#[cfg(feature = "python")]
impl PhysicalPlan {
pub fn to_partition_tasks(
Expand Down Expand Up @@ -231,7 +203,6 @@ impl PhysicalPlan {
storage_config,
..
},
partition_spec,
limit,
..
}) => tabular_scan(
Expand All @@ -241,7 +212,6 @@ impl PhysicalPlan {
file_infos,
file_format_config,
storage_config,
partition_spec,
limit,
is_ray_runner,
),
Expand All @@ -255,7 +225,6 @@ impl PhysicalPlan {
storage_config,
..
},
partition_spec,
limit,
..
}) => tabular_scan(
Expand All @@ -265,7 +234,6 @@ impl PhysicalPlan {
file_infos,
file_format_config,
storage_config,
partition_spec,
limit,
is_ray_runner,
),
Expand All @@ -279,7 +247,6 @@ impl PhysicalPlan {
storage_config,
..
},
partition_spec,
limit,
..
}) => tabular_scan(
Expand All @@ -289,7 +256,6 @@ impl PhysicalPlan {
file_infos,
file_format_config,
storage_config,
partition_spec,
limit,
is_ray_runner,
),
Expand Down Expand Up @@ -340,7 +306,15 @@ impl PhysicalPlan {
num_partitions,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
apply_limit(py, upstream_iter, *limit, *num_partitions)
let py_physical_plan =
py.import(pyo3::intern!(py, "daft.execution.physical_plan"))?;
let local_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "local_limit"))?
.call1((upstream_iter, *limit))?;
let global_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "global_limit"))?
.call1((local_limit_iter, *limit, *num_partitions))?;
Ok(global_limit_iter.into())
}
PhysicalPlan::Explode(Explode { input, to_explode }) => {
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
Expand Down

0 comments on commit f439404

Please sign in to comment.