Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: change some hashbrown RawTable uses to HashTable (round 3) #13658

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};

use futures::stream::Stream;
use futures::{ready, StreamExt};
use hashbrown::raw::RawTable;
use hashbrown::hash_table::HashTable;
use indexmap::IndexMap;
use log::debug;

Expand Down Expand Up @@ -442,16 +442,16 @@ pub struct LinearSearch {
/// is ordered by a, b and the window expression contains a PARTITION BY b, a
/// clause, this attribute stores [1, 0].
ordered_partition_by_indices: Vec<usize>,
/// We use this [`RawTable`] to calculate unique partitions for each new
/// We use this [`HashTable`] to calculate unique partitions for each new
/// RecordBatch. First entry in the tuple is the hash value, the second
/// entry is the unique ID for each partition (increments from 0 to n).
row_map_batch: RawTable<(u64, usize)>,
/// We use this [`RawTable`] to calculate the output columns that we can
row_map_batch: HashTable<(u64, usize)>,
/// We use this [`HashTable`] to calculate the output columns that we can
/// produce at each cycle. First entry in the tuple is the hash value, the
/// second entry is the unique ID for each partition (increments from 0 to n).
/// The third entry stores how many new outputs are calculated for the
/// corresponding partition.
row_map_out: RawTable<(u64, usize, usize)>,
row_map_out: HashTable<(u64, usize, usize)>,
input_schema: SchemaRef,
}

Expand Down Expand Up @@ -610,8 +610,8 @@ impl LinearSearch {
input_buffer_hashes: VecDeque::new(),
random_state: Default::default(),
ordered_partition_by_indices,
row_map_batch: RawTable::with_capacity(256),
row_map_out: RawTable::with_capacity(256),
row_map_batch: HashTable::with_capacity(256),
row_map_out: HashTable::with_capacity(256),
input_schema,
}
}
Expand All @@ -631,7 +631,7 @@ impl LinearSearch {
// res stores PartitionKey and row indices (indices where these partitions occur in the `batch`) for each partition.
let mut result: Vec<(PartitionKey, Vec<u32>)> = vec![];
for (hash, row_idx) in batch_hashes.into_iter().zip(0u32..) {
let entry = self.row_map_batch.get_mut(hash, |(_, group_idx)| {
let entry = self.row_map_batch.find_mut(hash, |(_, group_idx)| {
// We can safely get the first index of the partition indices
// since partition indices has one element during initialization.
let row = get_row_at_idx(columns, row_idx as usize).unwrap();
Expand All @@ -641,8 +641,11 @@ impl LinearSearch {
if let Some((_, group_idx)) = entry {
result[*group_idx].1.push(row_idx)
} else {
self.row_map_batch
.insert(hash, (hash, result.len()), |(hash, _)| *hash);
self.row_map_batch.insert_unique(
hash,
(hash, result.len()),
|(hash, _)| *hash,
);
let row = get_row_at_idx(columns, row_idx as usize)?;
// This is a new partition; its only index is row_idx for now.
result.push((row, vec![row_idx]));
Expand All @@ -667,7 +670,7 @@ impl LinearSearch {
self.row_map_out.clear();
let mut partition_indices: Vec<(PartitionKey, Vec<u32>)> = vec![];
for (hash, row_idx) in self.input_buffer_hashes.iter().zip(0u32..) {
let entry = self.row_map_out.get_mut(*hash, |(_, group_idx, _)| {
let entry = self.row_map_out.find_mut(*hash, |(_, group_idx, _)| {
let row =
get_row_at_idx(&partition_by_columns, row_idx as usize).unwrap();
row == partition_indices[*group_idx].0
Expand All @@ -693,7 +696,7 @@ impl LinearSearch {
if min_out == 0 {
break;
}
self.row_map_out.insert(
self.row_map_out.insert_unique(
*hash,
(*hash, partition_indices.len(), min_out),
|(hash, _, _)| *hash,
Expand Down
Loading