Skip to content

Commit

Permalink
feat: add HashTableAllocExt
Browse files Browse the repository at this point in the history
This is similar to `RawTableAllocExt` and will help apache#13256.
  • Loading branch information
crepererum committed Nov 21, 2024
1 parent 9fb7aee commit cd3d227
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
73 changes: 72 additions & 1 deletion datafusion/common/src/utils/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! [`VecAllocExt`], [`RawTableAllocExt`] and [`HashTableAllocExt`] to help track memory allocations

use hashbrown::raw::{Bucket, RawTable};
use hashbrown::{
hash_table::HashTable,
raw::{Bucket, RawTable},
};
use std::mem::size_of;

/// Extension trait for [`Vec`] to account for allocations.
Expand Down Expand Up @@ -173,3 +176,71 @@ impl<T> RawTableAllocExt for RawTable<T> {
}
}
}

/// Extension trait for hashbrown's [`HashTable`] to account for allocations.
pub trait HashTableAllocExt {
    /// Item type.
    type T;

    /// Inserts `x` into the table and increases `accounting` by any newly
    /// allocated bytes.
    ///
    /// `hasher` must return the hash of an element. It is used for `x` itself
    /// and handed to the table so that stored elements can be re-hashed when
    /// the table grows.
    ///
    /// This method does not return anything. If an element equal to `x` is
    /// already stored, the table and `accounting` are left unchanged.
    ///
    /// Note that allocation counts capacity, not size.
    ///
    /// # Example
    /// ```
    /// # use datafusion_common::utils::proxy::HashTableAllocExt;
    /// # use hashbrown::hash_table::HashTable;
    /// let mut table = HashTable::new();
    /// let mut allocated = 0;
    /// let hash_fn = |x: &u32| (*x as u64) % 1000;
    /// // the first insert accounts for 16 elements of 4 bytes each
    /// table.insert_accounted(1, hash_fn, &mut allocated);
    /// assert_eq!(allocated, 64);
    ///
    /// // insert more values
    /// for i in 0..100 { table.insert_accounted(i, hash_fn, &mut allocated); }
    /// assert_eq!(allocated, 400);
    /// ```
    fn insert_accounted(
        &mut self,
        x: Self::T,
        hasher: impl Fn(&Self::T) -> u64,
        accounting: &mut usize,
    );
}

impl<T> HashTableAllocExt for HashTable<T>
where
    T: Eq,
{
    type T = T;

    /// Inserts `x` unless an equal element is already stored, adding the
    /// bytes requested for growth to `accounting`.
    ///
    /// When the table is full (`len() == capacity()`), room for
    /// `max(capacity, 16)` additional elements is reserved and that element
    /// count times `size_of::<T>()` is added to `accounting`.
    ///
    /// # Panics
    /// Panics with "overflow" if the accounted byte count does not fit into
    /// a `usize`.
    fn insert_accounted(
        &mut self,
        x: Self::T,
        hasher: impl Fn(&Self::T) -> u64,
        accounting: &mut usize,
    ) {
        let hash = hasher(&x);

        // NOTE: `find_entry` does NOT grow, so nothing has to be accounted
        // when the element is already present.
        if self.find_entry(hash, |y| y == &x).is_ok() {
            return;
        }

        if self.len() == self.capacity() {
            // need to request more memory
            let bump_elements = self.capacity().max(16);
            // Checked like the addition below, so an overflow cannot wrap
            // silently in release builds.
            let bump_size = bump_elements
                .checked_mul(size_of::<T>())
                .expect("overflow");
            *accounting = (*accounting).checked_add(bump_size).expect("overflow");

            self.reserve(bump_elements, &hasher);
        }

        // The lookup above established that `x` is absent, so it can be
        // inserted without probing for an equal element a second time.
        self.insert_unique(hash, x, hasher);
    }
}
4 changes: 3 additions & 1 deletion datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::{cmp::Ordering, sync::Arc};

mod pool;
pub mod proxy {
pub use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt};
pub use datafusion_common::utils::proxy::{
HashTableAllocExt, RawTableAllocExt, VecAllocExt,
};
}

pub use pool::*;
Expand Down

0 comments on commit cd3d227

Please sign in to comment.