-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PERF] Predicate Pushdown into Scan Operator #1730
Changes from all commits
e64008a
ffb65c2
41eadcb
90f8190
8bc2198
cfa53b1
25b6808
fd0d6b6
85dc31f
a089b06
ead6c80
6d21747
b6d6669
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,12 +6,14 @@ use std::{ | |
use common_error::DaftResult; | ||
use daft_dsl::{ | ||
col, | ||
functions::FunctionExpr, | ||
optimization::{get_required_columns, replace_columns_with_expressions}, | ||
Expr, | ||
}; | ||
|
||
use crate::{ | ||
logical_ops::{Concat, Filter, Project}, | ||
logical_ops::{Concat, Filter, Project, Source}, | ||
source_info::{ExternalInfo, SourceInfo}, | ||
LogicalPlan, | ||
}; | ||
|
||
|
@@ -70,6 +72,62 @@ impl OptimizerRule for PushDownFilter { | |
.unwrap() | ||
.clone() | ||
} | ||
LogicalPlan::Source(source) => { | ||
match source.source_info.as_ref() { | ||
// Filter pushdown is not supported for in-memory sources. | ||
#[cfg(feature = "python")] | ||
SourceInfo::InMemoryInfo(_) => return Ok(Transformed::No(plan)), | ||
// Do not pushdown if Source node is already has a limit | ||
SourceInfo::ExternalInfo(external_info) | ||
if let Some(existing_limit) = | ||
external_info.pushdowns().limit => | ||
{ | ||
return Ok(Transformed::No(plan)) | ||
} | ||
// Do not pushdown if we are using python legacy scan info | ||
SourceInfo::ExternalInfo(external_info) | ||
if let ExternalInfo::Legacy(..) = external_info => | ||
{ | ||
return Ok(Transformed::No(plan)) | ||
} | ||
|
||
// Pushdown filter into the Source node | ||
SourceInfo::ExternalInfo(external_info) => { | ||
let predicate = &filter.predicate; | ||
use common_treenode::{TreeNode, VisitRecursion}; | ||
|
||
let mut has_udf = false; | ||
predicate.apply(&mut |e: &Expr| { | ||
|
||
match e { | ||
#[cfg(feature = "python")] | ||
Expr::Function{func: FunctionExpr::Python(..), .. } => { | ||
has_udf = true; | ||
Ok(VisitRecursion::Stop) | ||
}, | ||
Expr::Function{func: FunctionExpr::Uri(..), .. } => { | ||
has_udf = true; | ||
Ok(VisitRecursion::Stop) | ||
}, | ||
_ => Ok(VisitRecursion::Continue) | ||
} | ||
})?; | ||
Comment on lines
+100
to
+114
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So nice! |
||
if has_udf { | ||
return Ok(Transformed::No(plan)); | ||
} | ||
let new_predicate = external_info.pushdowns().filters.as_ref().map(|f| predicate.and(f)).unwrap_or(predicate.clone()); | ||
let new_pushdowns = | ||
external_info.pushdowns().with_filters(Some(Arc::new(new_predicate))); | ||
let new_external_info = external_info.with_pushdowns(new_pushdowns); | ||
let new_source = LogicalPlan::Source(Source::new( | ||
source.output_schema.clone(), | ||
SourceInfo::ExternalInfo(new_external_info).into(), | ||
)) | ||
.into(); | ||
return Ok(Transformed::Yes(new_source)) | ||
} | ||
} | ||
} | ||
LogicalPlan::Project(child_project) => { | ||
// Commute filter with projection if predicate only depends on projection columns that | ||
// don't involve compute. | ||
|
@@ -221,7 +279,7 @@ mod tests { | |
rules::PushDownFilter, | ||
Optimizer, | ||
}, | ||
test::dummy_scan_node, | ||
test::{dummy_scan_node, dummy_scan_operator_node}, | ||
JoinType, LogicalPlan, PartitionScheme, | ||
}; | ||
|
||
|
@@ -266,6 +324,57 @@ mod tests { | |
Ok(()) | ||
} | ||
|
||
/// Tests combining of two Filters into a ScanOperator | ||
#[test] | ||
fn pushdown_filter_into_scan_operator() -> DaftResult<()> { | ||
let plan = dummy_scan_operator_node(vec![ | ||
Field::new("a", DataType::Int64), | ||
Field::new("b", DataType::Utf8), | ||
]) | ||
.filter(col("a").lt(&lit(2)))? | ||
.filter(col("b").eq(&lit("foo")))? | ||
.build(); | ||
let expected = "\ | ||
Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Filter pushdown = [col(b) == lit(\"foo\")] & [col(a) < lit(2)], Output schema = a (Int64), b (Utf8)"; | ||
assert_optimized_plan_eq(plan, expected)?; | ||
Ok(()) | ||
} | ||
|
||
/// Tests that we cant pushdown a filter into a ScanOperator with a limit | ||
#[test] | ||
fn pushdown_filter_into_scan_operator_with_limit() -> DaftResult<()> { | ||
let plan = dummy_scan_operator_node(vec![ | ||
Field::new("a", DataType::Int64), | ||
Field::new("b", DataType::Utf8), | ||
]) | ||
.limit(1, false)? | ||
.filter(col("a").lt(&lit(2)))? | ||
.build(); | ||
let expected = "\ | ||
Filter: col(a) < lit(2)\ | ||
\n Limit: 1\ | ||
\n Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Output schema = a (Int64), b (Utf8)"; | ||
assert_optimized_plan_eq(plan, expected)?; | ||
Ok(()) | ||
} | ||
|
||
/// Tests that we cant pushdown a filter into a ScanOperator with an udf-ish expression | ||
#[test] | ||
fn pushdown_filter_into_scan_operator_with_udf() -> DaftResult<()> { | ||
let pred = daft_dsl::functions::uri::download(&col("a"), 1, true, true, None); | ||
let plan = dummy_scan_operator_node(vec![ | ||
Field::new("a", DataType::Int64), | ||
Field::new("b", DataType::Utf8), | ||
]) | ||
.filter(pred.is_null())? | ||
.build(); | ||
let expected = "\ | ||
Filter: is_null(download(col(a)))\ | ||
\n Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Output schema = a (Int64), b (Utf8)"; | ||
assert_optimized_plan_eq(plan, expected)?; | ||
Ok(()) | ||
} | ||
|
||
/// Tests that Filter commutes with Projections. | ||
#[test] | ||
fn filter_commutes_with_projection() -> DaftResult<()> { | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -33,7 +33,7 @@ impl AnonymousScanOperator { | |||
|
||||
impl Display for AnonymousScanOperator { | ||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||||
write!(f, "{:#?}", self) | ||||
write!(f, "AnonymousScanOperator: File paths=[{}], Format-specific config = {:?}, Storage config = {:?}", self.files.join(", "), self.file_format_config, self.storage_config) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may want to do the same for Daft/src/daft-scan/src/glob.rs Line 217 in b6d6669
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah was thinking that, can do that in a follow up! |
||||
} | ||||
} | ||||
|
||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this more readable when visualizing the plan? In the past, we've tried to keep the first line of the logical op repr super concise, essentially just a name for the logical op, but it looks like the scan operator repr can be pretty long:
Daft/src/daft-scan/src/anonymous.rs
Line 36 in b6d6669
IMO each string in this returned vec should be pretty atomic/granular, and it should be up to the display mode (e.g. tree plan visualization vs. single-line summary) to condense it as desired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does make it more readable imo! But the main reason to do this was to make it much closer to the Legacy Source repr to be able to write the repr based tests much easier. Both should be able about the same length