Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] IO Morsel Size #3004

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def set_execution_config(
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
morsel_size: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values
are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
Expand Down Expand Up @@ -347,7 +347,7 @@ def set_execution_config(
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables new local executor. Defaults to False
default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
morsel_size: Size of morsels to be processed in parallel by the streaming local executor. Defaults to 131,072 rows.
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
Expand All @@ -372,7 +372,7 @@ def set_execution_config(
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
default_morsel_size=default_morsel_size,
morsel_size=morsel_size,
)

ctx._daft_execution_config = new_daft_execution_config
Expand Down
4 changes: 2 additions & 2 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1789,7 +1789,7 @@ class PyDaftExecutionConfig:
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
morsel_size: int | None = None,
) -> PyDaftExecutionConfig: ...
@property
def scan_tasks_min_size_bytes(self) -> int: ...
Expand Down Expand Up @@ -1824,7 +1824,7 @@ class PyDaftExecutionConfig:
@property
def enable_native_executor(self) -> bool: ...
@property
def default_morsel_size(self) -> int: ...
def morsel_size(self) -> int: ...

class PyDaftPlanningConfig:
@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct DaftExecutionConfig {
pub read_sql_partition_size_bytes: usize,
pub enable_aqe: bool,
pub enable_native_executor: bool,
pub default_morsel_size: usize,
pub morsel_size: usize,
}

impl Default for DaftExecutionConfig {
Expand All @@ -79,7 +79,7 @@ impl Default for DaftExecutionConfig {
read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
enable_aqe: false,
enable_native_executor: false,
default_morsel_size: 128 * 1024,
morsel_size: 128 * 1024,
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
read_sql_partition_size_bytes: Option<usize>,
enable_aqe: Option<bool>,
enable_native_executor: Option<bool>,
default_morsel_size: Option<usize>,
morsel_size: Option<usize>,
) -> PyResult<Self> {
let mut config = self.config.as_ref().clone();

Expand Down Expand Up @@ -166,8 +166,8 @@
if let Some(enable_native_executor) = enable_native_executor {
config.enable_native_executor = enable_native_executor;
}
if let Some(default_morsel_size) = default_morsel_size {
config.default_morsel_size = default_morsel_size;
if let Some(morsel_size) = morsel_size {
config.morsel_size = morsel_size;

Check warning on line 170 in src/common/daft-config/src/python.rs

View check run for this annotation

Codecov / codecov/patch

src/common/daft-config/src/python.rs#L170

Added line #L170 was not covered by tests
}

Ok(Self {
Expand Down Expand Up @@ -253,8 +253,8 @@
Ok(self.config.enable_native_executor)
}
#[getter]
fn default_morsel_size(&self) -> PyResult<usize> {
Ok(self.config.default_morsel_size)
fn morsel_size(&self) -> PyResult<usize> {
Ok(self.config.morsel_size)

Check warning on line 257 in src/common/daft-config/src/python.rs

View check run for this annotation

Codecov / codecov/patch

src/common/daft-config/src/python.rs#L256-L257

Added lines #L256 - L257 were not covered by tests
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/daft-dsl/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,10 @@ pub fn has_agg(expr: &ExprRef) -> bool {
expr.exists(|e| matches!(e.as_ref(), Expr::Agg(_)))
}

/// Returns `true` if any node in `expr`'s tree is a scalar function that reports
/// itself as IO-bound via `ScalarFunction::is_io_bound`.
pub fn is_io_bound(expr: &ExprRef) -> bool {
    expr.exists(|node| match node.as_ref() {
        // Only scalar functions can carry the IO-bound flag.
        Expr::ScalarFunction(func) => func.is_io_bound(),
        _ => false,
    })
}

pub fn has_stateful_udf(expr: &ExprRef) -> bool {
expr.exists(|e| {
matches!(
Expand Down
9 changes: 8 additions & 1 deletion src/daft-dsl/src/functions/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use common_error::DaftResult;
use daft_core::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{Expr, ExprRef};
use crate::{is_io_bound, Expr, ExprRef};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalarFunction {
Expand All @@ -31,6 +31,10 @@ impl ScalarFunction {
/// Resolves the output field of this scalar function against `schema`.
///
/// Delegates to the wrapped UDF's `to_field`, passing this function's inputs
/// and the given schema; any error from the UDF is returned unchanged.
pub fn to_field(&self, schema: &Schema) -> DaftResult<Field> {
self.udf.to_field(&self.inputs, schema)
}

/// Reports whether this scalar function is IO-bound.
///
/// A function counts as IO-bound when its own UDF says so, or when any of its
/// input expressions is IO-bound according to the free function `is_io_bound`.
pub fn is_io_bound(&self) -> bool {
    // The UDF's own flag short-circuits the check of the inputs.
    if self.udf.is_io_bound() {
        return true;
    }
    self.inputs.iter().any(is_io_bound)
}
}
impl From<ScalarFunction> for ExprRef {
fn from(func: ScalarFunction) -> Self {
Expand All @@ -44,6 +48,9 @@ pub trait ScalarUDF: Send + Sync + std::fmt::Debug {
fn name(&self) -> &'static str;
fn evaluate(&self, inputs: &[Series]) -> DaftResult<Series>;
fn to_field(&self, inputs: &[ExprRef], schema: &Schema) -> DaftResult<Field>;
/// Whether this UDF is IO-bound.
///
/// The default implementation returns `false`; implementors can override it
/// to return `true`.
fn is_io_bound(&self) -> bool {
false
}
}

pub fn scalar_function_semantic_id(func: &ScalarFunction, schema: &Schema) -> FieldID {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-dsl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod resolve_expr;
mod treenode;
pub use common_treenode;
pub use expr::{
binary_op, col, has_agg, has_stateful_udf, is_partition_compatible, AggExpr,
binary_op, col, has_agg, has_stateful_udf, is_io_bound, is_partition_compatible, AggExpr,
ApproxPercentileParams, Expr, ExprRef, Operator, SketchType,
};
pub use lit::{lit, literal_value, literals_to_series, null_lit, Literal, LiteralValue};
Expand Down
4 changes: 4 additions & 0 deletions src/daft-functions/src/uri/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ impl ScalarUDF for DownloadFunction {
))),
}
}

/// Reports the download function as IO-bound (always `true`).
fn is_io_bound(&self) -> bool {
true
}
}

fn url_download(
Expand Down
4 changes: 4 additions & 0 deletions src/daft-functions/src/uri/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl ScalarUDF for UploadFunction {
))),
}
}

/// Reports the upload function as IO-bound (always `true`).
fn is_io_bound(&self) -> bool {
true
}
}

/// Uploads data from a Binary/FixedSizeBinary/Utf8 Series to the provided folder_path
Expand Down
18 changes: 13 additions & 5 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use common_daft_config::DaftExecutionConfig;
use common_display::tree::TreeDisplay;
use common_error::DaftResult;
use daft_micropartition::MicroPartition;
Expand Down Expand Up @@ -34,6 +35,9 @@
/// Creates the operator's state object, if it has one.
///
/// The default implementation returns `None`.
// NOTE(review): presumably `None` means the operator is stateless — confirm against callers.
fn make_state(&self) -> Option<Box<dyn IntermediateOperatorState>> {
None
}
/// Morsel size this operator wants to be fed with.
///
/// The default implementation returns the `morsel_size` of a freshly built
/// `DaftExecutionConfig::default()`; it does not read any user-supplied config.
fn morsel_size(&self) -> usize {
DaftExecutionConfig::default().morsel_size
}
}

pub struct IntermediateNode {
Expand Down Expand Up @@ -207,12 +211,16 @@

let worker_senders =
self.spawn_workers(*NUM_CPUS, &mut destination_channel, runtime_handle);

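// A morsel size on the runtime handle that differs from the config default takes precedence;
// otherwise fall back to the morsel size requested by the intermediate operator itself.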
let morsel_size =
if runtime_handle.morsel_size() != DaftExecutionConfig::default().morsel_size {
runtime_handle.morsel_size()

Check warning on line 218 in src/daft-local-execution/src/intermediate_ops/intermediate_op.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-local-execution/src/intermediate_ops/intermediate_op.rs#L218

Added line #L218 was not covered by tests
} else {
self.intermediate_op.morsel_size()
};
runtime_handle.spawn(
Self::send_to_workers(
child_result_receivers,
worker_senders,
runtime_handle.default_morsel_size(),
),
Self::send_to_workers(child_result_receivers, worker_senders, morsel_size),
self.intermediate_op.name(),
);
Ok(destination_channel)
Expand Down
12 changes: 11 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/project.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_dsl::{is_io_bound, ExprRef};
use tracing::instrument;

use super::intermediate_op::{
Expand Down Expand Up @@ -36,4 +37,13 @@ impl IntermediateOperator for ProjectOperator {
/// Returns the static name string of this operator, `"ProjectOperator"`.
fn name(&self) -> &'static str {
"ProjectOperator"
}

fn morsel_size(&self) -> usize {
for expr in &self.projection {
if is_io_bound(expr) {
return 10;
}
}
DaftExecutionConfig::default().morsel_size
}
}
10 changes: 5 additions & 5 deletions src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ lazy_static! {

pub struct ExecutionRuntimeHandle {
worker_set: tokio::task::JoinSet<crate::Result<()>>,
default_morsel_size: usize,
morsel_size: usize,
}

impl ExecutionRuntimeHandle {
#[must_use]
pub fn new(default_morsel_size: usize) -> Self {
pub fn new(morsel_size: usize) -> Self {
Self {
worker_set: tokio::task::JoinSet::new(),
default_morsel_size,
morsel_size,
}
}
pub fn spawn(
Expand All @@ -46,8 +46,8 @@ impl ExecutionRuntimeHandle {
}

#[must_use]
pub fn default_morsel_size(&self) -> usize {
self.default_morsel_size
pub fn morsel_size(&self) -> usize {
self.morsel_size
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub fn run_local(
.build()
.expect("Failed to create tokio runtime");
runtime.block_on(async {
let mut runtime_handle = ExecutionRuntimeHandle::new(cfg.default_morsel_size);
let mut runtime_handle = ExecutionRuntimeHandle::new(cfg.morsel_size);
let mut receiver = pipeline.start(true, &mut runtime_handle)?.get_receiver();
while let Some(val) = receiver.recv().await {
let _ = tx.send(val.as_data().clone()).await;
Expand Down
Loading