Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Partitioning exprs for Iceberg #1680

Merged
merged 5 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,10 @@ class PySeries:
def dt_month(self) -> PySeries: ...
def dt_year(self) -> PySeries: ...
def dt_day_of_week(self) -> PySeries: ...
def partitioning_days(self) -> PySeries: ...
def partitioning_hours(self) -> PySeries: ...
def partitioning_months(self) -> PySeries: ...
def partitioning_years(self) -> PySeries: ...
def list_lengths(self) -> PySeries: ...
def image_decode(self) -> PySeries: ...
def image_encode(self, image_format: ImageFormat) -> PySeries: ...
Expand Down
18 changes: 18 additions & 0 deletions daft/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,10 @@ def list(self) -> SeriesListNamespace:
def image(self) -> SeriesImageNamespace:
return SeriesImageNamespace.from_series(self)

@property
def partitioning(self) -> SeriesPartitioningNamespace:
return SeriesPartitioningNamespace.from_series(self)

def __reduce__(self) -> tuple:
if self.datatype()._is_python_type():
return (Series.from_pylist, (self.to_pylist(), self.name(), "force"))
Expand Down Expand Up @@ -578,6 +582,20 @@ def day_of_week(self) -> Series:
return Series._from_pyseries(self._series.dt_day_of_week())


class SeriesPartitioningNamespace(SeriesNamespace):
def days(self) -> Series:
return Series._from_pyseries(self._series.partitioning_days())

def hours(self) -> Series:
return Series._from_pyseries(self._series.partitioning_hours())

def months(self) -> Series:
return Series._from_pyseries(self._series.partitioning_months())

def years(self) -> Series:
return Series._from_pyseries(self._series.partitioning_years())


class SeriesListNamespace(SeriesNamespace):
def lengths(self) -> Series:
return Series._from_pyseries(self._series.list_lengths())
Expand Down
9 changes: 8 additions & 1 deletion src/daft-core/src/array/ops/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow2::{array::PrimitiveArray, compute::arithmetics::basic};

use crate::{
array::DataArray,
datatypes::{DaftNumericType, Float64Array, Utf8Array},
datatypes::{DaftNumericType, Float64Array, Int64Array, Utf8Array},
kernels::utf8::add_utf8_arrays,
};

Expand Down Expand Up @@ -121,6 +121,13 @@ impl Div for &Float64Array {
}
}

impl Div for &Int64Array {
type Output = DaftResult<Int64Array>;
fn div(self, rhs: Self) -> Self::Output {
arithmetic_helper(self, rhs, basic::div, |l, r| l / r)
}
}

