[FEAT] Streaming Local Writes for Native Executor #2871

Closed · wants to merge 9 commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -20,6 +20,7 @@ daft-local-execution = {path = "src/daft-local-execution", default-features = false}
 daft-micropartition = {path = "src/daft-micropartition", default-features = false}
 daft-minhash = {path = "src/daft-minhash", default-features = false}
 daft-parquet = {path = "src/daft-parquet", default-features = false}
+daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
 daft-plan = {path = "src/daft-plan", default-features = false}
 daft-scan = {path = "src/daft-scan", default-features = false}
 daft-scheduler = {path = "src/daft-scheduler", default-features = false}
@@ -47,6 +48,7 @@ python = [
     "daft-json/python",
     "daft-micropartition/python",
     "daft-parquet/python",
+    "daft-physical-plan/python",
     "daft-plan/python",
     "daft-scan/python",
     "daft-scheduler/python",
79 changes: 79 additions & 0 deletions daft/io/writer.py
@@ -0,0 +1,79 @@
import uuid
from typing import Optional, Union

from daft.daft import IOConfig
from daft.dependencies import pa, pacsv, pq
from daft.filesystem import (
    _resolve_paths_and_filesystem,
    canonicalize_protocol,
    get_protocol_from_path,
)
from daft.table.micropartition import MicroPartition


class FileWriterBase:
    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        file_format: str,
        compression: Optional[str] = None,
        io_config: Optional[IOConfig] = None,
    ):
        [self.resolved_path], self.fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
        protocol = get_protocol_from_path(root_dir)
        canonicalized_protocol = canonicalize_protocol(protocol)
        is_local_fs = canonicalized_protocol == "file"
        if is_local_fs:
            self.fs.create_dir(root_dir)

        self.file_name = f"{uuid.uuid4()}-{file_idx}.{file_format}"
        self.full_path = f"{self.resolved_path}/{self.file_name}"
        self.compression = compression if compression is not None else "none"
        self.current_writer: Optional[Union[pq.ParquetWriter, pacsv.CSVWriter]] = None

    def _create_writer(self, schema: pa.Schema):
        raise NotImplementedError("Subclasses must implement this method.")

    def write(self, table: MicroPartition):
        if self.current_writer is None:
            self.current_writer = self._create_writer(table.schema().to_pyarrow_schema())
        self.current_writer.write_table(table.to_arrow())

    def close(self) -> Optional[str]:
        if self.current_writer is None:
            return None
        self.current_writer.close()
        return self.full_path


class ParquetFileWriter(FileWriterBase):
    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        compression: str = "none",
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(root_dir, file_idx, "parquet", compression, io_config)

    def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
        return pq.ParquetWriter(
            self.full_path,
            schema,
            compression=self.compression,
            use_compliant_nested_type=False,
            filesystem=self.fs,
        )


class CSVFileWriter(FileWriterBase):
    def __init__(self, root_dir: str, file_idx: int, io_config: Optional[IOConfig] = None):
        super().__init__(root_dir, file_idx, "csv", None, io_config)

    def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
        file_path = f"{self.resolved_path}/{self.file_name}"
        return pacsv.CSVWriter(
            file_path,
            schema,
        )
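For orientation, a minimal usage sketch of the new writer API (not part of the PR; the sample data, the output path, and the MicroPartition.from_pydict construction are illustrative assumptions):

    from daft.io.writer import ParquetFileWriter
    from daft.table.micropartition import MicroPartition

    # Hypothetical example: stream two morsels into a single Parquet file.
    writer = ParquetFileWriter(root_dir="/tmp/streaming-writes", file_idx=0, compression="snappy")
    for morsel in [{"a": [1, 2]}, {"a": [3, 4]}]:
        # The underlying pq.ParquetWriter is created lazily on the first write,
        # using the schema of the first morsel.
        writer.write(MicroPartition.from_pydict(morsel))
    path = writer.close()  # path of the written file, or None if nothing was written

Creating the writer lazily on first write means an empty partition never produces an empty file; close() signals that case by returning None.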
89 changes: 89 additions & 0 deletions src/daft-local-execution/src/buffer.rs
@@ -0,0 +1,89 @@
use std::{cmp::Ordering::*, collections::VecDeque, sync::Arc};

use common_error::DaftResult;
use daft_micropartition::MicroPartition;

pub struct RowBasedBuffer {
    pub buffer: VecDeque<Arc<MicroPartition>>,
    pub curr_len: usize,
    pub threshold: usize,
}

impl RowBasedBuffer {
    pub fn new(threshold: usize) -> Self {
        assert!(threshold > 0);
        Self {
            buffer: VecDeque::new(),
            curr_len: 0,
            threshold,
        }
    }

    pub fn push(&mut self, part: Arc<MicroPartition>) {
        self.curr_len += part.len();
        self.buffer.push_back(part);
    }

    pub fn pop_enough(&mut self) -> DaftResult<Option<Vec<Arc<MicroPartition>>>> {
        match self.curr_len.cmp(&self.threshold) {
            Less => Ok(None),
            Equal => {
                if self.buffer.len() == 1 {
                    let part = self.buffer.pop_front().unwrap();
                    self.curr_len = 0;
                    Ok(Some(vec![part]))
                } else {
                    let chunk = MicroPartition::concat(
                        &std::mem::take(&mut self.buffer)
                            .iter()
                            .map(|x| x.as_ref())
                            .collect::<Vec<_>>(),
                    )?;
                    self.curr_len = 0;
                    Ok(Some(vec![Arc::new(chunk)]))
                }
            }
            Greater => {
                let num_ready_chunks = self.curr_len / self.threshold;
                let concated = MicroPartition::concat(
                    &std::mem::take(&mut self.buffer)
                        .iter()
                        .map(|x| x.as_ref())
                        .collect::<Vec<_>>(),
                )?;
                let mut start = 0;
                let mut parts_to_return = Vec::with_capacity(num_ready_chunks);
                for _ in 0..num_ready_chunks {
                    let end = start + self.threshold;
                    let part = Arc::new(concated.slice(start, end)?);
                    parts_to_return.push(part);
                    start = end;
                }
                if start < concated.len() {
                    let part = Arc::new(concated.slice(start, concated.len())?);
                    self.curr_len = part.len();
                    self.buffer.push_back(part);
                } else {
                    self.curr_len = 0;
                }
                Ok(Some(parts_to_return))
            }
        }
    }

    pub fn pop_all(&mut self) -> DaftResult<Option<Arc<MicroPartition>>> {
        assert!(self.curr_len < self.threshold);
        if self.buffer.is_empty() {
            Ok(None)
        } else {
            let concated = MicroPartition::concat(
                &std::mem::take(&mut self.buffer)
                    .iter()
                    .map(|x| x.as_ref())
                    .collect::<Vec<_>>(),
            )?;
            self.curr_len = 0;
            Ok(Some(Arc::new(concated)))
        }
    }
}
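A sketch of the buffer's contract, with a threshold of 1000 rows (hypothetical test, not part of the PR; make_part(n) stands in for any way of building an n-row MicroPartition):

    use std::sync::Arc;

    use common_error::DaftResult;
    use daft_micropartition::MicroPartition;

    // Illustrative only: `make_part` is an assumed helper, not a real API.
    fn buffer_contract(make_part: impl Fn(usize) -> Arc<MicroPartition>) -> DaftResult<()> {
        let mut buf = RowBasedBuffer::new(1000);

        buf.push(make_part(600));
        assert!(buf.pop_enough()?.is_none()); // 600 < threshold: nothing ready yet

        buf.push(make_part(1700)); // 2300 rows buffered, above the threshold
        let ready = buf.pop_enough()?.unwrap();
        assert_eq!(ready.len(), 2); // two morsels of exactly 1000 rows each

        // The 300-row remainder stays buffered until end-of-stream.
        let rest = buf.pop_all()?.unwrap();
        assert_eq!(rest.len(), 300);
        Ok(())
    }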
77 changes: 0 additions & 77 deletions src/daft-local-execution/src/intermediate_ops/buffer.rs

This file was deleted.

19 changes: 8 additions & 11 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -5,8 +5,8 @@
 use daft_micropartition::MicroPartition;
 use tracing::{info_span, instrument};
 
-use super::buffer::OperatorBuffer;
 use crate::{
+    buffer::RowBasedBuffer,
     channel::{create_channel, PipelineChannel, Receiver, Sender},
     pipeline::{PipelineNode, PipelineResultType},
     runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
@@ -135,26 +135,23 @@
         };
 
         for (idx, mut receiver) in receivers.into_iter().enumerate() {
-            let mut buffer = OperatorBuffer::new(morsel_size);
+            let mut buffer = RowBasedBuffer::new(morsel_size);
             while let Some(morsel) = receiver.recv().await {
                 if morsel.should_broadcast() {
                     for worker_sender in worker_senders.iter() {
                         let _ = worker_sender.send((idx, morsel.clone())).await;
                     }
                 } else {
                     buffer.push(morsel.as_data().clone());
-                    if let Some(ready) = buffer.try_clear() {
-                        let _ = send_to_next_worker(idx, ready?.into()).await;
+                    if let Some(ready) = buffer.pop_enough()? {
+                        for part in ready {
+                            let _ = send_to_next_worker(idx, part.into()).await;
+                        }
                     }
                 }
             }
-            // Buffer may still have some morsels left above the threshold
-            while let Some(ready) = buffer.try_clear() {
-                let _ = send_to_next_worker(idx, ready?.into()).await;
-            }
             // Clear all remaining morsels
-            if let Some(last_morsel) = buffer.clear_all() {
-                let _ = send_to_next_worker(idx, last_morsel?.into()).await;
+            if let Some(ready) = buffer.pop_all()? {
+                let _ = send_to_next_worker(idx, ready.into()).await;
             }
         }
         Ok(())
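Reading the two hunks together: OperatorBuffer::try_clear returned at most one ready morsel per call, so the old code needed a drain loop after the receiver closed; RowBasedBuffer::pop_enough returns every ready morsel at once and pop_all flushes whatever remains, so the tail of the loop collapses to a single call and error handling moves from the per-morsel ready? into one ? per buffer call.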
1 change: 0 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/mod.rs
@@ -1,6 +1,5 @@
 pub mod aggregate;
 pub mod anti_semi_hash_join_probe;
-pub mod buffer;
 pub mod filter;
 pub mod hash_join_probe;
 pub mod intermediate_op;
10 changes: 8 additions & 2 deletions src/daft-local-execution/src/lib.rs
@@ -1,4 +1,5 @@
 #![feature(let_chains)]
+mod buffer;
 mod channel;
 mod intermediate_ops;
 mod pipeline;
@@ -14,15 +15,20 @@ lazy_static! {
     pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get();
 }
 
+pub(crate) type TaskSet<T> = tokio::task::JoinSet<T>;
+pub(crate) fn create_task_set<T>() -> TaskSet<T> {
+    TaskSet::new()
+}
+
 pub struct ExecutionRuntimeHandle {
-    worker_set: tokio::task::JoinSet<crate::Result<()>>,
+    worker_set: TaskSet<crate::Result<()>>,
     default_morsel_size: usize,
 }
 
 impl ExecutionRuntimeHandle {
     pub fn new(default_morsel_size: usize) -> Self {
         Self {
-            worker_set: tokio::task::JoinSet::new(),
+            worker_set: create_task_set(),
             default_morsel_size,
         }
     }
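The TaskSet alias and create_task_set helper wrap tokio::task::JoinSet behind crate-local names, presumably so the crate's task-set type can be swapped or instrumented in one place later.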