Commit: wip
edmondop committed Jun 19, 2024
1 parent 26a915c commit 6a180f2
Showing 1 changed file with 241 additions and 27 deletions.
268 changes: 241 additions & 27 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -44,15 +44,17 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::utils::transpose;
use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr};

use futures::stream::Stream;

use futures::{FutureExt, StreamExt, TryStreamExt};
use hashbrown::HashMap;
use log::trace;
use parking_lot::Mutex;

mod distributor_channels;

@@ -79,6 +81,66 @@ struct RepartitionExecState {
}

impl RepartitionExecState {

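    /// Spawns a task that pulls batches from input partition `i` and sends
    /// them to the output channels according to `partitioning`, plus a
    /// second task that waits for the first and propagates errors/panics.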
    fn handle_non_hash_partitioning(
        input: Arc<dyn ExecutionPlan>,
        i: usize,
        txs: HashMap<usize, (DistributionSender<MaybeBatch>, SharedMemoryReservation)>,
        partitioning: Partitioning,
        r_metrics: RepartitionMetrics,
        context: Arc<TaskContext>,
    ) -> SpawnedTask<()> {
        let input_task = SpawnedTask::spawn(RepartitionExec::pull_from_input(
            input,
            i,
            txs.clone(),
            partitioning,
            r_metrics,
            context,
        ));

        // In a separate task, wait for each input to be done
        // (and pass along any errors, including panic!s)
        SpawnedTask::spawn(RepartitionExec::wait_for_task(
            input_task,
            txs.into_iter()
                .map(|(partition, (tx, _reservation))| (partition, tx))
                .collect(),
        ))
    }

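    /// Spawns a task that pulls batches from input partition `i` for a
    /// hash-partitioned output, plus a second task that waits for it and
    /// propagates errors/panics. Applying the hash expressions is still a
    /// work in progress (see the TODO below).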
    fn handle_hash_partitioning(
        input: Arc<dyn ExecutionPlan>,
        i: usize,
        txs: HashMap<usize, (DistributionSender<MaybeBatch>, SharedMemoryReservation)>,
        _hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
        partition_count: usize,
        r_metrics: RepartitionMetrics,
        context: Arc<TaskContext>,
    ) -> SpawnedTask<()> {
        let input_task = SpawnedTask::spawn(RepartitionExec::pull_from_input_helper(
            input,
            i,
            txs.clone(),
            partition_count,
            r_metrics,
            context,
        ));

        // TODO (wip): apply the hash expressions here instead of the
        // round-robin distribution done by `pull_from_input_helper`?

        // In a separate task, wait for each input to be done
        // (and pass along any errors, including panic!s)
        SpawnedTask::spawn(RepartitionExec::wait_for_task(
            input_task,
            txs.into_iter()
                .map(|(partition, (tx, _reservation))| (partition, tx))
                .collect(),
        ))
    }



fn new(
input: Arc<dyn ExecutionPlan>,
partitioning: Partitioning,
@@ -120,9 +182,8 @@ impl RepartitionExecState {
channels.insert(partition, (tx, rx, reservation));
}

        // launch one async task per input partition when round robin, multiple when hash
let mut spawned_tasks = Vec::with_capacity(num_input_partitions);

for i in 0..num_input_partitions {
let txs: HashMap<_, _> = channels
.iter()
Expand All @@ -132,25 +193,16 @@ impl RepartitionExecState {
.collect();

let r_metrics = RepartitionMetrics::new(i, num_output_partitions, &metrics);
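            // Choose the per-partition handler based on the partitioning
            // scheme, then spawn the pull task and its wait task.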
            let wait_for_task = match &partitioning {
                Partitioning::Hash(exprs, size) => {
                    RepartitionExecState::handle_hash_partitioning(
                        input.clone(),
                        i,
                        txs,
                        exprs.clone(),
                        *size,
                        r_metrics,
                        context.clone(),
                    )
                }
                _ => RepartitionExecState::handle_non_hash_partitioning(
                    input.clone(),
                    i,
                    txs,
                    partitioning.clone(),
                    r_metrics,
                    context.clone(),
                ),
            };
spawned_tasks.push(wait_for_task);
}

@@ -774,28 +826,189 @@ impl RepartitionExec {
}

}

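    /// Pulls data from the specified input plan partition, distributing the
    /// batches round-robin across `partition_count` output channels.
    ///
    /// `output_channels` holds the sending channel and shared memory
    /// reservation for each output partition.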
async fn pull_from_input_helper(
input: Arc<dyn ExecutionPlan>,
partition: usize,
mut output_channels: HashMap<
usize,
(DistributionSender<MaybeBatch>, SharedMemoryReservation),
>,
partition_count: usize,
metrics: RepartitionMetrics,
context: Arc<TaskContext>,
) -> Result<()> {
        let mut partitioner = BatchPartitioner::try_new(
            Partitioning::RoundRobinBatch(partition_count),
            metrics.repartition_time.clone(),
        )?;

// execute the child operator
let timer = metrics.fetch_time.timer();
let mut stream = input.execute(partition, context)?;
timer.done();

// While there are still outputs to send to, keep pulling inputs
let mut batches_until_yield = partitioner.num_partitions();
while !output_channels.is_empty() {
// fetch the next batch
let timer = metrics.fetch_time.timer();
let result = stream.next().await;
timer.done();

// Input is done
let batch = match result {
Some(result) => result?,
None => break,
};

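            // Distribute the batch with the partitioner and send each
            // resulting (partition, batch) pair to its output channel,
            // growing that channel's memory reservation before sending.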
for res in partitioner.partition_iter(batch)? {
let (partition, batch) = res?;
let size = batch.get_array_memory_size();

let timer = metrics.send_time[partition].timer();
// if there is still a receiver, send to it
if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
reservation.lock().try_grow(size)?;

if tx.send(Some(Ok(batch))).await.is_err() {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
}
}
timer.done();
}

// If the input stream is endless, we may spin forever and
// never yield back to tokio. See
// https://github.com/apache/datafusion/issues/5278.
//
// However, yielding on every batch causes a bottleneck
// when running with multiple cores. See
// https://github.com/apache/datafusion/issues/6290
//
// Thus, heuristically yield after producing num_partition
// batches
//
// In round robin this is ideal as each input will get a
// new batch. In hash partitioning it may yield too often
// on uneven distributions even if some partition can not
// make progress, but parallelism is going to be limited
// in that case anyways
if batches_until_yield == 0 {
tokio::task::yield_now().await;
batches_until_yield = partitioner.num_partitions();
} else {
batches_until_yield -= 1;
}
}

Ok(())
}



/// Pulls data from the specified input plan, feeding it to the
/// output partitions based on the desired partitioning
///
/// txs hold the output sending channels for each output partition
    async fn pull_from_input(
        input: Arc<dyn ExecutionPlan>,
        partition: usize,
        mut output_channels: HashMap<
            usize,
            (DistributionSender<MaybeBatch>, SharedMemoryReservation),
        >,
        partitioning: Partitioning,
        metrics: RepartitionMetrics,
        context: Arc<TaskContext>,
    ) -> Result<()> {
        let mut partitioner =
            BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;

        // execute the child operator
        let timer = metrics.fetch_time.timer();
        let mut stream = input.execute(partition, context)?;
        timer.done();

// While there are still outputs to send to, keep pulling inputs
@@ -857,6 +1070,7 @@ impl RepartitionExec {
Ok(())
}


/// Waits for `input_task` which is consuming one of the inputs to
/// complete. Upon each successful completion, sends a `None` to
/// each of the output tx channels to signal one of the inputs is
