Skip to content

Commit

Permalink
Clean up with materialized_stats()
Browse files Browse the repository at this point in the history
  • Loading branch information
desmondcheongzx committed Nov 23, 2024
1 parent b3e4c3a commit a3aa463
Show file tree
Hide file tree
Showing 17 changed files with 28 additions and 56 deletions.
6 changes: 3 additions & 3 deletions src/daft-logical-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use indexmap::IndexSet;
use snafu::Snafu;

pub use crate::ops::*;
use crate::stats::StatsState;
use crate::stats::PlanStats;

/// Logical plan for a Daft query.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -199,7 +199,7 @@ impl LogicalPlan {
}
}

pub fn get_stats(&self) -> &StatsState {
pub fn materialized_stats(&self) -> &PlanStats {
match self {
Self::Source(Source { stats_state, .. })
| Self::Project(Project { stats_state, .. })
Expand All @@ -218,7 +218,7 @@ impl LogicalPlan {
| Self::Sink(Sink { stats_state, .. })
| Self::Sample(Sample { stats_state, .. })
| Self::MonotonicallyIncreasingId(MonotonicallyIncreasingId { stats_state, .. }) => {
stats_state
stats_state.materialized_stats()
}
Self::Intersect(_) => {
panic!("Intersect nodes should be optimized away before stats are materialized")
Expand Down
5 changes: 2 additions & 3 deletions src/daft-logical-plan/src/ops/actor_pool_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ impl ActorPoolProject {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// TODO(desmond): We can do better estimations with the projection schema. For now, reuse the old logic.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
self.stats_state = input_stats.clone();
let input_stats = self.input.materialized_stats();
self.stats_state = StatsState::Materialized(input_stats.clone());
self
}

Expand Down
4 changes: 1 addition & 3 deletions src/daft-logical-plan/src/ops/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ impl Aggregate {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// TODO(desmond): We can use the schema here for better estimations. For now, use the old logic.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
let input_stats = input_stats.clone().unwrap_or_default();
let input_stats = self.input.materialized_stats();
let est_bytes_per_row_lower = input_stats.approx_stats.lower_bound_bytes
/ (input_stats.approx_stats.lower_bound_rows.max(1));
let est_bytes_per_row_upper =
Expand Down
8 changes: 2 additions & 6 deletions src/daft-logical-plan/src/ops/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,8 @@ impl Concat {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// TODO(desmond): We can do better estimations with the projection schema. For now, reuse the old logic.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
let other_stats = self.other.get_stats();
assert!(matches!(other_stats, StatsState::Materialized(..)));
let input_stats = input_stats.clone().unwrap_or_default();
let other_stats = other_stats.clone().unwrap_or_default();
let input_stats = self.input.materialized_stats();
let other_stats = self.other.materialized_stats();
let approx_stats = &input_stats.approx_stats + &other_stats.approx_stats;
self.stats_state = StatsState::Materialized(PlanStats::new(approx_stats));
self
Expand Down
4 changes: 1 addition & 3 deletions src/daft-logical-plan/src/ops/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ impl Distinct {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// TODO(desmond): We can simply use NDVs here. For now, do a naive estimation.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
let input_stats = input_stats.clone().unwrap_or_default();
let input_stats = self.input.materialized_stats();
let est_bytes_per_row_lower = input_stats.approx_stats.lower_bound_bytes
/ (input_stats.approx_stats.lower_bound_rows.max(1));
let approx_stats = ApproxStats {
Expand Down
4 changes: 1 addition & 3 deletions src/daft-logical-plan/src/ops/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ impl Explode {
}

pub(crate) fn with_materialized_stats(mut self) -> Self {
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
let input_stats = input_stats.clone().unwrap_or_default();
let input_stats = self.input.materialized_stats();
let approx_stats = ApproxStats {
lower_bound_rows: input_stats.approx_stats.lower_bound_rows,
upper_bound_rows: None,
Expand Down
4 changes: 1 addition & 3 deletions src/daft-logical-plan/src/ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ impl Filter {
pub(crate) fn with_materialized_stats(mut self) -> Self {
// Assume no row/column pruning in cardinality-affecting operations.
// TODO(desmond): We can do better estimations here. For now, reuse the old logic.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
let input_stats = input_stats.clone().unwrap_or_default();
let input_stats = self.input.materialized_stats();
let upper_bound_rows = input_stats.approx_stats.upper_bound_rows;
let upper_bound_bytes = input_stats.approx_stats.upper_bound_bytes;
let approx_stats = ApproxStats {
Expand Down
8 changes: 2 additions & 6 deletions src/daft-logical-plan/src/ops/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,8 @@ impl Join {
pub(crate) fn with_materialized_stats(mut self) -> Self {
// Assume a Primary-key + Foreign-Key join which would yield the max of the two tables.
// TODO(desmond): We can do better estimations here. For now, use the old logic.
let left_stats = self.left.get_stats();
let right_stats = self.right.get_stats();
assert!(matches!(left_stats, StatsState::Materialized(..)));
assert!(matches!(right_stats, StatsState::Materialized(..)));
let left_stats = left_stats.clone().unwrap_or_default();
let right_stats = right_stats.clone().unwrap_or_default();
let left_stats = self.left.materialized_stats();
let right_stats = self.right.materialized_stats();
let approx_stats = ApproxStats {
lower_bound_rows: 0,
upper_bound_rows: left_stats
Expand Down
4 changes: 1 addition & 3 deletions src/daft-logical-plan/src/ops/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ impl Limit {
}

pub(crate) fn with_materialized_stats(mut self) -> Self {
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
let input_stats = input_stats.clone().unwrap_or_default();
let input_stats = self.input.materialized_stats();
let limit = self.limit as usize;
let est_bytes_per_row_lower = input_stats.approx_stats.lower_bound_bytes
/ input_stats.approx_stats.lower_bound_rows.max(1);
Expand Down
5 changes: 2 additions & 3 deletions src/daft-logical-plan/src/ops/monotonically_increasing_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ impl MonotonicallyIncreasingId {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// TODO(desmond): We can do better estimations with the projection schema. For now, reuse the old logic.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
self.stats_state = input_stats.clone();
let input_stats = self.input.materialized_stats();
self.stats_state = StatsState::Materialized(input_stats.clone());
self
}

Expand Down
5 changes: 2 additions & 3 deletions src/daft-logical-plan/src/ops/pivot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ impl Pivot {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// TODO(desmond): Pivoting does affect cardinality, but for now we keep the old logic.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
self.stats_state = input_stats.clone();
let input_stats = self.input.materialized_stats();
self.stats_state = StatsState::Materialized(input_stats.clone());
self
}

Expand Down
5 changes: 2 additions & 3 deletions src/daft-logical-plan/src/ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ impl Project {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// TODO(desmond): We can do better estimations with the projection schema. For now, reuse the old logic.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
self.stats_state = input_stats.clone();
let input_stats = self.input.materialized_stats();
self.stats_state = StatsState::Materialized(input_stats.clone());
self
}

Expand Down
5 changes: 2 additions & 3 deletions src/daft-logical-plan/src/ops/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ impl Repartition {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// Repartitioning does not affect cardinality.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
self.stats_state = input_stats.clone();
let input_stats = self.input.materialized_stats();
self.stats_state = StatsState::Materialized(input_stats.clone());
self
}

Expand Down
4 changes: 1 addition & 3 deletions src/daft-logical-plan/src/ops/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ impl Sample {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// TODO(desmond): We can do better estimations with the projection schema. For now, reuse the old logic.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
let input_stats = input_stats.clone().unwrap_or_default();
let input_stats = self.input.materialized_stats();
let approx_stats = input_stats
.approx_stats
.apply(|v| ((v as f64) * self.fraction) as usize);
Expand Down
5 changes: 2 additions & 3 deletions src/daft-logical-plan/src/ops/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ impl Sort {

pub(crate) fn with_materialized_stats(mut self) -> Self {
// Sorting does not affect cardinality.
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
self.stats_state = input_stats.clone();
let input_stats = self.input.materialized_stats();
self.stats_state = StatsState::Materialized(input_stats.clone());
self
}

Expand Down
4 changes: 1 addition & 3 deletions src/daft-logical-plan/src/ops/unpivot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ impl Unpivot {
}

pub(crate) fn with_materialized_stats(mut self) -> Self {
let input_stats = self.input.get_stats();
assert!(matches!(input_stats, StatsState::Materialized(..)));
let input_stats = input_stats.clone().unwrap_or_default();
let input_stats = self.input.materialized_stats();
let num_values = self.values.len();
let approx_stats = ApproxStats {
lower_bound_rows: input_stats.approx_stats.lower_bound_rows * num_values,
Expand Down
4 changes: 2 additions & 2 deletions src/daft-logical-plan/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ pub enum StatsState {
}

impl StatsState {
pub fn unwrap_or_default(self) -> PlanStats {
pub fn materialized_stats(&self) -> &PlanStats {
match self {
Self::Materialized(stats) => stats,
Self::NotMaterialized => PlanStats::default(),
Self::NotMaterialized => panic!("Tried to get unmaterialized stats"),
}
}
}
Expand Down

0 comments on commit a3aa463

Please sign in to comment.