Skip to content

Commit

Permalink
[FEAT] Respect resource request for projections in swordfish (#3460)
Browse files Browse the repository at this point in the history
Make swordfish respect resource requests (CPU only for now) on
projections.

---------

Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
colin-ho and Colin Ho authored Dec 5, 2024
1 parent 56f5089 commit faf050c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 16 deletions.
12 changes: 12 additions & 0 deletions src/common/resource-request/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ impl ResourceRequest {
self.memory_bytes.map(|x| x * (factor as usize)),
)
}

pub fn num_cpus(&self) -> Option<f64> {
self.num_cpus
}

pub fn num_gpus(&self) -> Option<f64> {
self.num_gpus
}

pub fn memory_bytes(&self) -> Option<usize> {
self.memory_bytes
}
}

impl Add for &ResourceRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ impl IntermediateOperator for ActorPoolProjectOperator {
}))
}

fn max_concurrency(&self) -> usize {
self.concurrency
fn max_concurrency(&self) -> DaftResult<usize> {
Ok(self.concurrency)
}

fn dispatch_spawner(
Expand Down
12 changes: 8 additions & 4 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use common_display::tree::TreeDisplay;
use common_error::DaftResult;
use common_runtime::{get_compute_runtime, RuntimeRef};
use daft_micropartition::MicroPartition;
use snafu::ResultExt;
use tracing::{info_span, instrument};

use crate::{
Expand All @@ -14,7 +15,7 @@ use crate::{
dispatcher::{DispatchSpawner, RoundRobinDispatcher, UnorderedDispatcher},
pipeline::PipelineNode,
runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
ExecutionRuntimeContext, OperatorOutput, NUM_CPUS,
ExecutionRuntimeContext, OperatorOutput, PipelineExecutionSnafu, NUM_CPUS,
};

pub(crate) trait IntermediateOpState: Send + Sync {
Expand Down Expand Up @@ -49,8 +50,9 @@ pub trait IntermediateOperator: Send + Sync {
}
/// The maximum number of concurrent workers that can be spawned for this operator.
/// Each worker will has its own IntermediateOperatorState.
fn max_concurrency(&self) -> usize {
*NUM_CPUS
/// This method should be overridden if the operator needs to limit the number of concurrent workers, i.e. UDFs with resource requests.
fn max_concurrency(&self) -> DaftResult<usize> {
Ok(*NUM_CPUS)
}

fn dispatch_spawner(
Expand Down Expand Up @@ -208,7 +210,9 @@ impl PipelineNode for IntermediateNode {
));
}
let op = self.intermediate_op.clone();
let num_workers = op.max_concurrency();
let num_workers = op.max_concurrency().context(PipelineExecutionSnafu {
node_name: self.name(),
})?;
let (destination_sender, destination_receiver) = create_channel(1);
let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone());

Expand Down
28 changes: 27 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/project.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::sync::Arc;

use common_error::{DaftError, DaftResult};
use common_runtime::RuntimeRef;
use daft_dsl::ExprRef;
use daft_dsl::{functions::python::get_resource_request, ExprRef};
use daft_micropartition::MicroPartition;
use tracing::instrument;

use super::intermediate_op::{
IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator,
IntermediateOperatorResult,
};
use crate::NUM_CPUS;

pub struct ProjectOperator {
projection: Arc<Vec<ExprRef>>,
Expand Down Expand Up @@ -45,4 +47,28 @@ impl IntermediateOperator for ProjectOperator {
fn name(&self) -> &'static str {
"ProjectOperator"
}

fn max_concurrency(&self) -> DaftResult<usize> {
let resource_request = get_resource_request(&self.projection);
match resource_request {
// If the resource request specifies a number of CPUs, the max concurrency is the number of CPUs
// divided by the requested number of CPUs, clamped to (1, NUM_CPUS).
// E.g. if the resource request specifies 2 CPUs and NUM_CPUS is 4, the max concurrency is 2.
Some(resource_request) if resource_request.num_cpus().is_some() => {
let requested_num_cpus = resource_request.num_cpus().unwrap();
if requested_num_cpus > *NUM_CPUS as f64 {
Err(DaftError::ValueError(format!(
"Requested {} CPUs but found only {} available",
requested_num_cpus, *NUM_CPUS
)))
} else {
Ok(
(*NUM_CPUS as f64 / requested_num_cpus).clamp(1.0, *NUM_CPUS as f64)
as usize,
)
}
}
_ => Ok(*NUM_CPUS),
}
}
}
14 changes: 5 additions & 9 deletions tests/test_resource_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
from daft.internal.gpu import cuda_visible_devices
from tests.conftest import get_tests_daft_runner_name

pytestmark = pytest.mark.skipif(
get_tests_daft_runner_name() == "native",
reason="Native runner does not support resource requests",
)


def no_gpu_available() -> bool:
return len(cuda_visible_devices()) == 0
Expand Down Expand Up @@ -81,18 +76,19 @@ def test_resource_request_pickle_roundtrip():
###


@pytest.mark.skipif(get_tests_daft_runner_name() not in {"py"}, reason="requires PyRunner to be in use")
@pytest.mark.skipif(
get_tests_daft_runner_name() not in {"native", "py"}, reason="requires Native or Py Runner to be in use"
)
def test_requesting_too_many_cpus():
df = daft.from_pydict(DATA)
system_info = SystemInfo()

my_udf_parametrized = my_udf.override_options(num_cpus=system_info.cpu_count() + 1)
my_udf_parametrized = my_udf.override_options(num_cpus=1000)
df = df.with_column(
"foo",
my_udf_parametrized(col("id")),
)

with pytest.raises(RuntimeError):
with pytest.raises(Exception):
df.collect()


Expand Down

0 comments on commit faf050c

Please sign in to comment.