Skip to content

Commit

Permalink
[PERF] Split parquet scan tasks into individual row groups (#1799)
Browse files Browse the repository at this point in the history
## Benchmark results 

All times averaged over 5 runs.

### Single file read

Read one file in S3 using Ray with 4 worker nodes

Parquet file info:
- number of rows: 18,751,674
- number of row groups: 18
- file size: 711.3 MiB
- number of columns: 16

| Split threshold (MiB) | # of scan tasks | Read time |
|--------|--------|--------|
| 32 | 18 | 3.84s |
| 64 | 9 | 4.21s |
| 128 | 5 | 5.47s |
| 256 | 3 | 4.45s |
| 512 | 2 | 6.06s |
| 1024 | 1 | 10.51s |

### Multi-file workflow

Read and aggregate (Dataframe.count()) 32 files in S3 using Ray with 4
worker nodes

Parquet file info:
- total rows: 600,037,902
- number of row groups per file: 18
- file sizes: 710.7-711.5 MiB
- number of columns: 16

| Split threshold (MiB) | # of scan tasks per file | Time (4 workers) |
Time (8 workers) |
|--------|--------|--------|--------|
| 32 | 18 | 23.83s | 14.17s |
| 64 | 9 | 24.99s | 14.17s |
| 128 | 5 | 26.51s | 15.23s |
| 256 | 3 | 27.96s | 16.58s |
| 512 | 2 | 30.27s | 16.85s |
| 1024 | 1 | 26.50s | 29.06s |

(Averaged over 5 runs)
  • Loading branch information
kevinzwang authored Feb 8, 2024
1 parent f471738 commit 8aba872
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 70 deletions.
19 changes: 11 additions & 8 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ def set_planning_config(

def set_execution_config(
config: PyDaftExecutionConfig | None = None,
merge_scan_tasks_min_size_bytes: int | None = None,
merge_scan_tasks_max_size_bytes: int | None = None,
scan_tasks_min_size_bytes: int | None = None,
scan_tasks_max_size_bytes: int | None = None,
broadcast_join_size_bytes_threshold: int | None = None,
parquet_split_row_groups_max_files: int | None = None,
sort_merge_join_sort_with_aligned_boundaries: bool | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
Expand All @@ -221,14 +222,15 @@ def set_execution_config(
Args:
config: A PyDaftExecutionConfig object to set the config to, before applying other kwargs. Defaults to None which indicates
that the old (current) config should be used.
merge_scan_tasks_min_size_bytes: Minimum size in bytes when merging ScanTasks when reading files from storage.
scan_tasks_min_size_bytes: Minimum size in bytes when merging ScanTasks when reading files from storage.
Increasing this value will make Daft perform more merging of files into a single partition before yielding,
which leads to bigger but fewer partitions. (Defaults to 64 MiB)
merge_scan_tasks_max_size_bytes: Maximum size in bytes when merging ScanTasks when reading files from storage.
which leads to bigger but fewer partitions. (Defaults to 96 MiB)
scan_tasks_max_size_bytes: Maximum size in bytes when merging ScanTasks when reading files from storage.
Increasing this value will increase the upper bound of the size of merged ScanTasks, which leads to bigger but
fewer partitions. (Defaults to 512 MiB)
fewer partitions. (Defaults to 384 MiB)
broadcast_join_size_bytes_threshold: If one side of a join is smaller than this threshold, a broadcast join will be used.
Default is 10 MiB.
parquet_split_row_groups_max_files: Maximum number of files to read in which the row group splitting should happen. (Defaults to 10)
sort_merge_join_sort_with_aligned_boundaries: Whether to use a specialized algorithm for sorting both sides of a
sort-merge join such that they have aligned boundaries. This can lead to a faster merge-join at the cost of
more skewed sorted join inputs, increasing the risk of OOMs.
Expand All @@ -246,9 +248,10 @@ def set_execution_config(
ctx = get_context()
old_daft_execution_config = ctx.daft_execution_config if config is None else config
new_daft_execution_config = old_daft_execution_config.with_config_values(
merge_scan_tasks_min_size_bytes=merge_scan_tasks_min_size_bytes,
merge_scan_tasks_max_size_bytes=merge_scan_tasks_max_size_bytes,
scan_tasks_min_size_bytes=scan_tasks_min_size_bytes,
scan_tasks_max_size_bytes=scan_tasks_max_size_bytes,
broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold,
parquet_split_row_groups_max_files=parquet_split_row_groups_max_files,
sort_merge_join_sort_with_aligned_boundaries=sort_merge_join_sort_with_aligned_boundaries,
sample_size_for_sort=sample_size_for_sort,
num_preview_rows=num_preview_rows,
Expand Down
9 changes: 5 additions & 4 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1182,9 +1182,10 @@ class LogicalPlanBuilder:
class PyDaftExecutionConfig:
def with_config_values(
self,
merge_scan_tasks_min_size_bytes: int | None = None,
merge_scan_tasks_max_size_bytes: int | None = None,
scan_tasks_min_size_bytes: int | None = None,
scan_tasks_max_size_bytes: int | None = None,
broadcast_join_size_bytes_threshold: int | None = None,
parquet_split_row_groups_max_files: int | None = None,
sort_merge_join_sort_with_aligned_boundaries: bool | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
Expand All @@ -1195,9 +1196,9 @@ class PyDaftExecutionConfig:
csv_inflation_factor: float | None = None,
) -> PyDaftExecutionConfig: ...
@property
def merge_scan_tasks_min_size_bytes(self) -> int: ...
def scan_tasks_min_size_bytes(self) -> int: ...
@property
def merge_scan_tasks_max_size_bytes(self) -> int: ...
def scan_tasks_max_size_bytes(self) -> int: ...
@property
def broadcast_join_size_bytes_threshold(self) -> int: ...
@property
Expand Down
10 changes: 6 additions & 4 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ pub struct DaftPlanningConfig {
/// 5. Task local execution
#[derive(Clone, Serialize, Deserialize)]
pub struct DaftExecutionConfig {
pub merge_scan_tasks_min_size_bytes: usize,
pub merge_scan_tasks_max_size_bytes: usize,
pub scan_tasks_min_size_bytes: usize,
pub scan_tasks_max_size_bytes: usize,
pub broadcast_join_size_bytes_threshold: usize,
pub sort_merge_join_sort_with_aligned_boundaries: bool,
pub sample_size_for_sort: usize,
pub parquet_split_row_groups_max_files: usize,
pub num_preview_rows: usize,
pub parquet_target_filesize: usize,
pub parquet_target_row_group_size: usize,
Expand All @@ -38,11 +39,12 @@ pub struct DaftExecutionConfig {
impl Default for DaftExecutionConfig {
fn default() -> Self {
DaftExecutionConfig {
merge_scan_tasks_min_size_bytes: 64 * 1024 * 1024, // 64MB
merge_scan_tasks_max_size_bytes: 512 * 1024 * 1024, // 512MB
scan_tasks_min_size_bytes: 96 * 1024 * 1024, // 96MB
scan_tasks_max_size_bytes: 384 * 1024 * 1024, // 384MB
broadcast_join_size_bytes_threshold: 10 * 1024 * 1024, // 10 MiB
sort_merge_join_sort_with_aligned_boundaries: false,
sample_size_for_sort: 20,
parquet_split_row_groups_max_files: 10,
num_preview_rows: 8,
parquet_target_filesize: 512 * 1024 * 1024, // 512MB
parquet_target_row_group_size: 128 * 1024 * 1024, // 128MB
Expand Down
24 changes: 14 additions & 10 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ impl PyDaftExecutionConfig {
#[allow(clippy::too_many_arguments)]
fn with_config_values(
&self,
merge_scan_tasks_min_size_bytes: Option<usize>,
merge_scan_tasks_max_size_bytes: Option<usize>,
scan_tasks_min_size_bytes: Option<usize>,
scan_tasks_max_size_bytes: Option<usize>,
broadcast_join_size_bytes_threshold: Option<usize>,
parquet_split_row_groups_max_files: Option<usize>,
sort_merge_join_sort_with_aligned_boundaries: Option<bool>,
sample_size_for_sort: Option<usize>,
num_preview_rows: Option<usize>,
Expand All @@ -91,15 +92,18 @@ impl PyDaftExecutionConfig {
) -> PyResult<PyDaftExecutionConfig> {
let mut config = self.config.as_ref().clone();

if let Some(merge_scan_tasks_max_size_bytes) = merge_scan_tasks_max_size_bytes {
config.merge_scan_tasks_max_size_bytes = merge_scan_tasks_max_size_bytes;
if let Some(scan_tasks_max_size_bytes) = scan_tasks_max_size_bytes {
config.scan_tasks_max_size_bytes = scan_tasks_max_size_bytes;
}
if let Some(merge_scan_tasks_min_size_bytes) = merge_scan_tasks_min_size_bytes {
config.merge_scan_tasks_min_size_bytes = merge_scan_tasks_min_size_bytes;
if let Some(scan_tasks_min_size_bytes) = scan_tasks_min_size_bytes {
config.scan_tasks_min_size_bytes = scan_tasks_min_size_bytes;
}
if let Some(broadcast_join_size_bytes_threshold) = broadcast_join_size_bytes_threshold {
config.broadcast_join_size_bytes_threshold = broadcast_join_size_bytes_threshold;
}
if let Some(parquet_split_row_groups_max_files) = parquet_split_row_groups_max_files {
config.parquet_split_row_groups_max_files = parquet_split_row_groups_max_files;
}
if let Some(sort_merge_join_sort_with_aligned_boundaries) =
sort_merge_join_sort_with_aligned_boundaries
{
Expand Down Expand Up @@ -134,13 +138,13 @@ impl PyDaftExecutionConfig {
}

#[getter]
fn get_merge_scan_tasks_min_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.merge_scan_tasks_min_size_bytes)
fn get_scan_tasks_min_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.scan_tasks_min_size_bytes)
}

#[getter]
fn get_merge_scan_tasks_max_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.merge_scan_tasks_max_size_bytes)
fn get_scan_tasks_max_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.scan_tasks_max_size_bytes)
}

#[getter]
Expand Down
11 changes: 9 additions & 2 deletions src/daft-plan/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,18 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
})) => {
let scan_tasks = scan_op.0.to_scan_tasks(pushdowns.clone())?;

let scan_tasks = daft_scan::scan_task_iters::split_by_row_groups(
scan_tasks,
cfg.parquet_split_row_groups_max_files,
cfg.scan_tasks_min_size_bytes,
cfg.scan_tasks_max_size_bytes,
);

// Apply transformations on the ScanTasks to optimize
let scan_tasks = daft_scan::scan_task_iters::merge_by_sizes(
scan_tasks,
cfg.merge_scan_tasks_min_size_bytes,
cfg.merge_scan_tasks_max_size_bytes,
cfg.scan_tasks_min_size_bytes,
cfg.scan_tasks_max_size_bytes,
);
let scan_tasks = scan_tasks.collect::<DaftResult<Vec<_>>>()?;
if scan_tasks.is_empty() {
Expand Down
38 changes: 3 additions & 35 deletions src/daft-scan/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use std::{fmt::Display, sync::Arc};
use common_error::{DaftError, DaftResult};
use daft_core::schema::SchemaRef;
use daft_csv::CsvParseOptions;
use daft_io::{
get_io_client, get_runtime, parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef,
};
use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef};
use daft_parquet::read::ParquetSchemaInferenceOptions;
use futures::{stream::BoxStream, StreamExt};
use itertools::Itertools;
Expand Down Expand Up @@ -84,36 +82,6 @@ fn run_glob(
Ok(Box::new(iterator))
}

fn get_io_client_and_runtime(
storage_config: &StorageConfig,
) -> DaftResult<(Arc<tokio::runtime::Runtime>, Arc<IOClient>)> {
// Grab an IOClient and Runtime
// TODO: This should be cleaned up and hidden behind a better API from daft-io
match storage_config {
StorageConfig::Native(cfg) => {
let multithreaded_io = cfg.multithreaded_io;
Ok((
get_runtime(multithreaded_io)?,
get_io_client(
multithreaded_io,
Arc::new(cfg.io_config.clone().unwrap_or_default()),
)?,
))
}
#[cfg(feature = "python")]
StorageConfig::Python(cfg) => {
let multithreaded_io = true; // Hardcode to use multithreaded IO if Python storage config is used for data fetches
Ok((
get_runtime(multithreaded_io)?,
get_io_client(
multithreaded_io,
Arc::new(cfg.io_config.clone().unwrap_or_default()),
)?,
))
}
}
}

impl GlobScanOperator {
pub fn try_new(
glob_paths: &[&str],
Expand All @@ -128,7 +96,7 @@ impl GlobScanOperator {
Some(path) => Ok(path),
}?;

let (io_runtime, io_client) = get_io_client_and_runtime(storage_config.as_ref())?;
let (io_runtime, io_client) = storage_config.get_io_client_and_runtime()?;
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator::try_new schema inference for {first_glob_path}"
));
Expand Down Expand Up @@ -271,7 +239,7 @@ impl ScanOperator for GlobScanOperator {
&self,
pushdowns: Pushdowns,
) -> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'static>> {
let (io_runtime, io_client) = get_io_client_and_runtime(self.storage_config.as_ref())?;
let (io_runtime, io_client) = self.storage_config.get_io_client_and_runtime()?;
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator::to_scan_tasks for {:#?}",
self.glob_paths
Expand Down
109 changes: 108 additions & 1 deletion src/daft-scan/src/scan_task_iters.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_io::IOStatsContext;
use daft_parquet::read::read_parquet_metadata;

use crate::{ScanTask, ScanTaskRef};
use crate::{
file_format::FileFormatConfig, storage_config::StorageConfig, ChunkSpec, DataFileSource,
ScanTask, ScanTaskRef,
};

type BoxScanTaskIter = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>;

Expand Down Expand Up @@ -111,3 +116,105 @@ impl Iterator for MergeByFileSize {
}
}
}

pub fn split_by_row_groups(
scan_tasks: BoxScanTaskIter,
max_tasks: usize,
min_size_bytes: usize,
max_size_bytes: usize,
) -> BoxScanTaskIter {
let mut scan_tasks = itertools::peek_nth(scan_tasks);

// only split if we have a small amount of files
if scan_tasks.peek_nth(max_tasks).is_some() {
Box::new(scan_tasks)
} else {
Box::new(
scan_tasks
.map(move |t| -> DaftResult<BoxScanTaskIter> {
let t = t?;

/* Only split parquet tasks if they:
- have one source
- use native storage config
- have no specified chunk spec or number of rows
- have size past split threshold
*/
if let (
FileFormatConfig::Parquet(_),
StorageConfig::Native(_),
[source],
Some(None),
None,
) = (
t.file_format_config.as_ref(),
t.storage_config.as_ref(),
&t.sources[..],
t.sources.get(0).map(DataFileSource::get_chunk_spec),
t.pushdowns.limit,
) && source.get_size_bytes().map_or(true, |s| s > max_size_bytes as u64) {
let (io_runtime, io_client) =
t.storage_config.get_io_client_and_runtime()?;

let path = source.get_path();

let io_stats =
IOStatsContext::new(format!("split_by_row_groups for {:#?}", path));

let runtime_handle = io_runtime.handle();

let file = runtime_handle.block_on(read_parquet_metadata(
path,
io_client,
Some(io_stats),
))?;

let mut new_tasks: Vec<DaftResult<ScanTaskRef>> = Vec::new();
let mut curr_row_groups = Vec::new();
let mut curr_size_bytes = 0;

for (i, rg) in file.row_groups.iter().enumerate() {
curr_row_groups.push(i as i64);
curr_size_bytes += rg.compressed_size();

if curr_size_bytes >= min_size_bytes || i == file.row_groups.len() - 1 {
let mut new_source = source.clone();
match &mut new_source {
DataFileSource::AnonymousDataFile {
chunk_spec,
size_bytes,
..
}
| DataFileSource::CatalogDataFile {
chunk_spec,
size_bytes,
..
} => {
*chunk_spec = Some(ChunkSpec::Parquet(curr_row_groups));
*size_bytes = Some(curr_size_bytes as u64);

curr_row_groups = Vec::new();
curr_size_bytes = 0;
}
};

new_tasks.push(Ok(ScanTask::new(
vec![new_source],
t.file_format_config.clone(),
t.schema.clone(),
t.storage_config.clone(),
t.pushdowns.clone(),
)
.into()));
}
}

Ok(Box::new(new_tasks.into_iter()))
} else {
Ok(Box::new(std::iter::once(Ok(t))))
}
})
.flat_map(|t| t.unwrap_or_else(|e| Box::new(std::iter::once(Err(e))))),
)
}
}
Loading

0 comments on commit 8aba872

Please sign in to comment.