From 491054b0455307e161012e4d48c9a5198b1334e3 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Thu, 5 Dec 2024 16:31:45 -0800
Subject: [PATCH] Add feature flag DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING

---
 daft/context.py                          |   3 +
 daft/daft/__init__.pyi                   |   1 +
 src/common/daft-config/src/lib.rs        |   9 +
 src/common/daft-config/src/python.rs     |  10 +
 src/daft-scan/src/scan_task_iters.rs     | 182 +++++++++++++++++-
 .../split_parquet_files_by_rowgroup.rs   |   2 +-
 tests/io/test_split_scan_tasks.py        | 134 ++++++-------
 7 files changed, 267 insertions(+), 74 deletions(-)

diff --git a/daft/context.py b/daft/context.py
index 7f9b8b1ae6..fd0c6b7756 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -351,6 +351,7 @@ def set_execution_config(
     shuffle_algorithm: str | None = None,
     pre_shuffle_merge_threshold: int | None = None,
     enable_ray_tracing: bool | None = None,
+    enable_aggressive_scantask_splitting: bool | None = None,
 ) -> DaftContext:
     """Globally sets various configuration parameters which control various aspects of Daft execution.
     These configuration values are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -390,6 +391,7 @@ def set_execution_config(
         shuffle_algorithm: The shuffle algorithm to use. Defaults to "map_reduce". Other options are "pre_shuffle_merge".
         pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB
         enable_ray_tracing: Enable tracing for Ray. Accessible in `/tmp/ray/session_latest/logs/daft` after the run completes. Defaults to False.
+        enable_aggressive_scantask_splitting: Enable more aggressive splitting of ScanTasks to make smaller partitions. Defaults to False.
     """
     # Replace values in the DaftExecutionConfig with user-specified overrides
     ctx = get_context()
@@ -418,6 +420,7 @@ def set_execution_config(
             shuffle_algorithm=shuffle_algorithm,
             pre_shuffle_merge_threshold=pre_shuffle_merge_threshold,
             enable_ray_tracing=enable_ray_tracing,
+            enable_aggressive_scantask_splitting=enable_aggressive_scantask_splitting,
         )

         ctx._daft_execution_config = new_daft_execution_config
diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi
index 9fc430b663..b06aa799b7 100644
--- a/daft/daft/__init__.pyi
+++ b/daft/daft/__init__.pyi
@@ -1829,6 +1829,7 @@ class PyDaftExecutionConfig:
         enable_ray_tracing: bool | None = None,
         shuffle_algorithm: str | None = None,
         pre_shuffle_merge_threshold: int | None = None,
+        enable_aggressive_scantask_splitting: bool | None = None,
     ) -> PyDaftExecutionConfig: ...
     @property
     def scan_tasks_min_size_bytes(self) -> int: ...
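Note: the Python-facing change above only threads one new optional kwarg through `set_execution_config` (plus the matching `.pyi` stub). A minimal usage sketch, assuming the `daft.set_execution_config` / `daft.execution_config_ctx` entrypoints touched by this patch and a hypothetical input path:

    import daft

    # Process-wide toggle via the new kwarg added above.
    daft.set_execution_config(enable_aggressive_scantask_splitting=True)

    # Or scoped to a block, mirroring how the updated tests at the end of this patch use it.
    with daft.execution_config_ctx(enable_aggressive_scantask_splitting=True):
        df = daft.read_parquet("path/to/data/*.parquet")  # hypothetical path
        print(df.num_partitions())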
diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index 4aeae897a7..21f9a4d384 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -51,6 +51,7 @@ pub struct DaftExecutionConfig {
     pub shuffle_algorithm: String,
     pub pre_shuffle_merge_threshold: usize,
     pub enable_ray_tracing: bool,
+    pub enable_aggressive_scantask_splitting: bool,
 }

 impl Default for DaftExecutionConfig {
@@ -77,6 +78,7 @@ impl Default for DaftExecutionConfig {
             shuffle_algorithm: "map_reduce".to_string(),
             pre_shuffle_merge_threshold: 1024 * 1024 * 1024, // 1GB
             enable_ray_tracing: false,
+            enable_aggressive_scantask_splitting: false,
         }
     }
 }
@@ -110,6 +112,13 @@ impl DaftExecutionConfig {
         {
             cfg.enable_ray_tracing = true;
         }
+        let enable_aggressive_scantask_splitting_env_var_name =
+            "DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING";
+        if let Ok(val) = std::env::var(enable_aggressive_scantask_splitting_env_var_name)
+            && matches!(val.trim().to_lowercase().as_str(), "1" | "true")
+        {
+            cfg.enable_aggressive_scantask_splitting = true;
+        }
         cfg
     }
 }
diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index aceefd63d2..1a83613dc5 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -97,6 +97,7 @@ impl PyDaftExecutionConfig {
         shuffle_algorithm: Option<&str>,
         pre_shuffle_merge_threshold: Option<usize>,
         enable_ray_tracing: Option<bool>,
+        enable_aggressive_scantask_splitting: Option<bool>,
     ) -> PyResult<Self> {
         let mut config = self.config.as_ref().clone();

@@ -175,6 +176,10 @@ impl PyDaftExecutionConfig {
             config.enable_ray_tracing = enable_ray_tracing;
         }
+        if let Some(enable_aggressive_scantask_splitting) = enable_aggressive_scantask_splitting {
+            config.enable_aggressive_scantask_splitting = enable_aggressive_scantask_splitting;
+        }
+
         Ok(Self {
             config: Arc::new(config),
         })
@@ -274,6 +279,11 @@ impl PyDaftExecutionConfig {
     fn enable_ray_tracing(&self) -> PyResult<bool> {
         Ok(self.config.enable_ray_tracing)
     }
+
+    #[getter]
+    fn enable_aggressive_scantask_splitting(&self) -> PyResult<bool> {
+        Ok(self.config.enable_aggressive_scantask_splitting)
+    }
 }

 impl_bincode_py_state_serialization!(PyDaftExecutionConfig);
diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs
index 5a4bdacf63..8d4b51e25e 100644
--- a/src/daft-scan/src/scan_task_iters.rs
+++ b/src/daft-scan/src/scan_task_iters.rs
@@ -1,13 +1,18 @@
+mod split_parquet_files_by_rowgroup;
+
 use std::sync::Arc;

 use common_daft_config::DaftExecutionConfig;
 use common_error::{DaftError, DaftResult};
+use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
 use common_scan_info::{ScanTaskLike, ScanTaskLikeRef, SPLIT_AND_MERGE_PASS};
-use split_parquet_files_by_rowgroup::split_by_row_groups;
+use daft_io::IOStatsContext;
+use daft_parquet::read::read_parquet_metadata;
+use parquet2::metadata::RowGroupList;

-mod split_parquet_files_by_rowgroup;
-
-use crate::{Pushdowns, ScanTask, ScanTaskRef};
+use crate::{
+    storage_config::StorageConfig, ChunkSpec, DataSource, Pushdowns, ScanTask, ScanTaskRef,
+};

 pub(crate) type BoxScanTaskIter<'a> = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'a>;
@@ -174,6 +179,139 @@ impl<'a> Iterator for MergeByFileSize<'a> {
     }
 }

+#[must_use]
+pub(crate) fn split_by_row_groups(
+    scan_tasks: BoxScanTaskIter,
+    max_tasks: usize,
+    min_size_bytes: usize,
+    max_size_bytes: usize,
+) -> BoxScanTaskIter {
+    let mut scan_tasks = itertools::peek_nth(scan_tasks);
+
+    // only split if we have a small amount of files
+    if scan_tasks.peek_nth(max_tasks).is_some() {
+        Box::new(scan_tasks)
+    } else {
+        Box::new(
+            scan_tasks
+                .map(move |t| -> DaftResult<BoxScanTaskIter> {
+                    let t = t?;
+
+                    /* Only split parquet tasks if they:
+                        - have one source
+                        - use native storage config
+                        - have no specified chunk spec or number of rows
+                        - have size past split threshold
+                        - no iceberg delete files
+                    */
+                    if let (
+                        FileFormatConfig::Parquet(ParquetSourceConfig {
+                            field_id_mapping, ..
+                        }),
+                        StorageConfig::Native(_),
+                        [source],
+                        Some(None),
+                        None,
+                    ) = (
+                        t.file_format_config.as_ref(),
+                        t.storage_config.as_ref(),
+                        &t.sources[..],
+                        t.sources.first().map(DataSource::get_chunk_spec),
+                        t.pushdowns.limit,
+                    ) && source
+                        .get_size_bytes()
+                        .map_or(true, |s| s > max_size_bytes as u64)
+                        && source
+                            .get_iceberg_delete_files()
+                            .map_or(true, std::vec::Vec::is_empty)
+                    {
+                        let (io_runtime, io_client) =
+                            t.storage_config.get_io_client_and_runtime()?;
+
+                        let path = source.get_path();
+
+                        let io_stats =
+                            IOStatsContext::new(format!("split_by_row_groups for {path:#?}"));
+
+                        let mut file = io_runtime.block_on_current_thread(read_parquet_metadata(
+                            path,
+                            io_client,
+                            Some(io_stats),
+                            field_id_mapping.clone(),
+                        ))?;
+
+                        let mut new_tasks: Vec<DaftResult<ScanTaskRef>> = Vec::new();
+                        let mut curr_row_group_indices = Vec::new();
+                        let mut curr_row_groups = Vec::new();
+                        let mut curr_size_bytes = 0;
+                        let mut curr_num_rows = 0;
+
+                        let row_groups = std::mem::take(&mut file.row_groups);
+                        let num_row_groups = row_groups.len();
+                        for (i, rg) in row_groups {
+                            curr_row_groups.push((i, rg));
+                            let rg = &curr_row_groups.last().unwrap().1;
+                            curr_row_group_indices.push(i as i64);
+                            curr_size_bytes += rg.compressed_size();
+                            curr_num_rows += rg.num_rows();
+
+                            if curr_size_bytes >= min_size_bytes || i == num_row_groups - 1 {
+                                let mut new_source = source.clone();
+
+                                if let DataSource::File {
+                                    chunk_spec,
+                                    size_bytes,
+                                    parquet_metadata,
+                                    ..
+                                } = &mut new_source
+                                {
+                                    // only keep relevant row groups in the metadata
+                                    let row_group_list =
+                                        RowGroupList::from_iter(curr_row_groups.into_iter());
+                                    let new_metadata =
+                                        file.clone_with_row_groups(curr_num_rows, row_group_list);
+                                    *parquet_metadata = Some(Arc::new(new_metadata));
+
+                                    *chunk_spec = Some(ChunkSpec::Parquet(curr_row_group_indices));
+                                    *size_bytes = Some(curr_size_bytes as u64);
+                                } else {
+                                    unreachable!(
+                                        "Parquet file format should only be used with DataSource::File"
+                                    );
+                                }
+
+                                if let DataSource::File {
+                                    metadata: Some(metadata),
+                                    ..
+                                } = &mut new_source
+                                {
+                                    metadata.length = curr_num_rows;
+                                }
+
+                                // Reset accumulators
+                                curr_row_groups = Vec::new();
+                                curr_row_group_indices = Vec::new();
+                                curr_size_bytes = 0;
+                                curr_num_rows = 0;
+
+                                new_tasks.push(Ok(ScanTask::new(
+                                    vec![new_source],
+                                    t.file_format_config.clone(),
+                                    t.schema.clone(),
+                                    t.storage_config.clone(),
+                                    t.pushdowns.clone(),
+                                    t.generated_fields.clone(),
+                                )
+                                .into()));
+                            }
+                        }
+
+                        Ok(Box::new(new_tasks.into_iter()))
+                    } else {
+                        Ok(Box::new(std::iter::once(Ok(t))))
+                    }
+                })
+                .flat_map(|t| t.unwrap_or_else(|e| Box::new(std::iter::once(Err(e))))),
+        )
+    }
+}
+
 fn split_and_merge_pass(
     scan_tasks: Arc<Vec<ScanTaskLikeRef>>,
     pushdowns: &Pushdowns,
@@ -193,9 +331,14 @@ fn split_and_merge_pass(
                 .downcast::<ScanTask>()
                 .map_err(|e| DaftError::TypeError(format!("Expected Arc<ScanTask>, found {:?}", e)))
        }));
-        let split_tasks = split_by_row_groups(iter, cfg);
-        let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
-        let scan_tasks: Vec<Arc<dyn ScanTaskLike>> = merged_tasks
+
+        let optimized = if cfg.enable_aggressive_scantask_splitting {
+            split_and_merge_pass_v2(iter, pushdowns, cfg)
+        } else {
+            split_and_merge_pass_v1(iter, pushdowns, cfg)
+        };
+
+        let scan_tasks: Vec<Arc<dyn ScanTaskLike>> = optimized
             .map(|st| st.map(|task| task as Arc<dyn ScanTaskLike>))
             .collect::<DaftResult<Vec<_>>>()?;
         Ok(Arc::new(scan_tasks))
@@ -204,6 +347,31 @@ fn split_and_merge_pass(
     }
 }

+fn split_and_merge_pass_v1<'a>(
+    inputs: BoxScanTaskIter<'a>,
+    pushdowns: &Pushdowns,
+    cfg: &'a DaftExecutionConfig,
+) -> BoxScanTaskIter<'a> {
+    let split_tasks = split_by_row_groups(
+        inputs,
+        cfg.parquet_split_row_groups_max_files,
+        cfg.scan_tasks_min_size_bytes,
+        cfg.scan_tasks_max_size_bytes,
+    );
+    let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
+    merged_tasks
+}
+
+fn split_and_merge_pass_v2<'a>(
+    inputs: BoxScanTaskIter<'a>,
+    pushdowns: &Pushdowns,
+    cfg: &'a DaftExecutionConfig,
+) -> BoxScanTaskIter<'a> {
+    let split_tasks = split_parquet_files_by_rowgroup::split_all_files_by_rowgroup(inputs, cfg);
+    let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
+    merged_tasks
+}
+
 #[ctor::ctor]
 fn set_pass() {
     let _ = SPLIT_AND_MERGE_PASS.set(&split_and_merge_pass);
diff --git a/src/daft-scan/src/scan_task_iters/split_parquet_files_by_rowgroup.rs b/src/daft-scan/src/scan_task_iters/split_parquet_files_by_rowgroup.rs
index 8c4bb6c3b6..ee5debd06b 100644
--- a/src/daft-scan/src/scan_task_iters/split_parquet_files_by_rowgroup.rs
+++ b/src/daft-scan/src/scan_task_iters/split_parquet_files_by_rowgroup.rs
@@ -453,7 +453,7 @@ impl<'a> Iterator for SplitParquetFilesByRowGroups<'a> {
 }

 #[must_use]
-pub(crate) fn split_by_row_groups<'a>(
+pub(crate) fn split_all_files_by_rowgroup<'a>(
     scan_tasks: BoxScanTaskIter<'a>,
     config: &'a DaftExecutionConfig,
 ) -> BoxScanTaskIter<'a> {
diff --git a/tests/io/test_split_scan_tasks.py b/tests/io/test_split_scan_tasks.py
index 891b6a16a0..29791ee2f0 100644
--- a/tests/io/test_split_scan_tasks.py
+++ b/tests/io/test_split_scan_tasks.py
@@ -28,72 +28,74 @@ def test_split_parquet_read(parquet_files):


 def test_split_parquet_read_many_files(tmpdir):
-    # Write 20 files into tmpdir
-    for i in range(20):
-        tbl = pa.table({"data": [str(i) for i in range(100)]})
-        path = tmpdir / f"file.{i}.pq"
-        papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
-
-    with daft.execution_config_ctx(
-        scan_tasks_min_size_bytes=20,
-        scan_tasks_max_size_bytes=100,
-    ):
-        df = daft.read_parquet(str(tmpdir))
-        assert df.num_partitions() == 200, "Should have 200 partitions since we will split all files"
-        assert df.to_pydict() == {"data": [str(i) for i in range(100)] * 20}
+    with daft.execution_config_ctx(enable_aggressive_scantask_splitting=True):
+        # Write 20 files into tmpdir
+        for i in range(20):
+            tbl = pa.table({"data": [str(i) for i in range(100)]})
+            path = tmpdir / f"file.{i}.pq"
+            papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
+
+        with daft.execution_config_ctx(
+            scan_tasks_min_size_bytes=20,
+            scan_tasks_max_size_bytes=100,
+        ):
+            df = daft.read_parquet(str(tmpdir))
+            assert df.num_partitions() == 200, "Should have 200 partitions since we will split all files"
+            assert df.to_pydict() == {"data": [str(i) for i in range(100)] * 20}


 def test_split_parquet_read_some_splits(tmpdir):
-    # Write a mix of 20 large and 20 small files
-    # Small ones should not be split, large ones should be split into 10 rowgroups each
-    # This gives us a total of 200 + 20 scantasks
-
-    # Write 20 large files into tmpdir
-    large_file_paths = []
-    for i in range(20):
-        tbl = pa.table({"data": [str(f"large{i}") for i in range(100)]})
-        path = tmpdir / f"file.{i}.large.pq"
-        papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
-        large_file_paths.append(str(path))
-
-    # Write 20 small files into tmpdir
-    small_file_paths = []
-    for i in range(20):
-        tbl = pa.table({"data": ["small"]})
-        path = tmpdir / f"file.{i}.small.pq"
-        papq.write_table(tbl, str(path), row_group_size=1, use_dictionary=False)
-        small_file_paths.append(str(path))
-
-    # Test [large_paths, ..., small_paths, ...]
-    with daft.execution_config_ctx(
-        scan_tasks_min_size_bytes=20,
-        scan_tasks_max_size_bytes=100,
-    ):
-        df = daft.read_parquet(large_file_paths + small_file_paths)
-        assert (
-            df.num_partitions() == 220
-        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
-        assert df.to_pydict() == {"data": [str(f"large{i}") for i in range(100)] * 20 + ["small"] * 20}
-
-    # Test interleaved [large_path, small_path, large_path, small_path, ...]
-    with daft.execution_config_ctx(
-        scan_tasks_min_size_bytes=20,
-        scan_tasks_max_size_bytes=100,
-    ):
-        interleaved_paths = [path for pair in zip(large_file_paths, small_file_paths) for path in pair]
-        df = daft.read_parquet(interleaved_paths)
-        assert (
-            df.num_partitions() == 220
-        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
-        assert df.to_pydict() == {"data": ([str(f"large{i}") for i in range(100)] + ["small"]) * 20}
-
-    # Test [small_paths, ..., large_paths]
-    with daft.execution_config_ctx(
-        scan_tasks_min_size_bytes=20,
-        scan_tasks_max_size_bytes=100,
-    ):
-        df = daft.read_parquet(small_file_paths + large_file_paths)
-        assert (
-            df.num_partitions() == 220
-        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
-        assert df.to_pydict() == {"data": ["small"] * 20 + [str(f"large{i}") for i in range(100)] * 20}
+    with daft.execution_config_ctx(enable_aggressive_scantask_splitting=True):
+        # Write a mix of 20 large and 20 small files
+        # Small ones should not be split, large ones should be split into 10 rowgroups each
+        # This gives us a total of 200 + 20 scantasks
+
+        # Write 20 large files into tmpdir
+        large_file_paths = []
+        for i in range(20):
+            tbl = pa.table({"data": [str(f"large{i}") for i in range(100)]})
+            path = tmpdir / f"file.{i}.large.pq"
+            papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
+            large_file_paths.append(str(path))
+
+        # Write 20 small files into tmpdir
+        small_file_paths = []
+        for i in range(20):
+            tbl = pa.table({"data": ["small"]})
+            path = tmpdir / f"file.{i}.small.pq"
+            papq.write_table(tbl, str(path), row_group_size=1, use_dictionary=False)
+            small_file_paths.append(str(path))
+
+        # Test [large_paths, ..., small_paths, ...]
+        with daft.execution_config_ctx(
+            scan_tasks_min_size_bytes=20,
+            scan_tasks_max_size_bytes=100,
+        ):
+            df = daft.read_parquet(large_file_paths + small_file_paths)
+            assert (
+                df.num_partitions() == 220
+            ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+            assert df.to_pydict() == {"data": [str(f"large{i}") for i in range(100)] * 20 + ["small"] * 20}
+
+        # Test interleaved [large_path, small_path, large_path, small_path, ...]
+        with daft.execution_config_ctx(
+            scan_tasks_min_size_bytes=20,
+            scan_tasks_max_size_bytes=100,
+        ):
+            interleaved_paths = [path for pair in zip(large_file_paths, small_file_paths) for path in pair]
+            df = daft.read_parquet(interleaved_paths)
+            assert (
+                df.num_partitions() == 220
+            ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+            assert df.to_pydict() == {"data": ([str(f"large{i}") for i in range(100)] + ["small"]) * 20}
+
+        # Test [small_paths, ..., large_paths]
+        with daft.execution_config_ctx(
+            scan_tasks_min_size_bytes=20,
+            scan_tasks_max_size_bytes=100,
+        ):
+            df = daft.read_parquet(small_file_paths + large_file_paths)
+            assert (
+                df.num_partitions() == 220
+            ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+            assert df.to_pydict() == {"data": ["small"] * 20 + [str(f"large{i}") for i in range(100)] * 20}
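For completeness, the Rust change in lib.rs near the top of this patch also reads the flag from the environment, accepting "1" or "true" (trimmed, case-insensitive) for DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING. A minimal sketch of enabling it that way, assuming the variable is set before Daft constructs its execution config from the environment (e.g. at the very top of a script, or exported in the shell):

    import os

    # Matches the values accepted by the env-var handling added in lib.rs above.
    os.environ["DAFT_ENABLE_AGGRESSIVE_SCANTASK_SPLITTING"] = "1"

    import daft  # imported after setting the variable so the env-derived config can pick it up

    df = daft.read_parquet("path/to/data/*.parquet")  # hypothetical path
    print(df.num_partitions())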