From 7c76de2ff969efdf45275af8aef89f748ffe83d9 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 29 Aug 2024 14:07:00 -0700 Subject: [PATCH] no async trait --- Cargo.lock | 1 - src/daft-local-execution/Cargo.toml | 1 - .../src/intermediate_ops/intermediate_op.rs | 14 +++++--------- src/daft-local-execution/src/pipeline.rs | 4 +--- src/daft-local-execution/src/run.rs | 5 +---- .../src/sinks/blocking_sink.rs | 7 ++----- .../src/sinks/streaming_sink.rs | 7 +++---- src/daft-local-execution/src/sources/source.rs | 5 +---- 8 files changed, 13 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b547e28349..7c06eeb8ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1888,7 +1888,6 @@ dependencies = [ name = "daft-local-execution" version = "0.3.0-dev0" dependencies = [ - "async-trait", "common-daft-config", "common-display", "common-error", diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index 66949390f5..678ea0ccbe 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -1,5 +1,4 @@ [dependencies] -async-trait = {workspace = true} common-daft-config = {path = "../common/daft-config", default-features = false} common-display = {path = "../common/display", default-features = false} common-error = {path = "../common/error", default-features = false} diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index aec8e91be6..d4d6ea4456 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -5,8 +5,6 @@ use common_error::DaftResult; use daft_micropartition::MicroPartition; use tracing::{info_span, instrument}; -use async_trait::async_trait; - use crate::{ channel::{create_channel, PipelineChannel, Receiver, Sender}, pipeline::{PipelineNode, PipelineResultType}, @@ -100,7 +98,7 @@ impl IntermediateNode { Ok(()) } - pub async fn spawn_workers( + pub fn spawn_workers( &self, num_workers: usize, destination_channel: &mut PipelineChannel, @@ -185,7 +183,6 @@ impl TreeDisplay for IntermediateNode { } } -#[async_trait] impl PipelineNode for IntermediateNode { fn children(&self) -> Vec<&dyn PipelineNode> { self.children.iter().map(|v| v.as_ref()).collect() @@ -195,22 +192,21 @@ impl PipelineNode for IntermediateNode { self.intermediate_op.name() } - async fn start( + fn start( &mut self, maintain_order: bool, runtime_handle: &mut ExecutionRuntimeHandle, ) -> crate::Result { let mut child_result_receivers = Vec::with_capacity(self.children.len()); for child in self.children.iter_mut() { - let child_result_channel = child.start(maintain_order, runtime_handle).await?; + let child_result_channel = child.start(maintain_order, runtime_handle)?; child_result_receivers .push(child_result_channel.get_receiver_with_stats(&self.runtime_stats)); } let mut destination_channel = PipelineChannel::new(*NUM_CPUS, maintain_order); - let worker_senders = self - .spawn_workers(*NUM_CPUS, &mut destination_channel, runtime_handle) - .await; + let worker_senders = + self.spawn_workers(*NUM_CPUS, &mut destination_channel, runtime_handle); runtime_handle.spawn( Self::send_to_workers( child_result_receivers, diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index 7b05472ed7..f182b9f9cc 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -16,7 +16,6 @@ use crate::{ ExecutionRuntimeHandle, PipelineCreationSnafu, }; -use async_trait::async_trait; use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay}; use common_error::DaftResult; use daft_core::{ @@ -72,11 +71,10 @@ impl PipelineResultType { } } -#[async_trait] pub trait PipelineNode: Sync + Send + TreeDisplay { fn children(&self) -> Vec<&dyn PipelineNode>; fn name(&self) -> &'static str; - async fn start( + fn start( &mut self, maintain_order: bool, runtime_handle: &mut ExecutionRuntimeHandle, diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index 6d9c53e01d..ba500c272b 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -136,10 +136,7 @@ pub fn run_local( .expect("Failed to create tokio runtime"); runtime.block_on(async { let mut runtime_handle = ExecutionRuntimeHandle::new(cfg.default_morsel_size); - let mut receiver = pipeline - .start(true, &mut runtime_handle) - .await? - .get_receiver(); + let mut receiver = pipeline.start(true, &mut runtime_handle)?.get_receiver(); while let Some(val) = receiver.recv().await { let _ = tx.send(val.as_data().clone()).await; } diff --git a/src/daft-local-execution/src/sinks/blocking_sink.rs b/src/daft-local-execution/src/sinks/blocking_sink.rs index 87ba335215..09e42ae81f 100644 --- a/src/daft-local-execution/src/sinks/blocking_sink.rs +++ b/src/daft-local-execution/src/sinks/blocking_sink.rs @@ -11,7 +11,6 @@ use crate::{ runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle, }; -use async_trait::async_trait; pub enum BlockingSinkStatus { NeedMoreInput, #[allow(dead_code)] @@ -68,7 +67,6 @@ impl TreeDisplay for BlockingSinkNode { } } -#[async_trait] impl PipelineNode for BlockingSinkNode { fn children(&self) -> Vec<&dyn PipelineNode> { vec![self.child.as_ref()] @@ -78,15 +76,14 @@ impl PipelineNode for BlockingSinkNode { self.name } - async fn start( + fn start( &mut self, maintain_order: bool, runtime_handle: &mut ExecutionRuntimeHandle, ) -> crate::Result { let child = self.child.as_mut(); let mut child_results_receiver = child - .start(false, runtime_handle) - .await? + .start(false, runtime_handle)? .get_receiver_with_stats(&self.runtime_stats); let mut destination_channel = PipelineChannel::new(1, maintain_order); diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs index dcf1b9db8d..1804a3e07e 100644 --- a/src/daft-local-execution/src/sinks/streaming_sink.rs +++ b/src/daft-local-execution/src/sinks/streaming_sink.rs @@ -9,7 +9,7 @@ use crate::{ channel::PipelineChannel, pipeline::PipelineNode, runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle, NUM_CPUS, }; -use async_trait::async_trait; + pub enum StreamSinkOutput { NeedMoreInput(Option>), #[allow(dead_code)] @@ -73,7 +73,6 @@ impl TreeDisplay for StreamingSinkNode { } } -#[async_trait] impl PipelineNode for StreamingSinkNode { fn children(&self) -> Vec<&dyn PipelineNode> { self.children.iter().map(|v| v.as_ref()).collect() @@ -83,7 +82,7 @@ impl PipelineNode for StreamingSinkNode { self.name } - async fn start( + fn start( &mut self, maintain_order: bool, runtime_handle: &mut ExecutionRuntimeHandle, @@ -92,7 +91,7 @@ impl PipelineNode for StreamingSinkNode { .children .get_mut(0) .expect("we should only have 1 child"); - let child_results_channel = child.start(true, runtime_handle).await?; + let child_results_channel = child.start(true, runtime_handle)?; let mut child_results_receiver = child_results_channel.get_receiver_with_stats(&self.runtime_stats); diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index be7d6bef67..175dc66427 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -5,8 +5,6 @@ use daft_io::{IOStatsContext, IOStatsRef}; use daft_micropartition::MicroPartition; use futures::{stream::BoxStream, StreamExt}; -use async_trait::async_trait; - use crate::{ channel::PipelineChannel, pipeline::PipelineNode, runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle, @@ -62,7 +60,6 @@ impl TreeDisplay for SourceNode { } } -#[async_trait] impl PipelineNode for SourceNode { fn name(&self) -> &'static str { self.source.name() @@ -70,7 +67,7 @@ impl PipelineNode for SourceNode { fn children(&self) -> Vec<&dyn PipelineNode> { vec![] } - async fn start( + fn start( &mut self, maintain_order: bool, runtime_handle: &mut ExecutionRuntimeHandle,