Skip to content

Commit

Permalink
Add support for functional dependency for ROW_NUMBER window function. (
Browse files Browse the repository at this point in the history
…apache#8737)

* Add primary key support for row_number window function

* Add comments, minor changes

* Add new test

* Review

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
mustafasrepo and ozankabak authored Jan 4, 2024
1 parent e6b9f52 commit 819d357
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 8 deletions.
59 changes: 52 additions & 7 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
use crate::dml::CopyOptions;
use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr};
use crate::expr::{
Alias, Exists, InSubquery, Placeholder, Sort as SortExpr, WindowFunction,
};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
Expand All @@ -36,9 +38,9 @@ use crate::utils::{
split_conjunction,
};
use crate::{
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown,
TableSource,
build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction,
CreateMemoryTable, CreateView, Expr, ExprSchemable, LogicalPlanBuilder, Operator,
TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Expand All @@ -48,9 +50,10 @@ use datafusion_common::tree_node::{
};
use datafusion_common::{
aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
DFField, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependencies,
OwnedTableReference, ParamValues, Result, UnnestOptions,
DFField, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
FunctionalDependencies, OwnedTableReference, ParamValues, Result, UnnestOptions,
};

// backwards compatibility
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};
Expand Down Expand Up @@ -1967,7 +1970,9 @@ pub struct Window {
impl Window {
/// Create a new window operator.
pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
let mut window_fields: Vec<DFField> = input.schema().fields().clone();
let fields = input.schema().fields();
let input_len = fields.len();
let mut window_fields = fields.clone();
window_fields.extend_from_slice(&exprlist_to_fields(window_expr.iter(), &input)?);
let metadata = input.schema().metadata().clone();

Expand All @@ -1976,6 +1981,46 @@ impl Window {
input.schema().functional_dependencies().clone();
window_func_dependencies.extend_target_indices(window_fields.len());

// Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
// of consecutive numbers per partition), we can represent this fact with
// functional dependencies.
let mut new_dependencies = window_expr
.iter()
.enumerate()
.filter_map(|(idx, expr)| {
if let Expr::WindowFunction(WindowFunction {
// Function is ROW_NUMBER
fun:
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber,
),
partition_by,
..
}) = expr
{
// When there is no PARTITION BY, row number will be unique
// across the entire table.
if partition_by.is_empty() {
return Some(idx + input_len);
}
}
None
})
.map(|idx| {
FunctionalDependence::new(vec![idx], vec![], false)
.with_mode(Dependency::Single)
})
.collect::<Vec<_>>();

if !new_dependencies.is_empty() {
for dependence in new_dependencies.iter_mut() {
dependence.target_indices = (0..window_fields.len()).collect();
}
// Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
let new_deps = FunctionalDependencies::new(new_dependencies);
window_func_dependencies.extend(new_deps);
}

Ok(Window {
input,
window_expr,
Expand Down
40 changes: 39 additions & 1 deletion datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3832,4 +3832,42 @@ select row_number() over (partition by 1 order by 1) rn,
from (select 1 a union all select 2 a) x;
----
1 1 1 1 1 1
2 1 1 2 2 1
2 1 1 2 2 1

# when partition by expression is empty row number result will be unique.
query TII
SELECT *
FROM (SELECT c1, c2, ROW_NUMBER() OVER() as rn
FROM aggregate_test_100
LIMIT 5)
GROUP BY rn
ORDER BY rn;
----
c 2 1
d 5 2
b 1 3
a 1 4
b 5 5

# when partition by expression is constant row number result will be unique.
query TII
SELECT *
FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY 3) as rn
FROM aggregate_test_100
LIMIT 5)
GROUP BY rn
ORDER BY rn;
----
c 2 1
d 5 2
b 1 3
a 1 4
b 5 5

statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression aggregate_test_100.c1 could not be resolved from available columns: rn
SELECT *
FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY c1) as rn
FROM aggregate_test_100
LIMIT 5)
GROUP BY rn
ORDER BY rn;

0 comments on commit 819d357

Please sign in to comment.