Skip to content

Commit

Permalink
feat: Support Variance
Browse files Browse the repository at this point in the history
  • Loading branch information
Huaxin Gao committed Apr 21, 2024
1 parent 8e73cfd commit bddbc71
Show file tree
Hide file tree
Showing 7 changed files with 390 additions and 2 deletions.
5 changes: 4 additions & 1 deletion EXPRESSIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,7 @@ The following Spark expressions are currently available:
+ BitXor
+ BoolAnd
+ BoolOr
+ Covariance
+ CovPopulation
+ CovSample
+ VariancePop
+ VarianceSamp
1 change: 1 addition & 0 deletions core/src/execution/datafusion/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ pub mod subquery;
pub mod sum_decimal;
pub mod temporal;
mod utils;
pub mod variance;
259 changes: 259 additions & 0 deletions core/src/execution/datafusion/expressions/variance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can evaluated at runtime during query execution
use std::{any::Any, sync::Arc};

use crate::execution::datafusion::expressions::{stats::StatsType, utils::down_cast_any_ref};
use arrow::{
array::{ArrayRef, Float64Array},
compute::cast,
datatypes::{DataType, Field},
};
use datafusion::logical_expr::Accumulator;
use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};

/// VAR_SAMP and VAR_POP aggregate expression
/// The implementation mostly is the same as the DataFusion's implementation. The reason
/// we have our own implementation is that DataFusion has UInt64 for state_field `count`,
/// while Spark has Double for count. Also we have added `null_on_divide_by_zero`
/// to be consistent with Spark's implementation.
#[derive(Debug)]
pub struct Variance {
name: String,
expr: Arc<dyn PhysicalExpr>,
stats_type: StatsType,
null_on_divide_by_zero: bool,
}

impl Variance {
/// Create a new VARIANCE aggregate function
pub fn new(
expr: Arc<dyn PhysicalExpr>,
name: impl Into<String>,
data_type: DataType,
stats_type: StatsType,
null_on_divide_by_zero: bool,
) -> Self {
// the result of variance just support FLOAT64 data type.
assert!(matches!(data_type, DataType::Float64));
Self {
name: name.into(),
expr,
stats_type,
null_on_divide_by_zero,
}
}
}

impl AggregateExpr for Variance {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn field(&self) -> Result<Field> {
Ok(Field::new(&self.name, DataType::Float64, true))
}

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(VarianceAccumulator::try_new(
self.stats_type,
self.null_on_divide_by_zero,
)?))
}

fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(VarianceAccumulator::try_new(
self.stats_type,
self.null_on_divide_by_zero,
)?))
}

fn state_fields(&self) -> Result<Vec<Field>> {
Ok(vec![
Field::new(
format_state_name(&self.name, "count"),
DataType::Float64,
true,
),
Field::new(
format_state_name(&self.name, "mean"),
DataType::Float64,
true,
),
Field::new(format_state_name(&self.name, "m2"), DataType::Float64, true),
])
}

fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
}

fn name(&self) -> &str {
&self.name
}
}

impl PartialEq<dyn Any> for Variance {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
.downcast_ref::<Self>()
.map(|x| {
self.name == x.name && self.expr.eq(&x.expr) && self.stats_type == x.stats_type
})
.unwrap_or(false)
}
}

/// An accumulator to compute variance
#[derive(Debug)]
pub struct VarianceAccumulator {
m2: f64,
mean: f64,
count: f64,
stats_type: StatsType,
null_on_divide_by_zero: bool,
}

impl VarianceAccumulator {
/// Creates a new `VarianceAccumulator`
pub fn try_new(s_type: StatsType, null_on_divide_by_zero: bool) -> Result<Self> {
Ok(Self {
m2: 0_f64,
mean: 0_f64,
count: 0_f64,
stats_type: s_type,
null_on_divide_by_zero,
})
}

pub fn get_count(&self) -> f64 {
self.count
}

pub fn get_mean(&self) -> f64 {
self.mean
}

pub fn get_m2(&self) -> f64 {
self.m2
}
}

impl Accumulator for VarianceAccumulator {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.count),
ScalarValue::from(self.mean),
ScalarValue::from(self.m2),
])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &cast(&values[0], &DataType::Float64)?;
let arr = downcast_value!(values, Float64Array).iter().flatten();

for value in arr {
let new_count = self.count + 1.0;
let delta1 = value - self.mean;
let new_mean = delta1 / new_count + self.mean;
let delta2 = value - new_mean;
let new_m2 = self.m2 + delta1 * delta2;

self.count += 1.0;
self.mean = new_mean;
self.m2 = new_m2;
}

Ok(())
}

fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &cast(&values[0], &DataType::Float64)?;
let arr = downcast_value!(values, Float64Array).iter().flatten();

