Skip to content

Commit

Permalink
Pass execution config directly on optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
desmondcheongzx committed Nov 25, 2024
1 parent 5b7829b commit 291fad4
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 84 deletions.
3 changes: 1 addition & 2 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,6 @@ class LogicalPlanBuilder:
num_rows: int,
) -> LogicalPlanBuilder: ...
def with_planning_config(self, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
def with_execution_config(self, daft_execution_config: PyDaftExecutionConfig) -> LogicalPlanBuilder: ...
def select(self, to_select: list[PyExpr]) -> LogicalPlanBuilder: ...
def with_columns(self, columns: list[PyExpr]) -> LogicalPlanBuilder: ...
def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder: ...
Expand Down Expand Up @@ -1793,7 +1792,7 @@ class LogicalPlanBuilder:
kwargs: dict[str, Any] | None = None,
) -> LogicalPlanBuilder: ...
def schema(self) -> PySchema: ...
def optimize(self) -> LogicalPlanBuilder: ...
def optimize(self, execution_config: PyDaftExecutionConfig | None = None) -> LogicalPlanBuilder: ...
def to_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> PhysicalPlanScheduler: ...
def to_adaptive_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> AdaptivePhysicalPlanScheduler: ...
def repr_ascii(self, simple: bool) -> str: ...
Expand Down
11 changes: 8 additions & 3 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,12 @@ def explain(
print_to_file("== Unoptimized Logical Plan ==\n")
print_to_file(builder.pretty_print(simple, format=format))
if show_all:
execution_config = get_context().daft_execution_config
print_to_file("\n== Optimized Logical Plan ==\n")
builder = builder.optimize()
builder = builder.optimize(execution_config)
print_to_file(builder.pretty_print(simple))
print_to_file("\n== Physical Plan ==\n")
physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
physical_plan_scheduler = builder.to_physical_plan_scheduler(execution_config)
print_to_file(physical_plan_scheduler.pretty_print(simple, format=format))
else:
print_to_file(
Expand All @@ -187,7 +188,11 @@ def explain(
def num_partitions(self) -> int:
daft_execution_config = get_context().daft_execution_config
# We need to run the optimizer since that could change the number of partitions
return self.__builder.optimize().to_physical_plan_scheduler(daft_execution_config).num_partitions()
return (
self.__builder.optimize(daft_execution_config)
.to_physical_plan_scheduler(daft_execution_config)
.num_partitions()
)

@DataframePublicAPI
def schema(self) -> Schema:
Expand Down
7 changes: 3 additions & 4 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ def _apply_daft_planning_config_to_initializer(classmethod_func: Callable[..., L
def wrapper(cls: type[LogicalPlanBuilder], *args, **kwargs):
instantiated_logical_plan_builder = classmethod_func(cls, *args, **kwargs)

# Parametrize the builder with the current DaftPlanningConfig and DaftExecutionConfig
# Parametrize the builder with the current DaftPlanningConfig
inner = instantiated_logical_plan_builder._builder
inner = inner.with_planning_config(get_context().daft_planning_config)
inner = inner.with_execution_config(get_context().daft_execution_config)

return cls(inner)

Expand Down Expand Up @@ -110,11 +109,11 @@ def pretty_print(self, simple: bool = False, format: str = "ascii") -> str:
def __repr__(self) -> str:
return self._builder.repr_ascii(simple=False)

def optimize(self) -> LogicalPlanBuilder:
def optimize(self, execution_config: PyDaftExecutionConfig | None) -> LogicalPlanBuilder:
"""
Optimize the underlying logical plan.
"""
builder = self._builder.optimize()
builder = self._builder.optimize(execution_config)
return LogicalPlanBuilder(builder)

@classmethod
Expand Down
4 changes: 3 additions & 1 deletion daft/runners/native_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ def run_iter(
daft_execution_config = get_context().daft_execution_config

# Optimize the logical plan.
builder = builder.optimize()
# TODO(desmond): Currently we don't provide the execution config because this triggers
# scan task merging, but the native executor expects one source per scan task.
builder = builder.optimize(None)
executor = NativeExecutor.from_logical_plan_builder(builder)
results_gen = executor.run(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def run_iter(
execution_id = str(uuid.uuid4())

# Optimize the logical plan.
builder = builder.optimize()
builder = builder.optimize(daft_execution_config)

if daft_execution_config.enable_aqe:
adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ def run_iter(
daft_execution_config = get_context().daft_execution_config

# Optimize the logical plan.
builder = builder.optimize()
builder = builder.optimize(daft_execution_config)

if daft_execution_config.enable_aqe:
adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
Expand Down
8 changes: 4 additions & 4 deletions src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, future::ready};
use std::{collections::HashMap, future::ready, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use daft_local_execution::NativeExecutor;
Expand Down Expand Up @@ -32,11 +32,11 @@ impl Session {
tokio::spawn(async move {
let execution_fut = async {
let plan = translation::to_logical_plan(command)?;
let optimized_plan = plan.optimize()?;
let cfg = DaftExecutionConfig::default();
let cfg = Arc::new(DaftExecutionConfig::default());
let optimized_plan = plan.optimize(Some(cfg.clone()))?;
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
.run(HashMap::new(), cfg.into(), None)?
.run(HashMap::new(), cfg, None)?
.into_stream();

while let Some(result) = result_stream.next().await {
Expand Down
79 changes: 20 additions & 59 deletions src/daft-logical-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,20 @@ use crate::{
pub struct LogicalPlanBuilder {
// The current root of the logical plan in this builder.
pub plan: Arc<LogicalPlan>,
planning_config: Option<Arc<DaftPlanningConfig>>,
execution_config: Option<Arc<DaftExecutionConfig>>,
config: Option<Arc<DaftPlanningConfig>>,
}

impl LogicalPlanBuilder {
pub fn new(
plan: Arc<LogicalPlan>,
planning_config: Option<Arc<DaftPlanningConfig>>,
execution_config: Option<Arc<DaftExecutionConfig>>,
) -> Self {
Self {
plan,
planning_config,
execution_config,
}
pub fn new(plan: Arc<LogicalPlan>, config: Option<Arc<DaftPlanningConfig>>) -> Self {
Self { plan, config }
}
}

impl From<&Self> for LogicalPlanBuilder {
fn from(builder: &Self) -> Self {
Self {
plan: builder.plan.clone(),
planning_config: builder.planning_config.clone(),
execution_config: builder.execution_config.clone(),
config: builder.config.clone(),
}
}
}
Expand All @@ -86,7 +76,7 @@ impl From<&LogicalPlanBuilder> for LogicalPlanRef {

impl From<LogicalPlanRef> for LogicalPlanBuilder {
fn from(plan: LogicalPlanRef) -> Self {
Self::new(plan, None, None)
Self::new(plan, None)
}
}

Expand Down Expand Up @@ -116,29 +106,12 @@ impl IntoGlobPath for Vec<&str> {
impl LogicalPlanBuilder {
/// Replace the LogicalPlanBuilder's plan with the provided plan
pub fn with_new_plan<LP: Into<Arc<LogicalPlan>>>(&self, plan: LP) -> Self {
Self::new(
plan.into(),
self.planning_config.clone(),
self.execution_config.clone(),
)
Self::new(plan.into(), self.config.clone())
}

/// Parametrize the LogicalPlanBuilder with a DaftPlanningConfig
pub fn with_planning_config(&self, planning_config: Arc<DaftPlanningConfig>) -> Self {
Self::new(
self.plan.clone(),
Some(planning_config),
self.execution_config.clone(),
)
}

/// Parametrize the LogicalPlanBuilder with a DaftExecutionConfig
pub fn with_execution_config(&self, execution_config: Arc<DaftExecutionConfig>) -> Self {
Self::new(
self.plan.clone(),
self.planning_config.clone(),
Some(execution_config),
)
pub fn with_config(&self, config: Arc<DaftPlanningConfig>) -> Self {
Self::new(self.plan.clone(), Some(config))
}

#[cfg(feature = "python")]
Expand Down Expand Up @@ -616,17 +589,17 @@ impl LogicalPlanBuilder {
Ok(self.with_new_plan(logical_plan))
}

pub fn optimize(&self) -> DaftResult<Self> {
pub fn optimize(&self, execution_config: Option<Arc<DaftExecutionConfig>>) -> DaftResult<Self> {
let default_optimizer_config: OptimizerConfig = Default::default();
let optimizer_config = OptimizerConfig {
enable_actor_pool_projections: self
.planning_config
.config
.as_ref()
.map(|planning_cfg| planning_cfg.enable_actor_pool_projections)
.unwrap_or(default_optimizer_config.enable_actor_pool_projections),
..default_optimizer_config
};
let optimizer = Optimizer::new(optimizer_config, self.execution_config.clone());
let optimizer = Optimizer::new(optimizer_config, execution_config);

// Run LogicalPlan optimizations
let unoptimized_plan = self.build();
Expand All @@ -652,11 +625,7 @@ impl LogicalPlanBuilder {
},
)?;

let builder = Self::new(
optimized_plan,
self.planning_config.clone(),
self.execution_config.clone(),
);
let builder = Self::new(optimized_plan, self.config.clone());
Ok(builder)
}

Expand Down Expand Up @@ -728,20 +697,7 @@ impl PyLogicalPlanBuilder {
&self,
daft_planning_config: PyDaftPlanningConfig,
) -> PyResult<Self> {
Ok(self
.builder
.with_planning_config(daft_planning_config.config)
.into())
}

pub fn with_execution_config(
&self,
daft_execution_config: PyDaftExecutionConfig,
) -> PyResult<Self> {
Ok(self
.builder
.with_execution_config(daft_execution_config.config)
.into())
Ok(self.builder.with_config(daft_planning_config.config).into())
}

pub fn select(&self, to_select: Vec<PyExpr>) -> PyResult<Self> {
Expand Down Expand Up @@ -1001,8 +957,13 @@ impl PyLogicalPlanBuilder {
}

/// Optimize the underlying logical plan, returning a new plan builder containing the optimized plan.
pub fn optimize(&self, py: Python) -> PyResult<Self> {
py.allow_threads(|| Ok(self.builder.optimize()?.into()))
pub fn optimize(
&self,
py: Python,
execution_config: Option<PyDaftExecutionConfig>,
) -> PyResult<Self> {
let execution_config = execution_config.map(|cfg| cfg.config);
py.allow_threads(|| Ok(self.builder.optimize(execution_config)?.into()))
}

pub fn repr_ascii(&self, simple: bool) -> PyResult<String> {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-logical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mod test {
.sort(vec![col("last_name")], vec![false], vec![false])?
.build();

let plan = LogicalPlanBuilder::new(subplan, None, None)
let plan = LogicalPlanBuilder::from(subplan)
.join(
subplan2,
vec![col("id")],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,9 @@ impl MaterializeScans {
SourceInfo::Physical(_) => {
let source_plan = Arc::unwrap_or_clone(plan);
if let LogicalPlan::Source(source) = source_plan {
// TODO(desmond): The local executor assumes that no execution config is passed in. Doing so breaks assumptions
// around scan task merging. Will need to dig deeper.
let execution_config = self
.execution_config
.as_deref()
.filter(|cfg| !cfg.enable_native_executor);
Ok(Transformed::yes(
source
.build_materialized_scan_source(execution_config)
.build_materialized_scan_source(self.execution_config.as_deref())
.into(),
))
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-sql/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn sql(
) -> PyResult<PyLogicalPlanBuilder> {
let mut planner = SQLPlanner::new(catalog.catalog);
let plan = planner.plan_sql(sql)?;
Ok(LogicalPlanBuilder::new(plan, Some(daft_planning_config.config), None).into())
Ok(LogicalPlanBuilder::new(plan, Some(daft_planning_config.config)).into())
}

#[pyfunction]
Expand Down

0 comments on commit 291fad4

Please sign in to comment.