From 7702bb4238bab506ef77fee88e56ed2d3adc30d7 Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Tue, 12 Sep 2023 15:00:12 -0600 Subject: [PATCH] Add debug logging for optimizer. --- Cargo.lock | 2 ++ daft/logging.py | 22 +++++++++++++++ src/daft-plan/Cargo.toml | 2 ++ src/daft-plan/src/builder.rs | 22 ++++++++++++++- src/daft-plan/src/optimization/optimizer.rs | 27 +++++++++++++------ .../optimization/rules/drop_repartition.rs | 2 +- .../optimization/rules/push_down_filter.rs | 2 +- .../src/optimization/rules/push_down_limit.rs | 2 +- .../rules/push_down_projection.rs | 2 +- 9 files changed, 70 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index daad687e39..8d8003f0f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1129,7 +1129,9 @@ dependencies = [ "daft-dsl", "daft-table", "indexmap 2.0.0", + "log", "pyo3", + "pyo3-log", "serde", "serde_json", "snafu", diff --git a/daft/logging.py b/daft/logging.py index a10daded2d..b1d0ee51fb 100644 --- a/daft/logging.py +++ b/daft/logging.py @@ -4,9 +4,31 @@ def setup_logger() -> None: + import inspect + import logging + from loguru import logger from loguru._defaults import env logger.remove() LOGURU_LEVEL = env("LOGURU_LEVEL", str, "INFO") logger.add(sys.stderr, level=LOGURU_LEVEL) + + class InterceptHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + # Get corresponding Loguru level if it exists. + level: str | int + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + + # Find caller from where originated the logged message. + frame, depth = inspect.currentframe(), 0 + while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__): + frame = frame.f_back + depth += 1 + + logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + + logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True) diff --git a/src/daft-plan/Cargo.toml b/src/daft-plan/Cargo.toml index 1b8a39b350..62b0721f74 100644 --- a/src/daft-plan/Cargo.toml +++ b/src/daft-plan/Cargo.toml @@ -7,7 +7,9 @@ daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} daft-table = {path = "../daft-table", default-features = false} indexmap = {workspace = true} +log = {workspace = true} pyo3 = {workspace = true, optional = true} +pyo3-log = {workspace = true, optional = true} serde = {workspace = true, features = ["rc"]} serde_json = {workspace = true} snafu = {workspace = true} diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index accc0204bc..8fa31000c6 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -408,7 +408,27 @@ impl PyLogicalPlanBuilder { pub fn optimize(&self) -> PyResult { let optimizer = Optimizer::new(Default::default()); let unoptimized_plan = self.builder.build(); - let optimized_plan = optimizer.optimize(unoptimized_plan, |_, _, _, _, _| {})?; + let optimized_plan = optimizer.optimize( + unoptimized_plan, + |new_plan, rule_batch, pass, transformed, seen| { + if transformed { + log::debug!( + "Rule batch {:?} transformed plan on pass {}, and produced {} plan:\n{}", + rule_batch, + pass, + if seen { "an already seen" } else { "a new" }, + new_plan.repr_ascii(true), + ); + } else { + log::debug!( + "Rule batch {:?} did NOT transform plan on pass {} for plan:\n{}", + rule_batch, + pass, + new_plan.repr_ascii(true), + ); + } + }, + )?; let builder = LogicalPlanBuilder::new(optimized_plan); Ok(builder.into()) } diff --git a/src/daft-plan/src/optimization/optimizer.rs b/src/daft-plan/src/optimization/optimizer.rs index a3b9a8a1f1..346c93d523 100644 --- a/src/daft-plan/src/optimization/optimizer.rs +++ b/src/daft-plan/src/optimization/optimizer.rs @@ -13,6 +13,7 @@ use super::{ }; /// Config for optimizer. +#[derive(Debug)] pub struct OptimizerConfig { // Default maximum number of optimization passes the optimizer will make over a fixed-point RuleBatch. pub default_max_optimizer_passes: usize, @@ -33,10 +34,15 @@ impl Default for OptimizerConfig { } } +pub trait OptimizerRuleInBatch: OptimizerRule + std::fmt::Debug {} + +impl OptimizerRuleInBatch for T {} + /// A batch of logical optimization rules. +#[derive(Debug)] pub struct RuleBatch { // Optimization rules in this batch. - pub rules: Vec>, + pub rules: Vec>, // The rule execution strategy (once, fixed-point). pub strategy: RuleExecutionStrategy, // The application order for the entire rule batch, derived from the application @@ -49,7 +55,7 @@ pub struct RuleBatch { } impl RuleBatch { - pub fn new(rules: Vec>, strategy: RuleExecutionStrategy) -> Self { + pub fn new(rules: Vec>, strategy: RuleExecutionStrategy) -> Self { // Get all unique application orders for the rules. let unique_application_orders: Vec = rules .iter() @@ -73,7 +79,7 @@ impl RuleBatch { #[allow(dead_code)] pub fn with_order( - rules: Vec>, + rules: Vec>, strategy: RuleExecutionStrategy, order: Option, ) -> Self { @@ -99,6 +105,7 @@ impl RuleBatch { } /// The execution strategy for a batch of rules. +#[derive(Debug)] pub enum RuleExecutionStrategy { // Apply the batch of rules only once. #[allow(dead_code)] @@ -182,12 +189,12 @@ impl Optimizer { if plan_tracker.add_plan(new_plan.as_ref()) { // Transformed plan has not yet been seen by this optimizer, which means we have // not reached a fixed-point or a cycle. We therefore continue applying this rule batch. - observer(new_plan.as_ref(), batch, pass, true, true); + observer(new_plan.as_ref(), batch, pass, true, false); ControlFlow::Continue(new_plan) } else { // We've already seen this transformed plan, which means we have hit a cycle while repeatedly // applying this rule batch. We therefore stop applying this rule batch. - observer(new_plan.as_ref(), batch, pass, true, false); + observer(new_plan.as_ref(), batch, pass, true, true); ControlFlow::Break(Ok(new_plan)) } } @@ -213,7 +220,7 @@ impl Optimizer { /// If order.is_some(), all rules are expected to have that application order. pub fn optimize_with_rules( &self, - rules: &[Box], + rules: &[Box], plan: Arc, order: &Option, ) -> DaftResult>> { @@ -258,7 +265,7 @@ impl Optimizer { /// in rule.try_optimize(). fn optimize_node( &self, - rules: &[Box], + rules: &[Box], plan: Arc, ) -> DaftResult>> { // Fold over the rules, applying each rule to this plan node sequentially. @@ -271,7 +278,7 @@ impl Optimizer { /// if the children are transformed. fn optimize_children( &self, - rules: &[Box], + rules: &[Box], plan: Arc, order: ApplyOrder, ) -> DaftResult>> { @@ -340,6 +347,7 @@ mod tests { Ok(()) } + #[derive(Debug)] struct NoOp {} impl NoOp { @@ -430,6 +438,7 @@ mod tests { Ok(()) } + #[derive(Debug)] struct RotateProjection { reverse_first: Mutex, } @@ -532,6 +541,7 @@ mod tests { Ok(()) } + #[derive(Debug)] struct FilterOrFalse {} impl FilterOrFalse { @@ -560,6 +570,7 @@ mod tests { } } + #[derive(Debug)] struct FilterAndTrue {} impl FilterAndTrue { diff --git a/src/daft-plan/src/optimization/rules/drop_repartition.rs b/src/daft-plan/src/optimization/rules/drop_repartition.rs index 7d71aa9895..01b0f54c59 100644 --- a/src/daft-plan/src/optimization/rules/drop_repartition.rs +++ b/src/daft-plan/src/optimization/rules/drop_repartition.rs @@ -7,7 +7,7 @@ use crate::{LogicalPlan, PartitionScheme}; use super::{ApplyOrder, OptimizerRule, Transformed}; /// Optimization rules for dropping unnecessary Repartitions. -#[derive(Default)] +#[derive(Default, Debug)] pub struct DropRepartition {} impl DropRepartition { diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs index 963c5602a8..9c272f32c5 100644 --- a/src/daft-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs @@ -21,7 +21,7 @@ use super::{ }; /// Optimization rules for pushing Filters further into the logical plan. -#[derive(Default)] +#[derive(Default, Debug)] pub struct PushDownFilter {} impl PushDownFilter { diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 00d4d795a0..f4f2169af9 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -7,7 +7,7 @@ use crate::{source_info::SourceInfo, LogicalPlan}; use super::{ApplyOrder, OptimizerRule, Transformed}; /// Optimization rules for pushing Limits further into the logical plan. -#[derive(Default)] +#[derive(Default, Debug)] pub struct PushDownLimit {} impl PushDownLimit { diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs index 48ba15457b..9789cd5d30 100644 --- a/src/daft-plan/src/optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -13,7 +13,7 @@ use crate::{ use super::{ApplyOrder, OptimizerRule, Transformed}; -#[derive(Default)] +#[derive(Default, Debug)] pub struct PushDownProjection {} impl PushDownProjection {