Skip to content

Commit

Permalink
dispatcher_spawner
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Nov 13, 2024
1 parent e20bf09 commit d5032be
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 83 deletions.
80 changes: 44 additions & 36 deletions src/daft-local-execution/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::sync::Arc;

use common_error::DaftResult;
use common_runtime::RuntimeTask;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;

use crate::{
buffer::RowBasedBuffer,
channel::{create_channel, Receiver, Sender},
runtime_stats::CountingReceiver,
ExecutionRuntimeHandle,
};

/// The `Dispatcher` trait defines the interface for dispatching morsels to workers.
Expand All @@ -23,14 +23,17 @@ use crate::{
///
/// Implementations must spawn a task on the runtime handle that reads from the
/// input receivers and distributes morsels to the worker receivers.
pub(crate) trait Dispatcher {
fn dispatch(
pub(crate) trait DispatcherSpawner {
fn spawn_dispatcher(
&self,
input_receivers: Vec<CountingReceiver>,
num_workers: usize,
runtime_handle: &mut ExecutionRuntimeHandle,
name: &'static str,
) -> Vec<Receiver<Arc<MicroPartition>>>;
) -> SpawnedDispatcherResult;
}

pub(crate) struct SpawnedDispatcherResult {
pub receivers: Vec<Receiver<Arc<MicroPartition>>>,
pub dispatch_handle: RuntimeTask<DaftResult<()>>,
}

/// A dispatcher that distributes morsels to workers in a round-robin fashion.
Expand Down Expand Up @@ -85,22 +88,23 @@ impl RoundRobinDispatcher {
}
}

impl Dispatcher for RoundRobinDispatcher {
fn dispatch(
impl DispatcherSpawner for RoundRobinDispatcher {
fn spawn_dispatcher(
&self,
input_receivers: Vec<CountingReceiver>,
num_workers: usize,
runtime_handle: &mut ExecutionRuntimeHandle,
name: &'static str,
) -> Vec<Receiver<Arc<MicroPartition>>> {
) -> SpawnedDispatcherResult {
let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) =
(0..num_workers).map(|_| create_channel(1)).unzip();
let morsel_size = self.morsel_size;
runtime_handle.spawn(
Self::dispatch_inner(worker_senders, input_receivers, morsel_size),
name,
);
worker_receivers
let current_handle = tokio::runtime::Handle::current();
let dispatcher_task = RuntimeTask::new(&current_handle, async move {
Self::dispatch_inner(worker_senders, input_receivers, morsel_size).await
});
SpawnedDispatcherResult {
receivers: worker_receivers,
dispatch_handle: dispatcher_task,
}
}
}

Expand Down Expand Up @@ -149,22 +153,25 @@ impl UnorderedDispatcher {
}
}

impl Dispatcher for UnorderedDispatcher {
fn dispatch(
impl DispatcherSpawner for UnorderedDispatcher {
fn spawn_dispatcher(
&self,
receiver: Vec<CountingReceiver>,
num_workers: usize,
runtime_handle: &mut ExecutionRuntimeHandle,
name: &'static str,
) -> Vec<Receiver<Arc<MicroPartition>>> {
) -> SpawnedDispatcherResult {
let (worker_sender, worker_receiver) = create_channel(num_workers);
let worker_receivers = vec![worker_receiver; num_workers];
let morsel_size = self.morsel_size;
runtime_handle.spawn(
Self::dispatch_inner(worker_sender, receiver, morsel_size),
name,
);
worker_receivers

let current_handle = tokio::runtime::Handle::current();
let dispatcher_task = RuntimeTask::new(&current_handle, async move {
Self::dispatch_inner(worker_sender, receiver, morsel_size).await
});

SpawnedDispatcherResult {
receivers: worker_receivers,
dispatch_handle: dispatcher_task,
}
}
}

Expand Down Expand Up @@ -199,21 +206,22 @@ impl PartitionedDispatcher {
}
}

impl Dispatcher for PartitionedDispatcher {
fn dispatch(
impl DispatcherSpawner for PartitionedDispatcher {
fn spawn_dispatcher(
&self,
input_receivers: Vec<CountingReceiver>,
num_workers: usize,
runtime_handle: &mut ExecutionRuntimeHandle,
name: &'static str,
) -> Vec<Receiver<Arc<MicroPartition>>> {
) -> SpawnedDispatcherResult {
let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) =
(0..num_workers).map(|_| create_channel(1)).unzip();
let partition_by = self.partition_by.clone();
runtime_handle.spawn(
Self::dispatch_inner(worker_senders, input_receivers, partition_by),
name,
);
worker_receivers
let current_handle = tokio::runtime::Handle::current();
let dispatcher_task = RuntimeTask::new(&current_handle, async move {
Self::dispatch_inner(worker_senders, input_receivers, partition_by).await
});
SpawnedDispatcherResult {
receivers: worker_receivers,
dispatch_handle: dispatcher_task,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ impl IntermediateOperator for ActorPoolProjectOperator {
self.concurrency
}

fn make_dispatcher(
fn dispatcher_spawner(
&self,
runtime_handle: &crate::ExecutionRuntimeHandle,
maintain_order: bool,
) -> Arc<dyn crate::dispatcher::Dispatcher> {
) -> Arc<dyn crate::dispatcher::DispatcherSpawner> {
if maintain_order {
Arc::new(RoundRobinDispatcher::new(Some(
self.batch_size
Expand Down
22 changes: 10 additions & 12 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use common_display::tree::TreeDisplay;
use common_error::DaftResult;
use common_runtime::{get_compute_runtime, RuntimeRef};
use daft_micropartition::MicroPartition;
use futures::FutureExt;
use tracing::{info_span, instrument};

use crate::{
channel::{
create_channel, create_ordering_aware_receiver_channel, OrderingAwareReceiver, Receiver,
Sender,
},
dispatcher::{Dispatcher, RoundRobinDispatcher, UnorderedDispatcher},
dispatcher::{DispatcherSpawner, RoundRobinDispatcher, UnorderedDispatcher},
pipeline::PipelineNode,
runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
ExecutionRuntimeHandle, OperatorOutput, NUM_CPUS,
Expand Down Expand Up @@ -53,11 +54,11 @@ pub trait IntermediateOperator: Send + Sync {
*NUM_CPUS
}

fn make_dispatcher(
fn dispatcher_spawner(
&self,
runtime_handle: &ExecutionRuntimeHandle,
maintain_order: bool,
) -> Arc<dyn Dispatcher> {
) -> Arc<dyn DispatcherSpawner> {
if maintain_order {
Arc::new(RoundRobinDispatcher::new(Some(
runtime_handle.default_morsel_size(),
Expand Down Expand Up @@ -212,17 +213,14 @@ impl PipelineNode for IntermediateNode {
let (destination_sender, destination_receiver) = create_channel(1);
let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone());

let dispatcher = self
let dispatcher_result = self
.intermediate_op
.make_dispatcher(runtime_handle, maintain_order);
let input_receivers = dispatcher.dispatch(
child_result_receivers,
num_workers,
runtime_handle,
self.name(),
);
.dispatcher_spawner(runtime_handle, maintain_order)
.spawn_dispatcher(child_result_receivers, num_workers);
runtime_handle.spawn(dispatcher_result.dispatch_handle.map(|r| r?), self.name());

let mut output_receiver =
self.spawn_workers(input_receivers, runtime_handle, maintain_order);
self.spawn_workers(dispatcher_result.receivers, runtime_handle, maintain_order);
runtime_handle.spawn(
async move {
while let Some(morsel) = output_receiver.recv().await {
Expand Down
22 changes: 12 additions & 10 deletions src/daft-local-execution/src/sinks/blocking_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use common_display::tree::TreeDisplay;
use common_error::DaftResult;
use common_runtime::{get_compute_runtime, RuntimeRef};
use daft_micropartition::MicroPartition;
use futures::FutureExt;
use snafu::ResultExt;
use tracing::{info_span, instrument};

use crate::{
channel::{create_channel, Receiver},
dispatcher::{Dispatcher, UnorderedDispatcher},
dispatcher::{DispatcherSpawner, UnorderedDispatcher},
pipeline::PipelineNode,
runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
ExecutionRuntimeHandle, JoinSnafu, OperatorOutput, TaskSet,
Expand Down Expand Up @@ -41,7 +42,10 @@ pub trait BlockingSink: Send + Sync {
) -> BlockingSinkFinalizeResult;
fn name(&self) -> &'static str;
fn make_state(&self) -> DaftResult<Box<dyn BlockingSinkState>>;
fn make_dispatcher(&self, runtime_handle: &ExecutionRuntimeHandle) -> Arc<dyn Dispatcher> {
fn dispatcher_spawner(
&self,
runtime_handle: &ExecutionRuntimeHandle,
) -> Arc<dyn DispatcherSpawner> {
Arc::new(UnorderedDispatcher::new(Some(
runtime_handle.default_morsel_size(),
)))
Expand Down Expand Up @@ -151,20 +155,18 @@ impl PipelineNode for BlockingSinkNode {
let op = self.op.clone();
let runtime_stats = self.runtime_stats.clone();
let num_workers = op.max_concurrency();
let dispatcher = op.make_dispatcher(runtime_handle);
let input_receivers = dispatcher.dispatch(
vec![counting_receiver],
num_workers,
runtime_handle,
self.name(),
);

let dispatcher_result = op
.dispatcher_spawner(runtime_handle)
.spawn_dispatcher(vec![counting_receiver], num_workers);
runtime_handle.spawn(dispatcher_result.dispatch_handle.map(|r| r?), self.name);

runtime_handle.spawn(
async move {
let mut task_set = TaskSet::new();
Self::spawn_workers(
op.clone(),
input_receivers,
dispatcher_result.receivers,
&mut task_set,
runtime_stats.clone(),
);
Expand Down
6 changes: 3 additions & 3 deletions src/daft-local-execution/src/sinks/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::streaming_sink::{
StreamingSinkState,
};
use crate::{
dispatcher::{Dispatcher, RoundRobinDispatcher, UnorderedDispatcher},
dispatcher::{DispatcherSpawner, RoundRobinDispatcher, UnorderedDispatcher},
ExecutionRuntimeHandle, NUM_CPUS,
};

Expand Down Expand Up @@ -56,11 +56,11 @@ impl StreamingSink for ConcatSink {
*NUM_CPUS
}

fn make_dispatcher(
fn dispatcher_spawner(
&self,
runtime_handle: &ExecutionRuntimeHandle,
maintain_order: bool,
) -> Arc<dyn Dispatcher> {
) -> Arc<dyn DispatcherSpawner> {
if maintain_order {
Arc::new(RoundRobinDispatcher::new(Some(
runtime_handle.default_morsel_size(),
Expand Down
6 changes: 3 additions & 3 deletions src/daft-local-execution/src/sinks/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::streaming_sink::{
StreamingSinkState,
};
use crate::{
dispatcher::{Dispatcher, UnorderedDispatcher},
dispatcher::{DispatcherSpawner, UnorderedDispatcher},
ExecutionRuntimeHandle,
};

Expand Down Expand Up @@ -101,11 +101,11 @@ impl StreamingSink for LimitSink {
1
}

fn make_dispatcher(
fn dispatcher_spawner(
&self,
_runtime_handle: &ExecutionRuntimeHandle,
_maintain_order: bool,
) -> Arc<dyn Dispatcher> {
) -> Arc<dyn DispatcherSpawner> {
// LimitSink should be greedy, and accept all input as soon as possible.
// It is also not concurrent, so we don't need to worry about ordering.
Arc::new(UnorderedDispatcher::new(None))
Expand Down
6 changes: 3 additions & 3 deletions src/daft-local-execution/src/sinks/outer_hash_join_probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::{
},
};
use crate::{
dispatcher::{Dispatcher, RoundRobinDispatcher, UnorderedDispatcher},
dispatcher::{DispatcherSpawner, RoundRobinDispatcher, UnorderedDispatcher},
ExecutionRuntimeHandle,
};

Expand Down Expand Up @@ -471,11 +471,11 @@ impl StreamingSink for OuterHashJoinProbeSink {
}
}

fn make_dispatcher(
fn dispatcher_spawner(
&self,
runtime_handle: &ExecutionRuntimeHandle,
maintain_order: bool,
) -> Arc<dyn Dispatcher> {
) -> Arc<dyn DispatcherSpawner> {
if maintain_order {
Arc::new(RoundRobinDispatcher::new(Some(
runtime_handle.default_morsel_size(),
Expand Down
22 changes: 11 additions & 11 deletions src/daft-local-execution/src/sinks/streaming_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use common_display::tree::TreeDisplay;
use common_error::DaftResult;
use common_runtime::{get_compute_runtime, RuntimeRef};
use daft_micropartition::MicroPartition;
use futures::FutureExt;
use snafu::ResultExt;
use tracing::{info_span, instrument};

Expand All @@ -12,7 +13,7 @@ use crate::{
create_channel, create_ordering_aware_receiver_channel, OrderingAwareReceiver, Receiver,
Sender,
},
dispatcher::Dispatcher,
dispatcher::DispatcherSpawner,
pipeline::PipelineNode,
runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
ExecutionRuntimeHandle, JoinSnafu, OperatorOutput, TaskSet, NUM_CPUS,
Expand Down Expand Up @@ -63,11 +64,11 @@ pub trait StreamingSink: Send + Sync {
*NUM_CPUS
}

fn make_dispatcher(
fn dispatcher_spawner(
&self,
runtime_handle: &ExecutionRuntimeHandle,
maintain_order: bool,
) -> Arc<dyn Dispatcher>;
) -> Arc<dyn DispatcherSpawner>;
}

pub struct StreamingSinkNode {
Expand Down Expand Up @@ -210,19 +211,18 @@ impl PipelineNode for StreamingSinkNode {
let op = self.op.clone();
let runtime_stats = self.runtime_stats.clone();
let num_workers = op.max_concurrency();
let dispatcher = op.make_dispatcher(runtime_handle, maintain_order);
let input_receivers = dispatcher.dispatch(
child_result_receivers,
num_workers,
runtime_handle,
self.name(),
);

let dispatcher = op
.dispatcher_spawner(runtime_handle, maintain_order)
.spawn_dispatcher(child_result_receivers, num_workers);
runtime_handle.spawn(dispatcher.dispatch_handle.map(|r| r?), self.name());

runtime_handle.spawn(
async move {
let mut task_set = TaskSet::new();
let mut output_receiver = Self::spawn_workers(
op.clone(),
input_receivers,
dispatcher.receivers,
&mut task_set,
runtime_stats.clone(),
maintain_order,
Expand Down
Loading

0 comments on commit d5032be

Please sign in to comment.