
[CHORE] Swordfish perf + cleanup #3132

Closed · colin-ho wants to merge 12 commits into main from colin/swordfish-boost-2

Conversation

@colin-ho (Contributor) commented:

No description provided.

@colin-ho colin-ho changed the base branch from colin/swordfish-boost to main October 26, 2024 04:44
@github-actions github-actions bot added the chore label Oct 26, 2024
@colin-ho colin-ho marked this pull request as ready for review October 26, 2024 04:46
codspeed-hq bot commented Oct 26, 2024

CodSpeed Performance Report

Merging #3132 will not alter performance

Comparing colin/swordfish-boost-2 (7deb319) with main (96c538b)

Summary

✅ 17 untouched benchmarks

@colin-ho colin-ho marked this pull request as draft October 26, 2024 05:05
Comment on lines 25 to 56
pub(crate) type ProbeStateBridgeRef = Arc<ProbeStateBridge>;

/// One-shot bridge that lets the hash join build side hand the finished
/// probe state to the probe side, instead of sending it through the same
/// channel as the data.
pub(crate) struct ProbeStateBridge {
    inner: OnceLock<Arc<ProbeState>>,
    notify: tokio::sync::Notify,
}

impl ProbeStateBridge {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: OnceLock::new(),
            notify: tokio::sync::Notify::new(),
        })
    }

    fn set_probe_state(&self, state: Arc<ProbeState>) {
        assert!(
            self.inner.set(state).is_ok(),
            "ProbeStateBridge should be set only once"
        );
        self.notify.notify_waiters();
    }

    async fn get_probe_state(&self) -> Arc<ProbeState> {
        // Re-check after every wakeup: `notify_waiters` only wakes tasks that
        // are already waiting, so the loop also covers the case where the
        // state was set before this task first awaited.
        loop {
            if let Some(state) = self.inner.get() {
                return state.clone();
            }
            self.notify.notified().await;
        }
    }
}

@colin-ho (Contributor, Author) commented Oct 26, 2024:

@samster25 I was rethinking how we transfer data from HashJoinBuild to HashJoinProbe, and using the same channel for both data and probe tables felt clunky and confusing. So I came up with this mechanism instead, with some inspiration from Velox.

Essentially, HashJoinBuild and HashJoinProbe share a copy of the ProbeStateBridge in order to share the ProbeState data. This way, PipelineNode doesn't need to be aware of any kind of data except micropartitions.

CC @kevinzwang if you have any thoughts as well. I feel like I've been staring at swordfish code way too much lol, need more opinions.
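For context, a minimal sketch of how the two sides could use the bridge: each node holds a clone of the same ProbeStateBridgeRef. The function names `build_side_task` and `probe_side_task` below are hypothetical stand-ins for the actual operators, not Daft code:

use std::sync::Arc;

// Build side: publish the finished probe state exactly once; any waiters wake up.
async fn build_side_task(bridge: ProbeStateBridgeRef, state: Arc<ProbeState>) {
    bridge.set_probe_state(state);
}

// Probe side: suspend until the build side has published, then start probing.
async fn probe_side_task(bridge: ProbeStateBridgeRef) {
    let state = bridge.get_probe_state().await;
    // ... probe incoming micropartitions against `state` ...
    let _ = state;
}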

codecov bot commented Oct 26, 2024

Codecov Report

Attention: Patch coverage is 96.20563% with 31 lines in your changes missing coverage. Please review.

Project coverage is 79.06%. Comparing base (9d4adfb) to head (3ace474).
Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...local-execution/src/sinks/outer_hash_join_probe.rs | 90.05% | 17 Missing ⚠️ |
| ...ecution/src/intermediate_ops/actor_pool_project.rs | 84.61% | 4 Missing ⚠️ |
| .../daft-local-execution/src/sinks/hash_join_build.rs | 91.42% | 3 Missing ⚠️ |
| src/daft-local-execution/src/sinks/pivot.rs | 96.70% | 3 Missing ⚠️ |
| src/daft-local-execution/src/pipeline.rs | 92.30% | 2 Missing ⚠️ |
| src/daft-local-execution/src/dispatcher.rs | 98.98% | 1 Missing ⚠️ |
| .../src/intermediate_ops/anti_semi_hash_join_probe.rs | 97.14% | 1 Missing ⚠️ |
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #3132      +/-   ##
==========================================
+ Coverage   79.01%   79.06%   +0.05%     
==========================================
  Files         634      633       -1     
  Lines       76942    77136     +194     
==========================================
+ Hits        60792    60984     +192     
- Misses      16150    16152       +2     
| Files with missing lines | Coverage Δ |
|---|---|
| src/daft-local-execution/src/channel.rs | 98.41% <100.00%> (-0.10%) ⬇️ |
| ...ft-local-execution/src/intermediate_ops/explode.rs | 100.00% <100.00%> (ø) |
| ...aft-local-execution/src/intermediate_ops/filter.rs | 100.00% <100.00%> (ø) |
| ...tion/src/intermediate_ops/inner_hash_join_probe.rs | 99.21% <100.00%> (+3.45%) ⬆️ |
| ...-execution/src/intermediate_ops/intermediate_op.rs | 80.31% <100.00%> (-1.74%) ⬇️ |
| ...ft-local-execution/src/intermediate_ops/project.rs | 100.00% <100.00%> (ø) |
| ...aft-local-execution/src/intermediate_ops/sample.rs | 100.00% <100.00%> (ø) |
| ...ft-local-execution/src/intermediate_ops/unpivot.rs | 100.00% <100.00%> (ø) |
| src/daft-local-execution/src/lib.rs | 94.02% <100.00%> (+1.17%) ⬆️ |
| src/daft-local-execution/src/run.rs | 88.54% <100.00%> (ø) |
| ... and 19 more | |

... and 8 files with indirect coverage changes

sender: PipelineSender,
receiver: PipelineReceiver,

pub(crate) fn create_channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
    loole::bounded(buffer_size)
}
@colin-ho (Contributor, Author) commented:

@kevinzwang made a great callout the other day: round-robin dispatching can be inefficient when ordering is not required. This can be fixed using https://docs.rs/loole/latest/loole/ channels, which are multi-producer multi-consumer. With all workers pulling from one shared channel, dispatch essentially becomes work-stealing when maintaining order is not required. For example:

import time

import daft
from daft import col

daft.context.set_execution_config(enable_native_executor=True, default_morsel_size=1)


@daft.udf(return_dtype=daft.DataType.int64())
def do_work(x):
    x = x.to_pylist()
    # my laptop has 12 cores, so 12 workers will be spawned to run this udf.
    if x[0] % 12 == 0:
        print("doing a lot of work on ", x)
        time.sleep(1)
    else:
        print("doing a little work on ", x)
        time.sleep(0.1)
    return x


(
    daft.from_pydict({"a": [i for i in range(48)]})
    .with_column("b", do_work(col("a")))
    .agg(col("b").sum())  # agg does not require ordering
    .collect()
)

This script is 4x faster now.
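For a standalone picture of why the shared MPMC channel helps here, a rough Rust sketch (illustrative only, not Daft code; it assumes loole's flume-style async API with `send_async`/`recv_async`):

#[tokio::main]
async fn main() {
    // One bounded MPMC channel shared by several workers.
    let (tx, rx) = loole::bounded::<u64>(4);

    let mut workers = Vec::new();
    for id in 0..4 {
        let rx = rx.clone();
        workers.push(tokio::spawn(async move {
            // Each item is delivered to exactly one receiver, so whichever
            // worker happens to be idle takes the next morsel.
            while let Ok(item) = rx.recv_async().await {
                println!("worker {id} processed item {item}");
            }
        }));
    }
    drop(rx);

    for i in 0..16 {
        tx.send_async(i).await.unwrap();
    }
    drop(tx); // close the channel so the recv loops end

    for w in workers {
        w.await.unwrap();
    }
}

With round-robin dispatch, morsels assigned to the slow worker (the one sleeping 1s in the script above) would queue behind it even while other workers sit idle; with a single shared channel, the idle workers drain the queue instead.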

@colin-ho colin-ho closed this Nov 11, 2024