Skip to content

Commit

Permalink
add resource request num_gpu check
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinzwang committed Oct 5, 2024
1 parent df2507f commit 127807e
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/resource-request/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[dependencies]
common-error = {path = "../error"}
common-hashable-float-wrapper = {path = "../hashable-float-wrapper"}
common-py-serde = {path = "../py-serde"}
pyo3 = {workspace = true, optional = true}
Expand Down
105 changes: 56 additions & 49 deletions src/common/resource-request/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
ops::Add,
};

use common_error::{DaftError, DaftResult};
use common_hashable_float_wrapper::FloatWrapper;
use common_py_serde::impl_bincode_py_state_serialization;
#[cfg(feature = "python")]
Expand All @@ -19,47 +20,58 @@ use serde::{Deserialize, Serialize};
/// Resource request for a task: optional (fractional) CPU and GPU counts
/// plus an optional memory budget, each `None` when unspecified.
///
/// Fields are private so that every instance is built through the
/// validating constructor `new_internal`, which enforces the `num_gpus`
/// invariants (nonnegative; integral when greater than 1).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))]
pub struct ResourceRequest {
    num_cpus: Option<f64>,
    num_gpus: Option<f64>,
    memory_bytes: Option<usize>,
}

impl ResourceRequest {
pub fn new_internal(
num_cpus: Option<f64>,
num_gpus: Option<f64>,
memory_bytes: Option<usize>,
) -> Self {
Self {
) -> DaftResult<Self> {
if let Some(num_gpus) = num_gpus {
if num_gpus < 0.0 {
return Err(DaftError::ValueError(format!(
"ResourceRequest num_gpus must be nonnegative, got {}",
num_gpus
)));
}

if num_gpus > 1.0 && num_gpus.fract() != 0.0 {
return Err(DaftError::ValueError(format!(
"ResourceRequest num_gpus greater than 1 must be an integer, got {}",
num_gpus
)));
}
}

Ok(Self {
num_cpus,
num_gpus,
memory_bytes,
}
})
}

/// Default request for a CPU-only task: one CPU, no GPU, no memory bound.
pub fn default_cpu() -> Self {
    // unwrap() is safe: a `None` num_gpus trivially passes validation.
    Self::new_internal(Some(1.0), None, None).unwrap()
}

pub fn or_num_cpus(&self, num_cpus: Option<f64>) -> Self {
Self {
num_cpus: self.num_cpus.or(num_cpus),
..self.clone()
}
pub fn or_num_cpus(&self, num_cpus: Option<f64>) -> DaftResult<Self> {
Self::new_internal(self.num_cpus.or(num_cpus), self.num_gpus, self.memory_bytes)
}

pub fn or_num_gpus(&self, num_gpus: Option<f64>) -> Self {
Self {
num_gpus: self.num_gpus.or(num_gpus),
..self.clone()
}
pub fn or_num_gpus(&self, num_gpus: Option<f64>) -> DaftResult<Self> {
Self::new_internal(self.num_cpus, self.num_gpus.or(num_gpus), self.memory_bytes)
}

pub fn or_memory_bytes(&self, memory_bytes: Option<usize>) -> Self {
Self {
memory_bytes: self.memory_bytes.or(memory_bytes),
..self.clone()
}
pub fn or_memory_bytes(&self, memory_bytes: Option<usize>) -> DaftResult<Self> {
Self::new_internal(
self.num_cpus,
self.num_gpus,
self.memory_bytes.or(memory_bytes),
)
}

pub fn has_any(&self) -> bool {
Expand Down Expand Up @@ -104,7 +116,7 @@ impl ResourceRequest {
let max_num_cpus = lift(float_max, self.num_cpus, other.num_cpus);
let max_num_gpus = lift(float_max, self.num_gpus, other.num_gpus);
let max_memory_bytes = lift(std::cmp::max, self.memory_bytes, other.memory_bytes);
Self::new_internal(max_num_cpus, max_num_gpus, max_memory_bytes)
Self::new_internal(max_num_cpus, max_num_gpus, max_memory_bytes).unwrap()
}

pub fn max_all<ResourceRequestAsRef: AsRef<Self>>(
Expand All @@ -115,7 +127,7 @@ impl ResourceRequest {
.fold(Default::default(), |acc, e| acc.max(e.as_ref()))
}

pub fn multiply(&self, factor: f64) -> Self {
pub fn multiply(&self, factor: f64) -> DaftResult<Self> {
Self::new_internal(
self.num_cpus.map(|x| x * factor),
self.num_gpus.map(|x| x * factor),
Expand All @@ -125,9 +137,9 @@ impl ResourceRequest {
}

impl Add for &ResourceRequest {
type Output = ResourceRequest;
type Output = DaftResult<ResourceRequest>;
fn add(self, other: Self) -> Self::Output {
Self::Output::new_internal(
ResourceRequest::new_internal(
lift(Add::add, self.num_cpus, other.num_cpus),
lift(Add::add, self.num_gpus, other.num_gpus),
lift(Add::add, self.memory_bytes, other.memory_bytes),
Expand All @@ -136,8 +148,8 @@ impl Add for &ResourceRequest {
}

impl Add for ResourceRequest {
type Output = Self;
fn add(self, other: Self) -> Self {
type Output = DaftResult<Self>;
fn add(self, other: Self) -> Self::Output {
&self + &other
}
}
Expand Down Expand Up @@ -174,8 +186,12 @@ fn float_max(left: f64, right: f64) -> f64 {
#[pymethods]
impl ResourceRequest {
#[new]
pub fn new(num_cpus: Option<f64>, num_gpus: Option<f64>, memory_bytes: Option<usize>) -> Self {
Self::new_internal(num_cpus, num_gpus, memory_bytes)
pub fn new(
num_cpus: Option<f64>,
num_gpus: Option<f64>,
memory_bytes: Option<usize>,
) -> PyResult<Self> {
Ok(Self::new_internal(num_cpus, num_gpus, memory_bytes)?)
}

/// Take a field-wise max of the list of resource requests.
Expand All @@ -199,33 +215,24 @@ impl ResourceRequest {
Ok(self.memory_bytes)
}

pub fn with_num_cpus(&self, num_cpus: Option<f64>) -> Self {
Self {
num_cpus,
..self.clone()
}
pub fn with_num_cpus(&self, num_cpus: Option<f64>) -> DaftResult<Self> {
Self::new_internal(num_cpus, self.num_gpus, self.memory_bytes)
}

pub fn with_num_gpus(&self, num_gpus: Option<f64>) -> Self {
Self {
num_gpus,
..self.clone()
}
pub fn with_num_gpus(&self, num_gpus: Option<f64>) -> DaftResult<Self> {
Self::new_internal(self.num_cpus, num_gpus, self.memory_bytes)
}

pub fn with_memory_bytes(&self, memory_bytes: Option<usize>) -> Self {
Self {
memory_bytes,
..self.clone()
}
pub fn with_memory_bytes(&self, memory_bytes: Option<usize>) -> DaftResult<Self> {
Self::new_internal(self.num_cpus, self.num_gpus, memory_bytes)
}

fn __add__(&self, other: &Self) -> Self {
self + other
fn __add__(&self, other: &Self) -> PyResult<Self> {
Ok((self + other)?)
}

fn __mul__(&self, factor: f64) -> Self {
self.multiply(factor)
fn __mul__(&self, factor: f64) -> PyResult<Self> {
Ok(self.multiply(factor)?)
}

fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,11 +588,7 @@ mod tests {
}

/// Test fixture: a known-valid request (8 CPUs, 1 GPU, no memory bound).
/// Built through the validating constructor since the fields are private;
/// unwrap() is fine because the values are statically valid.
fn create_resource_request() -> ResourceRequest {
    ResourceRequest::new_internal(Some(8.), Some(1.), None).unwrap()
}

#[test]
Expand Down
30 changes: 28 additions & 2 deletions tests/expressions/test_stateful_udf.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import os

import pytest

import daft
Expand Down Expand Up @@ -95,8 +97,6 @@ def test_stateful_udf_cuda_env_var(concurrency, num_gpus):
if concurrency * num_gpus > len(cuda_visible_devices()):
pytest.skip("Not enough GPUs available")

import os

@udf(return_dtype=DataType.string(), num_gpus=num_gpus)
class GetCudaVisibleDevices:
def __init__(self):
Expand Down Expand Up @@ -124,3 +124,29 @@ def __call__(self, data):

all_devices = (",".join(unique_visible_devices)).split(",")
assert len(all_devices) == concurrency * num_gpus


@pytest.mark.skipif(len(cuda_visible_devices()) == 0, reason="No GPUs available")
def test_stateful_udf_fractional_gpu():
    """Two partitions, each requesting 0.5 GPUs, should end up sharing a
    single visible CUDA device (the final assert checks exactly one)."""

    @udf(return_dtype=DataType.string(), num_gpus=0.5)
    class FractionalGpuUdf:
        def __init__(self):
            # Snapshot the device assignment at init time; __call__ checks
            # it has not changed by execution time.
            self.cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]

        def __call__(self, data):
            assert os.environ["CUDA_VISIBLE_DEVICES"] == self.cuda_visible_devices

            import time

            # NOTE(review): presumably keeps both partitions in flight at
            # once so the 0.5-GPU sharing is actually exercised — confirm.
            time.sleep(0.1)

            return [self.cuda_visible_devices] * len(data)

    df = daft.from_pydict({"x": [1, 2]}).into_partitions(2)
    df = df.select(FractionalGpuUdf(df["x"]))

    visible_devices = set(df.to_pydict()["x"])
    assert len(visible_devices) == 1
24 changes: 24 additions & 0 deletions tests/test_resource_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,27 @@ def test_with_column_max_resources_rayrunner_gpu():
)

df.collect()


def test_improper_num_gpus():
    """num_gpus must be nonnegative and, when greater than 1, integral —
    both at @udf declaration time and via override_options."""
    bad_values = (-1, 1.5)

    # Rejected when declared directly on the decorator.
    for bad in bad_values:
        with pytest.raises(ValueError, match="DaftError::ValueError"):

            @udf(return_dtype=daft.DataType.int64(), num_gpus=bad)
            def foo(c):
                return c

    # A UDF that is valid as declared...
    @udf(return_dtype=daft.DataType.int64())
    def foo(c):
        return c

    # ...is still rejected when overridden with a bad value.
    for bad in bad_values:
        with pytest.raises(ValueError, match="DaftError::ValueError"):
            foo.override_options(num_gpus=bad)

0 comments on commit 127807e

Please sign in to comment.