From c3d43cfecde0b381a44f95d28802ee717013df0f Mon Sep 17 00:00:00 2001 From: Conor Kennedy <32619800+Vince7778@users.noreply.github.com> Date: Mon, 8 Jul 2024 17:36:34 -0700 Subject: [PATCH] [FEAT] Implement hashing and groupby on lists (#2464) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements hashing and groupby on lists and fixed-size lists. Works by recursively hashing the inner items and then hashing the hashes. Closes #1983 Example: ``` >>> df = daft.from_pydict({"a": [[1, 2], [1, 2], [1, 3, 4], [1, 2], [1, 3, 4]], "b": [1, 2, 3, 4, 5]}) >>> df.groupby("a").agg_list("b").show() ╭─────────────┬─────────────╮ │ a ┆ b │ │ --- ┆ --- │ │ List[Int64] ┆ List[Int64] │ ╞═════════════╪═════════════╡ │ [1, 3, 4] ┆ [3, 5] │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ [1, 2] ┆ [1, 2, 4] │ ╰─────────────┴─────────────╯ ``` --- src/daft-core/src/array/ops/groups.rs | 14 +- src/daft-core/src/array/ops/hash.rs | 100 ++++++++++- src/daft-core/src/datatypes/matching.rs | 33 ++++ src/daft-core/src/series/ops/groups.rs | 4 +- src/daft-core/src/series/ops/hash.rs | 4 +- tests/series/test_hash.py | 211 ++++++++++++++++++++++++ tests/table/test_table_aggs.py | 38 ++++- 7 files changed, 397 insertions(+), 7 deletions(-) diff --git a/src/daft-core/src/array/ops/groups.rs b/src/daft-core/src/array/ops/groups.rs index dfa7adf450..07de9d644c 100644 --- a/src/daft-core/src/array/ops/groups.rs +++ b/src/daft-core/src/array/ops/groups.rs @@ -4,7 +4,7 @@ use arrow2::array::Array; use fnv::FnvHashMap; use crate::{ - array::DataArray, + array::{DataArray, FixedSizeListArray, ListArray}, datatypes::{ BinaryArray, BooleanArray, DaftIntegerType, DaftNumericType, FixedSizeBinaryArray, Float32Array, Float64Array, NullArray, Utf8Array, @@ -170,3 +170,15 @@ impl IntoGroups for NullArray { Ok((vec![0], vec![v])) } } + +impl IntoGroups for ListArray { + fn make_groups(&self) -> DaftResult { + self.hash(None)?.make_groups() + } +} + +impl IntoGroups for FixedSizeListArray { + fn make_groups(&self) -> DaftResult { + self.hash(None)?.make_groups() + } +} diff --git a/src/daft-core/src/array/ops/hash.rs b/src/daft-core/src/array/ops/hash.rs index 9a9833f143..df6c4c80ee 100644 --- a/src/daft-core/src/array/ops/hash.rs +++ b/src/daft-core/src/array/ops/hash.rs @@ -1,5 +1,5 @@ use crate::{ - array::DataArray, + array::{DataArray, FixedSizeListArray, ListArray}, datatypes::{ logical::{DateArray, Decimal128Array, TimeArray, TimestampArray}, BinaryArray, BooleanArray, DaftNumericType, FixedSizeBinaryArray, Int16Array, Int32Array, @@ -7,9 +7,13 @@ use crate::{ Utf8Array, }, kernels, + utils::arrow::arrow_bitmap_and_helper, + Series, }; +use arrow2::types::Index; use common_error::DaftResult; +use xxhash_rust::xxh3::{xxh3_64, xxh3_64_with_seed}; use super::as_arrow::AsArrow; @@ -88,6 +92,100 @@ impl NullArray { } } +fn hash_list( + name: &str, + offsets: &[i64], + flat_child: &Series, + validity: Option<&arrow2::bitmap::Bitmap>, + seed: Option<&UInt64Array>, +) -> DaftResult { + // first we hash the flat child + // turning [[stuff], [stuff, stuff], ...] into [[hash], [hash, hash], ...] + // then we hash each sublist as bytes, giving us [hash, hash, ...] 
as desired + // if seed is provided, the sublists are hashed with the seed broadcasted + + if let Some(seed_arr) = seed { + let combined_validity = arrow_bitmap_and_helper(validity, seed.unwrap().validity()); + UInt64Array::from_iter( + name, + u64::range(0, offsets.len() - 1).unwrap().map(|i| { + let start = offsets[i as usize] as usize; + let end = offsets[i as usize + 1] as usize; + // apply the current seed across this row + let cur_seed_opt = seed_arr.get(i as usize); + let flat_seed = UInt64Array::from_iter( + "seed", + std::iter::repeat(cur_seed_opt).take(end - start), + ); + let hashed_child = flat_child + .slice(start, end) + .ok()? + .hash(Some(&flat_seed)) + .ok()?; + let child_bytes: Vec = hashed_child + .as_arrow() + .values_iter() + .flat_map(|v| v.to_le_bytes()) + .collect(); + if let Some(cur_seed) = cur_seed_opt { + Some(xxh3_64_with_seed(&child_bytes, cur_seed)) + } else { + Some(xxh3_64(&child_bytes)) + } + }), + ) + .with_validity(combined_validity) + } else { + // since we don't have a seed we can hash entire flat child at once + let hashed_child = flat_child.hash(None)?; + // hashing collects the array anyways so this collect doesn't matter + let child_bytes: Vec = hashed_child + .as_arrow() + .values_iter() + .flat_map(|v| v.to_le_bytes()) + .collect(); + const OFFSET: usize = (u64::BITS as usize) / 8; // how many bytes per u64 + let combined_validity = validity.cloned(); + UInt64Array::from_iter( + name, + u64::range(0, offsets.len() - 1).unwrap().map(|i| { + let start = (offsets[i as usize] as usize) * OFFSET; + let end = (offsets[i as usize + 1] as usize) * OFFSET; + Some(xxh3_64(&child_bytes[start..end])) + }), + ) + .with_validity(combined_validity) + } +} + +impl ListArray { + pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { + hash_list( + self.name(), + self.offsets(), + &self.flat_child, + self.validity(), + seed, + ) + } +} + +impl FixedSizeListArray { + pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { + let size = self.fixed_element_len(); + let len = self.flat_child.len() as i64; + // see comment on hash_list for why we are collecting + let offsets: Vec = (0..=len).step_by(size).collect(); + hash_list( + self.name(), + &offsets, + &self.flat_child, + self.validity(), + seed, + ) + } +} + macro_rules! impl_int_murmur3_32 { ($ArrayT:ty) => { impl $ArrayT { diff --git a/src/daft-core/src/datatypes/matching.rs b/src/daft-core/src/datatypes/matching.rs index e3c72528f8..217f3ff30e 100644 --- a/src/daft-core/src/datatypes/matching.rs +++ b/src/daft-core/src/datatypes/matching.rs @@ -151,6 +151,39 @@ macro_rules! with_match_comparable_daft_types {( } })} +#[macro_export] +macro_rules! with_match_hashable_daft_types {( + $key_type:expr, | $_:tt $T:ident | $($body:tt)* +) => ({ + macro_rules! __with_ty__ {( $_ $T:ident ) => ( $($body)* )} + use $crate::datatypes::DataType::*; + #[allow(unused_imports)] + use $crate::datatypes::*; + + match $key_type { + Null => __with_ty__! { NullType }, + Boolean => __with_ty__! { BooleanType }, + Int8 => __with_ty__! { Int8Type }, + Int16 => __with_ty__! { Int16Type }, + Int32 => __with_ty__! { Int32Type }, + Int64 => __with_ty__! { Int64Type }, + Int128 => __with_ty__! { Int128Type }, + UInt8 => __with_ty__! { UInt8Type }, + UInt16 => __with_ty__! { UInt16Type }, + UInt32 => __with_ty__! { UInt32Type }, + UInt64 => __with_ty__! { UInt64Type }, + // Float16 => __with_ty__! { Float16Type }, + Float32 => __with_ty__! { Float32Type }, + Float64 => __with_ty__! { Float64Type }, + Utf8 => __with_ty__! 
{ Utf8Type }, + Binary => __with_ty__! { BinaryType }, + FixedSizeBinary(_) => __with_ty__! { FixedSizeBinaryType }, + List(_) => __with_ty__! { ListType }, + FixedSizeList(_, _) => __with_ty__! { FixedSizeListType }, + _ => panic!("{:?} not implemented", $key_type) + } +})} + #[macro_export] macro_rules! with_match_numeric_daft_types {( $key_type:expr, | $_:tt $T:ident | $($body:tt)* diff --git a/src/daft-core/src/series/ops/groups.rs b/src/daft-core/src/series/ops/groups.rs index d15b411f3e..f2886e4876 100644 --- a/src/daft-core/src/series/ops/groups.rs +++ b/src/daft-core/src/series/ops/groups.rs @@ -1,14 +1,14 @@ use crate::{ array::ops::{GroupIndicesPair, IntoGroups}, series::Series, - with_match_comparable_daft_types, + with_match_hashable_daft_types, }; use common_error::DaftResult; impl IntoGroups for Series { fn make_groups(&self) -> DaftResult { let s = self.as_physical()?; - with_match_comparable_daft_types!(s.data_type(), |$T| { + with_match_hashable_daft_types!(s.data_type(), |$T| { let array = s.downcast::<<$T as DaftDataType>::ArrayType>()?; array.make_groups() }) diff --git a/src/daft-core/src/series/ops/hash.rs b/src/daft-core/src/series/ops/hash.rs index cd831b0f9f..368ed38990 100644 --- a/src/daft-core/src/series/ops/hash.rs +++ b/src/daft-core/src/series/ops/hash.rs @@ -1,14 +1,14 @@ use crate::{ datatypes::{Int32Array, UInt64Array}, series::Series, - with_match_comparable_daft_types, + with_match_hashable_daft_types, }; use common_error::DaftResult; impl Series { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { let s = self.as_physical()?; - with_match_comparable_daft_types!(s.data_type(), |$T| { + with_match_hashable_daft_types!(s.data_type(), |$T| { let downcasted = s.downcast::<<$T as DaftDataType>::ArrayType>()?; downcasted.hash(seed) }) diff --git a/tests/series/test_hash.py b/tests/series/test_hash.py index 33b1d7821b..b413e926fe 100644 --- a/tests/series/test_hash.py +++ b/tests/series/test_hash.py @@ -10,6 +10,7 @@ from daft.datatype import DataType from daft.series import Series +from tests.test_datatypes import daft_numeric_types @pytest.mark.parametrize( @@ -141,6 +142,216 @@ def test_hash_int_array_with_bad_length(): arr.hash(bad_seed) +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_list_array_no_seed(dtype): + arr = Series.from_pylist([[1, 2], [1, 3], [1, 2], [1, 2, 3], [], [], [2, 1]]).cast(DataType.list(dtype)) + + hashed = arr.hash().to_pylist() + assert hashed[0] == hashed[2] + assert hashed[4] == hashed[5] + + different_inds = [0, 1, 3, 4, 6] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +@pytest.mark.parametrize("seed", [1, 2, 42]) +def test_hash_list_array_seeded(dtype, seed): + arr = Series.from_pylist([[1, 2], [1, 3], [1, 2], [1, 2, 3], [], [], [2, 1]]).cast(DataType.list(dtype)) + seeds = Series.from_pylist([seed] * 9).cast(DataType.uint64()) + + hashed = arr.hash(seeds).to_pylist() + assert hashed[0] == hashed[2] + assert hashed[4] == hashed[5] + + different_inds = [0, 1, 3, 4, 6] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_list_array_no_seed_with_invalid(dtype): + arr = Series.from_pylist([[1, 2], [1, 3], [1, 2], [1, 2, 3], [], [], None, [2, 1], None]).cast(DataType.list(dtype)) + + hashed = arr.hash().to_pylist() + assert 
hashed[0] == hashed[2] + assert hashed[4] == hashed[5] + assert hashed[6] is None + assert hashed[8] is None + + different_inds = [0, 1, 3, 4, 7] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +@pytest.mark.parametrize("seed", [1, 2, 42]) +def test_hash_list_array_seeded_with_invalid(dtype, seed): + arr = Series.from_pylist([[1, 2], [1, 3], [1, 2], [1, 2, 3], [], [], None, [2, 1], None]).cast(DataType.list(dtype)) + seeds = Series.from_pylist([seed] * 9).cast(DataType.uint64()) + + hashed = arr.hash(seeds).to_pylist() + assert hashed[0] == hashed[2] + assert hashed[4] == hashed[5] + assert hashed[6] is None + assert hashed[8] is None + + different_inds = [0, 1, 3, 4, 7] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_list_array_different_seeds(dtype): + arr = Series.from_pylist([[1, 2], [1, 2], [1, 2], [1, 2]]).cast(DataType.list(dtype)) + seeds = Series.from_pylist([1, 2, 3, 4]).cast(DataType.uint64()) + + hashed = arr.hash(seeds).to_pylist() + + different_inds = [0, 1, 2, 3] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_list_array_nested_lists(dtype): + arr = Series.from_pylist( + [ + [[1, 2], [3, 4]], + [[1, 2], [3, 5]], + [[1, 2], [3, 4]], + [[3, 4], [1, 2]], + [[1], [2]], + [[], []], + [[]], + [], + [[1, 2, 3]], + [[1], [2], [3]], + ] + ).cast(DataType.list(DataType.list(dtype))) + + hashed = arr.hash().to_pylist() + assert hashed[0] == hashed[2] + + different_inds = [0, 1, 3, 4, 5, 6, 7, 8, 9] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_list_array_consistency(dtype): + data = [[1, 2], [1, 3], [1, 2], [1, 2, 3], [], [], None, [2, 1], None] + arr1 = Series.from_pylist(data).cast(DataType.list(dtype)) + arr2 = Series.from_pylist(data).cast(DataType.list(dtype)) + + hashed1 = arr1.hash().to_pylist() + hashed2 = arr2.hash().to_pylist() + assert hashed1 == hashed2 + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_fixed_size_list_array_no_seed(dtype): + arr = Series.from_pylist([[1, 2], [1, 3], [1, 2], [1, 4], [5, 5], [5, 5], [2, 1]]).cast( + DataType.fixed_size_list(dtype, 2) + ) + + hashed = arr.hash().to_pylist() + assert hashed[0] == hashed[2] + assert hashed[4] == hashed[5] + + different_inds = [0, 1, 3, 4, 6] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +@pytest.mark.parametrize("seed", [1, 2, 42]) +def test_hash_fixed_size_list_array_seeded(dtype, seed): + arr = Series.from_pylist([[1, 2], [1, 3], [1, 2], [1, 4], [5, 5], [5, 5], [2, 1]]).cast( + DataType.fixed_size_list(dtype, 2) + ) + seeds = Series.from_pylist([seed] * 9).cast(DataType.uint64()) + + hashed = arr.hash(seeds).to_pylist() + assert hashed[0] == hashed[2] + assert hashed[4] == hashed[5] + + different_inds = [0, 1, 3, 4, 6] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + 
+@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_fixed_size_list_array_no_seed_with_invalid(dtype): + arr = Series.from_pylist([[1, 2], [1, 3], [1, 2], [1, 4], [5, 5], [5, 5], None, [2, 1], None]).cast( + DataType.fixed_size_list(dtype, 2) + ) + + hashed = arr.hash().to_pylist() + assert hashed[0] == hashed[2] + assert hashed[4] == hashed[5] + assert hashed[6] is None + assert hashed[8] is None + + different_inds = [0, 1, 3, 4, 7] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +@pytest.mark.parametrize("seed", [1, 2, 42]) +def test_hash_fixed_size_list_array_seeded_with_invalid(dtype, seed): + arr = Series.from_pylist([[1, 2], [1, 3], [1, 2], [1, 4], [5, 5], [5, 5], None, [2, 1], None]).cast( + DataType.fixed_size_list(dtype, 2) + ) + seeds = Series.from_pylist([seed] * 9).cast(DataType.uint64()) + + hashed = arr.hash(seeds).to_pylist() + assert hashed[0] == hashed[2] + assert hashed[4] == hashed[5] + assert hashed[6] is None + assert hashed[8] is None + + different_inds = [0, 1, 3, 4, 7] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_fixed_size_list_array_different_seeds(dtype): + arr = Series.from_pylist([[1, 2], [1, 2], [1, 2], [1, 2]]).cast(DataType.fixed_size_list(dtype, 2)) + seeds = Series.from_pylist([1, 2, 3, 4]).cast(DataType.uint64()) + + hashed = arr.hash(seeds).to_pylist() + + different_inds = [0, 1, 2, 3] + for i in range(len(different_inds)): + for j in range(i): + assert hashed[different_inds[i]] != hashed[different_inds[j]] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_hash_fixed_size_list_array_consistency(dtype): + data = [[1, 2], [1, 3], [1, 2], [1, 4], [5, 5], [5, 5], None, [2, 1], None] + arr1 = Series.from_pylist(data).cast(DataType.fixed_size_list(dtype, 2)) + arr2 = Series.from_pylist(data).cast(DataType.fixed_size_list(dtype, 2)) + + hashed1 = arr1.hash().to_pylist() + hashed2 = arr2.hash().to_pylist() + assert hashed1 == hashed2 + + @pytest.mark.parametrize( "dtype", [ diff --git a/tests/table/test_table_aggs.py b/tests/table/test_table_aggs.py index 84545ff954..2aba4b1f73 100644 --- a/tests/table/test_table_aggs.py +++ b/tests/table/test_table_aggs.py @@ -7,7 +7,7 @@ import pyarrow as pa import pytest -from daft import DataType, col, utils +from daft import DataType, col, from_pydict, utils from daft.logical.schema import Schema from daft.series import Series from daft.table import MicroPartition @@ -816,3 +816,39 @@ def test_concat_aggs_empty() -> None: res = daft_table.to_pydict() assert res == {"col_B": [], "concat": []} + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_groupby_list(dtype) -> None: + df = from_pydict( + { + "a": [[1, 2, 3], [1, 2, 3], [1, 2], [], [1, 2, 3], [], [1, 2]], + "b": [0, 1, 2, 3, 4, 5, 6], + } + ).with_column("a", col("a").cast(DataType.list(dtype))) + res = df.groupby("a").agg_list("b").to_pydict() + expected = [[0, 1, 4], [2, 6], [3, 5]] + for lt in expected: + assert lt in res["b"] + + +@pytest.mark.parametrize("dtype", daft_numeric_types) +def test_groupby_fixed_size_list(dtype) -> None: + df = from_pydict( + { + "a": [ + [1, 2, 3], + [1, 2, 3], + [1, 2, 4], + [3, 2, 1], + [1, 2, 3], + [3, 2, 1], + [1, 2, 4], + ], + "b": [0, 1, 2, 3, 4, 5, 6], + } + ).with_column("a", 
col("a").cast(DataType.fixed_size_list(dtype, 3))) + res = df.groupby("a").agg_list("b").to_pydict() + expected = [[0, 1, 4], [2, 6], [3, 5]] + for lt in expected: + assert lt in res["b"]