From adbf947cd256fd5ca5f55b3083205454f88253f7 Mon Sep 17 00:00:00 2001 From: desmondcheongzx Date: Sat, 23 Nov 2024 00:08:53 -0800 Subject: [PATCH] Cleanup --- .../optimization/rules/materialize_scans.rs | 32 ++++++++++++------- .../src/physical_planner/planner.rs | 5 +-- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/daft-logical-plan/src/optimization/rules/materialize_scans.rs b/src/daft-logical-plan/src/optimization/rules/materialize_scans.rs index 45148c734d..78e06597ab 100644 --- a/src/daft-logical-plan/src/optimization/rules/materialize_scans.rs +++ b/src/daft-logical-plan/src/optimization/rules/materialize_scans.rs @@ -17,26 +17,36 @@ use common_treenode::{Transformed, TreeNode}; use super::OptimizerRule; use crate::{LogicalPlan, SourceInfo}; -// Add stats to all logical plan nodes in a bottom up fashion. +// Materialize scan tasks from scan operators for all physical scans. impl OptimizerRule for MaterializeScans { fn try_optimize(&self, plan: Arc) -> DaftResult>> { - plan.transform_up(|node| self.try_optimize_node(Arc::unwrap_or_clone(node))) + plan.transform_up(|node| self.try_optimize_node(node)) } } impl MaterializeScans { #[allow(clippy::only_used_in_recursion)] - fn try_optimize_node(&self, plan: LogicalPlan) -> DaftResult>> { - match plan { + fn try_optimize_node( + &self, + plan: Arc, + ) -> DaftResult>> { + match &*plan { LogicalPlan::Source(source) => match &*source.source_info { - SourceInfo::Physical(_) => Ok(Transformed::yes( - source - .build_materialized_scan_source(self.execution_config.as_deref()) - .into(), - )), - _ => Ok(Transformed::no(Arc::new(LogicalPlan::Source(source)))), + SourceInfo::Physical(_) => { + let source_plan = Arc::unwrap_or_clone(plan); + if let LogicalPlan::Source(source) = source_plan { + Ok(Transformed::yes( + source + .build_materialized_scan_source(self.execution_config.as_deref()) + .into(), + )) + } else { + unreachable!("This logical plan was already matched as a Source node") + } + } + _ => Ok(Transformed::no(plan)), }, - _ => Ok(Transformed::no(Arc::new(plan))), + _ => Ok(Transformed::no(plan)), } } } diff --git a/src/daft-physical-plan/src/physical_planner/planner.rs b/src/daft-physical-plan/src/physical_planner/planner.rs index d7e227b39a..0b5fb4e905 100644 --- a/src/daft-physical-plan/src/physical_planner/planner.rs +++ b/src/daft-physical-plan/src/physical_planner/planner.rs @@ -230,9 +230,10 @@ impl TreeNodeRewriter for ReplacePlaceholdersWithMaterializedResult { let new_source_node = LogicalPlan::Source(Source::new( mat_results.in_memory_info.source_schema.clone(), SourceInfo::InMemory(mat_results.in_memory_info).into(), - )); + )) + .arced(); Ok(Transformed::new( - new_source_node.arced(), + new_source_node, true, TreeNodeRecursion::Stop, ))