From d02e5dbd4018e92d8c4eaaeb9a2c29d266d44625 Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Mon, 11 Dec 2023 10:28:17 -0800 Subject: [PATCH] Expand size approximation to more ops. --- src/daft-plan/src/physical_plan.rs | 42 ++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index 5f6f6e8631..68112897ee 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -179,6 +179,14 @@ impl PhysicalPlan { .iter() .map(|scan_task| scan_task.size_bytes()) .sum::>(), + // Assume no row/column pruning in cardinality-affecting operations. + // TODO(Clark): Estimate row/column pruning to get a better size approximation. + Self::Filter(Filter { input, .. }) + | Self::Limit(Limit { input, .. }) + | Self::Project(Project { input, .. }) => input.approximate_size_bytes(), + // Assume ~the same size in bytes for explodes. + // TODO(Clark): Improve this estimate. + Self::Explode(Explode { input, .. }) => input.approximate_size_bytes(), // Propagate child approximation for operations that don't affect cardinality. Self::Coalesce(Coalesce { input, .. }) | Self::FanoutByHash(FanoutByHash { input, .. }) @@ -188,8 +196,38 @@ impl PhysicalPlan { | Self::ReduceMerge(ReduceMerge { input, .. }) | Self::Sort(Sort { input, .. }) | Self::Split(Split { input, .. }) => input.approximate_size_bytes(), - // TODO(Clark): Add size bytes estimates for other operators. - _ => None, + Self::Concat(Concat { input, other }) => { + input.approximate_size_bytes().and_then(|input_size| { + other + .approximate_size_bytes() + .map(|other_size| input_size + other_size) + }) + } + // Assume a simple sum of the sizes of both sides of the join for the post-join size. + // TODO(Clark): This will double-count join key columns, we should ensure that these are only counted once. + Self::BroadcastJoin(BroadcastJoin { + broadcaster: left, + receiver: right, + .. + }) + | Self::HashJoin(HashJoin { left, right, .. }) => { + left.approximate_size_bytes().and_then(|left_size| { + right + .approximate_size_bytes() + .map(|right_size| left_size + right_size) + }) + } + // TODO(Clark): Approximate post-aggregation sizes via grouping estimates + aggregation type. + Self::Aggregate(_) => None, + // No size approximation support for legacy I/O. + Self::TabularScanParquet(_) | Self::TabularScanCsv(_) | Self::TabularScanJson(_) => { + None + } + // Post-write DataFrame will contain paths to files that were written. + // TODO(Clark): Estimate output size via root directory and estimates for # of partitions given partitioning column. + Self::TabularWriteParquet(_) | Self::TabularWriteCsv(_) | Self::TabularWriteJson(_) => { + None + } } } }