From faf050cad09d15bb2d0f482742231b039a4df1e5 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 5 Dec 2024 00:04:02 -0800 Subject: [PATCH] [FEAT] Respect resource request for projections in swordfish (#3460) Make swordfish respect resource requests (CPU only for now) on projections. --------- Co-authored-by: Colin Ho --- src/common/resource-request/src/lib.rs | 12 ++++++++ .../intermediate_ops/actor_pool_project.rs | 4 +-- .../src/intermediate_ops/intermediate_op.rs | 12 +++++--- .../src/intermediate_ops/project.rs | 28 ++++++++++++++++++- tests/test_resource_requests.py | 14 ++++------ 5 files changed, 54 insertions(+), 16 deletions(-) diff --git a/src/common/resource-request/src/lib.rs b/src/common/resource-request/src/lib.rs index f79c2cf84d..8239da9b06 100644 --- a/src/common/resource-request/src/lib.rs +++ b/src/common/resource-request/src/lib.rs @@ -139,6 +139,18 @@ impl ResourceRequest { self.memory_bytes.map(|x| x * (factor as usize)), ) } + + pub fn num_cpus(&self) -> Option<f64> { + self.num_cpus + } + + pub fn num_gpus(&self) -> Option<f64> { + self.num_gpus + } + + pub fn memory_bytes(&self) -> Option<usize> { + self.memory_bytes + } } impl Add for &ResourceRequest { diff --git a/src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs b/src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs index 92ba1de5f7..de2369985e 100644 --- a/src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs +++ b/src/daft-local-execution/src/intermediate_ops/actor_pool_project.rs @@ -181,8 +181,8 @@ impl IntermediateOperator for ActorPoolProjectOperator { })) } - fn max_concurrency(&self) -> usize { - self.concurrency + fn max_concurrency(&self) -> DaftResult<usize> { + Ok(self.concurrency) } fn dispatch_spawner( diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index 91dba3de0a..67e3f15832 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs 
+++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -4,6 +4,7 @@ use common_display::tree::TreeDisplay; use common_error::DaftResult; use common_runtime::{get_compute_runtime, RuntimeRef}; use daft_micropartition::MicroPartition; +use snafu::ResultExt; use tracing::{info_span, instrument}; use crate::{ @@ -14,7 +15,7 @@ use crate::{ dispatcher::{DispatchSpawner, RoundRobinDispatcher, UnorderedDispatcher}, pipeline::PipelineNode, runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext}, - ExecutionRuntimeContext, OperatorOutput, NUM_CPUS, + ExecutionRuntimeContext, OperatorOutput, PipelineExecutionSnafu, NUM_CPUS, }; pub(crate) trait IntermediateOpState: Send + Sync { @@ -49,8 +50,9 @@ pub trait IntermediateOperator: Send + Sync { } /// The maximum number of concurrent workers that can be spawned for this operator. /// Each worker will has its own IntermediateOperatorState. - fn max_concurrency(&self) -> usize { - *NUM_CPUS + /// This method should be overridden if the operator needs to limit the number of concurrent workers, i.e. UDFs with resource requests. 
+ fn max_concurrency(&self) -> DaftResult<usize> { + Ok(*NUM_CPUS) } fn dispatch_spawner( @@ -208,7 +210,9 @@ impl PipelineNode for IntermediateNode { )); } let op = self.intermediate_op.clone(); - let num_workers = op.max_concurrency(); + let num_workers = op.max_concurrency().context(PipelineExecutionSnafu { + node_name: self.name(), + })?; let (destination_sender, destination_receiver) = create_channel(1); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone()); diff --git a/src/daft-local-execution/src/intermediate_ops/project.rs b/src/daft-local-execution/src/intermediate_ops/project.rs index d0854cf238..404bf407ac 100644 --- a/src/daft-local-execution/src/intermediate_ops/project.rs +++ b/src/daft-local-execution/src/intermediate_ops/project.rs @@ -1,7 +1,8 @@ use std::sync::Arc; +use common_error::{DaftError, DaftResult}; use common_runtime::RuntimeRef; -use daft_dsl::ExprRef; +use daft_dsl::{functions::python::get_resource_request, ExprRef}; use daft_micropartition::MicroPartition; use tracing::instrument; @@ -9,6 +10,7 @@ use super::intermediate_op::{ IntermediateOpExecuteResult, IntermediateOpState, IntermediateOperator, IntermediateOperatorResult, }; +use crate::NUM_CPUS; pub struct ProjectOperator { projection: Arc<Vec<ExprRef>>, @@ -45,4 +47,28 @@ impl IntermediateOperator for ProjectOperator { fn name(&self) -> &'static str { "ProjectOperator" } + + fn max_concurrency(&self) -> DaftResult<usize> { + let resource_request = get_resource_request(&self.projection); + match resource_request { + // If the resource request specifies a number of CPUs, the max concurrency is the number of CPUs + // divided by the requested number of CPUs, clamped to (1, NUM_CPUS). + // E.g. if the resource request specifies 2 CPUs and NUM_CPUS is 4, the max concurrency is 2. 
+ Some(resource_request) if resource_request.num_cpus().is_some() => { + let requested_num_cpus = resource_request.num_cpus().unwrap(); + if requested_num_cpus > *NUM_CPUS as f64 { + Err(DaftError::ValueError(format!( + "Requested {} CPUs but found only {} available", + requested_num_cpus, *NUM_CPUS + ))) + } else { + Ok( + (*NUM_CPUS as f64 / requested_num_cpus).clamp(1.0, *NUM_CPUS as f64) + as usize, + ) + } + } + _ => Ok(*NUM_CPUS), + } + } } diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py index 2f3d867f32..213e7af860 100644 --- a/tests/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -13,11 +13,6 @@ from daft.internal.gpu import cuda_visible_devices from tests.conftest import get_tests_daft_runner_name -pytestmark = pytest.mark.skipif( - get_tests_daft_runner_name() == "native", - reason="Native runner does not support resource requests", -) - def no_gpu_available() -> bool: return len(cuda_visible_devices()) == 0 @@ -81,18 +76,19 @@ def test_resource_request_pickle_roundtrip(): ### -@pytest.mark.skipif(get_tests_daft_runner_name() not in {"py"}, reason="requires PyRunner to be in use") +@pytest.mark.skipif( + get_tests_daft_runner_name() not in {"native", "py"}, reason="requires Native or Py Runner to be in use" +) def test_requesting_too_many_cpus(): df = daft.from_pydict(DATA) - system_info = SystemInfo() - my_udf_parametrized = my_udf.override_options(num_cpus=system_info.cpu_count() + 1) + my_udf_parametrized = my_udf.override_options(num_cpus=1000) df = df.with_column( "foo", my_udf_parametrized(col("id")), ) - with pytest.raises(RuntimeError): + with pytest.raises(Exception): df.collect()