Skip to content

Commit

Permalink
combine
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Oct 1, 2024
1 parent 46c5a7e commit 4a6850a
Show file tree
Hide file tree
Showing 44 changed files with 904 additions and 224 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,7 @@ def dt_truncate(expr: PyExpr, interval: str, relative_to: PyExpr) -> PyExpr: ...
# ---
def explode(expr: PyExpr) -> PyExpr: ...
def list_sort(expr: PyExpr, desc: PyExpr) -> PyExpr: ...
def list_value_counts(expr: PyExpr) -> PyExpr: ...
def list_join(expr: PyExpr, delimiter: PyExpr) -> PyExpr: ...
def list_count(expr: PyExpr, mode: CountMode) -> PyExpr: ...
def list_get(expr: PyExpr, idx: PyExpr, default: PyExpr) -> PyExpr: ...
Expand Down
61 changes: 46 additions & 15 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2922,6 +2922,37 @@ def join(self, delimiter: str | Expression) -> Expression:
delimiter_expr = Expression._to_expression(delimiter)
return Expression._from_pyexpr(native.list_join(self._expr, delimiter_expr._expr))

# todo: do we want type to be a Map expression? how should we do this?
def value_counts(self) -> Expression:
"""Counts the occurrences of each unique value in the list.
Returns:
Expression: A Map<X, UInt64> expression where the keys are unique elements from the
original list of type X, and the values are UInt64 counts representing
the number of times each element appears in the list.
Example:
>>> import daft
>>> df = daft.from_pydict({"letters": [["a", "b", "a"], ["b", "c", "b", "c"]]})
>>> df.with_column("value_counts", df["letters"].list.value_counts()).collect()
╭──────────────┬───────────────────╮
│ letters ┆ value_counts │
│ --- ┆ --- │
│ List[Utf8] ┆ Map[Utf8: UInt64] │
╞══════════════╪═══════════════════╡
│ [a, b, a] ┆ [{key: a, │
│ ┆ value: 2, │
│ ┆ }, {key: … │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [b, c, b, c] ┆ [{key: b, │
│ ┆ value: 2, │
│ ┆ }, {key: … │
╰──────────────┴───────────────────╯
<BLANKLINE>
(Showing first 2 of 2 rows)
"""
return Expression._from_pyexpr(native.list_value_counts(self._expr))

