From 4106e79161f2a21879428fef1e553786e0159f23 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Thu, 19 Sep 2024 12:01:53 -0700
Subject: [PATCH 01/14] initial implementation

---
 Cargo.lock                                    |  1 +
 src/arrow2/src/bitmap/mutable.rs              | 19 ++++
 src/daft-local-execution/Cargo.toml           |  1 +
 src/daft-local-execution/src/channel.rs       | 69 +++++++-------
 .../src/intermediate_ops/intermediate_op.rs   | 90 +++++++++++--------
 src/daft-local-execution/src/runtime_stats.rs |  7 +-
 .../src/sinks/blocking_sink.rs                |  7 +-
 .../src/sources/source.rs                     |  4 +-
 8 files changed, 113 insertions(+), 85 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c57309e10f..9445921e93 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1952,6 +1952,7 @@ dependencies = [
 name = "daft-local-execution"
 version = "0.3.0-dev0"
 dependencies = [
+ "arrow2",
  "common-daft-config",
  "common-display",
  "common-error",

diff --git a/src/arrow2/src/bitmap/mutable.rs b/src/arrow2/src/bitmap/mutable.rs
index cb77decd84..edf373a506 100644
--- a/src/arrow2/src/bitmap/mutable.rs
+++ b/src/arrow2/src/bitmap/mutable.rs
@@ -325,6 +325,25 @@ impl MutableBitmap {
     pub(crate) fn bitchunks_exact_mut(&mut self) -> BitChunksExactMut {
         BitChunksExactMut::new(&mut self.buffer, self.length)
     }
+
+    pub fn or(&self, other: &MutableBitmap) -> MutableBitmap {
+        assert_eq!(
+            self.length, other.length,
+            "Bitmaps must have the same length"
+        );
+
+        let new_buffer: Vec<u8> = self
+            .buffer
+            .iter()
+            .zip(other.buffer.iter())
+            .map(|(&a, &b)| a | b) // Apply bitwise OR on each pair of bytes
+            .collect();
+
+        MutableBitmap {
+            buffer: new_buffer,
+            length: self.length,
+        }
+    }
 }

 impl From<MutableBitmap> for Bitmap {
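The `or` helper added above combines two equal-length mutable bitmaps byte by byte and allocates a fresh buffer, leaving both inputs untouched. A standalone sketch of the same semantics, using plain byte buffers in place of the real `MutableBitmap` internals:

// Sketch: bitwise OR over two equal-length bit buffers, mirroring MutableBitmap::or.
fn bitmap_or(a: &[u8], b: &[u8]) -> Vec<u8> {
    assert_eq!(a.len(), b.len(), "Bitmaps must have the same length");
    a.iter().zip(b.iter()).map(|(&x, &y)| x | y).collect()
}

// e.g. bitmap_or(&[0b0101], &[0b0011]) == vec![0b0111]

Because the result is a new buffer, validity masks can be combined without disturbing the source columns.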
diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml
index cd061c1c35..16ff80bfb1 100644
--- a/src/daft-local-execution/Cargo.toml
+++ b/src/daft-local-execution/Cargo.toml
@@ -1,4 +1,5 @@
 [dependencies]
+arrow2 = {workspace = true}
 common-daft-config = {path = "../common/daft-config", default-features = false}
 common-display = {path = "../common/display", default-features = false}
 common-error = {path = "../common/error", default-features = false}

diff --git a/src/daft-local-execution/src/channel.rs b/src/daft-local-execution/src/channel.rs
index f16e3bd061..e3428f7b4e 100644
--- a/src/daft-local-execution/src/channel.rs
+++ b/src/daft-local-execution/src/channel.rs
@@ -13,8 +13,8 @@ pub fn create_channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
 }

 pub struct PipelineChannel {
-    sender: PipelineSender,
-    receiver: PipelineReceiver,
+    sender: Sender<PipelineResultType>,
+    receiver: Receiver<PipelineResultType>,
 }

 impl PipelineChannel {
@@ -39,54 +39,47 @@ impl PipelineChannel {
     }

-    pub(crate) fn get_next_sender_with_stats(
-        &mut self,
-        rt: &Arc<RuntimeStatsContext>,
-    ) -> CountingSender {
-        CountingSender::new(self.get_next_sender(), rt.clone())
+    pub(crate) fn get_receiver_with_stats(
+        self,
+        stats: &Arc<RuntimeStatsContext>,
+    ) -> CountingReceiver {
+        CountingReceiver::new(self.receiver, stats.clone())
     }

-    pub fn get_receiver(self) -> PipelineReceiver {
+    pub(crate) fn get_receiver(self) -> Receiver<PipelineResultType> {
         self.receiver
     }
-
-    pub(crate) fn get_receiver_with_stats(self, rt: &Arc<RuntimeStatsContext>) -> CountingReceiver {
-        CountingReceiver::new(self.get_receiver(), rt.clone())
-    }
 }

-pub enum PipelineSender {
-    InOrder(RoundRobinSender<PipelineResultType>),
-    OutOfOrder(Sender<PipelineResultType>),
-}
-
-pub struct RoundRobinSender<T> {
-    senders: Vec<Sender<T>>,
-    curr_sender_idx: usize,
-}
-
-impl<T> RoundRobinSender<T> {
-    pub fn new(senders: Vec<Sender<T>>) -> Self {
-        Self {
-            senders,
-            curr_sender_idx: 0,
-        }
-    }
-
-    pub fn get_next_sender(&mut self) -> Sender<T> {
-        let next_idx = self.curr_sender_idx;
-        self.curr_sender_idx = (next_idx + 1) % self.senders.len();
-        self.senders[next_idx].clone()
-    }
-}
+pub(crate) fn make_ordering_aware_channel<T>(
+    buffer_size: usize,
+    ordered: bool,
+) -> (Vec<Sender<T>>, OrderingAwareReceiver<T>) {
+    match ordered {
+        true => {
+            let (senders, receivers) = (0..buffer_size).map(|_| create_channel(1)).unzip();
+            (
+                senders,
+                OrderingAwareReceiver::Ordered(RoundRobinReceiver::new(receivers)),
+            )
+        }
+        false => {
+            let (sender, receiver) = create_channel(buffer_size);
+            (
+                (0..buffer_size).map(|_| sender.clone()).collect(),
+                OrderingAwareReceiver::Unordered(receiver),
+            )
+        }
+    }
+}

-pub enum PipelineReceiver {
-    InOrder(RoundRobinReceiver<PipelineResultType>),
-    OutOfOrder(Receiver<PipelineResultType>),
+pub enum OrderingAwareReceiver<T> {
+    Ordered(RoundRobinReceiver<T>),
+    Unordered(Receiver<T>),
 }

-impl PipelineReceiver {
-    pub async fn recv(&mut self) -> Option<PipelineResultType> {
+impl<T> OrderingAwareReceiver<T> {
+    pub async fn recv(&mut self) -> Option<T> {
         match self {
-            Self::InOrder(rr) => rr.recv().await,
-            Self::OutOfOrder(r) => r.recv().await,
+            Self::Ordered(rr) => rr.recv().await,
+            Self::Unordered(r) => r.recv().await,
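When order matters, `make_ordering_aware_channel` gives every worker its own single-slot sender and merges them with a round-robin receiver, so results come back in the order inputs were dispatched; otherwise all workers share one buffered channel. A minimal sketch of the round-robin receive side, assuming tokio mpsc channels (the real `RoundRobinReceiver` may differ in detail):

// Sketch: pull from N receivers in fixed rotation so that results from
// round-robin-dispatched work are replayed in dispatch order.
struct RoundRobinReceiver<T> {
    receivers: Vec<tokio::sync::mpsc::Receiver<T>>,
    curr: usize,
}

impl<T> RoundRobinReceiver<T> {
    async fn recv(&mut self) -> Option<T> {
        for _ in 0..self.receivers.len() {
            let idx = self.curr;
            self.curr = (self.curr + 1) % self.receivers.len();
            if let Some(v) = self.receivers[idx].recv().await {
                return Some(v);
            }
            // This receiver is closed; skip it and try the next one.
        }
        None // all receivers exhausted
    }
}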
diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
index 7b0267c7c0..59ff22e3e0 100644
--- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
+++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -7,9 +7,9 @@ use tracing::{info_span, instrument};

 use super::buffer::OperatorBuffer;
 use crate::{
-    channel::{create_channel, PipelineChannel, Receiver, Sender},
+    channel::{create_channel, make_ordering_aware_channel, PipelineChannel, Receiver, Sender},
     pipeline::{PipelineNode, PipelineResultType},
-    runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
+    runtime_stats::{CountingReceiver, RuntimeStatsContext},
     ExecutionRuntimeHandle, NUM_CPUS,
 };

@@ -68,28 +68,28 @@ impl IntermediateNode {
     }

     #[instrument(level = "info", skip_all, name = "IntermediateOperator::run_worker")]
-    pub async fn run_worker(
+    async fn run_worker(
         op: Arc<dyn IntermediateOperator>,
-        mut receiver: Receiver<(usize, PipelineResultType)>,
-        sender: CountingSender,
+        mut input_receiver: Receiver<(usize, PipelineResultType)>,
+        output_sender: Sender<Arc<MicroPartition>>,
         rt_context: Arc<RuntimeStatsContext>,
     ) -> DaftResult<()> {
         let span = info_span!("IntermediateOp::execute");
         let mut state = op.make_state();
-        while let Some((idx, morsel)) = receiver.recv().await {
+        while let Some((idx, morsel)) = input_receiver.recv().await {
             loop {
                 let result =
                     rt_context.in_span(&span, || op.execute(idx, &morsel, state.as_mut()))?;
                 match result {
                     IntermediateOperatorResult::NeedMoreInput(Some(mp)) => {
-                        let _ = sender.send(mp.into()).await;
+                        let _ = output_sender.send(mp).await;
                         break;
                     }
                     IntermediateOperatorResult::NeedMoreInput(None) => {
                         break;
                     }
                     IntermediateOperatorResult::HasMoreOutput(mp) => {
-                        let _ = sender.send(mp.into()).await;
+                        let _ = output_sender.send(mp).await;
                     }
                 }
             }
         }
@@ -97,32 +97,24 @@ impl IntermediateNode {
         Ok(())
     }

-    pub fn spawn_workers(
-        &self,
-        num_workers: usize,
-        destination_channel: &mut PipelineChannel,
-        runtime_handle: &mut ExecutionRuntimeHandle,
-    ) -> Vec<Sender<(usize, PipelineResultType)>> {
-        let mut worker_senders = Vec::with_capacity(num_workers);
-        for _ in 0..num_workers {
-            let (worker_sender, worker_receiver) = create_channel(1);
-            let destination_sender =
-                destination_channel.get_next_sender_with_stats(&self.runtime_stats);
-            runtime_handle.spawn(
-                Self::run_worker(
-                    self.intermediate_op.clone(),
-                    worker_receiver,
-                    destination_sender,
-                    self.runtime_stats.clone(),
-                ),
-                self.intermediate_op.name(),
-            );
-            worker_senders.push(worker_sender);
+    fn spawn_workers(
+        op: Arc<dyn IntermediateOperator>,
+        input_receivers: Vec<Receiver<(usize, PipelineResultType)>>,
+        output_senders: Vec<Sender<Arc<MicroPartition>>>,
+        worker_set: &mut tokio::task::JoinSet<DaftResult<()>>,
+        stats: Arc<RuntimeStatsContext>,
+    ) {
+        for (input_receiver, output_sender) in input_receivers.into_iter().zip(output_senders) {
+            worker_set.spawn(Self::run_worker(
+                op.clone(),
+                input_receiver,
+                output_sender,
+                stats.clone(),
+            ));
         }
-        worker_senders
     }

-    pub async fn send_to_workers(
+    async fn forward_input_to_workers(
         receivers: Vec<CountingReceiver>,
         worker_senders: Vec<Sender<(usize, PipelineResultType)>>,
         morsel_size: usize,
@@ -203,16 +195,36 @@ impl PipelineNode for IntermediateNode {
             child_result_receivers
                 .push(child_result_channel.get_receiver_with_stats(&self.runtime_stats));
         }
-        let mut destination_channel = PipelineChannel::new(*NUM_CPUS, maintain_order);
+        let destination_channel = PipelineChannel::new();
+        let destination_sender = destination_channel.get_sender_with_stats(&self.runtime_stats);

-        let worker_senders =
-            self.spawn_workers(*NUM_CPUS, &mut destination_channel, runtime_handle);
+        let op = self.intermediate_op.clone();
+        let stats = self.runtime_stats.clone();
+        let morsel_size = runtime_handle.default_morsel_size();
         runtime_handle.spawn(
-            Self::send_to_workers(
-                child_result_receivers,
-                worker_senders,
-                runtime_handle.default_morsel_size(),
-            ),
+            async move {
+                let num_workers = *NUM_CPUS;
+                let (input_senders, input_receivers) =
+                    (0..num_workers).map(|_| create_channel(1)).unzip();
+                let (output_senders, mut output_receiver) =
+                    make_ordering_aware_channel(num_workers, maintain_order);
+                let mut worker_set = tokio::task::JoinSet::new();
+                Self::spawn_workers(
+                    op.clone(),
+                    input_receivers,
+                    output_senders,
+                    &mut worker_set,
+                    stats.clone(),
+                );
+                Self::forward_input_to_workers(child_result_receivers, input_senders, morsel_size)
+                    .await?;
+
+                while let Some(morsel) = output_receiver.recv().await {
+                    let _ = destination_sender.send(morsel.into()).await;
+                }
+                Ok(())
+            },
             self.intermediate_op.name(),
         );
         Ok(destination_channel)

diff --git a/src/daft-local-execution/src/runtime_stats.rs b/src/daft-local-execution/src/runtime_stats.rs
index 566d253e9c..f3b3f63e87 100644
--- a/src/daft-local-execution/src/runtime_stats.rs
+++ b/src/daft-local-execution/src/runtime_stats.rs
@@ -8,7 +8,7 @@ use std::{
 use tokio::sync::mpsc::error::SendError;

 use crate::{
-    channel::{PipelineReceiver, Sender},
+    channel::{Receiver, Sender},
     pipeline::PipelineResultType,
 };

@@ -140,7 +140,10 @@ pub struct CountingReceiver {
 }

 impl CountingReceiver {
-    pub(crate) fn new(receiver: PipelineReceiver, rt: Arc<RuntimeStatsContext>) -> Self {
+    pub(crate) fn new(
+        receiver: Receiver<PipelineResultType>,
+        rt: Arc<RuntimeStatsContext>,
+    ) -> Self {
         Self { receiver, rt }
     }
     #[inline]
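The shape of the new worker pool: one bounded input channel per worker, one output sender per worker, and a JoinSet holding the worker tasks. A condensed, self-contained sketch of that wiring, with a hypothetical `work` function standing in for `run_worker`:

use tokio::sync::mpsc;

// Stand-in for run_worker: drain the input channel, push results downstream.
async fn work(mut rx: mpsc::Receiver<u64>, tx: mpsc::Sender<u64>) -> Result<(), String> {
    while let Some(v) = rx.recv().await {
        let _ = tx.send(v * 2).await; // stand-in for op.execute
    }
    Ok(())
}

// Stand-in for spawn_workers: pair each input receiver with an output sender.
fn spawn_all(
    input_receivers: Vec<mpsc::Receiver<u64>>,
    output_senders: Vec<mpsc::Sender<u64>>,
    worker_set: &mut tokio::task::JoinSet<Result<(), String>>,
) {
    for (rx, tx) in input_receivers.into_iter().zip(output_senders) {
        worker_set.spawn(work(rx, tx));
    }
}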
.get_receiver_with_stats(&self.runtime_stats); - let mut destination_channel = PipelineChannel::new(1, maintain_order); - let destination_sender = - destination_channel.get_next_sender_with_stats(&self.runtime_stats); + let destination_channel = PipelineChannel::new(); + let destination_sender = destination_channel.get_sender_with_stats(&self.runtime_stats); let op = self.op.clone(); let rt_context = self.runtime_stats.clone(); runtime_handle.spawn( diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index 8c55401db2..c39aa0e38f 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -74,8 +74,8 @@ impl PipelineNode for SourceNode { self.source .get_data(maintain_order, runtime_handle, self.io_stats.clone())?; - let mut channel = PipelineChannel::new(1, maintain_order); - let counting_sender = channel.get_next_sender_with_stats(&self.runtime_stats); + let channel = PipelineChannel::new(); + let counting_sender = channel.get_sender_with_stats(&self.runtime_stats); runtime_handle.spawn( async move { while let Some(part) = source_stream.next().await { From 8d65a53c1dae2c9df5811c5c62221b31f6a7ccb2 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 19 Sep 2024 13:26:05 -0700 Subject: [PATCH 02/14] clean up lil bit and fix test --- src/daft-local-execution/src/channel.rs | 2 +- .../src/intermediate_ops/intermediate_op.rs | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/daft-local-execution/src/channel.rs b/src/daft-local-execution/src/channel.rs index e3428f7b4e..3c150a2dfc 100644 --- a/src/daft-local-execution/src/channel.rs +++ b/src/daft-local-execution/src/channel.rs @@ -51,7 +51,7 @@ impl PipelineChannel { } } -pub(crate) fn make_ordering_aware_channel( +pub(crate) fn create_ordering_aware_channel( buffer_size: usize, ordered: bool, ) -> (Vec>, OrderingAwareReceiver) { diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index 59ff22e3e0..8a35a86a42 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -3,14 +3,15 @@ use std::sync::Arc; use common_display::tree::TreeDisplay; use common_error::DaftResult; use daft_micropartition::MicroPartition; +use snafu::ResultExt; use tracing::{info_span, instrument}; use super::buffer::OperatorBuffer; use crate::{ - channel::{create_channel, make_ordering_aware_channel, PipelineChannel, Receiver, Sender}, + channel::{create_channel, create_ordering_aware_channel, PipelineChannel, Receiver, Sender}, pipeline::{PipelineNode, PipelineResultType}, runtime_stats::{CountingReceiver, RuntimeStatsContext}, - ExecutionRuntimeHandle, NUM_CPUS, + ExecutionRuntimeHandle, JoinSnafu, NUM_CPUS, }; pub trait IntermediateOperatorState: Send + Sync { @@ -208,7 +209,7 @@ impl PipelineNode for IntermediateNode { let (input_senders, input_receivers) = (0..num_workers).map(|_| create_channel(1)).unzip(); let (output_senders, mut output_receiver) = - make_ordering_aware_channel(num_workers, maintain_order); + create_ordering_aware_channel(num_workers, maintain_order); let mut worker_set = tokio::task::JoinSet::new(); Self::spawn_workers( op.clone(), @@ -223,6 +224,9 @@ impl PipelineNode for IntermediateNode { while let Some(morsel) = output_receiver.recv().await { let _ = destination_sender.send(morsel.into()).await; } + while let 
From c7ec6795e8e899b8d2d84ccbec5e74e211f6f461 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Thu, 19 Sep 2024 13:52:47 -0700
Subject: [PATCH 03/14] more cleanup

---
 .../src/intermediate_ops/intermediate_op.rs | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
index 8a35a86a42..200d1f5b22 100644
--- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
+++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -9,9 +9,10 @@ use tracing::{info_span, instrument};
 use super::buffer::OperatorBuffer;
 use crate::{
     channel::{create_channel, create_ordering_aware_channel, PipelineChannel, Receiver, Sender},
+    create_worker_set,
     pipeline::{PipelineNode, PipelineResultType},
     runtime_stats::{CountingReceiver, RuntimeStatsContext},
-    ExecutionRuntimeHandle, JoinSnafu, NUM_CPUS,
+    ExecutionRuntimeHandle, JoinSnafu, WorkerSet, NUM_CPUS,
 };

 pub trait IntermediateOperatorState: Send + Sync {
@@ -102,7 +103,7 @@ impl IntermediateNode {
         op: Arc<dyn IntermediateOperator>,
         input_receivers: Vec<Receiver<(usize, PipelineResultType)>>,
         output_senders: Vec<Sender<Arc<MicroPartition>>>,
-        worker_set: &mut tokio::task::JoinSet<DaftResult<()>>,
+        worker_set: &mut WorkerSet<DaftResult<()>>,
         stats: Arc<RuntimeStatsContext>,
     ) {
         for (input_receiver, output_sender) in input_receivers.into_iter().zip(output_senders) {
@@ -210,7 +211,8 @@ impl PipelineNode for IntermediateNode {
                 (0..num_workers).map(|_| create_channel(1)).unzip();
             let (output_senders, mut output_receiver) =
                 create_ordering_aware_channel(num_workers, maintain_order);
-            let mut worker_set = tokio::task::JoinSet::new();
+
+            let mut worker_set = create_worker_set();
             Self::spawn_workers(
                 op.clone(),
                 input_receivers,
                 output_senders,
                 &mut worker_set,
                 stats.clone(),
             );
+
             Self::forward_input_to_workers(child_result_receivers, input_senders, morsel_size)
                 .await?;

From 9408aad0d92ff95923f593143d8d91c627747125 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Thu, 19 Sep 2024 14:06:12 -0700
Subject: [PATCH 04/14] reduce code

---
 src/daft-local-execution/src/channel.rs       | 69 +++++++-------
 .../src/intermediate_ops/intermediate_op.rs   | 99 ++++++++-----------
 src/daft-local-execution/src/runtime_stats.rs |  7 +-
 .../src/sinks/blocking_sink.rs                |  7 +-
 .../src/sources/source.rs                     |  4 +-
 5 files changed, 86 insertions(+), 100 deletions(-)

diff --git a/src/daft-local-execution/src/channel.rs b/src/daft-local-execution/src/channel.rs
index 3c150a2dfc..f16e3bd061 100644
--- a/src/daft-local-execution/src/channel.rs
+++ b/src/daft-local-execution/src/channel.rs
@@ -13,8 +13,8 @@ pub fn create_channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
 }

 pub struct PipelineChannel {
-    sender: Sender<PipelineResultType>,
-    receiver: Receiver<PipelineResultType>,
+    sender: PipelineSender,
+    receiver: PipelineReceiver,
 }

 impl PipelineChannel {
@@ -39,47 +39,54 @@ impl PipelineChannel {
     }

-    pub(crate) fn get_receiver_with_stats(
-        self,
-        stats: &Arc<RuntimeStatsContext>,
-    ) -> CountingReceiver {
-        CountingReceiver::new(self.receiver, stats.clone())
+    pub(crate) fn get_next_sender_with_stats(
+        &mut self,
+        rt: &Arc<RuntimeStatsContext>,
+    ) -> CountingSender {
+        CountingSender::new(self.get_next_sender(), rt.clone())
     }

-    pub(crate) fn get_receiver(self) -> Receiver<PipelineResultType> {
+    pub fn get_receiver(self) -> PipelineReceiver {
         self.receiver
     }
+
+    pub(crate) fn get_receiver_with_stats(self, rt: &Arc<RuntimeStatsContext>) -> CountingReceiver {
+        CountingReceiver::new(self.get_receiver(), rt.clone())
+    }
 }
-pub(crate) fn create_ordering_aware_channel<T>(
-    buffer_size: usize,
-    ordered: bool,
-) -> (Vec<Sender<T>>, OrderingAwareReceiver<T>) {
-    match ordered {
-        true => {
-            let (senders, receivers) = (0..buffer_size).map(|_| create_channel(1)).unzip();
-            (
-                senders,
-                OrderingAwareReceiver::Ordered(RoundRobinReceiver::new(receivers)),
-            )
-        }
-        false => {
-            let (sender, receiver) = create_channel(buffer_size);
-            (
-                (0..buffer_size).map(|_| sender.clone()).collect(),
-                OrderingAwareReceiver::Unordered(receiver),
-            )
-        }
-    }
-}
+pub enum PipelineSender {
+    InOrder(RoundRobinSender<PipelineResultType>),
+    OutOfOrder(Sender<PipelineResultType>),
+}
+
+pub struct RoundRobinSender<T> {
+    senders: Vec<Sender<T>>,
+    curr_sender_idx: usize,
+}
+
+impl<T> RoundRobinSender<T> {
+    pub fn new(senders: Vec<Sender<T>>) -> Self {
+        Self {
+            senders,
+            curr_sender_idx: 0,
+        }
+    }
+
+    pub fn get_next_sender(&mut self) -> Sender<T> {
+        let next_idx = self.curr_sender_idx;
+        self.curr_sender_idx = (next_idx + 1) % self.senders.len();
+        self.senders[next_idx].clone()
+    }
+}

-pub enum OrderingAwareReceiver<T> {
-    Ordered(RoundRobinReceiver<T>),
-    Unordered(Receiver<T>),
+pub enum PipelineReceiver {
+    InOrder(RoundRobinReceiver<PipelineResultType>),
+    OutOfOrder(Receiver<PipelineResultType>),
 }

-impl<T> OrderingAwareReceiver<T> {
-    pub async fn recv(&mut self) -> Option<T> {
+impl PipelineReceiver {
+    pub async fn recv(&mut self) -> Option<PipelineResultType> {
         match self {
-            Self::Ordered(rr) => rr.recv().await,
-            Self::Unordered(r) => r.recv().await,
+            Self::InOrder(rr) => rr.recv().await,
+            Self::OutOfOrder(r) => r.recv().await,

diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
index 200d1f5b22..7b0267c7c0 100644
--- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
+++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -3,16 +3,14 @@ use std::sync::Arc;
 use common_display::tree::TreeDisplay;
 use common_error::DaftResult;
 use daft_micropartition::MicroPartition;
-use snafu::ResultExt;
 use tracing::{info_span, instrument};

 use super::buffer::OperatorBuffer;
 use crate::{
-    channel::{create_channel, create_ordering_aware_channel, PipelineChannel, Receiver, Sender},
-    create_worker_set,
+    channel::{create_channel, PipelineChannel, Receiver, Sender},
     pipeline::{PipelineNode, PipelineResultType},
-    runtime_stats::{CountingReceiver, RuntimeStatsContext},
-    ExecutionRuntimeHandle, JoinSnafu, WorkerSet, NUM_CPUS,
+    runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
+    ExecutionRuntimeHandle, NUM_CPUS,
 };

 pub trait IntermediateOperatorState: Send + Sync {
@@ -70,28 +68,28 @@ impl IntermediateNode {
     }

     #[instrument(level = "info", skip_all, name = "IntermediateOperator::run_worker")]
-    async fn run_worker(
+    pub async fn run_worker(
         op: Arc<dyn IntermediateOperator>,
-        mut input_receiver: Receiver<(usize, PipelineResultType)>,
-        output_sender: Sender<Arc<MicroPartition>>,
+        mut receiver: Receiver<(usize, PipelineResultType)>,
+        sender: CountingSender,
         rt_context: Arc<RuntimeStatsContext>,
     ) -> DaftResult<()> {
         let span = info_span!("IntermediateOp::execute");
         let mut state = op.make_state();
-        while let Some((idx, morsel)) = input_receiver.recv().await {
+        while let Some((idx, morsel)) = receiver.recv().await {
             loop {
                 let result =
                     rt_context.in_span(&span, || op.execute(idx, &morsel, state.as_mut()))?;
                 match result {
                     IntermediateOperatorResult::NeedMoreInput(Some(mp)) => {
-                        let _ = output_sender.send(mp).await;
+                        let _ = sender.send(mp.into()).await;
                         break;
                     }
                     IntermediateOperatorResult::NeedMoreInput(None) => {
                         break;
                     }
                     IntermediateOperatorResult::HasMoreOutput(mp) => {
-                        let _ = output_sender.send(mp).await;
+                        let _ = sender.send(mp.into()).await;
                     }
                 }
             }
         }
@@ -97,24 +97,32 @@ impl IntermediateNode {
         Ok(())
     }

-    fn spawn_workers(
-        op: Arc<dyn IntermediateOperator>,
-        input_receivers: Vec<Receiver<(usize, PipelineResultType)>>,
-        output_senders: Vec<Sender<Arc<MicroPartition>>>,
-        worker_set: &mut WorkerSet<DaftResult<()>>,
-        stats: Arc<RuntimeStatsContext>,
-    ) {
-        for (input_receiver, output_sender) in input_receivers.into_iter().zip(output_senders) {
-            worker_set.spawn(Self::run_worker(
-                op.clone(),
-                input_receiver,
-                output_sender,
-                stats.clone(),
-            ));
+    pub fn spawn_workers(
+        &self,
+        num_workers: usize,
+        destination_channel: &mut PipelineChannel,
+        runtime_handle: &mut ExecutionRuntimeHandle,
+    ) -> Vec<Sender<(usize, PipelineResultType)>> {
+        let mut worker_senders = Vec::with_capacity(num_workers);
+        for _ in 0..num_workers {
+            let (worker_sender, worker_receiver) = create_channel(1);
+            let destination_sender =
+                destination_channel.get_next_sender_with_stats(&self.runtime_stats);
+            runtime_handle.spawn(
+                Self::run_worker(
+                    self.intermediate_op.clone(),
+                    worker_receiver,
+                    destination_sender,
+                    self.runtime_stats.clone(),
+                ),
+                self.intermediate_op.name(),
+            );
+            worker_senders.push(worker_sender);
         }
+        worker_senders
     }

-    async fn forward_input_to_workers(
+    pub async fn send_to_workers(
         receivers: Vec<CountingReceiver>,
         worker_senders: Vec<Sender<(usize, PipelineResultType)>>,
         morsel_size: usize,
@@ -195,36 +203,16 @@ impl PipelineNode for IntermediateNode {
             child_result_receivers
                 .push(child_result_channel.get_receiver_with_stats(&self.runtime_stats));
         }
-        let destination_channel = PipelineChannel::new();
-        let destination_sender = destination_channel.get_sender_with_stats(&self.runtime_stats);
+        let mut destination_channel = PipelineChannel::new(*NUM_CPUS, maintain_order);

-        let op = self.intermediate_op.clone();
-        let stats = self.runtime_stats.clone();
-        let morsel_size = runtime_handle.default_morsel_size();
+        let worker_senders =
+            self.spawn_workers(*NUM_CPUS, &mut destination_channel, runtime_handle);
         runtime_handle.spawn(
-            async move {
-                let num_workers = *NUM_CPUS;
-                let (input_senders, input_receivers) =
-                    (0..num_workers).map(|_| create_channel(1)).unzip();
-                let (output_senders, mut output_receiver) =
-                    create_ordering_aware_channel(num_workers, maintain_order);
-
-                let mut worker_set = create_worker_set();
-                Self::spawn_workers(
-                    op.clone(),
-                    input_receivers,
-                    output_senders,
-                    &mut worker_set,
-                    stats.clone(),
-                );
-
-                Self::forward_input_to_workers(child_result_receivers, input_senders, morsel_size)
-                    .await?;
-
-                while let Some(morsel) = output_receiver.recv().await {
-                    let _ = destination_sender.send(morsel.into()).await;
-                }
-                while let Some(result) = worker_set.join_next().await {
-                    result.context(JoinSnafu)??;
-                }
-                Ok(())
-            },
+            Self::send_to_workers(
+                child_result_receivers,
+                worker_senders,
+                runtime_handle.default_morsel_size(),
+            ),
             self.intermediate_op.name(),
         );
         Ok(destination_channel)

diff --git a/src/daft-local-execution/src/runtime_stats.rs b/src/daft-local-execution/src/runtime_stats.rs
index f3b3f63e87..566d253e9c 100644
--- a/src/daft-local-execution/src/runtime_stats.rs
+++ b/src/daft-local-execution/src/runtime_stats.rs
@@ -8,7 +8,7 @@ use std::{
 use tokio::sync::mpsc::error::SendError;

 use crate::{
-    channel::{Receiver, Sender},
+    channel::{PipelineReceiver, Sender},
     pipeline::PipelineResultType,
 };

@@ -140,10 +140,7 @@ pub struct CountingReceiver {
 }

 impl CountingReceiver {
-    pub(crate) fn new(
-        receiver: Receiver<PipelineResultType>,
-        rt: Arc<RuntimeStatsContext>,
-    ) -> Self {
+    pub(crate) fn new(receiver: PipelineReceiver, rt: Arc<RuntimeStatsContext>) -> Self {
         Self { receiver, rt }
     }
     #[inline]
diff --git a/src/daft-local-execution/src/sinks/blocking_sink.rs b/src/daft-local-execution/src/sinks/blocking_sink.rs
index 1cdc1fe8b0..dc38e1df34 100644
--- a/src/daft-local-execution/src/sinks/blocking_sink.rs
+++ b/src/daft-local-execution/src/sinks/blocking_sink.rs
@@ -76,7 +76,7 @@ impl PipelineNode for BlockingSinkNode {

     fn start(
         &mut self,
-        _maintain_order: bool,
+        maintain_order: bool,
         runtime_handle: &mut ExecutionRuntimeHandle,
     ) -> crate::Result<PipelineChannel> {
         let child = self.child.as_mut();
@@ -84,9 +84,8 @@ impl PipelineNode for BlockingSinkNode {
             .start(false, runtime_handle)?
             .get_receiver_with_stats(&self.runtime_stats);

-        let destination_channel = PipelineChannel::new();
-        let destination_sender = destination_channel.get_sender_with_stats(&self.runtime_stats);
+        let mut destination_channel = PipelineChannel::new(1, maintain_order);
+        let destination_sender =
+            destination_channel.get_next_sender_with_stats(&self.runtime_stats);
         let op = self.op.clone();
         let rt_context = self.runtime_stats.clone();
         runtime_handle.spawn(

diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs
index c39aa0e38f..8c55401db2 100644
--- a/src/daft-local-execution/src/sources/source.rs
+++ b/src/daft-local-execution/src/sources/source.rs
@@ -74,8 +74,8 @@ impl PipelineNode for SourceNode {
         self.source
             .get_data(maintain_order, runtime_handle, self.io_stats.clone())?;

-        let channel = PipelineChannel::new();
-        let counting_sender = channel.get_sender_with_stats(&self.runtime_stats);
+        let mut channel = PipelineChannel::new(1, maintain_order);
+        let counting_sender = channel.get_next_sender_with_stats(&self.runtime_stats);
         runtime_handle.spawn(
             async move {
                 while let Some(part) = source_stream.next().await {

From 3ef935cfa4dff1f2e155edd47226dbfaca658717 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Wed, 25 Sep 2024 15:41:29 -0700
Subject: [PATCH 05/14] bitmap improvements + probe state

---
 Cargo.lock                          |  1 -
 src/arrow2/src/bitmap/mutable.rs    | 19 -------------------
 src/daft-core/src/prelude.rs        |  2 ++
 src/daft-local-execution/Cargo.toml |  1 -
 4 files changed, 2 insertions(+), 21 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9445921e93..c57309e10f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1952,7 +1952,6 @@ dependencies = [
 name = "daft-local-execution"
 version = "0.3.0-dev0"
 dependencies = [
- "arrow2",
  "common-daft-config",
  "common-display",
  "common-error",

diff --git a/src/arrow2/src/bitmap/mutable.rs b/src/arrow2/src/bitmap/mutable.rs
index edf373a506..cb77decd84 100644
--- a/src/arrow2/src/bitmap/mutable.rs
+++ b/src/arrow2/src/bitmap/mutable.rs
@@ -325,25 +325,6 @@ impl MutableBitmap {
     pub(crate) fn bitchunks_exact_mut(&mut self) -> BitChunksExactMut {
         BitChunksExactMut::new(&mut self.buffer, self.length)
     }
-
-    pub fn or(&self, other: &MutableBitmap) -> MutableBitmap {
-        assert_eq!(
-            self.length, other.length,
-            "Bitmaps must have the same length"
-        );
-
-        let new_buffer: Vec<u8> = self
-            .buffer
-            .iter()
-            .zip(other.buffer.iter())
-            .map(|(&a, &b)| a | b) // Apply bitwise OR on each pair of bytes
-            .collect();
-
-        MutableBitmap {
-            buffer: new_buffer,
-            length: self.length,
-        }
-    }
 }

 impl From<MutableBitmap> for Bitmap {

diff --git a/src/daft-core/src/prelude.rs b/src/daft-core/src/prelude.rs
index 6f6ecaf5a5..9cae154a8d 100644
--- a/src/daft-core/src/prelude.rs
+++ b/src/daft-core/src/prelude.rs
@@ -5,6 +5,8 @@
 // Re-export arrow2 bitmap
 pub use arrow2::bitmap;
 // Re-export core series structures
+// Re-export arrow2 bitmap
+pub use arrow2::bitmap;
 pub use daft_schema::schema::{Schema, SchemaRef};

 // Re-export count mode enum
diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml
index 16ff80bfb1..cd061c1c35 100644
--- a/src/daft-local-execution/Cargo.toml
+++ b/src/daft-local-execution/Cargo.toml
@@ -1,5 +1,4 @@
 [dependencies]
-arrow2 = {workspace = true}
 common-daft-config = {path = "../common/daft-config", default-features = false}
 common-display = {path = "../common/display", default-features = false}
 common-error = {path = "../common/error", default-features = false}

From 9ddd42d9d14e956208b3957ad022cdad5af61bb2 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Wed, 25 Sep 2024 22:15:37 -0700
Subject: [PATCH 06/14] use mask filter

---
 src/daft-core/src/prelude.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/daft-core/src/prelude.rs b/src/daft-core/src/prelude.rs
index 9cae154a8d..e0041ea1a8 100644
--- a/src/daft-core/src/prelude.rs
+++ b/src/daft-core/src/prelude.rs
@@ -7,6 +7,7 @@ pub use arrow2::bitmap;
 // Re-export core series structures
 // Re-export arrow2 bitmap
 pub use arrow2::bitmap;
+// Re-export core series structures
 pub use daft_schema::schema::{Schema, SchemaRef};

 // Re-export count mode enum
From ff8d566fa22f0e0061d433e5656ef6762ebc6315 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 30 Sep 2024 13:35:12 -0700
Subject: [PATCH 07/14] concat

---
 src/daft-local-execution/src/pipeline.rs     |  11 +-
 src/daft-local-execution/src/sinks/concat.rs | 110 +++++++++----------
 tests/dataframe/test_concat.py               |   7 --
 tests/dataframe/test_transform.py            |   6 -
 4 files changed, 54 insertions(+), 80 deletions(-)

diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs
index 4e397ff45d..761c6c61cc 100644
--- a/src/daft-local-execution/src/pipeline.rs
+++ b/src/daft-local-execution/src/pipeline.rs
@@ -152,12 +152,11 @@ pub fn physical_plan_to_pipeline(
             let child_node = physical_plan_to_pipeline(input, psets)?;
             StreamingSinkNode::new(Arc::new(sink), vec![child_node]).boxed()
         }
-        LocalPhysicalPlan::Concat(_) => {
-            todo!("concat")
-            // let sink = ConcatSink::new();
-            // let left_child = physical_plan_to_pipeline(input, psets)?;
-            // let right_child = physical_plan_to_pipeline(other, psets)?;
-            // PipelineNode::double_sink(sink, left_child, right_child)
+        LocalPhysicalPlan::Concat(Concat { input, other, .. }) => {
+            let left_child = physical_plan_to_pipeline(input, psets)?;
+            let right_child = physical_plan_to_pipeline(other, psets)?;
+            let sink = crate::sinks::concat::ConcatSink {};
+            StreamingSinkNode::new(Arc::new(sink), vec![left_child, right_child]).boxed()
         }
         LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
             input,

diff --git a/src/daft-local-execution/src/sinks/concat.rs b/src/daft-local-execution/src/sinks/concat.rs
index 010bed0aaf..058fa1fe58 100644
--- a/src/daft-local-execution/src/sinks/concat.rs
+++ b/src/daft-local-execution/src/sinks/concat.rs
@@ -1,61 +1,49 @@
-// use std::sync::Arc;
-
-// use common_error::DaftResult;
-// use daft_micropartition::MicroPartition;
-// use tracing::instrument;
-
-// use super::sink::{Sink, SinkResultType};
-
-// #[derive(Clone)]
-// pub struct ConcatSink {
-//     result_left: Vec<Arc<MicroPartition>>,
-//     result_right: Vec<Arc<MicroPartition>>,
-// }
-
-// impl ConcatSink {
-//     pub fn new() -> Self {
-//         Self {
-//             result_left: Vec::new(),
-//             result_right: Vec::new(),
-//         }
-//     }
-
-//     #[instrument(skip_all, name = "ConcatSink::sink")]
-//     fn sink_left(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-//         self.result_left.push(input.clone());
-//         Ok(SinkResultType::NeedMoreInput)
-//     }
-
-//     #[instrument(skip_all, name = "ConcatSink::sink")]
-//     fn sink_right(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-//         self.result_right.push(input.clone());
-//         Ok(SinkResultType::NeedMoreInput)
-//     }
-// }
-
-// impl Sink for ConcatSink {
-//     fn sink(&mut self, index: usize, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-//         match index {
-//             0 => self.sink_left(input),
-//             1 => self.sink_right(input),
-//             _ => panic!("concat only supports 2 inputs, got {index}"),
-//         }
-//     }
-
-//     fn in_order(&self) -> bool {
-//         true
-//     }
-
-//     fn num_inputs(&self) -> usize {
-//         2
-//     }
-
-//     #[instrument(skip_all, name = "ConcatSink::finalize")]
-//     fn finalize(self: Box<Self>) -> DaftResult<Vec<Arc<MicroPartition>>> {
-//         Ok(self
-//             .result_left
-//             .into_iter()
-//             .chain(self.result_right.into_iter())
-//             .collect())
-//     }
-// }
+use std::sync::Arc;
+
+use common_error::DaftResult;
+use daft_micropartition::MicroPartition;
+use tracing::instrument;
+
+use super::streaming_sink::{StreamingSink, StreamingSinkOutput, StreamingSinkState};
+use crate::pipeline::PipelineResultType;
+
+struct ConcatSinkState {}
+impl StreamingSinkState for ConcatSinkState {
+    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
+        self
+    }
+}
+
+pub struct ConcatSink {}
+
+impl StreamingSink for ConcatSink {
+    #[instrument(skip_all, name = "ConcatSink::sink")]
+    fn execute(
+        &self,
+        _index: usize,
+        input: &PipelineResultType,
+        _state: &mut dyn StreamingSinkState,
+    ) -> DaftResult<StreamingSinkOutput> {
+        let input = input.as_data();
+        Ok(StreamingSinkOutput::NeedMoreInput(Some(input.clone())))
+    }
+
+    fn name(&self) -> &'static str {
+        "Concat"
+    }
+
+    fn finalize(
+        &self,
+        _states: Vec<Box<dyn StreamingSinkState>>,
+    ) -> DaftResult<Option<Arc<MicroPartition>>> {
+        Ok(None)
+    }
+
+    fn make_state(&self) -> Box<dyn StreamingSinkState> {
+        Box::new(ConcatSinkState {})
+    }
+
+    fn max_concurrency(&self) -> usize {
+        1
+    }
+}
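ConcatSink works because a pass-through streaming sink concatenates by construction: the node drains each child in order, so every morsel from the left child is forwarded before any morsel from the right, and `max_concurrency() == 1` keeps a single worker from interleaving them. A standalone sketch of that drain order, assuming tokio mpsc channels (the real node plumbs `PipelineResultType` rather than a bare `T`):

// Sketch: draining children sequentially yields left-then-right output,
// which is exactly DataFrame concat semantics.
async fn drain_in_order<T>(
    mut children: Vec<tokio::sync::mpsc::Receiver<T>>,
    out: tokio::sync::mpsc::Sender<T>,
) {
    for child in children.iter_mut() {
        while let Some(morsel) = child.recv().await {
            if out.send(morsel).await.is_err() {
                return; // downstream hung up; stop forwarding
            }
        }
    }
}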
diff --git a/tests/dataframe/test_concat.py b/tests/dataframe/test_concat.py
index 07e06df59c..f3caf56bb1 100644
--- a/tests/dataframe/test_concat.py
+++ b/tests/dataframe/test_concat.py
@@ -2,13 +2,6 @@

 import pytest

-from daft import context
-
-pytestmark = pytest.mark.skipif(
-    context.get_context().daft_execution_config.enable_native_executor is True,
-    reason="Native executor fails for these tests",
-)
-

 def test_simple_concat(make_df):
     df1 = make_df({"foo": [1, 2, 3]})

diff --git a/tests/dataframe/test_transform.py b/tests/dataframe/test_transform.py
index 277c378bad..a698b6e7fd 100644
--- a/tests/dataframe/test_transform.py
+++ b/tests/dataframe/test_transform.py
@@ -3,12 +3,6 @@
 import pytest

 import daft
-from daft import context
-
-pytestmark = pytest.mark.skipif(
-    context.get_context().daft_execution_config.enable_native_executor is True,
-    reason="Native executor fails for these tests",
-)


 def add_1(df):

From fe2c0b179e1611ec473db729fe90a38d2b452fbb Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 21 Oct 2024 18:43:59 -0700
Subject: [PATCH 08/14] cleanup

---
 src/daft-core/src/prelude.rs             | 3 ---
 src/daft-local-execution/src/pipeline.rs | 4 ++--
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/daft-core/src/prelude.rs b/src/daft-core/src/prelude.rs
index e0041ea1a8..6f6ecaf5a5 100644
--- a/src/daft-core/src/prelude.rs
+++ b/src/daft-core/src/prelude.rs
@@ -2,9 +2,6 @@
 //!
 //! This module re-exports commonly used items from the Daft core library.

-// Re-export arrow2 bitmap
-pub use arrow2::bitmap;
-// Re-export core series structures
 // Re-export arrow2 bitmap
 pub use arrow2::bitmap;
 // Re-export core series structures
 pub use daft_schema::schema::{Schema, SchemaRef};

diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs
index 761c6c61cc..e67688938a 100644
--- a/src/daft-local-execution/src/pipeline.rs
+++ b/src/daft-local-execution/src/pipeline.rs
@@ -10,8 +10,8 @@ use daft_core::{
 use daft_dsl::{col, join::get_common_join_keys, Expr};
 use daft_micropartition::MicroPartition;
 use daft_physical_plan::{
-    EmptyScan, Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, Pivot,
-    Project, Sample, Sort, UnGroupedAggregate,
+    Concat, EmptyScan, Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan,
+    Pivot, Project, Sample, Sort, UnGroupedAggregate,
 };
 use daft_plan::{populate_aggregation_stages, JoinType};
 use daft_table::ProbeState;

From 4ca982808991a8782af15aac794171087d5349d0 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 21 Oct 2024 21:15:15 -0700
Subject: [PATCH 09/14] add sample test

---
 tests/dataframe/test_sample.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/tests/dataframe/test_sample.py b/tests/dataframe/test_sample.py
index 791e2a2211..109b9f332b 100644
--- a/tests/dataframe/test_sample.py
+++ b/tests/dataframe/test_sample.py
@@ -2,8 +2,6 @@

 import pytest

-from daft import context
-

 def test_sample_fraction(make_df, valid_data: list[dict[str, float]]) -> None:
     df = make_df(valid_data)
@@ -100,10 +98,6 @@ def test_sample_without_replacement(make_df, valid_data: list[dict[str, float]]
     assert pylist[0] != pylist[1]


-@pytest.mark.skipif(
-    context.get_context().daft_execution_config.enable_native_executor is True,
-    reason="Native executor fails for concat",
-)
 def test_sample_with_concat(make_df, valid_data: list[dict[str, float]]) -> None:
     df1 = make_df(valid_data)
     df2 = make_df(valid_data)
From 5064839ff3df271f8a2e13124fdb5aad96f4392f Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 21 Oct 2024 21:17:52 -0700
Subject: [PATCH 10/14] add approx count distinct test

---
 tests/dataframe/test_approx_count_distinct.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/tests/dataframe/test_approx_count_distinct.py b/tests/dataframe/test_approx_count_distinct.py
index 78d2a7b181..68d7057ca0 100644
--- a/tests/dataframe/test_approx_count_distinct.py
+++ b/tests/dataframe/test_approx_count_distinct.py
@@ -2,12 +2,7 @@
 import pytest

 import daft
-from daft import col, context
-
-pytestmark = pytest.mark.skipif(
-    context.get_context().daft_execution_config.enable_native_executor is True,
-    reason="Native executor fails for these tests",
-)
+from daft import col

 TESTS = [
     [[], 0],

From 294f588e80e436a4ce75a619b6eb3d50d503db3e Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 21 Oct 2024 21:42:13 -0700
Subject: [PATCH 11/14] comments

---
 src/daft-local-execution/src/sinks/concat.rs | 32 +++++++++++++++----
 .../src/sinks/streaming_sink.rs              | 12 +++++++
 2 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/src/daft-local-execution/src/sinks/concat.rs b/src/daft-local-execution/src/sinks/concat.rs
index 058fa1fe58..eed4021c65 100644
--- a/src/daft-local-execution/src/sinks/concat.rs
+++ b/src/daft-local-execution/src/sinks/concat.rs
@@ -1,13 +1,16 @@
 use std::sync::Arc;

-use common_error::DaftResult;
+use common_error::{DaftError, DaftResult};
 use daft_micropartition::MicroPartition;
 use tracing::instrument;

 use super::streaming_sink::{StreamingSink, StreamingSinkOutput, StreamingSinkState};
 use crate::pipeline::PipelineResultType;

-struct ConcatSinkState {}
+struct ConcatSinkState {
+    // The index of the last morsel of data that was received, which should be non-decreasing.
+    pub curr_idx: usize,
+}
 impl StreamingSinkState for ConcatSinkState {
     fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
         self
@@ -17,15 +20,29 @@ impl StreamingSinkState for ConcatSinkState {

 pub struct ConcatSink {}

 impl StreamingSink for ConcatSink {
+    /// Execute for the ConcatSink operator does not do any computation; it simply returns the input data.
+    /// It only expects that the indices of the input data are non-decreasing.
     #[instrument(skip_all, name = "ConcatSink::sink")]
     fn execute(
         &self,
-        _index: usize,
+        index: usize,
         input: &PipelineResultType,
-        _state: &mut dyn StreamingSinkState,
+        state: &mut dyn StreamingSinkState,
     ) -> DaftResult<StreamingSinkOutput> {
-        let input = input.as_data();
-        Ok(StreamingSinkOutput::NeedMoreInput(Some(input.clone())))
+        let state = state
+            .as_any_mut()
+            .downcast_mut::<ConcatSinkState>()
+            .expect("ConcatSink should have ConcatSinkState");
+
+        // If the index is the same as the current index or one more than the current index, then we can accept the morsel.
+        if state.curr_idx == index || state.curr_idx + 1 == index {
+            state.curr_idx = index;
+            Ok(StreamingSinkOutput::NeedMoreInput(Some(
+                input.as_data().clone(),
+            )))
+        } else {
+            Err(DaftError::ComputeError(format!("Concat sink received out-of-order data. Expected index to be {} or {}, but got {}.", state.curr_idx, state.curr_idx + 1, index)))
+        }
     }

     fn name(&self) -> &'static str {
@@ -40,9 +57,10 @@ impl StreamingSink for ConcatSink {
     }

     fn make_state(&self) -> Box<dyn StreamingSinkState> {
-        Box::new(ConcatSinkState {})
+        Box::new(ConcatSinkState { curr_idx: 0 })
     }

+    /// Since the ConcatSink does not do any computation, it does not need to spawn multiple workers.
     fn max_concurrency(&self) -> usize {
         1
     }
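The acceptance rule above is small enough to state exactly: a morsel's child index may equal the last seen index (more data from the same child) or exceed it by one (the next child has started); anything else means data arrived out of order. In isolation:

// Sketch: ConcatSink's morsel-index invariant.
fn accept(curr_idx: &mut usize, index: usize) -> Result<(), String> {
    if *curr_idx == index || *curr_idx + 1 == index {
        *curr_idx = index;
        Ok(())
    } else {
        Err(format!(
            "out-of-order data: expected index {} or {}, got {}",
            *curr_idx,
            *curr_idx + 1,
            index
        ))
    }
}

// Starting from 0, indices 0 or 1 are accepted; jumping straight to 2 errors.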
diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs
index 0a7000af8f..85867c4ebb 100644
--- a/src/daft-local-execution/src/sinks/streaming_sink.rs
+++ b/src/daft-local-execution/src/sinks/streaming_sink.rs
@@ -26,18 +26,30 @@ pub enum StreamingSinkOutput {
 }

 pub trait StreamingSink: Send + Sync {
+    /// Execute the StreamingSink operator on the morsel of input data,
+    /// received from the child with the given index,
+    /// with the given state.
     fn execute(
         &self,
         index: usize,
         input: &PipelineResultType,
         state: &mut dyn StreamingSinkState,
     ) -> DaftResult<StreamingSinkOutput>;
+
+    /// Finalize the StreamingSink operator, with the given states from each worker.
     fn finalize(
         &self,
         states: Vec<Box<dyn StreamingSinkState>>,
     ) -> DaftResult<Option<Arc<MicroPartition>>>;
+
+    /// The name of the StreamingSink operator.
     fn name(&self) -> &'static str;
+
+    /// Create a new worker-local state for this StreamingSink.
     fn make_state(&self) -> Box<dyn StreamingSinkState>;
+
+    /// The maximum number of concurrent workers that can be spawned for this sink.
+    /// Each worker has its own StreamingSinkState.
     fn max_concurrency(&self) -> usize {
         *NUM_CPUS
     }

From 128c2d292615cf1059c4dbc02aa34f656fa7d60d Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 21 Oct 2024 21:43:36 -0700
Subject: [PATCH 12/14] minor nit

---
 src/daft-local-execution/src/pipeline.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs
index 785bcb177b..f29135afb5 100644
--- a/src/daft-local-execution/src/pipeline.rs
+++ b/src/daft-local-execution/src/pipeline.rs
@@ -27,7 +27,7 @@ use crate::{
     sinks::{
-        aggregate::AggregateSink, blocking_sink::BlockingSinkNode,
+        aggregate::AggregateSink, blocking_sink::BlockingSinkNode, concat::ConcatSink,
         hash_join_build::HashJoinBuildSink, limit::LimitSink,
         outer_hash_join_probe::OuterHashJoinProbeSink, sort::SortSink,
         streaming_sink::StreamingSinkNode,
@@ -155,7 +155,7 @@ pub fn physical_plan_to_pipeline(
         LocalPhysicalPlan::Concat(Concat { input, other, .. }) => {
             let left_child = physical_plan_to_pipeline(input, psets)?;
             let right_child = physical_plan_to_pipeline(other, psets)?;
-            let sink = crate::sinks::concat::ConcatSink {};
+            let sink = ConcatSink {};
             StreamingSinkNode::new(Arc::new(sink), vec![left_child, right_child]).boxed()
         }

From c82fb49cec619926e60b49772c8052d575f17631 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Tue, 22 Oct 2024 09:39:51 -0700
Subject: [PATCH 13/14] comment about forwarding

---
 src/daft-local-execution/src/sinks/streaming_sink.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs
index 85867c4ebb..6e8a022cdb 100644
--- a/src/daft-local-execution/src/sinks/streaming_sink.rs
+++ b/src/daft-local-execution/src/sinks/streaming_sink.rs
@@ -130,6 +130,8 @@ impl StreamingSinkNode {
         output_receiver
     }

+    // Forwards input from the children to the workers in a round-robin fashion.
+    // Always exhausts the input from one child before moving to the next.
     async fn forward_input_to_workers(
         receivers: Vec<CountingReceiver>,
         worker_senders: Vec<Sender<(usize, PipelineResultType)>>,

From 102970ee597c662c29dfcacbbc1e9f69c16a4c27 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Tue, 22 Oct 2024 13:35:43 -0700
Subject: [PATCH 14/14] add todo

---
 src/daft-local-execution/src/sinks/concat.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/daft-local-execution/src/sinks/concat.rs b/src/daft-local-execution/src/sinks/concat.rs
index eed4021c65..5b98cb84c6 100644
--- a/src/daft-local-execution/src/sinks/concat.rs
+++ b/src/daft-local-execution/src/sinks/concat.rs
@@ -22,6 +22,7 @@ pub struct ConcatSink {}
 impl StreamingSink for ConcatSink {
     /// Execute for the ConcatSink operator does not do any computation; it simply returns the input data.
     /// It only expects that the indices of the input data are non-decreasing.
+    /// TODO(Colin): If maintain_order is false, technically we could accept any index. Make this optimization later.
     #[instrument(skip_all, name = "ConcatSink::sink")]
     fn execute(
         &self,
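Across these commits, every sink stores worker-local state as a boxed trait object and recovers the concrete type with an `Any` downcast, exactly as ConcatSink does above. That pattern can be exercised standalone; the following is a self-contained mock with illustrative names, not the real daft-local-execution types:

use std::any::Any;

// Worker-local state, type-erased behind a trait object.
trait SinkState: Send {
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

struct CountingState {
    morsels_seen: usize,
}

impl SinkState for CountingState {
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

// Stand-in for make_state: each worker gets its own boxed state.
fn make_state() -> Box<dyn SinkState> {
    Box::new(CountingState { morsels_seen: 0 })
}

// Each call recovers the concrete state with a downcast, like
// ConcatSink::execute recovers its ConcatSinkState.
fn execute(state: &mut dyn SinkState) {
    let state = state
        .as_any_mut()
        .downcast_mut::<CountingState>()
        .expect("sink should have CountingState");
    state.morsels_seen += 1;
}

fn main() {
    let mut state = make_state();
    execute(state.as_mut());
    execute(state.as_mut());
}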