diff --git a/daft/daft.pyi b/daft/daft.pyi index 3de0124309..42bd30c7e8 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -805,6 +805,10 @@ class PySeries: def dt_month(self) -> PySeries: ... def dt_year(self) -> PySeries: ... def dt_day_of_week(self) -> PySeries: ... + def partitioning_days(self) -> PySeries: ... + def partitioning_hours(self) -> PySeries: ... + def partitioning_months(self) -> PySeries: ... + def partitioning_years(self) -> PySeries: ... def list_lengths(self) -> PySeries: ... def image_decode(self) -> PySeries: ... def image_encode(self, image_format: ImageFormat) -> PySeries: ... diff --git a/daft/series.py b/daft/series.py index 5c2c202873..807f23bbe2 100644 --- a/daft/series.py +++ b/daft/series.py @@ -487,6 +487,10 @@ def list(self) -> SeriesListNamespace: def image(self) -> SeriesImageNamespace: return SeriesImageNamespace.from_series(self) + @property + def partitioning(self) -> SeriesPartitioningNamespace: + return SeriesPartitioningNamespace.from_series(self) + def __reduce__(self) -> tuple: if self.datatype()._is_python_type(): return (Series.from_pylist, (self.to_pylist(), self.name(), "force")) @@ -578,6 +582,20 @@ def day_of_week(self) -> Series: return Series._from_pyseries(self._series.dt_day_of_week()) +class SeriesPartitioningNamespace(SeriesNamespace): + def days(self) -> Series: + return Series._from_pyseries(self._series.partitioning_days()) + + def hours(self) -> Series: + return Series._from_pyseries(self._series.partitioning_hours()) + + def months(self) -> Series: + return Series._from_pyseries(self._series.partitioning_months()) + + def years(self) -> Series: + return Series._from_pyseries(self._series.partitioning_years()) + + class SeriesListNamespace(SeriesNamespace): def lengths(self) -> Series: return Series._from_pyseries(self._series.list_lengths()) diff --git a/src/daft-core/src/array/ops/arithmetic.rs b/src/daft-core/src/array/ops/arithmetic.rs index 1066a7a5f4..9209b529a3 100644 --- a/src/daft-core/src/array/ops/arithmetic.rs +++ b/src/daft-core/src/array/ops/arithmetic.rs @@ -4,7 +4,7 @@ use arrow2::{array::PrimitiveArray, compute::arithmetics::basic}; use crate::{ array::DataArray, - datatypes::{DaftNumericType, Float64Array, Utf8Array}, + datatypes::{DaftNumericType, Float64Array, Int64Array, Utf8Array}, kernels::utf8::add_utf8_arrays, }; @@ -121,6 +121,13 @@ impl Div for &Float64Array { } } +impl Div for &Int64Array { + type Output = DaftResult; + fn div(self, rhs: Self) -> Self::Output { + arithmetic_helper(self, rhs, basic::div, |l, r| l / r) + } +} + pub fn binary_with_nulls( lhs: &PrimitiveArray, rhs: &PrimitiveArray, diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index a750a49ab0..ae63e2832f 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -288,6 +288,22 @@ impl PySeries { Ok(self.series.dt_day_of_week()?.into()) } + pub fn partitioning_days(&self) -> PyResult { + Ok(self.series.partitioning_days()?.into()) + } + + pub fn partitioning_hours(&self) -> PyResult { + Ok(self.series.partitioning_hours()?.into()) + } + + pub fn partitioning_months(&self) -> PyResult { + Ok(self.series.partitioning_months()?.into()) + } + + pub fn partitioning_years(&self) -> PyResult { + Ok(self.series.partitioning_years()?.into()) + } + pub fn list_lengths(&self) -> PyResult { Ok(self.series.list_lengths()?.into_series().into()) } diff --git a/src/daft-core/src/series/ops/mod.rs b/src/daft-core/src/series/ops/mod.rs index 868b71f038..f2bdf59b90 100644 --- a/src/daft-core/src/series/ops/mod.rs +++ b/src/daft-core/src/series/ops/mod.rs @@ -22,6 +22,7 @@ pub mod len; pub mod list; pub mod not; pub mod null; +pub mod partitioning; pub mod search_sorted; pub mod sort; pub mod take; diff --git a/src/daft-core/src/series/ops/partitioning.rs b/src/daft-core/src/series/ops/partitioning.rs new file mode 100644 index 0000000000..7469e1783e --- /dev/null +++ b/src/daft-core/src/series/ops/partitioning.rs @@ -0,0 +1,98 @@ +use crate::datatypes::logical::TimestampArray; +use crate::datatypes::{Int32Array, Int64Array, TimeUnit}; +use crate::series::array_impl::IntoSeries; +use crate::{ + datatypes::{logical::DateArray, DataType}, + series::Series, +}; +use common_error::{DaftError, DaftResult}; + +impl Series { + pub fn partitioning_years(&self) -> DaftResult { + let epoch_year = Int32Array::from(("1970", vec![1970])).into_series(); + + match self.data_type() { + DataType::Date | DataType::Timestamp(_, None) => { + let years_since_ce = self.dt_year()?; + &years_since_ce - &epoch_year + } + DataType::Timestamp(tu, Some(_)) => { + let array = self.cast(&DataType::Timestamp(*tu, None))?; + let years_since_ce = array.dt_year()?; + &years_since_ce - &epoch_year + } + _ => Err(DaftError::ComputeError(format!( + "Can only run partitioning_years() operation on temporal types, got {}", + self.data_type() + ))), + } + } + + pub fn partitioning_months(&self) -> DaftResult { + let months_in_year = Int32Array::from(("months", vec![12])).into_series(); + let month_of_epoch = Int32Array::from(("months", vec![1])).into_series(); + match self.data_type() { + DataType::Date | DataType::Timestamp(_, None) => { + let years_since_1970 = self.partitioning_years()?; + let months_of_this_year = self.dt_month()?; + ((&years_since_1970 * &months_in_year)? + months_of_this_year)? - month_of_epoch + } + DataType::Timestamp(tu, Some(_)) => { + let array = self.cast(&DataType::Timestamp(*tu, None))?; + let years_since_1970 = array.partitioning_years()?; + let months_of_this_year = array.dt_month()?; + ((&years_since_1970 * &months_in_year)? + months_of_this_year)? - month_of_epoch + } + _ => Err(DaftError::ComputeError(format!( + "Can only run partitioning_years() operation on temporal types, got {}", + self.data_type() + ))), + } + } + + pub fn partitioning_days(&self) -> DaftResult { + match self.data_type() { + DataType::Date => { + let downcasted = self.downcast::()?; + downcasted.cast(&DataType::Int32) + } + DataType::Timestamp(_, None) => { + let ts_array = self.downcast::()?; + ts_array.date()?.cast(&DataType::Int32) + } + + DataType::Timestamp(tu, Some(_)) => { + let array = self.cast(&DataType::Timestamp(*tu, None))?; + let ts_array = array.downcast::()?; + ts_array.date()?.cast(&DataType::Int32) + } + + _ => Err(DaftError::ComputeError(format!( + "Can only run partitioning_days() operation on temporal types, got {}", + self.data_type() + ))), + } + } + + pub fn partitioning_hours(&self) -> DaftResult { + match self.data_type() { + DataType::Timestamp(unit, _) => { + let ts_array = self.downcast::()?; + let physical = &ts_array.physical; + let unit_to_hours: i64 = match unit { + TimeUnit::Nanoseconds => 3_600_000_000_000, + TimeUnit::Microseconds => 3_600_000_000, + TimeUnit::Milliseconds => 3_600_000, + TimeUnit::Seconds => 3_600, + }; + let divider = Int64Array::from(("divider", vec![unit_to_hours])); + let hours = (physical / ÷r)?; + Ok(hours.into_series()) + } + _ => Err(DaftError::ComputeError(format!( + "Can only run partitioning_hours() operation on timestamp types, got {}", + self.data_type() + ))), + } + } +} diff --git a/src/daft-dsl/src/functions/mod.rs b/src/daft-dsl/src/functions/mod.rs index 79e5287eac..7508c9de5a 100644 --- a/src/daft-dsl/src/functions/mod.rs +++ b/src/daft-dsl/src/functions/mod.rs @@ -2,6 +2,7 @@ pub mod float; pub mod image; pub mod list; pub mod numeric; +pub mod partitioning; pub mod temporal; pub mod uri; pub mod utf8; @@ -9,6 +10,7 @@ pub mod utf8; use self::image::ImageExpr; use self::list::ListExpr; use self::numeric::NumericExpr; +use self::partitioning::PartitioningExpr; use self::temporal::TemporalExpr; use self::utf8::Utf8Expr; use self::{float::FloatExpr, uri::UriExpr}; @@ -33,6 +35,7 @@ pub enum FunctionExpr { Image(ImageExpr), #[cfg(feature = "python")] Python(PythonUDF), + Partitioning(PartitioningExpr), Uri(UriExpr), } @@ -56,6 +59,7 @@ impl FunctionExpr { Uri(expr) => expr.get_evaluator(), #[cfg(feature = "python")] Python(expr) => expr, + Partitioning(expr) => expr.get_evaluator(), } } } diff --git a/src/daft-dsl/src/functions/partitioning/evaluators.rs b/src/daft-dsl/src/functions/partitioning/evaluators.rs new file mode 100644 index 0000000000..5ef92b5758 --- /dev/null +++ b/src/daft-dsl/src/functions/partitioning/evaluators.rs @@ -0,0 +1,57 @@ +use daft_core::{ + datatypes::{DataType, Field}, + schema::Schema, + series::Series, +}; + +use crate::Expr; + +use common_error::{DaftError, DaftResult}; + +use super::super::FunctionEvaluator; + +macro_rules! impl_func_evaluator_for_partitioning { + ($name:ident, $op:ident, $kernel:ident) => { + pub(super) struct $name {} + + impl FunctionEvaluator for $name { + fn fn_name(&self) -> &'static str { + "$op" + } + + fn to_field(&self, inputs: &[Expr], schema: &Schema, _: &Expr) -> DaftResult { + match inputs { + [input] => match input.to_field(schema) { + Ok(field) if field.dtype.is_temporal() => { + Ok(Field::new(field.name, DataType::Int32)) + } + Ok(field) => Err(DaftError::TypeError(format!( + "Expected input to $op to be temporal, got {}", + field.dtype + ))), + Err(e) => Err(e), + }, + _ => Err(DaftError::SchemaMismatch(format!( + "Expected 1 input arg, got {}", + inputs.len() + ))), + } + } + + fn evaluate(&self, inputs: &[Series], _: &Expr) -> DaftResult { + match inputs { + [input] => input.$kernel(), + _ => Err(DaftError::ValueError(format!( + "Expected 1 input arg, got {}", + inputs.len() + ))), + } + } + } + }; +} + +impl_func_evaluator_for_partitioning!(YearsEvaluator, years, partitioning_years); +impl_func_evaluator_for_partitioning!(MonthsEvaluator, months, partitioning_months); +impl_func_evaluator_for_partitioning!(DaysEvaluator, days, partitioning_days); +impl_func_evaluator_for_partitioning!(HoursEvaluator, hours, partitioning_hours); diff --git a/src/daft-dsl/src/functions/partitioning/mod.rs b/src/daft-dsl/src/functions/partitioning/mod.rs new file mode 100644 index 0000000000..f3aebb6fcd --- /dev/null +++ b/src/daft-dsl/src/functions/partitioning/mod.rs @@ -0,0 +1,61 @@ +mod evaluators; + +use serde::{Deserialize, Serialize}; + +use crate::{ + functions::partitioning::evaluators::{ + DaysEvaluator, HoursEvaluator, MonthsEvaluator, YearsEvaluator, + }, + Expr, +}; + +use super::FunctionEvaluator; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub enum PartitioningExpr { + Years, + Months, + Days, + Hours, +} + +impl PartitioningExpr { + #[inline] + pub fn get_evaluator(&self) -> &dyn FunctionEvaluator { + use PartitioningExpr::*; + match self { + Years => &YearsEvaluator {}, + Months => &MonthsEvaluator {}, + Days => &DaysEvaluator {}, + Hours => &HoursEvaluator {}, + } + } +} + +pub fn days(input: &Expr) -> Expr { + Expr::Function { + func: super::FunctionExpr::Partitioning(PartitioningExpr::Days), + inputs: vec![input.clone()], + } +} + +pub fn hours(input: &Expr) -> Expr { + Expr::Function { + func: super::FunctionExpr::Partitioning(PartitioningExpr::Hours), + inputs: vec![input.clone()], + } +} + +pub fn months(input: &Expr) -> Expr { + Expr::Function { + func: super::FunctionExpr::Partitioning(PartitioningExpr::Months), + inputs: vec![input.clone()], + } +} + +pub fn years(input: &Expr) -> Expr { + Expr::Function { + func: super::FunctionExpr::Partitioning(PartitioningExpr::Years), + inputs: vec![input.clone()], + } +} diff --git a/src/daft-dsl/src/python.rs b/src/daft-dsl/src/python.rs index 1a04d28c46..c5f9ed03ec 100644 --- a/src/daft-dsl/src/python.rs +++ b/src/daft-dsl/src/python.rs @@ -275,6 +275,26 @@ impl PyExpr { Ok(day_of_week(&self.expr).into()) } + pub fn partitioning_days(&self) -> PyResult { + use functions::partitioning::days; + Ok(days(&self.expr).into()) + } + + pub fn partitioning_hours(&self) -> PyResult { + use functions::partitioning::hours; + Ok(hours(&self.expr).into()) + } + + pub fn partitioning_months(&self) -> PyResult { + use functions::partitioning::months; + Ok(months(&self.expr).into()) + } + + pub fn partitioning_years(&self) -> PyResult { + use functions::partitioning::years; + Ok(years(&self.expr).into()) + } + pub fn utf8_endswith(&self, pattern: &Self) -> PyResult { use crate::functions::utf8::endswith; Ok(endswith(&self.expr, &pattern.expr).into()) diff --git a/tests/series/test_partitioning.py b/tests/series/test_partitioning.py new file mode 100644 index 0000000000..66fa7422dd --- /dev/null +++ b/tests/series/test_partitioning.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import pytest + +from daft import DataType, TimeUnit +from daft.series import Series + + +@pytest.mark.parametrize( + "input,dtype,expected", + [ + ([-1], DataType.date(), [-1]), + ([-1, None, 17501], DataType.date(), [-1, None, 17501]), + ([], DataType.date(), []), + ([None], DataType.date(), [None]), + ([1512151975038194111], DataType.timestamp(timeunit=TimeUnit.from_str("ns")), [17501]), + ([1512151975038194], DataType.timestamp(timeunit=TimeUnit.from_str("us")), [17501]), + ([1512151975038], DataType.timestamp(timeunit=TimeUnit.from_str("ms")), [17501]), + ([1512151975], DataType.timestamp(timeunit=TimeUnit.from_str("s")), [17501]), + ([-1], DataType.timestamp(timeunit=TimeUnit.from_str("us")), [-1]), + ([-1], DataType.timestamp(timeunit=TimeUnit.from_str("us"), timezone="-08:00"), [-1]), + ([-13 * 3_600_000_000], DataType.timestamp(timeunit=TimeUnit.from_str("us"), timezone="-12:00"), [-1]), + ], +) +def test_partitioning_days(input, dtype, expected): + s = Series.from_pylist(input).cast(dtype) + assert s.partitioning.days().to_pylist() == expected + + +@pytest.mark.parametrize( + "input,dtype,expected", + [ + ([-1], DataType.date(), [-1]), + ([-1, 0, -13, None, 17501], DataType.date(), [-1, 0, -1, None, 575]), + ([], DataType.date(), []), + ([None], DataType.date(), [None]), + ([1512151975038194111], DataType.timestamp(timeunit=TimeUnit.from_str("ns")), [575]), + ([1512151975038194], DataType.timestamp(timeunit=TimeUnit.from_str("us")), [575]), + ([1512151975038], DataType.timestamp(timeunit=TimeUnit.from_str("ms")), [575]), + ([1512151975], DataType.timestamp(timeunit=TimeUnit.from_str("s")), [575]), + ([-1], DataType.timestamp(timeunit=TimeUnit.from_str("us")), [-1]), + ([-1], DataType.timestamp(timeunit=TimeUnit.from_str("us"), timezone="-08:00"), [-1]), + ( + [(-24 * 31 + 11) * 3_600_000_000], + DataType.timestamp(timeunit=TimeUnit.from_str("us"), timezone="-12:00"), + [-1], + ), + ], +) +def test_partitioning_months(input, dtype, expected): + s = Series.from_pylist(input).cast(dtype) + assert s.partitioning.months().to_pylist() == expected + + +@pytest.mark.parametrize( + "input,dtype,expected", + [ + ([-1], DataType.date(), [-1]), + ([-1, 0, -13, None, 17501], DataType.date(), [-1, 0, -1, None, 47]), + ([], DataType.date(), []), + ([None], DataType.date(), [None]), + ([-364, -366, 364, 366], DataType.date(), [-1, -2, 0, 1]), + ([1512151975038194111], DataType.timestamp(timeunit=TimeUnit.from_str("ns")), [47]), + ([1512151975038194], DataType.timestamp(timeunit=TimeUnit.from_str("us")), [47]), + ([1512151975038], DataType.timestamp(timeunit=TimeUnit.from_str("ms")), [47]), + ([1512151975], DataType.timestamp(timeunit=TimeUnit.from_str("s")), [47]), + ([-1], DataType.timestamp(timeunit=TimeUnit.from_str("us")), [-1]), + ([-1], DataType.timestamp(timeunit=TimeUnit.from_str("us"), timezone="-08:00"), [-1]), + ], +) +def test_partitioning_years(input, dtype, expected): + s = Series.from_pylist(input).cast(dtype) + assert s.partitioning.years().to_pylist() == expected + + +@pytest.mark.parametrize( + "input,dtype,expected", + [ + ([1512151975038194111], DataType.timestamp(timeunit=TimeUnit.from_str("ns")), [420042]), + ([1512151975038194], DataType.timestamp(timeunit=TimeUnit.from_str("us")), [420042]), + ([1512151975038], DataType.timestamp(timeunit=TimeUnit.from_str("ms")), [420042]), + ([1512151975], DataType.timestamp(timeunit=TimeUnit.from_str("s")), [420042]), + ([-1], DataType.timestamp(timeunit=TimeUnit.from_str("us")), [0]), + ([-1], DataType.timestamp(timeunit=TimeUnit.from_str("us"), timezone="-08:00"), [0]), + ( + [-3_600_000_000 + 1, -3_600_000_000, 3_600_000_000 + -1, 3_600_000_000 + 1], + DataType.timestamp(timeunit=TimeUnit.from_str("us"), timezone="-08:00"), + [0, -1, 0, 1], + ), + ], +) +def test_partitioning_hours(input, dtype, expected): + s = Series.from_pylist(input).cast(dtype) + assert s.partitioning.hours().to_pylist() == expected