Skip to content

Commit

Permalink
Add debug logging for optimizer.
Browse files Browse the repository at this point in the history
  • Loading branch information
clarkzinzow committed Sep 12, 2023
1 parent f9c3c01 commit 7702bb4
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions daft/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,31 @@


def setup_logger() -> None:
    """Configure Loguru as the single logging sink, writing to stderr.

    Steps performed:

    1. Remove every handler previously registered on the Loguru ``logger``.
    2. Add a stderr handler whose level is read from the ``LOGURU_LEVEL``
       environment variable (defaulting to ``"INFO"``).
    3. Install a standard-library ``logging`` handler on the root logger that
       forwards every ``logging`` record to Loguru, so messages emitted via
       stdlib ``logging`` end up in the same sink.
    """
    import inspect
    import logging
    import os

    from loguru import logger

    logger.remove()
    # Read the level through the public os.environ API rather than the private
    # ``loguru._defaults.env`` helper; for string values the two are equivalent
    # (return the variable if set, otherwise the default).
    loguru_level = os.environ.get("LOGURU_LEVEL", "INFO")
    logger.add(sys.stderr, level=loguru_level)

    class InterceptHandler(logging.Handler):
        """Stdlib ``logging`` handler that re-emits each record through Loguru."""

        def emit(self, record: logging.LogRecord) -> None:
            # Get the corresponding Loguru level if it exists; otherwise fall
            # back to the numeric stdlib level.
            level: str | int
            try:
                level = logger.level(record.levelname).name
            except ValueError:
                level = record.levelno

            # Find the caller from which the logged message originated: skip
            # this frame (depth 0) and every frame that belongs to the stdlib
            # ``logging`` module itself.
            frame, depth = inspect.currentframe(), 0
            while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
                frame = frame.f_back
                depth += 1

            logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())

    # level=0 lets every record through to the handler; force=True replaces
    # any handlers already attached to the root logger.
    logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
