Skip to content

Commit

Permalink
no async trait
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Aug 29, 2024
1 parent 05524b8 commit 7c76de2
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 31 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[dependencies]
async-trait = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-display = {path = "../common/display", default-features = false}
common-error = {path = "../common/error", default-features = false}
Expand Down
14 changes: 5 additions & 9 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use common_error::DaftResult;
use daft_micropartition::MicroPartition;
use tracing::{info_span, instrument};

use async_trait::async_trait;

use crate::{
channel::{create_channel, PipelineChannel, Receiver, Sender},
pipeline::{PipelineNode, PipelineResultType},
Expand Down Expand Up @@ -100,7 +98,7 @@ impl IntermediateNode {
Ok(())
}

pub async fn spawn_workers(
pub fn spawn_workers(
&self,
num_workers: usize,
destination_channel: &mut PipelineChannel,
Expand Down Expand Up @@ -185,7 +183,6 @@ impl TreeDisplay for IntermediateNode {
}
}

#[async_trait]
impl PipelineNode for IntermediateNode {
fn children(&self) -> Vec<&dyn PipelineNode> {
self.children.iter().map(|v| v.as_ref()).collect()
Expand All @@ -195,22 +192,21 @@ impl PipelineNode for IntermediateNode {
self.intermediate_op.name()
}

async fn start(
fn start(
&mut self,
maintain_order: bool,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> crate::Result<PipelineChannel> {
let mut child_result_receivers = Vec::with_capacity(self.children.len());
for child in self.children.iter_mut() {
let child_result_channel = child.start(maintain_order, runtime_handle).await?;
let child_result_channel = child.start(maintain_order, runtime_handle)?;
child_result_receivers
.push(child_result_channel.get_receiver_with_stats(&self.runtime_stats));
}
let mut destination_channel = PipelineChannel::new(*NUM_CPUS, maintain_order);

let worker_senders = self
.spawn_workers(*NUM_CPUS, &mut destination_channel, runtime_handle)
.await;
let worker_senders =
self.spawn_workers(*NUM_CPUS, &mut destination_channel, runtime_handle);
runtime_handle.spawn(
Self::send_to_workers(
child_result_receivers,
Expand Down
4 changes: 1 addition & 3 deletions src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::{
ExecutionRuntimeHandle, PipelineCreationSnafu,
};

use async_trait::async_trait;
use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay};
use common_error::DaftResult;
use daft_core::{
Expand Down Expand Up @@ -72,11 +71,10 @@ impl PipelineResultType {
}
}

#[async_trait]
pub trait PipelineNode: Sync + Send + TreeDisplay {
fn children(&self) -> Vec<&dyn PipelineNode>;
fn name(&self) -> &'static str;
async fn start(
fn start(
&mut self,
maintain_order: bool,
runtime_handle: &mut ExecutionRuntimeHandle,
Expand Down
5 changes: 1 addition & 4 deletions src/daft-local-execution/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ pub fn run_local(
.expect("Failed to create tokio runtime");
runtime.block_on(async {
let mut runtime_handle = ExecutionRuntimeHandle::new(cfg.default_morsel_size);
let mut receiver = pipeline
.start(true, &mut runtime_handle)
.await?
.get_receiver();
let mut receiver = pipeline.start(true, &mut runtime_handle)?.get_receiver();
while let Some(val) = receiver.recv().await {
let _ = tx.send(val.as_data().clone()).await;
}
Expand Down
7 changes: 2 additions & 5 deletions src/daft-local-execution/src/sinks/blocking_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
runtime_stats::RuntimeStatsContext,
ExecutionRuntimeHandle,
};
use async_trait::async_trait;
pub enum BlockingSinkStatus {
NeedMoreInput,
#[allow(dead_code)]
Expand Down Expand Up @@ -68,7 +67,6 @@ impl TreeDisplay for BlockingSinkNode {
}
}

#[async_trait]
impl PipelineNode for BlockingSinkNode {
fn children(&self) -> Vec<&dyn PipelineNode> {
vec![self.child.as_ref()]
Expand All @@ -78,15 +76,14 @@ impl PipelineNode for BlockingSinkNode {
self.name
}

async fn start(
fn start(
&mut self,
maintain_order: bool,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> crate::Result<PipelineChannel> {
let child = self.child.as_mut();
let mut child_results_receiver = child
.start(false, runtime_handle)
.await?
.start(false, runtime_handle)?
.get_receiver_with_stats(&self.runtime_stats);

let mut destination_channel = PipelineChannel::new(1, maintain_order);
Expand Down
7 changes: 3 additions & 4 deletions src/daft-local-execution/src/sinks/streaming_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
channel::PipelineChannel, pipeline::PipelineNode, runtime_stats::RuntimeStatsContext,
ExecutionRuntimeHandle, NUM_CPUS,
};
use async_trait::async_trait;

pub enum StreamSinkOutput {
NeedMoreInput(Option<Arc<MicroPartition>>),
#[allow(dead_code)]
Expand Down Expand Up @@ -73,7 +73,6 @@ impl TreeDisplay for StreamingSinkNode {
}
}

#[async_trait]
impl PipelineNode for StreamingSinkNode {
fn children(&self) -> Vec<&dyn PipelineNode> {
self.children.iter().map(|v| v.as_ref()).collect()
Expand All @@ -83,7 +82,7 @@ impl PipelineNode for StreamingSinkNode {
self.name
}

async fn start(
fn start(
&mut self,
maintain_order: bool,
runtime_handle: &mut ExecutionRuntimeHandle,
Expand All @@ -92,7 +91,7 @@ impl PipelineNode for StreamingSinkNode {
.children
.get_mut(0)
.expect("we should only have 1 child");
let child_results_channel = child.start(true, runtime_handle).await?;
let child_results_channel = child.start(true, runtime_handle)?;
let mut child_results_receiver =
child_results_channel.get_receiver_with_stats(&self.runtime_stats);

Expand Down
5 changes: 1 addition & 4 deletions src/daft-local-execution/src/sources/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use daft_io::{IOStatsContext, IOStatsRef};
use daft_micropartition::MicroPartition;
use futures::{stream::BoxStream, StreamExt};

use async_trait::async_trait;

use crate::{
channel::PipelineChannel, pipeline::PipelineNode, runtime_stats::RuntimeStatsContext,
ExecutionRuntimeHandle,
Expand Down Expand Up @@ -62,15 +60,14 @@ impl TreeDisplay for SourceNode {
}
}

#[async_trait]
impl PipelineNode for SourceNode {
fn name(&self) -> &'static str {
self.source.name()
}
fn children(&self) -> Vec<&dyn PipelineNode> {
vec![]
}
async fn start(
fn start(
&mut self,
maintain_order: bool,
runtime_handle: &mut ExecutionRuntimeHandle,
Expand Down

0 comments on commit 7c76de2

Please sign in to comment.