From 46c5a7e39399dd08dbf7d96d22673cc35caf9a4b Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 26 Sep 2024 12:21:04 -0700 Subject: [PATCH] [CHORE] Refactor Binary Ops (#2876) This PR removes 1.4 million lines of LLVM code and reduces the size of `daft-core` by 44%! This was accomplished by dropping the `SeriesBinaryOps` trait, which required every Array Type to implement its own binary ops, with the default implementation doing a macro expansion on both the rhs type and the output type. This caused an `O(Dtype^2)` expansion for every Array Type. This was done as a way to let each Array define its own behavior for binary ops, but we didn't really leverage that outside of a few temporal types. For example, if we wanted to implement `Timestamp + Duration`, we could implement it on `TimestampArray`. But since we may also have `Duration + Timestamp`, we would also have to implement it on `DurationArray`. The new approach is much simpler: we dispatch to the target implementation based on `(left_dtype, right_dtype)`. The numerics path pretty much stays the same, but for temporals there's only a handful of pairs to consider. 
I also factored out a bunch of macros into functions, especially ones that would perform the binary ops on PythonArrays Breakdown of llvm lines: current main: ``` Lines Copies Function name ----- ------ ------------- 3926120 70662 (TOTAL) 162996 (4.2%, 4.2%) 1208 (1.7%, 1.7%) as core::iter::traits::iterator::Iterator>::fold 135172 (3.4%, 7.6%) 1201 (1.7%, 3.4%) as alloc::vec::spec_from_iter_nested::SpecFromIterNested>::from_iter 127022 (3.2%, 10.8%) 1136 (1.6%, 5.0%) alloc::vec::Vec::extend_trusted 82450 (2.1%, 12.9%) 34 (0.0%, 5.1%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::equal 82450 (2.1%, 15.0%) 34 (0.0%, 5.1%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::gt 82450 (2.1%, 17.1%) 34 (0.0%, 5.2%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::gte 82450 (2.1%, 19.2%) 34 (0.0%, 5.2%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::lt 82450 (2.1%, 21.3%) 34 (0.0%, 5.3%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::lte 82450 (2.1%, 23.4%) 34 (0.0%, 5.3%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::not_equal 79322 (2.0%, 25.5%) 34 (0.0%, 5.4%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::mul 79322 (2.0%, 27.5%) 34 (0.0%, 5.4%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::rem 76880 (2.0%, 29.4%) 31 (0.0%, 5.4%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::add 72323 (1.8%, 31.3%) 31 (0.0%, 5.5%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::sub 67116 (1.7%, 33.0%) 34 (0.0%, 5.5%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::and 67116 (1.7%, 34.7%) 34 (0.0%, 5.6%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::or 67116 (1.7%, 36.4%) 34 (0.0%, 5.6%) daft_core::series::array_impl::binary_ops::SeriesBinaryOps::xor 47428 (1.2%, 37.6%) 334 (0.5%, 6.1%) core::slice::sort::unstable::quicksort::partition_lomuto_branchless_cyclic ``` after: ``` Lines Copies Function name ----- ------ ------------- 2512523 
73042 (TOTAL) 136529 (5.4%, 5.4%) 1208 (1.7%, 1.7%) as core::iter::traits::iterator::Iterator>::fold 127090 (5.1%, 10.5%) 1201 (1.6%, 3.3%) as alloc::vec::spec_from_iter_nested::SpecFromIterNested>::from_iter 106918 (4.3%, 14.7%) 1136 (1.6%, 4.9%) alloc::vec::Vec::extend_trusted 42752 (1.7%, 16.4%) 334 (0.5%, 5.3%) core::slice::sort::unstable::quicksort::partition_lomuto_branchless_cyclic 39085 (1.6%, 18.0%) 1371 (1.9%, 7.2%) core::iter::adapters::map::map_fold::{{closure}} 37698 (1.5%, 19.5%) 383 (0.5%, 7.7%) alloc::vec::Vec::extend_desugared 36300 (1.4%, 20.9%) 150 (0.2%, 7.9%) core::slice::sort::shared::find_existing_run 36236 (1.4%, 22.4%) 521 (0.7%, 8.6%) core::iter::traits::iterator::Iterator::try_fold 36018 (1.4%, 23.8%) 373 (0.5%, 9.1%) as core::iter::traits::iterator::Iterator>::try_fold::{{closure}} 34050 (1.4%, 25.2%) 150 (0.2%, 9.3%) core::slice::sort::shared::smallsort::small_sort_general_with_scratch 33082 (1.3%, 26.5%) 278 (0.4%, 9.7%) arrow2::array::utf8::mutable::MutableUtf8Array::try_from_iter 32417 (1.3%, 27.8%) 193 (0.3%, 10.0%) daft_core::array::ops::utf8::substr_compute_result::{{closure}} ``` --- Cargo.toml | 4 + src/daft-core/src/array/ops/arithmetic.rs | 4 +- .../src/series/array_impl/binary_ops.rs | 404 ------------------ .../src/series/array_impl/data_array.rs | 47 +- .../src/series/array_impl/logical_array.rs | 49 +-- src/daft-core/src/series/array_impl/mod.rs | 1 - .../src/series/array_impl/nested_array.rs | 47 +- src/daft-core/src/series/mod.rs | 1 + src/daft-core/src/series/ops/arithmetic.rs | 280 +++++++++++- src/daft-core/src/series/ops/between.rs | 2 +- src/daft-core/src/series/ops/comparison.rs | 78 +++- src/daft-core/src/series/ops/is_in.rs | 2 +- src/daft-core/src/series/ops/logical.rs | 117 +++++ src/daft-core/src/series/ops/mod.rs | 142 +----- src/daft-core/src/series/series_like.rs | 14 - src/daft-core/src/series/utils/mod.rs | 14 + src/daft-core/src/series/utils/python_fn.rs | 157 +++++++ src/daft-core/src/utils/mod.rs | 12 
- 18 files changed, 622 insertions(+), 753 deletions(-) delete mode 100644 src/daft-core/src/series/array_impl/binary_ops.rs create mode 100644 src/daft-core/src/series/ops/logical.rs create mode 100644 src/daft-core/src/series/utils/mod.rs create mode 100644 src/daft-core/src/series/utils/python_fn.rs diff --git a/Cargo.toml b/Cargo.toml index 34c6a28c80..1d1065f026 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,8 +82,12 @@ parquet2 = {path = "src/parquet2"} debug = true [profile.dev] +debug = "line-tables-only" overflow-checks = false +[profile.dev.build-override] +opt-level = 3 + [profile.dev-bench] codegen-units = 16 debug = 1 # include symbols diff --git a/src/daft-core/src/array/ops/arithmetic.rs b/src/daft-core/src/array/ops/arithmetic.rs index aa6b067f78..21e23657c6 100644 --- a/src/daft-core/src/array/ops/arithmetic.rs +++ b/src/daft-core/src/array/ops/arithmetic.rs @@ -59,9 +59,7 @@ where let opt_lhs = lhs.get(0); match opt_lhs { None => Ok(DataArray::full_null(rhs.name(), lhs.data_type(), rhs.len())), - // NOTE: naming logic here is wrong, and assigns the rhs name. However, renaming is handled at the Series level so this - // error is obfuscated. 
- Some(lhs) => rhs.apply(|rhs| operation(lhs, rhs)), + Some(scalar) => Ok(rhs.apply(|rhs| operation(scalar, rhs))?.rename(lhs.name())), } } (a, b) => Err(DaftError::ValueError(format!( diff --git a/src/daft-core/src/series/array_impl/binary_ops.rs b/src/daft-core/src/series/array_impl/binary_ops.rs deleted file mode 100644 index 16e1a80b8d..0000000000 --- a/src/daft-core/src/series/array_impl/binary_ops.rs +++ /dev/null @@ -1,404 +0,0 @@ -use std::ops::{Add, Div, Mul, Rem, Sub}; - -use common_error::DaftResult; - -use super::{ArrayWrapper, IntoSeries, Series}; -use crate::{ - array::{ - ops::{DaftCompare, DaftLogical}, - FixedSizeListArray, ListArray, StructArray, - }, - datatypes::{ - logical::{ - DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray, - FixedShapeSparseTensorArray, FixedShapeTensorArray, ImageArray, MapArray, - SparseTensorArray, TensorArray, TimeArray, TimestampArray, - }, - BinaryArray, BooleanArray, DataType, ExtensionArray, Field, FixedSizeBinaryArray, - Float32Array, Float64Array, InferDataType, Int128Array, Int16Array, Int32Array, Int64Array, - Int8Array, NullArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, Utf8Array, - }, - series::series_like::SeriesLike, - with_match_comparable_daft_types, with_match_integer_daft_types, with_match_numeric_daft_types, -}; -#[cfg(feature = "python")] -use crate::{datatypes::PythonArray, series::ops::py_binary_op_utilfn}; - -#[cfg(feature = "python")] -macro_rules! py_binary_op { - ($lhs:expr, $rhs:expr, $pyoperator:expr) => { - py_binary_op_utilfn!($lhs, $rhs, $pyoperator, "map_operator_arrow_semantics") - }; -} -#[cfg(feature = "python")] -macro_rules! py_binary_op_bool { - ($lhs:expr, $rhs:expr, $pyoperator:expr) => { - py_binary_op_utilfn!($lhs, $rhs, $pyoperator, "map_operator_arrow_semantics_bool") - }; -} - -macro_rules! 
cast_downcast_op { - ($lhs:expr, $rhs:expr, $ty_expr:expr, $ty_type:ty, $op:ident) => {{ - let lhs = $lhs.cast($ty_expr)?; - let rhs = $rhs.cast($ty_expr)?; - let lhs = lhs.downcast::<$ty_type>()?; - let rhs = rhs.downcast::<$ty_type>()?; - lhs.$op(rhs) - }}; -} - -macro_rules! cast_downcast_op_into_series { - ($lhs:expr, $rhs:expr, $ty_expr:expr, $ty_type:ty, $op:ident) => {{ - Ok(cast_downcast_op!($lhs, $rhs, $ty_expr, $ty_type, $op)? - .into_series() - .rename($lhs.name())) - }}; -} - -macro_rules! apply_fixed_numeric_op { - ($lhs:expr, $rhs:expr, $op:ident) => {{ - $lhs.$op($rhs)? - }}; -} - -macro_rules! fixed_sized_numeric_binary_op { - ($left:expr, $right:expr, $output_type:expr, $op:ident) => {{ - assert!($left.data_type().is_fixed_size_numeric()); - assert!($right.data_type().is_fixed_size_numeric()); - - match ($left.data_type(), $right.data_type()) { - (DataType::FixedSizeList(..), DataType::FixedSizeList(..)) => { - Ok(apply_fixed_numeric_op!( - $left.downcast::().unwrap(), - $right.downcast::().unwrap(), - $op - ) - .into_series()) - } - (DataType::Embedding(..), DataType::Embedding(..)) => { - let physical = apply_fixed_numeric_op!( - &$left.downcast::().unwrap().physical, - &$right.downcast::().unwrap().physical, - $op - ); - let array = - EmbeddingArray::new(Field::new($left.name(), $output_type.clone()), physical); - Ok(array.into_series()) - } - (DataType::FixedShapeTensor(..), DataType::FixedShapeTensor(..)) => { - let physical = apply_fixed_numeric_op!( - &$left.downcast::().unwrap().physical, - &$right.downcast::().unwrap().physical, - $op - ); - let array = FixedShapeTensorArray::new( - Field::new($left.name(), $output_type.clone()), - physical, - ); - Ok(array.into_series()) - } - (left, right) => unimplemented!("cannot add {left} and {right} types"), - } - }}; -} - -macro_rules! 
binary_op_unimplemented { - ($lhs:expr, $op:expr, $rhs:expr, $output_ty:expr) => { - unimplemented!( - "No implementation for {} {} {} -> {}", - $lhs.data_type(), - $op, - $rhs.data_type(), - $output_ty, - ) - }; -} - -macro_rules! py_numeric_binary_op { - ($self:expr, $rhs:expr, $op:ident, $pyop:expr) => {{ - let output_type = - InferDataType::from($self.data_type()).$op(InferDataType::from($rhs.data_type()))?; - let lhs = $self.into_series(); - match &output_type { - #[cfg(feature = "python")] - DataType::Python => Ok(py_binary_op!(lhs, $rhs, $pyop)), - output_type if output_type.is_numeric() => { - with_match_numeric_daft_types!(output_type, |$T| { - cast_downcast_op_into_series!( - lhs, - $rhs, - output_type, - <$T as DaftDataType>::ArrayType, - $op - ) - }) - } - output_type if output_type.is_fixed_size_numeric() => { - fixed_sized_numeric_binary_op!(&lhs, $rhs, output_type, $op) - } - _ => binary_op_unimplemented!(lhs, $pyop, $rhs, output_type), - } - }}; -} - -macro_rules! physical_logic_op { - ($self:expr, $rhs:expr, $op:ident, $pyop:expr) => {{ - let output_type = InferDataType::from($self.data_type()) - .logical_op(&InferDataType::from($rhs.data_type()))?; - let lhs = $self.into_series(); - match &output_type { - #[cfg(feature = "python")] - DataType::Boolean => match (&lhs.data_type(), &$rhs.data_type()) { - #[cfg(feature = "python")] - (DataType::Python, _) | (_, DataType::Python) => { - Ok(py_binary_op_bool!(lhs, $rhs, $pyop)) - } - _ => { - cast_downcast_op_into_series!(lhs, $rhs, &DataType::Boolean, BooleanArray, $op) - } - }, - output_type if output_type.is_integer() => { - with_match_integer_daft_types!(output_type, |$T| { - cast_downcast_op_into_series!( - lhs, - $rhs, - output_type, - <$T as DaftDataType>::ArrayType, - $op - ) - }) - } - _ => binary_op_unimplemented!(lhs, $pyop, $rhs, output_type), - } - }}; -} - -macro_rules! 
physical_compare_op { - ($self:expr, $rhs:expr, $op:ident, $pyop:expr) => {{ - let (output_type, intermediate, comp_type) = InferDataType::from($self.data_type()) - .comparison_op(&InferDataType::from($rhs.data_type()))?; - let lhs = $self.into_series(); - let (lhs, rhs) = if let Some(ref it) = intermediate { - (lhs.cast(it)?, $rhs.cast(it)?) - } else { - (lhs, $rhs.clone()) - }; - - if let DataType::Boolean = output_type { - match comp_type { - #[cfg(feature = "python")] - DataType::Python => py_binary_op_bool!(lhs, rhs, $pyop) - .downcast::() - .cloned(), - _ => with_match_comparable_daft_types!(comp_type, |$T| { - cast_downcast_op!(lhs, rhs, &comp_type, <$T as DaftDataType>::ArrayType, $op) - }), - } - } else { - unreachable!() - } - }}; -} - -pub(crate) trait SeriesBinaryOps: SeriesLike { - fn add(&self, rhs: &Series) -> DaftResult { - let output_type = - InferDataType::from(self.data_type()).add(InferDataType::from(rhs.data_type()))?; - let lhs = self.into_series(); - match &output_type { - #[cfg(feature = "python")] - DataType::Python => Ok(py_binary_op!(lhs, rhs, "add")), - DataType::Utf8 => { - cast_downcast_op_into_series!(lhs, rhs, &DataType::Utf8, Utf8Array, add) - } - output_type if output_type.is_numeric() => { - with_match_numeric_daft_types!(output_type, |$T| { - cast_downcast_op_into_series!(lhs, rhs, output_type, <$T as DaftDataType>::ArrayType, add) - }) - } - output_type if output_type.is_fixed_size_numeric() => { - fixed_sized_numeric_binary_op!(&lhs, rhs, output_type, add) - } - _ => binary_op_unimplemented!(lhs, "+", rhs, output_type), - } - } - fn sub(&self, rhs: &Series) -> DaftResult { - py_numeric_binary_op!(self, rhs, sub, "sub") - } - fn mul(&self, rhs: &Series) -> DaftResult { - py_numeric_binary_op!(self, rhs, mul, "mul") - } - fn div(&self, rhs: &Series) -> DaftResult { - let output_type = - InferDataType::from(self.data_type()).div(InferDataType::from(rhs.data_type()))?; - let lhs = self.into_series(); - match &output_type { - 
#[cfg(feature = "python")] - DataType::Python => Ok(py_binary_op!(lhs, rhs, "truediv")), - DataType::Float64 => { - cast_downcast_op_into_series!(lhs, rhs, &DataType::Float64, Float64Array, div) - } - output_type if output_type.is_fixed_size_numeric() => { - fixed_sized_numeric_binary_op!(&lhs, rhs, output_type, div) - } - _ => binary_op_unimplemented!(lhs, "/", rhs, output_type), - } - } - fn rem(&self, rhs: &Series) -> DaftResult { - py_numeric_binary_op!(self, rhs, rem, "mod") - } - fn and(&self, rhs: &Series) -> DaftResult { - physical_logic_op!(self, rhs, and, "and_") - } - fn or(&self, rhs: &Series) -> DaftResult { - physical_logic_op!(self, rhs, or, "or_") - } - fn xor(&self, rhs: &Series) -> DaftResult { - physical_logic_op!(self, rhs, xor, "xor") - } - fn equal(&self, rhs: &Series) -> DaftResult { - physical_compare_op!(self, rhs, equal, "eq") - } - fn not_equal(&self, rhs: &Series) -> DaftResult { - physical_compare_op!(self, rhs, not_equal, "ne") - } - fn lt(&self, rhs: &Series) -> DaftResult { - physical_compare_op!(self, rhs, lt, "lt") - } - fn lte(&self, rhs: &Series) -> DaftResult { - physical_compare_op!(self, rhs, lte, "le") - } - fn gt(&self, rhs: &Series) -> DaftResult { - physical_compare_op!(self, rhs, gt, "gt") - } - fn gte(&self, rhs: &Series) -> DaftResult { - physical_compare_op!(self, rhs, gte, "ge") - } -} - -#[cfg(feature = "python")] -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for 
ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper { - fn add(&self, rhs: &Series) -> DaftResult { - let output_type = - (InferDataType::from(self.data_type()) + InferDataType::from(rhs.data_type()))?; - match rhs.data_type() { - DataType::Duration(..) => { - let days = rhs.duration()?.cast_to_days()?; - let physical_result = self.0.physical.add(&days)?; - physical_result.cast(&output_type) - } - _ => binary_op_unimplemented!(self, "+", rhs, output_type), - } - } - fn sub(&self, rhs: &Series) -> DaftResult { - let output_type = - (InferDataType::from(self.data_type()) - InferDataType::from(rhs.data_type()))?; - match rhs.data_type() { - DataType::Date => { - let physical_result = self.0.physical.sub(&rhs.date()?.physical)?; - physical_result.cast(&output_type) - } - DataType::Duration(..) => { - let days = rhs.duration()?.cast_to_days()?; - let physical_result = self.0.physical.sub(&days)?; - physical_result.cast(&output_type) - } - _ => binary_op_unimplemented!(self, "-", rhs, output_type), - } - } -} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper { - fn add(&self, rhs: &Series) -> DaftResult { - let output_type = - (InferDataType::from(self.data_type()) + InferDataType::from(rhs.data_type()))?; - let lhs = self.0.clone().into_series(); - match rhs.data_type() { - DataType::Timestamp(..) => { - let physical_result = self.0.physical.add(&rhs.timestamp()?.physical)?; - physical_result.cast(&output_type) - } - DataType::Duration(..) 
=> { - let physical_result = self.0.physical.add(&rhs.duration()?.physical)?; - physical_result.cast(&output_type) - } - DataType::Date => { - let days = self.0.cast_to_days()?; - let physical_result = days.add(&rhs.date()?.physical)?; - physical_result.cast(&output_type) - } - _ => binary_op_unimplemented!(lhs, "+", rhs, output_type), - } - } - - fn sub(&self, rhs: &Series) -> DaftResult { - let output_type = - (InferDataType::from(self.data_type()) - InferDataType::from(rhs.data_type()))?; - match rhs.data_type() { - DataType::Duration(..) => { - let physical_result = self.0.physical.sub(&rhs.duration()?.physical)?; - physical_result.cast(&output_type) - } - _ => binary_op_unimplemented!(self, "-", rhs, output_type), - } - } -} - -impl SeriesBinaryOps for ArrayWrapper { - fn add(&self, rhs: &Series) -> DaftResult { - let output_type = - (InferDataType::from(self.data_type()) + InferDataType::from(rhs.data_type()))?; - match rhs.data_type() { - DataType::Duration(..) => { - let physical_result = self.0.physical.add(&rhs.duration()?.physical)?; - physical_result.cast(&output_type) - } - _ => binary_op_unimplemented!(self, "+", rhs, output_type), - } - } - fn sub(&self, rhs: &Series) -> DaftResult { - let output_type = - (InferDataType::from(self.data_type()) - InferDataType::from(rhs.data_type()))?; - match rhs.data_type() { - DataType::Duration(..) => { - let physical_result = self.0.physical.sub(&rhs.duration()?.physical)?; - physical_result.cast(&output_type) - } - DataType::Timestamp(..) 
=> { - let physical_result = self.0.physical.sub(&rhs.timestamp()?.physical)?; - physical_result.cast(&output_type) - } - _ => binary_op_unimplemented!(self, "-", rhs, output_type), - } - } -} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} -impl SeriesBinaryOps for ArrayWrapper {} diff --git a/src/daft-core/src/series/array_impl/data_array.rs b/src/daft-core/src/series/array_impl/data_array.rs index c210d5cdb2..1506d8cc9e 100644 --- a/src/daft-core/src/series/array_impl/data_array.rs +++ b/src/daft-core/src/series/array_impl/data_array.rs @@ -12,7 +12,7 @@ use crate::{ DataArray, }, datatypes::{DaftArrowBackedType, DataType, FixedSizeBinaryArray}, - series::{array_impl::binary_ops::SeriesBinaryOps, series_like::SeriesLike}, + series::series_like::SeriesLike, with_match_integer_daft_types, }; @@ -159,51 +159,6 @@ macro_rules! 
impl_series_like_for_data_array { None => Ok(self.0.list()?.into_series()), } } - - fn add(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::add(self, rhs) - } - fn sub(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::sub(self, rhs) - } - fn mul(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::mul(self, rhs) - } - fn div(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::div(self, rhs) - } - fn rem(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::rem(self, rhs) - } - - fn and(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::and(self, rhs) - } - fn or(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::or(self, rhs) - } - fn xor(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::xor(self, rhs) - } - - fn equal(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::equal(self, rhs) - } - fn not_equal(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::not_equal(self, rhs) - } - fn lt(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::lt(self, rhs) - } - fn lte(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::lte(self, rhs) - } - fn gt(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::gt(self, rhs) - } - fn gte(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::gte(self, rhs) - } } }; } diff --git a/src/daft-core/src/series/array_impl/logical_array.rs b/src/daft-core/src/series/array_impl/logical_array.rs index 9076907579..759af30ccf 100644 --- a/src/daft-core/src/series/array_impl/logical_array.rs +++ b/src/daft-core/src/series/array_impl/logical_array.rs @@ -4,7 +4,7 @@ use super::{ArrayWrapper, IntoSeries, Series}; use crate::{ array::{ops::GroupIndices, prelude::*}, datatypes::prelude::*, - series::{array_impl::binary_ops::SeriesBinaryOps, DaftResult, SeriesLike}, + series::{DaftResult, SeriesLike}, with_match_integer_daft_types, }; @@ -165,53 +165,6 @@ macro_rules! 
impl_series_like_for_logical_array { ) .into_series()) } - - fn add(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::add(self, rhs) - } - - fn sub(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::sub(self, rhs) - } - - fn mul(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::mul(self, rhs) - } - - fn div(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::div(self, rhs) - } - - fn rem(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::rem(self, rhs) - } - fn and(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::and(self, rhs) - } - fn or(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::or(self, rhs) - } - fn xor(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::xor(self, rhs) - } - fn equal(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::equal(self, rhs) - } - fn not_equal(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::not_equal(self, rhs) - } - fn lt(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::lt(self, rhs) - } - fn lte(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::lte(self, rhs) - } - fn gt(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::gt(self, rhs) - } - fn gte(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::gte(self, rhs) - } } }; } diff --git a/src/daft-core/src/series/array_impl/mod.rs b/src/daft-core/src/series/array_impl/mod.rs index 6a3f0839ad..61f12aa926 100644 --- a/src/daft-core/src/series/array_impl/mod.rs +++ b/src/daft-core/src/series/array_impl/mod.rs @@ -1,4 +1,3 @@ -pub mod binary_ops; pub mod data_array; pub mod logical_array; pub mod nested_array; diff --git a/src/daft-core/src/series/array_impl/nested_array.rs b/src/daft-core/src/series/array_impl/nested_array.rs index 5a6adb8b4c..1bd618e616 100644 --- a/src/daft-core/src/series/array_impl/nested_array.rs +++ b/src/daft-core/src/series/array_impl/nested_array.rs @@ -9,7 +9,7 @@ use crate::{ FixedSizeListArray, ListArray, StructArray, }, datatypes::{BooleanArray, DataType, Field}, - 
series::{array_impl::binary_ops::SeriesBinaryOps, IntoSeries, Series, SeriesLike}, + series::{IntoSeries, Series, SeriesLike}, with_match_integer_daft_types, }; @@ -148,51 +148,6 @@ macro_rules! impl_series_like_for_nested_arrays { fn str_value(&self, idx: usize) -> DaftResult { self.0.str_value(idx) } - - fn add(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::add(self, rhs) - } - fn sub(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::sub(self, rhs) - } - fn mul(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::mul(self, rhs) - } - fn div(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::div(self, rhs) - } - fn rem(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::rem(self, rhs) - } - - fn and(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::and(self, rhs) - } - fn or(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::or(self, rhs) - } - fn xor(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::xor(self, rhs) - } - - fn equal(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::equal(self, rhs) - } - fn not_equal(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::not_equal(self, rhs) - } - fn lt(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::lt(self, rhs) - } - fn lte(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::lte(self, rhs) - } - fn gt(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::gt(self, rhs) - } - fn gte(&self, rhs: &Series) -> DaftResult { - SeriesBinaryOps::gte(self, rhs) - } } }; } diff --git a/src/daft-core/src/series/mod.rs b/src/daft-core/src/series/mod.rs index 0aa91d281c..128b1bd344 100644 --- a/src/daft-core/src/series/mod.rs +++ b/src/daft-core/src/series/mod.rs @@ -3,6 +3,7 @@ mod from; mod ops; mod serdes; mod series_like; +mod utils; use std::sync::Arc; pub use array_impl::IntoSeries; diff --git a/src/daft-core/src/series/ops/arithmetic.rs b/src/daft-core/src/series/ops/arithmetic.rs index b49eba83d4..1c9b1df7ea 100644 --- 
a/src/daft-core/src/series/ops/arithmetic.rs +++ b/src/daft-core/src/series/ops/arithmetic.rs @@ -1,18 +1,19 @@ use std::ops::{Add, Div, Mul, Rem, Sub}; use common_error::DaftResult; +use daft_schema::prelude::*; -use crate::series::Series; +#[cfg(feature = "python")] +use crate::series::utils::python_fn::run_python_binary_operator_fn; +use crate::{ + array::prelude::*, + datatypes::{InferDataType, Utf8Array}, + series::{utils::cast::cast_downcast_op, IntoSeries, Series}, + with_match_numeric_daft_types, +}; -macro_rules! impl_arithmetic_for_series { +macro_rules! impl_arithmetic_ref_for_series { ($trait:ident, $op:ident) => { - impl $trait for &Series { - type Output = DaftResult; - fn $op(self, rhs: Self) -> Self::Output { - self.inner.$op(rhs) - } - } - impl $trait for Series { type Output = DaftResult; fn $op(self, rhs: Self) -> Self::Output { @@ -22,11 +23,262 @@ macro_rules! impl_arithmetic_for_series { }; } -impl_arithmetic_for_series!(Add, add); -impl_arithmetic_for_series!(Sub, sub); -impl_arithmetic_for_series!(Mul, mul); -impl_arithmetic_for_series!(Div, div); -impl_arithmetic_for_series!(Rem, rem); +macro_rules! 
arithmetic_op_not_implemented { + ($lhs:expr, $op:expr, $rhs:expr, $output_ty:expr) => { + unimplemented!( + "No implementation for {} {} {} -> {}", + $lhs.data_type(), + $op, + $rhs.data_type(), + $output_ty, + ) + }; +} + +impl Add for &Series { + type Output = DaftResult; + fn add(self, rhs: Self) -> Self::Output { + let output_type = + InferDataType::from(self.data_type()).add(InferDataType::from(rhs.data_type()))?; + let lhs = self; + match &output_type { + #[cfg(feature = "python")] + DataType::Python => run_python_binary_operator_fn(lhs, rhs, "add"), + DataType::Utf8 => { + Ok(cast_downcast_op!(lhs, rhs, &DataType::Utf8, Utf8Array, add)?.into_series()) + } + output_type if output_type.is_numeric() => { + with_match_numeric_daft_types!(output_type, |$T| { + Ok(cast_downcast_op!(lhs, rhs, output_type, <$T as DaftDataType>::ArrayType, add)?.into_series()) + }) + } + output_type if output_type.is_fixed_size_numeric() => { + fixed_size_binary_op(lhs, rhs, output_type, FixedSizeBinaryOp::Add) + } + output_type + if output_type.is_temporal() || matches!(output_type, DataType::Duration(..)) => + { + match (self.data_type(), rhs.data_type()) { + (DataType::Date, DataType::Duration(..)) => { + let days = rhs.duration()?.cast_to_days()?; + let physical_result = self.date()?.physical.add(&days)?; + physical_result.cast(output_type) + } + (DataType::Duration(..), DataType::Date) => { + let days = lhs.duration()?.cast_to_days()?; + let physical_result = days.add(&rhs.date()?.physical)?; + physical_result.cast(output_type) + } + (DataType::Duration(..), DataType::Duration(..)) => { + let physical_result = + lhs.duration()?.physical.add(&rhs.duration()?.physical)?; + physical_result.cast(output_type) + } + (DataType::Timestamp(..), DataType::Duration(..)) => { + let physical_result = + self.timestamp()?.physical.add(&rhs.duration()?.physical)?; + physical_result.cast(output_type) + } + (DataType::Duration(..), DataType::Timestamp(..)) => { + let physical_result = + 
lhs.duration()?.physical.add(&rhs.timestamp()?.physical)?; + physical_result.cast(output_type) + } + _ => arithmetic_op_not_implemented!(self, "+", rhs, output_type), + } + } + _ => arithmetic_op_not_implemented!(self, "+", rhs, output_type), + } + } +} + +impl Sub for &Series { + type Output = DaftResult; + fn sub(self, rhs: Self) -> Self::Output { + let output_type = + InferDataType::from(self.data_type()).sub(InferDataType::from(rhs.data_type()))?; + let lhs = self; + match &output_type { + #[cfg(feature = "python")] + DataType::Python => run_python_binary_operator_fn(lhs, rhs, "sub"), + output_type if output_type.is_numeric() => { + with_match_numeric_daft_types!(output_type, |$T| { + Ok(cast_downcast_op!(lhs, rhs, output_type, <$T as DaftDataType>::ArrayType, sub)?.into_series()) + }) + } + output_type + if output_type.is_temporal() || matches!(output_type, DataType::Duration(..)) => + { + match (self.data_type(), rhs.data_type()) { + (DataType::Date, DataType::Duration(..)) => { + let days = rhs.duration()?.cast_to_days()?; + let physical_result = self.date()?.physical.sub(&days)?; + physical_result.cast(output_type) + } + (DataType::Date, DataType::Date) => { + let physical_result = self.date()?.physical.sub(&rhs.date()?.physical)?; + physical_result.cast(output_type) + } + (DataType::Duration(..), DataType::Duration(..)) => { + let physical_result = + lhs.duration()?.physical.sub(&rhs.duration()?.physical)?; + physical_result.cast(output_type) + } + (DataType::Timestamp(..), DataType::Duration(..)) => { + let physical_result = + self.timestamp()?.physical.sub(&rhs.duration()?.physical)?; + physical_result.cast(output_type) + } + (DataType::Timestamp(..), DataType::Timestamp(..)) => { + let physical_result = + self.timestamp()?.physical.sub(&rhs.timestamp()?.physical)?; + physical_result.cast(output_type) + } + _ => arithmetic_op_not_implemented!(self, "-", rhs, output_type), + } + } + output_type if output_type.is_fixed_size_numeric() => { + 
fixed_size_binary_op(lhs, rhs, output_type, FixedSizeBinaryOp::Sub) + } + _ => arithmetic_op_not_implemented!(self, "-", rhs, output_type), + } + } +} + +impl Mul for &Series { + type Output = DaftResult; + fn mul(self, rhs: Self) -> Self::Output { + let output_type = + InferDataType::from(self.data_type()).mul(InferDataType::from(rhs.data_type()))?; + let lhs = self; + match &output_type { + #[cfg(feature = "python")] + DataType::Python => run_python_binary_operator_fn(lhs, rhs, "mul"), + output_type if output_type.is_numeric() => { + with_match_numeric_daft_types!(output_type, |$T| { + Ok(cast_downcast_op!(lhs, rhs, output_type, <$T as DaftDataType>::ArrayType, mul)?.into_series()) + }) + } + output_type if output_type.is_fixed_size_numeric() => { + fixed_size_binary_op(lhs, rhs, output_type, FixedSizeBinaryOp::Mul) + } + _ => arithmetic_op_not_implemented!(self, "*", rhs, output_type), + } + } +} + +impl Div for &Series { + type Output = DaftResult; + fn div(self, rhs: Self) -> Self::Output { + let output_type = + InferDataType::from(self.data_type()).div(InferDataType::from(rhs.data_type()))?; + let lhs = self; + match &output_type { + #[cfg(feature = "python")] + DataType::Python => run_python_binary_operator_fn(lhs, rhs, "truediv"), + DataType::Float64 => { + Ok( + cast_downcast_op!(lhs, rhs, &DataType::Float64, Float64Array, div)? 
+ .into_series(), + ) + } + output_type if output_type.is_fixed_size_numeric() => { + fixed_size_binary_op(lhs, rhs, output_type, FixedSizeBinaryOp::Div) + } + _ => arithmetic_op_not_implemented!(self, "/", rhs, output_type), + } + } +} + +impl Rem for &Series { + type Output = DaftResult; + fn rem(self, rhs: Self) -> Self::Output { + let output_type = + InferDataType::from(self.data_type()).rem(InferDataType::from(rhs.data_type()))?; + let lhs = self; + match &output_type { + #[cfg(feature = "python")] + DataType::Python => run_python_binary_operator_fn(lhs, rhs, "mod"), + output_type if output_type.is_numeric() => { + with_match_numeric_daft_types!(output_type, |$T| { + Ok(cast_downcast_op!(lhs, rhs, output_type, <$T as DaftDataType>::ArrayType, rem)?.into_series()) + }) + } + output_type if output_type.is_fixed_size_numeric() => { + fixed_size_binary_op(lhs, rhs, output_type, FixedSizeBinaryOp::Rem) + } + _ => arithmetic_op_not_implemented!(self, "%", rhs, output_type), + } + } +} +enum FixedSizeBinaryOp { + Add, + Sub, + Mul, + Div, + Rem, +} + +fn fixed_size_binary_op( + left: &Series, + right: &Series, + output_type: &DataType, + op: FixedSizeBinaryOp, +) -> DaftResult { + fn run_fixed_size_binary_op(lhs: &A, rhs: &A, op: FixedSizeBinaryOp) -> DaftResult + where + for<'a> &'a A: Add> + + Sub> + + Mul> + + Div> + + Rem>, + { + match op { + FixedSizeBinaryOp::Add => lhs.add(rhs), + FixedSizeBinaryOp::Sub => lhs.sub(rhs), + FixedSizeBinaryOp::Mul => lhs.mul(rhs), + FixedSizeBinaryOp::Div => lhs.div(rhs), + FixedSizeBinaryOp::Rem => lhs.rem(rhs), + } + } + + match (left.data_type(), right.data_type()) { + (DataType::FixedSizeList(..), DataType::FixedSizeList(..)) => { + let array = run_fixed_size_binary_op( + left.downcast::().unwrap(), + right.downcast::().unwrap(), + op, + )?; + Ok(array.into_series()) + } + (DataType::Embedding(..), DataType::Embedding(..)) => { + let physical = run_fixed_size_binary_op( + &left.downcast::().unwrap().physical, + 
&right.downcast::().unwrap().physical, + op, + )?; + let array = EmbeddingArray::new(Field::new(left.name(), output_type.clone()), physical); + Ok(array.into_series()) + } + (DataType::FixedShapeTensor(..), DataType::FixedShapeTensor(..)) => { + let physical = run_fixed_size_binary_op( + &left.downcast::().unwrap().physical, + &right.downcast::().unwrap().physical, + op, + )?; + let array = + FixedShapeTensorArray::new(Field::new(left.name(), output_type.clone()), physical); + Ok(array.into_series()) + } + (left, right) => unimplemented!("cannot add {left} and {right} types"), + } +} + +impl_arithmetic_ref_for_series!(Add, add); +impl_arithmetic_ref_for_series!(Sub, sub); +impl_arithmetic_ref_for_series!(Mul, mul); +impl_arithmetic_ref_for_series!(Div, div); +impl_arithmetic_ref_for_series!(Rem, rem); #[cfg(test)] mod tests { diff --git a/src/daft-core/src/series/ops/between.rs b/src/daft-core/src/series/ops/between.rs index 4e3d8c89d5..6c53cbb86c 100644 --- a/src/daft-core/src/series/ops/between.rs +++ b/src/daft-core/src/series/ops/between.rs @@ -1,7 +1,7 @@ use common_error::DaftResult; #[cfg(feature = "python")] -use crate::series::ops::py_between_op_utilfn; +use crate::series::utils::python_fn::py_between_op_utilfn; use crate::{ array::ops::DaftBetween, datatypes::{BooleanArray, DataType, InferDataType}, diff --git a/src/daft-core/src/series/ops/comparison.rs b/src/daft-core/src/series/ops/comparison.rs index 67ac7c66ec..2d0fd65c79 100644 --- a/src/daft-core/src/series/ops/comparison.rs +++ b/src/daft-core/src/series/ops/comparison.rs @@ -1,34 +1,68 @@ +use std::borrow::Cow; + use common_error::DaftResult; +use daft_schema::prelude::DataType; +#[cfg(feature = "python")] +use crate::series::utils::python_fn::run_python_binary_bool_operator; use crate::{ - array::ops::{DaftCompare, DaftLogical}, - datatypes::BooleanArray, - series::Series, + array::ops::DaftCompare, + datatypes::{BooleanArray, InferDataType}, + series::{utils::cast::cast_downcast_op, Series}, + 
with_match_comparable_daft_types, }; -macro_rules! call_inner { - ($fname:ident) => { - fn $fname(&self, other: &Series) -> Self::Output { - self.inner.$fname(other) +macro_rules! impl_compare_method { + ($fname:ident, $pyoperator:expr) => { + fn $fname(&self, rhs: &Series) -> Self::Output { + let lhs = self; + let (output_type, intermediate_type, comparison_type) = + InferDataType::from(self.data_type()) + .comparison_op(&InferDataType::from(rhs.data_type()))?; + assert_eq!( + output_type, + DataType::Boolean, + "All {} Comparisons should result in an Boolean output type, got {output_type}", + stringify!($fname) + ); + let (lhs, rhs) = if let Some(intermediate_type) = intermediate_type { + ( + Cow::Owned(lhs.cast(&intermediate_type)?), + Cow::Owned(rhs.cast(&intermediate_type)?), + ) + } else { + (Cow::Borrowed(lhs), Cow::Borrowed(rhs)) + }; + match comparison_type { + #[cfg(feature = "python")] + DataType::Python => { + let output = + run_python_binary_bool_operator(&lhs, &rhs, stringify!($pyoperator))?; + let bool_array = output + .bool() + .expect("We expected a Boolean Series from this Python Comparison"); + Ok(bool_array.clone()) + } + _ => with_match_comparable_daft_types!(comparison_type, |$T| { + cast_downcast_op!( + lhs, + rhs, + &comparison_type, + <$T as DaftDataType>::ArrayType, + $fname + ) + }), + } } }; } impl DaftCompare<&Self> for Series { type Output = DaftResult; - - call_inner!(equal); - call_inner!(not_equal); - call_inner!(lt); - call_inner!(lte); - call_inner!(gt); - call_inner!(gte); -} - -impl DaftLogical<&Self> for Series { - type Output = DaftResult; - - call_inner!(and); - call_inner!(or); - call_inner!(xor); + impl_compare_method!(equal, eq); + impl_compare_method!(not_equal, ne); + impl_compare_method!(lt, lt); + impl_compare_method!(lte, le); + impl_compare_method!(gt, gt); + impl_compare_method!(gte, ge); } diff --git a/src/daft-core/src/series/ops/is_in.rs b/src/daft-core/src/series/ops/is_in.rs index 7b2386b745..d6655d4bb9 100644 
--- a/src/daft-core/src/series/ops/is_in.rs +++ b/src/daft-core/src/series/ops/is_in.rs @@ -1,7 +1,7 @@ use common_error::DaftResult; #[cfg(feature = "python")] -use crate::series::ops::py_membership_op_utilfn; +use crate::series::utils::python_fn::py_membership_op_utilfn; use crate::{ array::ops::DaftIsIn, datatypes::{BooleanArray, DataType, InferDataType}, diff --git a/src/daft-core/src/series/ops/logical.rs b/src/daft-core/src/series/ops/logical.rs new file mode 100644 index 0000000000..02dc1bfe3b --- /dev/null +++ b/src/daft-core/src/series/ops/logical.rs @@ -0,0 +1,117 @@ +use common_error::DaftResult; +use daft_schema::dtype::DataType; + +#[cfg(feature = "python")] +use crate::series::utils::python_fn::run_python_binary_bool_operator; +use crate::{ + array::ops::DaftLogical, + datatypes::InferDataType, + prelude::BooleanArray, + series::{utils::cast::cast_downcast_op, IntoSeries, Series}, + with_match_integer_daft_types, +}; +macro_rules! logical_op_not_implemented { + ($self:expr, $rhs:expr, $op:ident) => {{ + let left_dtype = $self.data_type(); + let right_dtype = $rhs.data_type(); + let op_name = stringify!($op); + return Err(common_error::DaftError::ComputeError(format!( + "Logical Op: {op_name} not implemented for {left_dtype}, {right_dtype}" + ))); + }}; +} + +impl DaftLogical<&Self> for Series { + type Output = DaftResult; + + fn and(&self, rhs: &Self) -> Self::Output { + let lhs = self; + let output_type = InferDataType::from(lhs.data_type()) + .logical_op(&InferDataType::from(rhs.data_type()))?; + match &output_type { + DataType::Boolean => match (lhs.data_type(), rhs.data_type()) { + #[cfg(feature = "python")] + (DataType::Python, _) | (_, DataType::Python) => { + run_python_binary_bool_operator(lhs, rhs, "and_") + } + _ => Ok( + cast_downcast_op!(lhs, rhs, &DataType::Boolean, BooleanArray, and)? 
+ .into_series(), + ), + }, + output_type if output_type.is_integer() => { + with_match_integer_daft_types!(output_type, |$T| { + Ok(cast_downcast_op!( + self, + rhs, + output_type, + <$T as DaftDataType>::ArrayType, + and + )?.into_series()) + }) + } + + _ => logical_op_not_implemented!(self, rhs, and), + } + } + + fn or(&self, rhs: &Self) -> Self::Output { + let lhs = self; + let output_type = InferDataType::from(self.data_type()) + .logical_op(&InferDataType::from(rhs.data_type()))?; + match &output_type { + DataType::Boolean => match (lhs.data_type(), rhs.data_type()) { + #[cfg(feature = "python")] + (DataType::Python, _) | (_, DataType::Python) => { + run_python_binary_bool_operator(lhs, rhs, "or_") + } + _ => Ok( + cast_downcast_op!(lhs, rhs, &DataType::Boolean, BooleanArray, or)? + .into_series(), + ), + }, + output_type if output_type.is_integer() => { + with_match_integer_daft_types!(output_type, |$T| { + Ok(cast_downcast_op!( + self, + rhs, + output_type, + <$T as DaftDataType>::ArrayType, + or + )?.into_series()) + }) + } + _ => logical_op_not_implemented!(self, rhs, or), + } + } + + fn xor(&self, rhs: &Self) -> Self::Output { + let lhs = self; + let output_type = InferDataType::from(self.data_type()) + .logical_op(&InferDataType::from(rhs.data_type()))?; + match &output_type { + DataType::Boolean => match (lhs.data_type(), rhs.data_type()) { + #[cfg(feature = "python")] + (DataType::Python, _) | (_, DataType::Python) => { + run_python_binary_bool_operator(lhs, rhs, "xor") + } + _ => Ok( + cast_downcast_op!(lhs, rhs, &DataType::Boolean, BooleanArray, xor)? 
+ .into_series(), + ), + }, + output_type if output_type.is_integer() => { + with_match_integer_daft_types!(output_type, |$T| { + Ok(cast_downcast_op!( + self, + rhs, + output_type, + <$T as DaftDataType>::ArrayType, + xor + )?.into_series()) + }) + } + _ => logical_op_not_implemented!(self, rhs, xor), + } + } +} diff --git a/src/daft-core/src/series/ops/mod.rs b/src/daft-core/src/series/ops/mod.rs index 59f04fcd6b..1c01a200bb 100644 --- a/src/daft-core/src/series/ops/mod.rs +++ b/src/daft-core/src/series/ops/mod.rs @@ -25,6 +25,7 @@ pub mod is_in; pub mod len; pub mod list; pub mod log; +pub mod logical; pub mod map; pub mod minhash; pub mod not; @@ -53,144 +54,3 @@ pub fn cast_series_to_supertype(series: &[&Series]) -> DaftResult> { series.iter().map(|s| s.cast(&supertype)).collect() } - -#[cfg(feature = "python")] -macro_rules! py_binary_op_utilfn { - ($lhs:expr, $rhs:expr, $pyoperator:expr, $utilfn:expr) => {{ - use pyo3::prelude::*; - - use crate::{datatypes::DataType, python::PySeries}; - - let lhs = $lhs.cast(&DataType::Python)?; - let rhs = $rhs.cast(&DataType::Python)?; - - let (lhs, rhs) = match (lhs.len(), rhs.len()) { - (a, b) if a == b => (lhs, rhs), - (a, 1) => (lhs, rhs.broadcast(a)?), - (1, b) => (lhs.broadcast(b)?, rhs), - (a, b) => panic!("Cannot apply operation on arrays of different lengths: {a} vs {b}"), - }; - - let left_pylist = PySeries::from(lhs.clone()).to_pylist()?; - let right_pylist = PySeries::from(rhs.clone()).to_pylist()?; - - let result_series: Series = Python::with_gil(|py| -> PyResult { - let py_operator = PyModule::import_bound(py, pyo3::intern!(py, "operator"))? - .getattr(pyo3::intern!(py, $pyoperator))?; - - let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? - .getattr(pyo3::intern!(py, $utilfn))? - .call1((py_operator, left_pylist, right_pylist))?; - - PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? - .getattr(pyo3::intern!(py, "Series"))? 
- .getattr(pyo3::intern!(py, "from_pylist"))? - .call1((result_pylist, lhs.name(), pyo3::intern!(py, "disallow")))? - .getattr(pyo3::intern!(py, "_series"))? - .extract() - })? - .into(); - - result_series - }}; -} -#[cfg(feature = "python")] -pub(super) use py_binary_op_utilfn; - -#[cfg(feature = "python")] -pub(super) fn py_membership_op_utilfn(lhs: &Series, rhs: &Series) -> DaftResult { - use pyo3::prelude::*; - - use crate::{datatypes::DataType, python::PySeries}; - - let lhs_casted = lhs.cast(&DataType::Python)?; - let rhs_casted = rhs.cast(&DataType::Python)?; - - let left_pylist = PySeries::from(lhs_casted.clone()).to_pylist()?; - let right_pylist = PySeries::from(rhs_casted.clone()).to_pylist()?; - - let result_series: Series = Python::with_gil(|py| -> PyResult { - let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? - .getattr(pyo3::intern!(py, "python_list_membership_check"))? - .call1((left_pylist, right_pylist))?; - - PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? - .getattr(pyo3::intern!(py, "Series"))? - .getattr(pyo3::intern!(py, "from_pylist"))? - .call1(( - result_pylist, - lhs_casted.name(), - pyo3::intern!(py, "disallow"), - ))? - .getattr(pyo3::intern!(py, "_series"))? - .extract() - })? 
- .into(); - - Ok(result_series) -} - -#[cfg(feature = "python")] -pub(super) fn py_between_op_utilfn( - value: &Series, - lower: &Series, - upper: &Series, -) -> DaftResult { - use pyo3::prelude::*; - - use crate::{datatypes::DataType, python::PySeries}; - - let value_casted = value.cast(&DataType::Python)?; - let lower_casted = lower.cast(&DataType::Python)?; - let upper_casted = upper.cast(&DataType::Python)?; - - let (value_casted, lower_casted, upper_casted) = - match (value_casted.len(), lower_casted.len(), upper_casted.len()) { - (a, b, c) if a == b && b == c => (value_casted, lower_casted, upper_casted), - (1, a, b) if a == b => (value_casted.broadcast(a)?, lower_casted, upper_casted), - (a, 1, b) if a == b => (value_casted, lower_casted.broadcast(a)?, upper_casted), - (a, b, 1) if a == b => (value_casted, lower_casted, upper_casted.broadcast(a)?), - (a, 1, 1) => ( - value_casted, - lower_casted.broadcast(a)?, - upper_casted.broadcast(a)?, - ), - (1, a, 1) => ( - value_casted.broadcast(a)?, - lower_casted, - upper_casted.broadcast(a)?, - ), - (1, 1, a) => ( - value_casted.broadcast(a)?, - lower_casted.broadcast(a)?, - upper_casted, - ), - (a, b, c) => { - panic!("Cannot apply operation on arrays of different lengths: {a} vs {b} vs {c}") - } - }; - - let value_pylist = PySeries::from(value_casted.clone()).to_pylist()?; - let lower_pylist = PySeries::from(lower_casted.clone()).to_pylist()?; - let upper_pylist = PySeries::from(upper_casted.clone()).to_pylist()?; - - let result_series: Series = Python::with_gil(|py| -> PyResult { - let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? - .getattr(pyo3::intern!(py, "python_list_between_check"))? - .call1((value_pylist, lower_pylist, upper_pylist))?; - - PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? - .getattr(pyo3::intern!(py, "Series"))? - .getattr(pyo3::intern!(py, "from_pylist"))? 
- .call1(( - result_pylist, - value_casted.name(), - pyo3::intern!(py, "disallow"), - ))? - .getattr(pyo3::intern!(py, "_series"))? - .extract() - })? - .into(); - - Ok(result_series) -} diff --git a/src/daft-core/src/series/series_like.rs b/src/daft-core/src/series/series_like.rs index 463892c8bd..9d152693d9 100644 --- a/src/daft-core/src/series/series_like.rs +++ b/src/daft-core/src/series/series_like.rs @@ -34,18 +34,4 @@ pub trait SeriesLike: Send + Sync + Any + std::fmt::Debug { fn slice(&self, start: usize, end: usize) -> DaftResult; fn take(&self, idx: &Series) -> DaftResult; fn str_value(&self, idx: usize) -> DaftResult; - fn add(&self, rhs: &Series) -> DaftResult; - fn sub(&self, rhs: &Series) -> DaftResult; - fn mul(&self, rhs: &Series) -> DaftResult; - fn div(&self, rhs: &Series) -> DaftResult; - fn rem(&self, rhs: &Series) -> DaftResult; - fn and(&self, rhs: &Series) -> DaftResult; - fn or(&self, rhs: &Series) -> DaftResult; - fn xor(&self, rhs: &Series) -> DaftResult; - fn equal(&self, rhs: &Series) -> DaftResult; - fn not_equal(&self, rhs: &Series) -> DaftResult; - fn lt(&self, rhs: &Series) -> DaftResult; - fn lte(&self, rhs: &Series) -> DaftResult; - fn gt(&self, rhs: &Series) -> DaftResult; - fn gte(&self, rhs: &Series) -> DaftResult; } diff --git a/src/daft-core/src/series/utils/mod.rs b/src/daft-core/src/series/utils/mod.rs new file mode 100644 index 0000000000..a262af9755 --- /dev/null +++ b/src/daft-core/src/series/utils/mod.rs @@ -0,0 +1,14 @@ +#[cfg(feature = "python")] +pub(super) mod python_fn; +pub(crate) mod cast { + macro_rules! 
cast_downcast_op { + ($lhs:expr, $rhs:expr, $ty_expr:expr, $ty_type:ty, $op:ident) => {{ + let lhs = $lhs.cast($ty_expr)?; + let rhs = $rhs.cast($ty_expr)?; + let lhs = lhs.downcast::<$ty_type>()?; + let rhs = rhs.downcast::<$ty_type>()?; + lhs.$op(rhs) + }}; + } + pub(crate) use cast_downcast_op; +} diff --git a/src/daft-core/src/series/utils/python_fn.rs b/src/daft-core/src/series/utils/python_fn.rs new file mode 100644 index 0000000000..2fb9112775 --- /dev/null +++ b/src/daft-core/src/series/utils/python_fn.rs @@ -0,0 +1,157 @@ +use common_error::DaftResult; + +use crate::series::Series; + +pub(crate) fn run_python_binary_operator_fn( + lhs: &Series, + rhs: &Series, + operator_fn: &str, +) -> DaftResult { + python_binary_op_with_utilfn(lhs, rhs, operator_fn, "map_operator_arrow_semantics") +} + +pub(crate) fn run_python_binary_bool_operator( + lhs: &Series, + rhs: &Series, + operator_fn: &str, +) -> DaftResult { + python_binary_op_with_utilfn(lhs, rhs, operator_fn, "map_operator_arrow_semantics_bool") +} + +fn python_binary_op_with_utilfn( + lhs: &Series, + rhs: &Series, + operator_fn: &str, + util_fn: &str, +) -> DaftResult { + use pyo3::prelude::*; + + use crate::{datatypes::DataType, python::PySeries}; + + let lhs = lhs.cast(&DataType::Python)?; + let rhs = rhs.cast(&DataType::Python)?; + + let (lhs, rhs) = match (lhs.len(), rhs.len()) { + (a, b) if a == b => (lhs, rhs), + (a, 1) => (lhs, rhs.broadcast(a)?), + (1, b) => (lhs.broadcast(b)?, rhs), + (a, b) => panic!("Cannot apply operation on arrays of different lengths: {a} vs {b}"), + }; + + let left_pylist = PySeries::from(lhs.clone()).to_pylist()?; + let right_pylist = PySeries::from(rhs.clone()).to_pylist()?; + + let result_series: Series = Python::with_gil(|py| -> PyResult { + let py_operator = + PyModule::import_bound(py, pyo3::intern!(py, "operator"))?.getattr(operator_fn)?; + + let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? + .getattr(util_fn)? 
+ .call1((py_operator, left_pylist, right_pylist))?; + + PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? + .getattr(pyo3::intern!(py, "Series"))? + .getattr(pyo3::intern!(py, "from_pylist"))? + .call1((result_pylist, lhs.name(), pyo3::intern!(py, "disallow")))? + .getattr(pyo3::intern!(py, "_series"))? + .extract() + })? + .into(); + Ok(result_series) +} + +pub(crate) fn py_membership_op_utilfn(lhs: &Series, rhs: &Series) -> DaftResult { + use pyo3::prelude::*; + + use crate::{datatypes::DataType, python::PySeries}; + + let lhs_casted = lhs.cast(&DataType::Python)?; + let rhs_casted = rhs.cast(&DataType::Python)?; + + let left_pylist = PySeries::from(lhs_casted.clone()).to_pylist()?; + let right_pylist = PySeries::from(rhs_casted.clone()).to_pylist()?; + + let result_series: Series = Python::with_gil(|py| -> PyResult { + let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? + .getattr(pyo3::intern!(py, "python_list_membership_check"))? + .call1((left_pylist, right_pylist))?; + + PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? + .getattr(pyo3::intern!(py, "Series"))? + .getattr(pyo3::intern!(py, "from_pylist"))? + .call1(( + result_pylist, + lhs_casted.name(), + pyo3::intern!(py, "disallow"), + ))? + .getattr(pyo3::intern!(py, "_series"))? + .extract() + })? 
+ .into(); + + Ok(result_series) +} + +pub(crate) fn py_between_op_utilfn( + value: &Series, + lower: &Series, + upper: &Series, +) -> DaftResult { + use pyo3::prelude::*; + + use crate::{datatypes::DataType, python::PySeries}; + + let value_casted = value.cast(&DataType::Python)?; + let lower_casted = lower.cast(&DataType::Python)?; + let upper_casted = upper.cast(&DataType::Python)?; + + let (value_casted, lower_casted, upper_casted) = + match (value_casted.len(), lower_casted.len(), upper_casted.len()) { + (a, b, c) if a == b && b == c => (value_casted, lower_casted, upper_casted), + (1, a, b) if a == b => (value_casted.broadcast(a)?, lower_casted, upper_casted), + (a, 1, b) if a == b => (value_casted, lower_casted.broadcast(a)?, upper_casted), + (a, b, 1) if a == b => (value_casted, lower_casted, upper_casted.broadcast(a)?), + (a, 1, 1) => ( + value_casted, + lower_casted.broadcast(a)?, + upper_casted.broadcast(a)?, + ), + (1, a, 1) => ( + value_casted.broadcast(a)?, + lower_casted, + upper_casted.broadcast(a)?, + ), + (1, 1, a) => ( + value_casted.broadcast(a)?, + lower_casted.broadcast(a)?, + upper_casted, + ), + (a, b, c) => { + panic!("Cannot apply operation on arrays of different lengths: {a} vs {b} vs {c}") + } + }; + + let value_pylist = PySeries::from(value_casted.clone()).to_pylist()?; + let lower_pylist = PySeries::from(lower_casted.clone()).to_pylist()?; + let upper_pylist = PySeries::from(upper_casted.clone()).to_pylist()?; + + let result_series: Series = Python::with_gil(|py| -> PyResult { + let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? + .getattr(pyo3::intern!(py, "python_list_between_check"))? + .call1((value_pylist, lower_pylist, upper_pylist))?; + + PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? + .getattr(pyo3::intern!(py, "Series"))? + .getattr(pyo3::intern!(py, "from_pylist"))? + .call1(( + result_pylist, + value_casted.name(), + pyo3::intern!(py, "disallow"), + ))? 
+ .getattr(pyo3::intern!(py, "_series"))? + .extract() + })? + .into(); + + Ok(result_series) +} diff --git a/src/daft-core/src/utils/mod.rs b/src/daft-core/src/utils/mod.rs index b270516ebd..2e039e6953 100644 --- a/src/daft-core/src/utils/mod.rs +++ b/src/daft-core/src/utils/mod.rs @@ -3,15 +3,3 @@ pub mod display; pub mod dyn_compare; pub mod identity_hash_set; pub mod supertype; - -#[macro_export] -macro_rules! impl_binary_trait_by_reference { - ($ty:ty, $trait:ident, $fname:ident) => { - impl $trait for $ty { - type Output = DaftResult<$ty>; - fn $fname(self, other: Self) -> Self::Output { - (&self).$fname(&other) - } - } - }; -}