Skip to content

Commit

Permalink
Support Date32 arguments for generate_series (apache#9420)
Browse files Browse the repository at this point in the history
* Supporting Date type for range and generate_series

* fix lock

* change lock

* optimize code and tests

* recover lock

* change test

* solve conflicts

* resolve conflicts
  • Loading branch information
Lordworms authored Mar 4, 2024
1 parent a84e5f8 commit ac27428
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 14 deletions.
2 changes: 1 addition & 1 deletion datafusion/functions-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ path = "src/lib.rs"

[dependencies]
arrow = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
log = { workspace = true }
Expand Down
55 changes: 50 additions & 5 deletions datafusion/functions-array/src/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
use arrow::array::ListArray;
use arrow::array::{
Array, ArrayRef, BooleanArray, Float32Array, Float64Array, GenericListArray,
Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, OffsetSizeTrait,
StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array,
GenericListArray, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
OffsetSizeTrait, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::Field;
use arrow::datatypes::{DataType, UInt64Type};
use arrow::datatypes::UInt64Type;
use arrow::datatypes::{DataType, Date32Type, IntervalMonthDayNanoType};
use datafusion_common::cast::{
as_int64_array, as_large_list_array, as_list_array, as_string_array,
as_date32_array, as_int64_array, as_interval_mdn_array, as_large_list_array,
as_list_array, as_string_array,
};
use datafusion_common::{exec_err, DataFusionError, Result};
use std::any::type_name;
Expand Down Expand Up @@ -438,3 +440,46 @@ pub fn array_ndims(args: &[ArrayRef]) -> Result<ArrayRef> {
array_type => exec_err!("array_ndims does not support type {array_type:?}"),
}
}
pub fn gen_range_date(
args: &[ArrayRef],
include_upper: i32,
) -> datafusion_common::Result<ArrayRef> {
if args.len() != 3 {
return exec_err!("arguments length does not match");
}
let (start_array, stop_array, step_array) = (
Some(as_date32_array(&args[0])?),
as_date32_array(&args[1])?,
Some(as_interval_mdn_array(&args[2])?),
);

let mut values = vec![];
let mut offsets = vec![0];
for (idx, stop) in stop_array.iter().enumerate() {
let mut stop = stop.unwrap_or(0);
let start = start_array.as_ref().map(|x| x.value(idx)).unwrap_or(0);
let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(1);
let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
let neg = months < 0 || days < 0;
if include_upper == 0 {
stop = Date32Type::subtract_month_day_nano(stop, step);
}
let mut new_date = start;
loop {
if neg && new_date < stop || !neg && new_date > stop {
break;
}
values.push(new_date);
new_date = Date32Type::add_month_day_nano(new_date, step);
}
offsets.push(values.len() as i32);
}

let arr = Arc::new(ListArray::try_new(
Arc::new(Field::new("item", DataType::Date32, true)),
OffsetBuffer::new(offsets.into()),
Arc::new(Date32Array::from(values)),
None,
)?);
Ok(arr)
}
29 changes: 27 additions & 2 deletions datafusion/functions-array/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow::datatypes::IntervalUnit::MonthDayNano;
use datafusion_common::exec_err;
use datafusion_common::plan_err;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use datafusion_expr::TypeSignature::Exact;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::sync::Arc;

// Create static instances of ScalarUDFs for each function
make_udf_function!(ArrayToString,
array_to_string,
Expand Down Expand Up @@ -106,6 +109,7 @@ impl Range {
Exact(vec![Int64]),
Exact(vec![Int64, Int64]),
Exact(vec![Int64, Int64, Int64]),
Exact(vec![Date32, Date32, Interval(MonthDayNano)]),
],
Volatility::Immutable,
),
Expand Down Expand Up @@ -136,7 +140,17 @@ impl ScalarUDFImpl for Range {

fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
crate::kernels::gen_range(&args, 0).map(ColumnarValue::Array)
match args[0].data_type() {
arrow::datatypes::DataType::Int64 => {
crate::kernels::gen_range(&args, 0).map(ColumnarValue::Array)
}
arrow::datatypes::DataType::Date32 => {
crate::kernels::gen_range_date(&args, 0).map(ColumnarValue::Array)
}
_ => {
exec_err!("unsupported type for range")
}
}
}

fn aliases(&self) -> &[String] {
Expand Down Expand Up @@ -165,6 +179,7 @@ impl GenSeries {
Exact(vec![Int64]),
Exact(vec![Int64, Int64]),
Exact(vec![Int64, Int64, Int64]),
Exact(vec![Date32, Date32, Interval(MonthDayNano)]),
],
Volatility::Immutable,
),
Expand Down Expand Up @@ -195,7 +210,17 @@ impl ScalarUDFImpl for GenSeries {

fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
crate::kernels::gen_range(&args, 1).map(ColumnarValue::Array)
match args[0].data_type() {
arrow::datatypes::DataType::Int64 => {
crate::kernels::gen_range(&args, 1).map(ColumnarValue::Array)
}
arrow::datatypes::DataType::Date32 => {
crate::kernels::gen_range_date(&args, 1).map(ColumnarValue::Array)
}
_ => {
exec_err!("unsupported type for range")
}
}
}

fn aliases(&self) -> &[String] {
Expand Down
60 changes: 54 additions & 6 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5554,26 +5554,74 @@ from arrays_range;
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] [3, 4, 5, 6, 7, 8, 9] [3, 5, 7, 9]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] [4, 5, 6, 7, 8, 9, 10, 11, 12] [4, 7, 10]

