Skip to content

Commit

Permalink
[FEAT] (ACTORS-3) Propagate feature flags from Planning Config throug…
Browse files Browse the repository at this point in the history
…h to logical optimizer (#2674)

Perform forwarding of the feature flags from our DaftPlanningConfig to
the logical plan builder, so that it is actually properly used when
building the plan

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Aug 26, 2024
1 parent e333fc3 commit 3cacd59
Show file tree
Hide file tree
Showing 15 changed files with 136 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ def minhash(
ngram_size: int,
seed: int = 1,
) -> PyExpr: ...
def sql(sql: str, catalog: PyCatalog) -> LogicalPlanBuilder: ...
def sql(sql: str, catalog: PyCatalog, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
def sql_expr(sql: str) -> PyExpr: ...
def utf8_count_matches(expr: PyExpr, patterns: PyExpr, whole_words: bool, case_sensitive: bool) -> PyExpr: ...
def list_sort(expr: PyExpr, desc: PyExpr) -> PyExpr: ...
Expand Down Expand Up @@ -1625,6 +1625,7 @@ class LogicalPlanBuilder:
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ...
def with_planning_config(self, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
def select(self, to_select: list[PyExpr]) -> LogicalPlanBuilder: ...
def with_columns(self, columns: list[PyExpr]) -> LogicalPlanBuilder: ...
def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder: ...
Expand Down
25 changes: 24 additions & 1 deletion daft/logical/builder.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import functools
import pathlib
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Callable

from daft.context import get_context
from daft.daft import (
CountMode,
FileFormat,
Expand All @@ -26,6 +28,25 @@
)


def _apply_daft_planning_config_to_initializer(classmethod_func: Callable[..., LogicalPlanBuilder]):
"""Decorator to be applied to any @classmethod instantiation method on LogicalPlanBuilder
This decorator ensures that the current DaftPlanningConfig is applied to the instantiated LogicalPlanBuilder
"""

@functools.wraps(classmethod_func)
def wrapper(cls: type[LogicalPlanBuilder], *args, **kwargs):
instantiated_logical_plan_builder = classmethod_func(cls, *args, **kwargs)

# Parametrize the builder with the current DaftPlanningConfig
inner = instantiated_logical_plan_builder._builder
inner = inner.with_planning_config(get_context().daft_planning_config)

return cls(inner)

return wrapper


class LogicalPlanBuilder:
"""
A logical plan builder for the Daft DataFrame.
Expand Down Expand Up @@ -91,6 +112,7 @@ def optimize(self) -> LogicalPlanBuilder:
return LogicalPlanBuilder(builder)

@classmethod
@_apply_daft_planning_config_to_initializer
def from_in_memory_scan(
cls,
partition: PartitionCacheEntry,
Expand All @@ -110,6 +132,7 @@ def from_in_memory_scan(
return cls(builder)

@classmethod
@_apply_daft_planning_config_to_initializer
def from_tabular_scan(
cls,
*,
Expand Down
5 changes: 4 additions & 1 deletion daft/sql/sql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# isort: dont-add-import: from __future__ import annotations

from daft.api_annotations import PublicAPI
from daft.context import get_context
from daft.daft import PyCatalog as _PyCatalog
from daft.daft import sql as _sql
from daft.daft import sql_expr as _sql_expr
Expand Down Expand Up @@ -45,6 +46,8 @@ def sql(sql: str, catalog: SQLCatalog) -> DataFrame:
Returns:
DataFrame: Dataframe containing the results of the query
"""
planning_config = get_context().daft_planning_config

_py_catalog = catalog._catalog
_py_logical = _sql(sql, _py_catalog)
_py_logical = _sql(sql, _py_catalog, planning_config)
return DataFrame(LogicalPlanBuilder(_py_logical))
5 changes: 4 additions & 1 deletion src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub const BOLD_TABLE_HEADERS_IN_DISPLAY: &str = "DAFT_BOLD_TABLE_HEADERS";
/// 1. Creation of a Dataframe including any file listing and schema inference that needs to happen. Note
/// that this does not include the actual scan, which is taken care of by the DaftExecutionConfig.
/// 2. Building of logical plan nodes
#[derive(Clone, Serialize, Deserialize, Default)]
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
pub struct DaftPlanningConfig {
pub default_io_config: IOConfig,
pub enable_actor_pool_projections: bool,
Expand Down Expand Up @@ -111,6 +111,9 @@ mod python;
#[cfg(feature = "python")]
pub use python::PyDaftExecutionConfig;

#[cfg(feature = "python")]
pub use python::PyDaftPlanningConfig;

#[cfg(feature = "python")]
use pyo3::prelude::*;

Expand Down
Loading

0 comments on commit 3cacd59

Please sign in to comment.