Skip to content

Commit

Permalink
feat: support for basic subquery execution (#3536)
Browse files Browse the repository at this point in the history
Subquery execution is now possible through rewrite rules that convert
subqueries into joins. This only covers subqueries that can be converted
into equi-joins, not general subqueries. However, that already gets us to
21/22 TPC-H queries (although Q16 is still missing a count-distinct
implementation on the SQL side).

Also includes a drive-by fix for SQL substring.

---

Note on alternative implementations:
- Although Datafusion's optimizer rules for subqueries are more capable
than these, they rely on non-equi joins and still cannot decorrelate all
subqueries. Instead, once we have non-equi joins, we should implement the
[Unnesting Arbitrary
Subqueries](https://cs.emis.de/LNI/Proceedings/Proceedings241/383.pdf)
paper (which [duckdb also
implements](https://duckdb.org/2023/05/26/correlated-subqueries-in-sql.html#performance)).

---

todo:
- [x]  add more tests
  • Loading branch information
kevinzwang authored Dec 11, 2024
1 parent 8d93a27 commit 5238279
Show file tree
Hide file tree
Showing 15 changed files with 968 additions and 39 deletions.
34 changes: 27 additions & 7 deletions src/daft-dsl/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
mod tests;

use std::{
any::Any,
hash::{DefaultHasher, Hash, Hasher},
io::{self, Write},
sync::Arc,
};
Expand Down Expand Up @@ -38,8 +40,11 @@ use crate::{

/// Object-safe view of a query plan so that a plan can be embedded inside a
/// `Subquery` expression without the expression crate depending on a concrete
/// plan type.
pub trait SubqueryPlan: std::fmt::Debug + std::fmt::Display + Send + Sync {
fn as_any(&self) -> &dyn std::any::Any;
// Consuming variant of `as_any`: re-types an `Arc<dyn SubqueryPlan>` as
// `Arc<dyn Any>` so callers can downcast back to the concrete plan.
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
fn name(&self) -> &'static str;
fn schema(&self) -> SchemaRef;
// Object-safe equality and hashing: trait objects cannot use the generic
// `PartialEq`/`Hash` traits directly, so implementors downcast `other` and
// delegate to their concrete impls.
fn dyn_eq(&self, other: &dyn SubqueryPlan) -> bool;
fn dyn_hash(&self, state: &mut dyn Hasher);
}

#[derive(Display, Debug, Clone)]
Expand All @@ -60,6 +65,14 @@ impl Subquery {
/// Name of the underlying plan node, delegated to the boxed `SubqueryPlan`.
pub fn name(&self) -> &'static str {
self.plan.name()
}

/// Builds a stable identifier for this subquery by hashing the underlying
/// plan and combining the hash with the plan's name.
pub fn semantic_id(&self) -> FieldID {
    let mut hasher = DefaultHasher::new();
    self.hash(&mut hasher);

    FieldID::new(format!("subquery({}-{})", self.name(), hasher.finish()))
}
}

impl Serialize for Subquery {
Expand All @@ -76,16 +89,15 @@ impl<'de> Deserialize<'de> for Subquery {

impl PartialEq for Subquery {
fn eq(&self, other: &Self) -> bool {
self.plan.name() == other.plan.name() && self.plan.schema() == other.plan.schema()
self.plan.dyn_eq(other.plan.as_ref())
}
}

impl Eq for Subquery {}

impl std::hash::Hash for Subquery {
    /// Hashing delegates to the plan's object-safe `dyn_hash` so that equal
    /// subqueries (per `dyn_eq`) hash identically — keeping the Eq/Hash
    /// contract consistent with the `PartialEq` impl above.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.plan.dyn_hash(state);
    }
}

Expand Down Expand Up @@ -177,7 +189,7 @@ pub struct OuterReferenceColumn {

impl Display for OuterReferenceColumn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "outer_col({}, {})", self.field.name, self.depth)
write!(f, "outer_col({}, depth={})", self.field.name, self.depth)
}
}

Expand Down Expand Up @@ -744,10 +756,18 @@ impl Expr {
// Agg: Separate path.
Self::Agg(agg_expr) => agg_expr.semantic_id(schema),
Self::ScalarFunction(sf) => scalar_function_semantic_id(sf, schema),
Self::Subquery(subquery) => subquery.semantic_id(),
Self::InSubquery(expr, subquery) => {
let child_id = expr.semantic_id(schema);
let subquery_id = subquery.semantic_id();

Self::Subquery(..) | Self::InSubquery(..) | Self::Exists(..) => {
FieldID::new("__subquery__")
} // todo: better/unique id
FieldID::new(format!("({child_id} IN {subquery_id})"))
}
Self::Exists(subquery) => {
let subquery_id = subquery.semantic_id();

FieldID::new(format!("(EXISTS {subquery_id})"))
}
Self::OuterReferenceColumn(c) => {
let name = &c.field.name;
let depth = c.depth;
Expand Down
33 changes: 31 additions & 2 deletions src/daft-logical-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::{num::NonZeroUsize, sync::Arc};
use std::{
any::Any,
hash::{Hash, Hasher},
num::NonZeroUsize,
sync::Arc,
};

use common_display::ascii::AsciiTreeDisplay;
use common_error::DaftError;
use daft_dsl::{optimization::get_required_columns, SubqueryPlan};
use daft_dsl::{optimization::get_required_columns, Subquery, SubqueryPlan};
use daft_schema::schema::SchemaRef;
use indexmap::IndexSet;
use snafu::Snafu;
Expand Down Expand Up @@ -396,13 +401,37 @@ impl SubqueryPlan for LogicalPlan {
self
}

// Consumes the Arc and re-types it as `Arc<dyn Any>`, allowing callers
// (e.g. `downcast_subquery`) to recover the concrete `LogicalPlan`.
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self
}

// Delegates to the inherent `LogicalPlan::name`; `Self::` syntax avoids
// infinite recursion into this trait method.
fn name(&self) -> &'static str {
Self::name(self)
}

// Delegates to the inherent `LogicalPlan::schema`.
fn schema(&self) -> SchemaRef {
Self::schema(self)
}

// Object-safe equality: true only when `other` is also a `LogicalPlan`
// and the two plans compare equal; any other implementor compares unequal.
fn dyn_eq(&self, other: &dyn SubqueryPlan) -> bool {
    match other.as_any().downcast_ref::<Self>() {
        Some(rhs) => self == rhs,
        None => false,
    }
}

// `&mut dyn Hasher` itself implements `Hasher` (std blanket impl for
// `&mut H`), so rebinding `state` as `mut` and passing `&mut state` lets
// the generic `Hash::hash` run through this object-safe signature.
fn dyn_hash(&self, mut state: &mut dyn Hasher) {
self.hash(&mut state);
}
}

/// Recovers the concrete `LogicalPlan` stored inside a `Subquery`.
///
/// # Panics
/// Panics if the subquery's plan is not a `LogicalPlan`, which would
/// indicate a construction bug elsewhere in the crate.
pub(crate) fn downcast_subquery(subquery: &Subquery) -> LogicalPlanRef {
    let plan = subquery.plan.clone();
    plan.as_any_arc()
        .downcast::<LogicalPlan>()
        .expect("subquery plan should be a LogicalPlan")
}

#[derive(Debug, Snafu)]
Expand Down
8 changes: 5 additions & 3 deletions src/daft-logical-plan/src/optimization/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{
rules::{
DropRepartition, EliminateCrossJoin, EnrichWithStats, LiftProjectFromAgg, MaterializeScans,
OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, SimplifyExpressionsRule,
SplitActorPoolProjects,
SplitActorPoolProjects, UnnestPredicateSubquery, UnnestScalarSubquery,
},
};
use crate::LogicalPlan;
Expand Down Expand Up @@ -93,10 +93,12 @@ impl Optimizer {
// --- Rewrite rules ---
RuleBatch::new(
vec![
Box::new(SplitActorPoolProjects::new()),
Box::new(LiftProjectFromAgg::new()),
Box::new(UnnestScalarSubquery::new()),
Box::new(UnnestPredicateSubquery::new()),
Box::new(SplitActorPoolProjects::new()),
],
RuleExecutionStrategy::Once,
RuleExecutionStrategy::FixedPoint(None),
),
// we want to simplify expressions first to make the rest of the rules easier
RuleBatch::new(
Expand Down
13 changes: 11 additions & 2 deletions src/daft-logical-plan/src/optimization/rules/drop_repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ mod tests {

use crate::{
optimization::{
rules::drop_repartition::DropRepartition, test::assert_optimized_plan_with_rules_eq,
optimizer::{RuleBatch, RuleExecutionStrategy},
rules::drop_repartition::DropRepartition,
test::assert_optimized_plan_with_rules_eq,
},
test::{dummy_scan_node, dummy_scan_operator},
LogicalPlan,
Expand All @@ -65,7 +67,14 @@ mod tests {
plan: Arc<LogicalPlan>,
expected: Arc<LogicalPlan>,
) -> DaftResult<()> {
assert_optimized_plan_with_rules_eq(plan, expected, vec![Box::new(DropRepartition::new())])
assert_optimized_plan_with_rules_eq(
plan,
expected,
vec![RuleBatch::new(
vec![Box::new(DropRepartition::new())],
RuleExecutionStrategy::Once,
)],
)
}

/// Tests that DropRepartition does drops the upstream Repartition in back-to-back Repartitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ mod tests {

use super::LiftProjectFromAgg;
use crate::{
optimization::test::assert_optimized_plan_with_rules_eq,
optimization::{
optimizer::{RuleBatch, RuleExecutionStrategy},
test::assert_optimized_plan_with_rules_eq,
},
test::{dummy_scan_node, dummy_scan_operator},
LogicalPlan,
};
Expand All @@ -127,7 +130,10 @@ mod tests {
assert_optimized_plan_with_rules_eq(
plan,
expected,
vec![Box::new(LiftProjectFromAgg::new())],
vec![RuleBatch::new(
vec![Box::new(LiftProjectFromAgg::new())],
RuleExecutionStrategy::Once,
)],
)
}

Expand Down
2 changes: 2 additions & 0 deletions src/daft-logical-plan/src/optimization/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod reorder_joins;
mod rule;
mod simplify_expressions;
mod split_actor_pool_projects;
mod unnest_subquery;

pub use drop_repartition::DropRepartition;
pub use eliminate_cross_join::EliminateCrossJoin;
Expand All @@ -22,3 +23,4 @@ pub use push_down_projection::PushDownProjection;
pub use rule::OptimizerRule;
pub use simplify_expressions::SimplifyExpressionsRule;
pub use split_actor_pool_projects::SplitActorPoolProjects;
pub use unnest_subquery::{UnnestPredicateSubquery, UnnestScalarSubquery};
15 changes: 13 additions & 2 deletions src/daft-logical-plan/src/optimization/rules/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,11 @@ mod tests {
use rstest::rstest;

use crate::{
optimization::{rules::PushDownFilter, test::assert_optimized_plan_with_rules_eq},
optimization::{
optimizer::{RuleBatch, RuleExecutionStrategy},
rules::PushDownFilter,
test::assert_optimized_plan_with_rules_eq,
},
test::{dummy_scan_node, dummy_scan_node_with_pushdowns, dummy_scan_operator},
LogicalPlan,
};
Expand All @@ -371,7 +375,14 @@ mod tests {
plan: Arc<LogicalPlan>,
expected: Arc<LogicalPlan>,
) -> DaftResult<()> {
assert_optimized_plan_with_rules_eq(plan, expected, vec![Box::new(PushDownFilter::new())])
assert_optimized_plan_with_rules_eq(
plan,
expected,
vec![RuleBatch::new(
vec![Box::new(PushDownFilter::new())],
RuleExecutionStrategy::Once,
)],
)
}

/// Tests that we can't pushdown a filter into a ScanOperator that has a limit.
Expand Down
15 changes: 13 additions & 2 deletions src/daft-logical-plan/src/optimization/rules/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ mod tests {
use rstest::rstest;

use crate::{
optimization::{rules::PushDownLimit, test::assert_optimized_plan_with_rules_eq},
optimization::{
optimizer::{RuleBatch, RuleExecutionStrategy},
rules::PushDownLimit,
test::assert_optimized_plan_with_rules_eq,
},
test::{dummy_scan_node, dummy_scan_node_with_pushdowns, dummy_scan_operator},
LogicalPlan, LogicalPlanBuilder,
};
Expand All @@ -149,7 +153,14 @@ mod tests {
plan: Arc<LogicalPlan>,
expected: Arc<LogicalPlan>,
) -> DaftResult<()> {
assert_optimized_plan_with_rules_eq(plan, expected, vec![Box::new(PushDownLimit::new())])
assert_optimized_plan_with_rules_eq(
plan,
expected,
vec![RuleBatch::new(
vec![Box::new(PushDownLimit::new())],
RuleExecutionStrategy::Once,
)],
)
}

/// Tests that Limit pushes into external Source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,11 @@ mod tests {
};

use crate::{
optimization::{rules::PushDownProjection, test::assert_optimized_plan_with_rules_eq},
optimization::{
optimizer::{RuleBatch, RuleExecutionStrategy},
rules::PushDownProjection,
test::assert_optimized_plan_with_rules_eq,
},
test::{dummy_scan_node, dummy_scan_node_with_pushdowns, dummy_scan_operator},
LogicalPlan,
};
Expand All @@ -695,7 +699,10 @@ mod tests {
assert_optimized_plan_with_rules_eq(
plan,
expected,
vec![Box::new(PushDownProjection::new())],
vec![RuleBatch::new(
vec![Box::new(PushDownProjection::new())],
RuleExecutionStrategy::Once,
)],
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,11 @@ mod tests {
use super::SplitActorPoolProjects;
use crate::{
ops::{ActorPoolProject, Project},
optimization::{rules::PushDownProjection, test::assert_optimized_plan_with_rules_eq},
optimization::{
optimizer::{RuleBatch, RuleExecutionStrategy},
rules::PushDownProjection,
test::assert_optimized_plan_with_rules_eq,
},
test::{dummy_scan_node, dummy_scan_operator},
LogicalPlan,
};
Expand All @@ -531,7 +535,10 @@ mod tests {
assert_optimized_plan_with_rules_eq(
plan,
expected,
vec![Box::new(SplitActorPoolProjects {})],
vec![RuleBatch::new(
vec![Box::new(SplitActorPoolProjects::new())],
RuleExecutionStrategy::Once,
)],
)
}

Expand All @@ -545,10 +552,13 @@ mod tests {
assert_optimized_plan_with_rules_eq(
plan,
expected,
vec![
Box::new(SplitActorPoolProjects {}),
Box::new(PushDownProjection::new()),
],
vec![RuleBatch::new(
vec![
Box::new(SplitActorPoolProjects::new()),
Box::new(PushDownProjection::new()),
],
RuleExecutionStrategy::Once,
)],
)
}

Expand Down
Loading

0 comments on commit 5238279

Please sign in to comment.