diff --git a/datafusion-optd-cli/src/main.rs b/datafusion-optd-cli/src/main.rs index d59d4147..f8c7e315 100644 --- a/datafusion-optd-cli/src/main.rs +++ b/datafusion-optd-cli/src/main.rs @@ -156,7 +156,7 @@ pub async fn main() -> Result<()> { let args = Args::parse(); tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) + .with_max_level(tracing::Level::TRACE) .with_target(false) .with_ansi(false) .init(); diff --git a/optd-core/src/cascades/optimizer.rs b/optd-core/src/cascades/optimizer.rs index fc3767dd..67cf9573 100644 --- a/optd-core/src/cascades/optimizer.rs +++ b/optd-core/src/cascades/optimizer.rs @@ -3,24 +3,25 @@ // Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -use std::collections::{BTreeSet, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt::Display; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use anyhow::Result; use tracing::trace; use super::memo::{ArcMemoPlanNode, GroupInfo, Memo}; -use super::tasks::OptimizeGroupTask; use super::{NaiveMemo, Task}; use crate::cascades::memo::Winner; +use crate::cascades::tasks::get_initial_task; use crate::cost::CostModel; use crate::nodes::{ ArcPlanNode, ArcPredNode, NodeType, PlanNodeMeta, PlanNodeMetaMap, PlanNodeOrGroup, }; use crate::optimizer::Optimizer; use crate::property::{PropertyBuilder, PropertyBuilderAny}; -use crate::rules::Rule; +use crate::rules::{Rule, RuleMatcher}; pub type RuleId = usize; @@ -43,11 +44,19 @@ pub struct OptimizerProperties { pub struct CascadesOptimizer = NaiveMemo> { memo: M, - pub(super) tasks: VecDeque>>, + /// Stack of tasks that are waiting to be executed + tasks: Vec>>, + /// Monotonically increasing counter for task invocations + task_counter: AtomicUsize, explored_group: HashSet, explored_expr: HashSet, - fired_rules: HashMap>, - rules: Arc<[Arc>]>, + applied_rules: HashMap>, + /// Transformation 
rules that may be used while exploring + /// (logical -> logical) + transformation_rules: Arc<[(RuleId, Arc>)]>, + /// Implementation rules that may be used while optimizing + /// (logical -> physical) + implementation_rules: Arc<[(RuleId, Arc>)]>, disabled_rules: HashSet, cost: Arc>, property_builders: Arc<[Box>]>, @@ -94,29 +103,52 @@ impl Display for PredId { impl CascadesOptimizer> { pub fn new( - rules: Vec>>, + transformation_rules: Arc<[Arc>]>, + implementation_rules: Arc<[Arc>]>, cost: Box>>, property_builders: Vec>>, ) -> Self { - Self::new_with_prop(rules, cost, property_builders, Default::default()) + Self::new_with_prop( + transformation_rules, + implementation_rules, + cost, + property_builders, + Default::default(), + ) } pub fn new_with_prop( - rules: Vec>>, + transformation_rules: Arc<[Arc>]>, + implementation_rules: Arc<[Arc>]>, cost: Box>>, property_builders: Vec>>, prop: OptimizerProperties, ) -> Self { - let tasks = VecDeque::new(); + let tasks = Vec::new(); + // Assign rule IDs + let transformation_rules: Arc<[(RuleId, Arc>)]> = transformation_rules + .iter() + .enumerate() + .map(|(i, r)| (i, r.clone())) + .collect(); + let implementation_rules: Arc<[(RuleId, Arc>)]> = implementation_rules + .iter() + .enumerate() + .map(|(i, r)| (i + transformation_rules.len(), r.clone())) + .collect(); + debug_assert!(transformation_rules.iter().all(|(_, r)| !r.is_impl_rule())); + debug_assert!(implementation_rules.iter().all(|(_, r)| r.is_impl_rule())); let property_builders: Arc<[_]> = property_builders.into(); let memo = NaiveMemo::new(property_builders.clone()); Self { memo, + task_counter: AtomicUsize::new(0), tasks, explored_group: HashSet::new(), explored_expr: HashSet::new(), - fired_rules: HashMap::new(), - rules: rules.into(), + applied_rules: HashMap::new(), + transformation_rules, + implementation_rules, cost: cost.into(), ctx: OptimizerContext::default(), property_builders, @@ -128,7 +160,7 @@ impl CascadesOptimizer> { /// Clear the memo table 
and all optimizer states. pub fn step_clear(&mut self) { self.memo = NaiveMemo::new(self.property_builders.clone()); - self.fired_rules.clear(); + self.applied_rules.clear(); self.explored_group.clear(); self.explored_expr.clear(); } @@ -153,8 +185,12 @@ impl> CascadesOptimizer { self.cost.clone() } - pub fn rules(&self) -> Arc<[Arc>]> { - self.rules.clone() + pub fn transformation_rules(&self) -> Arc<[(RuleId, Arc>)]> { + self.transformation_rules.clone() + } + + pub fn implementation_rules(&self) -> Arc<[(RuleId, Arc>)]> { + self.implementation_rules.clone() } pub fn disable_rule(&mut self, rule_id: usize) { @@ -215,7 +251,7 @@ impl> CascadesOptimizer { /// Optimize a `RelNode`. pub fn step_optimize_rel(&mut self, root_rel: ArcPlanNode) -> Result { let (group_id, _) = self.add_new_expr(root_rel); - self.fire_optimize_tasks(group_id)?; + self.fire_optimize_tasks(group_id); Ok(group_id) } @@ -247,17 +283,30 @@ impl> CascadesOptimizer { res } - fn fire_optimize_tasks(&mut self, group_id: GroupId) -> Result<()> { - trace!(event = "fire_optimize_tasks", root_group_id = %group_id); - self.tasks - .push_back(Box::new(OptimizeGroupTask::new(group_id))); + pub fn get_next_task_id(&self) -> usize { + self.task_counter.fetch_add(1, Ordering::AcqRel) + } + + pub fn push_task(&mut self, task: Box>) { + self.tasks.push(task); + } + + fn pop_task(&mut self) -> Option>> { + self.tasks.pop() + } + + fn fire_optimize_tasks(&mut self, root_group_id: GroupId) { + trace!(event = "fire_optimize_tasks", root_group_id = %root_group_id); + let initial_task_id = self.get_next_task_id(); + self.push_task(get_initial_task(initial_task_id, root_group_id)); // get the task from the stack self.ctx.budget_used = false; let plan_space_begin = self.memo.estimated_plan_space(); let mut iter = 0; - while let Some(task) = self.tasks.pop_back() { - let new_tasks = task.execute(self)?; - self.tasks.extend(new_tasks); + while let Some(task) = self.pop_task() { + task.execute(self); + + // TODO: Iter is 
wrong iter += 1; if !self.ctx.budget_used { let plan_space = self.memo.estimated_plan_space(); @@ -286,12 +335,11 @@ impl> CascadesOptimizer { } } } - Ok(()) } fn optimize_inner(&mut self, root_rel: ArcPlanNode) -> Result> { let (group_id, _) = self.add_new_expr(root_rel); - self.fire_optimize_tasks(group_id)?; + self.fire_optimize_tasks(group_id); self.memo.get_best_group_binding(group_id, |_, _, _| {}) } @@ -374,15 +422,15 @@ impl> CascadesOptimizer { self.explored_expr.remove(&expr_id); } - pub(super) fn is_rule_fired(&self, group_expr_id: ExprId, rule_id: RuleId) -> bool { - self.fired_rules + pub(super) fn is_rule_applied(&self, group_expr_id: ExprId, rule_id: RuleId) -> bool { + self.applied_rules .get(&group_expr_id) .map(|rules| rules.contains(&rule_id)) .unwrap_or(false) } - pub(super) fn mark_rule_fired(&mut self, group_expr_id: ExprId, rule_id: RuleId) { - self.fired_rules + pub(super) fn mark_rule_applied(&mut self, group_expr_id: ExprId, rule_id: RuleId) { + self.applied_rules .entry(group_expr_id) .or_default() .insert(rule_id); @@ -406,3 +454,18 @@ impl> Optimizer for CascadesOptimizer { self.get_property_by_group::

(self.resolve_group_id(root_rel), idx) } } + +pub fn rule_matches_expr>( + rule: &Arc>>, + expr: &ArcMemoPlanNode, +) -> bool { + let matcher = rule.matcher(); + let typ_to_match = &expr.typ; + match matcher { + RuleMatcher::MatchNode { typ, .. } => typ == typ_to_match, + RuleMatcher::MatchDiscriminant { + typ_discriminant, .. + } => *typ_discriminant == std::mem::discriminant(typ_to_match), + _ => panic!("IR should have root node of match"), // TODO: what does this mean? replace text + } +} diff --git a/optd-core/src/cascades/tasks.rs b/optd-core/src/cascades/tasks.rs index 610c24e8..a88bbb14 100644 --- a/optd-core/src/cascades/tasks.rs +++ b/optd-core/src/cascades/tasks.rs @@ -3,26 +3,31 @@ // Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -use anyhow::Result; -use super::{CascadesOptimizer, Memo}; +use super::{CascadesOptimizer, GroupId, Memo}; use crate::nodes::NodeType; mod apply_rule; +mod explore_expr; mod explore_group; -mod optimize_expression; +mod optimize_expr; mod optimize_group; mod optimize_inputs; -pub use apply_rule::ApplyRuleTask; -pub use explore_group::ExploreGroupTask; -pub use optimize_expression::OptimizeExpressionTask; pub use optimize_group::OptimizeGroupTask; -pub use optimize_inputs::OptimizeInputsTask; pub trait Task>: 'static + Send + Sync { - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>>; + fn execute(&self, optimizer: &mut CascadesOptimizer); +} - #[allow(dead_code)] - fn describe(&self) -> String; +pub fn get_initial_task>( + initial_task_id: usize, + root_group_id: GroupId, +) -> Box> { + Box::new(OptimizeGroupTask::new( + None, + initial_task_id, + root_group_id, + None, + )) } diff --git a/optd-core/src/cascades/tasks/apply_rule.rs b/optd-core/src/cascades/tasks/apply_rule.rs index b6e62efb..ceee98d1 100644 --- a/optd-core/src/cascades/tasks/apply_rule.rs +++ b/optd-core/src/cascades/tasks/apply_rule.rs @@ -1,37 
+1,20 @@ -// Copyright (c) 2023-2024 CMU Database Group -// -// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - use std::sync::Arc; -use anyhow::Result; use itertools::Itertools; use tracing::trace; -use super::Task; -use crate::cascades::memo::ArcMemoPlanNode; -use crate::cascades::optimizer::{CascadesOptimizer, ExprId, RuleId}; -use crate::cascades::tasks::{OptimizeExpressionTask, OptimizeInputsTask}; -use crate::cascades::{GroupId, Memo}; -use crate::nodes::{ArcPlanNode, NodeType, PlanNode, PlanNodeOrGroup}; -use crate::rules::RuleMatcher; - -pub struct ApplyRuleTask { - rule_id: RuleId, - expr_id: ExprId, - exploring: bool, -} +use crate::{ + cascades::{ + memo::ArcMemoPlanNode, + optimizer::{rule_matches_expr, ExprId, RuleId}, + tasks::{explore_expr::ExploreExprTask, optimize_inputs::OptimizeInputsTask}, + CascadesOptimizer, GroupId, Memo, + }, + nodes::{ArcPlanNode, NodeType, PlanNode, PlanNodeOrGroup}, + rules::{Rule, RuleMatcher}, +}; -impl ApplyRuleTask { - pub fn new(rule_id: RuleId, expr_id: ExprId, exploring: bool) -> Self { - Self { - rule_id, - expr_id, - exploring, - } - } -} +use super::Task; // Pick/match logic, to get pieces of info to pass to the rule apply function // TODO: I would like to see this moved elsewhere @@ -153,60 +136,115 @@ fn match_and_pick_group>( matches } -impl> Task for ApplyRuleTask { - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { - if optimizer.is_rule_fired(self.expr_id, self.rule_id) { - return Ok(vec![]); +pub struct ApplyRuleTask> { + parent_task_id: Option, + task_id: usize, + expr_id: ExprId, + rule_id: RuleId, + rule: Arc>>, + // TODO: Promise here? Maybe it can be part of the Rule trait. 
+ cost_limit: Option, +} + +impl> ApplyRuleTask { + pub fn new( + parent_task_id: Option, + task_id: usize, + expr_id: ExprId, + rule_id: RuleId, + rule: Arc>>, + cost_limit: Option, + ) -> Self { + Self { + parent_task_id, + task_id, + expr_id, + rule_id, + rule, + cost_limit, } + } +} + +fn transform>( + optimizer: &CascadesOptimizer, + expr_id: ExprId, + rule: &Arc>>, +) -> Vec> { + let picked_datas = match_and_pick_expr(rule.matcher(), expr_id, optimizer); + + if picked_datas.is_empty() { + vec![] + } else { + picked_datas + .into_iter() + .flat_map(|picked_data| rule.apply(optimizer, picked_data)) + .collect() + } +} - if optimizer.is_rule_disabled(self.rule_id) { - optimizer.mark_rule_fired(self.expr_id, self.rule_id); - return Ok(vec![]); +fn update_memo>( + optimizer: &mut CascadesOptimizer, + group_id: GroupId, + new_exprs: Vec>, +) -> Vec { + let mut expr_ids = vec![]; + for new_expr in new_exprs { + if let Some(expr_id) = optimizer.add_expr_to_group(new_expr, group_id) { + expr_ids.push(expr_id); } + } + expr_ids +} - let rule = optimizer.rules()[self.rule_id].clone(); +/// TODO +/// +/// Pseudocode: +/// function ApplyRule(expr, rule, promise, limit) +/// newExprs ← Transform(expr,rule) +/// UpdateMemo(newExprs) +/// Sort exprs by promise +/// for newExpr ∈ newExprs do +/// if Rule is a transformation rule then +/// tasks.Push(ExplExpr(newExpr, limit)) +/// else +/// // Can fail if the cost limit becomes 0 or negative +/// limit ← UpdateCostLimit(newExpr, limit) +/// tasks.Push(OptInputs(newExpr, limit)) +impl> Task for ApplyRuleTask { + fn execute(&self, optimizer: &mut CascadesOptimizer) { + let expr = optimizer.get_expr_memoed(self.expr_id); + + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_begin", task = "apply_rule", rule_id = %self.rule_id, rule = %self.rule.name(), expr_id = %self.expr_id, expr = %expr); - trace!(event = "task_begin", task = "apply_rule", expr_id = %self.expr_id, rule_id = %self.rule_id, rule 
= %rule.name()); let group_id = optimizer.get_group_id(self.expr_id); - let mut tasks = vec![]; - let binding_exprs = match_and_pick_expr(rule.matcher(), self.expr_id, optimizer); - for binding in binding_exprs { - trace!(event = "before_apply_rule", task = "apply_rule", input_binding=%binding); - let applied = rule.apply(optimizer, binding); - - for expr in applied { - trace!(event = "after_apply_rule", task = "apply_rule", output_binding=%expr); - // TODO: remove clone in the below line - if let Some(expr_id) = optimizer.add_expr_to_group(expr.clone(), group_id) { - let typ = expr.unwrap_typ(); - if typ.is_logical() { - tasks.push( - Box::new(OptimizeExpressionTask::new(expr_id, self.exploring)) - as Box>, - ); - } else { - tasks.push(Box::new(OptimizeInputsTask::new( - expr_id, - !optimizer.prop.disable_pruning, - )) as Box>); - } - optimizer.unmark_expr_explored(expr_id); - trace!(event = "apply_rule", expr_id = %self.expr_id, rule_id = %self.rule_id, new_expr_id = %expr_id); - } else { - trace!(event = "apply_rule", expr_id = %self.expr_id, rule_id = %self.rule_id, "triggered group merge"); - } + + debug_assert!(rule_matches_expr(&self.rule, &expr)); + + let new_exprs = transform(optimizer, self.expr_id, &self.rule); + let new_expr_ids = update_memo(optimizer, group_id, new_exprs); + // TODO sort exprs by promise + for new_expr_id in new_expr_ids { + let is_transformation_rule = !self.rule.is_impl_rule(); + if is_transformation_rule { + // TODO: Increment transformation count + optimizer.push_task(Box::new(ExploreExprTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + new_expr_id, + self.cost_limit, + ))); + } else { + // TODO: Also, make cost limit optional with parameters struct like before + let new_limit = None; // TODO: How do we update cost limit + optimizer.push_task(Box::new(OptimizeInputsTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + new_expr_id, + new_limit, + ))); } } - optimizer.mark_rule_fired(self.expr_id, 
self.rule_id); - - trace!(event = "task_end", task = "apply_rule", expr_id = %self.expr_id, rule_id = %self.rule_id); - Ok(tasks) - } - - fn describe(&self) -> String { - format!( - "apply_rule {{ rule_id: {}, expr_id: {}, exploring: {} }}", - self.rule_id, self.expr_id, self.exploring - ) + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_finish", task = "apply_rule", rule_id = %self.rule_id, rule = %self.rule.name(), expr_id = %self.expr_id, expr = %expr); } } diff --git a/optd-core/src/cascades/tasks/explore_expr.rs b/optd-core/src/cascades/tasks/explore_expr.rs new file mode 100644 index 00000000..bf8fd727 --- /dev/null +++ b/optd-core/src/cascades/tasks/explore_expr.rs @@ -0,0 +1,97 @@ +use tracing::trace; + +use crate::{ + cascades::{ + optimizer::{rule_matches_expr, ExprId}, + CascadesOptimizer, Memo, + }, + nodes::NodeType, +}; + +use super::{apply_rule::ApplyRuleTask, explore_group::ExploreGroupTask, Task}; + +pub struct ExploreExprTask { + parent_task_id: Option, + task_id: usize, + expr_id: ExprId, + cost_limit: Option, +} + +impl ExploreExprTask { + pub fn new( + parent_task_id: Option, + task_id: usize, + expr_id: ExprId, + cost_limit: Option, + ) -> Self { + Self { + parent_task_id, + task_id, + expr_id, + cost_limit, + } + } +} + +/// ExploreExpr applies transformation rules to a single expression, to generate +/// more possible plans. 
+/// (Recall "transformation rules" are logical -> logical) +/// +/// Pseudocode: +/// function ExplExpr(expr, limit) +/// moves ← ∅ +/// for rule ∈ Transformation Rules do +/// // Can optionally apply guidance in if statement +/// if !expr.IsApplied(rule) and rule matches expr then +/// moves.Add(ApplyRule(expr, rule, promise, limit)) +/// // Sort moves by promise +/// for m ∈ moves do +/// tasks.Push(m) +/// for childExpr in inputs of expr do +/// grp ← GetGroup(childExpr) +/// if !grp.Explored then +/// tasks.Push(ExplGrp(grp, limit)) +impl> Task for ExploreExprTask { + fn execute(&self, optimizer: &mut CascadesOptimizer) { + let expr = optimizer.get_expr_memoed(self.expr_id); + let group_id = optimizer.get_group_id(self.expr_id); + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_begin", task = "explore_expr", group_id = %group_id, expr_id = %self.expr_id, expr = %expr); + + let mut moves = vec![]; + for (rule_id, rule) in optimizer.transformation_rules().iter() { + let is_rule_applied = optimizer.is_rule_applied(self.expr_id, *rule_id); + let rule_matches_expr = rule_matches_expr(rule, &expr); + if !is_rule_applied && rule_matches_expr { + // Mark rule applied before actually running ApplyRule, to avoid pushing the same task twice when query plan is warped + debug_assert!(!optimizer.is_rule_applied(self.expr_id, *rule_id)); + optimizer.mark_rule_applied(self.expr_id, *rule_id); + // TODO: Check transformation count and cancel invocation if we've hit a limit (does that happen here, or in ApplyRule?) 
+ moves.push(Box::new(ApplyRuleTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + self.expr_id, + *rule_id, + rule.clone(), + self.cost_limit, + ))); + } + } + // TODO: Sort moves by promise here + for m in moves { + // TODO: Add an optimized way to enqueue several tasks without + // locking the tasks queue every time + optimizer.push_task(m); + } + for child_group_id in expr.children.iter() { + if !optimizer.is_group_explored(*child_group_id) { + optimizer.push_task(Box::new(ExploreGroupTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + *child_group_id, + self.cost_limit, + ))); + } + } + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_finish", task = "explore_expr", group_id = %group_id, expr_id = %self.expr_id, expr = %expr); + } +} diff --git a/optd-core/src/cascades/tasks/explore_group.rs b/optd-core/src/cascades/tasks/explore_group.rs index f2a47daa..8f84a71d 100644 --- a/optd-core/src/cascades/tasks/explore_group.rs +++ b/optd-core/src/cascades/tasks/explore_group.rs @@ -1,55 +1,57 @@ -// Copyright (c) 2023-2024 CMU Database Group -// -// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. 
- -use anyhow::Result; use tracing::trace; -use super::Task; -use crate::cascades::optimizer::{CascadesOptimizer, GroupId}; -use crate::cascades::tasks::OptimizeExpressionTask; -use crate::cascades::Memo; -use crate::nodes::NodeType; +use crate::{ + cascades::{CascadesOptimizer, GroupId, Memo}, + nodes::NodeType, +}; + +use super::{explore_expr::ExploreExprTask, Task}; pub struct ExploreGroupTask { + parent_task_id: Option, + task_id: usize, group_id: GroupId, + cost_limit: Option, } impl ExploreGroupTask { - pub fn new(group_id: GroupId) -> Self { - Self { group_id } + pub fn new( + parent_task_id: Option, + task_id: usize, + group_id: GroupId, + cost_limit: Option, + ) -> Self { + Self { + parent_task_id, + task_id, + group_id, + cost_limit, + } } } +/// ExploreGroup will apply transformation rules to generate more logical +/// expressions (or "explore" more logical expressions). It does this by +/// invoking the ExploreExpr task on every expression in the group. +/// (Recall "transformation rules" are logical -> logical) +/// +/// Pseudocode: +/// function ExplGrp(grp, limit) +/// grp.Explored ← true +/// for expr ∈ grp.Expressions do +/// tasks.Push(ExplExpr(expr, limit)) impl> Task for ExploreGroupTask { - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { - trace!(event = "task_begin", task = "explore_group", group_id = %self.group_id); - let mut tasks = vec![]; - if optimizer.is_group_explored(self.group_id) { - trace!(target: "task_finish", task = "explore_group", result = "already explored, skipping", group_id = %self.group_id); - return Ok(vec![]); - } - let exprs = optimizer.get_all_exprs_in_group(self.group_id); - let exprs_cnt = exprs.len(); - for expr in exprs { - let typ = optimizer.get_expr_memoed(expr).typ.clone(); - if typ.is_logical() { - tasks - .push(Box::new(OptimizeExpressionTask::new(expr, true)) as Box>); - } - } + fn execute(&self, optimizer: &mut CascadesOptimizer) { + trace!(task_id = self.task_id, parent_task_id = 
self.parent_task_id, event = "task_begin", task = "explore_group", group_id = %self.group_id); optimizer.mark_group_explored(self.group_id); - trace!( - event = "task_finish", - task = "explore_group", - result = "expand group", - exprs_cnt = exprs_cnt - ); - Ok(tasks) - } - - fn describe(&self) -> String { - format!("explore_group {}", self.group_id) + for expr in optimizer.get_all_exprs_in_group(self.group_id) { + optimizer.push_task(Box::new(ExploreExprTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + expr, + self.cost_limit, + ))); + } + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_finish", task = "explore_group", group_id = %self.group_id); } } diff --git a/optd-core/src/cascades/tasks/optimize_expr.rs b/optd-core/src/cascades/tasks/optimize_expr.rs new file mode 100644 index 00000000..ff4a232e --- /dev/null +++ b/optd-core/src/cascades/tasks/optimize_expr.rs @@ -0,0 +1,95 @@ +use tracing::trace; + +use crate::{ + cascades::{ + optimizer::{rule_matches_expr, ExprId}, + CascadesOptimizer, Memo, + }, + nodes::NodeType, +}; + +use super::{apply_rule::ApplyRuleTask, explore_group::ExploreGroupTask, Task}; + +pub struct OptimizeExprTask { + parent_task_id: Option, + task_id: usize, + expr_id: ExprId, + cost_limit: Option, +} + +impl OptimizeExprTask { + pub fn new( + parent_task_id: Option, + task_id: usize, + expr_id: ExprId, + cost_limit: Option, + ) -> Self { + Self { + parent_task_id, + task_id, + expr_id, + cost_limit, + } + } +} + +/// OptimizeExpr applies implementation rules to a single expression, to generate +/// more possible plans. 
+/// (Recall "implementation rules" are logical -> physical) +/// +/// Pseudocode: +/// function OptExpr(expr, limit) +/// moves ← ∅ +/// for rule ∈ Implementation Rules do +/// // Can optionally apply guidance in if statement +/// if !expr.IsApplied(rule) and expr matches rule then +/// moves.Add(ApplyRule(expr, rule, promise, limit)) +/// // Sort moves by promise +/// for m ∈ moves do +/// tasks.Push(m) +/// for child ∈ inputs of expr do +/// grp ← GetGroup(child) +/// if !grp.Explored then +/// tasks.Push(ExplGrp(grp, limit)) +impl> Task for OptimizeExprTask { + fn execute(&self, optimizer: &mut CascadesOptimizer) { + let expr = optimizer.get_expr_memoed(self.expr_id); + let group_id = optimizer.get_group_id(self.expr_id); + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_begin", task = "optimize_expr", group_id = %group_id, expr_id = %self.expr_id, expr = %expr); + + let mut moves = vec![]; + for (rule_id, rule) in optimizer.implementation_rules().iter() { + let is_rule_applied = optimizer.is_rule_applied(self.expr_id, *rule_id); + let rule_matches_expr = rule_matches_expr(rule, &expr); + if !is_rule_applied && rule_matches_expr { + debug_assert!(!optimizer.is_rule_applied(self.expr_id, *rule_id)); + optimizer.mark_rule_applied(self.expr_id, *rule_id); + moves.push(Box::new(ApplyRuleTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + self.expr_id, + *rule_id, + rule.clone(), + self.cost_limit, + ))); + } + } + // TODO: Sort moves by promise here + for m in moves { + // TODO: Add an optimized way to enqueue several tasks without + // locking the tasks queue every time + optimizer.push_task(m); + } + for child_group_id in expr.children.iter() { + if !optimizer.is_group_explored(*child_group_id) { + optimizer.push_task(Box::new(ExploreGroupTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + *child_group_id, + self.cost_limit, + ))); + } + } + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event
= "task_finish", task = "optimize_expr", group_id = %group_id, expr_id = %self.expr_id, expr = %expr); + } +} diff --git a/optd-core/src/cascades/tasks/optimize_expression.rs b/optd-core/src/cascades/tasks/optimize_expression.rs deleted file mode 100644 index a47f01a7..00000000 --- a/optd-core/src/cascades/tasks/optimize_expression.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) 2023-2024 CMU Database Group -// -// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -use anyhow::Result; -use tracing::trace; - -use super::Task; -use crate::cascades::optimizer::{CascadesOptimizer, ExprId}; -use crate::cascades::tasks::{ApplyRuleTask, ExploreGroupTask}; -use crate::cascades::Memo; -use crate::nodes::NodeType; -use crate::rules::RuleMatcher; - -pub struct OptimizeExpressionTask { - expr_id: ExprId, - exploring: bool, -} - -impl OptimizeExpressionTask { - pub fn new(expr_id: ExprId, exploring: bool) -> Self { - Self { expr_id, exploring } - } -} - -fn top_matches(matcher: &RuleMatcher, match_typ: T) -> bool { - match matcher { - RuleMatcher::MatchNode { typ, .. } => typ == &match_typ, - RuleMatcher::MatchDiscriminant { - typ_discriminant, .. 
- } => std::mem::discriminant(&match_typ) == *typ_discriminant, - _ => panic!("IR should have root node of match"), - } -} - -impl> Task for OptimizeExpressionTask { - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { - let expr = optimizer.get_expr_memoed(self.expr_id); - trace!(event = "task_begin", task = "optimize_expr", expr_id = %self.expr_id, expr = %expr); - let mut tasks = vec![]; - for (rule_id, rule) in optimizer.rules().iter().enumerate() { - if optimizer.is_rule_fired(self.expr_id, rule_id) { - continue; - } - // Skip impl rules when exploring - if self.exploring && rule.is_impl_rule() { - continue; - } - // Skip transformation rules when budget is used - if optimizer.ctx.budget_used && !rule.is_impl_rule() { - continue; - } - if top_matches(rule.matcher(), expr.typ.clone()) { - tasks.push( - Box::new(ApplyRuleTask::new(rule_id, self.expr_id, self.exploring)) - as Box>, - ); - for &input_group_id in &expr.children { - tasks.push( - Box::new(ExploreGroupTask::new(input_group_id)) as Box> - ); - } - } - } - trace!(event = "task_end", task = "optimize_expr", expr_id = %self.expr_id); - Ok(tasks) - } - - fn describe(&self) -> String { - format!("optimize_expr {}", self.expr_id) - } -} diff --git a/optd-core/src/cascades/tasks/optimize_group.rs b/optd-core/src/cascades/tasks/optimize_group.rs index 2863af8c..774870d0 100644 --- a/optd-core/src/cascades/tasks/optimize_group.rs +++ b/optd-core/src/cascades/tasks/optimize_group.rs @@ -1,59 +1,85 @@ -// Copyright (c) 2023-2024 CMU Database Group -// -// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. 
- -use anyhow::Result; use tracing::trace; -use super::Task; -use crate::cascades::optimizer::GroupId; -use crate::cascades::tasks::optimize_expression::OptimizeExpressionTask; -use crate::cascades::tasks::OptimizeInputsTask; -use crate::cascades::{CascadesOptimizer, Memo}; -use crate::nodes::NodeType; +use crate::{ + cascades::{CascadesOptimizer, GroupId, Memo}, + nodes::NodeType, +}; + +use super::{explore_group::ExploreGroupTask, optimize_expr::OptimizeExprTask, Task}; pub struct OptimizeGroupTask { + parent_task_id: Option, + task_id: usize, group_id: GroupId, + cost_limit: Option, } impl OptimizeGroupTask { - pub fn new(group_id: GroupId) -> Self { - Self { group_id } + pub fn new( + parent_task_id: Option, + task_id: usize, + group_id: GroupId, + cost_limit: Option, + ) -> Self { + Self { + parent_task_id, + task_id, + group_id, + cost_limit, + } } } +/// OptimizeGroup will find the best physical plan for the group. +/// It does this by applying implementation rules through the OptimizeExpr task +/// (Recall "implementation rules" are logical -> physical) +/// +/// Before it tries to generate different physical plans, it will invoke +/// explore tasks to generate more logical expressions through transformation +/// rules. 
+/// +/// Pseudocode: +/// function OptGrp(expr, limit) +/// grp ← GetGroup(expr) +/// if !grp.Explored then +/// tasks.Push(OptGrp(grp, limit)) +/// tasks.Push(ExplGrp(grp, limit)) +/// else +/// for expr ∈ grp.Expressions do +/// tasks.Push(OptExpr(expr, limit)) impl> Task for OptimizeGroupTask { - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { - trace!(event = "task_begin", task = "optimize_group", group_id = %self.group_id); - let group_info = optimizer.get_group_info(self.group_id); - if group_info.winner.has_decided() { - trace!(event = "task_finish", task = "optimize_group"); - return Ok(vec![]); - } - let exprs = optimizer.get_all_exprs_in_group(self.group_id); - let mut tasks = vec![]; - let exprs_cnt = exprs.len(); - for &expr in &exprs { - let typ = optimizer.get_expr_memoed(expr).typ.clone(); - if typ.is_logical() { - tasks.push(Box::new(OptimizeExpressionTask::new(expr, false)) as Box>); - } - } - for &expr in &exprs { - let typ = optimizer.get_expr_memoed(expr).typ.clone(); - if !typ.is_logical() { - tasks.push(Box::new(OptimizeInputsTask::new( + fn execute(&self, optimizer: &mut CascadesOptimizer) { + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_begin", task = "optimize_group", group_id = %self.group_id); + let group_explored = optimizer.is_group_explored(self.group_id); + + // Apply transformation rules *before* trying to apply our + // implementation rules. 
(Task dependency enforced via stack push order) + if !group_explored { + // TODO(parallel): Task dependency here + optimizer.push_task(Box::new(OptimizeGroupTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + self.group_id, + self.cost_limit, + ))); + optimizer.push_task(Box::new(ExploreGroupTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + self.group_id, + self.cost_limit, + ))); + } else { + // Optimize every expression in the group + // (apply implementation rules) + for expr in optimizer.get_all_exprs_in_group(self.group_id) { + optimizer.push_task(Box::new(OptimizeExprTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), expr, - !optimizer.prop.disable_pruning, - )) as Box>); + self.cost_limit, + ))); } } - trace!(event = "task_finish", task = "optimize_group", group_id = %self.group_id, exprs_cnt = exprs_cnt); - Ok(tasks) - } - - fn describe(&self) -> String { - format!("optimize_group {}", self.group_id) + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_finish", task = "optimize_group", group_id = %self.group_id); } } diff --git a/optd-core/src/cascades/tasks/optimize_inputs.rs b/optd-core/src/cascades/tasks/optimize_inputs.rs index c8c5a958..32bdb7fc 100644 --- a/optd-core/src/cascades/tasks/optimize_inputs.rs +++ b/optd-core/src/cascades/tasks/optimize_inputs.rs @@ -1,283 +1,229 @@ -// Copyright (c) 2023-2024 CMU Database Group -// -// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. 
+use std::sync::Arc; -use anyhow::Result; -use itertools::Itertools; use tracing::trace; -use super::Task; -use crate::cascades::memo::{GroupInfo, Winner, WinnerInfo}; -use crate::cascades::optimizer::ExprId; -use crate::cascades::tasks::OptimizeGroupTask; -use crate::cascades::{CascadesOptimizer, Memo, RelNodeContext}; -use crate::cost::{Cost, Statistics}; -use crate::nodes::NodeType; +use crate::{ + cascades::{ + memo::{GroupInfo, Winner, WinnerInfo}, + optimizer::{ExprId, RelNodeContext}, + CascadesOptimizer, GroupId, Memo, + }, + cost::{Cost, Statistics}, + nodes::NodeType, +}; -#[derive(Debug, Clone)] -struct ContinueTask { - next_group_idx: usize, - return_from_optimize_group: bool, -} - -struct ContinueTaskDisplay<'a>(&'a Option); - -impl std::fmt::Display for ContinueTaskDisplay<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.0 { - Some(x) => { - if x.return_from_optimize_group { - write!(f, "return,next_group_idx={}", x.next_group_idx) - } else { - write!(f, "enter,next_group_idx={}", x.next_group_idx) - } - } - None => write!(f, "none"), - } - } -} +use super::{optimize_group::OptimizeGroupTask, Task}; pub struct OptimizeInputsTask { + parent_task_id: Option, + task_id: usize, expr_id: ExprId, - continue_from: Option, - pruning: bool, + cost_limit: Option, + iteration: usize, } impl OptimizeInputsTask { - pub fn new(expr_id: ExprId, pruning: bool) -> Self { + pub fn new( + parent_task_id: Option, + task_id: usize, + expr_id: ExprId, + cost_limit: Option, + ) -> Self { Self { + parent_task_id, + task_id, expr_id, - continue_from: None, - pruning, + cost_limit, + iteration: 0, } } - fn continue_from(&self, cont: ContinueTask, pruning: bool) -> Self { + fn new_continue_iteration( + &self, + optimizer: &CascadesOptimizer>, + ) -> Self { Self { + parent_task_id: Some(self.task_id), + task_id: optimizer.get_next_task_id(), expr_id: self.expr_id, - continue_from: Some(cont), - pruning, + cost_limit: self.cost_limit, + 
iteration: self.iteration + 1, } } +} - fn update_winner_impossible>( - &self, - optimizer: &mut CascadesOptimizer, - ) { - let group_id = optimizer.get_group_id(self.expr_id); - if let Winner::Unknown = optimizer.get_group_info(group_id).winner { - optimizer.update_group_info( - group_id, - GroupInfo { - winner: Winner::Impossible, - }, - ); - } - } +fn get_input_cost>( + children: &[GroupId], + optimizer: &CascadesOptimizer, +) -> Vec { + let cost = optimizer.cost(); + let input_cost = children + .iter() + .map(|&group_id| { + optimizer + .get_group_info(group_id) + .winner + .as_full_winner() + .map(|x| x.total_cost.clone()) + .unwrap_or_else(|| cost.zero()) + }) + .collect::>(); + input_cost +} - fn update_winner>( - &self, - input_statistics: Vec>, - operation_cost: Cost, - total_cost: Cost, - optimizer: &mut CascadesOptimizer, - ) { - let group_id = optimizer.get_group_id(self.expr_id); - let group_info = optimizer.get_group_info(group_id); - let cost = optimizer.cost(); - let operation_weighted_cost = cost.weighted_cost(&operation_cost); - let total_weighted_cost = cost.weighted_cost(&total_cost); - let mut update_cost = false; - if let Some(winner) = group_info.winner.as_full_winner() { - if winner.total_weighted_cost > total_weighted_cost { - update_cost = true; - } - } else { +fn compute_cost>( + expr_id: ExprId, + optimizer: &mut CascadesOptimizer, +) -> (Cost, Cost, Vec>>) { + let group_id = optimizer.get_group_id(expr_id); + let expr = optimizer.get_expr_memoed(expr_id); + let cost = optimizer.cost(); + let children_group_ids = expr.children.clone(); + let context = RelNodeContext { + expr_id, + group_id, + children_group_ids: children_group_ids.clone(), + }; + let input_statistics = children_group_ids + .iter() + .map(|&group_id| { + optimizer + .get_group_info(group_id) + .winner + .as_full_winner() + .map(|x| x.statistics.clone()) + }) + .collect::>(); + let input_statistics_ref = input_statistics + .iter() + .map(|x| x.as_deref()) + .collect::>(); 
+ let input_cost = children_group_ids + .iter() + .map(|&group_id| { + optimizer + .get_group_info(group_id) + .winner + .as_full_winner() + .map(|x| x.total_cost.clone()) + .unwrap_or_else(|| cost.zero()) + }) + .collect::>(); + let preds: Vec<_> = expr + .predicates + .iter() + .map(|pred_id| optimizer.get_pred(*pred_id)) + .collect(); + let operation_cost = cost.compute_operation_cost( + &expr.typ, + &preds, + &input_statistics_ref, + &input_cost, + Some(context.clone()), + Some(optimizer), + ); + let total_cost = cost.sum(&operation_cost, &input_cost); + (total_cost, operation_cost, input_statistics) +} + +fn update_winner>( + expr_id: ExprId, + optimizer: &mut CascadesOptimizer, +) { + let (total_cost, operation_cost, input_statistics) = compute_cost(expr_id, optimizer); + let group_id = optimizer.get_group_id(expr_id); + let group_info = optimizer.get_group_info(group_id); + let cost = optimizer.cost(); + let operation_weighted_cost = cost.weighted_cost(&operation_cost); + let total_weighted_cost = cost.weighted_cost(&total_cost); + let mut update_cost = false; + if let Some(winner) = group_info.winner.as_full_winner() { + if winner.total_weighted_cost > total_weighted_cost { update_cost = true; } - if update_cost { - let expr = optimizer.get_expr_memoed(self.expr_id); - let preds = expr - .predicates + } else { + update_cost = true; + } + if update_cost { + let expr = optimizer.get_expr_memoed(expr_id); + let preds: Vec<_> = expr + .predicates + .iter() + .map(|pred_id| optimizer.get_pred(*pred_id)) + .collect(); + let input_statistics = input_statistics + .iter() + .map(|x| x.as_deref()) + .collect::>(); + let statistics = cost.derive_statistics( + &expr.typ, + &preds, + &input_statistics .iter() - .map(|pred_id| optimizer.get_pred(*pred_id)) - .collect_vec(); - let statistics = cost.derive_statistics( - &expr.typ, - &preds, - &input_statistics - .iter() - .map(|x| x.expect("child winner should always have statistics?")) - .collect::>(), - 
Some(RelNodeContext { - group_id, - expr_id: self.expr_id, - children_group_ids: expr.children.clone(), - }), - Some(optimizer), - ); - optimizer.update_group_info( + .map(|x| x.expect("child winner should always have statistics?")) + .collect::>(), + Some(RelNodeContext { group_id, - GroupInfo { - winner: Winner::Full(WinnerInfo { - expr_id: self.expr_id, - total_weighted_cost, - operation_weighted_cost, - total_cost, - operation_cost, - statistics: statistics.into(), - }), - }, - ); - } + expr_id, + children_group_ids: expr.children.clone(), + }), + Some(optimizer), + ); + optimizer.update_group_info( + group_id, + GroupInfo { + winner: Winner::Full(WinnerInfo { + expr_id, + total_weighted_cost, + operation_weighted_cost, + total_cost, + operation_cost, + statistics: statistics.into(), + }), + }, + ); } } +/// TODO +/// +/// Pseudocode: +/// function OptInputs(expr, rule, limit) +/// childExpr ← expr.GetNextInput() +/// if childExpr is null then +/// memo.UpdateBestPlan(expr) +/// return +/// tasks.Push(OptInputs(expr, limit)) +/// UpdateCostBound(expr) +/// limit ← UpdateCostLimit(expr, limit) +/// tasks.Push(OptGrp(GetGroup(childExpr), limit)) impl> Task for OptimizeInputsTask { - fn execute(&self, optimizer: &mut CascadesOptimizer) -> Result>>> { - if self.continue_from.is_none() { - if optimizer.is_expr_explored(self.expr_id) { - // skip optimize_inputs to avoid dead-loop: consider join commute being fired twice - // that produces two projections, therefore having groups like - // projection1 -> projection2 -> join = projection1. 
- trace!(event = "task_skip", task = "optimize_inputs", expr_id = %self.expr_id); - return Ok(vec![]); - } - optimizer.mark_expr_explored(self.expr_id); - } + fn execute(&self, optimizer: &mut CascadesOptimizer) { let expr = optimizer.get_expr_memoed(self.expr_id); let group_id = optimizer.get_group_id(self.expr_id); - let children_group_ids = &expr.children; - let cost = optimizer.cost(); - - trace!(event = "task_begin", task = "optimize_inputs", expr_id = %self.expr_id, continue_from = %ContinueTaskDisplay(&self.continue_from), total_children = %children_group_ids.len()); - - if let Some(ContinueTask { - next_group_idx, - return_from_optimize_group, - }) = self.continue_from.clone() - { - let context = RelNodeContext { - expr_id: self.expr_id, - group_id, - children_group_ids: children_group_ids.clone(), - }; - let input_statistics = children_group_ids - .iter() - .map(|&group_id| { - optimizer - .get_group_info(group_id) - .winner - .as_full_winner() - .map(|x| x.statistics.clone()) - }) - .collect::>(); - let input_statistics_ref = input_statistics - .iter() - .map(|x| x.as_deref()) - .collect::>(); - let input_cost = children_group_ids - .iter() - .map(|&group_id| { - optimizer - .get_group_info(group_id) - .winner - .as_full_winner() - .map(|x| x.total_cost.clone()) - .unwrap_or_else(|| cost.zero()) - }) - .collect::>(); - let preds = expr - .predicates - .iter() - .map(|pred_id| optimizer.get_pred(*pred_id)) - .collect_vec(); - let operation_cost = cost.compute_operation_cost( - &expr.typ, - &preds, - &input_statistics_ref, - &input_cost, - Some(context.clone()), - Some(optimizer), - ); - let total_cost = cost.sum(&operation_cost, &input_cost); - - if self.pruning { - let group_info = optimizer.get_group_info(group_id); - fn trace_fmt(winner: &Winner) -> String { - match winner { - Winner::Full(winner) => winner.total_weighted_cost.to_string(), - Winner::Impossible => "impossible".to_string(), - Winner::Unknown => "unknown".to_string(), - } - } - trace!( - 
event = "compute_cost", - task = "optimize_inputs", - expr_id = %self.expr_id, - weighted_cost_so_far = cost.weighted_cost(&total_cost), - winner_weighted_cost = %trace_fmt(&group_info.winner), - current_processing = %next_group_idx, - total_child_groups = %children_group_ids.len()); - if let Some(winner) = group_info.winner.as_full_winner() { - let cost_so_far = cost.weighted_cost(&total_cost); - if winner.total_weighted_cost <= cost_so_far { - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id, result = "pruned"); - return Ok(vec![]); - } - } - } - - if next_group_idx < children_group_ids.len() { - let child_group_id = children_group_ids[next_group_idx]; - let group_idx = next_group_idx; - let child_group_info = optimizer.get_group_info(child_group_id); - if !child_group_info.winner.has_full_winner() { - if !return_from_optimize_group { - trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id, group_idx = %group_idx, yield_to = "optimize_group", optimize_group_id = %child_group_id); - return Ok(vec![ - Box::new(self.continue_from( - ContinueTask { - next_group_idx, - return_from_optimize_group: true, - }, - self.pruning, - )) as Box>, - Box::new(OptimizeGroupTask::new(child_group_id)) as Box>, - ]); - } else { - self.update_winner_impossible(optimizer); - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id, result = "impossible"); - return Ok(vec![]); - } - } - trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id, group_idx = %group_idx, yield_to = "next_optimize_input"); - Ok(vec![Box::new(self.continue_from( - ContinueTask { - next_group_idx: group_idx + 1, - return_from_optimize_group: false, - }, - self.pruning, - )) as Box>]) - } else { - self.update_winner(input_statistics_ref, operation_cost, total_cost, optimizer); - trace!(event = "task_finish", task = "optimize_inputs", expr_id = %self.expr_id, result = "optimized"); - Ok(vec![]) - } - } else { - 
trace!(event = "task_yield", task = "optimize_inputs", expr_id = %self.expr_id); - Ok(vec![Box::new(self.continue_from( - ContinueTask { - next_group_idx: 0, - return_from_optimize_group: false, - }, - self.pruning, - )) as Box>]) + // TODO: add typ to more traces and iteration to traces below + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_begin", task = "optimize_inputs", iteration = %self.iteration, group_id = %group_id, expr_id = %self.expr_id, expr = %expr); + let next_child_expr = expr.children.get(self.iteration); + if next_child_expr.is_none() { + // TODO: If we want to support interrupting the optimizer, it might + // behoove us to update the winner more often than this. + update_winner(self.expr_id, optimizer); + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_finish", task = "optimize_inputs", iteration = %self.iteration, group_id = %group_id, expr_id = %self.expr_id, expr = %expr); + return; } - } - - fn describe(&self) -> String { - format!("optimize_inputs {}", self.expr_id) + let next_child_expr = next_child_expr.unwrap(); + + //TODO(parallel): Task dependency + //TODO: Should be able to add multiple tasks at once + optimizer.push_task(Box::new(self.new_continue_iteration(optimizer))); + // TODO updatecostbound (involves cost limit) + let new_limit = None; // TODO: How do we update cost limit + optimizer.push_task(Box::new(OptimizeGroupTask::new( + Some(self.task_id), + optimizer.get_next_task_id(), + *next_child_expr, + new_limit, + ))); + trace!(task_id = self.task_id, parent_task_id = self.parent_task_id, event = "task_finish", task = "optimize_inputs", iteration = %self.iteration, group_id = %group_id, expr_id = %self.expr_id, expr = %expr); } } diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index 2e80ff19..f81f3c42 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -94,29 +94,37 @@ impl DatafusionOptimizer { 
] } - pub fn default_cascades_rules() -> Vec>>> - { - let rules = rules::PhysicalConversionRule::all_conversions(); - let mut rule_wrappers = vec![]; - for rule in rules { - rule_wrappers.push(rule); - } - rule_wrappers.push(Arc::new(rules::FilterProjectTransposeRule::new())); - rule_wrappers.push(Arc::new(rules::FilterCrossJoinTransposeRule::new())); - rule_wrappers.push(Arc::new(rules::FilterInnerJoinTransposeRule::new())); - rule_wrappers.push(Arc::new(rules::FilterSortTransposeRule::new())); - rule_wrappers.push(Arc::new(rules::FilterAggTransposeRule::new())); - rule_wrappers.push(Arc::new(rules::HashJoinRule::new())); - rule_wrappers.push(Arc::new(rules::JoinCommuteRule::new())); - rule_wrappers.push(Arc::new(rules::JoinAssocRule::new())); - rule_wrappers.push(Arc::new(rules::ProjectionPullUpJoin::new())); - rule_wrappers.push(Arc::new(rules::EliminateProjectRule::new())); - rule_wrappers.push(Arc::new(rules::ProjectMergeRule::new())); - rule_wrappers.push(Arc::new(rules::EliminateLimitRule::new())); - rule_wrappers.push(Arc::new(rules::EliminateJoinRule::new())); - rule_wrappers.push(Arc::new(rules::EliminateFilterRule::new())); - rule_wrappers.push(Arc::new(rules::ProjectFilterTransposeRule::new())); - rule_wrappers + pub fn default_cascades_rules() -> ( + Vec>>>, + Vec>>>, + ) { + // Transformation rules (logical->logical) + let mut transformation_rules: Vec< + Arc>>, + > = vec![]; + // TODO: There's a significant cycle issue in this PR! 
+ // transformation_rules.push(Arc::new(rules::FilterProjectTransposeRule::new())); + transformation_rules.push(Arc::new(rules::FilterCrossJoinTransposeRule::new())); + transformation_rules.push(Arc::new(rules::FilterInnerJoinTransposeRule::new())); + transformation_rules.push(Arc::new(rules::FilterSortTransposeRule::new())); + transformation_rules.push(Arc::new(rules::FilterAggTransposeRule::new())); + transformation_rules.push(Arc::new(rules::JoinCommuteRule::new())); + transformation_rules.push(Arc::new(rules::JoinAssocRule::new())); + // transformation_rules.push(Arc::new(rules::ProjectionPullUpJoin::new())); + transformation_rules.push(Arc::new(rules::EliminateProjectRule::new())); + // transformation_rules.push(Arc::new(rules::ProjectMergeRule::new())); + transformation_rules.push(Arc::new(rules::EliminateLimitRule::new())); + transformation_rules.push(Arc::new(rules::EliminateJoinRule::new())); + transformation_rules.push(Arc::new(rules::EliminateFilterRule::new())); + // transformation_rules.push(Arc::new(rules::ProjectFilterTransposeRule::new())); + + // Implementation rules (logical->physical) + let mut implementation_rules: Vec< + Arc>>, + > = rules::PhysicalConversionRule::all_conversions(); + implementation_rules.push(Arc::new(rules::HashJoinRule::new())); + + (transformation_rules, implementation_rules) } /// Create an optimizer with partial explore (otherwise it's too slow). 
@@ -132,7 +140,8 @@ impl DatafusionOptimizer { cost_model: impl CostModel>, runtime_map: RuntimeAdaptionStorage, ) -> Self { - let cascades_rules = Self::default_cascades_rules(); + let (cascades_transformation_rules, cascades_implementation_rules) = + Self::default_cascades_rules(); let heuristic_rules = Self::default_heuristic_rules(); let property_builders: Arc<[Box>]> = Arc::new([ Box::new(SchemaPropertyBuilder::new(catalog.clone())), @@ -141,7 +150,8 @@ impl DatafusionOptimizer { Self { runtime_statistics: runtime_map, cascades_optimizer: CascadesOptimizer::new_with_prop( - cascades_rules, + cascades_transformation_rules.into(), + cascades_implementation_rules.into(), Box::new(cost_model), vec![ Box::new(SchemaPropertyBuilder::new(catalog.clone())), @@ -166,21 +176,23 @@ impl DatafusionOptimizer { /// The optimizer settings for three-join demo as a perfect optimizer. pub fn new_alternative_physical_for_demo(catalog: Arc) -> Self { - let rules = rules::PhysicalConversionRule::all_conversions(); - let mut rule_wrappers = Vec::new(); - for rule in rules { - rule_wrappers.push(rule); - } - rule_wrappers.push(Arc::new(rules::HashJoinRule::new())); - rule_wrappers.insert(0, Arc::new(rules::JoinCommuteRule::new())); - rule_wrappers.insert(1, Arc::new(rules::JoinAssocRule::new())); - rule_wrappers.insert(2, Arc::new(rules::ProjectionPullUpJoin::new())); - rule_wrappers.insert(3, Arc::new(rules::EliminateFilterRule::new())); + let mut impl_rules: Vec>>> = + rules::PhysicalConversionRule::all_conversions(); + + let mut transform_rules: Vec>>> = + vec![]; + + impl_rules.push(Arc::new(rules::HashJoinRule::new())); + transform_rules.insert(0, Arc::new(rules::JoinCommuteRule::new())); + transform_rules.insert(1, Arc::new(rules::JoinAssocRule::new())); + transform_rules.insert(2, Arc::new(rules::ProjectionPullUpJoin::new())); + transform_rules.insert(3, Arc::new(rules::EliminateFilterRule::new())); let cost_model = AdaptiveCostModel::new(1000); let runtime_statistics = 
cost_model.get_runtime_map();
         let optimizer = CascadesOptimizer::new(
-            rule_wrappers,
+            transform_rules.into(),
+            impl_rules.into(),
             Box::new(cost_model),
             vec![
                 Box::new(SchemaPropertyBuilder::new(catalog.clone())),
diff --git a/optd-sqlplannertest/src/lib.rs b/optd-sqlplannertest/src/lib.rs
index c556d9d0..bddc9520 100644
--- a/optd-sqlplannertest/src/lib.rs
+++ b/optd-sqlplannertest/src/lib.rs
@@ -133,28 +133,31 @@ impl DatafusionDBMS {
         } else {
             optimizer.disable_pruning(false);
         }
-        let rules = optimizer.rules();
+        let t_rules = optimizer.transformation_rules();
+        let i_rules = optimizer.implementation_rules();
         if flags.enable_logical_rules.is_empty() {
-            for r in 0..rules.len() {
-                optimizer.enable_rule(r);
+            for (r_id, _) in t_rules.iter() {
+                optimizer.enable_rule(*r_id);
+            }
+            for (r_id, _) in i_rules.iter() {
+                optimizer.enable_rule(*r_id);
             }
             guard.as_mut().unwrap().enable_heuristic(true);
         } else {
-            for (rule_id, rule) in rules.as_ref().iter().enumerate() {
-                if rule.is_impl_rule() {
-                    optimizer.enable_rule(rule_id);
-                } else {
-                    optimizer.disable_rule(rule_id);
-                }
+            for (rule_id, _) in t_rules.iter() {
+                optimizer.disable_rule(*rule_id);
+            }
+            for (rule_id, _) in i_rules.iter() {
+                optimizer.enable_rule(*rule_id);
             }
             let mut rules_to_enable = flags
                 .enable_logical_rules
                 .iter()
                 .map(|x| x.as_str())
                 .collect::<HashSet<_>>();
-            for (rule_id, rule) in rules.as_ref().iter().enumerate() {
+            for (rule_id, rule) in t_rules.iter().chain(i_rules.iter()) {
                 if rules_to_enable.remove(rule.name()) {
-                    optimizer.enable_rule(rule_id);
+                    optimizer.enable_rule(*rule_id);
                 }
             }
             if !rules_to_enable.is_empty() {