Skip to content

Commit

Permalink
need to drop sender
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Sep 19, 2024
1 parent bb0a944 commit c797da4
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions src/daft-local-execution/src/sinks/streaming_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,19 @@ impl StreamingSinkNode {
fn spawn_workers(
op: Arc<dyn StreamingSink>,
input_receivers: Vec<Receiver<(usize, PipelineResultType)>>,
output_senders: Vec<Sender<Arc<MicroPartition>>>,
worker_set: &mut WorkerSet<DaftResult<Box<dyn StreamingSinkState>>>,
stats: Arc<RuntimeStatsContext>,
) {
for (input_receiver, output_sender) in input_receivers.into_iter().zip(output_senders) {
) -> Receiver<Arc<MicroPartition>> {
let (output_sender, output_receiver) = create_channel(input_receivers.len());
for input_receiver in input_receivers {
worker_set.spawn(Self::run_worker(
op.clone(),
input_receiver,
output_sender,
output_sender.clone(),
stats.clone(),
));
}
output_receiver
}

async fn forward_input_to_workers(
Expand Down Expand Up @@ -198,13 +199,11 @@ impl PipelineNode for StreamingSinkNode {
let num_workers = op.max_concurrency();
let (input_senders, input_receivers) =
(0..num_workers).map(|_| create_channel(1)).unzip();
let (output_sender, mut output_receiver) = create_channel(num_workers);

let mut worker_set = create_worker_set();
Self::spawn_workers(
let mut output_receiver = Self::spawn_workers(
op.clone(),
input_receivers,
(0..num_workers).map(|_| output_sender.clone()).collect(),
&mut worker_set,
runtime_stats.clone(),
);
Expand Down

0 comments on commit c797da4

Please sign in to comment.