Skip to content

Commit

Permalink
[FEAT] Implement hashing and groupby on lists (#2464)
Browse files Browse the repository at this point in the history
Implements hashing and groupby on lists and fixed-size lists. It works by
recursively hashing the inner items of each list and then hashing the resulting per-item hashes.

Closes #1983

Example:
```
>>> df = daft.from_pydict({"a": [[1, 2], [1, 2], [1, 3, 4], [1, 2], [1, 3, 4]], "b": [1, 2, 3, 4, 5]})
>>> df.groupby("a").agg_list("b").show()
╭─────────────┬─────────────╮
│ a           ┆ b           │
│ ---         ┆ ---         │
│ List[Int64] ┆ List[Int64] │
╞═════════════╪═════════════╡
│ [1, 3, 4]   ┆ [3, 5]      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [1, 2]      ┆ [1, 2, 4]   │
╰─────────────┴─────────────╯
```
  • Loading branch information
Vince7778 authored Jul 9, 2024
1 parent 510786b commit c3d43cf
Show file tree
Hide file tree
Showing 7 changed files with 397 additions and 7 deletions.
14 changes: 13 additions & 1 deletion src/daft-core/src/array/ops/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow2::array::Array;
use fnv::FnvHashMap;

use crate::{
array::DataArray,
array::{DataArray, FixedSizeListArray, ListArray},
datatypes::{
BinaryArray, BooleanArray, DaftIntegerType, DaftNumericType, FixedSizeBinaryArray,
Float32Array, Float64Array, NullArray, Utf8Array,
Expand Down Expand Up @@ -170,3 +170,15 @@ impl IntoGroups for NullArray {
Ok((vec![0], vec![v]))
}
}

impl IntoGroups for ListArray {
    /// Groups the rows of this list array by the hash of each list.
    ///
    /// Every row is first reduced to a single unseeded hash via `hash(None)`,
    /// and the grouping is then delegated to `make_groups` on that hash array.
    ///
    /// NOTE(review): rows are grouped purely on their hash values here, so two
    /// different lists whose hashes collide would land in the same group.
    fn make_groups(&self) -> DaftResult<super::GroupIndicesPair> {
        let row_hashes = self.hash(None)?;
        row_hashes.make_groups()
    }
}

impl IntoGroups for FixedSizeListArray {
    /// Groups the rows of this fixed-size list array by the hash of each list.
    ///
    /// Every row is first reduced to a single unseeded hash via `hash(None)`,
    /// and the grouping is then delegated to `make_groups` on that hash array.
    ///
    /// NOTE(review): rows are grouped purely on their hash values here, so two
    /// different lists whose hashes collide would land in the same group.
    fn make_groups(&self) -> DaftResult<super::GroupIndicesPair> {
        let row_hashes = self.hash(None)?;
        row_hashes.make_groups()
    }
}
100 changes: 99 additions & 1 deletion src/daft-core/src/array/ops/hash.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use crate::{
array::DataArray,
array::{DataArray, FixedSizeListArray, ListArray},
datatypes::{
logical::{DateArray, Decimal128Array, TimeArray, TimestampArray},
BinaryArray, BooleanArray, DaftNumericType, FixedSizeBinaryArray, Int16Array, Int32Array,
Int64Array, Int8Array, NullArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
Utf8Array,
},
kernels,
utils::arrow::arrow_bitmap_and_helper,
Series,
};

use arrow2::types::Index;
use common_error::DaftResult;
use xxhash_rust::xxh3::{xxh3_64, xxh3_64_with_seed};

use super::as_arrow::AsArrow;

Expand Down Expand Up @@ -88,6 +92,100 @@ impl NullArray {
}
}

/// Serializes every 64-bit hash in `hashed` into one contiguous little-endian
/// byte buffer (8 bytes per element, in element order).
///
/// Only the values buffer is read; the validity of `hashed` is not consulted.
fn hash_values_to_le_bytes(hashed: &UInt64Array) -> Vec<u8> {
    hashed
        .as_arrow()
        .values_iter()
        .flat_map(|v| v.to_le_bytes())
        .collect()
}

/// Hashes every sublist of a list-like array down to a single `u64` per row.
///
/// The flat child is hashed element-wise first, turning
/// `[[stuff], [stuff, stuff], ...]` into `[[hash], [hash, hash], ...]`; each
/// sublist of hashes is then hashed as raw little-endian bytes with xxh3,
/// giving `[hash, hash, ...]`.
///
/// # Arguments
/// * `name` - name given to the returned array.
/// * `offsets` - row boundaries into `flat_child`; row `i` covers
///   `offsets[i]..offsets[i + 1]`, so there is one more offset than rows.
/// * `flat_child` - the flattened elements of all sublists.
/// * `validity` - validity of the list rows, if any.
/// * `seed` - optional per-row seed. When given, the seed of a row is
///   broadcast over that row's elements and also used for the final xxh3 pass.
///
/// # Errors
/// Returns any error produced while slicing or hashing the child, or while
/// attaching the validity to the result.
fn hash_list(
    name: &str,
    offsets: &[i64],
    flat_child: &Series,
    validity: Option<&arrow2::bitmap::Bitmap>,
    seed: Option<&UInt64Array>,
) -> DaftResult<UInt64Array> {
    // One more offset than rows; saturate so an (unexpected) empty offsets
    // slice yields zero rows instead of underflowing.
    let num_rows = offsets.len().saturating_sub(1);

    if let Some(seed_arr) = seed {
        // The output validity is derived from both the list validity and the
        // seed validity.
        let combined_validity = arrow_bitmap_and_helper(validity, seed_arr.validity());
        // Collect through `DaftResult` so that a failure while slicing or
        // hashing a row is reported to the caller. Mapping failures to `None`
        // would be lost, because the validity is replaced wholesale below.
        let row_hashes = (0..num_rows)
            .map(|row| -> DaftResult<Option<u64>> {
                let start = offsets[row] as usize;
                let end = offsets[row + 1] as usize;
                // apply the current seed across this row
                let cur_seed_opt = seed_arr.get(row);
                let flat_seed = UInt64Array::from_iter(
                    "seed",
                    std::iter::repeat(cur_seed_opt).take(end - start),
                );
                let hashed_child = flat_child.slice(start, end)?.hash(Some(&flat_seed))?;
                let child_bytes = hash_values_to_le_bytes(&hashed_child);
                let row_hash = match cur_seed_opt {
                    Some(cur_seed) => xxh3_64_with_seed(&child_bytes, cur_seed),
                    None => xxh3_64(&child_bytes),
                };
                Ok(Some(row_hash))
            })
            .collect::<DaftResult<Vec<_>>>()?;
        UInt64Array::from_iter(name, row_hashes.into_iter()).with_validity(combined_validity)
    } else {
        // since we don't have a seed we can hash entire flat child at once
        let hashed_child = flat_child.hash(None)?;
        // hashing collects the array anyways so this collect doesn't matter
        let child_bytes = hash_values_to_le_bytes(&hashed_child);
        const BYTES_PER_HASH: usize = std::mem::size_of::<u64>();
        UInt64Array::from_iter(
            name,
            u64::range(0, num_rows)
                .expect("a usize row count always converts to u64")
                .map(|i| {
                    let row = i as usize;
                    // offsets count elements; scale them to byte positions
                    let start = (offsets[row] as usize) * BYTES_PER_HASH;
                    let end = (offsets[row + 1] as usize) * BYTES_PER_HASH;
                    Some(xxh3_64(&child_bytes[start..end]))
                }),
        )
        .with_validity(validity.cloned())
    }
}

impl ListArray {
    /// Computes one hash per list row by delegating to `hash_list` with this
    /// array's name, offsets, flat child and validity.
    ///
    /// `seed`, when provided, is forwarded unchanged to `hash_list`.
    pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult<UInt64Array> {
        let name = self.name();
        let offsets = self.offsets();
        let validity = self.validity();
        hash_list(name, offsets, &self.flat_child, validity, seed)
    }
}

impl FixedSizeListArray {
    /// Computes one hash per fixed-size list row.
    ///
    /// A fixed-size list stores no offsets, so they are synthesized here as
    /// `0, size, 2 * size, ...` up to the length of the flat child, and the
    /// work is then handed to `hash_list`.
    ///
    /// NOTE(review): `step_by` panics when given a step of 0, so an element
    /// length of 0 would panic here; confirm whether such arrays can occur.
    pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult<UInt64Array> {
        let element_len = self.fixed_element_len();
        let child_len = self.flat_child.len() as i64;
        // `hash_list` takes its offsets as a slice, so materialize them.
        let row_offsets = (0..=child_len).step_by(element_len).collect::<Vec<i64>>();
        hash_list(
            self.name(),
            row_offsets.as_slice(),
            &self.flat_child,
            self.validity(),
            seed,
        )
    }
}

macro_rules! impl_int_murmur3_32 {
($ArrayT:ty) => {
impl $ArrayT {
Expand Down
33 changes: 33 additions & 0 deletions src/daft-core/src/datatypes/matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,39 @@ macro_rules! with_match_comparable_daft_types {(
}
})}

/// Dispatches on a `DataType` value and expands `$body` once for the matching
/// Daft type marker, covering the types listed in the `match` below
/// (including `List` and `FixedSizeList`).
///
/// Invocation shape: `with_match_hashable_daft_types!(dtype_expr, |$T| { ... })`,
/// where `$T` inside the body stands for the selected type marker
/// (e.g. `Int64Type`, `ListType`).
///
/// # Panics
/// Panics with "`<type>` not implemented" for any data type that has no arm
/// in the `match` below.
#[macro_export]
macro_rules! with_match_hashable_daft_types {(
$key_type:expr, | $_:tt $T:ident | $($body:tt)*
) => ({
// `$_` captures the caller's literal `$` token so that this inner macro can
// declare its own `$T` metavariable and substitute it into the body.
macro_rules! __with_ty__ {( $_ $T:ident ) => ( $($body)* )}
use $crate::datatypes::DataType::*;
#[allow(unused_imports)]
use $crate::datatypes::*;

// Each arm expands the body with the type marker for that variant.
match $key_type {
Null => __with_ty__! { NullType },
Boolean => __with_ty__! { BooleanType },
Int8 => __with_ty__! { Int8Type },
Int16 => __with_ty__! { Int16Type },
Int32 => __with_ty__! { Int32Type },
Int64 => __with_ty__! { Int64Type },
Int128 => __with_ty__! { Int128Type },
UInt8 => __with_ty__! { UInt8Type },
UInt16 => __with_ty__! { UInt16Type },
UInt32 => __with_ty__! { UInt32Type },
UInt64 => __with_ty__! { UInt64Type },
// Float16 => __with_ty__! { Float16Type },
Float32 => __with_ty__! { Float32Type },
Float64 => __with_ty__! { Float64Type },
Utf8 => __with_ty__! { Utf8Type },
Binary => __with_ty__! { BinaryType },
FixedSizeBinary(_) => __with_ty__! { FixedSizeBinaryType },
List(_) => __with_ty__! { ListType },
FixedSizeList(_, _) => __with_ty__! { FixedSizeListType },
// Every other data type is rejected at runtime.
_ => panic!("{:?} not implemented", $key_type)
}
})}

#[macro_export]
macro_rules! with_match_numeric_daft_types {(
$key_type:expr, | $_:tt $T:ident | $($body:tt)*
Expand Down
4 changes: 2 additions & 2 deletions src/daft-core/src/series/ops/groups.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{
array::ops::{GroupIndicesPair, IntoGroups},
series::Series,
with_match_comparable_daft_types,
with_match_hashable_daft_types,
};
use common_error::DaftResult;

impl IntoGroups for Series {
fn make_groups(&self) -> DaftResult<GroupIndicesPair> {
let s = self.as_physical()?;
with_match_comparable_daft_types!(s.data_type(), |$T| {
with_match_hashable_daft_types!(s.data_type(), |$T| {
let array = s.downcast::<<$T as DaftDataType>::ArrayType>()?;
array.make_groups()
})
Expand Down
4 changes: 2 additions & 2 deletions src/daft-core/src/series/ops/hash.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{
datatypes::{Int32Array, UInt64Array},
series::Series,
with_match_comparable_daft_types,
with_match_hashable_daft_types,
};
use common_error::DaftResult;

impl Series {
pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult<UInt64Array> {
let s = self.as_physical()?;
with_match_comparable_daft_types!(s.data_type(), |$T| {
with_match_hashable_daft_types!(s.data_type(), |$T| {
let downcasted = s.downcast::<<$T as DaftDataType>::ArrayType>()?;
downcasted.hash(seed)
})
Expand Down
Loading

0 comments on commit c3d43cf

Please sign in to comment.