-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kurtosis udaf #4
Changes from 5 commits
26f4e39
02b0a58
b7b873e
32bb384
1342ef9
23ffedd
33fbfd4
ed24a8c
a0cf8a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,10 +77,20 @@ SELECT min_by(x, y) FROM VALUES (1, 10), (2, 5), (3, 15), (4, 8) as tab(x, y); | |
-- | 2 | | ||
-- +---------------------+ | ||
|
||
-- Get the kurtosis value of a column of numeric data. | ||
SELECT kurtosis(col) FROM VALUES (1.0), (10.0), (100.0), (10.0), (1.0) as tab(col); | ||
-- Results in | ||
- +-------------------+ | ||
- "| kurtosis(tab.col) |" | ||
- +-------------------+ | ||
- "| 4.777292927667962 |" | ||
- +-------------------+ | ||
|
||
``` | ||
|
||
## Done | ||
|
||
- [x] `mode(expression) -> scalar` - Returns the most frequent (mode) value from a column of data. | ||
- [x] `max_by(expression1, expression2) -> scalar` - Returns the value of `expression1` associated with the maximum value of `expression2`. | ||
- [x] `min_by(expression1, expression2) -> scalar` - Returns the value of `expression1` associated with the minimum value of `expression2`. | ||
- [x] `kurtosis(expression) -> scalar` - Returns the kurtosis, a measure of the `tailedness` of the distribution, for the given numeric values in `expression`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This documentation is not same as one in code, we should have consistent documentation everywhere. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. made all the mentioned changes @dharanad |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use arrow::array::{ArrayRef, Float64Array, UInt64Array}; | ||
use arrow::datatypes::{DataType, Field}; | ||
use datafusion::arrow; | ||
use std::any::Any; | ||
use std::fmt::Debug; | ||
|
||
use datafusion::common::cast::as_float64_array; | ||
use datafusion::common::downcast_value; | ||
use datafusion::common::DataFusionError; | ||
use datafusion::error::Result; | ||
use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs}; | ||
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; | ||
use datafusion::scalar::ScalarValue; | ||
|
||
make_udaf_expr_and_func!( | ||
KurtosisFunction, | ||
kurtosis, | ||
x, | ||
"Calculates the excess kurtosis (Fisher’s definition) with bias correction according to the sample size.", | ||
kurtosis_udaf | ||
); | ||
|
||
pub struct KurtosisFunction { | ||
signature: Signature, | ||
} | ||
|
||
impl Debug for KurtosisFunction { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("KurtosisFunction") | ||
.field("signature", &self.signature) | ||
.finish() | ||
} | ||
} | ||
Comment on lines
+44
to
+50
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that an explicit implementation of Debug will be more informative here. |
||
|
||
impl Default for KurtosisFunction { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl KurtosisFunction { | ||
pub fn new() -> Self { | ||
Self { | ||
signature: Signature::coercible(vec![DataType::Float64], Volatility::Immutable), | ||
} | ||
} | ||
} | ||
|
||
impl AggregateUDFImpl for KurtosisFunction { | ||
fn as_any(&self) -> &dyn Any { | ||
self | ||
} | ||
|
||
fn name(&self) -> &str { | ||
"kurtosis" | ||
} | ||
|
||
fn signature(&self) -> &Signature { | ||
&self.signature | ||
} | ||
|
||
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { | ||
Ok(DataType::Float64) | ||
} | ||
|
||
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { | ||
Ok(Box::new(KurtosisAccumulator::new())) | ||
} | ||
|
||
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> { | ||
Ok(vec![ | ||
Field::new("count", DataType::UInt64, true), | ||
Field::new("sum", DataType::Float64, true), | ||
Field::new("sum_sqr", DataType::Float64, true), | ||
Field::new("sum_cub", DataType::Float64, true), | ||
Field::new("sum_four", DataType::Float64, true), | ||
]) | ||
} | ||
} | ||
|
||
/// Accumulator for calculating the excess kurtosis (Fisher’s definition) with bias correction according to the sample size. | ||
/// This implementation follows the [DuckDB implementation]: | ||
/// <https://github.com/duckdb/duckdb/blob/main/src/core_functions/aggregate/distributive/kurtosis.cpp> | ||
#[derive(Debug, Default)] | ||
pub struct KurtosisAccumulator { | ||
count: u64, | ||
sum: f64, | ||
sum_sqr: f64, | ||
sum_cub: f64, | ||
sum_four: f64, | ||
} | ||
|
||
impl KurtosisAccumulator { | ||
pub fn new() -> Self { | ||
Self { | ||
count: 0, | ||
sum: 0.0, | ||
sum_sqr: 0.0, | ||
sum_cub: 0.0, | ||
sum_four: 0.0, | ||
} | ||
} | ||
} | ||
|
||
impl Accumulator for KurtosisAccumulator { | ||
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
let array = as_float64_array(&values[0])?; | ||
for value in array.iter().flatten() { | ||
self.count += 1; | ||
self.sum += value; | ||
self.sum_sqr += value.powi(2); | ||
self.sum_cub += value.powi(3); | ||
self.sum_four += value.powi(4); | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { | ||
let counts = downcast_value!(states[0], UInt64Array); | ||
let sums = downcast_value!(states[1], Float64Array); | ||
let sum_sqrs = downcast_value!(states[2], Float64Array); | ||
let sum_cubs = downcast_value!(states[3], Float64Array); | ||
let sum_fours = downcast_value!(states[4], Float64Array); | ||
|
||
for i in 0..counts.len() { | ||
let c = counts.value(i); | ||
if c == 0 { | ||
continue; | ||
} | ||
self.count += c; | ||
self.sum += sums.value(i); | ||
self.sum_sqr += sum_sqrs.value(i); | ||
self.sum_cub += sum_cubs.value(i); | ||
self.sum_four += sum_fours.value(i); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
fn evaluate(&mut self) -> Result<ScalarValue> { | ||
if self.count <= 3 { | ||
return Ok(ScalarValue::Float64(None)); | ||
} | ||
|
||
let count_64 = 1_f64 / self.count as f64; | ||
let m4 = count_64 | ||
* (self.sum_four - 4.0 * self.sum_cub * self.sum * count_64 | ||
+ 6.0 * self.sum_sqr * self.sum.powi(2) * count_64.powi(2) | ||
- 3.0 * self.sum.powi(4) * count_64.powi(3)); | ||
|
||
let m2 = (self.sum_sqr - self.sum.powi(2) * count_64) * count_64; | ||
if m2 <= 0.0 { | ||
return Ok(ScalarValue::Float64(None)); | ||
} | ||
|
||
let count = self.count as f64; | ||
let numerator = (count - 1.0) * ((count + 1.0) * m4 / m2.powi(2) - 3.0 * (count - 1.0)); | ||
let denominator = (count - 2.0) * (count - 3.0); | ||
|
||
let target = numerator / denominator; | ||
|
||
Ok(ScalarValue::Float64(Some(target))) | ||
} | ||
|
||
fn size(&self) -> usize { | ||
std::mem::size_of_val(self) | ||
} | ||
|
||
fn state(&mut self) -> Result<Vec<ScalarValue>> { | ||
Ok(vec![ | ||
ScalarValue::from(self.count), | ||
ScalarValue::from(self.sum), | ||
ScalarValue::from(self.sum_sqr), | ||
ScalarValue::from(self.sum_cub), | ||
ScalarValue::from(self.sum_four), | ||
]) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,16 +26,24 @@ use datafusion::logical_expr::AggregateUDF; | |
#[macro_use] | ||
pub mod macros; | ||
pub mod common; | ||
pub mod kurtosis; | ||
pub mod max_min_by; | ||
pub mod mode; | ||
|
||
pub mod expr_extra_fn { | ||
pub use super::kurtosis::kurtosis; | ||
pub use super::max_min_by::max_by; | ||
pub use super::max_min_by::min_by; | ||
pub use super::mode::mode; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pls add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
|
||
pub fn all_extra_aggregate_functions() -> Vec<Arc<AggregateUDF>> { | ||
vec![mode_udaf(), max_min_by::max_by_udaf(), max_min_by::min_by_udaf()] | ||
vec![ | ||
mode_udaf(), | ||
max_min_by::max_by_udaf(), | ||
max_min_by::min_by_udaf(), | ||
kurtosis::kurtosis_udaf(), | ||
] | ||
} | ||
|
||
/// Registers all enabled packages with a [`FunctionRegistry`] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we have enough example, just a description in "Done" section, will be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made all the mentioned changes @dmitrybugakov