pub fn binary_with_nulls<T, F>(
lhs: &PrimitiveArray<T>,
rhs: &PrimitiveArray<T>,
Expand Down
16 changes: 16 additions & 0 deletions src/daft-core/src/python/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,22 @@ impl PySeries {
Ok(self.series.dt_day_of_week()?.into())
}

pub fn partitioning_days(&self) -> PyResult<Self> {
Ok(self.series.partitioning_days()?.into())
}

pub fn partitioning_hours(&self) -> PyResult<Self> {
Ok(self.series.partitioning_hours()?.into())
}

pub fn partitioning_months(&self) -> PyResult<Self> {
Ok(self.series.partitioning_months()?.into())
}

pub fn partitioning_years(&self) -> PyResult<Self> {
Ok(self.series.partitioning_years()?.into())
}

pub fn list_lengths(&self) -> PyResult<Self> {
Ok(self.series.list_lengths()?.into_series().into())
}
Expand Down
1 change: 1 addition & 0 deletions src/daft-core/src/series/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod len;
pub mod list;
pub mod not;
pub mod null;
pub mod partitioning;
pub mod search_sorted;
pub mod sort;
pub mod take;
Expand Down
98 changes: 98 additions & 0 deletions src/daft-core/src/series/ops/partitioning.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::datatypes::logical::TimestampArray;
use crate::datatypes::{Int32Array, Int64Array, TimeUnit};
use crate::series::array_impl::IntoSeries;
use crate::{
datatypes::{logical::DateArray, DataType},
series::Series,
};
use common_error::{DaftError, DaftResult};

impl Series {
pub fn partitioning_years(&self) -> DaftResult<Self> {
let epoch_year = Int32Array::from(("1970", vec![1970])).into_series();

match self.data_type() {
DataType::Date | DataType::Timestamp(_, None) => {
let years_since_ce = self.dt_year()?;
&years_since_ce - &epoch_year
}
DataType::Timestamp(tu, Some(_)) => {
let array = self.cast(&DataType::Timestamp(*tu, None))?;
let years_since_ce = array.dt_year()?;
&years_since_ce - &epoch_year
}
_ => Err(DaftError::ComputeError(format!(
"Can only run partitioning_years() operation on temporal types, got {}",
self.data_type()
))),
}
}

pub fn partitioning_months(&self) -> DaftResult<Self> {
let months_in_year = Int32Array::from(("months", vec![12])).into_series();
let month_of_epoch = Int32Array::from(("months", vec![1])).into_series();
match self.data_type() {
DataType::Date | DataType::Timestamp(_, None) => {
let years_since_1970 = self.partitioning_years()?;
let months_of_this_year = self.dt_month()?;
((&years_since_1970 * &months_in_year)? + months_of_this_year)? - month_of_epoch
}
DataType::Timestamp(tu, Some(_)) => {
let array = self.cast(&DataType::Timestamp(*tu, None))?;
let years_since_1970 = array.partitioning_years()?;
let months_of_this_year = array.dt_month()?;
((&years_since_1970 * &months_in_year)? + months_of_this_year)? - month_of_epoch
}
_ => Err(DaftError::ComputeError(format!(
"Can only run partitioning_years() operation on temporal types, got {}",
self.data_type()
))),
}
}

pub fn partitioning_days(&self) -> DaftResult<Self> {
match self.data_type() {
DataType::Date => {
let downcasted = self.downcast::<DateArray>()?;
downcasted.cast(&DataType::Int32)
}
DataType::Timestamp(_, None) => {
let ts_array = self.downcast::<TimestampArray>()?;
ts_array.date()?.cast(&DataType::Int32)
}

DataType::Timestamp(tu, Some(_)) => {
let array = self.cast(&DataType::Timestamp(*tu, None))?;
let ts_array = array.downcast::<TimestampArray>()?;
ts_array.date()?.cast(&DataType::Int32)
}

_ => Err(DaftError::ComputeError(format!(
"Can only run partitioning_days() operation on temporal types, got {}",
self.data_type()
))),
}
}

pub fn partitioning_hours(&self) -> DaftResult<Self> {
match self.data_type() {
DataType::Timestamp(unit, _) => {
let ts_array = self.downcast::<TimestampArray>()?;
let physical = &ts_array.physical;
let unit_to_hours: i64 = match unit {
TimeUnit::Nanoseconds => 3_600_000_000_000,
TimeUnit::Microseconds => 3_600_000_000,
TimeUnit::Milliseconds => 3_600_000,
TimeUnit::Seconds => 3_600,
};
let divider = Int64Array::from(("divider", vec![unit_to_hours]));
let hours = (physical / &divider)?;
Ok(hours.into_series())
}
_ => Err(DaftError::ComputeError(format!(
"Can only run partitioning_hours() operation on timestamp types, got {}",
self.data_type()
))),
}
}
}
4 changes: 4 additions & 0 deletions src/daft-dsl/src/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ pub mod float;
pub mod image;
pub mod list;
pub mod numeric;
pub mod partitioning;
pub mod temporal;
pub mod uri;
pub mod utf8;

use self::image::ImageExpr;
use self::list::ListExpr;
use self::numeric::NumericExpr;
use self::partitioning::PartitioningExpr;
use self::temporal::TemporalExpr;
use self::utf8::Utf8Expr;
use self::{float::FloatExpr, uri::UriExpr};
Expand All @@ -33,6 +35,7 @@ pub enum FunctionExpr {
Image(ImageExpr),
#[cfg(feature = "python")]
Python(PythonUDF),
Partitioning(PartitioningExpr),
Uri(UriExpr),
}

Expand All @@ -56,6 +59,7 @@ impl FunctionExpr {
Uri(expr) => expr.get_evaluator(),
#[cfg(feature = "python")]
Python(expr) => expr,
Partitioning(expr) => expr.get_evaluator(),
}
}
}
Expand Down
57 changes: 57 additions & 0 deletions src/daft-dsl/src/functions/partitioning/evaluators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use daft_core::{
datatypes::{DataType, Field},
schema::Schema,
series::Series,
};

