From 127807e88d1c32c581f11c0ec91f5b51da769161 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 4 Oct 2024 22:06:18 -0700 Subject: [PATCH] add resource request num_gpu check --- Cargo.lock | 1 + src/common/resource-request/Cargo.toml | 1 + src/common/resource-request/src/lib.rs | 105 ++++++++++-------- .../rules/split_actor_pool_projects.rs | 6 +- tests/expressions/test_stateful_udf.py | 30 ++++- tests/test_resource_requests.py | 24 ++++ 6 files changed, 111 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db446053c8..dbcf6f2a46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1377,6 +1377,7 @@ dependencies = [ name = "common-resource-request" version = "0.3.0-dev0" dependencies = [ + "common-error", "common-hashable-float-wrapper", "common-py-serde", "pyo3", diff --git a/src/common/resource-request/Cargo.toml b/src/common/resource-request/Cargo.toml index d72d63b796..edec66e790 100644 --- a/src/common/resource-request/Cargo.toml +++ b/src/common/resource-request/Cargo.toml @@ -1,4 +1,5 @@ [dependencies] +common-error = {path = "../error"} common-hashable-float-wrapper = {path = "../hashable-float-wrapper"} common-py-serde = {path = "../py-serde"} pyo3 = {workspace = true, optional = true} diff --git a/src/common/resource-request/src/lib.rs b/src/common/resource-request/src/lib.rs index a422c91475..80657113f2 100644 --- a/src/common/resource-request/src/lib.rs +++ b/src/common/resource-request/src/lib.rs @@ -3,6 +3,7 @@ use std::{ ops::Add, }; +use common_error::{DaftError, DaftResult}; use common_hashable_float_wrapper::FloatWrapper; use common_py_serde::impl_bincode_py_state_serialization; #[cfg(feature = "python")] @@ -19,9 +20,9 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct ResourceRequest { - pub num_cpus: Option, - pub num_gpus: Option, - pub memory_bytes: Option, + num_cpus: Option, + num_gpus: Option, + memory_bytes: Option, } impl ResourceRequest { @@ -29,37 +30,48 @@ impl ResourceRequest { num_cpus: Option, num_gpus: Option, memory_bytes: Option, - ) -> Self { - Self { + ) -> DaftResult { + if let Some(num_gpus) = num_gpus { + if num_gpus < 0.0 { + return Err(DaftError::ValueError(format!( + "ResourceRequest num_gpus must be nonnegative, got {}", + num_gpus + ))); + } + + if num_gpus > 1.0 && num_gpus.fract() != 0.0 { + return Err(DaftError::ValueError(format!( + "ResourceRequest num_gpus greater than 1 must be an integer, got {}", + num_gpus + ))); + } + } + + Ok(Self { num_cpus, num_gpus, memory_bytes, - } + }) } pub fn default_cpu() -> Self { - Self::new_internal(Some(1.0), None, None) + Self::new_internal(Some(1.0), None, None).unwrap() } - pub fn or_num_cpus(&self, num_cpus: Option) -> Self { - Self { - num_cpus: self.num_cpus.or(num_cpus), - ..self.clone() - } + pub fn or_num_cpus(&self, num_cpus: Option) -> DaftResult { + Self::new_internal(self.num_cpus.or(num_cpus), self.num_gpus, self.memory_bytes) } - pub fn or_num_gpus(&self, num_gpus: Option) -> Self { - Self { - num_gpus: self.num_gpus.or(num_gpus), - ..self.clone() - } + pub fn or_num_gpus(&self, num_gpus: Option) -> DaftResult { + Self::new_internal(self.num_cpus, self.num_gpus.or(num_gpus), self.memory_bytes) } - pub fn or_memory_bytes(&self, memory_bytes: Option) -> Self { - Self { - memory_bytes: self.memory_bytes.or(memory_bytes), - ..self.clone() - } + pub fn or_memory_bytes(&self, memory_bytes: Option) -> DaftResult { + Self::new_internal( + self.num_cpus, + self.num_gpus, + self.memory_bytes.or(memory_bytes), + ) } pub fn has_any(&self) -> bool { @@ -104,7 +116,7 @@ impl ResourceRequest { let max_num_cpus = lift(float_max, self.num_cpus, other.num_cpus); let max_num_gpus = lift(float_max, self.num_gpus, other.num_gpus); let max_memory_bytes = lift(std::cmp::max, self.memory_bytes, other.memory_bytes); - Self::new_internal(max_num_cpus, max_num_gpus, max_memory_bytes) + Self::new_internal(max_num_cpus, max_num_gpus, max_memory_bytes).unwrap() } pub fn max_all>( @@ -115,7 +127,7 @@ impl ResourceRequest { .fold(Default::default(), |acc, e| acc.max(e.as_ref())) } - pub fn multiply(&self, factor: f64) -> Self { + pub fn multiply(&self, factor: f64) -> DaftResult { Self::new_internal( self.num_cpus.map(|x| x * factor), self.num_gpus.map(|x| x * factor), @@ -125,9 +137,9 @@ impl ResourceRequest { } impl Add for &ResourceRequest { - type Output = ResourceRequest; + type Output = DaftResult; fn add(self, other: Self) -> Self::Output { - Self::Output::new_internal( + ResourceRequest::new_internal( lift(Add::add, self.num_cpus, other.num_cpus), lift(Add::add, self.num_gpus, other.num_gpus), lift(Add::add, self.memory_bytes, other.memory_bytes), @@ -136,8 +148,8 @@ impl Add for &ResourceRequest { } impl Add for ResourceRequest { - type Output = Self; - fn add(self, other: Self) -> Self { + type Output = DaftResult; + fn add(self, other: Self) -> Self::Output { &self + &other } } @@ -174,8 +186,12 @@ fn float_max(left: f64, right: f64) -> f64 { #[pymethods] impl ResourceRequest { #[new] - pub fn new(num_cpus: Option, num_gpus: Option, memory_bytes: Option) -> Self { - Self::new_internal(num_cpus, num_gpus, memory_bytes) + pub fn new( + num_cpus: Option, + num_gpus: Option, + memory_bytes: Option, + ) -> PyResult { + Ok(Self::new_internal(num_cpus, num_gpus, memory_bytes)?) } /// Take a field-wise max of the list of resource requests. @@ -199,33 +215,24 @@ impl ResourceRequest { Ok(self.memory_bytes) } - pub fn with_num_cpus(&self, num_cpus: Option) -> Self { - Self { - num_cpus, - ..self.clone() - } + pub fn with_num_cpus(&self, num_cpus: Option) -> DaftResult { + Self::new_internal(num_cpus, self.num_gpus, self.memory_bytes) } - pub fn with_num_gpus(&self, num_gpus: Option) -> Self { - Self { - num_gpus, - ..self.clone() - } + pub fn with_num_gpus(&self, num_gpus: Option) -> DaftResult { + Self::new_internal(self.num_cpus, num_gpus, self.memory_bytes) } - pub fn with_memory_bytes(&self, memory_bytes: Option) -> Self { - Self { - memory_bytes, - ..self.clone() - } + pub fn with_memory_bytes(&self, memory_bytes: Option) -> DaftResult { + Self::new_internal(self.num_cpus, self.num_gpus, memory_bytes) } - fn __add__(&self, other: &Self) -> Self { - self + other + fn __add__(&self, other: &Self) -> PyResult { + Ok((self + other)?) } - fn __mul__(&self, factor: f64) -> Self { - self.multiply(factor) + fn __mul__(&self, factor: f64) -> PyResult { + Ok(self.multiply(factor)?) } fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 1d77502221..d2e7126fc0 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -588,11 +588,7 @@ mod tests { } fn create_resource_request() -> ResourceRequest { - ResourceRequest { - num_cpus: Some(8.), - num_gpus: Some(1.), - memory_bytes: None, - } + ResourceRequest::new_internal(Some(8.), Some(1.), None).unwrap() } #[test] diff --git a/tests/expressions/test_stateful_udf.py b/tests/expressions/test_stateful_udf.py index f5160da245..080c90ec81 100644 --- a/tests/expressions/test_stateful_udf.py +++ b/tests/expressions/test_stateful_udf.py @@ -1,5 +1,7 @@ from __future__ import annotations +import os + import pytest import daft @@ -95,8 +97,6 @@ def test_stateful_udf_cuda_env_var(concurrency, num_gpus): if concurrency * num_gpus > len(cuda_visible_devices()): pytest.skip("Not enough GPUs available") - import os - @udf(return_dtype=DataType.string(), num_gpus=num_gpus) class GetCudaVisibleDevices: def __init__(self): @@ -124,3 +124,29 @@ def __call__(self, data): all_devices = (",".join(unique_visible_devices)).split(",") assert len(all_devices) == concurrency * num_gpus + + +@pytest.mark.skipif(len(cuda_visible_devices()) == 0, reason="No GPUs available") +def test_stateful_udf_fractional_gpu(): + @udf(return_dtype=DataType.string(), num_gpus=0.5) + class FractionalGpuUdf: + def __init__(self): + self.cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"] + + def __call__(self, data): + assert os.environ["CUDA_VISIBLE_DEVICES"] == self.cuda_visible_devices + + import time + + time.sleep(0.1) + + return [self.cuda_visible_devices] * len(data) + + df = daft.from_pydict({"x": [1, 2]}) + df = df.into_partitions(2) + df = df.select(FractionalGpuUdf(df["x"])) + + result = df.to_pydict() + + unique_visible_devices = set(result["x"]) + assert len(unique_visible_devices) == 1 diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py index 77cd7d8de0..3cab797498 100644 --- a/tests/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -330,3 +330,27 @@ def test_with_column_max_resources_rayrunner_gpu(): ) df.collect() + + +def test_improper_num_gpus(): + with pytest.raises(ValueError, match="DaftError::ValueError"): + + @udf(return_dtype=daft.DataType.int64(), num_gpus=-1) + def foo(c): + return c + + with pytest.raises(ValueError, match="DaftError::ValueError"): + + @udf(return_dtype=daft.DataType.int64(), num_gpus=1.5) + def foo(c): + return c + + @udf(return_dtype=daft.DataType.int64()) + def foo(c): + return c + + with pytest.raises(ValueError, match="DaftError::ValueError"): + foo = foo.override_options(num_gpus=-1) + + with pytest.raises(ValueError, match="DaftError::ValueError"): + foo = foo.override_options(num_gpus=1.5)