[PERF] Split parquet scan tasks into individual row groups #1799

Merged (15 commits) on Feb 8, 2024
Changes from 9 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions daft/context.py
@@ -206,6 +206,9 @@ def set_execution_config(
merge_scan_tasks_min_size_bytes: int | None = None,
merge_scan_tasks_max_size_bytes: int | None = None,
broadcast_join_size_bytes_threshold: int | None = None,
split_row_groups_max_files: int | None = None,
split_row_groups_threshold_bytes: int | None = None,
split_row_groups_min_size_bytes: int | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
@@ -228,6 +231,9 @@ def set_execution_config(
fewer partitions. (Defaults to 512 MiB)
broadcast_join_size_bytes_threshold: If one side of a join is smaller than this threshold, a broadcast join will be used.
Default is 10 MiB.
split_row_groups_max_files: Maximum number of files being scanned for which row group splitting will be applied. (Defaults to 10)
split_row_groups_threshold_bytes: File size above which a file will be split by row group. (Defaults to 24 MiB)
split_row_groups_min_size_bytes: Minimum size of scan tasks when splitting by row group. (Defaults to 16 MiB)
sample_size_for_sort: number of elements to sample from each partition when running sort,
Default is 20.
num_preview_rows: number of rows to show when displaying a dataframe preview,
@@ -245,6 +251,9 @@
merge_scan_tasks_min_size_bytes=merge_scan_tasks_min_size_bytes,
merge_scan_tasks_max_size_bytes=merge_scan_tasks_max_size_bytes,
broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold,
split_row_groups_max_files=split_row_groups_max_files,
split_row_groups_threshold_bytes=split_row_groups_threshold_bytes,
split_row_groups_min_size_bytes=split_row_groups_min_size_bytes,
sample_size_for_sort=sample_size_for_sort,
num_preview_rows=num_preview_rows,
parquet_target_filesize=parquet_target_filesize,
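
For reference, the three new options documented above are set through daft.set_execution_config, the same entry point exercised by the test fixture at the end of this diff. A minimal usage sketch (the values simply mirror the defaults introduced in this PR and are illustrative, not tuning advice):

import daft

daft.set_execution_config(
    split_row_groups_max_files=10,  # only split when scanning at most 10 files
    split_row_groups_threshold_bytes=24 * 1024 * 1024,  # split files larger than 24 MiB
    split_row_groups_min_size_bytes=16 * 1024 * 1024,  # pack split tasks to at least 16 MiB
)
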
3 changes: 3 additions & 0 deletions daft/daft.pyi
@@ -1151,6 +1151,9 @@ class PyDaftExecutionConfig:
merge_scan_tasks_min_size_bytes: int | None = None,
merge_scan_tasks_max_size_bytes: int | None = None,
broadcast_join_size_bytes_threshold: int | None = None,
split_row_groups_max_files: int | None = None,
split_row_groups_threshold_bytes: int | None = None,
split_row_groups_min_size_bytes: int | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
6 changes: 6 additions & 0 deletions src/common/daft-config/src/lib.rs
@@ -26,6 +26,9 @@ pub struct DaftExecutionConfig {
pub merge_scan_tasks_max_size_bytes: usize,
pub broadcast_join_size_bytes_threshold: usize,
pub sample_size_for_sort: usize,
pub split_row_groups_max_files: usize,
pub split_row_groups_threshold_bytes: usize,
pub split_row_groups_min_size_bytes: usize,
pub num_preview_rows: usize,
pub parquet_target_filesize: usize,
pub parquet_target_row_group_size: usize,
@@ -41,6 +44,9 @@ impl Default for DaftExecutionConfig {
merge_scan_tasks_max_size_bytes: 512 * 1024 * 1024, // 512MB
broadcast_join_size_bytes_threshold: 10 * 1024 * 1024, // 10 MiB
sample_size_for_sort: 20,
split_row_groups_max_files: 10,
split_row_groups_threshold_bytes: 24 * 1024 * 1024, // 24MB
split_row_groups_min_size_bytes: 16 * 1024 * 1024, // 16MB
num_preview_rows: 8,
parquet_target_filesize: 512 * 1024 * 1024, // 512MB
parquet_target_row_group_size: 128 * 1024 * 1024, // 128MB
12 changes: 12 additions & 0 deletions src/common/daft-config/src/python.rs
@@ -80,6 +80,9 @@ impl PyDaftExecutionConfig {
merge_scan_tasks_min_size_bytes: Option<usize>,
merge_scan_tasks_max_size_bytes: Option<usize>,
broadcast_join_size_bytes_threshold: Option<usize>,
split_row_groups_max_files: Option<usize>,
split_row_groups_threshold_bytes: Option<usize>,
split_row_groups_min_size_bytes: Option<usize>,
sample_size_for_sort: Option<usize>,
num_preview_rows: Option<usize>,
parquet_target_filesize: Option<usize>,
@@ -99,6 +102,15 @@ impl PyDaftExecutionConfig {
if let Some(broadcast_join_size_bytes_threshold) = broadcast_join_size_bytes_threshold {
config.broadcast_join_size_bytes_threshold = broadcast_join_size_bytes_threshold;
}
if let Some(split_row_groups_max_files) = split_row_groups_max_files {
config.split_row_groups_max_files = split_row_groups_max_files
}
if let Some(split_row_groups_threshold_bytes) = split_row_groups_threshold_bytes {
config.split_row_groups_threshold_bytes = split_row_groups_threshold_bytes
}
if let Some(split_row_groups_min_size_bytes) = split_row_groups_min_size_bytes {
config.split_row_groups_min_size_bytes = split_row_groups_min_size_bytes
}
if let Some(sample_size_for_sort) = sample_size_for_sort {
config.sample_size_for_sort = sample_size_for_sort;
}
7 changes: 7 additions & 0 deletions src/daft-plan/src/planner.rs
@@ -80,6 +80,13 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
})) => {
let scan_tasks = scan_op.0.to_scan_tasks(pushdowns.clone())?;

let scan_tasks = daft_scan::scan_task_iters::split_by_row_groups(
scan_tasks,
cfg.split_row_groups_max_files,
cfg.split_row_groups_threshold_bytes,
cfg.split_row_groups_min_size_bytes,
);

// Apply transformations on the ScanTasks to optimize
let scan_tasks = daft_scan::scan_task_iters::merge_by_sizes(
scan_tasks,
1 change: 1 addition & 0 deletions src/daft-scan/Cargo.toml
@@ -11,6 +11,7 @@ daft-parquet = {path = "../daft-parquet", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
itertools = {workspace = true}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true}
serde = {workspace = true}
38 changes: 3 additions & 35 deletions src/daft-scan/src/glob.rs
@@ -3,9 +3,7 @@ use std::{fmt::Display, sync::Arc};
use common_error::{DaftError, DaftResult};
use daft_core::schema::SchemaRef;
use daft_csv::CsvParseOptions;
use daft_io::{
get_io_client, get_runtime, parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef,
};
use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef};
use daft_parquet::read::ParquetSchemaInferenceOptions;
use futures::{stream::BoxStream, StreamExt};
use snafu::Snafu;
@@ -83,36 +81,6 @@ fn run_glob(
Ok(Box::new(iterator))
}

fn get_io_client_and_runtime(
storage_config: &StorageConfig,
) -> DaftResult<(Arc<tokio::runtime::Runtime>, Arc<IOClient>)> {
// Grab an IOClient and Runtime
// TODO: This should be cleaned up and hidden behind a better API from daft-io
match storage_config {
StorageConfig::Native(cfg) => {
let multithreaded_io = cfg.multithreaded_io;
Ok((
get_runtime(multithreaded_io)?,
get_io_client(
multithreaded_io,
Arc::new(cfg.io_config.clone().unwrap_or_default()),
)?,
))
}
#[cfg(feature = "python")]
StorageConfig::Python(cfg) => {
let multithreaded_io = true; // Hardcode to use multithreaded IO if Python storage config is used for data fetches
Ok((
get_runtime(multithreaded_io)?,
get_io_client(
multithreaded_io,
Arc::new(cfg.io_config.clone().unwrap_or_default()),
)?,
))
}
}
}

impl GlobScanOperator {
pub fn try_new(
glob_paths: &[&str],
@@ -127,7 +95,7 @@ impl GlobScanOperator {
Some(path) => Ok(path),
}?;

let (io_runtime, io_client) = get_io_client_and_runtime(storage_config.as_ref())?;
let (io_runtime, io_client) = storage_config.get_io_client_and_runtime()?;
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator::try_new schema inference for {first_glob_path}"
));
@@ -246,7 +214,7 @@ impl ScanOperator for GlobScanOperator {
&self,
pushdowns: Pushdowns,
) -> DaftResult<Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'static>> {
let (io_runtime, io_client) = get_io_client_and_runtime(self.storage_config.as_ref())?;
let (io_runtime, io_client) = self.storage_config.get_io_client_and_runtime()?;
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator::to_scan_tasks for {:#?}",
self.glob_paths
113 changes: 112 additions & 1 deletion src/daft-scan/src/scan_task_iters.rs
@@ -1,8 +1,13 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_io::IOStatsContext;
use daft_parquet::read::read_parquet_metadata;

use crate::{ScanTask, ScanTaskRef};
use crate::{
file_format::FileFormatConfig, storage_config::StorageConfig, ChunkSpec, DataFileSource,
ScanTask, ScanTaskRef,
};

type BoxScanTaskIter = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>;

@@ -111,3 +116,109 @@ impl Iterator for MergeByFileSize {
}
}
}

pub fn split_by_row_groups(
scan_tasks: BoxScanTaskIter,
max_tasks: usize,
split_threshold_bytes: usize,
min_size_bytes: usize,
) -> BoxScanTaskIter {
let mut scan_tasks = itertools::peek_nth(scan_tasks);

// only split if we have a small amount of files
if scan_tasks.peek_nth(max_tasks).is_some() {
return Box::new(scan_tasks);
}

Box::new(
scan_tasks
.map(move |t| -> DaftResult<BoxScanTaskIter> {
let t = t?;

/* Only split parquet tasks if they:
- have one source
- use native storage config
- have no specified chunk spec or number of rows
- have size past split threshold
*/
match (
t.file_format_config.as_ref(),
t.storage_config.as_ref(),
&t.sources[..],
t.sources.get(0).map(DataFileSource::get_chunk_spec),
t.pushdowns.limit,
) {
(
FileFormatConfig::Parquet(_),
StorageConfig::Native(_),
[source],
Some(None),
None,
) if source
.get_size_bytes()
.map_or(true, |s| s > split_threshold_bytes as u64) =>
{
let (io_runtime, io_client) =
t.storage_config.get_io_client_and_runtime()?;

let path = source.get_path();

let io_stats =
IOStatsContext::new(format!("split_by_row_groups for {:#?}", path));

let runtime_handle = io_runtime.handle();

let file = runtime_handle.block_on(read_parquet_metadata(
path,
io_client,
Some(io_stats),
))?;

let mut new_tasks: Vec<DaftResult<ScanTaskRef>> = Vec::new();
let mut curr_row_groups = Vec::new();
let mut curr_size_bytes = 0;

for (i, rg) in file.row_groups.iter().enumerate() {
curr_row_groups.push(i as i64);
curr_size_bytes += rg.compressed_size();

if curr_size_bytes >= min_size_bytes || i == file.row_groups.len() - 1 {
let mut new_source = source.clone();
match &mut new_source {
DataFileSource::AnonymousDataFile {
chunk_spec,
size_bytes,
..
}
| DataFileSource::CatalogDataFile {
chunk_spec,
size_bytes,
..
} => {
*chunk_spec = Some(ChunkSpec::Parquet(curr_row_groups));
*size_bytes = Some(curr_size_bytes as u64);
}
};

new_tasks.push(Ok(ScanTask::new(
vec![new_source],
t.file_format_config.clone(),
t.schema.clone(),
t.storage_config.clone(),
t.pushdowns.clone(),
)
.into()));

curr_row_groups = Vec::new();
curr_size_bytes = 0;
}
}

Ok(Box::new(new_tasks.into_iter()))
}
(..) => Ok(Box::new(std::iter::once(Ok(t)))),
}
})
.flat_map(|t| t.unwrap_or_else(|e| Box::new(std::iter::once(Err(e))))),
)
}
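
To make the splitting loop above concrete: row-group indices are accumulated into the current chunk until its compressed size reaches min_size_bytes (or the last row group of the file is reached), at which point a new ScanTask is emitted and the accumulator resets. Below is a small, self-contained Python sketch of the same greedy packing; it is not Daft code, just an illustration of the algorithm:

def pack_row_groups(row_group_sizes: list[int], min_size_bytes: int) -> list[list[int]]:
    """Greedily group consecutive row-group indices until each group reaches min_size_bytes."""
    tasks: list[list[int]] = []
    current: list[int] = []
    current_size = 0
    for i, size in enumerate(row_group_sizes):
        current.append(i)
        current_size += size
        # Same emit condition as the Rust loop: large enough, or last row group in the file.
        if current_size >= min_size_bytes or i == len(row_group_sizes) - 1:
            tasks.append(current)
            current, current_size = [], 0
    return tasks

# Ten 10 MiB row groups with the default 16 MiB minimum
# -> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]: five scan tasks of two row groups each.
print(pack_row_groups([10 * 1024 * 1024] * 10, 16 * 1024 * 1024))
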
34 changes: 34 additions & 0 deletions src/daft-scan/src/storage_config.rs
@@ -1,7 +1,9 @@
use std::sync::Arc;

use common_error::DaftResult;
use common_io_config::IOConfig;
use daft_core::impl_bincode_py_state_serialization;
use daft_io::{get_io_client, get_runtime, IOClient};
use serde::{Deserialize, Serialize};

#[cfg(feature = "python")]
@@ -23,6 +25,38 @@ pub enum StorageConfig {
Python(Arc<PythonStorageConfig>),
}

impl StorageConfig {
pub fn get_io_client_and_runtime(
&self,
) -> DaftResult<(Arc<tokio::runtime::Runtime>, Arc<IOClient>)> {
// Grab an IOClient and Runtime
// TODO: This should be cleaned up and hidden behind a better API from daft-io
match self {
StorageConfig::Native(cfg) => {
let multithreaded_io = cfg.multithreaded_io;
Ok((
get_runtime(multithreaded_io)?,
get_io_client(
multithreaded_io,
Arc::new(cfg.io_config.clone().unwrap_or_default()),
)?,
))
}
#[cfg(feature = "python")]
StorageConfig::Python(cfg) => {
let multithreaded_io = true; // Hardcode to use multithreaded IO if Python storage config is used for data fetches
Ok((
get_runtime(multithreaded_io)?,
get_io_client(
multithreaded_io,
Arc::new(cfg.io_config.clone().unwrap_or_default()),
)?,
))
}
}
}
}

/// Storage configuration for the Rust-native I/O layer.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))]
13 changes: 12 additions & 1 deletion tests/integration/io/parquet/test_reads_public_data.py
@@ -200,6 +200,17 @@ def parquet_file(request) -> tuple[str, str]:
return request.param


@pytest.fixture(params=[(True, True), (True, False), (False, False)], ids=["split", "merge", "ignore"])
def set_split_config(request):
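# Three modes are exercised:
#   "split":  threshold=0, min_size=0      -> every file is split, one scan task per row group
#   "merge":  threshold=0, min_size=16 MiB -> files are split, then row groups packed into tasks of at least 16 MiB
#   "ignore": threshold=24 MiB (default)   -> the small test files are never split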
threshold = 0 if request.param[0] else 24 * 1024 * 1024
min_size = 0 if request.param[1] else 16 * 1024 * 1024

daft.set_execution_config(
split_row_groups_threshold_bytes=threshold,
split_row_groups_min_size_bytes=min_size,
)


def read_parquet_with_pyarrow(path) -> pa.Table:
kwargs = {}
if get_protocol_from_path(path) == "s3" or get_protocol_from_path(path) == "s3a":
@@ -287,7 +298,7 @@ def test_parquet_into_pyarrow_bulk(parquet_file, public_storage_io_config, multi


@pytest.mark.integration()
def test_parquet_read_df(parquet_file, public_storage_io_config):
def test_parquet_read_df(parquet_file, public_storage_io_config, set_split_config):
_, url = parquet_file
daft_native_read = daft.read_parquet(url, io_config=public_storage_io_config)
pa_read = MicroPartition.from_arrow(read_parquet_with_pyarrow(url))