Skip to content

Commit

Permalink
Introduced StatisticsArgs
Browse files Browse the repository at this point in the history
  • Loading branch information
edmondop committed Sep 28, 2024
1 parent 016decf commit f41dd3e
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 84 deletions.
2 changes: 1 addition & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub use logical_plan::*;
pub use partition_evaluator::PartitionEvaluator;
pub use sqlparser;
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF};
pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF, StatisticsArgs};
pub use udf::{ScalarUDF, ScalarUDFImpl};
pub use udwf::{WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
Expand Down
26 changes: 15 additions & 11 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ impl fmt::Display for AggregateUDF {
}
}

/// Arguments passed to [`AggregateUDF::value_from_stats`] and
/// [`AggregateUDFImpl::value_from_stats`], bundled into a single struct
/// instead of being passed as separate parameters.
pub struct StatisticsArgs<'a> {
/// The statistics the aggregate value is derived from.
pub statistics: &'a Statistics,
/// The data type of the value the aggregate function returns.
pub return_type: &'a DataType,
/// Whether the aggregate function is distinct.
///
/// ```sql
/// SELECT COUNT(DISTINCT column1) FROM t;
/// ```
pub is_distinct: bool,
/// The physical expressions of the arguments the aggregate function takes.
pub exprs: &'a [Arc<dyn PhysicalExpr>],
}