use crate::Expr;

use common_error::{DaftError, DaftResult};

use super::super::FunctionEvaluator;

macro_rules! impl_func_evaluator_for_partitioning {
($name:ident, $op:ident, $kernel:ident) => {
pub(super) struct $name {}

impl FunctionEvaluator for $name {
fn fn_name(&self) -> &'static str {
"$op"
}

fn to_field(&self, inputs: &[Expr], schema: &Schema, _: &Expr) -> DaftResult<Field> {
match inputs {
[input] => match input.to_field(schema) {
Ok(field) if field.dtype.is_temporal() => {
Ok(Field::new(field.name, DataType::Int32))
}
Ok(field) => Err(DaftError::TypeError(format!(
"Expected input to $op to be temporal, got {}",
field.dtype
))),
Err(e) => Err(e),
},
_ => Err(DaftError::SchemaMismatch(format!(
"Expected 1 input arg, got {}",
inputs.len()
))),
}
}

fn evaluate(&self, inputs: &[Series], _: &Expr) -> DaftResult<Series> {
match inputs {
[input] => input.$kernel(),
_ => Err(DaftError::ValueError(format!(
"Expected 1 input arg, got {}",
inputs.len()
))),
}
}
}
};
}

impl_func_evaluator_for_partitioning!(YearsEvaluator, years, partitioning_years);
impl_func_evaluator_for_partitioning!(MonthsEvaluator, months, partitioning_months);
impl_func_evaluator_for_partitioning!(DaysEvaluator, days, partitioning_days);
impl_func_evaluator_for_partitioning!(HoursEvaluator, hours, partitioning_hours);
61 changes: 61 additions & 0 deletions src/daft-dsl/src/functions/partitioning/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
mod evaluators;

use serde::{Deserialize, Serialize};

use crate::{
functions::partitioning::evaluators::{
DaysEvaluator, HoursEvaluator, MonthsEvaluator, YearsEvaluator,
},
Expr,
};

use super::FunctionEvaluator;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum PartitioningExpr {
Years,
Months,
Days,
Hours,
}

impl PartitioningExpr {
#[inline]
pub fn get_evaluator(&self) -> &dyn FunctionEvaluator {
use PartitioningExpr::*;
match self {
Years => &YearsEvaluator {},
Months => &MonthsEvaluator {},
Days => &DaysEvaluator {},
Hours => &HoursEvaluator {},
}
}
}

pub fn days(input: &Expr) -> Expr {
Expr::Function {
func: super::FunctionExpr::Partitioning(PartitioningExpr::Days),
inputs: vec![input.clone()],
}
}

pub fn hours(input: &Expr) -> Expr {
Expr::Function {
func: super::FunctionExpr::Partitioning(PartitioningExpr::Hours),
inputs: vec![input.clone()],
}
}

pub fn months(input: &Expr) -> Expr {
Expr::Function {
func: super::FunctionExpr::Partitioning(PartitioningExpr::Months),
inputs: vec![input.clone()],
}
}

pub fn years(input: &Expr) -> Expr {
Expr::Function {
func: super::FunctionExpr::Partitioning(PartitioningExpr::Years),
inputs: vec![input.clone()],
}
}
20 changes: 20 additions & 0 deletions src/daft-dsl/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,26 @@ impl PyExpr {
Ok(day_of_week(&self.expr).into())
}

pub fn partitioning_days(&self) -> PyResult<Self> {
use functions::partitioning::days;
Ok(days(&self.expr).into())
}

pub fn partitioning_hours(&self) -> PyResult<Self> {
use functions::partitioning::hours;
Ok(hours(&self.expr).into())
}

pub fn partitioning_months(&self) -> PyResult<Self> {
use functions::partitioning::months;
Ok(months(&self.expr).into())
}

pub fn partitioning_years(&self) -> PyResult<Self> {
use functions::partitioning::years;
Ok(years(&self.expr).into())
}

pub fn utf8_endswith(&self, pattern: &Self) -> PyResult<Self> {
use crate::functions::utf8::endswith;
Ok(endswith(&self.expr, &pattern.expr).into())
Expand Down
Loading
Loading