diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index af1f8fbf1d8cd..9bdddd96fa0d7 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -44,15 +44,17 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::utils::transpose;
 use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
-use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr};
 use futures::stream::Stream;
+
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use log::trace;
-use parking_lot::Mutex;
+use parking_lot::lock_api::{Mutex as ParkingLotAPIMutex};
+use parking_lot::{RawMutex, Mutex};
 
 mod distributor_channels;
 
@@ -79,6 +81,66 @@ struct RepartitionExecState {
 }
 
 impl RepartitionExecState {
+
+    fn handle_non_hash_partitioning(
+        input: Arc<dyn ExecutionPlan>,
+        i: usize,
+        txs: HashMap<
+            usize,
+            (DistributionSender<Option<Result<RecordBatch>>>, Arc<Mutex<MemoryReservation>>),
+        >,
+        partitioning: Partitioning,
+        r_metrics: RepartitionMetrics,
+        context: Arc<TaskContext>,
+    ) -> SpawnedTask<()> {
+        let input_task = SpawnedTask::spawn(RepartitionExec::pull_from_input(
+            input,
+            i,
+            txs.clone(),
+            partitioning,
+            r_metrics,
+            context,
+        ));
+
+        // In a separate task, wait for each input to be done
+        // (and pass along any errors, including panic!s)
+        SpawnedTask::spawn(RepartitionExec::wait_for_task(
+            input_task,
+            txs.into_iter()
+                .map(|(partition, (tx, _reservation))| (partition, tx))
+                .collect(),
+        ))
+    }
+
+    fn handle_hash_partitioning(
+        input: Arc<dyn ExecutionPlan>,
+        i: usize,
+        txs: HashMap<
+            usize,
+            (DistributionSender<Option<Result<RecordBatch>>>, Arc<Mutex<MemoryReservation>>),
+        >,
+        hash_expr: Vec<Arc<dyn PhysicalExpr>>,
+        partition_count: usize,
+        r_metrics: RepartitionMetrics,
+        context: Arc<TaskContext>,
+    ) -> SpawnedTask<()> {
+        let input_task = SpawnedTask::spawn(RepartitionExec::pull_from_input_helper(
+            input,
+            i,
+            txs.clone(),
+            partition_count,
+            r_metrics,
+            context,
+        ));
+
+        // Hashing here?
+
+        // In a separate task, wait for each input to be done
+        // (and pass along any errors, including panic!s)
+        SpawnedTask::spawn(RepartitionExec::wait_for_task(
+            input_task,
+            txs.into_iter()
+                .map(|(partition, (tx, _reservation))| (partition, tx))
+                .collect(),
+        ))
+    }
+
     fn new(
         input: Arc<dyn ExecutionPlan>,
         partitioning: Partitioning,
@@ -120,9 +182,8 @@ impl RepartitionExecState {
             channels.insert(partition, (tx, rx, reservation));
         }
 
-        // launch one async task per *input* partition
+        // launch one async task per input partition when round robin, multiple when hash
        let mut spawned_tasks = Vec::with_capacity(num_input_partitions);
-
         for i in 0..num_input_partitions {
             let txs: HashMap<_, _> = channels
                 .iter()
                 .collect();
 
             let r_metrics = RepartitionMetrics::new(i, num_output_partitions, &metrics);
-            let input_data = RepartitionExec::split_input_data( input,i, partitioning.clone(), context.clone(), r_metrics.clone())?;
-            let input_task = SpawnedTask::spawn(RepartitionExec::pull_from_input(
-                input.clone(),
-                i,
-                num_output_partitions,
-                txs.clone(),
-                partitioning.clone(),
-                r_metrics,
-                context.clone(),
-            ));
+            let wait_for_task = match partitioning.clone() {
+                Partitioning::Hash(exprs, size) => RepartitionExecState::handle_hash_partitioning(
+                    input.clone(), i, txs.clone(), exprs, size, r_metrics, context.clone(),
+                ),
+                _ => RepartitionExecState::handle_non_hash_partitioning(
+                    input.clone(), i, txs.clone(), partitioning.clone(), r_metrics, context.clone(),
+                ),
+            };
+
-            // In a separate task, wait for each input to be done
-            // (and pass along any errors, including panic!s)
-            let wait_for_task = SpawnedTask::spawn(RepartitionExec::wait_for_task(
-                input_task,
-                txs.into_iter()
-                    .map(|(partition, (tx, _reservation))| (partition, tx))
-                    .collect(),
-            ));
             spawned_tasks.push(wait_for_task);
         }
 
@@ -774,28 +826,189 @@ impl RepartitionExec {
         }
     }
 
+
+    async fn pull_from_input_helper(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        mut output_channels: HashMap<
+            usize,
+            (DistributionSender<MaybeBatch>, SharedMemoryReservation),
+        >,
+        partition_count: usize,
+        metrics: RepartitionMetrics,
+        context: Arc<TaskContext>,
+    ) -> Result<()> {
+        let mut partitioner = BatchPartitioner::try_new(
+            Partitioning::RoundRobinBatch(partition_count),
+            metrics.repartition_time.clone(),
+        )?;
 
-    /// Pulls data from the specified input plan, feeding it to the
+        // execute the child operator
+        let timer = metrics.fetch_time.timer();
+        let mut stream = input.execute(partition, context)?;
+        timer.done();
+
+        // While there are still outputs to send to, keep pulling inputs
+        let mut batches_until_yield = partitioner.num_partitions();
+        while !output_channels.is_empty() {
+            // fetch the next batch
+            let timer = metrics.fetch_time.timer();
+            let result = stream.next().await;
+            timer.done();
+
+            // Input is done
+            let batch = match result {
+                Some(result) => result?,
+                None => break,
+            };
+
+            for res in partitioner.partition_iter(batch)? {
+                let (partition, batch) = res?;
+                let size = batch.get_array_memory_size();
+
+                let timer = metrics.send_time[partition].timer();
+                // if there is still a receiver, send to it
+                if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
+                    reservation.lock().try_grow(size)?;
+
+                    if tx.send(Some(Ok(batch))).await.is_err() {
+                        // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
+                        reservation.lock().shrink(size);
+                        output_channels.remove(&partition);
+                    }
+                }
+                timer.done();
+            }
+
+            // If the input stream is endless, we may spin forever and
+            // never yield back to tokio. See
See + // https://github.com/apache/datafusion/issues/5278. + // + // However, yielding on every batch causes a bottleneck + // when running with multiple cores. See + // https://github.com/apache/datafusion/issues/6290 + // + // Thus, heuristically yield after producing num_partition + // batches + // + // In round robin this is ideal as each input will get a + // new batch. In hash partitioning it may yield too often + // on uneven distributions even if some partition can not + // make progress, but parallelism is going to be limited + // in that case anyways + if batches_until_yield == 0 { + tokio::task::yield_now().await; + batches_until_yield = partitioner.num_partitions(); + } else { + batches_until_yield -= 1; + } + } + + Ok(()) + } + + + // /// Pulls data from the specified input plan, feeding it to the + // /// output partitions based on the desired partitioning + // /// + // /// txs hold the output sending channels for each output partition + // async fn pull_from_input( + // mut stream: SendableRecordBatchStream, + // // partition: Partition, + // mut output_channels: HashMap< + // usize, + // (DistributionSender, SharedMemoryReservation), + // >, + // partitioning: Partitioning, + // metrics: RepartitionMetrics, + + // ) -> Result<()> { + // let mut partitioner = + // BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?; + + // // execute the child operator + // let timer = metrics.fetch_time.timer(); + // // let mut stream = input.execute(partition.index(), context)?; + // timer.done(); + + // // While there are still outputs to send to, keep pulling inputs + // let mut batches_until_yield = partitioner.num_partitions(); + // while !output_channels.is_empty() { + // // fetch the next batch + // let timer = metrics.fetch_time.timer(); + // let result = stream.next().await; + // timer.done(); + + // // Input is done + // let batch = match result { + // Some(result) => result?, + // None => break, + // }; + + // for res in partitioner.partition_iter(batch)? { + // let (partition, batch) = res?; + // let size = batch.get_array_memory_size(); + + // let timer = metrics.send_time[partition].timer(); + // // if there is still a receiver, send to it + // if let Some((tx, reservation)) = output_channels.get_mut(&partition) { + // reservation.lock().try_grow(size)?; + + // if tx.send(Some(Ok(batch))).await.is_err() { + // // If the other end has hung up, it was an early shutdown (e.g. LIMIT) + // reservation.lock().shrink(size); + // output_channels.remove(&partition); + // } + // } + // timer.done(); + // } + + // // If the input stream is endless, we may spin forever and + // // never yield back to tokio. See + // // https://github.com/apache/datafusion/issues/5278. + // // + // // However, yielding on every batch causes a bottleneck + // // when running with multiple cores. See + // // https://github.com/apache/datafusion/issues/6290 + // // + // // Thus, heuristically yield after producing num_partition + // // batches + // // + // // In round robin this is ideal as each input will get a + // // new batch. 
+    //         // on uneven distributions even if some partition can not
+    //         // make progress, but parallelism is going to be limited
+    //         // in that case anyways
+    //         if batches_until_yield == 0 {
+    //             tokio::task::yield_now().await;
+    //             batches_until_yield = partitioner.num_partitions();
+    //         } else {
+    //             batches_until_yield -= 1;
+    //         }
+    //     }
+
+    //     Ok(())
+    // }
+
+    /// Pulls data from the specified input plan, feeding it to the
     /// output partitions based on the desired partitioning
     ///
     /// txs hold the output sending channels for each output partition
     async fn pull_from_input(
-        mut stream: SendableRecordBatchStream,
-        // partition: Partition,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
         mut output_channels: HashMap<
             usize,
             (DistributionSender<MaybeBatch>, SharedMemoryReservation),
         >,
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
-
+        context: Arc<TaskContext>,
     ) -> Result<()> {
         let mut partitioner =
             BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
 
         // execute the child operator
         let timer = metrics.fetch_time.timer();
-        // let mut stream = input.execute(partition.index(), context)?;
+        let mut stream = input.execute(partition, context)?;
         timer.done();
 
         // While there are still outputs to send to, keep pulling inputs
@@ -857,6 +1070,7 @@ impl RepartitionExec {
         Ok(())
     }
 
+
     /// Waits for `input_task` which is consuming one of the inputs to
     /// complete. Upon each successful completion, sends a `None` to
     /// each of the output tx channels to signal one of the inputs is
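
The open "// Hashing here?" question above can most likely be answered inside pull_from_input_helper itself, because BatchPartitioner::try_new already accepts Partitioning::Hash. Below is a minimal sketch of that idea, assuming the helper is additionally handed the hash expressions (hash_expr), which the patch above does not do yet; the pull/send/yield loop would stay exactly as written in the helper.

// Sketch only: passing `hash_expr` into the helper is an assumption,
// not something the patch above implements.
async fn pull_from_input_helper(
    input: Arc<dyn ExecutionPlan>,
    partition: usize,
    mut output_channels: HashMap<
        usize,
        (DistributionSender<MaybeBatch>, SharedMemoryReservation),
    >,
    hash_expr: Vec<Arc<dyn PhysicalExpr>>,
    partition_count: usize,
    metrics: RepartitionMetrics,
    context: Arc<TaskContext>,
) -> Result<()> {
    // Build a hash partitioner instead of a round-robin one, so
    // partition_iter() routes rows by the hash of `hash_expr`.
    let mut partitioner = BatchPartitioner::try_new(
        Partitioning::Hash(hash_expr, partition_count),
        metrics.repartition_time.clone(),
    )?;

    // execute the child operator
    let timer = metrics.fetch_time.timer();
    let mut stream = input.execute(partition, context)?;
    timer.done();

    // ... the remaining while/for/yield loop is identical to the
    // pull_from_input_helper body in the diff above ...
    Ok(())
}

If the hash path ends up working this way, the helper and the revised pull_from_input would be nearly identical, so they could presumably be folded back into a single function.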