Skip to content

Commit

Permalink
Expand size approximation to more ops.
Browse files Browse the repository at this point in the history
  • Loading branch information
clarkzinzow committed Dec 11, 2023
1 parent 68160ab commit d02e5db
Showing 1 changed file with 40 additions and 2 deletions.
42 changes: 40 additions & 2 deletions src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ impl PhysicalPlan {
.iter()
.map(|scan_task| scan_task.size_bytes())
.sum::<Option<usize>>(),
// Assume no row/column pruning in cardinality-affecting operations.
// TODO(Clark): Estimate row/column pruning to get a better size approximation.
Self::Filter(Filter { input, .. })
| Self::Limit(Limit { input, .. })
| Self::Project(Project { input, .. }) => input.approximate_size_bytes(),
// Assume ~the same size in bytes for explodes.
// TODO(Clark): Improve this estimate.
Self::Explode(Explode { input, .. }) => input.approximate_size_bytes(),
// Propagate child approximation for operations that don't affect cardinality.
Self::Coalesce(Coalesce { input, .. })
| Self::FanoutByHash(FanoutByHash { input, .. })
Expand All @@ -188,8 +196,38 @@ impl PhysicalPlan {
| Self::ReduceMerge(ReduceMerge { input, .. })
| Self::Sort(Sort { input, .. })
| Self::Split(Split { input, .. }) => input.approximate_size_bytes(),
// TODO(Clark): Add size bytes estimates for other operators.
_ => None,
Self::Concat(Concat { input, other }) => {
input.approximate_size_bytes().and_then(|input_size| {
other
.approximate_size_bytes()
.map(|other_size| input_size + other_size)
})
}
// Assume a simple sum of the sizes of both sides of the join for the post-join size.
// TODO(Clark): This will double-count join key columns; we should ensure that these are only counted once.
Self::BroadcastJoin(BroadcastJoin {
broadcaster: left,
receiver: right,
..
})
| Self::HashJoin(HashJoin { left, right, .. }) => {
left.approximate_size_bytes().and_then(|left_size| {
right
.approximate_size_bytes()
.map(|right_size| left_size + right_size)
})
}
// TODO(Clark): Approximate post-aggregation sizes via grouping estimates + aggregation type.
Self::Aggregate(_) => None,
// No size approximation support for legacy I/O.
Self::TabularScanParquet(_) | Self::TabularScanCsv(_) | Self::TabularScanJson(_) => {
None
}
// Post-write DataFrame will contain paths to files that were written.
// TODO(Clark): Estimate output size via root directory and estimates for # of partitions given partitioning column.
Self::TabularWriteParquet(_) | Self::TabularWriteCsv(_) | Self::TabularWriteJson(_) => {
None
}
}
}
}
Expand Down

0 comments on commit d02e5db

Please sign in to comment.