From 291fad44d86cd0869f62b81904c7b470dfd9f38f Mon Sep 17 00:00:00 2001 From: desmondcheongzx Date: Mon, 25 Nov 2024 15:32:18 -0800 Subject: [PATCH] Pass execution config directly on optimize --- daft/daft/__init__.pyi | 3 +- daft/dataframe/dataframe.py | 11 ++- daft/logical/builder.py | 7 +- daft/runners/native_runner.py | 4 +- daft/runners/pyrunner.py | 2 +- daft/runners/ray_runner.py | 2 +- src/daft-connect/src/op/execute/root.rs | 8 +- src/daft-logical-plan/src/builder.rs | 79 +++++-------------- src/daft-logical-plan/src/display.rs | 2 +- .../optimization/rules/materialize_scans.rs | 8 +- src/daft-sql/src/python.rs | 2 +- 11 files changed, 44 insertions(+), 84 deletions(-) diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 41d03da8b5..65f6439eb0 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1708,7 +1708,6 @@ class LogicalPlanBuilder: num_rows: int, ) -> LogicalPlanBuilder: ... def with_planning_config(self, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ... - def with_execution_config(self, daft_execution_config: PyDaftExecutionConfig) -> LogicalPlanBuilder: ... def select(self, to_select: list[PyExpr]) -> LogicalPlanBuilder: ... def with_columns(self, columns: list[PyExpr]) -> LogicalPlanBuilder: ... def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder: ... @@ -1793,7 +1792,7 @@ class LogicalPlanBuilder: kwargs: dict[str, Any] | None = None, ) -> LogicalPlanBuilder: ... def schema(self) -> PySchema: ... - def optimize(self) -> LogicalPlanBuilder: ... + def optimize(self, execution_config: PyDaftExecutionConfig | None = None) -> LogicalPlanBuilder: ... def to_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> PhysicalPlanScheduler: ... def to_adaptive_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> AdaptivePhysicalPlanScheduler: ... def repr_ascii(self, simple: bool) -> str: ... diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 31148e6896..1633a3ca6f 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -172,11 +172,12 @@ def explain( print_to_file("== Unoptimized Logical Plan ==\n") print_to_file(builder.pretty_print(simple, format=format)) if show_all: + execution_config = get_context().daft_execution_config print_to_file("\n== Optimized Logical Plan ==\n") - builder = builder.optimize() + builder = builder.optimize(execution_config) print_to_file(builder.pretty_print(simple)) print_to_file("\n== Physical Plan ==\n") - physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config) + physical_plan_scheduler = builder.to_physical_plan_scheduler(execution_config) print_to_file(physical_plan_scheduler.pretty_print(simple, format=format)) else: print_to_file( @@ -187,7 +188,11 @@ def explain( def num_partitions(self) -> int: daft_execution_config = get_context().daft_execution_config # We need to run the optimizer since that could change the number of partitions - return self.__builder.optimize().to_physical_plan_scheduler(daft_execution_config).num_partitions() + return ( + self.__builder.optimize(daft_execution_config) + .to_physical_plan_scheduler(daft_execution_config) + .num_partitions() + ) @DataframePublicAPI def schema(self) -> Schema: diff --git a/daft/logical/builder.py b/daft/logical/builder.py index f986e5ff0e..cb6fc29dcb 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -40,10 +40,9 @@ def _apply_daft_planning_config_to_initializer(classmethod_func: Callable[..., L def wrapper(cls: type[LogicalPlanBuilder], *args, **kwargs): instantiated_logical_plan_builder = classmethod_func(cls, *args, **kwargs) - # Parametrize the builder with the current DaftPlanningConfig and DaftExecutionConfig + # Parametrize the builder with the current DaftPlanningConfig inner = instantiated_logical_plan_builder._builder inner = inner.with_planning_config(get_context().daft_planning_config) - inner = inner.with_execution_config(get_context().daft_execution_config) return cls(inner) @@ -110,11 +109,11 @@ def pretty_print(self, simple: bool = False, format: str = "ascii") -> str: def __repr__(self) -> str: return self._builder.repr_ascii(simple=False) - def optimize(self) -> LogicalPlanBuilder: + def optimize(self, execution_config: PyDaftExecutionConfig | None) -> LogicalPlanBuilder: """ Optimize the underlying logical plan. """ - builder = self._builder.optimize() + builder = self._builder.optimize(execution_config) return LogicalPlanBuilder(builder) @classmethod diff --git a/daft/runners/native_runner.py b/daft/runners/native_runner.py index c7e5ce8034..d296e1d5b0 100644 --- a/daft/runners/native_runner.py +++ b/daft/runners/native_runner.py @@ -74,7 +74,9 @@ def run_iter( daft_execution_config = get_context().daft_execution_config # Optimize the logical plan. - builder = builder.optimize() + # TODO(desmond): Currently we don't provide the execution config because this triggers + # scan task merging, but the native executor expects one source per scan task. + builder = builder.optimize(None) executor = NativeExecutor.from_logical_plan_builder(builder) results_gen = executor.run( {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}, diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 48d3a0e362..f378bcff67 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -345,7 +345,7 @@ def run_iter( execution_id = str(uuid.uuid4()) # Optimize the logical plan. - builder = builder.optimize() + builder = builder.optimize(daft_execution_config) if daft_execution_config.enable_aqe: adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 82f648e44b..5bee0b9289 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -1286,7 +1286,7 @@ def run_iter( daft_execution_config = get_context().daft_execution_config # Optimize the logical plan. - builder = builder.optimize() + builder = builder.optimize(daft_execution_config) if daft_execution_config.enable_aqe: adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config) diff --git a/src/daft-connect/src/op/execute/root.rs b/src/daft-connect/src/op/execute/root.rs index 1e1fac147b..dbe225bb37 100644 --- a/src/daft-connect/src/op/execute/root.rs +++ b/src/daft-connect/src/op/execute/root.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, future::ready}; +use std::{collections::HashMap, future::ready, sync::Arc}; use common_daft_config::DaftExecutionConfig; use daft_local_execution::NativeExecutor; @@ -32,11 +32,11 @@ impl Session { tokio::spawn(async move { let execution_fut = async { let plan = translation::to_logical_plan(command)?; - let optimized_plan = plan.optimize()?; - let cfg = DaftExecutionConfig::default(); + let cfg = Arc::new(DaftExecutionConfig::default()); + let optimized_plan = plan.optimize(Some(cfg.clone()))?; let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?; let mut result_stream = native_executor - .run(HashMap::new(), cfg.into(), None)? + .run(HashMap::new(), cfg, None)? .into_stream(); while let Some(result) = result_stream.next().await { diff --git a/src/daft-logical-plan/src/builder.rs b/src/daft-logical-plan/src/builder.rs index 8fd96f781f..b3f94cc966 100644 --- a/src/daft-logical-plan/src/builder.rs +++ b/src/daft-logical-plan/src/builder.rs @@ -44,21 +44,12 @@ use crate::{ pub struct LogicalPlanBuilder { // The current root of the logical plan in this builder. pub plan: Arc, - planning_config: Option>, - execution_config: Option>, + config: Option>, } impl LogicalPlanBuilder { - pub fn new( - plan: Arc, - planning_config: Option>, - execution_config: Option>, - ) -> Self { - Self { - plan, - planning_config, - execution_config, - } + pub fn new(plan: Arc, config: Option>) -> Self { + Self { plan, config } } } @@ -66,8 +57,7 @@ impl From<&Self> for LogicalPlanBuilder { fn from(builder: &Self) -> Self { Self { plan: builder.plan.clone(), - planning_config: builder.planning_config.clone(), - execution_config: builder.execution_config.clone(), + config: builder.config.clone(), } } } @@ -86,7 +76,7 @@ impl From<&LogicalPlanBuilder> for LogicalPlanRef { impl From for LogicalPlanBuilder { fn from(plan: LogicalPlanRef) -> Self { - Self::new(plan, None, None) + Self::new(plan, None) } } @@ -116,29 +106,12 @@ impl IntoGlobPath for Vec<&str> { impl LogicalPlanBuilder { /// Replace the LogicalPlanBuilder's plan with the provided plan pub fn with_new_plan>>(&self, plan: LP) -> Self { - Self::new( - plan.into(), - self.planning_config.clone(), - self.execution_config.clone(), - ) + Self::new(plan.into(), self.config.clone()) } /// Parametrize the LogicalPlanBuilder with a DaftPlanningConfig - pub fn with_planning_config(&self, planning_config: Arc) -> Self { - Self::new( - self.plan.clone(), - Some(planning_config), - self.execution_config.clone(), - ) - } - - /// Parametrize the LogicalPlanBuilder with a DaftExecutionConfig - pub fn with_execution_config(&self, execution_config: Arc) -> Self { - Self::new( - self.plan.clone(), - self.planning_config.clone(), - Some(execution_config), - ) + pub fn with_config(&self, config: Arc) -> Self { + Self::new(self.plan.clone(), Some(config)) } #[cfg(feature = "python")] @@ -616,17 +589,17 @@ impl LogicalPlanBuilder { Ok(self.with_new_plan(logical_plan)) } - pub fn optimize(&self) -> DaftResult { + pub fn optimize(&self, execution_config: Option>) -> DaftResult { let default_optimizer_config: OptimizerConfig = Default::default(); let optimizer_config = OptimizerConfig { enable_actor_pool_projections: self - .planning_config + .config .as_ref() .map(|planning_cfg| planning_cfg.enable_actor_pool_projections) .unwrap_or(default_optimizer_config.enable_actor_pool_projections), ..default_optimizer_config }; - let optimizer = Optimizer::new(optimizer_config, self.execution_config.clone()); + let optimizer = Optimizer::new(optimizer_config, execution_config); // Run LogicalPlan optimizations let unoptimized_plan = self.build(); @@ -652,11 +625,7 @@ impl LogicalPlanBuilder { }, )?; - let builder = Self::new( - optimized_plan, - self.planning_config.clone(), - self.execution_config.clone(), - ); + let builder = Self::new(optimized_plan, self.config.clone()); Ok(builder) } @@ -728,20 +697,7 @@ impl PyLogicalPlanBuilder { &self, daft_planning_config: PyDaftPlanningConfig, ) -> PyResult { - Ok(self - .builder - .with_planning_config(daft_planning_config.config) - .into()) - } - - pub fn with_execution_config( - &self, - daft_execution_config: PyDaftExecutionConfig, - ) -> PyResult { - Ok(self - .builder - .with_execution_config(daft_execution_config.config) - .into()) + Ok(self.builder.with_config(daft_planning_config.config).into()) } pub fn select(&self, to_select: Vec) -> PyResult { @@ -1001,8 +957,13 @@ impl PyLogicalPlanBuilder { } /// Optimize the underlying logical plan, returning a new plan builder containing the optimized plan. - pub fn optimize(&self, py: Python) -> PyResult { - py.allow_threads(|| Ok(self.builder.optimize()?.into())) + pub fn optimize( + &self, + py: Python, + execution_config: Option, + ) -> PyResult { + let execution_config = execution_config.map(|cfg| cfg.config); + py.allow_threads(|| Ok(self.builder.optimize(execution_config)?.into())) } pub fn repr_ascii(&self, simple: bool) -> PyResult { diff --git a/src/daft-logical-plan/src/display.rs b/src/daft-logical-plan/src/display.rs index b2340ef616..84db90d273 100644 --- a/src/daft-logical-plan/src/display.rs +++ b/src/daft-logical-plan/src/display.rs @@ -99,7 +99,7 @@ mod test { .sort(vec![col("last_name")], vec![false], vec![false])? .build(); - let plan = LogicalPlanBuilder::new(subplan, None, None) + let plan = LogicalPlanBuilder::from(subplan) .join( subplan2, vec![col("id")], diff --git a/src/daft-logical-plan/src/optimization/rules/materialize_scans.rs b/src/daft-logical-plan/src/optimization/rules/materialize_scans.rs index 272b5117b0..78e06597ab 100644 --- a/src/daft-logical-plan/src/optimization/rules/materialize_scans.rs +++ b/src/daft-logical-plan/src/optimization/rules/materialize_scans.rs @@ -35,15 +35,9 @@ impl MaterializeScans { SourceInfo::Physical(_) => { let source_plan = Arc::unwrap_or_clone(plan); if let LogicalPlan::Source(source) = source_plan { - // TODO(desmond): The local executor assumes that no execution config is passed in. Doing so breaks assumptions - // around scan task merging. Will need to dig deeper. - let execution_config = self - .execution_config - .as_deref() - .filter(|cfg| !cfg.enable_native_executor); Ok(Transformed::yes( source - .build_materialized_scan_source(execution_config) + .build_materialized_scan_source(self.execution_config.as_deref()) .into(), )) } else { diff --git a/src/daft-sql/src/python.rs b/src/daft-sql/src/python.rs index 73f7b4d0bd..69fc3426fc 100644 --- a/src/daft-sql/src/python.rs +++ b/src/daft-sql/src/python.rs @@ -38,7 +38,7 @@ pub fn sql( ) -> PyResult { let mut planner = SQLPlanner::new(catalog.catalog); let plan = planner.plan_sql(sql)?; - Ok(LogicalPlanBuilder::new(plan, Some(daft_planning_config.config), None).into()) + Ok(LogicalPlanBuilder::new(plan, Some(daft_planning_config.config)).into()) } #[pyfunction]