Skip to content

Commit

Permalink
Switch from target_filesize to use a min/max filesize for merging
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Dec 4, 2023
1 parent 6cc6bea commit 378280a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 24 deletions.
7 changes: 5 additions & 2 deletions src/daft-plan/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
let scan_tasks = scan_op.0.to_scan_tasks(pushdowns.clone())?;

// Apply transformations on the ScanTasks to optimize
let scan_tasks =
daft_scan::scan_task_iters::merge_by_sizes(scan_tasks, 128 * 1024 * 1024);
let scan_tasks = daft_scan::scan_task_iters::merge_by_sizes(
scan_tasks,
64 * 1024 * 1024,
512 * 1024 * 1024,
);

let scan_tasks = scan_tasks.collect::<DaftResult<Vec<_>>>()?;
let partition_spec = Arc::new(PartitionSpec::new_internal(
Expand Down
68 changes: 46 additions & 22 deletions src/daft-scan/src/scan_task_iters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ use crate::{ScanTask, ScanTaskRef};
type BoxScanTaskIter = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>;

/// Coalesces ScanTasks by their filesizes
pub fn merge_by_sizes(scan_tasks: BoxScanTaskIter, target_filesize: usize) -> BoxScanTaskIter {
pub fn merge_by_sizes(
scan_tasks: BoxScanTaskIter,
min_filesize: usize,
max_filesize: usize,
) -> BoxScanTaskIter {
Box::new(MergeByFileSize {
iter: scan_tasks,
target_filesize,
min_filesize,
max_filesize,
accumulator: None,
})
}

struct MergeByFileSize {
iter: BoxScanTaskIter,
target_filesize: usize,
min_filesize: usize,
max_filesize: usize,

// Current element being accumulated on
accumulator: Option<ScanTaskRef>,
Expand All @@ -31,34 +37,52 @@ impl Iterator for MergeByFileSize {
let accumulator = self.accumulator.take();

match (self.iter.next(), accumulator) {
// On first iteration, place ScanTask into the accumulator
// When no accumulator exists, trivially place the ScanTask into the accumulator
(Some(Ok(child_item)), None) => {
self.accumulator = Some(child_item);
self.next()
}
// On subsequent iterations, check if ScanTask can be merged with accumulator
// When an accumulator exists, attempt a merge and yield the result
(Some(Ok(child_item)), Some(accumulator)) => {
let child_matches_accumulator = child_item.partition_spec()
== accumulator.partition_spec()
&& child_item.file_format_config == accumulator.file_format_config
&& child_item.schema == accumulator.schema
&& child_item.storage_config == accumulator.storage_config
&& child_item.pushdowns == accumulator.pushdowns;
let can_merge_filesize = matches!(
// Whether or not the accumulator and the current item should be merged
let should_merge = {
let child_matches_accumulator = child_item.partition_spec()
== accumulator.partition_spec()
&& child_item.file_format_config == accumulator.file_format_config
&& child_item.schema == accumulator.schema
&& child_item.storage_config == accumulator.storage_config
&& child_item.pushdowns == accumulator.pushdowns;
let smaller_than_max_filesize = matches!(
(child_item.size_bytes(), accumulator.size_bytes()),
(Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.max_filesize
);
child_matches_accumulator && smaller_than_max_filesize
};
// Whether or not we should immediately yield the merged result, or keep accumulating
let should_yield_merged_result = matches!(
(child_item.size_bytes(), accumulator.size_bytes()),
(Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.target_filesize
(Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size >= self.min_filesize
);
// Merge the accumulator and continue iteration
if child_matches_accumulator && can_merge_filesize {
self.accumulator = Some(Arc::new(

match (should_merge, should_yield_merged_result) {
// Merge and yield the merged result immediately
(true, true) => Some(Ok(Arc::new(
ScanTask::merge(child_item.as_ref(), accumulator.as_ref())
.expect("ScanTasks should be mergeable in MergeByFileSize"),
));
self.next()
// Replace the accumulator with the new ScanTask and then yield the old accumulator
} else {
self.accumulator = Some(child_item);
Some(Ok(accumulator))
))),
// Merge and continue iterating and accumulating without yielding a result right now
(true, false) => {
self.accumulator = Some(Arc::new(
ScanTask::merge(child_item.as_ref(), accumulator.as_ref())
.expect("ScanTasks should be mergeable in MergeByFileSize"),
));
self.next()
}
// Cannot merge: replace the accumulator and then yield the old accumulator
(false, _) => {
self.accumulator = Some(child_item);
Some(Ok(accumulator))
}
}
}
// Bubble up errors from child iterator, making sure to replace the accumulator which we moved
Expand Down

0 comments on commit 378280a

Please sign in to comment.