diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 0b47c71a03..c28df0c262 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -79,8 +79,11 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { let scan_tasks = scan_op.0.to_scan_tasks(pushdowns.clone())?; // Apply transformations on the ScanTasks to optimize - let scan_tasks = - daft_scan::scan_task_iters::merge_by_sizes(scan_tasks, 128 * 1024 * 1024); + let scan_tasks = daft_scan::scan_task_iters::merge_by_sizes( + scan_tasks, + 64 * 1024 * 1024, + 512 * 1024 * 1024, + ); let scan_tasks = scan_tasks.collect::>>()?; let partition_spec = Arc::new(PartitionSpec::new_internal( diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs index 8209a35169..1ea965e3a8 100644 --- a/src/daft-scan/src/scan_task_iters.rs +++ b/src/daft-scan/src/scan_task_iters.rs @@ -7,17 +7,23 @@ use crate::{ScanTask, ScanTaskRef}; type BoxScanTaskIter = Box>>; /// Coalesces ScanTasks by their filesizes -pub fn merge_by_sizes(scan_tasks: BoxScanTaskIter, target_filesize: usize) -> BoxScanTaskIter { +pub fn merge_by_sizes( + scan_tasks: BoxScanTaskIter, + min_filesize: usize, + max_filesize: usize, +) -> BoxScanTaskIter { Box::new(MergeByFileSize { iter: scan_tasks, - target_filesize, + min_filesize, + max_filesize, accumulator: None, }) } struct MergeByFileSize { iter: BoxScanTaskIter, - target_filesize: usize, + min_filesize: usize, + max_filesize: usize, // Current element being accumulated on accumulator: Option, @@ -31,34 +37,52 @@ impl Iterator for MergeByFileSize { let accumulator = self.accumulator.take(); match (self.iter.next(), accumulator) { - // On first iteration, place ScanTask into the accumulator + // When no accumulator exists, trivially place the ScanTask into the accumulator (Some(Ok(child_item)), None) => { self.accumulator = Some(child_item); self.next() } - // On subsequent iterations, check if ScanTask can be merged with accumulator + // When an accumulator exists, attempt a merge and yield the result (Some(Ok(child_item)), Some(accumulator)) => { - let child_matches_accumulator = child_item.partition_spec() - == accumulator.partition_spec() - && child_item.file_format_config == accumulator.file_format_config - && child_item.schema == accumulator.schema - && child_item.storage_config == accumulator.storage_config - && child_item.pushdowns == accumulator.pushdowns; - let can_merge_filesize = matches!( + // Whether or not the accumulator and the current item should be merged + let should_merge = { + let child_matches_accumulator = child_item.partition_spec() + == accumulator.partition_spec() + && child_item.file_format_config == accumulator.file_format_config + && child_item.schema == accumulator.schema + && child_item.storage_config == accumulator.storage_config + && child_item.pushdowns == accumulator.pushdowns; + let smaller_than_max_filesize = matches!( + (child_item.size_bytes(), accumulator.size_bytes()), + (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.max_filesize + ); + child_matches_accumulator && smaller_than_max_filesize + }; + // Whether or not we should immediately yield the merged result, or keep accumulating + let should_yield_merged_result = matches!( (child_item.size_bytes(), accumulator.size_bytes()), - (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.target_filesize + (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size >= self.min_filesize ); - // Merge the accumulator and continue iteration - if child_matches_accumulator && can_merge_filesize { - self.accumulator = Some(Arc::new( + + match (should_merge, should_yield_merged_result) { + // Merge and yield the merged result immediately + (true, true) => Some(Ok(Arc::new( ScanTask::merge(child_item.as_ref(), accumulator.as_ref()) .expect("ScanTasks should be mergeable in MergeByFileSize"), - )); - self.next() - // Replace the accumulator with the new ScanTask and then yield the old accumulator - } else { - self.accumulator = Some(child_item); - Some(Ok(accumulator)) + ))), + // Merge and continue iterating and accumulating without yielding a result right now + (true, false) => { + self.accumulator = Some(Arc::new( + ScanTask::merge(child_item.as_ref(), accumulator.as_ref()) + .expect("ScanTasks should be mergeable in MergeByFileSize"), + )); + self.next() + } + // Cannot merge: replace the accumulator and then yield the old accumulator + (false, _) => { + self.accumulator = Some(child_item); + Some(Ok(accumulator)) + } } } // Bubble up errors from child iterator, making sure to replace the accumulator which we moved