for value in arr {
let new_count = self.count - 1.0;
let delta1 = self.mean - value;
let new_mean = delta1 / new_count + self.mean;
let delta2 = new_mean - value;
let new_m2 = self.m2 - delta1 * delta2;

self.count -= 1.0;
self.mean = new_mean;
self.m2 = new_m2;
}

Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
let counts = downcast_value!(states[0], Float64Array);
let means = downcast_value!(states[1], Float64Array);
let m2s = downcast_value!(states[2], Float64Array);

for i in 0..counts.len() {
let c = counts.value(i);
if c == 0_f64 {
continue;
}
let new_count = self.count + c;
let new_mean = self.mean * self.count / new_count + means.value(i) * c / new_count;
let delta = self.mean - means.value(i);
let new_m2 = self.m2 + m2s.value(i) + delta * delta * self.count * c / new_count;

self.count = new_count;
self.mean = new_mean;
self.m2 = new_m2;
}
Ok(())
}

fn evaluate(&mut self) -> Result<ScalarValue> {
let count = match self.stats_type {
StatsType::Population => self.count,
StatsType::Sample => {
if self.count > 0.0 {
self.count - 1.0
} else {
self.count
}
}
};

Ok(ScalarValue::Float64(match self.count {
count if count == 0.0 => None,
count if count == 1.0 => {
if let StatsType::Population = self.stats_type {
Some(0.0)
} else if self.null_on_divide_by_zero {
None
} else {
Some(f64::NAN)
}
}
_ => Some(self.m2 / count),
}))
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}
23 changes: 23 additions & 0 deletions core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ use crate::{
subquery::Subquery,
sum_decimal::SumDecimal,
temporal::{DateTruncExec, HourExec, MinuteExec, SecondExec, TimestampTruncExec},
variance::Variance,
NormalizeNaNAndZero,
},
operators::expand::CometExpandExec,
Expand Down Expand Up @@ -1219,6 +1220,28 @@ impl PhysicalPlanner {
StatsType::Population,
)))
}
AggExprStruct::VarianceSample(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
Ok(Arc::new(Variance::new(
child,
"variance",
datatype,
StatsType::Sample,
expr.null_on_divide_by_zero,
)))
}
AggExprStruct::VariancePopulation(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
Ok(Arc::new(Variance::new(
child,
"variance_pop",
datatype,
StatsType::Population,
expr.null_on_divide_by_zero,
)))
}
}
}

Expand Down
14 changes: 14 additions & 0 deletions core/src/execution/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ message AggExpr {
BitXorAgg bitXorAgg = 11;
CovSample covSample = 12;
CovPopulation covPopulation = 13;
VarianceSample varianceSample = 14;
VariancePopulation variancePopulation = 15;
}
}

Expand Down Expand Up @@ -165,6 +167,18 @@ message CovPopulation {
DataType datatype = 4;
}

message VarianceSample {
Expr child = 1;
bool null_on_divide_by_zero = 2;
DataType datatype = 3;
}

message VariancePopulation {
Expr child = 1;
bool null_on_divide_by_zero = 2;
DataType datatype = 3;
}

message Literal {
oneof value {
bool bool_val = 1;
Expand Down
38 changes: 37 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, Count, CovPopulation, CovSample, Final, First, Last, Max, Min, Partial, Sum}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, Count, CovPopulation, CovSample, Final, First, Last, Max, Min, Partial, Sum, VariancePop, VarianceSamp}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.optimizer.{BuildRight, NormalizeNaNAndZero}
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -426,6 +426,42 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
} else {
None
}
case variance @ VarianceSamp(child, nullOnDivideByZero) =>
val childExpr = exprToProto(child, inputs, binding)
val dataType = serializeDataType(variance.dataType)

if (childExpr.isDefined && dataType.isDefined) {
val varBuilder = ExprOuterClass.VarianceSample.newBuilder()
varBuilder.setChild(childExpr.get)
varBuilder.setNullOnDivideByZero(nullOnDivideByZero)
varBuilder.setDatatype(dataType.get)

Some(
ExprOuterClass.AggExpr
.newBuilder()
.setVarianceSample(varBuilder)
.build())
} else {
None
}
case variancePop @ VariancePop(child, nullOnDivideByZero) =>
val childExpr = exprToProto(child, inputs, binding)
val dataType = serializeDataType(variancePop.dataType)

if (childExpr.isDefined && dataType.isDefined) {
val varBuilder = ExprOuterClass.VariancePopulation.newBuilder()
varBuilder.setChild(childExpr.get)
varBuilder.setNullOnDivideByZero(nullOnDivideByZero)
varBuilder.setDatatype(dataType.get)

Some(
ExprOuterClass.AggExpr
.newBuilder()
.setVariancePopulation(varBuilder)
.build())
} else {
None
}
case fn =>
emitWarning(s"unsupported Spark aggregate function: $fn")
None
Expand Down
Loading

0 comments on commit bddbc71

Please sign in to comment.