Skip to content

Commit

Permalink
use mask filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Sep 26, 2024
1 parent f93847a commit d609336
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 65 deletions.
2 changes: 1 addition & 1 deletion src/daft-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
//!
//! This module re-exports commonly used items from the Daft core library.
// Re-export core series structures
// Re-export arrow2 bitmap
pub use arrow2::bitmap;
// Re-export core series structures
pub use daft_schema::schema::{Schema, SchemaRef};

// Re-export count mode enum
Expand Down
93 changes: 29 additions & 64 deletions src/daft-local-execution/src/sinks/outer_hash_join_probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::sync::Arc;
use common_error::DaftResult;
use daft_core::{
prelude::{
bitmap::{or, Bitmap, MutableBitmap},
Schema, SchemaRef,
bitmap::{and, Bitmap, MutableBitmap},
BooleanArray, Schema, SchemaRef,
},
series::Series,
series::{IntoSeries, Series},
};
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
Expand All @@ -19,78 +19,51 @@ use super::streaming_sink::{StreamingSink, StreamingSinkOutput, StreamingSinkSta
use crate::pipeline::PipelineResultType;

struct IndexBitmapBuilder {
bitmap: MutableBitmap,
prefix_sums: Vec<usize>,
mutable_bitmaps: Vec<MutableBitmap>,
}

impl IndexBitmapBuilder {
fn new(tables: &[Table]) -> Self {
let prefix_sums = tables
.iter()
.map(|t| t.len())
.scan(0, |acc, x| {
let prev = *acc;
*acc += x;
Some(prev)
})
.collect::<Vec<_>>();
let total_len = prefix_sums.last().unwrap() + tables.last().unwrap().len();
Self {
bitmap: MutableBitmap::from_len_zeroed(total_len),
prefix_sums,
mutable_bitmaps: tables
.iter()
.map(|t| MutableBitmap::from_len_set(t.len()))
.collect(),
}
}

#[inline]
fn mark_used(&mut self, table_idx: usize, row_idx: usize) {
let idx = self.prefix_sums[table_idx] + row_idx;
self.bitmap.set(idx, true);
self.mutable_bitmaps[table_idx].set(row_idx, false);
}

fn build(self) -> IndexBitmap {
IndexBitmap {
bitmap: self.bitmap.into(),
prefix_sums: self.prefix_sums,
bitmaps: self.mutable_bitmaps.into_iter().map(|b| b.into()).collect(),
}
}
}

struct IndexBitmap {
bitmap: Bitmap,
prefix_sums: Vec<usize>,
bitmaps: Vec<Bitmap>,
}

impl IndexBitmap {
fn or(&self, other: &Self) -> Self {
assert_eq!(self.prefix_sums, other.prefix_sums);
fn merge(&self, other: &Self) -> Self {
Self {
bitmap: or(&self.bitmap, &other.bitmap),
prefix_sums: self.prefix_sums.clone(),
bitmaps: self
.bitmaps
.iter()
.zip(other.bitmaps.iter())
.map(|(a, b)| and(a, b))
.collect(),
}
}

fn unset_bits(&self) -> usize {
self.bitmap.unset_bits()
}

fn get_unused_indices(&self) -> impl Iterator<Item = (usize, usize)> + '_ {
let mut curr_table = 0;
self.bitmap
.iter()
.enumerate()
.filter_map(move |(idx, is_set)| {
if is_set {
None
} else {
while curr_table < self.prefix_sums.len() - 1
&& idx >= self.prefix_sums[curr_table + 1]
{
curr_table += 1;
}
let row_idx = idx - self.prefix_sums[curr_table];
Some((curr_table, row_idx))
}
})
fn convert_to_boolean_arrays(self) -> impl Iterator<Item = BooleanArray> {
self.bitmaps
.into_iter()
.map(|b| BooleanArray::from(("bitmap", b)))
}
}

Expand Down Expand Up @@ -196,15 +169,13 @@ impl OuterHashJoinProbeSink {
};

let _growables = info_span!("OuterHashJoinProbeSink::build_growables").entered();

let mut build_side_growable = GrowableTable::new(
&tables.iter().collect::<Vec<_>>(),
true,
tables.iter().map(|t| t.len()).sum(),
)?;

let input_tables = input.get_tables()?;

let mut probe_side_growable =
GrowableTable::new(&input_tables.iter().collect::<Vec<_>>(), false, input.len())?;

Expand Down Expand Up @@ -265,7 +236,6 @@ impl OuterHashJoinProbeSink {
};
let bitmap_builder = state.get_bitmap_builder();
let _growables = info_span!("OuterHashJoinProbeSink::build_growables").entered();

// Need to set use_validity to true here because we add nulls to the build side
let mut build_side_growable = GrowableTable::new(
&tables.iter().collect::<Vec<_>>(),
Expand All @@ -274,7 +244,6 @@ impl OuterHashJoinProbeSink {
)?;

let input_tables = input.get_tables()?;

let mut probe_side_growable =
GrowableTable::new(&input_tables.iter().collect::<Vec<_>>(), false, input.len())?;

Expand Down Expand Up @@ -351,22 +320,18 @@ impl OuterHashJoinProbeSink {
});
bitmaps.fold(None, |acc, x| match acc {
None => Some(x),
Some(acc) => Some(acc.or(&x)),
Some(acc) => Some(acc.merge(&x)),
})
}
.expect("at least one bitmap should be present");

let mut build_side_growable = GrowableTable::new(
&tables.iter().collect::<Vec<_>>(),
true,
merged_bitmap.unset_bits(),
)?;

for (table_idx, row_idx) in merged_bitmap.get_unused_indices() {
build_side_growable.extend(table_idx, row_idx, 1);
}
let leftovers = merged_bitmap
.convert_to_boolean_arrays()
.zip(tables.iter())
.map(|(bitmap, table)| table.mask_filter(&bitmap.into_series()))
.collect::<DaftResult<Vec<_>>>()?;

let build_side_table = build_side_growable.build()?;
let build_side_table = Table::concat(&leftovers)?;

let join_table = build_side_table.get_columns(&self.common_join_keys)?;
let left = build_side_table.get_columns(&self.left_non_join_columns)?;
Expand Down

0 comments on commit d609336

Please sign in to comment.