Skip to content

Commit

Permalink
refactor: migrate GroupValuesRows to HashTable
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum committed Nov 22, 2024
1 parent 207e855 commit 5227895
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use arrow_array::{Array, ArrayRef, ListArray, StructArray};
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use hashbrown::raw::RawTable;
use hashbrown::hash_table::HashTable;
use log::debug;
use std::mem::size_of;
use std::sync::Arc;
Expand Down Expand Up @@ -54,7 +54,7 @@ pub struct GroupValuesRows {
///
/// keys: u64 hashes of the GroupValue
/// values: (hash, group_index)
map: RawTable<(u64, usize)>,
map: HashTable<(u64, usize)>,

/// The size of `map` in bytes
map_size: usize,
Expand Down Expand Up @@ -92,7 +92,7 @@ impl GroupValuesRows {
.collect(),
)?;

let map = RawTable::with_capacity(0);
let map = HashTable::with_capacity(0);

let starting_rows_capacity = 1000;

Expand Down Expand Up @@ -135,7 +135,7 @@ impl GroupValues for GroupValuesRows {
create_hashes(cols, &self.random_state, batch_hashes)?;

for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| {
let entry = self.map.find_mut(target_hash, |(exist_hash, group_idx)| {
// Somewhat surprisingly, this closure can be called even if the
// hash doesn't match, so check the hash first with an integer
// comparison first avoid the more expensive comparison with
Expand Down Expand Up @@ -216,18 +216,18 @@ impl GroupValues for GroupValuesRows {
}
std::mem::swap(&mut new_group_values, &mut group_values);

// SAFETY: self.map outlives iterator and is not modified concurrently
unsafe {
for bucket in self.map.iter() {
// Decrement group index by n
match bucket.as_ref().1.checked_sub(n) {
// Group index was >= n, shift value down
Some(sub) => bucket.as_mut().1 = sub,
// Group index was < n, so remove from table
None => self.map.erase(bucket),
self.map.retain(|(_exists_hash, group_idx)| {
// Decrement group index by n
match group_idx.checked_sub(n) {
// Group index was >= n, shift value down
Some(sub) => {
*group_idx = sub;
true
}
// Group index was < n, so remove from table
None => false,
}
}
});
output
}
};
Expand Down

0 comments on commit 5227895

Please sign in to comment.