Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Outer joins for native executor #2860

Merged
merged 16 commits into from
Oct 22, 2024
Merged

Conversation

colin-ho
Copy link
Contributor

@colin-ho colin-ho commented Sep 19, 2024

Implement outer joins for Swordfish.

(Yes, this PR is a little big. But:

  1. at least tests run in CI now, so you don't need to just take my word for it now.
  2. A lot of the diff is because I moved left/right joins to be together with the outer join operator. Therefore the HashJoinProbe operator is now just InnerHashJoinProbeOperator)

Outer join probes (and left/right now) are implemented as a Streaming Sink.

  • During the execute phase of the streaming sink, probing is done concurrently via workers (this is the same implementation as all the other join types). The only difference is that during probing, workers will save the indices on the left side that have matches (using a mutable bitmap).
  • During the finalize phase, we merge together all the bitmaps across the concurrent workers (via a bitwise OR) to get a global view of all the indices that had matches. Then, we take all the indices that didn't get a match and return them (with nulls for the right side). This is the same logic we currently use for the python runner.
  • Why is left/right with outer joins now? In the future, we may want to choose the build side for left/right/outer joins based on cardinality. This means that we may need the used_indices bitmaps for left/right joins as well.

Note: I had to make Streaming Sink concurrency-aware to allow this. The changes in particular are:

  • Streaming Sinks can specify max concurrency, currently only LIMIT will have this set to 1.
  • execute accepts some mut state and finalize will consolidate all of the state, i.e. Vec<Box<dyn State>>.
  • In order to make sure that all the workers are done, they are spawned on a Worker Set, and return their state when done. This ensures that the finalize method doesn't get called before the workers are done with the executes.

@github-actions github-actions bot added the enhancement New feature or request label Sep 19, 2024
Copy link

codspeed-hq bot commented Sep 19, 2024

CodSpeed Performance Report

Merging #2860 will not alter performance

Comparing colin/swordfish-outer-join (d4e8942) with main (4a8244b)

Summary

✅ 17 untouched benchmarks

@colin-ho colin-ho marked this pull request as ready for review September 19, 2024 21:16
@colin-ho colin-ho requested a review from samster25 September 19, 2024 21:17
Copy link

codecov bot commented Sep 19, 2024

Codecov Report

Attention: Patch coverage is 98.68668% with 7 lines in your changes missing coverage. Please review.

Project coverage is 78.58%. Comparing base (4a8244b) to head (d4e8942).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...local-execution/src/sinks/outer_hash_join_probe.rs 98.52% 4 Missing ⚠️
...tion/src/intermediate_ops/inner_hash_join_probe.rs 97.16% 3 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #2860      +/-   ##
==========================================
+ Coverage   78.49%   78.58%   +0.09%     
==========================================
  Files         613      614       +1     
  Lines       72761    73064     +303     
==========================================
+ Hits        57111    57418     +307     
+ Misses      15650    15646       -4     
Files with missing lines Coverage Δ
.../src/intermediate_ops/anti_semi_hash_join_probe.rs 96.61% <100.00%> (+0.24%) ⬆️
src/daft-local-execution/src/pipeline.rs 93.56% <100.00%> (+1.22%) ⬆️
src/daft-local-execution/src/runtime_stats.rs 55.67% <100.00%> (ø)
.../daft-local-execution/src/sinks/hash_join_build.rs 95.00% <100.00%> (-0.24%) ⬇️
src/daft-local-execution/src/sinks/limit.rs 100.00% <100.00%> (ø)
...c/daft-local-execution/src/sinks/streaming_sink.rs 80.15% <100.00%> (+14.27%) ⬆️
src/daft-table/src/lib.rs 90.05% <ø> (ø)
src/daft-table/src/probeable/mod.rs 100.00% <100.00%> (ø)
...tion/src/intermediate_ops/inner_hash_join_probe.rs 97.16% <97.16%> (ø)
...local-execution/src/sinks/outer_hash_join_probe.rs 98.52% <98.52%> (ø)

... and 5 files with indirect coverage changes

src/arrow2/src/bitmap/mutable.rs Outdated Show resolved Hide resolved
src/arrow2/src/bitmap/mutable.rs Outdated Show resolved Hide resolved
src/daft-local-execution/Cargo.toml Outdated Show resolved Hide resolved
input: &Arc<MicroPartition>,
state: &mut InnerHashJoinProbeState,
) -> DaftResult<Arc<MicroPartition>> {
let (probe_table, tables) = state.get_probeable_and_table();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be worth making a struct type for

struct ProbeState {
  probe_table
  tables
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! Implemented it

let mut build_side_growable =
GrowableTable::new(&tables.iter().collect::<Vec<_>>(), true, 20)?;

for (table_idx, row_idx) in merged_bitmap.get_unused_indices() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be much more performant using a BitmapIter

pub struct BitmapIter<'a> {

Which will compress the adjacent valid bits so we can reduce the calls to extend.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also convert the bitmap into a BooleanArray and use mask_filter https://github.com/Eventual-Inc/Daft/blob/b1ea3b9749e01512f48dfd45f9899a329fc9799f/src/daft-table/src/lib.rs#L321 instead of iterating

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the BitmapIter returns an iterator over the individual bits? I just checked and there is also SlicesIterator: https://github.com/Eventual-Inc/Daft/blob/b1ea3b9749e01512f48dfd45f9899a329fc9799f/src/arrow2/src/bitmap/utils/slice_iterator.rs which is a Iterator over a bitmap that returns slices of set regions, did you mean this one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, yup thats the one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I prefer the BooleanArray as a mask_filter method more, it's a lot cleaner. Went with that in the latest commit.

Copy link
Member

@samster25 samster25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very clean now!

}
}

fn get_probe_state(&self) -> Arc<ProbeState> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer &Arc<ProbeState> since the caller may not need to clone.

Self { remaining }
}

fn get_remaining(&self) -> usize {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also implement this as get_remaining_mut() -> &mut usize that will give you a mutable reference to remaining. That way you don't have to do it as two different steps.

@colin-ho colin-ho enabled auto-merge (squash) October 22, 2024 01:20
@colin-ho colin-ho merged commit b9b2d72 into main Oct 22, 2024
40 checks passed
@colin-ho colin-ho deleted the colin/swordfish-outer-join branch October 22, 2024 01:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants