diff --git a/Cargo.lock b/Cargo.lock
index 2649a44c19..a2dd03b600 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1374,6 +1374,7 @@ dependencies = [
  "common-daft-config",
  "common-error",
  "common-io-config",
+ "common-treenode",
  "daft-core",
  "daft-dsl",
  "daft-scan",
diff --git a/src/daft-plan/Cargo.toml b/src/daft-plan/Cargo.toml
index d05a91f73a..50b110d32d 100644
--- a/src/daft-plan/Cargo.toml
+++ b/src/daft-plan/Cargo.toml
@@ -4,6 +4,7 @@ bincode = {workspace = true}
 common-daft-config = {path = "../common/daft-config", default-features = false}
 common-error = {path = "../common/error", default-features = false}
 common-io-config = {path = "../common/io-config", default-features = false}
+common-treenode = {path = "../common/treenode", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
 daft-dsl = {path = "../daft-dsl", default-features = false}
 daft-scan = {path = "../daft-scan", default-features = false}
diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs
index 1ebb2b6dd3..28b106f9f2 100644
--- a/src/daft-plan/src/builder.rs
+++ b/src/daft-plan/src/builder.rs
@@ -71,7 +71,10 @@ impl LogicalPlanBuilder {
         Ok(logical_plan.into())
     }
 
-    pub fn table_scan_with_scan_operator(scan_operator: ScanOperatorRef) -> DaftResult<Self> {
+    pub fn table_scan_with_scan_operator(
+        scan_operator: ScanOperatorRef,
+        pushdowns: Option<Pushdowns>,
+    ) -> DaftResult<Self> {
         let schema = scan_operator.0.schema();
         let partitioning_keys = scan_operator.0.partitioning_keys();
         let source_info =
@@ -79,7 +82,7 @@ impl LogicalPlanBuilder {
                 scan_operator.clone(),
                 schema.clone(),
                 partitioning_keys.into(),
-                Default::default(),
+                pushdowns.unwrap_or_default(),
             )));
         let logical_plan: LogicalPlan =
             logical_ops::Source::new(schema.clone(), source_info.into()).into();
@@ -301,7 +304,7 @@ impl PyLogicalPlanBuilder {
 
     #[staticmethod]
     pub fn table_scan_with_scan_operator(scan_operator: ScanOperatorHandle) -> PyResult<Self> {
-        Ok(LogicalPlanBuilder::table_scan_with_scan_operator(scan_operator.into())?.into())
+        Ok(LogicalPlanBuilder::table_scan_with_scan_operator(scan_operator.into(), None)?.into())
     }
 
     #[staticmethod]
diff --git a/src/daft-plan/src/logical_ops/source.rs b/src/daft-plan/src/logical_ops/source.rs
index 285130acf4..62c8b48d89 100644
--- a/src/daft-plan/src/logical_ops/source.rs
+++ b/src/daft-plan/src/logical_ops/source.rs
@@ -53,8 +53,7 @@ impl Source {
                 partitioning_keys,
                 pushdowns,
             })) => {
-                res.push("Source:".to_string());
-                res.push(format!("Scan op = {}", scan_op));
+                res.push(format!("Source: Operator = {}", scan_op));
                 res.push(format!("File schema = {}", source_schema.short_string()));
                 res.push(format!("Partitioning keys = {:?}", partitioning_keys));
                 res.extend(pushdowns.multiline_display());
diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs
index dbe7c72102..834ee9b785 100644
--- a/src/daft-plan/src/optimization/rules/push_down_filter.rs
+++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs
@@ -6,12 +6,14 @@ use std::{
 use common_error::DaftResult;
 use daft_dsl::{
     col,
+    functions::FunctionExpr,
     optimization::{get_required_columns, replace_columns_with_expressions},
     Expr,
 };
 
 use crate::{
-    logical_ops::{Concat, Filter, Project},
+    logical_ops::{Concat, Filter, Project, Source},
+    source_info::{ExternalInfo, SourceInfo},
     LogicalPlan,
 };
 
@@ -70,6 +72,62 @@ impl OptimizerRule for PushDownFilter {
                     .unwrap()
                     .clone()
             }
+            LogicalPlan::Source(source) => {
+                match source.source_info.as_ref() {
+                    // Filter pushdown is not supported for in-memory sources.
+                    #[cfg(feature = "python")]
+                    SourceInfo::InMemoryInfo(_) => return Ok(Transformed::No(plan)),
+                    // Do not push down if the Source node already has a limit.
+                    SourceInfo::ExternalInfo(external_info)
+                        if external_info.pushdowns().limit.is_some() =>
+                    {
+                        return Ok(Transformed::No(plan))
+                    }
+                    // Do not push down if we are using the legacy Python scan info.
+                    SourceInfo::ExternalInfo(ExternalInfo::Legacy(..)) => {
+                        return Ok(Transformed::No(plan))
+                    }
+
+                    // Push down the filter into the Source node.
+                    SourceInfo::ExternalInfo(external_info) => {
+                        let predicate = &filter.predicate;
+                        use common_treenode::{TreeNode, VisitRecursion};
+
+                        let mut has_udf = false;
+                        predicate.apply(&mut |e: &Expr| {
+                            match e {
+                                #[cfg(feature = "python")]
+                                Expr::Function { func: FunctionExpr::Python(..), .. } => {
+                                    has_udf = true;
+                                    Ok(VisitRecursion::Stop)
+                                }
+                                Expr::Function { func: FunctionExpr::Uri(..), .. } => {
+                                    has_udf = true;
+                                    Ok(VisitRecursion::Stop)
+                                }
+                                _ => Ok(VisitRecursion::Continue),
+                            }
+                        })?;
+                        if has_udf {
+                            return Ok(Transformed::No(plan));
+                        }
+                        let new_predicate = external_info.pushdowns().filters.as_ref().map(|f| predicate.and(f)).unwrap_or(predicate.clone());
+                        let new_pushdowns =
+                            external_info.pushdowns().with_filters(Some(Arc::new(new_predicate)));
+                        let new_external_info = external_info.with_pushdowns(new_pushdowns);
+                        let new_source = LogicalPlan::Source(Source::new(
+                            source.output_schema.clone(),
+                            SourceInfo::ExternalInfo(new_external_info).into(),
+                        ))
+                        .into();
+                        return Ok(Transformed::Yes(new_source))
+                    }
+                }
+            }
             LogicalPlan::Project(child_project) => {
                 // Commute filter with projection if predicate only depends on projection columns that
                 // don't involve compute.
@@ -221,7 +279,7 @@ mod tests {
             rules::PushDownFilter,
             Optimizer,
         },
-        test::dummy_scan_node,
+        test::{dummy_scan_node, dummy_scan_operator_node},
         JoinType, LogicalPlan, PartitionScheme,
     };
 
@@ -266,6 +324,57 @@
         Ok(())
     }
 
+    /// Tests combining two Filters by pushing both into a ScanOperator.
+    #[test]
+    fn pushdown_filter_into_scan_operator() -> DaftResult<()> {
+        let plan = dummy_scan_operator_node(vec![
+            Field::new("a", DataType::Int64),
+            Field::new("b", DataType::Utf8),
+        ])
+        .filter(col("a").lt(&lit(2)))?
+        .filter(col("b").eq(&lit("foo")))?
+        .build();
+        let expected = "\
+        Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Filter pushdown = [col(b) == lit(\"foo\")] & [col(a) < lit(2)], Output schema = a (Int64), b (Utf8)";
+        assert_optimized_plan_eq(plan, expected)?;
+        Ok(())
+    }
+
+    /// Tests that we can't push down a filter into a ScanOperator that has a limit.
+    #[test]
+    fn pushdown_filter_into_scan_operator_with_limit() -> DaftResult<()> {
+        let plan = dummy_scan_operator_node(vec![
+            Field::new("a", DataType::Int64),
+            Field::new("b", DataType::Utf8),
+        ])
+        .limit(1, false)?
+        .filter(col("a").lt(&lit(2)))?
+        .build();
+        let expected = "\
+        Filter: col(a) < lit(2)\
+        \n  Limit: 1\
+        \n    Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Output schema = a (Int64), b (Utf8)";
+        assert_optimized_plan_eq(plan, expected)?;
+        Ok(())
+    }
+
+    /// Tests that we can't push down a filter into a ScanOperator when the predicate
+    /// contains a UDF-ish expression.
+    #[test]
+    fn pushdown_filter_into_scan_operator_with_udf() -> DaftResult<()> {
+        let pred = daft_dsl::functions::uri::download(&col("a"), 1, true, true, None);
+        let plan = dummy_scan_operator_node(vec![
+            Field::new("a", DataType::Int64),
+            Field::new("b", DataType::Utf8),
+        ])
+        .filter(pred.is_null())?
+        .build();
+        let expected = "\
+        Filter: is_null(download(col(a)))\
+        \n  Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Output schema = a (Int64), b (Utf8)";
+        assert_optimized_plan_eq(plan, expected)?;
+        Ok(())
+    }
+
     /// Tests that Filter commutes with Projections.
     #[test]
     fn filter_commutes_with_projection() -> DaftResult<()> {
diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs
index 8983f766b6..a679fef3a0 100644
--- a/src/daft-plan/src/optimization/rules/push_down_limit.rs
+++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs
@@ -28,7 +28,11 @@ impl OptimizerRule for PushDownLimit {
 
     fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
         match plan.as_ref() {
-            LogicalPlan::Limit(LogicalLimit { input, limit, .. }) => {
+            LogicalPlan::Limit(LogicalLimit {
+                input,
+                limit,
+                eager,
+            }) => {
                 let limit = *limit as usize;
                 match input.as_ref() {
                     // Naive commuting with unary ops.
@@ -74,6 +78,30 @@ impl OptimizerRule for PushDownLimit {
                             }
                         }
                     }
+                    // Fold adjacent Limits together.
+                    //
+                    // Limit-Limit -> Limit
+                    LogicalPlan::Limit(LogicalLimit {
+                        input,
+                        limit: child_limit,
+                        eager: child_eager,
+                    }) => {
+                        let new_limit = limit.min(*child_limit as usize);
+                        let new_eager = eager | child_eager;
+
+                        let new_plan = Arc::new(LogicalPlan::Limit(LogicalLimit::new(
+                            input.clone(),
+                            new_limit as i64,
+                            new_eager,
+                        )));
+                        // Rerun the optimizer on the folded Limit; ideally this rerun
+                        // should go away once we move to a visitor pattern.
+                        let optimized = self
+                            .try_optimize(new_plan.clone())?
+                            .or(Transformed::Yes(new_plan))
+                            .unwrap()
+                            .clone();
+                        Ok(Transformed::Yes(optimized))
+                    }
                     _ => Ok(Transformed::No(plan)),
                 }
             }
@@ -99,7 +127,10 @@ mod tests {
             rules::PushDownLimit,
             Optimizer,
         },
-        test::{dummy_scan_node, dummy_scan_node_with_pushdowns},
+        test::{
+            dummy_scan_node, dummy_scan_node_with_pushdowns,
+            dummy_scan_operator_node_with_pushdowns,
+        },
         LogicalPlan, PartitionScheme,
     };
 
@@ -166,6 +197,50 @@
         Ok(())
     }
 
+    /// Tests that adjacent Limits fold into the smaller limit.
+    ///
+    /// Limit[x]-Limit[y] -> Limit[min(x,y)]
+    #[test]
+    fn limit_folds_with_smaller_limit() -> DaftResult<()> {
+        let plan = dummy_scan_node_with_pushdowns(
+            vec![
+                Field::new("a", DataType::Int64),
+                Field::new("b", DataType::Utf8),
+            ],
+            Pushdowns::default(),
+        )
+        .limit(5, false)?
+        .limit(10, false)?
+        .build();
+        let expected = "\
+        Limit: 5\
+        \n  Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)";
+        assert_optimized_plan_eq(plan, expected)?;
+        Ok(())
+    }
+
+    /// Tests that adjacent Limits fold into the smaller limit, even when the Source
+    /// already has a larger limit pushdown.
+    ///
+    /// Limit[x]-Limit[y] -> Limit[min(x,y)]
+    #[test]
+    fn limit_folds_with_large_limit() -> DaftResult<()> {
+        let plan = dummy_scan_node_with_pushdowns(
+            vec![
+                Field::new("a", DataType::Int64),
+                Field::new("b", DataType::Utf8),
+            ],
+            Pushdowns::default().with_limit(Some(20)),
+        )
+        .limit(10, false)?
+        .limit(5, false)?
+        .build();
+        let expected = "\
+        Limit: 5\
+        \n  Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)";
+        assert_optimized_plan_eq(plan, expected)?;
+        Ok(())
+    }
+
     /// Tests that Limit does push into external Source with existing larger limit.
     ///
     /// Limit-Source[existing_limit] -> Source[new_limit]
@@ -187,6 +262,27 @@
         Ok(())
     }
 
+    /// Tests that Limit does push into external Source with existing larger limit.
+    ///
+    /// Limit-Source[existing_limit] -> Source[new_limit]
+    #[test]
+    fn limit_does_push_into_scan_operator_if_larger_limit() -> DaftResult<()> {
+        let plan = dummy_scan_operator_node_with_pushdowns(
+            vec![
+                Field::new("a", DataType::Int64),
+                Field::new("b", DataType::Utf8),
+            ],
+            Pushdowns::default().with_limit(Some(10)),
+        )
+        .limit(5, false)?
+        .build();
+        let expected = "\
+        Limit: 5\
+        \n  Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Limit pushdown = 5, Output schema = a (Int64), b (Utf8)";
+        assert_optimized_plan_eq(plan, expected)?;
+        Ok(())
+    }
+
     /// Tests that Limit does not push into in-memory Source.
     #[test]
     #[cfg(feature = "python")]
diff --git a/src/daft-plan/src/test/mod.rs b/src/daft-plan/src/test/mod.rs
index 1039f84ea1..37dbff01ee 100644
--- a/src/daft-plan/src/test/mod.rs
+++ b/src/daft-plan/src/test/mod.rs
@@ -16,6 +16,7 @@ pub fn dummy_scan_node_with_pushdowns(
     pushdowns: Pushdowns,
 ) -> LogicalPlanBuilder {
     let schema = Arc::new(Schema::new(fields).unwrap());
+
     LogicalPlanBuilder::table_scan_with_pushdowns(
         FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]),
         schema,
@@ -25,3 +26,26 @@ pub fn dummy_scan_node_with_pushdowns(
     )
     .unwrap()
 }
+
+pub fn dummy_scan_operator_node(fields: Vec<Field>) -> LogicalPlanBuilder {
+    dummy_scan_operator_node_with_pushdowns(fields, Default::default())
+}
+
+/// Create a dummy scan node containing the provided fields in its schema and the provided pushdowns.
+pub fn dummy_scan_operator_node_with_pushdowns(
+    fields: Vec<Field>,
+    pushdowns: Pushdowns,
+) -> LogicalPlanBuilder {
+    let schema = Arc::new(Schema::new(fields).unwrap());
+    let anon = daft_scan::AnonymousScanOperator::new(
+        vec!["/foo".to_string()],
+        schema,
+        FileFormatConfig::Json(Default::default()).into(),
+        StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(),
+    );
+    LogicalPlanBuilder::table_scan_with_scan_operator(
+        daft_scan::ScanOperatorRef(Arc::new(anon)),
+        Some(pushdowns),
+    )
+    .unwrap()
+}
diff --git a/src/daft-scan/src/anonymous.rs b/src/daft-scan/src/anonymous.rs
index 54f5deef26..8fd6ce0957 100644
--- a/src/daft-scan/src/anonymous.rs
+++ b/src/daft-scan/src/anonymous.rs
@@ -33,7 +33,7 @@ impl AnonymousScanOperator {
 
 impl Display for AnonymousScanOperator {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{:#?}", self)
+        write!(f, "AnonymousScanOperator: File paths=[{}], Format-specific config = {:?}, Storage config = {:?}", self.files.join(", "), self.file_format_config, self.storage_config)
     }
 }
 
diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs
index 31e2794afe..b283425498 100644
--- a/src/daft-scan/src/lib.rs
+++ b/src/daft-scan/src/lib.rs
@@ -12,6 +12,7 @@ use file_format::FileFormatConfig;
 use serde::{Deserialize, Serialize};
 
 mod anonymous;
+pub use anonymous::AnonymousScanOperator;
 pub mod file_format;
 mod glob;
 #[cfg(feature = "python")]
diff --git a/src/daft-stats/src/table_stats.rs b/src/daft-stats/src/table_stats.rs
index b44f8c4be4..862336d34e 100644
--- a/src/daft-stats/src/table_stats.rs
+++ b/src/daft-stats/src/table_stats.rs
@@ -1,4 +1,7 @@
-use std::{fmt::Display, ops::Not};
+use std::{
+    fmt::Display,
+    ops::{BitAnd, BitOr, Not},
+};
 
 use common_error::DaftError;
 use daft_dsl::Expr;
@@ -108,6 +111,8 @@ impl TableStatistics {
                 Gt => lhs.gt(&rhs),
                 Plus => &lhs + &rhs,
                 Minus => &lhs - &rhs,
+                And => lhs.bitand(&rhs),
+                Or => lhs.bitor(&rhs),
                 _ => Ok(ColumnRangeStatistics::Missing),
             }
         }
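
Side note on the Limit-folding rule added in push_down_limit.rs: two stacked Limits fold into one whose bound is the minimum of the two and whose eagerness is the OR of the two, so the Source-pushdown step only ever sees a single, tightest Limit. A minimal, self-contained Rust sketch of just that folding semantics (the `Limit` struct below is a hypothetical stand-in, not Daft's actual `LogicalLimit`):

```rust
/// Hypothetical stand-in for a logical Limit node (not Daft's `LogicalLimit`):
/// keep at most `limit` rows; `eager` requests eager materialization.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Limit {
    limit: i64,
    eager: bool,
}

/// Fold a parent Limit with its child: Limit[x]-Limit[y] -> Limit[min(x, y)],
/// and the folded node is eager if either input was.
fn fold(parent: Limit, child: Limit) -> Limit {
    Limit {
        limit: parent.limit.min(child.limit),
        eager: parent.eager || child.eager,
    }
}

fn main() {
    // Mirrors `limit_folds_with_smaller_limit` above: limit(5) then limit(10)
    // folds to a single Limit of 5.
    let folded = fold(
        Limit { limit: 10, eager: false },
        Limit { limit: 5, eager: false },
    );
    assert_eq!(folded, Limit { limit: 5, eager: false });
    println!("folded: {:?}", folded);
}
```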