From d60933691640f30bc1ec9534a20fc51997ade143 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Wed, 25 Sep 2024 22:15:37 -0700 Subject: [PATCH] use mask filter --- src/daft-core/src/prelude.rs | 2 +- .../src/sinks/outer_hash_join_probe.rs | 93 ++++++------------- 2 files changed, 30 insertions(+), 65 deletions(-) diff --git a/src/daft-core/src/prelude.rs b/src/daft-core/src/prelude.rs index 0de797d1b0..6f6ecaf5a5 100644 --- a/src/daft-core/src/prelude.rs +++ b/src/daft-core/src/prelude.rs @@ -2,9 +2,9 @@ //! //! This module re-exports commonly used items from the Daft core library. -// Re-export core series structures // Re-export arrow2 bitmap pub use arrow2::bitmap; +// Re-export core series structures pub use daft_schema::schema::{Schema, SchemaRef}; // Re-export count mode enum diff --git a/src/daft-local-execution/src/sinks/outer_hash_join_probe.rs b/src/daft-local-execution/src/sinks/outer_hash_join_probe.rs index 2a15bf062f..f1bf813823 100644 --- a/src/daft-local-execution/src/sinks/outer_hash_join_probe.rs +++ b/src/daft-local-execution/src/sinks/outer_hash_join_probe.rs @@ -3,10 +3,10 @@ use std::sync::Arc; use common_error::DaftResult; use daft_core::{ prelude::{ - bitmap::{or, Bitmap, MutableBitmap}, - Schema, SchemaRef, + bitmap::{and, Bitmap, MutableBitmap}, + BooleanArray, Schema, SchemaRef, }, - series::Series, + series::{IntoSeries, Series}, }; use daft_dsl::ExprRef; use daft_micropartition::MicroPartition; @@ -19,78 +19,51 @@ use super::streaming_sink::{StreamingSink, StreamingSinkOutput, StreamingSinkSta use crate::pipeline::PipelineResultType; struct IndexBitmapBuilder { - bitmap: MutableBitmap, - prefix_sums: Vec, + mutable_bitmaps: Vec, } impl IndexBitmapBuilder { fn new(tables: &[Table]) -> Self { - let prefix_sums = tables - .iter() - .map(|t| t.len()) - .scan(0, |acc, x| { - let prev = *acc; - *acc += x; - Some(prev) - }) - .collect::>(); - let total_len = prefix_sums.last().unwrap() + tables.last().unwrap().len(); Self { - bitmap: MutableBitmap::from_len_zeroed(total_len), - prefix_sums, + mutable_bitmaps: tables + .iter() + .map(|t| MutableBitmap::from_len_set(t.len())) + .collect(), } } #[inline] fn mark_used(&mut self, table_idx: usize, row_idx: usize) { - let idx = self.prefix_sums[table_idx] + row_idx; - self.bitmap.set(idx, true); + self.mutable_bitmaps[table_idx].set(row_idx, false); } fn build(self) -> IndexBitmap { IndexBitmap { - bitmap: self.bitmap.into(), - prefix_sums: self.prefix_sums, + bitmaps: self.mutable_bitmaps.into_iter().map(|b| b.into()).collect(), } } } struct IndexBitmap { - bitmap: Bitmap, - prefix_sums: Vec, + bitmaps: Vec, } impl IndexBitmap { - fn or(&self, other: &Self) -> Self { - assert_eq!(self.prefix_sums, other.prefix_sums); + fn merge(&self, other: &Self) -> Self { Self { - bitmap: or(&self.bitmap, &other.bitmap), - prefix_sums: self.prefix_sums.clone(), + bitmaps: self + .bitmaps + .iter() + .zip(other.bitmaps.iter()) + .map(|(a, b)| and(a, b)) + .collect(), } } - fn unset_bits(&self) -> usize { - self.bitmap.unset_bits() - } - - fn get_unused_indices(&self) -> impl Iterator + '_ { - let mut curr_table = 0; - self.bitmap - .iter() - .enumerate() - .filter_map(move |(idx, is_set)| { - if is_set { - None - } else { - while curr_table < self.prefix_sums.len() - 1 - && idx >= self.prefix_sums[curr_table + 1] - { - curr_table += 1; - } - let row_idx = idx - self.prefix_sums[curr_table]; - Some((curr_table, row_idx)) - } - }) + fn convert_to_boolean_arrays(self) -> impl Iterator { + self.bitmaps + .into_iter() + .map(|b| BooleanArray::from(("bitmap", b))) } } @@ -196,7 +169,6 @@ impl OuterHashJoinProbeSink { }; let _growables = info_span!("OuterHashJoinProbeSink::build_growables").entered(); - let mut build_side_growable = GrowableTable::new( &tables.iter().collect::>(), true, @@ -204,7 +176,6 @@ impl OuterHashJoinProbeSink { )?; let input_tables = input.get_tables()?; - let mut probe_side_growable = GrowableTable::new(&input_tables.iter().collect::>(), false, input.len())?; @@ -265,7 +236,6 @@ impl OuterHashJoinProbeSink { }; let bitmap_builder = state.get_bitmap_builder(); let _growables = info_span!("OuterHashJoinProbeSink::build_growables").entered(); - // Need to set use_validity to true here because we add nulls to the build side let mut build_side_growable = GrowableTable::new( &tables.iter().collect::>(), @@ -274,7 +244,6 @@ impl OuterHashJoinProbeSink { )?; let input_tables = input.get_tables()?; - let mut probe_side_growable = GrowableTable::new(&input_tables.iter().collect::>(), false, input.len())?; @@ -351,22 +320,18 @@ impl OuterHashJoinProbeSink { }); bitmaps.fold(None, |acc, x| match acc { None => Some(x), - Some(acc) => Some(acc.or(&x)), + Some(acc) => Some(acc.merge(&x)), }) } .expect("at least one bitmap should be present"); - let mut build_side_growable = GrowableTable::new( - &tables.iter().collect::>(), - true, - merged_bitmap.unset_bits(), - )?; - - for (table_idx, row_idx) in merged_bitmap.get_unused_indices() { - build_side_growable.extend(table_idx, row_idx, 1); - } + let leftovers = merged_bitmap + .convert_to_boolean_arrays() + .zip(tables.iter()) + .map(|(bitmap, table)| table.mask_filter(&bitmap.into_series())) + .collect::>>()?; - let build_side_table = build_side_growable.build()?; + let build_side_table = Table::concat(&leftovers)?; let join_table = build_side_table.get_columns(&self.common_join_keys)?; let left = build_side_table.get_columns(&self.left_non_join_columns)?;