diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 10b00cf74fdb7..3431ab6dd5028 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -42,11 +42,11 @@ use arrow_array::{Array, ArrayRef};
 use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
 use datafusion_common::hash_utils::create_hashes;
 use datafusion_common::{not_impl_err, DataFusionError, Result};
-use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
 use datafusion_expr::EmitTo;
 use datafusion_physical_expr::binary_map::OutputType;
-use hashbrown::raw::RawTable;
+use hashbrown::hash_table::HashTable;
 
 const NON_INLINED_FLAG: u64 = 0x8000000000000000;
 const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
@@ -180,7 +180,7 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
     /// And we use [`GroupIndexView`] to represent such `group indices` in table.
     ///
     ///
-    map: RawTable<(u64, GroupIndexView)>,
+    map: HashTable<(u64, GroupIndexView)>,
 
     /// The size of `map` in bytes
     map_size: usize,
@@ -261,7 +261,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
     /// Create a new instance of GroupValuesColumn if supported for the specified schema
     pub fn try_new(schema: SchemaRef) -> Result<Self> {
-        let map = RawTable::with_capacity(0);
+        let map = HashTable::with_capacity(0);
 
         Ok(Self {
             schema,
             map,
@@ -338,7 +338,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
         for (row, &target_hash) in batch_hashes.iter().enumerate() {
             let entry = self
                 .map
-                .get_mut(target_hash, |(exist_hash, group_idx_view)| {
+                .find_mut(target_hash, |(exist_hash, group_idx_view)| {
                     // It is ensured to be inlined in `scalarized_intern`
                     debug_assert!(!group_idx_view.is_non_inlined());
 
@@ -506,7 +506,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
         for (row, &target_hash) in batch_hashes.iter().enumerate() {
             let entry = self
                 .map
-                .get(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
+                .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
 
             let Some((_, group_index_view)) = entry else {
                 // 1. Bucket not found case
@@ -733,7 +733,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
         for &row in &self.vectorized_operation_buffers.remaining_row_indices {
             let target_hash = batch_hashes[row];
-            let entry = map.get_mut(target_hash, |(exist_hash, _)| {
+            let entry = map.find_mut(target_hash, |(exist_hash, _)| {
                 // Somewhat surprisingly, this closure can be called even if the
                 // hash doesn't match, so check the hash first with an integer
                 // comparison first avoid the more expensive comparison with
@@ -852,7 +852,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
     /// Return group indices of the hash, also if its `group_index_view` is non-inlined
    #[cfg(test)]
    fn get_indices_by_hash(&self, hash: u64) -> Option<(Vec<usize>, GroupIndexView)> {
-        let entry = self.map.get(hash, |(exist_hash, _)| hash == *exist_hash);
+        let entry = self.map.find(hash, |(exist_hash, _)| hash == *exist_hash);
 
         match entry {
             Some((_, group_index_view)) => {
@@ -1022,67 +1022,63 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
                     .collect::<Vec<_>>();
 
                 let mut next_new_list_offset = 0;
 
-                // SAFETY: self.map outlives iterator and is not modified concurrently
-                unsafe {
-                    for bucket in self.map.iter() {
-                        // In non-streaming case, we need to check if the `group index view`
-                        // is `inlined` or `non-inlined`
-                        if !STREAMING && bucket.as_ref().1.is_non_inlined() {
-                            // Non-inlined case
-                            // We take `group_index_list` from `old_group_index_lists`
-
-                            // list_offset is incrementally
-                            self.emit_group_index_list_buffer.clear();
-                            let list_offset = bucket.as_ref().1.value() as usize;
-                            for group_index in self.group_index_lists[list_offset].iter()
-                            {
-                                if let Some(remaining) = group_index.checked_sub(n) {
-                                    self.emit_group_index_list_buffer.push(remaining);
-                                }
+                self.map.retain(|(_exist_hash, group_idx_view)| {
+                    // In non-streaming case, we need to check if the `group index view`
+                    // is `inlined` or `non-inlined`
+                    if !STREAMING && group_idx_view.is_non_inlined() {
+                        // Non-inlined case
+                        // We take `group_index_list` from `old_group_index_lists`
+
+                        // list_offset increases monotonically
+                        self.emit_group_index_list_buffer.clear();
+                        let list_offset = group_idx_view.value() as usize;
+                        for group_index in self.group_index_lists[list_offset].iter() {
+                            if let Some(remaining) = group_index.checked_sub(n) {
+                                self.emit_group_index_list_buffer.push(remaining);
                             }
-
-                            // The possible results:
-                            // - `new_group_index_list` is empty, we should erase this bucket
-                            // - only one value in `new_group_index_list`, switch the `view` to `inlined`
-                            // - still multiple values in `new_group_index_list`, build and set the new `unlined view`
-                            if self.emit_group_index_list_buffer.is_empty() {
-                                self.map.erase(bucket);
-                            } else if self.emit_group_index_list_buffer.len() == 1 {
-                                let group_index =
-                                    self.emit_group_index_list_buffer.first().unwrap();
-                                bucket.as_mut().1 =
-                                    GroupIndexView::new_inlined(*group_index as u64);
-                            } else {
-                                let group_index_list =
-                                    &mut self.group_index_lists[next_new_list_offset];
-                                group_index_list.clear();
-                                group_index_list
-                                    .extend(self.emit_group_index_list_buffer.iter());
-                                bucket.as_mut().1 = GroupIndexView::new_non_inlined(
-                                    next_new_list_offset as u64,
-                                );
-                                next_new_list_offset += 1;
-                            }
-
-                            continue;
                         }
 
+                        // The possible results:
+                        // - `new_group_index_list` is empty, we should erase this bucket
+                        // - only one value in `new_group_index_list`, switch the `view` to `inlined`
+                        // - still multiple values in `new_group_index_list`, build and set the new `non-inlined view`
+                        if self.emit_group_index_list_buffer.is_empty() {
+                            false
+                        } else if self.emit_group_index_list_buffer.len() == 1 {
+                            let group_index =
+                                self.emit_group_index_list_buffer.first().unwrap();
+                            *group_idx_view =
+                                GroupIndexView::new_inlined(*group_index as u64);
+                            true
+                        } else {
+                            let group_index_list =
+                                &mut self.group_index_lists[next_new_list_offset];
+                            group_index_list.clear();
+                            group_index_list
+                                .extend(self.emit_group_index_list_buffer.iter());
+                            *group_idx_view = GroupIndexView::new_non_inlined(
+                                next_new_list_offset as u64,
+                            );
+                            next_new_list_offset += 1;
+                            true
+                        }
+                    } else {
                         // In `streaming case`, the `group index view` is ensured to be `inlined`
-                        debug_assert!(!bucket.as_ref().1.is_non_inlined());
+                        debug_assert!(!group_idx_view.is_non_inlined());
 
                         // Inlined case, we just decrement group index by n)
-                        let group_index = bucket.as_ref().1.value() as usize;
+                        let group_index = group_idx_view.value() as usize;
                         match group_index.checked_sub(n) {
                             // Group index was >= n, shift value down
                             Some(sub) => {
-                                bucket.as_mut().1 =
-                                    GroupIndexView::new_inlined(sub as u64)
+                                *group_idx_view = GroupIndexView::new_inlined(sub as u64);
+                                true
                             }
                             // Group index was < n, so remove from table
-                            None => self.map.erase(bucket),
+                            None => false,
                         }
                     }
-                }
+                });
 
                 if !STREAMING {
                     self.group_index_lists.truncate(next_new_list_offset);
@@ -1173,7 +1169,7 @@ mod tests {
     use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
     use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StringViewArray};
     use arrow_schema::{DataType, Field, Schema, SchemaRef};
-    use datafusion_common::utils::proxy::RawTableAllocExt;
+    use datafusion_common::utils::proxy::HashTableAllocExt;
     use datafusion_expr::EmitTo;
 
     use crate::aggregates::group_values::{
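Reviewer note: the core of this migration is swapping hashbrown's deprecated, unsafe-leaning `RawTable` for the safe `HashTable`, where `get`/`get_mut` become `find`/`find_mut` with the same `(hash, eq)` calling convention. A minimal standalone sketch of that lookup API (the `(u64, u64)` entry type is a stand-in for the `(u64, GroupIndexView)` pairs in the diff, not the PR's code):

```rust
use hashbrown::hash_table::HashTable;

fn main() {
    // Stand-in entry type: (hash, group index), mirroring the
    // (u64, GroupIndexView) pairs stored by GroupValuesColumn.
    let mut map: HashTable<(u64, u64)> = HashTable::with_capacity(16);
    let hasher = |entry: &(u64, u64)| entry.0;

    // `insert_unique` takes the precomputed hash, the value, and a hasher
    // that is only invoked if the table has to grow and rehash.
    map.insert_unique(42, (42, 0), hasher);

    // RawTable::get(hash, eq) -> HashTable::find(hash, eq)
    assert_eq!(map.find(42, |(h, _)| *h == 42), Some(&(42, 0)));

    // RawTable::get_mut(hash, eq) -> HashTable::find_mut(hash, eq)
    if let Some((_, group_idx)) = map.find_mut(42, |(h, _)| *h == 42) {
        *group_idx = 7;
    }
    assert_eq!(map.find(42, |(h, _)| *h == 42), Some(&(42, 7)));
}
```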
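The largest hunk (`@@ -1022`) replaces the `unsafe { for bucket in self.map.iter() ... self.map.erase(bucket) }` pattern with `HashTable::retain`, whose closure receives `&mut T` and returns `true` to keep the entry or `false` to drop it. That removes the old safety obligation ("self.map outlives iterator and is not modified concurrently") entirely. A simplified model of the inlined-only (`STREAMING`) path, with bare `u64` group indices standing in for `GroupIndexView` (a hypothetical helper, not the PR's code):

```rust
use hashbrown::hash_table::HashTable;

/// Emit the first `n` groups: entries with group index < n are removed,
/// the rest have their index shifted down by n, in one safe retain pass.
fn emit_first_n(map: &mut HashTable<(u64, u64)>, n: u64) {
    map.retain(|(_hash, group_index)| {
        match group_index.checked_sub(n) {
            // Group index was >= n: keep the entry, shift the index down
            Some(shifted) => {
                *group_index = shifted;
                true
            }
            // Group index was < n: the group was emitted, drop the entry
            None => false,
        }
    });
}

fn main() {
    let mut map: HashTable<(u64, u64)> = HashTable::new();
    for i in 0..4 {
        map.insert_unique(i, (i, i), |e| e.0);
    }
    emit_first_n(&mut map, 2);
    assert_eq!(map.len(), 2); // groups 0 and 1 were emitted and removed
}
```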
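The import changes also swap `RawTableAllocExt` for `HashTableAllocExt` (in `datafusion_execution::memory_pool::proxy` and, for the tests, `datafusion_common::utils::proxy`). Assuming the new trait mirrors the old one's `insert_accounted` shape (insert a value while adding any allocation growth to a byte counter), call sites would look roughly like the sketch below; the exact signature should be checked against `proxy.rs`:

```rust
use datafusion_common::utils::proxy::HashTableAllocExt;
use hashbrown::hash_table::HashTable;

fn main() {
    let mut table: HashTable<(u64, u64)> = HashTable::new();
    // Bytes charged against the memory pool; grows when the table reallocates.
    let mut allocated = 0usize;

    // Assumed to mirror RawTableAllocExt::insert_accounted: insert the value,
    // hashing with the given closure, and add allocation growth to `allocated`.
    table.insert_accounted((42, 1), |entry| entry.0, &mut allocated);
    assert!(allocated > 0);
}
```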