Skip to content

Commit

Permalink
Add docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Dec 4, 2023
1 parent 89d1f9e commit 29f5613
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions src/daft-scan/src/scan_task_iters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,35 @@ use crate::{ScanTask, ScanTaskRef};

type BoxScanTaskIter = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>;

/// Coalesces ScanTasks by their [`ScanTask::size_bytes()`]
///
/// NOTE: `min_size_bytes` and `max_size_bytes` are only parameters for the algorithm used for merging ScanTasks,
/// and do not provide any guarantees about the sizes of ScanTasks yielded by the resultant iterator.
/// This function may still yield ScanTasks with smaller sizes than `min_size_bytes`, or larger sizes
/// than `max_size_bytes` if **existing non-merged ScanTasks** with those sizes exist in the iterator.
///
/// # Arguments
///
/// * `scan_tasks`: A Boxed Iterator of ScanTaskRefs to perform merging on
/// * `min_size_bytes`: Minimum size in bytes of a ScanTask, after which no more merging will be performed
/// * `max_size_bytes`: Maximum size in bytes of a ScanTask, capping the maximum size of a merged ScanTask
///
/// # Returns
///
/// A boxed `MergeByFileSize` iterator that wraps `scan_tasks` and starts with an empty accumulator.
pub fn merge_by_sizes(
scan_tasks: BoxScanTaskIter,
// NOTE(review): `min_filesize` / `max_filesize` look like the pre-rename spellings of
// `min_size_bytes` / `max_size_bytes` (this listing appears to interleave removed and
// added diff lines) -- confirm against the actual file before relying on them.
min_filesize: usize,
max_filesize: usize,
min_size_bytes: usize,
max_size_bytes: usize,
) -> BoxScanTaskIter {
// No merging happens here: the parameters are only stored on the iterator adaptor.
Box::new(MergeByFileSize {
iter: scan_tasks,
// NOTE(review): presumably stale pre-rename fields; see the note on the parameters.
min_filesize,
max_filesize,
min_size_bytes,
max_size_bytes,
// Nothing has been buffered for merging yet.
accumulator: None,
})
}

struct MergeByFileSize {
iter: BoxScanTaskIter,
min_filesize: usize,
max_filesize: usize,
min_size_bytes: usize,
max_size_bytes: usize,

// Current element being accumulated on
accumulator: Option<ScanTaskRef>,
Expand Down Expand Up @@ -52,11 +63,11 @@ impl Iterator for MergeByFileSize {
&& child_item.schema == accumulator.schema
&& child_item.storage_config == accumulator.storage_config
&& child_item.pushdowns == accumulator.pushdowns;
let smaller_than_max_filesize = matches!(
let smaller_than_max_size_bytes = matches!(
(child_item.size_bytes(), accumulator.size_bytes()),
(Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.max_filesize
(Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.max_size_bytes
);
child_matches_accumulator && smaller_than_max_filesize
child_matches_accumulator && smaller_than_max_size_bytes
};

if should_merge {
Expand All @@ -68,7 +79,7 @@ impl Iterator for MergeByFileSize {
// Whether or not we should immediately yield the merged result, or keep accumulating
let should_yield = matches!(
(child_item.size_bytes(), accumulator.size_bytes()),
(Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size >= self.min_filesize
(Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size >= self.min_size_bytes
);
if should_yield {
Ok(merged_result).transpose()
Expand Down

0 comments on commit 29f5613

Please sign in to comment.