2 changes: 2 additions & 0 deletions src/daft-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
indexmap = {workspace = true}
log = {workspace = true}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true, optional = true}
serde = {workspace = true, features = ["rc"]}
serde_json = {workspace = true}
snafu = {workspace = true}
Expand Down
22 changes: 21 additions & 1 deletion src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,27 @@ impl PyLogicalPlanBuilder {
/// Optimizes the logical plan held by this builder and returns a new builder
/// wrapping the optimized plan.
///
/// The optimizer is constructed with the default configuration. The observer
/// callback passed to `Optimizer::optimize` emits one `log::debug!` message per
/// invocation, reporting the rule batch, the pass number, whether the batch
/// transformed the plan and, if so, whether the resulting plan had already been
/// seen by the optimizer.
///
/// # Errors
///
/// Propagates any error returned by `Optimizer::optimize`.
pub fn optimize(&self) -> PyResult<Self> {
    let optimizer = Optimizer::new(Default::default());
    let unoptimized_plan = self.builder.build();
    // Optimize exactly once: `unoptimized_plan` is moved into this call, so a
    // preceding call with a no-op observer would both repeat the work and
    // consume the plan before it could be used here.
    let optimized_plan = optimizer.optimize(
        unoptimized_plan,
        |new_plan, rule_batch, pass, transformed, seen| {
            if transformed {
                log::debug!(
                    "Rule batch {:?} transformed plan on pass {}, and produced {} plan:\n{}",
                    rule_batch,
                    pass,
                    if seen { "an already seen" } else { "a new" },
                    new_plan.repr_ascii(true),
                );
            } else {
                log::debug!(
                    "Rule batch {:?} did NOT transform plan on pass {} for plan:\n{}",
                    rule_batch,
                    pass,
                    new_plan.repr_ascii(true),
                );
            }
        },
    )?;
    let builder = LogicalPlanBuilder::new(optimized_plan);
    Ok(builder.into())
}
Expand Down
27 changes: 19 additions & 8 deletions src/daft-plan/src/optimization/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::{
};

/// Config for optimizer.
#[derive(Debug)]
pub struct OptimizerConfig {
// Default maximum number of optimization passes the optimizer will make over a fixed-point RuleBatch.
pub default_max_optimizer_passes: usize,
Expand All @@ -33,10 +34,15 @@ impl Default for OptimizerConfig {
}
}

/// An [`OptimizerRule`] that also implements [`std::fmt::Debug`].
///
/// This is the trait-object type stored in a rule batch, which lets a batch
/// holding boxed rules derive `Debug` itself.
pub trait OptimizerRuleInBatch: OptimizerRule + std::fmt::Debug {}

/// Blanket implementation: every type that is both an [`OptimizerRule`] and
/// `Debug` is automatically an [`OptimizerRuleInBatch`].
impl<T> OptimizerRuleInBatch for T where T: OptimizerRule + std::fmt::Debug {}

/// A batch of logical optimization rules.
#[derive(Debug)]
pub struct RuleBatch {
// Optimization rules in this batch.
pub rules: Vec<Box<dyn OptimizerRule>>,
pub rules: Vec<Box<dyn OptimizerRuleInBatch>>,
// The rule execution strategy (once, fixed-point).
pub strategy: RuleExecutionStrategy,
// The application order for the entire rule batch, derived from the application
Expand All @@ -49,7 +55,7 @@ pub struct RuleBatch {
}

impl RuleBatch {
pub fn new(rules: Vec<Box<dyn OptimizerRule>>, strategy: RuleExecutionStrategy) -> Self {
pub fn new(rules: Vec<Box<dyn OptimizerRuleInBatch>>, strategy: RuleExecutionStrategy) -> Self {
// Get all unique application orders for the rules.
let unique_application_orders: Vec<ApplyOrder> = rules
.iter()
Expand All @@ -73,7 +79,7 @@ impl RuleBatch {

#[allow(dead_code)]
pub fn with_order(
rules: Vec<Box<dyn OptimizerRule>>,
rules: Vec<Box<dyn OptimizerRuleInBatch>>,
strategy: RuleExecutionStrategy,
order: Option<ApplyOrder>,
) -> Self {
Expand All @@ -99,6 +105,7 @@ impl RuleBatch {
}

/// The execution strategy for a batch of rules.
#[derive(Debug)]
pub enum RuleExecutionStrategy {
// Apply the batch of rules only once.
#[allow(dead_code)]
Expand Down Expand Up @@ -182,12 +189,12 @@ impl Optimizer {
if plan_tracker.add_plan(new_plan.as_ref()) {
// Transformed plan has not yet been seen by this optimizer, which means we have
// not reached a fixed-point or a cycle. We therefore continue applying this rule batch.
observer(new_plan.as_ref(), batch, pass, true, true);
observer(new_plan.as_ref(), batch, pass, true, false);
ControlFlow::Continue(new_plan)
} else {
// We've already seen this transformed plan, which means we have hit a cycle while repeatedly
// applying this rule batch. We therefore stop applying this rule batch.
observer(new_plan.as_ref(), batch, pass, true, false);
observer(new_plan.as_ref(), batch, pass, true, true);
ControlFlow::Break(Ok(new_plan))
}
}
Expand All @@ -213,7 +220,7 @@ impl Optimizer {
/// If order.is_some(), all rules are expected to have that application order.
pub fn optimize_with_rules(
&self,
rules: &[Box<dyn OptimizerRule>],
rules: &[Box<dyn OptimizerRuleInBatch>],
plan: Arc<LogicalPlan>,
order: &Option<ApplyOrder>,
) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
Expand Down Expand Up @@ -258,7 +265,7 @@ impl Optimizer {
/// in rule.try_optimize().
fn optimize_node(
&self,
rules: &[Box<dyn OptimizerRule>],
rules: &[Box<dyn OptimizerRuleInBatch>],
plan: Arc<LogicalPlan>,
) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
// Fold over the rules, applying each rule to this plan node sequentially.
Expand All @@ -271,7 +278,7 @@ impl Optimizer {
/// if the children are transformed.
fn optimize_children(
&self,
rules: &[Box<dyn OptimizerRule>],
rules: &[Box<dyn OptimizerRuleInBatch>],
plan: Arc<LogicalPlan>,
order: ApplyOrder,
) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
Expand Down Expand Up @@ -340,6 +347,7 @@ mod tests {
Ok(())
}

// Field-less helper type declared in the `tests` module.
// NOTE(review): `Debug` is presumably derived so the type satisfies the `Debug` bound of
// `OptimizerRuleInBatch` — confirm against its `OptimizerRule` impl.
#[derive(Debug)]
struct NoOp {}

impl NoOp {
Expand Down Expand Up @@ -430,6 +438,7 @@ mod tests {
Ok(())
}

// Helper type declared in the `tests` module.
// NOTE(review): `Debug` is presumably derived so the type satisfies the `Debug` bound of
// `OptimizerRuleInBatch` — confirm against its `OptimizerRule` impl.
#[derive(Debug)]
struct RotateProjection {
    // Boolean flag kept behind a `Mutex`, so it can be updated through a shared reference.
    reverse_first: Mutex<bool>,
}
Expand Down Expand Up @@ -532,6 +541,7 @@ mod tests {
Ok(())
}

// Field-less helper type declared in the `tests` module.
// NOTE(review): `Debug` is presumably derived so the type satisfies the `Debug` bound of
// `OptimizerRuleInBatch` — confirm against its `OptimizerRule` impl.
#[derive(Debug)]
struct FilterOrFalse {}

impl FilterOrFalse {
Expand Down Expand Up @@ -560,6 +570,7 @@ mod tests {
}
}

// Field-less helper type declared in the `tests` module.
// NOTE(review): `Debug` is presumably derived so the type satisfies the `Debug` bound of
// `OptimizerRuleInBatch` — confirm against its `OptimizerRule` impl.
#[derive(Debug)]
struct FilterAndTrue {}

impl FilterAndTrue {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-plan/src/optimization/rules/drop_repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{LogicalPlan, PartitionScheme};
use super::{ApplyOrder, OptimizerRule, Transformed};

/// Optimization rules for dropping unnecessary Repartitions.
///
/// The type has no fields. `Default` provides a trivial constructor value, and
/// `Debug` lets the rule be debug-printed (e.g. by the optimizer's debug logging).
// A single derive list: deriving `Default` in two separate attributes would
// generate conflicting `Default` implementations.
#[derive(Default, Debug)]
pub struct DropRepartition {}

impl DropRepartition {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-plan/src/optimization/rules/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::{
};

/// Optimization rules for pushing Filters further into the logical plan.
///
/// The type has no fields. `Default` provides a trivial constructor value, and
/// `Debug` lets the rule be debug-printed (e.g. by the optimizer's debug logging).
// A single derive list: deriving `Default` in two separate attributes would
// generate conflicting `Default` implementations.
#[derive(Default, Debug)]
pub struct PushDownFilter {}

impl PushDownFilter {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-plan/src/optimization/rules/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{source_info::SourceInfo, LogicalPlan};
use super::{ApplyOrder, OptimizerRule, Transformed};

/// Optimization rules for pushing Limits further into the logical plan.
///
/// The type has no fields. `Default` provides a trivial constructor value, and
/// `Debug` lets the rule be debug-printed (e.g. by the optimizer's debug logging).
// A single derive list: deriving `Default` in two separate attributes would
// generate conflicting `Default` implementations.
#[derive(Default, Debug)]
pub struct PushDownLimit {}

impl PushDownLimit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{

use super::{ApplyOrder, OptimizerRule, Transformed};

/// Field-less optimizer rule type for projection pushdown.
///
/// `Default` provides a trivial constructor value, and `Debug` lets the rule be
/// debug-printed (e.g. by the optimizer's debug logging).
// A single derive list: deriving `Default` in two separate attributes would
// generate conflicting `Default` implementations.
#[derive(Default, Debug)]
pub struct PushDownProjection {}

impl PushDownProjection {
Expand Down

0 comments on commit 7702bb4

Please sign in to comment.