Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Add smart planning of ScanTasks starting with merging by filesizes #1692

Merged
merged 13 commits into from
Dec 5, 2023
10 changes: 6 additions & 4 deletions src/daft-plan/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
scan_op,
..
})) => {
let scan_tasks = scan_op
.0
.to_scan_tasks(pushdowns.clone())?
.collect::<DaftResult<Vec<_>>>()?;
let scan_tasks = scan_op.0.to_scan_tasks(pushdowns.clone())?;

// Apply transformations on the ScanTasks to optimize
let scan_tasks =
daft_scan::scan_task_iters::merge_by_sizes(scan_tasks, 128 * 1024 * 1024);

let scan_tasks = scan_tasks.collect::<DaftResult<Vec<_>>>()?;
let partition_spec = Arc::new(PartitionSpec::new_internal(
PartitionScheme::Unknown,
scan_tasks.len(),
Expand Down
83 changes: 83 additions & 0 deletions src/daft-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod file_format;
mod glob;
#[cfg(feature = "python")]
pub mod py_object_serde;
pub mod scan_task_iters;

#[cfg(feature = "python")]
pub mod python;
Expand All @@ -31,6 +32,9 @@ use storage_config::StorageConfig;
pub enum Error {
#[cfg(feature = "python")]
PyIO { source: PyErr },

#[snafu(display("Error when merging ScanTasks: {}", msg))]
jaychia marked this conversation as resolved.
Show resolved Hide resolved
MergeScanTask { msg: String },
}

impl From<Error> for DaftError {
Expand Down Expand Up @@ -108,6 +112,13 @@ impl DataFileSource {
| Self::CatalogDataFile { statistics, .. } => statistics.as_ref(),
}
}

/// Returns the `PartitionSpec` associated with this data-file source, if any.
///
/// A `CatalogDataFile` always carries a partition spec, while an
/// `AnonymousDataFile` only optionally does.
pub fn get_partition_spec(&self) -> Option<&PartitionSpec> {
    match self {
        Self::CatalogDataFile { partition_spec, .. } => Some(partition_spec),
        Self::AnonymousDataFile { partition_spec, .. } => partition_spec.as_ref(),
    }
}
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -132,6 +143,13 @@ impl ScanTask {
pushdowns: Pushdowns,
) -> Self {
assert!(!sources.is_empty());
let first_pspec = sources.first().unwrap().get_partition_spec();
assert!(
jaychia marked this conversation as resolved.
Show resolved Hide resolved
sources
.iter()
.all(|s| s.get_partition_spec() == first_pspec),
"ScanTask sources must all have the same PartitionSpec at construction",
);
let (length, size_bytes_on_disk, statistics) = sources
.iter()
.map(|s| {
Expand Down Expand Up @@ -167,6 +185,64 @@ impl ScanTask {
}
}

/// Merges two ScanTasks into a single ScanTask whose sources are the
/// concatenation of `sc1`'s sources followed by `sc2`'s sources.
///
/// # Errors
///
/// Returns `Error::MergeScanTask` if the two tasks differ in any of:
/// PartitionSpec, FileFormatConfig, Schema, StorageConfig or Pushdowns.
/// All of these must match, since the merged task carries a single copy of
/// each (taken from `sc1`).
pub fn merge(sc1: &ScanTask, sc2: &ScanTask) -> Result<ScanTask, Error> {
    // Each invariant gets its own check so the error message pinpoints
    // exactly which property prevented the merge.
    if sc1.partition_spec() != sc2.partition_spec() {
        return Err(Error::MergeScanTask {
            msg: format!(
                "Cannot merge ScanTasks with differing PartitionSpecs: {:?} vs {:?}",
                sc1.partition_spec(),
                sc2.partition_spec()
            ),
        });
    }
    if sc1.file_format_config != sc2.file_format_config {
        return Err(Error::MergeScanTask {
            msg: format!(
                "Cannot merge ScanTasks with differing FileFormatConfigs: {:?} vs {:?}",
                sc1.file_format_config.as_ref(),
                sc2.file_format_config.as_ref()
            ),
        });
    }
    if sc1.schema != sc2.schema {
        return Err(Error::MergeScanTask {
            msg: format!(
                "Cannot merge ScanTasks with differing Schema: {} vs {}",
                sc1.schema.as_ref(),
                sc2.schema.as_ref()
            ),
        });
    }
    if sc1.storage_config != sc2.storage_config {
        return Err(Error::MergeScanTask {
            msg: format!(
                "Cannot merge ScanTasks with differing StorageConfigs: {:?} vs {:?}",
                sc1.storage_config.as_ref(),
                sc2.storage_config.as_ref()
            ),
        });
    }
    if sc1.pushdowns != sc2.pushdowns {
        return Err(Error::MergeScanTask {
            msg: format!(
                "Cannot merge ScanTasks with differing Pushdowns: {:?} vs {:?}",
                sc1.pushdowns, sc2.pushdowns
            ),
        });
    }
    // Concatenate sources with a single collect; cloning element-by-element
    // via `iter().chain(..).cloned()` avoids materializing clones of both
    // source Vecs up-front as `sources.clone().into_iter()` would.
    Ok(ScanTask::new(
        sc1.sources
            .iter()
            .chain(sc2.sources.iter())
            .cloned()
            .collect(),
        sc1.file_format_config.clone(),
        sc1.schema.clone(),
        sc1.storage_config.clone(),
        sc1.pushdowns.clone(),
    ))
}

/// The row count recorded in this ScanTask's metadata, if metadata is
/// present; `None` otherwise.
pub fn num_rows(&self) -> Option<usize> {
    self.metadata.as_ref().map(|m| m.length)
}
Expand All @@ -182,6 +258,13 @@ impl ScanTask {
// Fall back on on-disk size.
.or_else(|| self.size_bytes_on_disk.map(|s| s as usize))
}

/// Returns the PartitionSpec shared by this ScanTask's sources, or `None`
/// if there are no sources or the first source has no spec.
///
/// Construction asserts that all sources share the same PartitionSpec, so
/// inspecting only the first source is sufficient.
pub fn partition_spec(&self) -> Option<&PartitionSpec> {
    // `?` replaces the manual `match None => None, Some(s) => ...` form.
    self.sources.first()?.get_partition_spec()
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
Expand Down
78 changes: 78 additions & 0 deletions src/daft-scan/src/scan_task_iters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::sync::Arc;

use common_error::DaftResult;

use crate::{ScanTask, ScanTaskRef};

type BoxScanTaskIter = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>;

/// Coalesces ScanTasks by their filesizes.
///
/// Returns a lazy iterator that merges compatible ScanTasks (same
/// PartitionSpec, FileFormatConfig, Schema, StorageConfig and Pushdowns)
/// as long as their combined `size_bytes` stays at or below
/// `target_filesize`; incompatible or too-large tasks are passed through.
pub fn merge_by_sizes(scan_tasks: BoxScanTaskIter, target_filesize: usize) -> BoxScanTaskIter {
    Box::new(MergeByFileSize {
        iter: scan_tasks,
        target_filesize,
        // `Vec::new()` is clearer than routing through
        // `std::default::Default::default()` for a concrete Vec field.
        buffer: Vec::new(),
    })
}

/// Iterator adaptor that buffers ScanTasks and merges compatible ones up to
/// a target file size before yielding them.
struct MergeByFileSize {
    /// Upstream ScanTasks to be (potentially) merged.
    iter: BoxScanTaskIter,
    /// Merging stops once a combined task would exceed this many bytes.
    target_filesize: usize,

    // HACK: a Vec is used instead of an IndexMap because PartitionSpec is
    // not Hashable, so finding a compatible buffered task takes a linear
    // scan. This might potentially be very slow; making
    // `PartitionSpec: Hash` (easier if it stops using a `Table` under the
    // hood) would allow a keyed map here instead.
    buffer: Vec<ScanTaskRef>,
}

impl Iterator for MergeByFileSize {
type Item = DaftResult<ScanTaskRef>;

fn next(&mut self) -> Option<Self::Item> {
match self.iter.next() {
Some(Ok(child_item)) => {
// Try to find matches in the buffer to perform merging operations
for (idx, buffered_item) in self.buffer.iter().enumerate() {
jaychia marked this conversation as resolved.
Show resolved Hide resolved
if child_item.partition_spec() == buffered_item.partition_spec()
&& child_item.file_format_config == buffered_item.file_format_config
&& child_item.schema == buffered_item.schema
&& child_item.storage_config == buffered_item.storage_config
&& child_item.pushdowns == buffered_item.pushdowns
{
// Remove the matched ScanTask from the buffer to either return, or be merged
let matched_item = self.buffer.remove(idx);

match (child_item.size_bytes(), matched_item.size_bytes()) {
// Merge if combined size will still be under the target size, and keep iterating
(Some(child_item_size), Some(buffered_item_size))
if child_item_size + buffered_item_size <= self.target_filesize =>
{
let merged_scan_task = ScanTask::merge(
&matched_item,
child_item.as_ref(),
)
.expect(
"Should be able to merge ScanTasks if all invariants are met",
);
self.buffer.push(Arc::new(merged_scan_task));
return self.next();
}
// Otherwise place the current child ScanTask into the buffer and yield the matched ScanTask
_ => {
self.buffer.push(child_item);
return Some(Ok(matched_item));
}
}
}
}
// No match found, place it in the buffer and keep going
self.buffer.push(child_item);
self.next()
}
// Bubble up errors from child iterator
Some(Err(e)) => Some(Err(e)),
// Iterator ran out of elements, we now flush the buffer
None => Ok(self.buffer.pop()).transpose(),
}
}
}
24 changes: 24 additions & 0 deletions src/daft-stats/src/partition_spec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
use daft_core::array::ops::DaftCompare;
use daft_table::Table;

/// Partition key values for a set of data files, stored as a `Table` keyed
/// by partition-field name.
///
/// NOTE(review): equality comparisons read only row 0 of `keys`, which
/// suggests the table is expected to hold a single row — TODO confirm.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PartitionSpec {
    pub keys: Table,
}

impl PartialEq for PartitionSpec {
    /// Two PartitionSpecs are equal when their key tables have identical
    /// schemas (field names and types) and every field's values compare
    /// equal.
    fn eq(&self, other: &Self) -> bool {
        // Mismatched field names or types means the specs cannot be equal.
        if self.keys.schema != other.keys.schema {
            return false;
        }

        // Schemas match, so every field name resolves in both tables and
        // the `get_column` lookups below cannot fail.
        // NOTE(review): the comparison result is unwrapped before reading
        // row 0, so a null comparison outcome (e.g. null partition values)
        // would panic here — confirm upstream guarantees non-null keys.
        self.keys.schema.as_ref().fields.keys().all(|field_name| {
            let lhs = self.keys.get_column(field_name).unwrap();
            let rhs = other.keys.get_column(field_name).unwrap();
            lhs.equal(rhs).unwrap().get(0).unwrap()
        })
    }
}

impl Eq for PartitionSpec {}
Loading