
[FEAT] Streaming Local Writes for Native Executor #2871

Closed
wants to merge 9 commits into from
Changes from 7 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -20,6 +20,7 @@ daft-local-execution = {path = "src/daft-local-execution", default-features = fa
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
@@ -47,6 +48,7 @@ python = [
"daft-json/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-physical-plan/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
79 changes: 79 additions & 0 deletions daft/io/writer.py
@@ -0,0 +1,79 @@
import uuid
from typing import Optional, Union

from daft.daft import IOConfig
from daft.dependencies import pa, pacsv, pq
from daft.filesystem import (
_resolve_paths_and_filesystem,
canonicalize_protocol,
get_protocol_from_path,
)
from daft.table.micropartition import MicroPartition


class FileWriterBase:
def __init__(
self,
root_dir: str,
file_idx: int,
file_format: str,
compression: Optional[str] = None,
io_config: Optional[IOConfig] = None,
):
[self.resolved_path], self.fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
protocol = get_protocol_from_path(root_dir)
canonicalized_protocol = canonicalize_protocol(protocol)
is_local_fs = canonicalized_protocol == "file"
if is_local_fs:
self.fs.create_dir(root_dir)

self.file_name = f"{uuid.uuid4()}-{file_idx}.{file_format}"
self.compression = compression if compression is not None else "none"
self.current_writer: Optional[Union[pq.ParquetWriter, pacsv.CSVWriter]] = None

def _create_writer(self, schema: pa.Schema):
raise NotImplementedError("Subclasses must implement this method.")

def write(self, table: MicroPartition):
if self.current_writer is None:
self.current_writer = self._create_writer(table.schema().to_pyarrow_schema())
self.current_writer.write_table(table.to_arrow())

def close(self) -> Optional[str]:
if self.current_writer is None:
return None
self.current_writer.close()
return f"{self.resolved_path}/{self.file_name}"


class ParquetFileWriter(FileWriterBase):
def __init__(
self,
root_dir: str,
file_idx: int,
compression: str = "none",
io_config: Optional[IOConfig] = None,
):
super().__init__(root_dir, file_idx, "parquet", compression, io_config)

def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
file_path = f"{self.resolved_path}/{self.file_name}"
return pq.ParquetWriter(

file_path,
schema,
compression=self.compression,
use_compliant_nested_type=False,
filesystem=self.fs,
)


class CSVFileWriter(FileWriterBase):
def __init__(self, root_dir: str, file_idx: int, io_config: Optional[IOConfig] = None):
super().__init__(root_dir, file_idx, "csv", None, io_config)

def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
file_path = f"{self.resolved_path}/{self.file_name}"
return pacsv.CSVWriter(

file_path,
schema,
)
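For context, a minimal usage sketch of the new writers (not part of the diff; the `from_pydict` construction and paths below are illustrative):

```python
from daft.io.writer import ParquetFileWriter
from daft.table.micropartition import MicroPartition

# Stream two micropartitions into a single parquet file, then close the
# writer to obtain the resolved output path.
writer = ParquetFileWriter(root_dir="/tmp/example_table", file_idx=0, compression="snappy")
for part in [
    MicroPartition.from_pydict({"a": [1, 2, 3]}),
    MicroPartition.from_pydict({"a": [4, 5, 6]}),
]:
    writer.write(part)  # the underlying pq.ParquetWriter is created lazily on first write
path = writer.close()  # "<resolved_root_dir>/<uuid>-0.parquet", or None if nothing was written
```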
52 changes: 37 additions & 15 deletions src/daft-local-execution/src/pipeline.rs
@@ -1,7 +1,9 @@
use std::{collections::HashMap, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay};
use common_error::DaftResult;
use common_file_formats::FileFormat;
use daft_core::{
datatypes::Field,
prelude::{Schema, SchemaRef},
@@ -10,8 +12,8 @@
use daft_dsl::{join::get_common_join_keys, Expr};
use daft_micropartition::MicroPartition;
use daft_physical_plan::{
Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, Project, Sort,
UnGroupedAggregate,
Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalWrite,
Project, Sort, UnGroupedAggregate,
};
use daft_plan::{populate_aggregation_stages, JoinType};
use daft_table::{Probeable, Table};
@@ -27,8 +29,8 @@
},
sinks::{
aggregate::AggregateSink, blocking_sink::BlockingSinkNode,
hash_join_build::HashJoinBuildSink, limit::LimitSink, sort::SortSink,
streaming_sink::StreamingSinkNode,
hash_join_build::HashJoinBuildSink, limit::LimitSink, physical_write::PhysicalWriteSink,
sort::SortSink, streaming_sink::StreamingSinkNode,
},
sources::in_memory::InMemorySource,
ExecutionRuntimeHandle, PipelineCreationSnafu,
@@ -99,6 +101,7 @@
pub fn physical_plan_to_pipeline(
physical_plan: &LocalPhysicalPlan,
psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
cfg: &Arc<DaftExecutionConfig>,
) -> crate::Result<Box<dyn PipelineNode>> {
use daft_physical_plan::PhysicalScan;

@@ -110,27 +113,29 @@
}
LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
let partitions = psets.get(&info.cache_key).expect("Cache key not found");
InMemorySource::new(partitions.clone()).boxed().into()
InMemorySource::new(partitions.clone(), info.source_schema.clone())
.boxed()
.into()
}
LocalPhysicalPlan::Project(Project {
input, projection, ..
}) => {
let proj_op = ProjectOperator::new(projection.clone());
let child_node = physical_plan_to_pipeline(input, psets)?;
let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
IntermediateNode::new(Arc::new(proj_op), vec![child_node]).boxed()
}
LocalPhysicalPlan::Filter(Filter {
input, predicate, ..
}) => {
let filter_op = FilterOperator::new(predicate.clone());
let child_node = physical_plan_to_pipeline(input, psets)?;
let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
IntermediateNode::new(Arc::new(filter_op), vec![child_node]).boxed()
}
LocalPhysicalPlan::Limit(Limit {
input, num_rows, ..
}) => {
let sink = LimitSink::new(*num_rows as usize);
let child_node = physical_plan_to_pipeline(input, psets)?;
let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
StreamingSinkNode::new(sink.boxed(), vec![child_node]).boxed()
}
LocalPhysicalPlan::Concat(_) => {
@@ -156,7 +161,7 @@
.collect(),
vec![],
);
let child_node = physical_plan_to_pipeline(input, psets)?;
let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
let post_first_agg_node =
IntermediateNode::new(Arc::new(first_stage_agg_op), vec![child_node]).boxed();

@@ -192,7 +197,7 @@
.collect(),
group_by.clone(),
);
let child_node = physical_plan_to_pipeline(input, psets)?;
let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
let post_first_agg_node =
IntermediateNode::new(Arc::new(first_stage_agg_op), vec![child_node]).boxed();

@@ -218,7 +223,7 @@
..
}) => {
let sort_sink = SortSink::new(sort_by.clone(), descending.clone());
let child_node = physical_plan_to_pipeline(input, psets)?;
let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
BlockingSinkNode::new(sort_sink.boxed(), child_node).boxed()
}
LocalPhysicalPlan::HashJoin(HashJoin {
@@ -290,11 +295,11 @@
// we should move to a builder pattern
let build_sink =
HashJoinBuildSink::new(key_schema.clone(), casted_build_on, join_type)?;
let build_child_node = physical_plan_to_pipeline(build_child, psets)?;
let build_child_node = physical_plan_to_pipeline(build_child, psets, cfg)?;
let build_node =
BlockingSinkNode::new(build_sink.boxed(), build_child_node).boxed();

let probe_child_node = physical_plan_to_pipeline(probe_child, psets)?;
let probe_child_node = physical_plan_to_pipeline(probe_child, psets, cfg)?;

match join_type {
JoinType::Anti | JoinType::Semi => DaftResult::Ok(IntermediateNode::new(
@@ -324,8 +329,25 @@
})?;
probe_node.boxed()
}
_ => {
unimplemented!("Physical plan not supported: {}", physical_plan.name());
LocalPhysicalPlan::PhysicalWrite(PhysicalWrite {
input, file_info, ..
}) => {
let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
let (inflation_factor, target_file_size) = match file_info.file_format {
FileFormat::Parquet => (cfg.parquet_inflation_factor, cfg.parquet_target_filesize),
FileFormat::Csv => (cfg.csv_inflation_factor, cfg.csv_target_filesize),
_ => unreachable!("Unsupported file format"),

};
let write_sink = PhysicalWriteSink::new(
file_info,
inflation_factor,
target_file_size,
cfg.parquet_target_row_group_size,
)
.context(PipelineCreationSnafu {
plan_name: physical_plan.name(),
})?;
BlockingSinkNode::new(write_sink.boxed(), child_node).boxed()
}
};

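The per-format `(inflation_factor, target_file_size)` pair chosen here feeds the sink's buffering threshold. A rough sketch of the presumed sizing arithmetic (the actual computation lives inside `PhysicalWriteSink` and is not shown in this diff):

```python
# Hypothetical sketch: the in-memory bytes to buffer before cutting a new file,
# assuming the sink scales the on-disk target by the inflation factor
# (on-disk formats compress, so in-memory Arrow data is larger).
def target_in_memory_bytes(target_file_size: int, inflation_factor: float) -> int:
    return int(target_file_size * inflation_factor)

# Illustrative values only (not the configured defaults):
print(target_in_memory_bytes(512 * 1024 * 1024, 3.0))  # => 1610612736, ~1.5 GiB of Arrow data per parquet file
```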
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/run.rs
@@ -121,7 +121,7 @@ pub fn run_local(
results_buffer_size: Option<usize>,
) -> DaftResult<Box<dyn Iterator<Item = DaftResult<Arc<MicroPartition>>> + Send>> {
refresh_chrome_trace();
let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets)?;
let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets, &cfg)?;
let (tx, rx) = create_channel(results_buffer_size.unwrap_or(1));
let handle = std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
88 changes: 88 additions & 0 deletions src/daft-local-execution/src/sinks/buffer.rs
@@ -0,0 +1,88 @@
use std::{cmp::Ordering::*, collections::VecDeque, sync::Arc};

use common_error::DaftResult;
use daft_micropartition::MicroPartition;

pub struct SizeBasedBuffer {
pub buffer: VecDeque<Arc<MicroPartition>>,
pub curr_size: usize,
pub threshold: usize,
}

impl SizeBasedBuffer {
pub fn new(threshold: usize) -> Self {
assert!(threshold > 0);
Self {
buffer: VecDeque::new(),
curr_size: 0,
threshold,
}
}

pub fn push(&mut self, part: Arc<MicroPartition>) {
self.curr_size += part.size_bytes().unwrap().unwrap();
self.buffer.push_back(part);
}

pub fn try_clear(&mut self) -> Option<DaftResult<Arc<MicroPartition>>> {
match self.curr_size.cmp(&self.threshold) {
Less => None,
Equal => Some(self.concat_and_return()),

Greater => Some(self.clear_enough()),
}
}

fn clear_enough(&mut self) -> DaftResult<Arc<MicroPartition>> {
assert!(self.curr_size > self.threshold);

let mut to_concat = Vec::with_capacity(self.buffer.len());
let mut remaining = self.threshold;

while remaining > 0 {
let part = self.buffer.pop_front().expect("Buffer should not be empty");
let part_size = part.size_bytes()?.unwrap();
if part_size <= remaining {
remaining -= part_size;
to_concat.push(part);
} else {

let num_rows_to_take =
(remaining as f64 / part_size as f64 * part.len() as f64).ceil() as usize;
let (head, tail) = part.split_at(num_rows_to_take)?;
remaining = 0;
to_concat.push(Arc::new(head));
self.buffer.push_front(Arc::new(tail));
break;
}
}
assert_eq!(remaining, 0);

self.curr_size -= self.threshold;
match to_concat.len() {
1 => Ok(to_concat.pop().unwrap()),
_ => MicroPartition::concat(&to_concat.iter().map(|x| x.as_ref()).collect::<Vec<_>>())
.map(Arc::new),

}
}

pub fn clear_all(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>> {
if self.buffer.is_empty() {
return Ok(vec![]);
}
let mut res = vec![];
while let Some(part) = self.try_clear() {
res.push(part?);
}
if self.curr_size > 0 {
res.push(self.concat_and_return()?);
}

Ok(res)
}

fn concat_and_return(&mut self) -> DaftResult<Arc<MicroPartition>> {
let concated =
MicroPartition::concat(&self.buffer.iter().map(|x| x.as_ref()).collect::<Vec<_>>())?;
self.buffer.clear();
self.curr_size = 0;
Ok(concated.into())
}
}
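The row-count split in `clear_enough` above takes a prefix of the partition that would overshoot the byte threshold, proportional to how many bytes are still needed. A small Python sketch of that arithmetic (illustrative only; the implementation is the Rust above):

```python
import math

# Rows to take from the partition that would overshoot the threshold:
# the fraction of its bytes still needed, applied to its row count, rounded up.
def rows_to_take(remaining_bytes: int, part_bytes: int, part_rows: int) -> int:
    return math.ceil(remaining_bytes / part_bytes * part_rows)

# e.g. 1 MiB still needed, next partition is 4 MiB across 1000 rows:
assert rows_to_take(1 << 20, 4 << 20, 1000) == 250  # head keeps 250 rows, tail goes back on the buffer
```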
2 changes: 2 additions & 0 deletions src/daft-local-execution/src/sinks/mod.rs
@@ -1,7 +1,9 @@
pub mod aggregate;
pub mod blocking_sink;
pub mod buffer;
pub mod concat;
pub mod hash_join_build;
pub mod limit;
pub mod physical_write;
pub mod sort;
pub mod streaming_sink;