Skip to content

Commit

Permalink
Replace Vec accumulator with single element accumulator
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Dec 4, 2023
1 parent 7d2e35d commit 6cc6bea
Showing 1 changed file with 42 additions and 46 deletions.
88 changes: 42 additions & 46 deletions src/daft-scan/src/scan_task_iters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,68 +11,64 @@ pub fn merge_by_sizes(scan_tasks: BoxScanTaskIter, target_filesize: usize) -> Bo
Box::new(MergeByFileSize {
iter: scan_tasks,
target_filesize,
buffer: std::default::Default::default(),
accumulator: None,
})
}

/// Iterator adaptor state for merging `ScanTask`s pulled from `iter`.
///
/// Two tasks are considered mergeable when their partition spec, file format
/// config, schema, storage config and pushdowns compare equal and the sum of
/// their `size_bytes()` does not exceed `target_filesize`.
///
/// NOTE(review): this listing shows both a `buffer` field and an `accumulator`
/// field; they look like the old and the new form of the same piece of state.
/// Confirm which one the final struct is meant to keep.
struct MergeByFileSize {
// Upstream iterator that supplies the ScanTasks to be merged.
iter: BoxScanTaskIter,
// Inclusive upper bound on the combined `size_bytes()` of two ScanTasks for a merge to happen.
target_filesize: usize,

// HACK: we use a Vec instead of IndexMap because PartitionSpec is not Hashable
// This might potentially be very slow. We should explore another option here?
// Could be easier to make `PartitionSpec: Hash` if we don't use a `Table` under the hood.
// Pending ScanTasks that are scanned linearly for a merge candidate.
buffer: Vec<ScanTaskRef>,
// Current element being accumulated on.
// Starts out as `None`; holds the ScanTask that later tasks are merged into
// until it is yielded.
accumulator: Option<ScanTaskRef>,
}

impl Iterator for MergeByFileSize {
type Item = DaftResult<ScanTaskRef>;

fn next(&mut self) -> Option<Self::Item> {
match self.iter.next() {
Some(Ok(child_item)) => {
// Try to find matches in the buffer to perform merging operations
for (idx, buffered_item) in self.buffer.iter().enumerate() {
if child_item.partition_spec() == buffered_item.partition_spec()
&& child_item.file_format_config == buffered_item.file_format_config
&& child_item.schema == buffered_item.schema
&& child_item.storage_config == buffered_item.storage_config
&& child_item.pushdowns == buffered_item.pushdowns
{
// Remove the matched ScanTask from the buffer to either return, or be merged
let matched_item = self.buffer.remove(idx);
// Grabs the accumulator, leaving a `None` in its place
let accumulator = self.accumulator.take();

match (child_item.size_bytes(), matched_item.size_bytes()) {
// Merge if combined size will still be under the target size, and keep iterating
(Some(child_item_size), Some(buffered_item_size))
if child_item_size + buffered_item_size <= self.target_filesize =>
{
let merged_scan_task = ScanTask::merge(
&matched_item,
child_item.as_ref(),
)
.expect(
"Should be able to merge ScanTasks if all invariants are met",
);
self.buffer.push(Arc::new(merged_scan_task));
return self.next();
}
// Otherwise place the current child ScanTask into the buffer and yield the matched ScanTask
_ => {
self.buffer.push(child_item);
return Some(Ok(matched_item));
}
}
}
}
// No match found, place it in the buffer and keep going
self.buffer.push(child_item);
match (self.iter.next(), accumulator) {
// On first iteration, place ScanTask into the accumulator
(Some(Ok(child_item)), None) => {
self.accumulator = Some(child_item);
self.next()
}
// Bubble up errors from child iterator
Some(Err(e)) => Some(Err(e)),
// Iterator ran out of elements, we now flush the buffer
None => Ok(self.buffer.pop()).transpose(),
// On subsequent iterations, check if ScanTask can be merged with accumulator
(Some(Ok(child_item)), Some(accumulator)) => {
let child_matches_accumulator = child_item.partition_spec()
== accumulator.partition_spec()
&& child_item.file_format_config == accumulator.file_format_config
&& child_item.schema == accumulator.schema
&& child_item.storage_config == accumulator.storage_config
&& child_item.pushdowns == accumulator.pushdowns;
let can_merge_filesize = matches!(
(child_item.size_bytes(), accumulator.size_bytes()),
(Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.target_filesize
);
// Merge the accumulator and continue iteration
if child_matches_accumulator && can_merge_filesize {
self.accumulator = Some(Arc::new(
ScanTask::merge(child_item.as_ref(), accumulator.as_ref())
.expect("ScanTasks should be mergeable in MergeByFileSize"),
));
self.next()
// Replace the accumulator with the new ScanTask and then yield the old accumulator
} else {
self.accumulator = Some(child_item);
Some(Ok(accumulator))
}
}
// Bubble up errors from child iterator, making sure to replace the accumulator which we moved
(Some(Err(e)), acc) => {
self.accumulator = acc;
Some(Err(e))
}
// Iterator ran out of elements: ensure that we flush the last buffered ScanTask
(None, Some(last_scan_task)) => Some(Ok(last_scan_task)),
(None, None) => None,
}
}
}

0 comments on commit 6cc6bea

Please sign in to comment.