[FEAT] Improved projection folding. (#1374)
While looking at the projection folding behaviour in the old query
planner, I noticed that it leaves some foldable projections unfolded.

For example, (a + 1) -> (a as c) could be folded into (a + 1 as c), but
the old query planner does not do that:

```
>>> daft.from_pydict({"a": [1, 2], "b": [3, 4]}).select((col("a")+1)).select(col("a").alias("c")).explain(show_optimized=True)
2023-09-13 19:34:09.494 | INFO     | daft.context:runner:87 - Using PyRunner
┌─Projection
│    output=[col(a) AS c]
│    partitioning={'by': None, 'num_partitions': 1, 'scheme': PartitionScheme.Unknown}
│
├──Projection
│    output=[col(a) + lit(1)]
│    partitioning={'by': None, 'num_partitions': 1, 'scheme': PartitionScheme.Unknown}
│
└──InMemoryScan
     output=[col(a), col(b)]
     cache_id='fc55df21728b4bf9bb1d6143559af9d7'
     partitioning={'by': None, 'num_partitions': 1, 'scheme': PartitionScheme.Unknown}
```

The old projection folding had a single rule:

- A projection P with a child projection C can be folded together if P
only references columns that involve no computation in C (no-ops/aliases).
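
As a rough illustration of the old rule (a minimal sketch, not Daft's actual code: the `(name, has_computation)` pairs and the function name `can_fold_old_rule` are hypothetical), the check amounts to a subset test of P's referenced columns against C's no-computation outputs:

```python
# Hypothetical model, not Daft's API: each child output is (name, has_computation).
def can_fold_old_rule(parent_refs, child_outputs):
    """Old rule: P may only reference child columns that involve no computation."""
    no_computation = {
        name for name, has_computation in child_outputs if not has_computation
    }
    return set(parent_refs) <= no_computation


# C = [a + 1] (computed, output named "a"); P = [a AS c] references "a".
blocked = can_fold_old_rule(["a"], [("a", True)])

# C = [a AS x, b] (pure alias and bare column); P = [x + b] references "x" and "b".
allowed = can_fold_old_rule(["x", "b"], [("x", False), ("b", False)])

print(blocked, allowed)
```

Under this model, the `(a + 1) -> (a as c)` example is rejected because the only column P references carries computation in C.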

However, some projections with computation in C can still be folded
into P. Specifically, they can be safely folded if every column in C is
referenced at most once in P. Each expression in C can then be
substituted directly for its column reference in P without duplicating
any computation or otherwise changing execution semantics.
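
The new rule can be sketched on a toy expression tree. This is an illustrative model only: the tuple encoding and the helpers `column_refs`, `requires_computation`, `can_fold`, and `substitute` are hypothetical names, not the planner's implementation. The sketch also assumes the at-most-once check only needs to apply to child columns that involve computation, since no-op columns were already foldable under the old rule.

```python
from collections import Counter

# Hypothetical mini expression model (not Daft's API):
#   ("col", name) | ("lit", value) | ("add", left, right) | ("alias", expr, name)


def column_refs(expr, counts=None):
    """Count how many times each column name is referenced in expr."""
    counts = Counter() if counts is None else counts
    kind = expr[0]
    if kind == "col":
        counts[expr[1]] += 1
    elif kind == "add":
        column_refs(expr[1], counts)
        column_refs(expr[2], counts)
    elif kind == "alias":
        column_refs(expr[1], counts)
    return counts


def requires_computation(expr):
    """Bare columns, literals, and aliases of them involve no computation."""
    if expr[0] == "alias":
        return requires_computation(expr[1])
    return expr[0] not in ("col", "lit")


def can_fold(parent_exprs, child_by_name):
    """Assumed rule: each computed child column is referenced at most once in P."""
    counts = Counter()
    for e in parent_exprs:
        column_refs(e, counts)
    return all(
        counts[name] <= 1
        for name, expr in child_by_name.items()
        if requires_computation(expr)
    )


def substitute(expr, child_by_name):
    """Replace each column reference in expr with the child's defining expression."""
    kind = expr[0]
    if kind == "col":
        return child_by_name.get(expr[1], expr)
    if kind == "add":
        return (
            "add",
            substitute(expr[1], child_by_name),
            substitute(expr[2], child_by_name),
        )
    if kind == "alias":
        return ("alias", substitute(expr[1], child_by_name), expr[2])
    return expr


# C = [a + 1] (output column named "a"); P = [a AS c]
child = {"a": ("add", ("col", "a"), ("lit", 1))}
parent = [("alias", ("col", "a"), "c")]

foldable = can_fold(parent, child)
folded = [substitute(e, child) for e in parent]

# P = [a + a] references the computed column twice, so folding would duplicate a + 1.
not_foldable = can_fold([("add", ("col", "a"), ("col", "a"))], child)
```

Here `folded` is the tree for `a + 1 AS c`, matching the example in this message, while the `a + a` parent is rejected.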

This PR implements improved projection folding with this new rule. The
new query planner will fold the example above into:

```
* Project: col(a) + lit(1) AS c
|
* Output schema = a (Int64)
```

---------

Co-authored-by: Xiayue Charles Lin <[email protected]>
Co-authored-by: Clark Zinzow <[email protected]>
3 people authored Sep 21, 2023
1 parent cc60b73 commit a0031d5
Showing 4 changed files with 351 additions and 31 deletions.
5 changes: 3 additions & 2 deletions daft/logical/optimizer.py
@@ -371,7 +371,7 @@ def __init__(self) -> None:
self.register_fn(Projection, Projection, self._drop_double_projection)

def _drop_double_projection(self, parent: Projection, child: Projection) -> LogicalPlan | None:
- """Folds two projections into one if the parent's expressions have no dependencies on the child's outputs
+ """Folds two projections into one if the parent's expressions depend only on no-computation columns of the child.
Projection-Projection-* -> <Projection with combined expressions and resource requests>-*
"""
@@ -385,7 +385,7 @@ def _drop_double_projection(self, parent: Projection, child: Projection) -> Logi
can_skip_child = required_columns.issubset(child_mapping.keys())

if can_skip_child:
- logger.debug(f"Folding: {parent} into {child}")
+ logger.debug(f"Folding: {parent}\ninto {child}")

new_exprs = []
for e in parent_projection:
@@ -426,6 +426,7 @@ def _drop_unneeded_projection(self, parent: Projection, child: LogicalPlan) -> L
and len(parent_projection) == len(child_output)
and all(p.name() == c.name for p, c in zip(parent_projection, child_output))
):
+ logger.debug(f"Dropping no-op: {parent}\nas parent of: {child}")
return child
else:
return None
31 changes: 22 additions & 9 deletions src/daft-plan/src/logical_ops/project.rs
@@ -226,7 +226,6 @@ impl Project {
// (a maybe new input node, and a maybe new list of projection expressions).
let upstream_schema = input.schema();
let (projection, substitutions) = Self::factor_expressions(projection, &upstream_schema);

// If there are substitutions to factor out,
// create a child projection node to do the factoring.
let input = if substitutions.is_empty() {
@@ -283,6 +282,9 @@ impl Project {
expr.children()
} else {
let expr_id = expr.semantic_id(schema);
+ if let Expr::Column(..) = expr.as_ref() {
+ column_name_substitutions.insert(expr_id.clone(), expr.clone());
+ }
// Mark expr as seen
let newly_seen = seen_subexpressions.insert(expr_id.clone());
if newly_seen {
@@ -292,9 +294,6 @@ impl Project {
// If previously seen, cache the expression (if it involves computation)
if optimization::requires_computation(expr) {
subexpressions_to_cache.insert(expr_id.clone(), expr.clone());
- } else if let Expr::Column(..) = expr.as_ref() {
- column_name_substitutions
- .insert(expr.semantic_id(schema), expr.clone());
}
// Stop recursing if previously seen;
// we only want top-level repeated subexpressions
@@ -317,10 +316,23 @@ impl Project {
let substituted_expressions = exprs
.iter()
.map(|e| {
- replace_column_with_semantic_id(e.clone().into(), &subexprs_to_replace, schema)
- .unwrap()
- .as_ref()
- .clone()
+ let new_expr = replace_column_with_semantic_id(
+ e.clone().into(),
+ &subexprs_to_replace,
+ schema,
+ )
+ .unwrap()
+ .as_ref()
+ .clone();
+ // The substitution can unintentionally change the expression's name
+ // (since the name depends on the first column referenced, which can be substituted away)
+ // so re-alias the original name here if it has changed.
+ let old_name = e.name().unwrap();
+ if new_expr.name().unwrap() != old_name {
+ new_expr.alias(old_name)
+ } else {
+ new_expr
+ }
})
.collect::<Vec<_>>();

@@ -603,7 +615,8 @@ mod tests {
Ok(())
}

- /// Test that common leaf expressions are not factored out.
+ /// Test that common leaf expressions are not factored out
+ /// (since this would not save computation and only introduces another materialization)
/// e.g.
/// 3 as x, 3 as y, a as w, a as z
/// ->