Skip to content

Commit

Permalink
add deferred EXISTS
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 committed Nov 14, 2024
1 parent 59ac3d2 commit 1e87ddd
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 19 deletions.
6 changes: 3 additions & 3 deletions benchmarking/tpcds/queries/58.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ WITH ss_items AS
WHERE d_week_seq =
(SELECT d_week_seq
FROM date_dim
WHERE d_date = '2000-01-03'))
WHERE d_date = DATE '2000-01-03'))
AND ss_sold_date_sk = d_date_sk
GROUP BY i_item_id),
cs_items AS
Expand All @@ -27,7 +27,7 @@ WITH ss_items AS
WHERE d_week_seq =
(SELECT d_week_seq
FROM date_dim
WHERE d_date = '2000-01-03'))
WHERE d_date = DATE '2000-01-03'))
AND cs_sold_date_sk = d_date_sk
GROUP BY i_item_id),
ws_items AS
Expand All @@ -43,7 +43,7 @@ WITH ss_items AS
WHERE d_week_seq =
(SELECT d_week_seq
FROM date_dim
WHERE d_date = '2000-01-03'))
WHERE d_date = DATE '2000-01-03'))
AND ws_sold_date_sk = d_date_sk
GROUP BY i_item_id)
SELECT ss_items.item_id,
Expand Down
18 changes: 9 additions & 9 deletions benchmarking/tpcds/queries/83.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ WITH sr_items AS
WHERE d_week_seq IN
(SELECT d_week_seq
FROM date_dim
WHERE d_date IN ('2000-06-30',
'2000-09-27',
'2000-11-17')))
WHERE d_date IN (DATE'2000-06-30',
DATE'2000-09-27',
DATE'2000-11-17')))
AND sr_returned_date_sk = d_date_sk
GROUP BY i_item_id),
cr_items AS
Expand All @@ -29,9 +29,9 @@ WITH sr_items AS
WHERE d_week_seq IN
(SELECT d_week_seq
FROM date_dim
WHERE d_date IN ('2000-06-30',
'2000-09-27',
'2000-11-17')))
WHERE d_date IN (DATE'2000-06-30',
DATE'2000-09-27',
DATE'2000-11-17')))
AND cr_returned_date_sk = d_date_sk
GROUP BY i_item_id),
wr_items AS
Expand All @@ -47,9 +47,9 @@ WITH sr_items AS
WHERE d_week_seq IN
(SELECT d_week_seq
FROM date_dim
WHERE d_date IN ('2000-06-30',
'2000-09-27',
'2000-11-17')))
WHERE d_date IN (DATE'2000-06-30',
DATE'2000-09-27',
DATE'2000-11-17')))
AND wr_returned_date_sk = d_date_sk
GROUP BY i_item_id)
SELECT sr_items.item_id ,
Expand Down
2 changes: 1 addition & 1 deletion benchmarking/tpcds/queries/95.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ FROM web_sales ws1 ,
date_dim ,
customer_address ,
web_site
WHERE d_date BETWEEN '1999-02-01' AND cast('1999-04-02' AS date)
WHERE d_date BETWEEN DATE'1999-02-01' AND cast('1999-04-02' AS date)
AND ws1.ws_ship_date_sk = d_date_sk
AND ws1.ws_ship_addr_sk = ca_address_sk
AND ca_state = 'IL'
Expand Down
14 changes: 11 additions & 3 deletions src/daft-dsl/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ pub enum Expr {
Subquery(Subquery),
#[display("{_0}, {_1}")]
InSubquery(ExprRef, Subquery),
#[display("{_0}")]
Exists(Subquery),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, Eq)]
Expand Down Expand Up @@ -709,7 +711,9 @@ impl Expr {
Self::Agg(agg_expr) => agg_expr.semantic_id(schema),
Self::ScalarFunction(sf) => scalar_function_semantic_id(sf, schema),

Self::Subquery(..) | Self::InSubquery(..) => FieldID::new("__subquery__"), // todo: better/unique id
Self::Subquery(..) | Self::InSubquery(..) | Self::Exists(..) => {
FieldID::new("__subquery__")
} // todo: better/unique id
}
}

