[FEAT] Compute pool for native executor #2986

Merged
merged 15 commits into from
Oct 23, 2024

colin-ho
Contributor

@colin-ho colin-ho commented Oct 2, 2024

Create a multithreaded compute runtime for swordfish compute tasks. Switch the query runtime to be single-threaded, and use the IO pool for scan task streams.

Additionally, add a tokio::select! over tokio::signal::ctrl_c and the main async execution loop so that queries can be cancelled.

import os
import daft
import numpy
import time
import psutil

current_process = psutil.Process(os.getpid())

daft.set_execution_config(enable_native_executor=True, default_morsel_size=1)
dfs = [
    iter(
        daft.from_pydict({"a": numpy.random.rand(10)}).with_column(
            "plus_one", daft.col("a") + 1
        )
    )
    for _ in range(10)
]
while dfs:
    # Iterate over a copy so exhausted iterators can be removed safely.
    for df in list(dfs):
        time.sleep(0.1)
        try:
            print("threads: ", current_process.num_threads())
            print(next(df))
        except StopIteration:
            dfs.remove(df)

If you run this script you can see that the number of threads increases by only 1 per dataframe.

TODO:

@github-actions github-actions bot added the enhancement New feature or request label Oct 2, 2024

codspeed-hq bot commented Oct 2, 2024

CodSpeed Performance Report

Merging #2986 will not alter performance

Comparing colin/compute-pool (7aa7b61) with main (4ec76ce)

Summary

✅ 17 untouched benchmarks


codecov bot commented Oct 2, 2024

Codecov Report

Attention: Patch coverage is 81.17647% with 64 lines in your changes missing coverage. Please review.

Project coverage is 78.70%. Comparing base (4ec76ce) to head (7aa7b61).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/daft-local-execution/src/sources/scan_task.rs | 0.00% | 39 Missing ⚠️ |
| src/common/runtime/src/lib.rs | 88.59% | 13 Missing ⚠️ |
| ...-execution/src/intermediate_ops/intermediate_op.rs | 87.50% | 3 Missing ⚠️ |
| .../src/intermediate_ops/anti_semi_hash_join_probe.rs | 86.66% | 2 Missing ⚠️ |
| src/daft-local-execution/src/run.rs | 80.00% | 2 Missing ⚠️ |
| src/daft-csv/src/metadata.rs | 50.00% | 1 Missing ⚠️ |
| src/daft-io/src/python.rs | 50.00% | 1 Missing ⚠️ |
| src/daft-json/src/schema.rs | 50.00% | 1 Missing ⚠️ |
| src/daft-local-execution/src/sinks/concat.rs | 90.90% | 1 Missing ⚠️ |
| src/daft-parquet/src/read.rs | 90.90% | 1 Missing ⚠️ |

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #2986      +/-   ##
==========================================
+ Coverage   78.66%   78.70%   +0.03%     
==========================================
  Files         618      619       +1     
  Lines       73219    73412     +193     
==========================================
+ Hits        57601    57776     +175     
- Misses      15618    15636      +18     

| Files with missing lines | Coverage Δ |
|---|---|
| src/daft-csv/src/read.rs | 99.33% <100.00%> (ø) |
| src/daft-functions/src/tokenize/bpe.rs | 96.35% <100.00%> (ø) |
| src/daft-functions/src/uri/download.rs | 83.46% <100.00%> (ø) |
| src/daft-functions/src/uri/upload.rs | 64.63% <100.00%> (ø) |
| src/daft-io/src/lib.rs | 71.21% <ø> (-3.33%) ⬇️ |
| src/daft-io/src/s3_like.rs | 66.41% <ø> (ø) |
| src/daft-json/src/read.rs | 95.42% <100.00%> (ø) |
| ...-local-execution/src/intermediate_ops/aggregate.rs | 100.00% <ø> (ø) |
| ...ft-local-execution/src/intermediate_ops/explode.rs | 100.00% <ø> (ø) |
| ...aft-local-execution/src/intermediate_ops/filter.rs | 100.00% <ø> (ø) |

... and 23 more

... and 2 files with indirect coverage changes

@colin-ho colin-ho marked this pull request as ready for review October 2, 2024 21:32
@colin-ho colin-ho requested a review from samster25 October 7, 2024 01:44
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("Executor-Worker-{}", id)
})
Contributor Author

@colin-ho colin-ho Oct 8, 2024

TODO: look into thread priority (https://docs.rs/thread-priority/latest/thread_priority/); we could potentially set different priorities for the compute and IO pools.

This could be done in the on_thread_start hook: https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.on_thread_start
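
For readers without the full diff, here is a minimal std-only sketch of the worker-naming closure quoted above. It is not the PR's code: the enclosing call is not shown in the snippet, so `std::thread::Builder` is swapped in as a stand-in for the tokio runtime builder, and `next_worker_name` and `spawn_named_workers` are hypothetical names. The comment inside the spawned closure marks roughly where a per-thread startup hook, such as the priority setup suggested in the TODO, would run.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Hypothetical helper: returns a fresh name on each call, mirroring the
// closure body quoted in the review snippet.
fn next_worker_name() -> String {
    static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
    let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
    format!("Executor-Worker-{}", id)
}

// Hypothetical helper: spawns `n` named threads and collects the name each
// thread observes for itself.
fn spawn_named_workers(n: usize) -> Vec<String> {
    let handles: Vec<_> = (0..n)
        .map(|_| {
            thread::Builder::new()
                .name(next_worker_name())
                .spawn(|| {
                    // A per-thread startup step (e.g. setting a priority)
                    // would go here, before any work is picked up.
                    thread::current().name().unwrap_or("unnamed").to_string()
                })
                .expect("failed to spawn worker thread")
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for name in spawn_named_workers(3) {
        println!("{}", name);
    }
}
```

Because the counter is a process-wide static, names stay unique even if several pools are built in the same process.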

@colin-ho colin-ho mentioned this pull request Oct 18, 2024
src/daft-io/src/lib.rs (outdated, resolved)
src/daft-io/src/lib.rs (outdated, resolved)
src/daft-local-execution/src/lib.rs (outdated, resolved)
src/daft-local-execution/src/run.rs (outdated, resolved)
src/daft-local-execution/src/run.rs (outdated, resolved)
@colin-ho colin-ho requested a review from samster25 October 22, 2024 21:48
src/common/runtime/src/lib.rs (resolved)
@@ -377,7 +380,8 @@ impl StreamingSink for OuterHashJoinProbeSink {
Ok(StreamingSinkOutput::NeedMoreInput(None))
}
_ => {
let state = state
let mut guard = state.inner.lock().unwrap();
Member

We can probably put this in a method on StreamingSinkState that does the following:

  • calls as_any
  • downcasts
  • produces the error message

For example: fn get_state_mut<T>(&self) -> &mut T

Contributor Author

Opted for

pub(crate) fn with_state_mut<T: DynStreamingSinkState + 'static, F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,

since a returned &mut T would have to outlive the mutex guard it borrows from, which the borrow checker rejects.
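
The closure-based shape can be illustrated with a small std-only sketch. Only the `with_state_mut` signature and the `inner` mutex field come from the thread above; the trait body, the `as_any_mut` method, the `new` constructor, and `ProbeState` are assumptions made up for illustration and may not match the merged code. The point is that the guard is locked, the state is downcast, and the closure runs while the guard is still alive, so no reference escapes the lock.

```rust
use std::any::Any;
use std::sync::Mutex;

// Assumed shape of the trait named in the signature above.
trait DynStreamingSinkState: Send {
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

// Assumed wrapper; only the `inner` mutex field appears in the diff.
struct StreamingSinkState {
    inner: Mutex<Box<dyn DynStreamingSinkState>>,
}

impl StreamingSinkState {
    fn new(state: Box<dyn DynStreamingSinkState>) -> Self {
        Self { inner: Mutex::new(state) }
    }

    // Lock, downcast, and run `f` while the guard is held. The borrow handed
    // to `f` cannot outlive this call, so it cannot outlive the guard.
    fn with_state_mut<T: DynStreamingSinkState + 'static, F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        let mut guard = self.inner.lock().unwrap();
        let state = guard
            .as_any_mut()
            .downcast_mut::<T>()
            .expect("state type mismatch");
        f(state)
    }
}

// Hypothetical concrete state used only for this demo.
struct ProbeState {
    rows_seen: usize,
}

impl DynStreamingSinkState for ProbeState {
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

// Mutates the state through one call, then reads it back through another.
fn demo_rows_seen() -> usize {
    let state = StreamingSinkState::new(Box::new(ProbeState { rows_seen: 0 }));
    state.with_state_mut::<ProbeState, _, _>(|s| s.rows_seen += 3);
    state.with_state_mut::<ProbeState, _, _>(|s| s.rows_seen)
}

fn main() {
    println!("rows seen: {}", demo_rows_seen());
}
```

A `get_state_mut` that returned `&mut T` directly would need the guard to live as long as the returned reference, which is why the closure form is the simpler fit here.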

@colin-ho colin-ho merged commit 6569cb6 into main Oct 23, 2024
42 checks passed
@colin-ho colin-ho deleted the colin/compute-pool branch October 23, 2024 21:35