def count(self, mode: CountMode = CountMode.Valid) -> Expression:
"""Counts the number of elements in each list
Expand Down Expand Up @@ -3069,21 +3100,21 @@ def get(self, key: Expression) -> Expression:
>>> df = daft.from_arrow(pa.table({"map_col": pa_array}))
>>> df = df.with_column("a", df["map_col"].map.get("a"))
>>> df.show()
╭──────────────────────────────────────┬───────╮
│ map_col ┆ a │
│ --- ┆ --- │
│ Map[Struct[key: Utf8, value: Int64]] ┆ Int64 │
╞══════════════════════════════════════╪═══════╡
│ [{key: a, ┆ 1 │
│ value: 1, ┆ │
│ }] ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ [] ┆ None │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ [{key: b, ┆ None │
│ value: 2, ┆ │
│ }] ┆ │
╰──────────────────────────────────────┴───────╯
╭──────────────────┬───────╮
│ map_col ┆ a │
│ --- ┆ --- │
│ Map[Utf8: Int64] ┆ Int64 │
╞══════════════════╪═══════╡
│ [{key: a, ┆ 1 │
│ value: 1, ┆ │
│ }] ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ [] ┆ None │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ [{key: b, ┆ None │
│ value: 2, ┆ │
│ }] ┆ │
╰──────────────────┴───────╯
<BLANKLINE>
(Showing first 3 of 3 rows)
Expand Down
1 change: 1 addition & 0 deletions src/arrow2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ streaming-iterator = {version = "0.1", optional = true}
# for division/remainder optimization at runtime
strength_reduce = {version = "0.2", optional = true}
thiserror = {workspace = true}
tracing = "0.1.40"
zstd = {version = "0.12", optional = true}

# parquet support
Expand Down
10 changes: 8 additions & 2 deletions src/arrow2/src/array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,18 @@ impl<O: Offset> ListArray<O> {
if O::IS_LARGE {
match data_type.to_logical_type() {
DataType::LargeList(child) => Ok(child.as_ref()),
_ => Err(Error::oos("ListArray<i64> expects DataType::LargeList")),
got => {
let msg = format!("ListArray<i64> expects DataType::LargeList, but got {got:?}");
Err(Error::oos(msg))
},
}
} else {
match data_type.to_logical_type() {
DataType::List(child) => Ok(child.as_ref()),
_ => Err(Error::oos("ListArray<i32> expects DataType::List")),
got => {
let msg = format!("ListArray<i32> expects DataType::List, but got {got:?}");
Err(Error::oos(msg))
},
}
}
}
Expand Down
68 changes: 56 additions & 12 deletions src/arrow2/src/array/map/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use super::{new_empty_array, specification::try_check_offsets_bounds, Array, ListArray};
use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
error::Error,
offset::OffsetsBuffer,
};

use super::{new_empty_array, specification::try_check_offsets_bounds, Array};

mod ffi;
pub(super) mod fmt;
mod iterator;
#[allow(unused)]
pub use iterator::*;


/// An array representing a (key, value), both of arbitrary logical types.
#[derive(Clone)]
pub struct MapArray {
Expand Down Expand Up @@ -41,20 +41,27 @@ impl MapArray {
try_check_offsets_bounds(&offsets, field.len())?;

let inner_field = Self::try_get_field(&data_type)?;
if let DataType::Struct(inner) = inner_field.data_type() {
if inner.len() != 2 {
return Err(Error::InvalidArgumentError(
"MapArray's inner `Struct` must have 2 fields (keys and maps)".to_string(),
));
}
} else {

let inner_data_type = inner_field.data_type();
let DataType::Struct(inner) = inner_data_type else {
return Err(Error::InvalidArgumentError(
"MapArray expects `DataType::Struct` as its inner logical type".to_string(),
format!("MapArray expects `DataType::Struct` as its inner logical type, but found {inner_data_type:?}"),
));
};

let inner_len = inner.len();
if inner_len != 2 {
let msg = format!(
"MapArray's inner `Struct` must have 2 fields (keys and maps), but found {} fields",
inner_len
);
return Err(Error::InvalidArgumentError(msg));
}
if field.data_type() != inner_field.data_type() {

let field_data_type = field.data_type();
if field_data_type != inner_field.data_type() {
return Err(Error::InvalidArgumentError(
"MapArray expects `field.data_type` to match its inner DataType".to_string(),
format!("MapArray expects `field.data_type` to match its inner DataType, but found \n{field_data_type:?}\nvs\n\n\n{inner_field:?}"),
));
}

Expand Down Expand Up @@ -195,6 +202,43 @@ impl MapArray {
impl Array for MapArray {
impl_common_array!();

fn convert_logical_type(&self, target: DataType) -> Box<dyn Array> {
tracing::trace!("converting logical type to\n{target:#?}");
let outer_is_map = matches!(target, DataType::Map { .. });

if outer_is_map {
// we can do simple conversion
let mut new = self.to_boxed();
new.change_type(target);
return new;
}

let DataType::LargeList(target_inner) = &target else {
panic!("MapArray can only be converted to Map or LargeList");
};

let DataType::Map(current_inner, _) = self.data_type() else {
unreachable!("Somehow DataType is not Map for a MapArray");
};

let current_inner_physical = current_inner.data_type.to_physical_type();
let target_inner_physical = target_inner.data_type.to_physical_type();

if current_inner_physical != target_inner_physical {
panic!("inner types are not equal");
}

let mut field = self.field.clone();
field.change_type(target_inner.data_type.clone());

let offsets = self.offsets().clone();
let offsets = offsets.map(|offset| offset as i64);

let list = ListArray::new(target, offsets, field, self.validity.clone());

Box::new(list)
}

fn validity(&self) -> Option<&Bitmap> {
self.validity.as_ref()
}
Expand Down
98 changes: 94 additions & 4 deletions src/arrow2/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static {
/// When the validity is [`None`], all slots are valid.
fn validity(&self) -> Option<&Bitmap>;

fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item=&'a mut dyn Array> + 'a> {
Box::new(core::iter::empty())
}

/// The number of null slots on this [`Array`].
/// # Implementation
/// This is `O(1)` since the number of null elements is pre-computed.
Expand Down Expand Up @@ -183,7 +187,7 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static {
/// ```
/// # Panics
/// Panics iff the `data_type`'s [`PhysicalType`] is not equal to array's `PhysicalType`.
fn to_type(&self, data_type: DataType) -> Box<dyn Array> {
fn convert_logical_type(&self, data_type: DataType) -> Box<dyn Array> {
let mut new = self.to_boxed();
new.change_type(data_type);
new
Expand Down Expand Up @@ -634,15 +638,26 @@ macro_rules! impl_common_array {
fn change_type(&mut self, data_type: DataType) {
if data_type.to_physical_type() != self.data_type().to_physical_type() {
panic!(
"Converting array with logical type {:?} to logical type {:?} failed, physical types do not match: {:?} -> {:?}",
"Converting array with logical type\n{:#?}\n\nto logical type\n{:#?}\nfailed, physical types do not match: {:?} -> {:?}",
self.data_type(),
data_type,
self.data_type().to_physical_type(),
data_type.to_physical_type(),
);
}
self.data_type = data_type;

self.data_type = data_type.clone();

let mut children = self.direct_children();

data_type.direct_children(|child| {
let Some(child_elem) = children.next() else {
return;
};
child_elem.change_type(child.clone());
})
}

};
}

Expand Down Expand Up @@ -737,7 +752,7 @@ pub(crate) use self::ffi::ToFfi;
/// This is similar to [`Extend`], but accepted the creation to error.
pub trait TryExtend<A> {
/// Fallible version of [`Extend::extend`].
fn try_extend<I: IntoIterator<Item = A>>(&mut self, iter: I) -> Result<()>;
fn try_extend<I: IntoIterator<Item=A>>(&mut self, iter: I) -> Result<()>;
}

/// A trait describing the ability of a struct to receive new items.
Expand Down Expand Up @@ -774,3 +789,78 @@ pub unsafe trait GenericBinaryArray<O: crate::offset::Offset>: Array {
/// The offsets of the array
fn offsets(&self) -> &[O];
}


#[cfg(test)]
mod tests {
use super::*;
use crate::datatypes::{DataType, Field, TimeUnit, IntervalUnit};
use crate::array::{Int32Array, Int64Array, Float32Array, Utf8Array, BooleanArray, ListArray, StructArray, UnionArray, MapArray};

#[test]
fn test_int32_to_date32() {
let array = Int32Array::from_slice([1, 2, 3]);
let result = array.convert_logical_type(DataType::Date32);
assert_eq!(result.data_type(), &DataType::Date32);
}

#[test]
fn test_int64_to_timestamp() {
let array = Int64Array::from_slice([1000, 2000, 3000]);
let result = array.convert_logical_type(DataType::Timestamp(TimeUnit::Millisecond, None));
assert_eq!(result.data_type(), &DataType::Timestamp(TimeUnit::Millisecond, None));
}

#[test]
fn test_boolean_to_boolean() {
let array = BooleanArray::from_slice([true, false, true]);
let result = array.convert_logical_type(DataType::Boolean);
assert_eq!(result.data_type(), &DataType::Boolean);
}

#[test]
fn test_list_to_list() {
let values = Int32Array::from_slice([1, 2, 3, 4, 5]);
let offsets = vec![0, 2, 5];
let list_array = ListArray::try_new(
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
offsets.try_into().unwrap(),
Box::new(values),
None,
).unwrap();
let result = list_array.convert_logical_type(DataType::List(Box::new(Field::new("item", DataType::Int32, true))));
assert_eq!(result.data_type(), &DataType::List(Box::new(Field::new("item", DataType::Int32, true))));
}

#[test]
fn test_struct_to_struct() {
let boolean = BooleanArray::from_slice([true, false, true]);
let int = Int32Array::from_slice([1, 2, 3]);
let struct_array = StructArray::try_new(
DataType::Struct(vec![
Field::new("b", DataType::Boolean, true),
Field::new("i", DataType::Int32, true),
]),
vec![
Box::new(boolean) as Box<dyn Array>,
Box::new(int) as Box<dyn Array>,
],
None,
).unwrap();
let result = struct_array.convert_logical_type(DataType::Struct(vec![
Field::new("b", DataType::Boolean, true),
Field::new("i", DataType::Int32, true),
]));
assert_eq!(result.data_type(), &DataType::Struct(vec![
Field::new("b", DataType::Boolean, true),
Field::new("i", DataType::Int32, true),
]));
}

#[test]
#[should_panic]
fn test_invalid_conversion() {
let array = Int32Array::from_slice([1, 2, 3]);
array.convert_logical_type(DataType::Utf8);
}
}
9 changes: 9 additions & 0 deletions src/arrow2/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::ops::DerefMut;
use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
Expand Down Expand Up @@ -246,6 +247,14 @@ impl StructArray {
impl Array for StructArray {
impl_common_array!();

fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item=&'a mut dyn Array> + 'a> {
let iter = self.values
.iter_mut()
.map(|x| x.deref_mut());

Box::new(iter)
}

fn validity(&self) -> Option<&Bitmap> {
self.validity.as_ref()
}
Expand Down
Loading

0 comments on commit 4a6850a

Please sign in to comment.