Expand All @@ -719,6 +723,7 @@ impl Expr {
Self::Column(..) => vec![],
Self::Literal(..) => vec![],
Self::Subquery(..) => vec![],
Self::Exists(..) => vec![],

// One child.
Self::Not(expr)
Expand Down Expand Up @@ -753,7 +758,7 @@ impl Expr {
pub fn with_new_children(&self, children: Vec<ExprRef>) -> Self {
match self {
// no children
Self::Column(..) | Self::Literal(..) | Self::Subquery(..) => {
Self::Column(..) | Self::Literal(..) | Self::Subquery(..) | Self::Exists(..) => {
assert!(children.is_empty(), "Should have no children");
self.clone()
}
Expand Down Expand Up @@ -990,6 +995,7 @@ impl Expr {
Ok(first_field.clone())
}
Self::InSubquery(expr, _) => Ok(Field::new(expr.name(), DataType::Boolean)),
Self::Exists(_) => Ok(Field::new("exists", DataType::Boolean)),
}
}

Expand Down Expand Up @@ -1022,6 +1028,7 @@ impl Expr {
Self::IfElse { if_true, .. } => if_true.name(),
Self::Subquery(subquery) => subquery.name(),
Self::InSubquery(expr, _) => expr.name(),
Self::Exists(subquery) => subquery.name(),
}
}

Expand Down Expand Up @@ -1096,7 +1103,8 @@ impl Expr {
| Expr::FillNull(..)
| Expr::ScalarFunction { .. }
| Expr::Subquery(..)
| Expr::InSubquery(..) => Err(io::Error::new(
| Expr::InSubquery(..)
| Expr::Exists(..) => Err(io::Error::new(
io::ErrorKind::Other,
"Unsupported expression for SQL translation",
)),
Expand Down
3 changes: 2 additions & 1 deletion src/daft-dsl/src/optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub fn requires_computation(e: &Expr) -> bool {
| Expr::Between { .. }
| Expr::IfElse { .. }
| Expr::Subquery { .. }
| Expr::InSubquery { .. } => true,
| Expr::InSubquery { .. }
| Expr::Exists(..) => true,
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/daft-logical-plan/src/ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ fn replace_column_with_semantic_id(
Transformed::yes(new_expr.into())
} else {
match e.as_ref() {
Expr::Column(_) | Expr::Literal(_) | Expr::Subquery(_) => Transformed::no(e),
Expr::Column(_) | Expr::Literal(_) | Expr::Subquery(_) | Expr::Exists(_) => {
Transformed::no(e)
}
Expr::Agg(agg_expr) => replace_column_with_semantic_id_aggexpr(
agg_expr.clone(),
subexprs_to_replace,
Expand Down
1 change: 1 addition & 0 deletions src/daft-logical-plan/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ fn translate_clustering_spec_expr(
},
Expr::Literal(_) => Ok(clustering_spec_expr.clone()),
Expr::Subquery(_) => Ok(clustering_spec_expr.clone()),
Expr::Exists(_) => Ok(clustering_spec_expr.clone()),
Expr::Alias(child, name) => {
let newchild = translate_clustering_spec_expr(child, old_colname_to_new_colname)?;
Ok(newchild.alias(name.clone()))
Expand Down
13 changes: 12 additions & 1 deletion src/daft-sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,18 @@ impl SQLPlanner {
},
)
}
SQLExpr::Exists { .. } => unsupported_sql_err!("EXISTS"),
SQLExpr::Exists { subquery, negated } => {
let mut this = self.clone();
let subquery = this.plan_query(subquery)?;
let subquery = Subquery {
plan: subquery.build(),
};
if *negated {
Ok(Expr::Exists(subquery).arced().not())
} else {
Ok(Expr::Exists(subquery).arced())
}
}
SQLExpr::Subquery(subquery) => {
let mut this = self.clone();
let subquery = this.plan_query(subquery)?;
Expand Down
3 changes: 3 additions & 0 deletions src/daft-table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,9 @@ impl Table {
InSubquery(_expr, _subquery) => Err(DaftError::ComputeError(
"IN <SUBQUERY> should be optimized away before evaluation. This indicates a bug in the query optimizer.".to_string(),
)),
Exists(_subquery) => Err(DaftError::ComputeError(
"EXISTS <SUBQUERY> should be optimized away before evaluation. This indicates a bug in the query optimizer.".to_string(),
)),
}?;

if expected_field.name != series.field().name {
Expand Down

0 comments on commit 1e87ddd

Please sign in to comment.