-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT] Outer joins for native executor #2860
Conversation
CodSpeed Performance ReportMerging #2860 will not alter performanceComparing Summary
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2860 +/- ##
==========================================
+ Coverage 78.49% 78.58% +0.09%
==========================================
Files 613 614 +1
Lines 72761 73064 +303
==========================================
+ Hits 57111 57418 +307
+ Misses 15650 15646 -4
|
input: &Arc<MicroPartition>, | ||
state: &mut InnerHashJoinProbeState, | ||
) -> DaftResult<Arc<MicroPartition>> { | ||
let (probe_table, tables) = state.get_probeable_and_table(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be worth making a struct type for
struct ProbeState {
probe_table
tables
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense! Implemented it
src/daft-local-execution/src/intermediate_ops/inner_hash_join_probe.rs
Outdated
Show resolved
Hide resolved
let mut build_side_growable = | ||
GrowableTable::new(&tables.iter().collect::<Vec<_>>(), true, 20)?; | ||
|
||
for (table_idx, row_idx) in merged_bitmap.get_unused_indices() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be much more performant using a BitmapIter
pub struct BitmapIter<'a> { |
Which will compress the adjacent valid bits so we can reduce the calls to extend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can also convert the bitmap into a BooleanArray and use mask_filter
https://github.com/Eventual-Inc/Daft/blob/b1ea3b9749e01512f48dfd45f9899a329fc9799f/src/daft-table/src/lib.rs#L321
instead of iterating
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought the BitmapIter returns an iterator over the individual bits? I just checked and there is also SlicesIterator
: https://github.com/Eventual-Inc/Daft/blob/b1ea3b9749e01512f48dfd45f9899a329fc9799f/src/arrow2/src/bitmap/utils/slice_iterator.rs which is a Iterator over a bitmap that returns slices of set regions
, did you mean this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, yup thats the one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I prefer the BooleanArray as a mask_filter
method more, it's a lot cleaner. Went with that in the latest commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very clean now!
} | ||
} | ||
|
||
fn get_probe_state(&self) -> Arc<ProbeState> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer &Arc<ProbeState>
since the caller may not need to clone.
Self { remaining } | ||
} | ||
|
||
fn get_remaining(&self) -> usize { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can also implement this as get_remaining_mut() -> &mut usize
that will give you a mutable reference to remaining
. That way you don't have to do it as two different steps.
Implement outer joins for Swordfish.
(Yes, this PR is a little big. But:
Outer join probes (and left/right now) are implemented as a Streaming Sink.
execute
phase of the streaming sink, probing is done concurrently via workers (this is the same implementation as all the other join types). The only difference is that during probing, workers will save the indices on the left side that have matches (using a mutable bitmap).finalize
phase, we merge together all the bitmaps across the concurrent workers (via a bitwise OR) to get a global view of all the indices that had matches. Then, we take all the indices that didn't get a match and return them (with nulls for the right side). This is the same logic we currently use for the python runner.used_indices
bitmaps for left/right joins as well.Note: I had to make Streaming Sink concurrency-aware to allow this. The changes in particular are:
max concurrency
, currently only LIMIT will have this set to 1.execute
accepts somemut state
and finalize will consolidate all of the state, i.e.Vec<Box<dyn State>>
.finalize
method doesn't get called before the workers are done with theexecutes
.