
[PERF] Predicate Pushdown into Scan Operator #1730

Merged 13 commits on Dec 16, 2023
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/daft-plan/Cargo.toml
@@ -4,6 +4,7 @@ bincode = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-error = {path = "../common/error", default-features = false}
common-io-config = {path = "../common/io-config", default-features = false}
common-treenode = {path = "../common/treenode", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
9 changes: 6 additions & 3 deletions src/daft-plan/src/builder.rs
@@ -71,15 +71,18 @@ impl LogicalPlanBuilder {
Ok(logical_plan.into())
}

pub fn table_scan_with_scan_operator(scan_operator: ScanOperatorRef) -> DaftResult<Self> {
pub fn table_scan_with_scan_operator(
scan_operator: ScanOperatorRef,
pushdowns: Option<Pushdowns>,
) -> DaftResult<Self> {
let schema = scan_operator.0.schema();
let partitioning_keys = scan_operator.0.partitioning_keys();
let source_info =
SourceInfo::ExternalInfo(ExternalSourceInfo::Scan(ScanExternalInfo::new(
scan_operator.clone(),
schema.clone(),
partitioning_keys.into(),
Default::default(),
pushdowns.unwrap_or_default(),
)));
let logical_plan: LogicalPlan =
logical_ops::Source::new(schema.clone(), source_info.into()).into();
@@ -301,7 +304,7 @@ impl PyLogicalPlanBuilder {

#[staticmethod]
pub fn table_scan_with_scan_operator(scan_operator: ScanOperatorHandle) -> PyResult<Self> {
Ok(LogicalPlanBuilder::table_scan_with_scan_operator(scan_operator.into())?.into())
Ok(LogicalPlanBuilder::table_scan_with_scan_operator(scan_operator.into(), None)?.into())
}

#[staticmethod]
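For illustration, a minimal usage sketch of the new optional pushdowns parameter, modeled on the dummy_scan_operator_node_with_pushdowns helper added in src/daft-plan/src/test/mod.rs below (the schema fields and the limit value are examples only):

use std::sync::Arc;

let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64)]).unwrap());
let anon = daft_scan::AnonymousScanOperator::new(
    vec!["/foo".to_string()],
    schema,
    FileFormatConfig::Json(Default::default()).into(),
    StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(),
);
// Passing None keeps the previous behavior (Pushdowns::default());
// Some(...) seeds the scan with pushdowns up front.
let builder = LogicalPlanBuilder::table_scan_with_scan_operator(
    daft_scan::ScanOperatorRef(Arc::new(anon)),
    Some(Pushdowns::default().with_limit(Some(10))),
)?;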
3 changes: 1 addition & 2 deletions src/daft-plan/src/logical_ops/source.rs
@@ -53,8 +53,7 @@ impl Source {
partitioning_keys,
pushdowns,
})) => {
res.push("Source:".to_string());
res.push(format!("Scan op = {}", scan_op));
res.push(format!("Source: Operator = {}", scan_op));
Contributor:

Is this more readable when visualizing the plan? In the past, we've tried to keep the first line of the logical op repr super concise, essentially just a name for the logical op, but it looks like the scan operator repr can be pretty long:

write!(f, "AnonymousScanOperator: File paths=[{}], Format-specific config = {:?}, Storage config = {:?}", self.files.join(", "), self.file_format_config, self.storage_config)

IMO each string in this returned vec should be pretty atomic/granular, and it should be up to the display mode (e.g. tree plan visualization vs. single-line summary) to condense it as desired.
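
A minimal sketch of that suggestion (hypothetical shape, not code from this PR): keep each fragment atomic and let the display mode decide how to condense them:

fn multiline_display(&self) -> Vec<String> {
    vec![
        "Source:".to_string(),
        format!("Scan op = {}", self.scan_op),
        format!("File schema = {}", self.source_schema.short_string()),
    ]
}

// A tree visualization prints one fragment per line; a single-line
// summary can join them instead, e.g. fragments.join(", ").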

Member Author:

It does make it more readable, IMO! But the main reason was to make it much closer to the legacy Source repr, so that the repr-based tests are easier to write. Both should be about the same length.

res.push(format!("File schema = {}", source_schema.short_string()));
res.push(format!("Partitioning keys = {:?}", partitioning_keys));
res.extend(pushdowns.multiline_display());
113 changes: 111 additions & 2 deletions src/daft-plan/src/optimization/rules/push_down_filter.rs
@@ -6,12 +6,14 @@ use std::{
use common_error::DaftResult;
use daft_dsl::{
col,
functions::FunctionExpr,
optimization::{get_required_columns, replace_columns_with_expressions},
Expr,
};

use crate::{
logical_ops::{Concat, Filter, Project},
logical_ops::{Concat, Filter, Project, Source},
source_info::{ExternalInfo, SourceInfo},
LogicalPlan,
};

@@ -70,6 +72,62 @@ impl OptimizerRule for PushDownFilter {
.unwrap()
.clone()
}
LogicalPlan::Source(source) => {
match source.source_info.as_ref() {
// Filter pushdown is not supported for in-memory sources.
#[cfg(feature = "python")]
SourceInfo::InMemoryInfo(_) => return Ok(Transformed::No(plan)),
// Do not push down if the Source node already has a limit.
SourceInfo::ExternalInfo(external_info)
if let Some(existing_limit) =
external_info.pushdowns().limit =>
{
return Ok(Transformed::No(plan))
}
// Do not push down if we are using the Python legacy scan info.
SourceInfo::ExternalInfo(external_info)
if let ExternalInfo::Legacy(..) = external_info =>
{
return Ok(Transformed::No(plan))
}

// Push the filter into the Source node.
SourceInfo::ExternalInfo(external_info) => {
let predicate = &filter.predicate;
use common_treenode::{TreeNode, VisitRecursion};

let mut has_udf = false;
predicate.apply(&mut |e: &Expr| {
match e {
#[cfg(feature = "python")]
Expr::Function{func: FunctionExpr::Python(..), .. } => {
has_udf = true;
Ok(VisitRecursion::Stop)
},
Expr::Function{func: FunctionExpr::Uri(..), .. } => {
has_udf = true;
Ok(VisitRecursion::Stop)
},
_ => Ok(VisitRecursion::Continue)
}
})?;
Comment on lines +100 to +114
Contributor:

So nice!
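
For readers unfamiliar with common_treenode: apply performs a pre-order visit, and the closure's VisitRecursion return value steers the traversal (Stop short-circuits, Continue descends into children). A standalone sketch of the same idiom; it assumes Expr implements TreeNode as above, and that column references are a tuple-style Expr::Column variant:

use common_treenode::{TreeNode, VisitRecursion};

let mut saw_column = false;
predicate.apply(&mut |e: &Expr| {
    if matches!(e, Expr::Column(..)) {
        saw_column = true;
        // No need to visit the rest of the tree once we have a hit.
        return Ok(VisitRecursion::Stop);
    }
    Ok(VisitRecursion::Continue)
})?;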

if has_udf {
return Ok(Transformed::No(plan));
}
let new_predicate = external_info.pushdowns().filters.as_ref().map(|f| predicate.and(f)).unwrap_or(predicate.clone());
let new_pushdowns =
external_info.pushdowns().with_filters(Some(Arc::new(new_predicate)));
let new_external_info = external_info.with_pushdowns(new_pushdowns);
let new_source = LogicalPlan::Source(Source::new(
source.output_schema.clone(),
SourceInfo::ExternalInfo(new_external_info).into(),
))
.into();
return Ok(Transformed::Yes(new_source))
}
}
}
LogicalPlan::Project(child_project) => {
// Commute filter with projection if predicate only depends on projection columns that
// don't involve compute.
@@ -221,7 +279,7 @@ mod tests {
rules::PushDownFilter,
Optimizer,
},
test::dummy_scan_node,
test::{dummy_scan_node, dummy_scan_operator_node},
JoinType, LogicalPlan, PartitionScheme,
};

@@ -266,6 +324,57 @@
Ok(())
}

/// Tests combining two Filters into a single pushdown on a ScanOperator.
#[test]
fn pushdown_filter_into_scan_operator() -> DaftResult<()> {
let plan = dummy_scan_operator_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
])
.filter(col("a").lt(&lit(2)))?
.filter(col("b").eq(&lit("foo")))?
.build();
let expected = "\
Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Filter pushdown = [col(b) == lit(\"foo\")] & [col(a) < lit(2)], Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
Ok(())
}

/// Tests that we can't push down a filter into a ScanOperator that already has a limit.
#[test]
fn pushdown_filter_into_scan_operator_with_limit() -> DaftResult<()> {
let plan = dummy_scan_operator_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
])
.limit(1, false)?
.filter(col("a").lt(&lit(2)))?
.build();
let expected = "\
Filter: col(a) < lit(2)\
\n Limit: 1\
\n Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
Ok(())
}

/// Tests that we can't push down a filter into a ScanOperator when the predicate contains a UDF-ish expression.
#[test]
fn pushdown_filter_into_scan_operator_with_udf() -> DaftResult<()> {
let pred = daft_dsl::functions::uri::download(&col("a"), 1, true, true, None);
let plan = dummy_scan_operator_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
])
.filter(pred.is_null())?
.build();
let expected = "\
Filter: is_null(download(col(a)))\
\n Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
Ok(())
}

/// Tests that Filter commutes with Projections.
#[test]
fn filter_commutes_with_projection() -> DaftResult<()> {
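To make the rule behind pushdown_filter_into_scan_operator concrete: each Filter that reaches the Source folds into Pushdowns::filters, and successive predicates are conjoined via Expr::and. A sketch using the same daft_dsl helpers as the tests above:

let first = col("b").eq(&lit("foo"));
let second = col("a").lt(&lit(2));
// Pushing `second` when `first` is already in the pushdowns stores their
// conjunction, rendered in the expected plan repr as
// [col(b) == lit("foo")] & [col(a) < lit(2)].
let combined = second.and(&first);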
100 changes: 98 additions & 2 deletions src/daft-plan/src/optimization/rules/push_down_limit.rs
@@ -28,7 +28,11 @@ impl OptimizerRule for PushDownLimit {

fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
match plan.as_ref() {
LogicalPlan::Limit(LogicalLimit { input, limit, .. }) => {
LogicalPlan::Limit(LogicalLimit {
input,
limit,
eager,
}) => {
let limit = *limit as usize;
match input.as_ref() {
// Naive commuting with unary ops.
@@ -74,6 +78,30 @@
}
}
}
// Fold adjacent Limits together.
//
// Limit-Limit -> Limit
LogicalPlan::Limit(LogicalLimit {
input,
limit: child_limit,
eager: child_eager,
}) => {
let new_limit = limit.min(*child_limit as usize);
let new_eager = eager | child_eager;

let new_plan = Arc::new(LogicalPlan::Limit(LogicalLimit::new(
input.clone(),
new_limit as i64,
new_eager,
)));
// We rerun the optimizer here; ideally this will go away once we move to a visitor pattern.
let optimized = self
.try_optimize(new_plan.clone())?
.or(Transformed::Yes(new_plan))
.unwrap()
.clone();
Ok(Transformed::Yes(optimized))
}
_ => Ok(Transformed::No(plan)),
}
}
@@ -99,7 +127,10 @@ mod tests {
rules::PushDownLimit,
Optimizer,
},
test::{dummy_scan_node, dummy_scan_node_with_pushdowns},
test::{
dummy_scan_node, dummy_scan_node_with_pushdowns,
dummy_scan_operator_node_with_pushdowns,
},
LogicalPlan, PartitionScheme,
};

@@ -166,6 +197,50 @@
Ok(())
}

/// Tests that adjacent Limits fold into a single Limit with the smaller value, which is then pushed into the Source.
///
/// Limit[x]-Limit[y] -> Limit[min(x, y)]
#[test]
fn limit_folds_with_smaller_limit() -> DaftResult<()> {
let plan = dummy_scan_node_with_pushdowns(
vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
],
Pushdowns::default(),
)
.limit(5, false)?
.limit(10, false)?
.build();
let expected = "\
Limit: 5\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
Ok(())
}

/// Tests that adjacent Limits fold into a single Limit with the smaller value, which then replaces the Source's existing larger limit.
///
/// Limit[x]-Limit[y] -> Limit[min(x, y)]
#[test]
fn limit_folds_with_large_limit() -> DaftResult<()> {
let plan = dummy_scan_node_with_pushdowns(
vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
],
Pushdowns::default().with_limit(Some(20)),
)
.limit(10, false)?
.limit(5, false)?
.build();
let expected = "\
Limit: 5\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
Ok(())
}

/// Tests that Limit does push into external Source with existing larger limit.
///
/// Limit-Source[existing_limit] -> Source[new_limit]
@@ -187,6 +262,27 @@
Ok(())
}

/// Tests that Limit does push into external Source with existing larger limit.
///
/// Limit-Source[existing_limit] -> Source[new_limit]
#[test]
fn limit_does_push_into_scan_operator_if_larger_limit() -> DaftResult<()> {
let plan = dummy_scan_operator_node_with_pushdowns(
vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
],
Pushdowns::default().with_limit(Some(10)),
)
.limit(5, false)?
.build();
let expected = "\
Limit: 5\
\n Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Limit pushdown = 5, Output schema = a (Int64), b (Utf8)";
assert_optimized_plan_eq(plan, expected)?;
Ok(())
}

/// Tests that Limit does not push into in-memory Source.
#[test]
#[cfg(feature = "python")]
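A worked example of the Limit-Limit fold above, as plain arithmetic (illustration only): the folded node keeps min(parent, child) rows, and eagerness is OR-ed:

let (parent_limit, parent_eager) = (10usize, false);
let (child_limit, child_eager) = (5usize, true);
let new_limit = parent_limit.min(child_limit); // 5
let new_eager = parent_eager | child_eager;    // true
assert_eq!((new_limit, new_eager), (5, true));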
24 changes: 24 additions & 0 deletions src/daft-plan/src/test/mod.rs
@@ -16,6 +16,7 @@ pub fn dummy_scan_node_with_pushdowns(
pushdowns: Pushdowns,
) -> LogicalPlanBuilder {
let schema = Arc::new(Schema::new(fields).unwrap());

LogicalPlanBuilder::table_scan_with_pushdowns(
FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]),
schema,
@@ -25,3 +26,26 @@
)
.unwrap()
}

pub fn dummy_scan_operator_node(fields: Vec<Field>) -> LogicalPlanBuilder {
dummy_scan_operator_node_with_pushdowns(fields, Default::default())
}

/// Create a dummy scan node (backed by an AnonymousScanOperator) containing the provided fields in its schema and the provided pushdowns.
pub fn dummy_scan_operator_node_with_pushdowns(
fields: Vec<Field>,
pushdowns: Pushdowns,
) -> LogicalPlanBuilder {
let schema = Arc::new(Schema::new(fields).unwrap());
let anon = daft_scan::AnonymousScanOperator::new(
vec!["/foo".to_string()],
schema,
FileFormatConfig::Json(Default::default()).into(),
StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(),
);
LogicalPlanBuilder::table_scan_with_scan_operator(
daft_scan::ScanOperatorRef(Arc::new(anon)),
Some(pushdowns),
)
.unwrap()
}
2 changes: 1 addition & 1 deletion src/daft-scan/src/anonymous.rs
@@ -33,7 +33,7 @@ impl AnonymousScanOperator {

impl Display for AnonymousScanOperator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:#?}", self)
write!(f, "AnonymousScanOperator: File paths=[{}], Format-specific config = {:?}, Storage config = {:?}", self.files.join(", "), self.file_format_config, self.storage_config)
Contributor:

We may want to do the same for GlobScanOperator as well!

write!(f, "{:#?}", self)

Member Author:

Yeah, was thinking that! Can do that in a follow-up.

}
}

1 change: 1 addition & 0 deletions src/daft-scan/src/lib.rs
@@ -12,6 +12,7 @@ use file_format::FileFormatConfig;
use serde::{Deserialize, Serialize};

mod anonymous;
pub use anonymous::AnonymousScanOperator;
pub mod file_format;
mod glob;
#[cfg(feature = "python")]