impl AggregateUDF {
/// Create a new AggregateUDF
///
Expand Down Expand Up @@ -265,12 +277,9 @@ impl AggregateUDF {

pub fn value_from_stats(
&self,
statistics: &Statistics,
data_type: &DataType,
arguments: &[Arc<dyn PhysicalExpr>],
statistics_args: &StatisticsArgs,
) -> Option<ScalarValue> {
self.inner
.value_from_stats(statistics, &data_type, arguments)
self.inner.value_from_stats(statistics_args)
}

/// See [`AggregateUDFImpl::default_value`] for more details.
Expand Down Expand Up @@ -586,12 +595,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
None
}
/// Returns the value of this UDF computed from the given statistics.
///
/// The default implementation returns `None`.
fn value_from_stats(
&self,
_statistics: &Statistics,
_data_type: &DataType,
_arguments: &[Arc<dyn PhysicalExpr>],
) -> Option<ScalarValue> {
fn value_from_stats(&self, _statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
None
}

Expand Down
31 changes: 16 additions & 15 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use datafusion_expr::{
function::AccumulatorArgs, utils::format_state_name, Accumulator, AggregateUDFImpl,
EmitTo, GroupsAccumulator, Signature, Volatility,
};
use datafusion_expr::{Expr, ReversedUDAF, TypeSignature};
use datafusion_expr::{Expr, ReversedUDAF, StatisticsArgs, TypeSignature};
use datafusion_functions_aggregate_common::aggregate::count_distinct::{
BytesDistinctCountAccumulator, FloatDistinctCountAccumulator,
PrimitiveDistinctCountAccumulator,
Expand Down Expand Up @@ -295,25 +295,26 @@ impl AggregateUDFImpl for Count {
Ok(ScalarValue::Int64(Some(0)))
}

fn value_from_stats(
&self,
statistics: &datafusion_common::Statistics,
_data_type: &DataType,
arguments: &[Arc<dyn datafusion_physical_expr::PhysicalExpr>],
) -> Option<ScalarValue> {
if let Precision::Exact(num_rows) = statistics.num_rows {
if arguments.len() == 1 {
fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
if statistics_args.is_distinct {
return None;
}
if let Precision::Exact(num_rows) = statistics_args.statistics.num_rows {
if statistics_args.exprs.len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) =
arguments[0].as_any().downcast_ref::<expressions::Column>()
if let Some(col_expr) = statistics_args.exprs[0]
.as_any()
.downcast_ref::<expressions::Column>()
{
let current_val =
&statistics.column_statistics[col_expr.index()].null_count;
let current_val = &statistics_args.statistics.column_statistics
[col_expr.index()]
.null_count;
if let &Precision::Exact(val) = current_val {
return Some(ScalarValue::Int64(Some((num_rows - val) as i64)));
}
} else if let Some(lit_expr) =
arguments[0].as_any().downcast_ref::<expressions::Literal>()
} else if let Some(lit_expr) = statistics_args.exprs[0]
.as_any()
.downcast_ref::<expressions::Literal>()
{
if lit_expr.value() == &COUNT_STAR_EXPANSION {
return Some(ScalarValue::Int64(Some(num_rows as i64)));
Expand Down
43 changes: 15 additions & 28 deletions datafusion/functions-aggregate/src/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// under the License.

//! [`Max`] and [`MaxAccumulator`] accumulator for the `max` function
//! [`Min`] and [`MinAccumulator`] accumulator for the `max` function
//! [`Min`] and [`MinAccumulator`] accumulator for the `min` function

// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
Expand Down Expand Up @@ -52,12 +52,10 @@ use arrow_schema::IntervalUnit;
use datafusion_common::stats::Precision;
use datafusion_common::{
downcast_value, exec_err, internal_err, ColumnStatistics, DataFusionError, Result,
Statistics,
};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
use datafusion_physical_expr::{expressions, PhysicalExpr};
use datafusion_physical_expr::expressions;
use std::fmt::Debug;
use std::sync::Arc;

use arrow::datatypes::i256;
use arrow::datatypes::{
Expand All @@ -67,10 +65,10 @@ use arrow::datatypes::{
};

use datafusion_common::ScalarValue;
use datafusion_expr::GroupsAccumulator;
use datafusion_expr::{
function::AccumulatorArgs, Accumulator, AggregateUDFImpl, Signature, Volatility,
};
use datafusion_expr::{GroupsAccumulator, StatisticsArgs};
use half::f16;
use std::ops::Deref;

Expand Down Expand Up @@ -159,19 +157,18 @@ trait FromColumnStatistics {

fn value_from_statistics(
&self,
statistics: &Statistics,
data_type: &DataType,
arguments: &[Arc<dyn PhysicalExpr>],
statistics_args: &StatisticsArgs,
) -> Option<ScalarValue> {
if let Precision::Exact(num_rows) = &statistics.num_rows {
if let Precision::Exact(num_rows) = &statistics_args.statistics.num_rows {
match *num_rows {
0 => return ScalarValue::try_from(data_type).ok(),
0 => return ScalarValue::try_from(statistics_args.return_type).ok(),
value if value > 0 => {
let col_stats = &statistics.column_statistics;
if arguments.len() == 1 {
let col_stats = &statistics_args.statistics.column_statistics;
if statistics_args.exprs.len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) =
arguments[0].as_any().downcast_ref::<expressions::Column>()
if let Some(col_expr) = statistics_args.exprs[0]
.as_any()
.downcast_ref::<expressions::Column>()
{
return self.value_from_column_statistics(
&col_stats[col_expr.index()],
Expand Down Expand Up @@ -336,13 +333,8 @@ impl AggregateUDFImpl for Max {
/// Reports `ReversedUDAF::Identical` as the reversed form of this aggregate.
fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
datafusion_expr::ReversedUDAF::Identical
}
fn value_from_stats(
&self,
statistics: &Statistics,
data_type: &DataType,
arguments: &[Arc<dyn PhysicalExpr>],
) -> Option<ScalarValue> {
self.value_from_statistics(statistics, data_type, arguments)
fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
self.value_from_statistics(statistics_args)
}
}

Expand Down Expand Up @@ -1128,13 +1120,8 @@ impl AggregateUDFImpl for Min {
Some(false)
}

fn value_from_stats(
&self,
statistics: &Statistics,
data_type: &DataType,
arguments: &[Arc<dyn PhysicalExpr>],
) -> Option<ScalarValue> {
self.value_from_statistics(statistics, data_type, arguments)
fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
self.value_from_statistics(statistics_args)
}
fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
Expand Down
45 changes: 16 additions & 29 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use datafusion_common::scalar::ScalarValue;
use datafusion_common::Result;
use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{expressions, ExecutionPlan, Statistics};
use datafusion_physical_plan::{expressions, ExecutionPlan};

use crate::PhysicalOptimizerRule;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};

/// Optimizer that uses available statistics for aggregate functions
#[derive(Default)]
Expand All @@ -55,14 +55,19 @@ impl PhysicalOptimizerRule for AggregateStatistics {
let stats = partial_agg_exec.input().statistics()?;
let mut projections = vec![];
for expr in partial_agg_exec.aggr_expr() {
if let Some((non_null_rows, name)) =
take_optimizable_column_and_table_count(expr, &stats)
let field = expr.field();
let args = expr.expressions();
let statistics_args = StatisticsArgs {
statistics: &stats,
return_type: field.data_type(),
is_distinct: expr.is_distinct(),
exprs: args.as_slice(),
};
if let Some((optimizable_statistic, name)) =
take_optimizable_value_from_statistics(&statistics_args, expr)
{
projections.push((expressions::lit(non_null_rows), name.to_owned()));
} else if let Some((min_or_max, name)) =
take_optimizable_value_from_statistics(expr, &stats)
{
projections.push((expressions::lit(min_or_max), name.to_owned()));
projections
.push((expressions::lit(optimizable_statistic), name.to_owned()));
} else {
// TODO: we need all aggr_expr to be resolved (cf TODO fullres)
break;
Expand Down Expand Up @@ -133,29 +138,11 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>>
None
}

/// If this agg_expr is a count that can be exactly derived from the statistics, return it.
fn take_optimizable_column_and_table_count(
agg_expr: &AggregateFunctionExpr,
stats: &Statistics,
) -> Option<(ScalarValue, String)> {
if !agg_expr.is_distinct() {
if let Some((val, name)) = take_optimizable_value_from_statistics(agg_expr, stats)
{
return Some((val, name));
}
}
None
}

/// If the value of this agg_expr can be derived from the statistics, return it
/// together with the expression's name.
fn take_optimizable_value_from_statistics(
statistics_args: &StatisticsArgs,
agg_expr: &AggregateFunctionExpr,
stats: &Statistics,
) -> Option<(ScalarValue, String)> {
let value = agg_expr.fun().value_from_stats(
&stats,
agg_expr.field().data_type(),
agg_expr.expressions().as_slice(),
);
let value = agg_expr.fun().value_from_stats(statistics_args);
value.map(|val| (val, agg_expr.name().to_string()))
}
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub mod windows;
pub mod work_table;

/// Aggregate-function types re-exported from `datafusion_expr` and
/// `datafusion_physical_expr`.
pub mod udaf {
pub use datafusion_expr::StatisticsArgs;
pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
}

Expand Down

0 comments on commit f41dd3e

Please sign in to comment.