diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs index 2e498e6179..fdda27033d 100644 --- a/src/daft-scan/src/scan_task_iters.rs +++ b/src/daft-scan/src/scan_task_iters.rs @@ -6,24 +6,35 @@ use crate::{ScanTask, ScanTaskRef}; type BoxScanTaskIter = Box>>; -/// Coalesces ScanTasks by their filesizes +/// Coalesces ScanTasks by their [`ScanTask::size_bytes()`] +/// +/// NOTE: `min_size_bytes` and `max_size_bytes` are only parameters for the algorithm used for merging ScanTasks, +/// and do not provide any guarantees about the sizes of ScanTasks yielded by the resultant iterator. +/// This function may still yield ScanTasks with smaller sizes than `min_size_bytes`, or larger sizes +/// than `max_size_bytes` if **existing non-merged ScanTasks** with those sizes exist in the iterator. +/// +/// # Arguments: +/// +/// * `scan_tasks`: A Boxed Iterator of ScanTaskRefs to perform merging on +/// * `min_size_bytes`: Minimum size in bytes of a ScanTask, after which no more merging will be performed +/// * `max_size_bytes`: Maximum size in bytes of a ScanTask, capping the maximum size of a merged ScanTask pub fn merge_by_sizes( scan_tasks: BoxScanTaskIter, - min_filesize: usize, - max_filesize: usize, + min_size_bytes: usize, + max_size_bytes: usize, ) -> BoxScanTaskIter { Box::new(MergeByFileSize { iter: scan_tasks, - min_filesize, - max_filesize, + min_size_bytes, + max_size_bytes, accumulator: None, }) } struct MergeByFileSize { iter: BoxScanTaskIter, - min_filesize: usize, - max_filesize: usize, + min_size_bytes: usize, + max_size_bytes: usize, // Current element being accumulated on accumulator: Option, @@ -52,11 +63,11 @@ impl Iterator for MergeByFileSize { && child_item.schema == accumulator.schema && child_item.storage_config == accumulator.storage_config && child_item.pushdowns == accumulator.pushdowns; - let smaller_than_max_filesize = matches!( + let smaller_than_max_size_bytes = matches!( (child_item.size_bytes(), accumulator.size_bytes()), - (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.max_filesize + (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.max_size_bytes ); - child_matches_accumulator && smaller_than_max_filesize + child_matches_accumulator && smaller_than_max_size_bytes }; if should_merge { @@ -68,7 +79,7 @@ impl Iterator for MergeByFileSize { // Whether or not we should immediately yield the merged result, or keep accumulating let should_yield = matches!( (child_item.size_bytes(), accumulator.size_bytes()), - (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size >= self.min_filesize + (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size >= self.min_size_bytes ); if should_yield { Ok(merged_result).transpose()