Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT]: expr simplifier #3393

Merged
merged 15 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions daft/delta_lake/delta_lake_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def __init__(
def schema(self) -> Schema:
return self._schema

def name(self) -> str:
return "DeltaLakeScanOperator"

def display_name(self) -> str:
return f"DeltaLakeScanOperator({self._table.metadata().name})"

Expand Down
3 changes: 3 additions & 0 deletions daft/hudi/hudi_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
def schema(self) -> Schema:
return self._schema

def name(self) -> str:
return "HudiScanOperator"

def display_name(self) -> str:
return f"HudiScanOperator({self._table.props.name})"

Expand Down
3 changes: 3 additions & 0 deletions daft/iceberg/iceberg_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ def __init__(self, iceberg_table: Table, snapshot_id: int | None, storage_config
def schema(self) -> Schema:
return self._schema

def name(self) -> str:
return "IcebergScanOperator"

def display_name(self) -> str:
return f"IcebergScanOperator({'.'.join(self._table.name())})"

Expand Down
3 changes: 3 additions & 0 deletions daft/io/_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def __init__(
self._generators = generators
self._schema = schema

def name(self) -> str:
return self.display_name()

def display_name(self) -> str:
return "GeneratorScanOperator"

Expand Down
3 changes: 3 additions & 0 deletions daft/io/_lance.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class LanceDBScanOperator(ScanOperator):
def __init__(self, ds: "lance.LanceDataset"):
self._ds = ds

def name(self) -> str:
return "LanceDBScanOperator"

def display_name(self) -> str:
return f"LanceDBScanOperator({self._ds.uri})"

Expand Down
3 changes: 3 additions & 0 deletions daft/sql/sql_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
def schema(self) -> Schema:
return self._schema

def name(self) -> str:
return "SQLScanOperator"

Check warning on line 73 in daft/sql/sql_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/sql/sql_scan.py#L73

Added line #L73 was not covered by tests

def display_name(self) -> str:
return f"SQLScanOperator(sql={self.sql}, conn={self.conn})"

Expand Down
2 changes: 2 additions & 0 deletions src/common/scan-info/src/scan_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use daft_schema::schema::SchemaRef;
use crate::{PartitionField, Pushdowns, ScanTaskLikeRef};

pub trait ScanOperator: Send + Sync + Debug {
fn name(&self) -> &str;

fn schema(&self) -> SchemaRef;
fn partitioning_keys(&self) -> &[PartitionField];
fn file_path_column(&self) -> Option<&str>;
Expand Down
3 changes: 3 additions & 0 deletions src/common/scan-info/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@
}

impl ScanOperator for DummyScanOperator {
fn name(&self) -> &'static str {
"dummy"
}

Check warning on line 102 in src/common/scan-info/src/test/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/common/scan-info/src/test/mod.rs#L100-L102

Added lines #L100 - L102 were not covered by tests
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
14 changes: 13 additions & 1 deletion src/daft-logical-plan/src/optimization/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use super::{
logical_plan_tracker::LogicalPlanTracker,
rules::{
DropRepartition, EliminateCrossJoin, EnrichWithStats, LiftProjectFromAgg, MaterializeScans,
OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, SplitActorPoolProjects,
OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, SimplifyExpressionsRule,
SplitActorPoolProjects,
},
};
use crate::LogicalPlan;
Expand Down Expand Up @@ -93,6 +94,12 @@ impl Optimizer {
pub fn new(config: OptimizerConfig) -> Self {
let mut rule_batches = Vec::new();

// we want to simplify expressions first to make the rest of the rules easier
rule_batches.push(RuleBatch::new(
vec![Box::new(SimplifyExpressionsRule::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
));

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put this under the rewrite rules since in my mind, the plan is in an invalid state before the rewrite rules are applied, so the rewrite rules should not rely on any optimizer rules but the optimizer rules can make assumptions about the validity of the plan.

// --- Split ActorPoolProjection nodes from Project nodes ---
// This is feature-flagged behind DAFT_ENABLE_ACTOR_POOL_PROJECTIONS=1
if config.enable_actor_pool_projections {
Expand Down Expand Up @@ -135,6 +142,11 @@ impl Optimizer {
vec![Box::new(PushDownLimit::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
));
// make a second pass at simplifying expressions. This is necessary because other rules can introduce new expressions
rule_batches.push(RuleBatch::new(
vec![Box::new(SimplifyExpressionsRule::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
));

// --- Materialize scan nodes ---
rule_batches.push(RuleBatch::new(
Expand Down
2 changes: 2 additions & 0 deletions src/daft-logical-plan/src/optimization/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod push_down_filter;
mod push_down_limit;
mod push_down_projection;
mod rule;
mod simplify_expressions;
mod split_actor_pool_projects;

pub use drop_repartition::DropRepartition;
Expand All @@ -18,4 +19,5 @@ pub use push_down_filter::PushDownFilter;
pub use push_down_limit::PushDownLimit;
pub use push_down_projection::PushDownProjection;
pub use rule::OptimizerRule;
pub use simplify_expressions::SimplifyExpressionsRule;
pub use split_actor_pool_projects::SplitActorPoolProjects;
Loading
Loading