diff --git a/Cargo.lock b/Cargo.lock index 592a0793ba..0295fc8c30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,6 +286,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", + "tracing", "zstd 0.12.4", ] @@ -2150,6 +2151,8 @@ dependencies = [ "pyo3", "serde", "serde_json", + "tracing", + "tracing-subscriber", ] [[package]] diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 9206c9da4d..60569f8e5c 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1273,6 +1273,7 @@ def dt_truncate(expr: PyExpr, interval: str, relative_to: PyExpr) -> PyExpr: ... # --- def explode(expr: PyExpr) -> PyExpr: ... def list_sort(expr: PyExpr, desc: PyExpr) -> PyExpr: ... +def list_value_counts(expr: PyExpr) -> PyExpr: ... def list_join(expr: PyExpr, delimiter: PyExpr) -> PyExpr: ... def list_count(expr: PyExpr, mode: CountMode) -> PyExpr: ... def list_get(expr: PyExpr, idx: PyExpr, default: PyExpr) -> PyExpr: ... diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index 1ae7e90dac..d48cd3176d 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -2922,6 +2922,37 @@ def join(self, delimiter: str | Expression) -> Expression: delimiter_expr = Expression._to_expression(delimiter) return Expression._from_pyexpr(native.list_join(self._expr, delimiter_expr._expr)) + # todo: do we want type to be a Map expression? how should we do this? + def value_counts(self) -> Expression: + """Counts the occurrences of each unique value in the list. + + Returns: + Expression: A Map expression where the keys are unique elements from the + original list of type X, and the values are UInt64 counts representing + the number of times each element appears in the list. + + Example: + >>> import daft + >>> df = daft.from_pydict({"letters": [["a", "b", "a"], ["b", "c", "b", "c"]]}) + >>> df.with_column("value_counts", df["letters"].list.value_counts()).collect() + ╭──────────────┬───────────────────╮ + │ letters ┆ value_counts │ + │ --- ┆ --- │ + │ List[Utf8] ┆ Map[Utf8: UInt64] │ + ╞══════════════╪═══════════════════╡ + │ [a, b, a] ┆ [{key: a, │ + │ ┆ value: 2, │ + │ ┆ }, {key: … │ + ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ + │ [b, c, b, c] ┆ [{key: b, │ + │ ┆ value: 2, │ + │ ┆ }, {key: … │ + ╰──────────────┴───────────────────╯ + + (Showing first 2 of 2 rows) + """ + return Expression._from_pyexpr(native.list_value_counts(self._expr)) + def count(self, mode: CountMode = CountMode.Valid) -> Expression: """Counts the number of elements in each list @@ -3069,21 +3100,21 @@ def get(self, key: Expression) -> Expression: >>> df = daft.from_arrow(pa.table({"map_col": pa_array})) >>> df = df.with_column("a", df["map_col"].map.get("a")) >>> df.show() - ╭──────────────────────────────────────┬───────╮ - │ map_col ┆ a │ - │ --- ┆ --- │ - │ Map[Struct[key: Utf8, value: Int64]] ┆ Int64 │ - ╞══════════════════════════════════════╪═══════╡ - │ [{key: a, ┆ 1 │ - │ value: 1, ┆ │ - │ }] ┆ │ - ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ - │ [] ┆ None │ - ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ - │ [{key: b, ┆ None │ - │ value: 2, ┆ │ - │ }] ┆ │ - ╰──────────────────────────────────────┴───────╯ + ╭──────────────────┬───────╮ + │ map_col ┆ a │ + │ --- ┆ --- │ + │ Map[Utf8: Int64] ┆ Int64 │ + ╞══════════════════╪═══════╡ + │ [{key: a, ┆ 1 │ + │ value: 1, ┆ │ + │ }] ┆ │ + ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ + │ [] ┆ None │ + ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ + │ [{key: b, ┆ None │ + │ value: 2, ┆ │ + │ }] ┆ │ + ╰──────────────────┴───────╯ (Showing first 3 of 3 rows) diff --git a/src/arrow2/Cargo.toml b/src/arrow2/Cargo.toml index b3862e34f0..374f73de83 100644 --- a/src/arrow2/Cargo.toml +++ b/src/arrow2/Cargo.toml @@ -71,6 +71,7 @@ streaming-iterator = {version = "0.1", optional = true} # for division/remainder optimization at runtime strength_reduce = {version = "0.2", optional = true} thiserror = {workspace = true} +tracing = "0.1.40" zstd = {version = "0.12", optional = true} # parquet support diff --git a/src/arrow2/src/array/list/mod.rs b/src/arrow2/src/array/list/mod.rs index 3948e12002..6d0735ca04 100644 --- a/src/arrow2/src/array/list/mod.rs +++ b/src/arrow2/src/array/list/mod.rs @@ -209,12 +209,18 @@ impl ListArray { if O::IS_LARGE { match data_type.to_logical_type() { DataType::LargeList(child) => Ok(child.as_ref()), - _ => Err(Error::oos("ListArray expects DataType::LargeList")), + got => { + let msg = format!("ListArray expects DataType::LargeList, but got {got:?}"); + Err(Error::oos(msg)) + }, } } else { match data_type.to_logical_type() { DataType::List(child) => Ok(child.as_ref()), - _ => Err(Error::oos("ListArray expects DataType::List")), + got => { + let msg = format!("ListArray expects DataType::List, but got {got:?}"); + Err(Error::oos(msg)) + }, } } } diff --git a/src/arrow2/src/array/map/mod.rs b/src/arrow2/src/array/map/mod.rs index d0dcb46efb..d3cece71b3 100644 --- a/src/arrow2/src/array/map/mod.rs +++ b/src/arrow2/src/array/map/mod.rs @@ -1,3 +1,4 @@ +use super::{new_empty_array, specification::try_check_offsets_bounds, Array, ListArray}; use crate::{ bitmap::Bitmap, datatypes::{DataType, Field}, @@ -5,14 +6,13 @@ use crate::{ offset::OffsetsBuffer, }; -use super::{new_empty_array, specification::try_check_offsets_bounds, Array}; - mod ffi; pub(super) mod fmt; mod iterator; #[allow(unused)] pub use iterator::*; + /// An array representing a (key, value), both of arbitrary logical types. #[derive(Clone)] pub struct MapArray { @@ -41,20 +41,27 @@ impl MapArray { try_check_offsets_bounds(&offsets, field.len())?; let inner_field = Self::try_get_field(&data_type)?; - if let DataType::Struct(inner) = inner_field.data_type() { - if inner.len() != 2 { - return Err(Error::InvalidArgumentError( - "MapArray's inner `Struct` must have 2 fields (keys and maps)".to_string(), - )); - } - } else { + + let inner_data_type = inner_field.data_type(); + let DataType::Struct(inner) = inner_data_type else { return Err(Error::InvalidArgumentError( - "MapArray expects `DataType::Struct` as its inner logical type".to_string(), + format!("MapArray expects `DataType::Struct` as its inner logical type, but found {inner_data_type:?}"), )); + }; + + let inner_len = inner.len(); + if inner_len != 2 { + let msg = format!( + "MapArray's inner `Struct` must have 2 fields (keys and maps), but found {} fields", + inner_len + ); + return Err(Error::InvalidArgumentError(msg)); } - if field.data_type() != inner_field.data_type() { + + let field_data_type = field.data_type(); + if field_data_type != inner_field.data_type() { return Err(Error::InvalidArgumentError( - "MapArray expects `field.data_type` to match its inner DataType".to_string(), + format!("MapArray expects `field.data_type` to match its inner DataType, but found \n{field_data_type:?}\nvs\n\n\n{inner_field:?}"), )); } @@ -195,6 +202,43 @@ impl MapArray { impl Array for MapArray { impl_common_array!(); + fn convert_logical_type(&self, target: DataType) -> Box { + tracing::trace!("converting logical type to\n{target:#?}"); + let outer_is_map = matches!(target, DataType::Map { .. }); + + if outer_is_map { + // we can do simple conversion + let mut new = self.to_boxed(); + new.change_type(target); + return new; + } + + let DataType::LargeList(target_inner) = &target else { + panic!("MapArray can only be converted to Map or LargeList"); + }; + + let DataType::Map(current_inner, _) = self.data_type() else { + unreachable!("Somehow DataType is not Map for a MapArray"); + }; + + let current_inner_physical = current_inner.data_type.to_physical_type(); + let target_inner_physical = target_inner.data_type.to_physical_type(); + + if current_inner_physical != target_inner_physical { + panic!("inner types are not equal"); + } + + let mut field = self.field.clone(); + field.change_type(target_inner.data_type.clone()); + + let offsets = self.offsets().clone(); + let offsets = offsets.map(|offset| offset as i64); + + let list = ListArray::new(target, offsets, field, self.validity.clone()); + + Box::new(list) + } + fn validity(&self) -> Option<&Bitmap> { self.validity.as_ref() } diff --git a/src/arrow2/src/array/mod.rs b/src/arrow2/src/array/mod.rs index f77cc5d60d..5ffc66e141 100644 --- a/src/arrow2/src/array/mod.rs +++ b/src/arrow2/src/array/mod.rs @@ -55,6 +55,10 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static { /// When the validity is [`None`], all slots are valid. fn validity(&self) -> Option<&Bitmap>; + fn direct_children<'a>(&'a mut self) -> Box + 'a> { + Box::new(core::iter::empty()) + } + /// The number of null slots on this [`Array`]. /// # Implementation /// This is `O(1)` since the number of null elements is pre-computed. @@ -183,7 +187,7 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static { /// ``` /// # Panics /// Panics iff the `data_type`'s [`PhysicalType`] is not equal to array's `PhysicalType`. - fn to_type(&self, data_type: DataType) -> Box { + fn convert_logical_type(&self, data_type: DataType) -> Box { let mut new = self.to_boxed(); new.change_type(data_type); new @@ -634,15 +638,26 @@ macro_rules! impl_common_array { fn change_type(&mut self, data_type: DataType) { if data_type.to_physical_type() != self.data_type().to_physical_type() { panic!( - "Converting array with logical type {:?} to logical type {:?} failed, physical types do not match: {:?} -> {:?}", + "Converting array with logical type\n{:#?}\n\nto logical type\n{:#?}\nfailed, physical types do not match: {:?} -> {:?}", self.data_type(), data_type, self.data_type().to_physical_type(), data_type.to_physical_type(), ); } - self.data_type = data_type; + + self.data_type = data_type.clone(); + + let mut children = self.direct_children(); + + data_type.direct_children(|child| { + let Some(child_elem) = children.next() else { + return; + }; + child_elem.change_type(child.clone()); + }) } + }; } @@ -737,7 +752,7 @@ pub(crate) use self::ffi::ToFfi; /// This is similar to [`Extend`], but accepted the creation to error. pub trait TryExtend { /// Fallible version of [`Extend::extend`]. - fn try_extend>(&mut self, iter: I) -> Result<()>; + fn try_extend>(&mut self, iter: I) -> Result<()>; } /// A trait describing the ability of a struct to receive new items. @@ -774,3 +789,78 @@ pub unsafe trait GenericBinaryArray: Array { /// The offsets of the array fn offsets(&self) -> &[O]; } + + +#[cfg(test)] +mod tests { + use super::*; + use crate::datatypes::{DataType, Field, TimeUnit, IntervalUnit}; + use crate::array::{Int32Array, Int64Array, Float32Array, Utf8Array, BooleanArray, ListArray, StructArray, UnionArray, MapArray}; + + #[test] + fn test_int32_to_date32() { + let array = Int32Array::from_slice([1, 2, 3]); + let result = array.convert_logical_type(DataType::Date32); + assert_eq!(result.data_type(), &DataType::Date32); + } + + #[test] + fn test_int64_to_timestamp() { + let array = Int64Array::from_slice([1000, 2000, 3000]); + let result = array.convert_logical_type(DataType::Timestamp(TimeUnit::Millisecond, None)); + assert_eq!(result.data_type(), &DataType::Timestamp(TimeUnit::Millisecond, None)); + } + + #[test] + fn test_boolean_to_boolean() { + let array = BooleanArray::from_slice([true, false, true]); + let result = array.convert_logical_type(DataType::Boolean); + assert_eq!(result.data_type(), &DataType::Boolean); + } + + #[test] + fn test_list_to_list() { + let values = Int32Array::from_slice([1, 2, 3, 4, 5]); + let offsets = vec![0, 2, 5]; + let list_array = ListArray::try_new( + DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + offsets.try_into().unwrap(), + Box::new(values), + None, + ).unwrap(); + let result = list_array.convert_logical_type(DataType::List(Box::new(Field::new("item", DataType::Int32, true)))); + assert_eq!(result.data_type(), &DataType::List(Box::new(Field::new("item", DataType::Int32, true)))); + } + + #[test] + fn test_struct_to_struct() { + let boolean = BooleanArray::from_slice([true, false, true]); + let int = Int32Array::from_slice([1, 2, 3]); + let struct_array = StructArray::try_new( + DataType::Struct(vec![ + Field::new("b", DataType::Boolean, true), + Field::new("i", DataType::Int32, true), + ]), + vec![ + Box::new(boolean) as Box, + Box::new(int) as Box, + ], + None, + ).unwrap(); + let result = struct_array.convert_logical_type(DataType::Struct(vec![ + Field::new("b", DataType::Boolean, true), + Field::new("i", DataType::Int32, true), + ])); + assert_eq!(result.data_type(), &DataType::Struct(vec![ + Field::new("b", DataType::Boolean, true), + Field::new("i", DataType::Int32, true), + ])); + } + + #[test] + #[should_panic] + fn test_invalid_conversion() { + let array = Int32Array::from_slice([1, 2, 3]); + array.convert_logical_type(DataType::Utf8); + } +} diff --git a/src/arrow2/src/array/struct_/mod.rs b/src/arrow2/src/array/struct_/mod.rs index fb2812375c..f096e1aeb6 100644 --- a/src/arrow2/src/array/struct_/mod.rs +++ b/src/arrow2/src/array/struct_/mod.rs @@ -1,3 +1,4 @@ +use std::ops::DerefMut; use crate::{ bitmap::Bitmap, datatypes::{DataType, Field}, @@ -246,6 +247,14 @@ impl StructArray { impl Array for StructArray { impl_common_array!(); + fn direct_children<'a>(&'a mut self) -> Box + 'a> { + let iter = self.values + .iter_mut() + .map(|x| x.deref_mut()); + + Box::new(iter) + } + fn validity(&self) -> Option<&Bitmap> { self.validity.as_ref() } diff --git a/src/arrow2/src/compute/cast/mod.rs b/src/arrow2/src/compute/cast/mod.rs index b48949b215..6ad12f2cb4 100644 --- a/src/arrow2/src/compute/cast/mod.rs +++ b/src/arrow2/src/compute/cast/mod.rs @@ -506,16 +506,16 @@ pub fn cast(array: &dyn Array, to_type: &DataType, options: CastOptions) -> Resu match (from_type, to_type) { (Null, _) | (_, Null) => Ok(new_null_array(to_type.clone(), array.len())), (Extension(_, from_inner, _), Extension(_, to_inner, _)) => { - let new_arr = cast(array.to_type(*from_inner.clone()).as_ref(), to_inner, options)?; - Ok(new_arr.to_type(to_type.clone())) + let new_arr = cast(array.convert_logical_type(*from_inner.clone()).as_ref(), to_inner, options)?; + Ok(new_arr.convert_logical_type(to_type.clone())) } (Extension(_, from_inner, _), _) => { - let new_arr = cast(array.to_type(*from_inner.clone()).as_ref(), to_type, options)?; + let new_arr = cast(array.convert_logical_type(*from_inner.clone()).as_ref(), to_type, options)?; Ok(new_arr) } (_, Extension(_, to_inner, _)) => { let new_arr = cast(array, to_inner, options)?; - Ok(new_arr.to_type(to_type.clone())) + Ok(new_arr.convert_logical_type(to_type.clone())) } (Struct(from_fields), Struct(to_fields)) => match (from_fields.len(), to_fields.len()) { (from_len, to_len) if from_len == to_len => { diff --git a/src/arrow2/src/datatypes/field.rs b/src/arrow2/src/datatypes/field.rs index 59eb894a3e..8239d7633a 100644 --- a/src/arrow2/src/datatypes/field.rs +++ b/src/arrow2/src/datatypes/field.rs @@ -25,6 +25,8 @@ pub struct Field { impl Field { /// Creates a new [`Field`]. pub fn new>(name: T, data_type: DataType, is_nullable: bool) -> Self { + let span = tracing::trace_span!("ArrowField::new", data_type = ?data_type, is_nullable); + let _guard = span.enter(); Field { name: name.into(), data_type, diff --git a/src/arrow2/src/datatypes/mod.rs b/src/arrow2/src/datatypes/mod.rs index 2debc5a4f2..54683fb02a 100644 --- a/src/arrow2/src/datatypes/mod.rs +++ b/src/arrow2/src/datatypes/mod.rs @@ -19,6 +19,10 @@ pub type Metadata = BTreeMap; /// typedef for [Option<(String, Option)>] descr pub(crate) type Extension = Option<(String, Option)>; +pub type ArrowDataType = DataType; +pub type ArrowField = Field; + + /// The set of supported logical types in this crate. /// /// Each variant uniquely identifies a logical type, which define specific semantics to the data @@ -159,6 +163,29 @@ pub enum DataType { Extension(String, Box, Option), } +impl DataType { + pub fn map(field: impl Into>, keys_sorted: bool) -> Self { + Self::Map(field.into(), keys_sorted) + } + + pub fn direct_children(&self, mut processor: impl FnMut(&DataType)) { + match self { + DataType::Null | DataType::Boolean | DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Decimal(..) | DataType::Decimal256(..) | + DataType::Int64 | DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 | + DataType::Float16 | DataType::Float32 | DataType::Float64 | DataType::Timestamp(_, _) | + DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) | + DataType::Duration(_) | DataType::Interval(_) | DataType::Binary | DataType::FixedSizeBinary(_) | + DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => {} + DataType::List(field) | DataType::FixedSizeList(field, _) | DataType::LargeList(field) | DataType::Map(field, ..) => processor(&field.data_type), + DataType::Struct(fields) | DataType::Union(fields, _, _) => fields.iter().for_each(|field| processor(&field.data_type)), + DataType::Dictionary(..) => { + // todo: not sure what to do here, going to just ignore for now + } + DataType::Extension(_, inner, _) => processor(inner), + } + } +} + /// Mode of [`DataType::Union`] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum UnionMode { diff --git a/src/arrow2/src/offset.rs b/src/arrow2/src/offset.rs index 80b45d6680..bbebf585b3 100644 --- a/src/arrow2/src/offset.rs +++ b/src/arrow2/src/offset.rs @@ -71,7 +71,7 @@ impl Offsets { /// Creates a new [`Offsets`] from an iterator of lengths #[inline] - pub fn try_from_iter>(iter: I) -> Result { + pub fn try_from_iter>(iter: I) -> Result { let iterator = iter.into_iter(); let (lower, _) = iterator.size_hint(); let mut offsets = Self::with_capacity(lower); @@ -144,10 +144,7 @@ impl Offsets { /// Returns the last offset of this container. #[inline] pub fn last(&self) -> &O { - match self.0.last() { - Some(element) => element, - None => unsafe { unreachable_unchecked() }, - } + self.0.last().unwrap_or_else(|| unsafe { unreachable_unchecked() }) } /// Returns a range (start, end) corresponding to the position `index` @@ -215,7 +212,7 @@ impl Offsets { /// # Errors /// This function errors iff this operation overflows for the maximum value of `O`. #[inline] - pub fn try_from_lengths>(lengths: I) -> Result { + pub fn try_from_lengths>(lengths: I) -> Result { let mut self_ = Self::with_capacity(lengths.size_hint().0); self_.try_extend_from_lengths(lengths)?; Ok(self_) @@ -225,7 +222,7 @@ impl Offsets { /// # Errors /// This function errors iff this operation overflows for the maximum value of `O`. #[inline] - pub fn try_extend_from_lengths>( + pub fn try_extend_from_lengths>( &mut self, lengths: I, ) -> Result<(), Error> { @@ -338,7 +335,7 @@ fn try_check_offsets(offsets: &[O]) -> Result<(), Error> { /// * Every element is `>= 0` /// * element at position `i` is >= than element at position `i-1`. #[derive(Clone, PartialEq, Debug)] -pub struct OffsetsBuffer(Buffer); +pub struct OffsetsBuffer(Buffer); impl Default for OffsetsBuffer { #[inline] @@ -347,6 +344,17 @@ impl Default for OffsetsBuffer { } } +impl OffsetsBuffer { + pub fn map(&self, f: impl Fn(O) -> T) -> OffsetsBuffer { + let buffer = self.0.iter() + .copied() + .map(f) + .collect(); + + OffsetsBuffer(buffer) + } +} + impl OffsetsBuffer { /// # Safety /// This is safe iff the invariants of this struct are guaranteed in `offsets`. @@ -355,6 +363,7 @@ impl OffsetsBuffer { Self(offsets) } + /// Returns an empty [`OffsetsBuffer`] (i.e. with a single element, the zero) #[inline] pub fn new() -> Self { @@ -401,22 +410,26 @@ impl OffsetsBuffer { *self.last() - *self.first() } + pub fn ranges(&self) -> impl Iterator> + '_ { + self.0.windows(2).map(|w| { + let from = w[0]; + let to = w[1]; + debug_assert!(from <= to, "offsets must be monotonically increasing"); + from..to + }) + } + + /// Returns the first offset. #[inline] pub fn first(&self) -> &O { - match self.0.first() { - Some(element) => element, - None => unsafe { unreachable_unchecked() }, - } + self.0.first().unwrap_or_else(|| unsafe { unreachable_unchecked() }) } /// Returns the last offset. #[inline] pub fn last(&self) -> &O { - match self.0.last() { - Some(element) => element, - None => unsafe { unreachable_unchecked() }, - } + self.0.last().unwrap_or_else(|| unsafe { unreachable_unchecked() }) } /// Returns a range (start, end) corresponding to the position `index` @@ -460,7 +473,7 @@ impl OffsetsBuffer { /// Returns an iterator with the lengths of the offsets #[inline] - pub fn lengths(&self) -> impl Iterator + '_ { + pub fn lengths(&self) -> impl Iterator + '_ { self.0.windows(2).map(|w| (w[1] - w[0]).to_usize()) } diff --git a/src/daft-core/src/array/fixed_size_list_array.rs b/src/daft-core/src/array/fixed_size_list_array.rs index a8b5048b82..bf052b477b 100644 --- a/src/daft-core/src/array/fixed_size_list_array.rs +++ b/src/daft-core/src/array/fixed_size_list_array.rs @@ -1,16 +1,19 @@ use std::sync::Arc; +use arrow2::offset::OffsetsBuffer; use common_error::{DaftError, DaftResult}; use crate::{ array::growable::{Growable, GrowableArray}, datatypes::{DaftArrayType, DataType, Field}, + prelude::ListArray, series::Series, }; #[derive(Clone, Debug)] pub struct FixedSizeListArray { pub field: Arc, + /// contains all the elements of the nested lists flattened into a single contiguous array. pub flat_child: Series, validity: Option, } @@ -37,7 +40,7 @@ impl FixedSizeListArray { "FixedSizeListArray::new received values with len {} but expected it to match len of validity {} * size: {}", flat_child.len(), validity.len(), - (validity.len() * size), + validity.len() * size, ) } if child_dtype.as_ref() != flat_child.data_type() { @@ -174,6 +177,27 @@ impl FixedSizeListArray { validity, )) } + + fn generate_offsets(&self) -> OffsetsBuffer { + let size = self.fixed_element_len(); + let len = self.len(); + + // Create new offsets + let offsets: Vec = (0..=len) + .map(|i| i64::try_from(i * size).unwrap()) + .collect(); + + OffsetsBuffer::try_from(offsets).expect("Failed to create OffsetsBuffer") + } + + pub fn to_list(&self) -> ListArray { + ListArray::new( + self.field.clone(), + self.flat_child.clone(), + self.generate_offsets(), + self.validity.clone(), + ) + } } impl<'a> IntoIterator for &'a FixedSizeListArray { diff --git a/src/daft-core/src/array/list_array.rs b/src/daft-core/src/array/list_array.rs index 538c24e716..5b0058fbb6 100644 --- a/src/daft-core/src/array/list_array.rs +++ b/src/daft-core/src/array/list_array.rs @@ -12,6 +12,8 @@ use crate::{ pub struct ListArray { pub field: Arc, pub flat_child: Series, + + /// Where each row starts and ends. Null rows usually have the same start/end index, but this is not guaranteed. offsets: arrow2::offset::OffsetsBuffer, validity: Option, } diff --git a/src/daft-core/src/array/mod.rs b/src/daft-core/src/array/mod.rs index 7c300c6a38..6698b59eae 100644 --- a/src/daft-core/src/array/mod.rs +++ b/src/daft-core/src/array/mod.rs @@ -18,11 +18,12 @@ pub mod prelude; use std::{marker::PhantomData, sync::Arc}; use common_error::{DaftError, DaftResult}; +use daft_schema::field::DaftField; use crate::datatypes::{DaftArrayType, DaftPhysicalType, DataType, Field}; #[derive(Debug)] -pub struct DataArray { +pub struct DataArray { pub field: Arc, pub data: Box, marker_: PhantomData, @@ -40,30 +41,46 @@ impl DaftArrayType for DataArray { } } -impl DataArray -where - T: DaftPhysicalType, -{ - pub fn new(field: Arc, data: Box) -> DaftResult { +impl DataArray { + pub fn new( + physical_field: Arc, + arrow_array: Box, + ) -> DaftResult { assert!( - field.dtype.is_physical(), + physical_field.dtype.is_physical(), "Can only construct DataArray for PhysicalTypes, got {}", - field.dtype + physical_field.dtype ); - if let Ok(arrow_dtype) = field.dtype.to_physical().to_arrow() { - if !arrow_dtype.eq(data.data_type()) { + if let Ok(expected_arrow_physical_type) = physical_field.dtype.to_arrow() + // For instance Int32 -> Int 32 + { + let arrow_data_type = arrow_array.data_type(); // logical type + + if !expected_arrow_physical_type.eq(arrow_data_type) { panic!( - "expected {:?}, got {:?} when creating a new DataArray", - arrow_dtype, - data.data_type() - ) + "Mismatch between expected and actual Arrow types for DataArray.\n\ + Field name: {}\n\ + Logical type: {}\n\ + Physical type: {}\n\ + Expected Arrow physical type: {:?}\n\ + Actual Arrow Logical type: {:?} + + This error typically occurs when there's a discrepancy between the Daft DataType \ + and the underlying Arrow representation. Please ensure that the physical type \ + of the Daft DataType matches the Arrow type of the provided data.", + physical_field.name, + physical_field.dtype, + physical_field.dtype.to_physical(), + expected_arrow_physical_type, + arrow_data_type + ); } } Ok(Self { - field, - data, + field: physical_field, + data: arrow_array, marker_: PhantomData, }) } diff --git a/src/daft-core/src/array/ops/arrow2/comparison.rs b/src/daft-core/src/array/ops/arrow2/comparison.rs index 37f7b2a37b..700ab4f8d0 100644 --- a/src/daft-core/src/array/ops/arrow2/comparison.rs +++ b/src/daft-core/src/array/ops/arrow2/comparison.rs @@ -49,7 +49,7 @@ fn build_is_equal_with_nan( } } -fn build_is_equal( +pub fn build_is_equal( left: &dyn Array, right: &dyn Array, nulls_equal: bool, diff --git a/src/daft-core/src/array/ops/cast.rs b/src/daft-core/src/array/ops/cast.rs index c3dbe0c209..83af9605b0 100644 --- a/src/daft-core/src/array/ops/cast.rs +++ b/src/daft-core/src/array/ops/cast.rs @@ -2091,7 +2091,7 @@ impl ListArray { } } } - DataType::Map(..) => Ok(MapArray::new( + DataType::Map { .. } => Ok(MapArray::new( Field::new(self.name(), dtype.clone()), self.clone(), ) @@ -2198,7 +2198,10 @@ where { Python::with_gil(|py| { let arrow_dtype = array.data_type().to_arrow()?; - let arrow_array = array.as_arrow().to_type(arrow_dtype).with_validity(None); + let arrow_array = array + .as_arrow() + .convert_logical_type(arrow_dtype) + .with_validity(None); let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; let py_array: Vec = ffi::to_py_array(py, arrow_array.to_boxed(), &pyarrow)? .call_method0(pyo3::intern!(py, "to_pylist"))? diff --git a/src/daft-core/src/array/ops/from_arrow.rs b/src/daft-core/src/array/ops/from_arrow.rs index 1739b524a9..41235eda4d 100644 --- a/src/daft-core/src/array/ops/from_arrow.rs +++ b/src/daft-core/src/array/ops/from_arrow.rs @@ -30,17 +30,14 @@ where ::ArrayType: FromArrow, { fn from_arrow(field: FieldRef, arrow_arr: Box) -> DaftResult { - let data_array_field = Arc::new(Field::new(field.name.clone(), field.dtype.to_physical())); - let physical_arrow_arr = match field.dtype { - // TODO: Consolidate Map to use the same .to_type conversion as other logical types - // Currently, .to_type does not work for Map in Arrow2 because it requires physical types to be equivalent, - // but the physical type of MapArray in Arrow2 is a MapArray, not a ListArray - DataType::Map(..) => arrow_arr, - _ => arrow_arr.to_type(data_array_field.dtype.to_arrow()?), - }; + let target_convert = field.to_physical(); + let target_convert_arrow = target_convert.dtype.to_arrow()?; + + let physical_arrow_array = arrow_arr.convert_logical_type(target_convert_arrow.clone()); + let physical = ::ArrayType::from_arrow( - data_array_field, - physical_arrow_arr, + Arc::new(target_convert), + physical_arrow_array, )?; Ok(Self::new(field.clone(), physical)) } @@ -79,9 +76,9 @@ impl FromArrow for ListArray { DataType::List(daft_child_dtype), arrow2::datatypes::DataType::LargeList(arrow_child_field), ) => { - let arrow_arr = arrow_arr.to_type(arrow2::datatypes::DataType::LargeList( - arrow_child_field.clone(), - )); + let arrow_arr = arrow_arr.convert_logical_type( + arrow2::datatypes::DataType::LargeList(arrow_child_field.clone()), + ); let arrow_arr = arrow_arr .as_any() .downcast_ref::>() @@ -98,7 +95,7 @@ impl FromArrow for ListArray { arrow_arr.validity().cloned(), )) } - (DataType::List(daft_child_dtype), arrow2::datatypes::DataType::Map(..)) => { + (DataType::List(daft_child_dtype), arrow2::datatypes::DataType::Map { .. }) => { let map_arr = arrow_arr .as_any() .downcast_ref::() @@ -128,7 +125,7 @@ impl FromArrow for StructArray { match (&field.dtype, arrow_arr.data_type()) { (DataType::Struct(fields), arrow2::datatypes::DataType::Struct(arrow_fields)) => { if fields.len() != arrow_fields.len() { - return Err(DaftError::ValueError(format!("Attempting to create Daft StructArray with {} fields from Arrow array with {} fields: {} vs {:?}", fields.len(), arrow_fields.len(), &field.dtype, arrow_arr.data_type()))) + return Err(DaftError::ValueError(format!("Attempting to create Daft StructArray with {} fields from Arrow array with {} fields: {} vs {:?}", fields.len(), arrow_fields.len(), &field.dtype, arrow_arr.data_type()))); } let arrow_arr = arrow_arr.as_ref().as_any().downcast_ref::().unwrap(); @@ -143,7 +140,7 @@ impl FromArrow for StructArray { child_series, arrow_arr.validity().cloned(), )) - }, + } (d, a) => Err(DaftError::TypeError(format!("Attempting to create Daft StructArray with type {} from arrow array with type {:?}", d, a))) } } diff --git a/src/daft-core/src/array/ops/groups.rs b/src/daft-core/src/array/ops/groups.rs index 9676ef3a52..870c4d26bc 100644 --- a/src/daft-core/src/array/ops/groups.rs +++ b/src/daft-core/src/array/ops/groups.rs @@ -37,7 +37,7 @@ use crate::{ fn make_groups(iter: impl Iterator) -> DaftResult where T: Hash, - T: std::cmp::Eq, + T: Eq, { const DEFAULT_SIZE: usize = 256; let mut tbl = FnvHashMap::)>::with_capacity_and_hasher( @@ -56,15 +56,15 @@ where } } } - let mut s_indices = Vec::with_capacity(tbl.len()); - let mut g_indices = Vec::with_capacity(tbl.len()); + let mut sample_indices = Vec::with_capacity(tbl.len()); + let mut group_indices = Vec::with_capacity(tbl.len()); - for (s_idx, g_idx) in tbl.into_values() { - s_indices.push(s_idx); - g_indices.push(g_idx); + for (sample_index, group_index) in tbl.into_values() { + sample_indices.push(sample_index); + group_indices.push(group_index); } - Ok((s_indices, g_indices)) + Ok((sample_indices, group_indices)) } impl IntoGroups for DataArray diff --git a/src/daft-core/src/array/ops/list.rs b/src/daft-core/src/array/ops/list.rs index 4dd8cee2a8..068cd681bf 100644 --- a/src/daft-core/src/array/ops/list.rs +++ b/src/daft-core/src/array/ops/list.rs @@ -2,16 +2,24 @@ use std::{iter::repeat, sync::Arc}; use arrow2::offset::OffsetsBuffer; use common_error::DaftResult; +use indexmap::{ + map::{raw_entry_v1::RawEntryMut, RawEntryApiV1}, + IndexMap, +}; use super::as_arrow::AsArrow; use crate::{ array::{ growable::{make_growable, Growable}, - FixedSizeListArray, ListArray, + ops::arrow2::comparison::build_is_equal, + FixedSizeListArray, ListArray, StructArray, }, count_mode::CountMode, datatypes::{BooleanArray, DataType, Field, Int64Array, UInt64Array, Utf8Array}, + kernels::search_sorted::build_is_valid, + prelude::MapArray, series::{IntoSeries, Series}, + utils::identity_hash_set::IdentityBuildHasher, }; fn join_arrow_list_of_utf8s( @@ -244,6 +252,128 @@ fn list_sort_helper_fixed_size( } impl ListArray { + pub fn value_counts(&self) -> DaftResult { + struct IndexRef { + index: usize, + hash: u64, + } + + impl std::hash::Hash for IndexRef { + fn hash(&self, state: &mut H) { + self.hash.hash(state); + } + } + + let original_name = self.name(); + + let hashes = self.flat_child.hash(None)?; + + let flat_child = self.flat_child.to_arrow(); + let flat_child = &*flat_child; + + let is_equal = build_is_equal( + flat_child, flat_child, true, // todo: should nans be considered equal? + true, + )?; + + let is_valid = build_is_valid(flat_child); + + let key_type = self.flat_child.data_type().clone(); + let count_type = DataType::UInt64; + + let mut include_mask = Vec::with_capacity(self.flat_child.len()); + let mut count_array = Vec::new(); + + let mut offsets = Vec::with_capacity(self.len()); + + offsets.push(0_i64); + + let mut map: IndexMap = IndexMap::default(); + for range in self.offsets().ranges() { + map.clear(); + + for index in range { + let index = index as usize; + if !is_valid(index) { + include_mask.push(false); + // skip nulls + continue; + } + + let hash = hashes.get(index).unwrap(); + + let entry = map + .raw_entry_mut_v1() + .from_hash(hash, |other| is_equal(other.index, index)); + + match entry { + RawEntryMut::Occupied(mut entry) => { + include_mask.push(false); + *entry.get_mut() += 1; + } + RawEntryMut::Vacant(vacant) => { + include_mask.push(true); + vacant.insert(IndexRef { hash, index }, 1); + } + } + } + + // indexmap so ordered + for v in map.values() { + count_array.push(*v); + } + + offsets.push(count_array.len() as i64); + } + + let values = UInt64Array::from(("count", count_array)).into_series(); + let boolean_array = BooleanArray::from(("boolean", include_mask.as_slice())); + + let keys = self.flat_child.filter(&boolean_array)?; + + let keys = Series::try_from_field_and_arrow_array( + Field::new("key", key_type.clone()), + keys.to_arrow(), + )?; + + let values = Series::try_from_field_and_arrow_array( + Field::new("value", count_type.clone()), + values.to_arrow(), + )?; + + let struct_type = DataType::Struct(vec![ + Field::new("key", key_type.clone()), + Field::new("value", count_type.clone()), + ]); + + let struct_array = StructArray::new( + Arc::new(Field::new("entries", struct_type.clone())), + vec![keys, values], + None, + ); + + let list_type = DataType::List(Box::new(struct_type)); + + let offsets = OffsetsBuffer::try_from(offsets)?; + + let list_array = Self::new( + Arc::new(Field::new("entries", list_type.clone())), + struct_array.into_series(), + offsets, + None, + ); + + let map_type = DataType::Map { + key: Box::new(key_type), + value: Box::new(count_type), + }; + + Ok(MapArray::new( + Field::new(original_name, map_type.clone()), + list_array, + )) + } + pub fn count(&self, mode: CountMode) -> DaftResult { let counts = match (mode, self.flat_child.validity()) { (CountMode::All, _) | (CountMode::Valid, None) => { @@ -472,6 +602,11 @@ impl ListArray { } impl FixedSizeListArray { + pub fn value_counts(&self) -> DaftResult { + let list = self.to_list(); + list.value_counts() + } + pub fn count(&self, mode: CountMode) -> DaftResult { let size = self.fixed_element_len(); let counts = match (mode, self.flat_child.validity()) { diff --git a/src/daft-core/src/array/ops/map.rs b/src/daft-core/src/array/ops/map.rs index 3b2f6ffd8c..a1613ce19c 100644 --- a/src/daft-core/src/array/ops/map.rs +++ b/src/daft-core/src/array/ops/map.rs @@ -24,19 +24,10 @@ fn single_map_get(structs: &Series, key_to_get: &Series) -> DaftResult { impl MapArray { pub fn map_get(&self, key_to_get: &Series) -> DaftResult { - let value_type = if let DataType::Map(inner_dtype) = self.data_type() { - match *inner_dtype.clone() { - DataType::Struct(fields) if fields.len() == 2 => { - fields[1].dtype.clone() - } - _ => { - return Err(DaftError::TypeError(format!( - "Expected inner type to be a struct type with two fields: key and value, got {:?}", - inner_dtype - ))) - } - } - } else { + let DataType::Map { + value: value_type, .. + } = self.data_type() + else { return Err(DaftError::TypeError(format!( "Expected input to be a map type, got {:?}", self.data_type() @@ -49,7 +40,7 @@ impl MapArray { for series in self.physical.into_iter() { match series { Some(s) if !s.is_empty() => result.push(single_map_get(&s, key_to_get)?), - _ => result.push(Series::full_null("value", &value_type, 1)), + _ => result.push(Series::full_null("value", value_type, 1)), } } Series::concat(&result.iter().collect::>()) @@ -59,7 +50,7 @@ impl MapArray { for (i, series) in self.physical.into_iter().enumerate() { match (series, key_to_get.slice(i, i + 1)?) { (Some(s), k) if !s.is_empty() => result.push(single_map_get(&s, &k)?), - _ => result.push(Series::full_null("value", &value_type, 1)), + _ => result.push(Series::full_null("value", value_type, 1)), } } Series::concat(&result.iter().collect::>()) diff --git a/src/daft-core/src/array/serdes.rs b/src/daft-core/src/array/serdes.rs index cc908c0dd6..0976f53a0a 100644 --- a/src/daft-core/src/array/serdes.rs +++ b/src/daft-core/src/array/serdes.rs @@ -130,7 +130,11 @@ impl serde::Serialize for ExtensionArray { let mut s = serializer.serialize_map(Some(2))?; s.serialize_entry("field", self.field())?; let values = if let DataType::Extension(_, inner, _) = self.data_type() { - Series::try_from(("physical", self.data.to_type(inner.to_arrow().unwrap()))).unwrap() + Series::try_from(( + "physical", + self.data.convert_logical_type(inner.to_arrow().unwrap()), + )) + .unwrap() } else { panic!("Expected Extension Type!") }; diff --git a/src/daft-core/src/array/struct_array.rs b/src/daft-core/src/array/struct_array.rs index 996680ede5..8a228735e4 100644 --- a/src/daft-core/src/array/struct_array.rs +++ b/src/daft-core/src/array/struct_array.rs @@ -11,6 +11,8 @@ use crate::{ #[derive(Clone, Debug)] pub struct StructArray { pub field: Arc, + + /// Column representations pub children: Vec, validity: Option, len: usize, diff --git a/src/daft-core/src/datatypes/matching.rs b/src/daft-core/src/datatypes/matching.rs index b8b8e1660f..c275bb4a2d 100644 --- a/src/daft-core/src/datatypes/matching.rs +++ b/src/daft-core/src/datatypes/matching.rs @@ -8,43 +8,43 @@ macro_rules! with_match_daft_types {( use $crate::datatypes::*; match $key_type { - Null => __with_ty__! { NullType }, + // Float16 => unimplemented!("Array for Float16 DataType not implemented"), + Binary => __with_ty__! { BinaryType }, Boolean => __with_ty__! { BooleanType }, - Int8 => __with_ty__! { Int8Type }, + Date => __with_ty__! { DateType }, + Decimal128(..) => __with_ty__! { Decimal128Type }, + Duration(_) => __with_ty__! { DurationType }, + Embedding(..) => __with_ty__! { EmbeddingType }, + Extension(_, _, _) => __with_ty__! { ExtensionType }, + FixedShapeImage(..) => __with_ty__! { FixedShapeImageType }, + FixedShapeSparseTensor(..) => __with_ty__! { FixedShapeSparseTensorType }, + FixedShapeTensor(..) => __with_ty__! { FixedShapeTensorType }, + FixedSizeBinary(_) => __with_ty__! { FixedSizeBinaryType }, + FixedSizeList(_, _) => __with_ty__! { FixedSizeListType }, + Float32 => __with_ty__! { Float32Type }, + Float64 => __with_ty__! { Float64Type }, + Image(..) => __with_ty__! { ImageType }, + Int128 => __with_ty__! { Int128Type }, Int16 => __with_ty__! { Int16Type }, Int32 => __with_ty__! { Int32Type }, Int64 => __with_ty__! { Int64Type }, - Int128 => __with_ty__! { Int128Type }, - UInt8 => __with_ty__! { UInt8Type }, + Int8 => __with_ty__! { Int8Type }, + List(_) => __with_ty__! { ListType }, + Map{..} => __with_ty__! { MapType }, + Null => __with_ty__! { NullType }, + SparseTensor(..) => __with_ty__! { SparseTensorType }, + Struct(_) => __with_ty__! { StructType }, + Tensor(..) => __with_ty__! { TensorType }, + Time(_) => __with_ty__! { TimeType }, + Timestamp(_, _) => __with_ty__! { TimestampType }, UInt16 => __with_ty__! { UInt16Type }, UInt32 => __with_ty__! { UInt32Type }, UInt64 => __with_ty__! { UInt64Type }, - Float32 => __with_ty__! { Float32Type }, - Float64 => __with_ty__! { Float64Type }, - Timestamp(_, _) => __with_ty__! { TimestampType }, - Date => __with_ty__! { DateType }, - Time(_) => __with_ty__! { TimeType }, - Duration(_) => __with_ty__! { DurationType }, - Binary => __with_ty__! { BinaryType }, - FixedSizeBinary(_) => __with_ty__! { FixedSizeBinaryType }, + UInt8 => __with_ty__! { UInt8Type }, + Unknown => unimplemented!("Array for Unknown DataType not implemented"), Utf8 => __with_ty__! { Utf8Type }, - FixedSizeList(_, _) => __with_ty__! { FixedSizeListType }, - List(_) => __with_ty__! { ListType }, - Struct(_) => __with_ty__! { StructType }, - Map(_) => __with_ty__! { MapType }, - Extension(_, _, _) => __with_ty__! { ExtensionType }, #[cfg(feature = "python")] Python => __with_ty__! { PythonType }, - Embedding(..) => __with_ty__! { EmbeddingType }, - Image(..) => __with_ty__! { ImageType }, - FixedShapeImage(..) => __with_ty__! { FixedShapeImageType }, - Tensor(..) => __with_ty__! { TensorType }, - FixedShapeTensor(..) => __with_ty__! { FixedShapeTensorType }, - SparseTensor(..) => __with_ty__! { SparseTensorType }, - FixedShapeSparseTensor(..) => __with_ty__! { FixedShapeSparseTensorType }, - Decimal128(..) => __with_ty__! { Decimal128Type }, - // Float16 => unimplemented!("Array for Float16 DataType not implemented"), - Unknown => unimplemented!("Array for Unknown DataType not implemented"), // NOTE: We should not implement a default for match here, because this is meant to be // an exhaustive match across **all** Daft types. diff --git a/src/daft-core/src/lib.rs b/src/daft-core/src/lib.rs index 322a0db3ec..5892f75ffb 100644 --- a/src/daft-core/src/lib.rs +++ b/src/daft-core/src/lib.rs @@ -2,6 +2,7 @@ #![feature(int_roundings)] #![feature(iterator_try_reduce)] #![feature(if_let_guard)] +#![feature(hash_raw_entry)] pub mod array; pub mod count_mode; diff --git a/src/daft-core/src/series/from.rs b/src/daft-core/src/series/from.rs index 99776edf64..7bea517aa9 100644 --- a/src/daft-core/src/series/from.rs +++ b/src/daft-core/src/series/from.rs @@ -1,6 +1,8 @@ use std::sync::Arc; +use arrow2::datatypes::ArrowDataType; use common_error::{DaftError, DaftResult}; +use daft_schema::{dtype::DaftDataType, field::DaftField}; use super::Series; use crate::{ @@ -12,9 +14,10 @@ use crate::{ impl Series { pub fn try_from_field_and_arrow_array( - field: Arc, + field: impl Into>, array: Box, ) -> DaftResult { + let field = field.into(); // TODO(Nested): Refactor this out with nested logical types in StructArray and ListArray // Corner-case nested logical types that have not yet been migrated to new Array formats // to hold only casted physical arrow arrays. @@ -46,11 +49,90 @@ impl Series { impl TryFrom<(&str, Box)> for Series { type Error = DaftError; - fn try_from(item: (&str, Box)) -> DaftResult { - let (name, array) = item; - let source_arrow_type = array.data_type(); - let dtype: DataType = source_arrow_type.into(); + fn try_from((name, array): (&str, Box)) -> DaftResult { + println!("trying from {name:?} to {array:?}"); + let source_arrow_type: &ArrowDataType = array.data_type(); + let dtype = DaftDataType::from(source_arrow_type); let field = Arc::new(Field::new(name, dtype.clone())); Self::try_from_field_and_arrow_array(field, array) } } + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use arrow2::{ + array::Array, + datatypes::{ArrowDataType, ArrowField}, + }; + use common_error::DaftResult; + use daft_schema::dtype::DataType; + + static ARROW_DATA_TYPE: LazyLock = LazyLock::new(|| { + ArrowDataType::Map( + Box::new(ArrowField::new( + "entries", + ArrowDataType::Struct(vec![ + ArrowField::new("key", ArrowDataType::LargeUtf8, false), + ArrowField::new("value", ArrowDataType::Date32, true), + ]), + false, + )), + false, + ) + }); + + #[test] + fn test_map_type_conversion() { + let arrow_data_type = ARROW_DATA_TYPE.clone(); + let dtype = DataType::from(&arrow_data_type); + assert_eq!( + dtype, + DataType::Map { + key: Box::new(DataType::Utf8), + value: Box::new(DataType::Date), + }, + ) + } + + #[test] + fn test_() -> DaftResult<()> { + use arrow2::array::MapArray; + + use super::*; + + let arrow_array = MapArray::new( + ARROW_DATA_TYPE.clone(), + vec![0, 1].try_into().unwrap(), + Box::new(arrow2::array::StructArray::new( + ArrowDataType::Struct(vec![ + ArrowField::new("key", ArrowDataType::LargeUtf8, false), + ArrowField::new("value", ArrowDataType::Date32, true), + ]), + vec![ + Box::new(arrow2::array::Utf8Array::::from_slice(["key1"])), + arrow2::array::Int32Array::from_slice([1]) + .convert_logical_type(ArrowDataType::Date32), + ], + None, + )), + None, + ); + + let series = Series::try_from(( + "test_map", + Box::new(arrow_array) as Box, + ))?; + + assert_eq!( + series.data_type(), + &DataType::Map { + key: Box::new(DataType::Utf8), + value: Box::new(DataType::Date), + } + ); + + Ok(()) + } +} diff --git a/src/daft-core/src/series/mod.rs b/src/daft-core/src/series/mod.rs index 128b1bd344..597768e15d 100644 --- a/src/daft-core/src/series/mod.rs +++ b/src/daft-core/src/series/mod.rs @@ -117,6 +117,13 @@ impl Series { self.inner.validity() } + pub fn is_valid(&self, idx: usize) -> bool { + let Some(validity) = self.validity() else { + return true; + }; + validity.get_bit(idx) + } + /// Attempts to downcast the Series to a primitive slice /// This will return an error if the Series is not of the physical type `T` /// # Example diff --git a/src/daft-core/src/series/ops/list.rs b/src/daft-core/src/series/ops/list.rs index d9a17dd087..81a4788067 100644 --- a/src/daft-core/src/series/ops/list.rs +++ b/src/daft-core/src/series/ops/list.rs @@ -7,6 +7,22 @@ use crate::{ }; impl Series { + pub fn list_value_counts(&self) -> DaftResult { + let series = match self.data_type() { + DataType::List(_) => self.list()?.value_counts(), + DataType::FixedSizeList(..) => self.fixed_size_list()?.value_counts(), + dt => { + return Err(DaftError::TypeError(format!( + "List contains not implemented for {}", + dt + ))) + } + }? + .into_series(); + + Ok(series) + } + pub fn explode(&self) -> DaftResult { match self.data_type() { DataType::List(_) => self.list()?.explode(), diff --git a/src/daft-core/src/series/ops/map.rs b/src/daft-core/src/series/ops/map.rs index b624cd8aac..0d4e54820d 100644 --- a/src/daft-core/src/series/ops/map.rs +++ b/src/daft-core/src/series/ops/map.rs @@ -5,7 +5,7 @@ use crate::{datatypes::DataType, series::Series}; impl Series { pub fn map_get(&self, key: &Self) -> DaftResult { match self.data_type() { - DataType::Map(_) => self.map()?.map_get(key), + DataType::Map { .. } => self.map()?.map_get(key), dt => Err(DaftError::TypeError(format!( "map.get not implemented for {}", dt diff --git a/src/daft-core/src/series/serdes.rs b/src/daft-core/src/series/serdes.rs index bf7e42a1e0..76414e30e6 100644 --- a/src/daft-core/src/series/serdes.rs +++ b/src/daft-core/src/series/serdes.rs @@ -158,12 +158,13 @@ impl<'d> serde::Deserialize<'d> for Series { DataType::Extension(..) => { let physical = map.next_value::()?; let physical = physical.to_arrow(); - let ext_array = physical.to_type(field.dtype.to_arrow().unwrap()); + let ext_array = + physical.convert_logical_type(field.dtype.to_arrow().unwrap()); Ok(ExtensionArray::new(Arc::new(field), ext_array) .unwrap() .into_series()) } - DataType::Map(..) => { + DataType::Map { .. } => { let physical = map.next_value::()?; Ok(MapArray::new( Arc::new(field), diff --git a/src/daft-dsl/src/functions/map/get.rs b/src/daft-dsl/src/functions/map/get.rs index ab6eb148f8..bf5f9efdf0 100644 --- a/src/daft-dsl/src/functions/map/get.rs +++ b/src/daft-dsl/src/functions/map/get.rs @@ -13,18 +13,14 @@ impl FunctionEvaluator for GetEvaluator { fn to_field(&self, inputs: &[ExprRef], schema: &Schema, _: &FunctionExpr) -> DaftResult { match inputs { + // what is input and what is key + // input is a map field [input, key] => match (input.to_field(schema), key.to_field(schema)) { (Ok(input_field), Ok(_)) => match input_field.dtype { - DataType::Map(inner) => match inner.as_ref() { - DataType::Struct(fields) if fields.len() == 2 => { - let value_dtype = &fields[1].dtype; - Ok(Field::new("value", value_dtype.clone())) - } - _ => Err(DaftError::TypeError(format!( - "Expected input map to have struct values with 2 fields, got {}", - inner - ))), - }, + DataType::Map { value, .. } => { + // todo: perhaps better naming + Ok(Field::new("value", *value)) + } _ => Err(DaftError::TypeError(format!( "Expected input to be a map, got {}", input_field.dtype diff --git a/src/daft-functions/src/list/mod.rs b/src/daft-functions/src/list/mod.rs index 2ba3f197be..c0ad745b19 100644 --- a/src/daft-functions/src/list/mod.rs +++ b/src/daft-functions/src/list/mod.rs @@ -9,6 +9,7 @@ mod min; mod slice; mod sort; mod sum; +mod value_counts; pub use chunk::{list_chunk as chunk, ListChunk}; pub use count::{list_count as count, ListCount}; @@ -31,6 +32,10 @@ pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_function(wrap_pyfunction_bound!(count::py_list_count, parent)?)?; parent.add_function(wrap_pyfunction_bound!(get::py_list_get, parent)?)?; parent.add_function(wrap_pyfunction_bound!(join::py_list_join, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + value_counts::py_list_value_counts, + parent + )?)?; parent.add_function(wrap_pyfunction_bound!(max::py_list_max, parent)?)?; parent.add_function(wrap_pyfunction_bound!(min::py_list_min, parent)?)?; diff --git a/src/daft-functions/src/list/value_counts.rs b/src/daft-functions/src/list/value_counts.rs new file mode 100644 index 0000000000..d558db8ac4 --- /dev/null +++ b/src/daft-functions/src/list/value_counts.rs @@ -0,0 +1,72 @@ +use common_error::{DaftError, DaftResult}; +use daft_core::prelude::{DataType, Field, Schema, Series}; +#[cfg(feature = "python")] +use daft_dsl::python::PyExpr; +use daft_dsl::{ + functions::{ScalarFunction, ScalarUDF}, + ExprRef, +}; +#[cfg(feature = "python")] +use pyo3::{pyfunction, PyResult}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +struct ListValueCountsFunction; + +#[typetag::serde] +impl ScalarUDF for ListValueCountsFunction { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &'static str { + "list_value_counts" + } + + fn to_field(&self, inputs: &[ExprRef], schema: &Schema) -> DaftResult { + let [data] = inputs else { + return Err(DaftError::SchemaMismatch(format!( + "Expected 1 input arg, got {}", + inputs.len() + ))); + }; + + let data_field = data.to_field(schema)?; + + let DataType::List(inner_type) = &data_field.dtype else { + return Err(DaftError::TypeError(format!( + "Expected list, got {}", + data_field.dtype + ))); + }; + + let map_type = DataType::Map { + key: inner_type.clone(), + value: Box::new(DataType::UInt64), + }; + + Ok(Field::new(data_field.name, map_type)) + } + + fn evaluate(&self, inputs: &[Series]) -> DaftResult { + let [data] = inputs else { + return Err(DaftError::ValueError(format!( + "Expected 1 input arg, got {}", + inputs.len() + ))); + }; + + data.list_value_counts() + } +} + +pub fn list_value_counts(expr: ExprRef) -> ExprRef { + ScalarFunction::new(ListValueCountsFunction, vec![expr]).into() +} + +#[cfg(feature = "python")] +#[pyfunction] +#[pyo3(name = "list_value_counts")] +pub fn py_list_value_counts(expr: PyExpr) -> PyResult { + Ok(list_value_counts(expr.into()).into()) +} diff --git a/src/daft-schema/Cargo.toml b/src/daft-schema/Cargo.toml index fa189bff24..69f7a5fe1c 100644 --- a/src/daft-schema/Cargo.toml +++ b/src/daft-schema/Cargo.toml @@ -13,6 +13,8 @@ num-traits = {workspace = true} pyo3 = {workspace = true, optional = true} serde = {workspace = true, features = ["rc"]} serde_json = {workspace = true} +tracing = {workspace = true} +tracing-subscriber = "0.3.18" [features] python = [ diff --git a/src/daft-schema/src/dtype.rs b/src/daft-schema/src/dtype.rs index 65cf8f808e..af53aefcf3 100644 --- a/src/daft-schema/src/dtype.rs +++ b/src/daft-schema/src/dtype.rs @@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize}; use crate::{field::Field, image_mode::ImageMode, time_unit::TimeUnit}; +pub type DaftDataType = DataType; + #[derive(Clone, Debug, Display, PartialEq, Eq, Serialize, Deserialize, Hash)] pub enum DataType { // ArrowTypes: @@ -107,8 +109,11 @@ pub enum DataType { Struct(Vec), /// A nested [`DataType`] that is represented as List>. - #[display("Map[{_0}]")] - Map(Box), + #[display("Map[{key}: {value}]")] + Map { + key: Box, + value: Box, + }, /// Extension type. #[display("{_1}")] @@ -233,14 +238,31 @@ impl DataType { Self::List(field) => Ok(ArrowType::LargeList(Box::new( arrow2::datatypes::Field::new("item", field.to_arrow()?, true), ))), - Self::Map(field) => Ok(ArrowType::Map( - Box::new(arrow2::datatypes::Field::new( - "item", - field.to_arrow()?, - true, - )), - false, - )), + Self::Map { key, value } => { + let struct_type = ArrowType::Struct(vec![ + // We never allow null keys in maps for several reasons: + // 1. Null typically represents the absence of a value, which doesn't make sense for a key. + // 2. Null comparisons can be problematic (similar to how f64::NAN != f64::NAN). + // 3. It maintains consistency with common map implementations in arrow (no null keys). + // 4. It simplifies map operations + // + // This decision aligns with the thoughts of team members like Jay and Sammy, who argue that: + // - Nulls in keys could lead to unintuitive behavior + // - If users need to count or group by null values, they can use other constructs like + // group_by operations on non-map types, which offer more explicit control. + // + // By disallowing null keys, we encourage more robust data modeling practices and + // provide a clearer semantic meaning for map types in our system. + arrow2::datatypes::Field::new("key", key.to_arrow()?, true), + arrow2::datatypes::Field::new("value", value.to_arrow()?, true), + ]); + + // entries + let struct_field = + arrow2::datatypes::Field::new("entries", struct_type.clone(), true); + + Ok(ArrowType::map(struct_field, false)) + } Self::Struct(fields) => Ok({ let fields = fields .iter() @@ -288,7 +310,10 @@ impl DataType { FixedSizeList(child_dtype, size) => { FixedSizeList(Box::new(child_dtype.to_physical()), *size) } - Map(child_dtype) => List(Box::new(child_dtype.to_physical())), + Map { key, value } => List(Box::new(Struct(vec![ + Field::new("key", key.to_physical()), + Field::new("value", value.to_physical()), + ]))), Embedding(dtype, size) => FixedSizeList(Box::new(dtype.to_physical()), *size), Image(mode) => Struct(vec![ Field::new( @@ -328,20 +353,6 @@ impl DataType { } } - #[inline] - pub fn nested_dtype(&self) -> Option<&Self> { - match self { - Self::Map(dtype) - | Self::List(dtype) - | Self::FixedSizeList(dtype, _) - | Self::FixedShapeTensor(dtype, _) - | Self::SparseTensor(dtype) - | Self::FixedShapeSparseTensor(dtype, _) - | Self::Tensor(dtype) => Some(dtype), - _ => None, - } - } - #[inline] pub fn is_arrow(&self) -> bool { self.to_arrow().is_ok() @@ -350,21 +361,21 @@ impl DataType { #[inline] pub fn is_numeric(&self) -> bool { match self { - Self::Int8 - | Self::Int16 - | Self::Int32 - | Self::Int64 - | Self::Int128 - | Self::UInt8 - | Self::UInt16 - | Self::UInt32 - | Self::UInt64 - // DataType::Float16 - | Self::Float32 - | Self::Float64 => true, - Self::Extension(_, inner, _) => inner.is_numeric(), - _ => false - } + Self::Int8 + | Self::Int16 + | Self::Int32 + | Self::Int64 + | Self::Int128 + | Self::UInt8 + | Self::UInt16 + | Self::UInt32 + | Self::UInt64 + // DataType::Float16 + | Self::Float32 + | Self::Float64 => true, + Self::Extension(_, inner, _) => inner.is_numeric(), + _ => false + } } #[inline] @@ -453,7 +464,7 @@ impl DataType { #[inline] pub fn is_map(&self) -> bool { - matches!(self, Self::Map(..)) + matches!(self, Self::Map { .. }) } #[inline] @@ -576,7 +587,7 @@ impl DataType { | Self::FixedShapeTensor(..) | Self::SparseTensor(..) | Self::FixedShapeSparseTensor(..) - | Self::Map(..) + | Self::Map { .. } ) } @@ -590,7 +601,7 @@ impl DataType { let p: Self = self.to_physical(); matches!( p, - Self::List(..) | Self::FixedSizeList(..) | Self::Struct(..) | Self::Map(..) + Self::List(..) | Self::FixedSizeList(..) | Self::Struct(..) | Self::Map { .. } ) } @@ -607,7 +618,7 @@ impl DataType { impl From<&ArrowType> for DataType { fn from(item: &ArrowType) -> Self { - match item { + let result = match item { ArrowType::Null => Self::Null, ArrowType::Boolean => Self::Boolean, ArrowType::Int8 => Self::Int8, @@ -638,7 +649,29 @@ impl From<&ArrowType> for DataType { ArrowType::FixedSizeList(field, size) => { Self::FixedSizeList(Box::new(field.as_ref().data_type().into()), *size) } - ArrowType::Map(field, ..) => Self::Map(Box::new(field.as_ref().data_type().into())), + ArrowType::Map(field, ..) => { + // todo: TryFrom in future? want in second pass maybe + + // field should be a struct + let ArrowType::Struct(fields) = &field.data_type else { + panic!("Map should have a struct as its key") + }; + + let [key, value] = fields.as_slice() else { + panic!("Map should have two fields") + }; + + let key = &key.data_type; + let value = &value.data_type; + + let key = Self::from(key); + let value = Self::from(value); + + let key = Box::new(key); + let value = Box::new(value); + + Self::Map { key, value } + } ArrowType::Struct(fields) => { let fields: Vec = fields.iter().map(|fld| fld.into()).collect(); Self::Struct(fields) @@ -659,7 +692,9 @@ impl From<&ArrowType> for DataType { } _ => panic!("DataType :{item:?} is not supported"), - } + }; + + result } } diff --git a/src/daft-schema/src/field.rs b/src/daft-schema/src/field.rs index 774545fee4..f4cd6ecb16 100644 --- a/src/daft-schema/src/field.rs +++ b/src/daft-schema/src/field.rs @@ -18,6 +18,7 @@ pub struct Field { } pub type FieldRef = Arc; +pub type DaftField = Field; #[derive(Clone, Display, Debug, PartialEq, Eq, Deserialize, Serialize, Hash)] #[display("{id}")] @@ -87,6 +88,14 @@ impl Field { ) } + pub fn to_physical(&self) -> Self { + Self { + name: self.name.clone(), + dtype: self.dtype.to_physical(), + metadata: self.metadata.clone(), + } + } + pub fn rename>(&self, name: S) -> Self { Self { name: name.into(), diff --git a/src/daft-schema/src/python/datatype.rs b/src/daft-schema/src/python/datatype.rs index ceff5e18f3..edacbfbdad 100644 --- a/src/daft-schema/src/python/datatype.rs +++ b/src/daft-schema/src/python/datatype.rs @@ -209,10 +209,10 @@ impl PyDataType { #[staticmethod] pub fn map(key_type: Self, value_type: Self) -> PyResult { - Ok(DataType::Map(Box::new(DataType::Struct(vec![ - Field::new("key", key_type.dtype), - Field::new("value", value_type.dtype), - ]))) + Ok(DataType::Map { + key: Box::new(key_type.dtype), + value: Box::new(value_type.dtype), + } .into()) } diff --git a/src/daft-sql/src/planner.rs b/src/daft-sql/src/planner.rs index bac774a4fc..bf55182cc5 100644 --- a/src/daft-sql/src/planner.rs +++ b/src/daft-sql/src/planner.rs @@ -937,7 +937,7 @@ impl SQLPlanner { invalid_operation_err!("Index must be a string literal") } } - DataType::Map(_) => Ok(daft_dsl::functions::map::get(expr, index)), + DataType::Map { .. } => Ok(daft_dsl::functions::map::get(expr, index)), dtype => { invalid_operation_err!("nested access on column with type: {}", dtype) } diff --git a/src/daft-stats/src/column_stats/mod.rs b/src/daft-stats/src/column_stats/mod.rs index df96daa373..d72ba7cb9c 100644 --- a/src/daft-stats/src/column_stats/mod.rs +++ b/src/daft-stats/src/column_stats/mod.rs @@ -71,7 +71,7 @@ impl ColumnRangeStatistics { // UNSUPPORTED TYPES: // Types that don't support comparisons and can't be used as ColumnRangeStatistics - DataType::List(..) | DataType::FixedSizeList(..) | DataType::Image(..) | DataType::FixedShapeImage(..) | DataType::Tensor(..) | DataType::SparseTensor(..) | DataType::FixedShapeSparseTensor(..) | DataType::FixedShapeTensor(..) | DataType::Struct(..) | DataType::Map(..) | DataType::Extension(..) | DataType::Embedding(..) | DataType::Unknown => false, + DataType::List(..) | DataType::FixedSizeList(..) | DataType::Image(..) | DataType::FixedShapeImage(..) | DataType::Tensor(..) | DataType::SparseTensor(..) | DataType::FixedShapeSparseTensor(..) | DataType::FixedShapeTensor(..) | DataType::Struct(..) | DataType::Map { .. } | DataType::Extension(..) | DataType::Embedding(..) | DataType::Unknown => false, #[cfg(feature = "python")] DataType::Python => false, } diff --git a/src/daft-table/src/ffi.rs b/src/daft-table/src/ffi.rs index 37a118c50e..424ea516f2 100644 --- a/src/daft-table/src/ffi.rs +++ b/src/daft-table/src/ffi.rs @@ -42,9 +42,9 @@ pub fn record_batches_to_table( let columns = cols .into_iter() .enumerate() - .map(|(i, c)| { - let c = cast_array_for_daft_if_needed(c); - Series::try_from((names.get(i).unwrap().as_str(), c)) + .map(|(i, array)| { + let cast_array = cast_array_for_daft_if_needed(array); + Series::try_from((names.get(i).unwrap().as_str(), cast_array)) }) .collect::>>()?; tables.push(Table::new_with_size(schema.clone(), columns, num_rows)?) diff --git a/src/daft-table/src/repr_html.rs b/src/daft-table/src/repr_html.rs index 79ecaf063a..0e46bb80b2 100644 --- a/src/daft-table/src/repr_html.rs +++ b/src/daft-table/src/repr_html.rs @@ -102,7 +102,7 @@ pub fn html_value(s: &Series, idx: usize) -> String { let arr = s.struct_().unwrap(); arr.html_value(idx) } - DataType::Map(_) => { + DataType::Map { .. } => { let arr = s.map().unwrap(); arr.html_value(idx) } diff --git a/src/lib.rs b/src/lib.rs index a7a1382538..51560322fa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ pub mod pylib { #[pymodule] fn daft(py: Python, m: &Bound) -> PyResult<()> { refresh_logger(py)?; + init_tracing(crate::should_enable_chrome_trace()); common_daft_config::register_modules(m)?; diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index d3727c2ac3..fceb121e3e 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -508,3 +508,30 @@ def test_repr_series_lit() -> None: s = lit(Series.from_pylist([1, 2, 3])) output = repr(s) assert output == "lit([1, 2, 3])" + + +def test_list_value_counts(): + # Create a MicroPartition with a list column + mp = MicroPartition.from_pydict( + {"list_col": [["a", "b", "a", "c"], ["b", "b", "c"], ["a", "a", "a"], [], ["d", None, "d"]]} + ) + + # # Apply list_value_counts operation + result = mp.eval_expression_list([col("list_col").list.value_counts().alias("value_counts")]) + value_counts = result.to_pydict()["value_counts"] + + # Expected output + expected = [[("a", 2), ("b", 1), ("c", 1)], [("b", 2), ("c", 1)], [("a", 3)], [], [("d", 2)]] + + # Check the result + assert value_counts == expected + + # Test with empty input (no proper type -> should raise error) + empty_mp = MicroPartition.from_pydict({"list_col": []}) + with pytest.raises(ValueError): + empty_mp.eval_expression_list([col("list_col").list.value_counts().alias("value_counts")]) + + # Test with empty input (no proper type -> should raise error) + none_mp = MicroPartition.from_pydict({"list_col": [None, None, None]}) + with pytest.raises(ValueError): + none_mp.eval_expression_list([col("list_col").list.value_counts().alias("value_counts")]) diff --git a/tests/io/test_parquet_roundtrip.py b/tests/io/test_parquet_roundtrip.py index 6904805831..99ea54b64c 100644 --- a/tests/io/test_parquet_roundtrip.py +++ b/tests/io/test_parquet_roundtrip.py @@ -112,15 +112,39 @@ def test_roundtrip_temporal_arrow_types(tmp_path, data, pa_type, expected_dtype) def test_roundtrip_tensor_types(tmp_path): - expected_dtype = DataType.tensor(DataType.int64()) - data = [np.array([[1, 2], [3, 4]]), None, None] - before = daft.from_pydict({"foo": Series.from_pylist(data)}) - before = before.concat(before) - before.write_parquet(str(tmp_path)) - after = daft.read_parquet(str(tmp_path)) - assert before.schema()["foo"].dtype == expected_dtype - assert after.schema()["foo"].dtype == expected_dtype - assert before.to_arrow() == after.to_arrow() + # Define the expected data type for the tensor column + expected_tensor_dtype = DataType.tensor(DataType.int64()) + + # Create sample tensor data with some null values + tensor_data = [np.array([[1, 2], [3, 4]]), None, None] + + # Create a Daft DataFrame with the tensor data + df_original = daft.from_pydict({"tensor_col": Series.from_pylist(tensor_data)}) + + # Double the size of the DataFrame to ensure we test with more data + df_original = df_original.concat(df_original) + + assert df_original.schema()["tensor_col"].dtype == expected_tensor_dtype + + # Write the DataFrame to a Parquet file + df_original.write_parquet(str(tmp_path)) + + # Read the Parquet file back into a new DataFrame + df_roundtrip = daft.read_parquet(str(tmp_path)) + + # Print the schema of the original DataFrame + print("Original DataFrame Schema:") + print(df_original.schema()) + + # Print the schema of the DataFrame after roundtrip + print("\nRoundtrip DataFrame Schema:") + print(df_roundtrip.schema()) + + # Verify that the data type is preserved after the roundtrip + assert df_roundtrip.schema()["tensor_col"].dtype == expected_tensor_dtype + + # Ensure the data content is identical after the roundtrip + assert df_original.to_arrow() == df_roundtrip.to_arrow() @pytest.mark.parametrize("fixed_shape", [True, False])