diff --git a/daft/context.py b/daft/context.py index caf74ef4d6..9cacb94dee 100644 --- a/daft/context.py +++ b/daft/context.py @@ -311,7 +311,7 @@ def set_execution_config( read_sql_partition_size_bytes: int | None = None, enable_aqe: bool | None = None, enable_native_executor: bool | None = None, - default_morsel_size: int | None = None, + morsel_size: int | None = None, ) -> DaftContext: """Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`) @@ -347,7 +347,7 @@ def set_execution_config( read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB enable_aqe: Enables Adaptive Query Execution, Defaults to False enable_native_executor: Enables new local executor. Defaults to False - default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows. + morsel_size: Size of morsels to be processed in parallel by the streaming local executor. Defaults to 131,072 rows. """ # Replace values in the DaftExecutionConfig with user-specified overrides ctx = get_context() @@ -372,7 +372,7 @@ def set_execution_config( read_sql_partition_size_bytes=read_sql_partition_size_bytes, enable_aqe=enable_aqe, enable_native_executor=enable_native_executor, - default_morsel_size=default_morsel_size, + morsel_size=morsel_size, ) ctx._daft_execution_config = new_daft_execution_config diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 47e980e907..7351495100 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1789,7 +1789,7 @@ class PyDaftExecutionConfig: read_sql_partition_size_bytes: int | None = None, enable_aqe: bool | None = None, enable_native_executor: bool | None = None, - default_morsel_size: int | None = None, + morsel_size: int | None = None, ) -> PyDaftExecutionConfig: ... @property def scan_tasks_min_size_bytes(self) -> int: ... @@ -1824,7 +1824,7 @@ class PyDaftExecutionConfig: @property def enable_native_executor(self) -> bool: ... @property - def default_morsel_size(self) -> int: ... + def morsel_size(self) -> int: ... class PyDaftPlanningConfig: @staticmethod diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 2202b20d39..4492052c02 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -56,7 +56,7 @@ pub struct DaftExecutionConfig { pub read_sql_partition_size_bytes: usize, pub enable_aqe: bool, pub enable_native_executor: bool, - pub default_morsel_size: usize, + pub morsel_size: usize, } impl Default for DaftExecutionConfig { @@ -79,7 +79,7 @@ impl Default for DaftExecutionConfig { read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB enable_aqe: false, enable_native_executor: false, - default_morsel_size: 128 * 1024, + morsel_size: 128 * 1024, } } } diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index 27663f4841..c833a53a4b 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -106,7 +106,7 @@ impl PyDaftExecutionConfig { read_sql_partition_size_bytes: Option, enable_aqe: Option, enable_native_executor: Option, - default_morsel_size: Option, + morsel_size: Option, ) -> PyResult { let mut config = self.config.as_ref().clone(); @@ -166,8 +166,8 @@ impl PyDaftExecutionConfig { if let Some(enable_native_executor) = enable_native_executor { config.enable_native_executor = enable_native_executor; } - if let Some(default_morsel_size) = default_morsel_size { - config.default_morsel_size = default_morsel_size; + if let Some(morsel_size) = morsel_size { + config.morsel_size = morsel_size; } Ok(Self { @@ -253,8 +253,8 @@ impl PyDaftExecutionConfig { Ok(self.config.enable_native_executor) } #[getter] - fn default_morsel_size(&self) -> PyResult { - Ok(self.config.default_morsel_size) + fn morsel_size(&self) -> PyResult { + Ok(self.config.morsel_size) } } diff --git a/src/daft-dsl/src/expr/mod.rs b/src/daft-dsl/src/expr/mod.rs index 873f9013bd..87a06591f0 100644 --- a/src/daft-dsl/src/expr/mod.rs +++ b/src/daft-dsl/src/expr/mod.rs @@ -1104,6 +1104,10 @@ pub fn has_agg(expr: &ExprRef) -> bool { expr.exists(|e| matches!(e.as_ref(), Expr::Agg(_))) } +pub fn is_io_bound(expr: &ExprRef) -> bool { + expr.exists(|e| matches!(e.as_ref(), Expr::ScalarFunction(func) if func.is_io_bound())) +} + pub fn has_stateful_udf(expr: &ExprRef) -> bool { expr.exists(|e| { matches!( diff --git a/src/daft-dsl/src/functions/scalar.rs b/src/daft-dsl/src/functions/scalar.rs index 7e610ba6eb..056c9443d1 100644 --- a/src/daft-dsl/src/functions/scalar.rs +++ b/src/daft-dsl/src/functions/scalar.rs @@ -8,7 +8,7 @@ use common_error::DaftResult; use daft_core::prelude::*; use serde::{Deserialize, Serialize}; -use crate::{Expr, ExprRef}; +use crate::{is_io_bound, Expr, ExprRef}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ScalarFunction { @@ -31,6 +31,10 @@ impl ScalarFunction { pub fn to_field(&self, schema: &Schema) -> DaftResult { self.udf.to_field(&self.inputs, schema) } + + pub fn is_io_bound(&self) -> bool { + self.udf.is_io_bound() || self.inputs.iter().any(is_io_bound) + } } impl From for ExprRef { fn from(func: ScalarFunction) -> Self { @@ -44,6 +48,9 @@ pub trait ScalarUDF: Send + Sync + std::fmt::Debug { fn name(&self) -> &'static str; fn evaluate(&self, inputs: &[Series]) -> DaftResult; fn to_field(&self, inputs: &[ExprRef], schema: &Schema) -> DaftResult; + fn is_io_bound(&self) -> bool { + false + } } pub fn scalar_function_semantic_id(func: &ScalarFunction, schema: &Schema) -> FieldID { diff --git a/src/daft-dsl/src/lib.rs b/src/daft-dsl/src/lib.rs index c3f5d68594..f08f79ea2c 100644 --- a/src/daft-dsl/src/lib.rs +++ b/src/daft-dsl/src/lib.rs @@ -15,7 +15,7 @@ mod resolve_expr; mod treenode; pub use common_treenode; pub use expr::{ - binary_op, col, has_agg, has_stateful_udf, is_partition_compatible, AggExpr, + binary_op, col, has_agg, has_stateful_udf, is_io_bound, is_partition_compatible, AggExpr, ApproxPercentileParams, Expr, ExprRef, Operator, SketchType, }; pub use lit::{lit, literal_value, literals_to_series, null_lit, Literal, LiteralValue}; diff --git a/src/daft-functions/src/uri/download.rs b/src/daft-functions/src/uri/download.rs index 15ebc2f9fc..1296c62397 100644 --- a/src/daft-functions/src/uri/download.rs +++ b/src/daft-functions/src/uri/download.rs @@ -80,6 +80,10 @@ impl ScalarUDF for DownloadFunction { ))), } } + + fn is_io_bound(&self) -> bool { + true + } } fn url_download( diff --git a/src/daft-functions/src/uri/upload.rs b/src/daft-functions/src/uri/upload.rs index 4ab677614c..79d0b330c7 100644 --- a/src/daft-functions/src/uri/upload.rs +++ b/src/daft-functions/src/uri/upload.rs @@ -64,6 +64,10 @@ impl ScalarUDF for UploadFunction { ))), } } + + fn is_io_bound(&self) -> bool { + true + } } /// Uploads data from a Binary/FixedSizeBinary/Utf8 Series to the provided folder_path diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index 7b0267c7c0..ce9fb1e5e5 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_daft_config::DaftExecutionConfig; use common_display::tree::TreeDisplay; use common_error::DaftResult; use daft_micropartition::MicroPartition; @@ -34,6 +35,9 @@ pub trait IntermediateOperator: Send + Sync { fn make_state(&self) -> Option> { None } + fn morsel_size(&self) -> usize { + DaftExecutionConfig::default().morsel_size + } } pub struct IntermediateNode { @@ -207,12 +211,16 @@ impl PipelineNode for IntermediateNode { let worker_senders = self.spawn_workers(*NUM_CPUS, &mut destination_channel, runtime_handle); + + // Use the intermediate_op's morsel size unless the default has been overridden + let morsel_size = + if runtime_handle.morsel_size() != DaftExecutionConfig::default().morsel_size { + runtime_handle.morsel_size() + } else { + self.intermediate_op.morsel_size() + }; runtime_handle.spawn( - Self::send_to_workers( - child_result_receivers, - worker_senders, - runtime_handle.default_morsel_size(), - ), + Self::send_to_workers(child_result_receivers, worker_senders, morsel_size), self.intermediate_op.name(), ); Ok(destination_channel) diff --git a/src/daft-local-execution/src/intermediate_ops/project.rs b/src/daft-local-execution/src/intermediate_ops/project.rs index abd37b461f..7e029f3f92 100644 --- a/src/daft-local-execution/src/intermediate_ops/project.rs +++ b/src/daft-local-execution/src/intermediate_ops/project.rs @@ -1,7 +1,8 @@ use std::sync::Arc; +use common_daft_config::DaftExecutionConfig; use common_error::DaftResult; -use daft_dsl::ExprRef; +use daft_dsl::{is_io_bound, ExprRef}; use tracing::instrument; use super::intermediate_op::{ @@ -36,4 +37,13 @@ impl IntermediateOperator for ProjectOperator { fn name(&self) -> &'static str { "ProjectOperator" } + + fn morsel_size(&self) -> usize { + for expr in &self.projection { + if is_io_bound(expr) { + return 10; + } + } + DaftExecutionConfig::default().morsel_size + } } diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index b7809b4126..22e2ec7643 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -16,15 +16,15 @@ lazy_static! { pub struct ExecutionRuntimeHandle { worker_set: tokio::task::JoinSet>, - default_morsel_size: usize, + morsel_size: usize, } impl ExecutionRuntimeHandle { #[must_use] - pub fn new(default_morsel_size: usize) -> Self { + pub fn new(morsel_size: usize) -> Self { Self { worker_set: tokio::task::JoinSet::new(), - default_morsel_size, + morsel_size, } } pub fn spawn( @@ -46,8 +46,8 @@ impl ExecutionRuntimeHandle { } #[must_use] - pub fn default_morsel_size(&self) -> usize { - self.default_morsel_size + pub fn morsel_size(&self) -> usize { + self.morsel_size } } diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index a89d06b2f8..abd32613f4 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -135,7 +135,7 @@ pub fn run_local( .build() .expect("Failed to create tokio runtime"); runtime.block_on(async { - let mut runtime_handle = ExecutionRuntimeHandle::new(cfg.default_morsel_size); + let mut runtime_handle = ExecutionRuntimeHandle::new(cfg.morsel_size); let mut receiver = pipeline.start(true, &mut runtime_handle)?.get_receiver(); while let Some(val) = receiver.recv().await { let _ = tx.send(val.as_data().clone()).await;