Add config flag and unit test
Jay Chia committed Dec 19, 2024
1 parent ed8618b commit e811034
Showing 6 changed files with 110 additions and 20 deletions.
3 changes: 3 additions & 0 deletions daft/context.py
@@ -352,6 +352,7 @@ def set_execution_config(
    shuffle_algorithm: str | None = None,
    pre_shuffle_merge_threshold: int | None = None,
    enable_ray_tracing: bool | None = None,
    scantask_splitting_level: int | None = None,
) -> DaftContext:
    """Globally sets various configuration parameters which control various aspects of Daft execution.
@@ -395,6 +396,7 @@ def set_execution_config(
        shuffle_algorithm: The shuffle algorithm to use. Defaults to "map_reduce". Other options are "pre_shuffle_merge".
        pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB.
        enable_ray_tracing: Enable tracing for Ray. Accessible in `/tmp/ray/session_latest/logs/daft` after the run completes. Defaults to False.
        scantask_splitting_level: How aggressively to split scan tasks. Setting this to `2` uses a more aggressive ScanTask splitting algorithm that may be more expensive to run, but produces more evenly sized partitions. Defaults to 1.
    """
    # Replace values in the DaftExecutionConfig with user-specified overrides
    ctx = get_context()
@@ -425,6 +427,7 @@ def set_execution_config(
        shuffle_algorithm=shuffle_algorithm,
        pre_shuffle_merge_threshold=pre_shuffle_merge_threshold,
        enable_ray_tracing=enable_ray_tracing,
        scantask_splitting_level=scantask_splitting_level,
    )

    ctx._daft_execution_config = new_daft_execution_config
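With the new parameter threaded through `set_execution_config`, callers can opt into the behavior globally or within a scope. A minimal sketch of both styles (the context-manager form mirrors the `daft.execution_config_ctx` usage in the new unit test; note that in this commit the level-2 algorithm itself is still unimplemented on the Rust side, so the flag is plumbing only):

```python
import daft

# Globally request the more aggressive ScanTask splitting (level 2).
daft.context.set_execution_config(scantask_splitting_level=2)

# Or scope the override to a block, as the new unit test does.
with daft.execution_config_ctx(scantask_splitting_level=2):
    ...  # reads planned inside this block pick up the overridden config
```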
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
@@ -1716,6 +1716,7 @@ class PyDaftExecutionConfig:
        enable_ray_tracing: bool | None = None,
        shuffle_algorithm: str | None = None,
        pre_shuffle_merge_threshold: int | None = None,
        scantask_splitting_level: int | None = None,
    ) -> PyDaftExecutionConfig: ...
    @property
    def scan_tasks_min_size_bytes(self) -> int: ...
6 changes: 6 additions & 0 deletions src/common/daft-config/src/lib.rs
@@ -53,6 +53,7 @@ pub struct DaftExecutionConfig {
    pub shuffle_algorithm: String,
    pub pre_shuffle_merge_threshold: usize,
    pub enable_ray_tracing: bool,
    pub scantask_splitting_level: i32,
}

impl Default for DaftExecutionConfig {
@@ -81,6 +82,7 @@ impl Default for DaftExecutionConfig {
shuffle_algorithm: "map_reduce".to_string(),
pre_shuffle_merge_threshold: 1024 * 1024 * 1024, // 1GB
enable_ray_tracing: false,
scantask_splitting_level: 1,
}
}
}
@@ -118,6 +120,10 @@ impl DaftExecutionConfig {
        if let Ok(val) = std::env::var(shuffle_algorithm_env_var_name) {
            cfg.shuffle_algorithm = val;
        }
        let scantask_splitting_level_env_var_name = "DAFT_SCANTASK_SPLITTING_LEVEL";
        if let Ok(val) = std::env::var(scantask_splitting_level_env_var_name) {
            // Keep the existing default if the value cannot be parsed as an integer;
            // falling back to 0 would produce a level that is rejected downstream.
            cfg.scantask_splitting_level = val.parse::<i32>().unwrap_or(cfg.scantask_splitting_level);
        }
        cfg
    }
}
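Besides the Python API, the Rust config also honors a `DAFT_SCANTASK_SPLITTING_LEVEL` environment variable when it builds the execution config from the environment. A sketch of how a user might set it, assuming the variable has to be in place before Daft initializes its config (the exact read point is not shown in this diff):

```python
import os

# Set the override before Daft constructs its default execution config; setting it
# before the first `import daft` is the safe assumption here.
os.environ["DAFT_SCANTASK_SPLITTING_LEVEL"] = "2"

import daft  # noqa: E402
```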
15 changes: 15 additions & 0 deletions src/common/daft-config/src/python.rs
@@ -99,6 +99,7 @@ impl PyDaftExecutionConfig {
        shuffle_algorithm: Option<&str>,
        pre_shuffle_merge_threshold: Option<usize>,
        enable_ray_tracing: Option<bool>,
        scantask_splitting_level: Option<i32>,
    ) -> PyResult<Self> {
        let mut config = self.config.as_ref().clone();

@@ -184,6 +185,15 @@
            config.enable_ray_tracing = enable_ray_tracing;
        }

        if let Some(scantask_splitting_level) = scantask_splitting_level {
            if !matches!(scantask_splitting_level, 1 | 2) {
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
                    "scantask_splitting_level must be 1 or 2",
                ));
            }
            config.scantask_splitting_level = scantask_splitting_level;
        }

        Ok(Self {
            config: Arc::new(config),
        })
@@ -293,6 +303,11 @@ impl PyDaftExecutionConfig {
    fn enable_ray_tracing(&self) -> PyResult<bool> {
        Ok(self.config.enable_ray_tracing)
    }

    #[getter]
    fn scantask_splitting_level(&self) -> PyResult<i32> {
        Ok(self.config.scantask_splitting_level)
    }
}

impl_bincode_py_state_serialization!(PyDaftExecutionConfig);
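The binding validates the new value and exposes it back to Python through a getter. A short illustration of both behaviors, assuming the usual `import daft` entry point (the read-back goes through the context's private config handle, so this is illustrative rather than a stable API):

```python
import daft

# Values other than 1 or 2 are rejected by the validation added above.
try:
    daft.context.set_execution_config(scantask_splitting_level=3)
except ValueError as err:
    print(err)  # "scantask_splitting_level must be 1 or 2"

# Read the effective value back through the new getter.
daft.context.set_execution_config(scantask_splitting_level=2)
print(daft.context.get_context()._daft_execution_config.scantask_splitting_level)  # 2
```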
48 changes: 28 additions & 20 deletions src/daft-scan/src/scan_task_iters/mod.rs
@@ -316,26 +316,34 @@ fn split_and_merge_pass(
        .iter()
        .all(|st| st.as_any().downcast_ref::<ScanTask>().is_some())
    {
        // TODO(desmond): Here we downcast Arc<dyn ScanTaskLike> to Arc<ScanTask>. ScanTask and DummyScanTask (test only) are
        // the only non-test implementer of ScanTaskLike. It might be possible to avoid the downcast by implementing merging
        // at the trait level, but today that requires shifting around a non-trivial amount of code to avoid circular dependencies.
        let iter: BoxScanTaskIter = Box::new(scan_tasks.as_ref().iter().map(|st| {
            st.clone()
                .as_any_arc()
                .downcast::<ScanTask>()
                .map_err(|e| DaftError::TypeError(format!("Expected Arc<ScanTask>, found {:?}", e)))
        }));
        let split_tasks = split_by_row_groups(
            iter,
            cfg.parquet_split_row_groups_max_files,
            cfg.scan_tasks_min_size_bytes,
            cfg.scan_tasks_max_size_bytes,
        );
        let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
        let scan_tasks: Vec<Arc<dyn ScanTaskLike>> = merged_tasks
            .map(|st| st.map(|task| task as Arc<dyn ScanTaskLike>))
            .collect::<DaftResult<Vec<_>>>()?;
        Ok(Arc::new(scan_tasks))
        if cfg.scantask_splitting_level == 1 {
            // TODO(desmond): Here we downcast Arc<dyn ScanTaskLike> to Arc<ScanTask>. ScanTask is the only non-test
            // implementer of ScanTaskLike (DummyScanTask exists for tests only). It might be possible to avoid the
            // downcast by implementing merging at the trait level, but today that requires shifting around a
            // non-trivial amount of code to avoid circular dependencies.
            let iter: BoxScanTaskIter = Box::new(scan_tasks.as_ref().iter().map(|st| {
                st.clone().as_any_arc().downcast::<ScanTask>().map_err(|e| {
                    DaftError::TypeError(format!("Expected Arc<ScanTask>, found {:?}", e))
                })
            }));
            let split_tasks = split_by_row_groups(
                iter,
                cfg.parquet_split_row_groups_max_files,
                cfg.scan_tasks_min_size_bytes,
                cfg.scan_tasks_max_size_bytes,
            );
            let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
            let scan_tasks: Vec<Arc<dyn ScanTaskLike>> = merged_tasks
                .map(|st| st.map(|task| task as Arc<dyn ScanTaskLike>))
                .collect::<DaftResult<Vec<_>>>()?;
            Ok(Arc::new(scan_tasks))
        } else if cfg.scantask_splitting_level == 2 {
            todo!("Implement aggressive scantask splitting");
        } else {
            panic!(
                "DAFT_SCANTASK_SPLITTING_LEVEL must be either 1 or 2, received: {}",
                cfg.scantask_splitting_level
            );
        }
    } else {
        Ok(scan_tasks)
    }
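For readers not following the Rust, the change to `split_and_merge_pass` is a three-way dispatch on the new setting: level 1 keeps the existing split-by-row-groups-then-merge pipeline, level 2 is reserved for the aggressive algorithm (still a `todo!()` in this commit), and any other value panics. A rough Python rendering of that dispatch, purely illustrative (the helper callables stand in for the Rust `split_by_row_groups` and `merge_by_sizes`; none of this is Daft API):

```python
from typing import Any, Callable, Sequence


def split_and_merge_pass_sketch(
    scan_tasks: Sequence[Any],
    pushdowns: Any,
    cfg: Any,
    split_by_row_groups: Callable[..., Sequence[Any]],
    merge_by_sizes: Callable[..., Sequence[Any]],
) -> Sequence[Any]:
    """Illustrative mirror of the Rust dispatch above; not actual Daft code."""
    if cfg.scantask_splitting_level == 1:
        # Existing behavior: split Parquet scan tasks by row group, then merge
        # neighbors back up toward the configured size targets.
        split = split_by_row_groups(
            scan_tasks,
            cfg.parquet_split_row_groups_max_files,
            cfg.scan_tasks_min_size_bytes,
            cfg.scan_tasks_max_size_bytes,
        )
        return merge_by_sizes(split, pushdowns, cfg)
    elif cfg.scantask_splitting_level == 2:
        raise NotImplementedError("aggressive scantask splitting is still a todo!() in this commit")
    else:
        raise ValueError(f"scantask_splitting_level must be 1 or 2, got {cfg.scantask_splitting_level}")
```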
57 changes: 57 additions & 0 deletions tests/io/test_split_scan_tasks.py
@@ -25,3 +25,60 @@ def test_split_parquet_read(parquet_files):
    df = daft.read_parquet(str(parquet_files))
    assert df.num_partitions() == 10, "Should have 10 partitions since we will split the file"
    assert df.to_pydict() == {"data": ["aaa"] * 100}


def test_split_parquet_read_some_splits(tmpdir):
    with daft.execution_config_ctx(scantask_splitting_level=2):
        # Write a mix of 20 large and 20 small files
        # Small ones should not be split, large ones should be split into 10 rowgroups each
        # This gives us a total of 200 + 20 scantasks

        # Write 20 large files into tmpdir
        large_file_paths = []
        for i in range(20):
            tbl = pa.table({"data": [str(f"large{i}") for i in range(100)]})
            path = tmpdir / f"file.{i}.large.pq"
            papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
            large_file_paths.append(str(path))

        # Write 20 small files into tmpdir
        small_file_paths = []
        for i in range(20):
            tbl = pa.table({"data": ["small"]})
            path = tmpdir / f"file.{i}.small.pq"
            papq.write_table(tbl, str(path), row_group_size=1, use_dictionary=False)
            small_file_paths.append(str(path))

        # Test [large_paths, ..., small_paths, ...]
        with daft.execution_config_ctx(
            scan_tasks_min_size_bytes=20,
            scan_tasks_max_size_bytes=100,
        ):
            df = daft.read_parquet(large_file_paths + small_file_paths)
            assert (
                df.num_partitions() == 220
            ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
            assert df.to_pydict() == {"data": [str(f"large{i}") for i in range(100)] * 20 + ["small"] * 20}

        # Test interleaved [large_path, small_path, large_path, small_path, ...]
        with daft.execution_config_ctx(
            scan_tasks_min_size_bytes=20,
            scan_tasks_max_size_bytes=100,
        ):
            interleaved_paths = [path for pair in zip(large_file_paths, small_file_paths) for path in pair]
            df = daft.read_parquet(interleaved_paths)
            assert (
                df.num_partitions() == 220
            ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
            assert df.to_pydict() == {"data": ([str(f"large{i}") for i in range(100)] + ["small"]) * 20}

        # Test [small_paths, ..., large_paths]
        with daft.execution_config_ctx(
            scan_tasks_min_size_bytes=20,
            scan_tasks_max_size_bytes=100,
        ):
            df = daft.read_parquet(small_file_paths + large_file_paths)
            assert (
                df.num_partitions() == 220
            ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
            assert df.to_pydict() == {"data": ["small"] * 20 + [str(f"large{i}") for i in range(100)] * 20}
