From cd3d22730f096b564fd0fc6afb6aeafdb10668e5 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Thu, 21 Nov 2024 10:27:16 +0100
Subject: [PATCH] feat: add `HashTableAllocExt`

This is similar to `RawTableAllocExt` and will help #13256.
---
 datafusion/common/src/utils/proxy.rs        | 73 ++++++++++++++++++++-
 datafusion/execution/src/memory_pool/mod.rs |  4 +-
 2 files changed, 75 insertions(+), 2 deletions(-)

diff --git a/datafusion/common/src/utils/proxy.rs b/datafusion/common/src/utils/proxy.rs
index 5d14a1517129..b32164f682fa 100644
--- a/datafusion/common/src/utils/proxy.rs
+++ b/datafusion/common/src/utils/proxy.rs
@@ -17,7 +17,10 @@
 
 //! [`VecAllocExt`] and [`RawTableAllocExt`] to help tracking of memory allocations
 
-use hashbrown::raw::{Bucket, RawTable};
+use hashbrown::{
+    hash_table::HashTable,
+    raw::{Bucket, RawTable},
+};
 use std::mem::size_of;
 
 /// Extension trait for [`Vec`] to account for allocations.
@@ -173,3 +176,71 @@ impl<T> RawTableAllocExt for RawTable<T> {
         }
     }
 }
+
+/// Extension trait for hashbrown's [`HashTable`] to account for allocations.
+pub trait HashTableAllocExt {
+    /// Item type.
+    type T;
+
+    /// Insert new element into table and increase
+    /// `accounting` by any newly allocated bytes.
+    ///
+    /// Does nothing if an equal element is already present.
+    /// Note that allocation counts capacity, not size.
+    ///
+    /// # Example:
+    /// ```
+    /// # use datafusion_common::utils::proxy::HashTableAllocExt;
+    /// # use hashbrown::hash_table::HashTable;
+    /// let mut table = HashTable::new();
+    /// let mut allocated = 0;
+    /// let hash_fn = |x: &u32| (*x as u64) % 1000;
+    /// // the first insert accounts for 16 elements (16 * 4 bytes)
+    /// table.insert_accounted(1, hash_fn, &mut allocated);
+    /// assert_eq!(allocated, 64);
+    ///
+    /// // insert more values
+    /// for i in 0..100 { table.insert_accounted(i, hash_fn, &mut allocated); }
+    /// assert_eq!(allocated, 400);
+    /// ```
+    fn insert_accounted(
+        &mut self,
+        x: Self::T,
+        hasher: impl Fn(&Self::T) -> u64,
+        accounting: &mut usize,
+    );
+}
+
+impl<T> HashTableAllocExt for HashTable<T>
+where
+    T: Eq,
+{
+    type T = T;
+
+    fn insert_accounted(
+        &mut self,
+        x: Self::T,
+        hasher: impl Fn(&Self::T) -> u64,
+        accounting: &mut usize,
+    ) {
+        let hash = hasher(&x);
+
+        // NOTE: `find_entry` does NOT grow!
+        match self.find_entry(hash, |y| y == &x) {
+            Ok(_occupied) => {}
+            Err(_absent) => {
+                if self.len() == self.capacity() {
+                    // need to request more memory
+                    let bump_elements = self.capacity().max(16);
+                    let bump_size = bump_elements * size_of::<T>();
+                    *accounting = (*accounting).checked_add(bump_size).expect("overflow");
+
+                    self.reserve(bump_elements, &hasher);
+                }
+
+                // still need to insert the element since first try failed
+                self.entry(hash, |y| y == &x, hasher).insert(x);
+            }
+        }
+    }
+}
diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs
index 5bf30b724d0b..45d467f133bf 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -23,7 +23,9 @@ use std::{cmp::Ordering, sync::Arc};
 
 mod pool;
 pub mod proxy {
-    pub use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt};
+    pub use datafusion_common::utils::proxy::{
+        HashTableAllocExt, RawTableAllocExt, VecAllocExt,
+    };
 }
 
 pub use pool::*;