query ??????
query ??????????
select range(5),
range(2, 5),
range(2, 10, 3),
range(1, 5, -1),
range(1, -5, 1),
range(1, -5, -1)
range(1, -5, -1),
range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH),
range(DATE '1993-02-01', DATE '1993-01-01', INTERVAL '-1' DAY),
range(DATE '1989-04-01', DATE '1993-03-01', INTERVAL '1' YEAR),
range(DATE '1993-03-01', DATE '1989-04-01', INTERVAL '1' YEAR)
;
----
[0, 1, 2, 3, 4] [2, 3, 4] [2, 5, 8] [] [] [1, 0, -1, -2, -3, -4]
[0, 1, 2, 3, 4] [2, 3, 4] [2, 5, 8] [] [] [1, 0, -1, -2, -3, -4] [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01] [1993-02-01, 1993-01-31, 1993-01-30, 1993-01-29, 1993-01-28, 1993-01-27, 1993-01-26, 1993-01-25, 1993-01-24, 1993-01-23, 1993-01-22, 1993-01-21, 1993-01-20, 1993-01-19, 1993-01-18, 1993-01-17, 1993-01-16, 1993-01-15, 1993-01-14, 1993-01-13, 1993-01-12, 1993-01-11, 1993-01-10, 1993-01-09, 1993-01-08, 1993-01-07, 1993-01-06, 1993-01-05, 1993-01-04, 1993-01-03, 1993-01-02] [1989-04-01, 1990-04-01, 1991-04-01] []

## should throw error
query error
select range(DATE '1992-09-01', NULL, INTERVAL '1' YEAR);

query error
select range(DATE '1992-09-01', DATE '1993-03-01', NULL);

query error
select range(NULL, DATE '1993-03-01', INTERVAL '1' YEAR);

query ?
select range(DATE '1989-04-01', DATE '1993-03-01', INTERVAL '-1' YEAR)
----
[]

query ?
select range(DATE '1993-03-01', DATE '1989-04-01', INTERVAL '1' YEAR)
----
[]

query ?????
query ????????
select generate_series(5),
generate_series(2, 5),
generate_series(2, 10, 3),
generate_series(1, 5, 1),
generate_series(5, 1, -1)
generate_series(5, 1, -1),
generate_series(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH),
generate_series(DATE '1993-02-01', DATE '1993-01-01', INTERVAL '-1' DAY),
generate_series(DATE '1989-04-01', DATE '1993-03-01', INTERVAL '1' YEAR)
;
----
[0, 1, 2, 3, 4, 5] [2, 3, 4, 5] [2, 5, 8] [1, 2, 3, 4, 5] [5, 4, 3, 2, 1]
[0, 1, 2, 3, 4, 5] [2, 3, 4, 5] [2, 5, 8] [1, 2, 3, 4, 5] [5, 4, 3, 2, 1] [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01, 1993-03-01] [1993-02-01, 1993-01-31, 1993-01-30, 1993-01-29, 1993-01-28, 1993-01-27, 1993-01-26, 1993-01-25, 1993-01-24, 1993-01-23, 1993-01-22, 1993-01-21, 1993-01-20, 1993-01-19, 1993-01-18, 1993-01-17, 1993-01-16, 1993-01-15, 1993-01-14, 1993-01-13, 1993-01-12, 1993-01-11, 1993-01-10, 1993-01-09, 1993-01-08, 1993-01-07, 1993-01-06, 1993-01-05, 1993-01-04, 1993-01-03, 1993-01-02, 1993-01-01] [1989-04-01, 1990-04-01, 1991-04-01, 1992-04-01]

## should throw error
query error
select generate_series(DATE '1992-09-01', NULL, INTERVAL '1' YEAR);

query error
select generate_series(DATE '1992-09-01', DATE '1993-03-01', NULL);

query error
select generate_series(NULL, DATE '1993-03-01', INTERVAL '1' YEAR);


query ?
select generate_series(DATE '1989-04-01', DATE '1993-03-01', INTERVAL '-1' YEAR)
----
[]

query ?
select generate_series(DATE '1993-03-01', DATE '1989-04-01', INTERVAL '1' YEAR)
----
[]

## array_except

Expand Down

0 comments on commit ac27428

Please sign in to comment.