[CHORE] Swordfish perf + cleanup #3132

Closed

wants to merge 12 commits into from

Changes from 6 commits
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions src/daft-local-execution/Cargo.toml
@@ -1,4 +1,5 @@
[dependencies]
async-trait = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-display = {path = "../common/display", default-features = false}
common-error = {path = "../common/error", default-features = false}
@@ -21,6 +22,7 @@
futures = {workspace = true}
indexmap = {workspace = true}
lazy_static = {workspace = true}
log = {workspace = true}
loole = "0.4.0"
num-format = "0.4.4"
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
97 changes: 97 additions & 0 deletions src/daft-local-execution/src/buffer.rs
@@ -0,0 +1,97 @@
use std::{cmp::Ordering::*, collections::VecDeque, sync::Arc};

use common_error::DaftResult;
use daft_micropartition::MicroPartition;

// A buffer that accumulates morsels until a threshold is reached
pub struct RowBasedBuffer {
    pub buffer: VecDeque<Arc<MicroPartition>>,
    pub curr_len: usize,
    pub threshold: usize,
}

impl RowBasedBuffer {
    pub fn new(threshold: usize) -> Self {
        assert!(threshold > 0);
        Self {
            buffer: VecDeque::new(),
            curr_len: 0,
            threshold,
        }
    }

    // Push a morsel to the buffer
    pub fn push(&mut self, part: Arc<MicroPartition>) {
        self.curr_len += part.len();
        self.buffer.push_back(part);
    }

    // Pop morsels once the buffered rows reach the threshold
    // - If the buffer does not yet hold enough rows, return None
    // - If the buffer holds exactly enough rows, return them (concatenated into one morsel if needed)
    // - If the buffer holds more than enough rows, return a vec of morsels, each sized exactly to the
    //   threshold. Any leftover rows are pushed back into the buffer
    pub fn pop_enough(&mut self) -> DaftResult<Option<Vec<Arc<MicroPartition>>>> {
        match self.curr_len.cmp(&self.threshold) {
            Less => Ok(None),
            Equal => {
                if self.buffer.len() == 1 {
                    let part = self.buffer.pop_front().unwrap();
                    self.curr_len = 0;
                    Ok(Some(vec![part]))
                } else {
                    let chunk = MicroPartition::concat(
                        &std::mem::take(&mut self.buffer)
                            .iter()
                            .map(|x| x.as_ref())
                            .collect::<Vec<_>>(),
                    )?;
                    self.curr_len = 0;
                    Ok(Some(vec![Arc::new(chunk)]))
                }
            }
            Greater => {
                let num_ready_chunks = self.curr_len / self.threshold;
                let concated = MicroPartition::concat(
                    &std::mem::take(&mut self.buffer)
                        .iter()
                        .map(|x| x.as_ref())
                        .collect::<Vec<_>>(),
                )?;
                let mut start = 0;
                let mut parts_to_return = Vec::with_capacity(num_ready_chunks);
                for _ in 0..num_ready_chunks {
                    let end = start + self.threshold;
                    let part = Arc::new(concated.slice(start, end)?);
                    parts_to_return.push(part);
                    start = end;
                }
                if start < concated.len() {
                    let part = Arc::new(concated.slice(start, concated.len())?);
                    self.curr_len = part.len();
                    self.buffer.push_back(part);
                } else {
                    self.curr_len = 0;
                }
                Ok(Some(parts_to_return))
            }
        }
    }

    // Pop all morsels in the buffer regardless of the threshold
    pub fn pop_all(&mut self) -> DaftResult<Option<Arc<MicroPartition>>> {
        assert!(self.curr_len < self.threshold);
        if self.buffer.is_empty() {
            Ok(None)
        } else {
            let concated = MicroPartition::concat(
                &std::mem::take(&mut self.buffer)
                    .iter()
                    .map(|x| x.as_ref())
                    .collect::<Vec<_>>(),
            )?;
            self.curr_len = 0;
            Ok(Some(Arc::new(concated)))
        }
    }
}
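
For readers skimming the diff, here is a minimal stand-alone sketch (not this PR's code) of the accumulate-and-rechunk pattern that RowBasedBuffer implements. It uses a plain Vec<i64> as a stand-in for Arc<MicroPartition>; the Morsel alias and the driver loop in main are hypothetical. An operator pushes incoming morsels, emits every threshold-sized chunk that pop_enough returns, and flushes the remainder with pop_all at the end of the stream.

// Minimal sketch with hypothetical types, not this PR's code.
use std::collections::VecDeque;

// Stand-in for Arc<MicroPartition>: a morsel is just a batch of rows.
type Morsel = Vec<i64>;

struct RowBasedBuffer {
    buffer: VecDeque<Morsel>,
    curr_len: usize,
    threshold: usize,
}

impl RowBasedBuffer {
    fn new(threshold: usize) -> Self {
        Self { buffer: VecDeque::new(), curr_len: 0, threshold }
    }

    fn push(&mut self, part: Morsel) {
        self.curr_len += part.len();
        self.buffer.push_back(part);
    }

    // Cut threshold-sized chunks out of the buffered rows; keep any remainder buffered.
    fn pop_enough(&mut self) -> Option<Vec<Morsel>> {
        if self.curr_len < self.threshold {
            return None;
        }
        let all: Vec<i64> = std::mem::take(&mut self.buffer).into_iter().flatten().collect();
        let mut chunks: Vec<Morsel> = all.chunks(self.threshold).map(|c| c.to_vec()).collect();
        self.curr_len = 0;
        if let Some(last) = chunks.last() {
            if last.len() < self.threshold {
                let rest = chunks.pop().unwrap();
                self.curr_len = rest.len();
                self.buffer.push_back(rest);
            }
        }
        Some(chunks)
    }

    // Flush whatever is left, regardless of the threshold.
    fn pop_all(&mut self) -> Option<Morsel> {
        if self.buffer.is_empty() {
            return None;
        }
        self.curr_len = 0;
        Some(std::mem::take(&mut self.buffer).into_iter().flatten().collect())
    }
}

fn main() {
    let mut buf = RowBasedBuffer::new(4);
    for morsel in [vec![1, 2], vec![3, 4, 5], vec![6]] {
        buf.push(morsel);
        if let Some(ready) = buf.pop_enough() {
            println!("emit {:?}", ready); // every emitted chunk has exactly 4 rows
        }
    }
    if let Some(rest) = buf.pop_all() {
        println!("flush {:?}", rest); // leftover rows at end of the stream
    }
}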
137 changes: 71 additions & 66 deletions src/daft-local-execution/src/channel.rs
@@ -1,121 +1,126 @@
use std::sync::Arc;
use loole::SendError;

use crate::{
pipeline::PipelineResultType,
runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
};
pub(crate) type Sender<T> = loole::Sender<T>;
pub(crate) type Receiver<T> = loole::Receiver<T>;

pub type Sender<T> = tokio::sync::mpsc::Sender<T>;
pub type Receiver<T> = tokio::sync::mpsc::Receiver<T>;

pub fn create_channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
tokio::sync::mpsc::channel(buffer_size)
}

pub struct PipelineChannel {
sender: PipelineSender,
receiver: PipelineReceiver,
pub(crate) fn create_channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
loole::bounded(buffer_size)
}

@kevinzwang made a great callout the other day that the round-robin dispatching could be inefficient if ordering is not required. This can be fixed using https://docs.rs/loole/latest/loole/ channels, which are multi-producer multi-consumer. This essentially makes it work-stealing if maintaining order is not required.

import time

import daft
from daft import col

daft.context.set_execution_config(enable_native_executor=True, default_morsel_size=1)


@daft.udf(return_dtype=daft.DataType.int64())
def do_work(x):
    x = x.to_pylist()
    # my laptop has 12 cores, so 12 workers will be spawned to run this udf.
    if x[0] % 12 == 0:
        print("doing a lot of work on ", x)
        time.sleep(1)
    else:
        print("doing a little work on ", x)
        time.sleep(0.1)
    return x


(
    daft.from_pydict({"a": [i for i in range(48)]})
    .with_column("b", do_work(col("a")))
    .agg(col("b").sum())  # agg does not require ordering
    .collect()
)

This script is 4x faster now.
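
To make the work-stealing behavior concrete, here is a rough stand-alone sketch, not Daft's code; it assumes loole = "0.4" and tokio with the full feature set as dependencies. All workers pull from one shared bounded MPMC channel, so whichever worker is idle picks up the next morsel, and one slow morsel no longer stalls a fixed round-robin slot; results naturally come back out of order.

// Sketch only (not Daft code). Assumed Cargo dependencies:
// loole = "0.4" and tokio = { version = "1", features = ["full"] }.
use std::time::Duration;

#[tokio::main]
async fn main() {
    let num_workers = 4;
    // One bounded MPMC channel shared by every worker: whichever worker is free
    // takes the next item, so a slow item does not hold up a fixed round-robin slot.
    let (tx, rx) = loole::bounded::<u64>(num_workers);

    let mut handles = Vec::new();
    for worker_id in 0..num_workers {
        let rx = rx.clone();
        handles.push(tokio::spawn(async move {
            // recv_async returns Err once all senders are dropped and the channel is drained.
            while let Ok(item) = rx.recv_async().await {
                let millis = if item % 12 == 0 { 1000 } else { 100 };
                tokio::time::sleep(Duration::from_millis(millis)).await;
                println!("worker {worker_id} finished item {item}");
            }
        }));
    }
    drop(rx);

    for item in 0..48u64 {
        tx.send_async(item).await.unwrap();
    }
    drop(tx); // close the channel so the workers exit

    for handle in handles {
        handle.await.unwrap();
    }
}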


impl PipelineChannel {
pub fn new(buffer_size: usize, in_order: bool) -> Self {
if in_order {
let (senders, receivers) = (0..buffer_size).map(|_| create_channel(1)).unzip();
let sender = PipelineSender::InOrder(RoundRobinSender::new(senders));
let receiver = PipelineReceiver::InOrder(RoundRobinReceiver::new(receivers));
Self { sender, receiver }
} else {
let (sender, receiver) = create_channel(buffer_size);
let sender = PipelineSender::OutOfOrder(sender);
let receiver = PipelineReceiver::OutOfOrder(receiver);
Self { sender, receiver }
pub(crate) fn create_ordering_aware_receiver_channel<T>(
ordered: bool,
buffer_size: usize,
) -> (Vec<Sender<T>>, OrderingAwareReceiver<T>) {
match ordered {
true => {
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(1)).unzip();
(
senders,
OrderingAwareReceiver::InOrder(RoundRobinReceiver::new(receiver)),
)
}
}

fn get_next_sender(&mut self) -> Sender<PipelineResultType> {
match &mut self.sender {
PipelineSender::InOrder(rr) => rr.get_next_sender(),
PipelineSender::OutOfOrder(sender) => sender.clone(),
false => {
let (sender, receiver) = create_channel::<T>(buffer_size);
(
(0..buffer_size).map(|_| sender.clone()).collect(),
OrderingAwareReceiver::OutOfOrder(receiver),
)
}
}
}

pub(crate) fn get_next_sender_with_stats(
&mut self,
rt: &Arc<RuntimeStatsContext>,
) -> CountingSender {
CountingSender::new(self.get_next_sender(), rt.clone())
}

pub fn get_receiver(self) -> PipelineReceiver {
self.receiver
pub(crate) fn create_ordering_aware_sender_channel<T>(
ordered: bool,
buffer_size: usize,
) -> (OrderingAwareSender<T>, Vec<Receiver<T>>) {
match ordered {
true => {
let (sender, receivers) = (0..buffer_size).map(|_| create_channel::<T>(1)).unzip();
(
OrderingAwareSender::InOrder(RoundRobinSender::new(sender)),
receivers,
)
}
false => {
let (sender, receiver) = create_channel::<T>(buffer_size);
(
OrderingAwareSender::OutOfOrder(sender),
(0..buffer_size).map(|_| receiver.clone()).collect(),
)
}
}
}

pub(crate) fn get_receiver_with_stats(self, rt: &Arc<RuntimeStatsContext>) -> CountingReceiver {
CountingReceiver::new(self.get_receiver(), rt.clone())
}
pub(crate) enum OrderingAwareSender<T> {
InOrder(RoundRobinSender<T>),
OutOfOrder(Sender<T>),
}

pub enum PipelineSender {
InOrder(RoundRobinSender<PipelineResultType>),
OutOfOrder(Sender<PipelineResultType>),
impl<T> OrderingAwareSender<T> {
pub(crate) async fn send(&mut self, val: T) -> Result<(), SendError<T>> {
match self {
Self::InOrder(rr) => rr.send(val).await,
Self::OutOfOrder(s) => s.send_async(val).await,
}
}
}

pub struct RoundRobinSender<T> {
pub(crate) struct RoundRobinSender<T> {
senders: Vec<Sender<T>>,
curr_sender_idx: usize,
next_sender_idx: usize,
}

impl<T> RoundRobinSender<T> {
pub fn new(senders: Vec<Sender<T>>) -> Self {
fn new(senders: Vec<Sender<T>>) -> Self {
Self {
senders,
curr_sender_idx: 0,
next_sender_idx: 0,
}
}

pub fn get_next_sender(&mut self) -> Sender<T> {
let next_idx = self.curr_sender_idx;
self.curr_sender_idx = (next_idx + 1) % self.senders.len();
self.senders[next_idx].clone()
async fn send(&mut self, val: T) -> Result<(), SendError<T>> {
let next_sender_idx = self.next_sender_idx;
self.next_sender_idx = (next_sender_idx + 1) % self.senders.len();
self.senders[next_sender_idx].send_async(val).await
}
}

pub enum PipelineReceiver {
InOrder(RoundRobinReceiver<PipelineResultType>),
OutOfOrder(Receiver<PipelineResultType>),
pub(crate) enum OrderingAwareReceiver<T> {
InOrder(RoundRobinReceiver<T>),
OutOfOrder(Receiver<T>),
}

impl PipelineReceiver {
pub async fn recv(&mut self) -> Option<PipelineResultType> {
impl<T> OrderingAwareReceiver<T> {
pub(crate) async fn recv(&mut self) -> Option<T> {
match self {
Self::InOrder(rr) => rr.recv().await,
Self::OutOfOrder(r) => r.recv().await,
Self::OutOfOrder(r) => r.recv_async().await.ok(),
}
}
}

pub struct RoundRobinReceiver<T> {
pub(crate) struct RoundRobinReceiver<T> {
receivers: Vec<Receiver<T>>,
curr_receiver_idx: usize,
is_done: bool,
}

impl<T> RoundRobinReceiver<T> {
pub fn new(receivers: Vec<Receiver<T>>) -> Self {
fn new(receivers: Vec<Receiver<T>>) -> Self {
Self {
receivers,
curr_receiver_idx: 0,
is_done: false,
}
}

pub async fn recv(&mut self) -> Option<T> {
async fn recv(&mut self) -> Option<T> {
if self.is_done {
return None;
}
for i in 0..self.receivers.len() {
let next_idx = (i + self.curr_receiver_idx) % self.receivers.len();
if let Some(val) = self.receivers[next_idx].recv().await {
if let Ok(val) = self.receivers[next_idx].recv_async().await {
self.curr_receiver_idx = (next_idx + 1) % self.receivers.len();
return Some(val);
}
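
For comparison, the ordered path keeps one bounded(1) channel per worker and walks both the send side and the receive side round-robin, which is what RoundRobinSender and RoundRobinReceiver above do. Below is a rough stand-alone sketch of that pattern, again not Daft's code, with a placeholder squaring step as the "work" and the same loole/tokio dependency assumptions; results arrive in dispatch order even though the workers run concurrently.

// Sketch only (not Daft code); same loole/tokio dependency assumptions as above.
#[tokio::main]
async fn main() {
    let num_workers = 4;
    let mut worker_txs = Vec::new();
    let mut result_rxs = Vec::new();

    for _ in 0..num_workers {
        let (in_tx, in_rx) = loole::bounded::<u64>(1);
        let (out_tx, out_rx) = loole::bounded::<u64>(1);
        tokio::spawn(async move {
            while let Ok(item) = in_rx.recv_async().await {
                // Placeholder "work": square the value.
                out_tx.send_async(item * item).await.unwrap();
            }
        });
        worker_txs.push(in_tx);
        result_rxs.push(out_rx);
    }

    // Producer task: round-robin dispatch, as in RoundRobinSender::send.
    tokio::spawn(async move {
        for item in 0..16u64 {
            let idx = (item as usize) % worker_txs.len();
            worker_txs[idx].send_async(item).await.unwrap();
        }
        // worker_txs drops here, letting the workers (and then the consumer) finish.
    });

    // Consumer: round-robin receive, as in RoundRobinReceiver::recv. Try each
    // receiver once starting at the current index; stop when every channel is closed.
    let mut results = Vec::new();
    let mut idx = 0;
    'outer: loop {
        for i in 0..result_rxs.len() {
            let next = (idx + i) % result_rxs.len();
            if let Ok(val) = result_rxs[next].recv_async().await {
                results.push(val);
                idx = (next + 1) % result_rxs.len();
                continue 'outer;
            }
        }
        break;
    }
    println!("{results:?}"); // squares of 0..16, in dispatch order
}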