diff --git a/Cargo.lock b/Cargo.lock index 8d7b1b8401..c57309e10f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,6 +1378,7 @@ dependencies = [ name = "common-resource-request" version = "0.3.0-dev0" dependencies = [ + "common-error", "common-hashable-float-wrapper", "common-py-serde", "pyo3", diff --git a/daft/dependencies.py b/daft/dependencies.py index e4c692b4bf..56e2054c97 100644 --- a/daft/dependencies.py +++ b/daft/dependencies.py @@ -3,8 +3,6 @@ from daft.lazy_import import LazyImport if TYPE_CHECKING: - import xml.etree.ElementTree as ET - import fsspec import numpy as np import pandas as pd @@ -16,8 +14,6 @@ import pyarrow.json as pajson import pyarrow.parquet as pq else: - ET = LazyImport("xml.etree.ElementTree") - fsspec = LazyImport("fsspec") np = LazyImport("numpy") pd = LazyImport("pandas") diff --git a/daft/internal/gpu.py b/daft/internal/gpu.py index fb0a25565a..1c7a527c4d 100644 --- a/daft/internal/gpu.py +++ b/daft/internal/gpu.py @@ -1,18 +1,40 @@ from __future__ import annotations -import subprocess +import warnings -from daft.dependencies import ET +def _raw_device_count_nvml() -> int: + """ + Return number of devices as reported by NVML or zero if NVML discovery/initialization failed. + + Inspired by PyTorch: https://github.com/pytorch/pytorch/blob/88e54de21976aa504e797e47f06b480b9108ef5c/torch/cuda/__init__.py#L711 + """ + from ctypes import CDLL, byref, c_int -def cuda_device_count(): - """Returns the number of CUDA devices detected by nvidia-smi command""" try: - nvidia_smi_output = subprocess.check_output(["nvidia-smi", "-x", "-q"]) - except Exception: + nvml_h = CDLL("libnvidia-ml.so.1") + except OSError: + return 0 + rc = nvml_h.nvmlInit() + if rc != 0: + warnings.warn("Can't initialize NVML, assuming no CUDA devices.") return 0 - root = ET.fromstring(nvidia_smi_output.decode("utf-8")) - attached_gpus = root.find("attached_gpus") - if attached_gpus is None: + dev_count = c_int(0) + rc = nvml_h.nvmlDeviceGetCount_v2(byref(dev_count)) + if rc != 0: + warnings.warn("Can't get nvml device count, assuming no CUDA devices.") return 0 - return int(attached_gpus.text) + del nvml_h + return dev_count.value + + +def cuda_visible_devices() -> list[str]: + """Get the list of CUDA devices visible to the current process.""" + import os + + visible_devices_envvar = os.getenv("CUDA_VISIBLE_DEVICES") + + if visible_devices_envvar is None: + return [str(i) for i in range(_raw_device_count_nvml())] + + return [device.strip() for device in visible_devices_envvar.split(",") if device.strip()] diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index d861e2e060..e5a8c766a7 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -2,18 +2,19 @@ import contextlib import logging +import multiprocessing as mp import threading import uuid from concurrent import futures from dataclasses import dataclass -from typing import TYPE_CHECKING, Iterator +from typing import TYPE_CHECKING, Callable, Iterator from daft.context import get_context from daft.daft import FileFormatConfig, FileInfos, IOConfig, ResourceRequest, SystemInfo from daft.execution.native_executor import NativeExecutor from daft.expressions import ExpressionsProjection from daft.filesystem import glob_path_with_stats -from daft.internal.gpu import cuda_device_count +from daft.internal.gpu import cuda_visible_devices from daft.runners import runner_io from daft.runners.partitioning import ( MaterializedResult, @@ -107,27 +108,147 @@ def wait(self) -> None: pass -class PyActorPool: - 
initialized_stateful_udfs_process_singleton: dict[str, UserProvidedPythonFunction] | None = None +@dataclass +class AcquiredResources: + num_cpus: float + gpus: dict[str, float] + memory_bytes: int - def __init__( - self, - pool_id: str, - num_actors: int, - resource_request: ResourceRequest, - projection: ExpressionsProjection, - ): - self._pool_id = pool_id - self._num_actors = num_actors - self._resource_request = resource_request - self._executor: futures.ProcessPoolExecutor | None = None - self._projection = projection + +class PyRunnerResources: + def __init__(self, num_cpus: float, gpus: list[str], memory_bytes: int): + gpus_dict = {gpu: 1.0 for gpu in gpus} + self.num_cpus = num_cpus + self.num_gpus = len(gpus) + self.memory_bytes = memory_bytes + + self.available_resources = AcquiredResources(num_cpus, gpus_dict, memory_bytes) + self.lock = threading.Lock() + + def try_acquire(self, resource_request: ResourceRequest) -> AcquiredResources | None: + resources = self.try_acquire_multiple([resource_request]) + return resources[0] if resources is not None else None + + def try_acquire_multiple(self, resource_requests: list[ResourceRequest]) -> list[AcquiredResources] | None: + """ + Attempts to acquire the requested resources. + + If the requested resources are available, returns a list of `AcquiredResources` with the amount of acquired CPUs and memory, as well as the specific GPUs that were acquired per request. + + If the requested resources are not available, returns None. + """ + all_requested_cpus = [r.num_cpus or 0.0 for r in resource_requests] + total_requested_cpus = sum(all_requested_cpus) + + all_requested_memory_bytes = [r.memory_bytes or 0 for r in resource_requests] + total_requested_memory_bytes = sum(all_requested_memory_bytes) + + total_requested_gpus = sum([r.num_gpus or 0.0 for r in resource_requests]) + + for resource_name, requested, total in [ + ("CPUs", total_requested_cpus, self.num_cpus), + ("bytes of memory", total_requested_memory_bytes, self.memory_bytes), + ("GPUs", total_requested_gpus, self.num_gpus), + ]: + if requested > total: + raise RuntimeError(f"Requested {requested} {resource_name} but found only {total} available") + + with self.lock: + if total_requested_cpus > self.available_resources.num_cpus: + return None + + if total_requested_memory_bytes > self.available_resources.memory_bytes: + return None + + remaining_available_gpus = self.available_resources.gpus.copy() + all_requested_gpus = [] + + # choose GPUs for resource requests + for r in resource_requests: + num_gpus = r.num_gpus or 0.0 + chosen_gpus = {} + + if num_gpus.is_integer(): + for device in remaining_available_gpus: + if num_gpus == 0: + break + + if remaining_available_gpus[device] == 1.0: + chosen_gpus[device] = 1.0 + num_gpus -= 1.0 + + if num_gpus > 0: + return None + else: + # do not allow fractional GPUs above 1.0, similar to Ray's behavior + # this should have been validated when creating the resource request so we only do an assert here + assert 0 <= num_gpus < 1 + + chosen_gpu = None + + # greedily choose GPU that has lowest fraction available which can fit the requested fraction + for device, fraction in remaining_available_gpus.items(): + if fraction >= num_gpus: + if chosen_gpu is None or fraction < remaining_available_gpus[chosen_gpu]: + chosen_gpu = device + + if chosen_gpu is None: + return None + + chosen_gpus[chosen_gpu] = num_gpus + + for device, fraction in chosen_gpus.items(): + remaining_available_gpus[device] -= fraction + + 
all_requested_gpus.append(chosen_gpus) + + self.available_resources.num_cpus -= total_requested_cpus + self.available_resources.memory_bytes -= total_requested_memory_bytes + self.available_resources.gpus = remaining_available_gpus + + return [ + AcquiredResources(num_cpus, gpus, memory_bytes) + for num_cpus, gpus, memory_bytes in zip( + all_requested_cpus, all_requested_gpus, all_requested_memory_bytes + ) + ] + + def release(self, resources: AcquiredResources | list[AcquiredResources]): + """Admit the resources back into the resource pool.""" + with self.lock: + if not isinstance(resources, list): + resources = [resources] + + for r in resources: + self.available_resources.num_cpus += r.num_cpus + self.available_resources.memory_bytes += r.memory_bytes + for gpu, amount in r.gpus.items(): + self.available_resources.gpus[gpu] += amount + + +class PyStatefulActorSingleton: + """ + This class stores the singleton `initialized_udfs` that is isolated to each Python process. It stores the stateful UDF objects of a single actor. + + Currently, only one stateful UDF per actor is supported, but we allow multiple here in case we want to support multiple stateful UDFs in the future. + + Note: The class methods should only be called inside of actor processes. + """ + + initialized_udfs: dict[str, UserProvidedPythonFunction] | None = None @staticmethod - def initialize_actor_global_state(uninitialized_projection: ExpressionsProjection): + def initialize_actor_global_state( + uninitialized_projection: ExpressionsProjection, + cuda_device_queue: mp.Queue[str], + ): + import os + from daft.daft import extract_partial_stateful_udf_py - if PyActorPool.initialized_stateful_udfs_process_singleton is not None: + os.environ["CUDA_VISIBLE_DEVICES"] = cuda_device_queue.get(timeout=1) + + if PyStatefulActorSingleton.initialized_udfs is not None: raise RuntimeError("Cannot initialize Python process actor twice.") else: partial_stateful_udfs = { @@ -138,15 +259,13 @@ def initialize_actor_global_state(uninitialized_projection: ExpressionsProjectio logger.info("Initializing stateful UDFs: %s", ", ".join(partial_stateful_udfs.keys())) - PyActorPool.initialized_stateful_udfs_process_singleton = {} + PyStatefulActorSingleton.initialized_udfs = {} for name, (partial_udf, init_args) in partial_stateful_udfs.items(): if init_args is None: - PyActorPool.initialized_stateful_udfs_process_singleton[name] = partial_udf.func_cls() + PyStatefulActorSingleton.initialized_udfs[name] = partial_udf.func_cls() else: args, kwargs = init_args - PyActorPool.initialized_stateful_udfs_process_singleton[name] = partial_udf.func_cls( - *args, **kwargs - ) + PyStatefulActorSingleton.initialized_udfs[name] = partial_udf.func_cls(*args, **kwargs) @staticmethod def build_partitions_with_stateful_project( @@ -155,7 +274,7 @@ def build_partitions_with_stateful_project( partial_metadata: PartialPartitionMetadata, ) -> list[MaterializedResult[MicroPartition]]: # Bind the expressions to the initialized stateful UDFs, which should already have been initialized at process start-up - initialized_stateful_udfs = PyActorPool.initialized_stateful_udfs_process_singleton + initialized_stateful_udfs = PyStatefulActorSingleton.initialized_udfs assert ( initialized_stateful_udfs is not None ), "PyActor process must be initialized with stateful UDFs before execution" @@ -167,6 +286,21 @@ def build_partitions_with_stateful_project( PyMaterializedResult(new_part, PartitionMetadata.from_table(new_part).merge_with_partial(partial_metadata)) ] + +class PyActorPool: 
+ def __init__( + self, + pool_id: str, + num_actors: int, + resources: list[AcquiredResources], + projection: ExpressionsProjection, + ): + self._pool_id = pool_id + self._num_actors = num_actors + self._resources = resources + self._executor: futures.ProcessPoolExecutor | None = None + self._projection = projection + def submit( self, instruction_stack: list[Instruction], @@ -189,7 +323,7 @@ def submit( partial_metadata = final_metadata[0] return self._executor.submit( - PyActorPool.build_partitions_with_stateful_project, + PyStatefulActorSingleton.build_partitions_with_stateful_project, projection, partition, partial_metadata, @@ -202,8 +336,15 @@ def teardown(self) -> None: self._executor = None def setup(self) -> None: + cuda_device_queue: mp.Queue[str] = mp.Queue() + for r in self._resources: + visible_device_str = ",".join(r.gpus.keys()) + cuda_device_queue.put(visible_device_str) + self._executor = futures.ProcessPoolExecutor( - self._num_actors, initializer=PyActorPool.initialize_actor_global_state, initargs=(self._projection,) + self._num_actors, + initializer=PyStatefulActorSingleton.initialize_actor_global_state, + initargs=(self._projection, cuda_device_queue), ) @@ -245,18 +386,16 @@ def __init__(self, use_thread_pool: bool | None) -> None: if num_cpus is None: import multiprocessing - self.num_cpus = multiprocessing.cpu_count() - else: - self.num_cpus = num_cpus + num_cpus = multiprocessing.cpu_count() - self.num_gpus = cuda_device_count() - self.total_bytes_memory = system_info.total_memory() + gpus = cuda_visible_devices() + memory_bytes = system_info.total_memory() - # Resource accounting: - self._resource_accounting_lock = threading.Lock() - self._available_bytes_memory = self.total_bytes_memory - self._available_cpus = float(self.num_cpus) - self._available_gpus = float(self.num_gpus) + self._resources = PyRunnerResources( + num_cpus, + gpus, + memory_bytes, + ) def runner_io(self) -> PyRunnerIO: return PyRunnerIO() @@ -346,28 +485,41 @@ def actor_pool_context( ) -> Iterator[str]: actor_pool_id = f"py_actor_pool-{name}" - total_resource_request = actor_resource_request * num_actors - admitted = self._attempt_admit_task(total_resource_request) - - if not admitted: + resources = self._resources.try_acquire_multiple([actor_resource_request] * num_actors) + if resources is None: raise RuntimeError( f"Not enough resources available to admit {num_actors} actors, each with resource request: {actor_resource_request}" ) try: self._actor_pools[actor_pool_id] = PyActorPool( - actor_pool_id, num_actors, actor_resource_request, projection + actor_pool_id, + num_actors, + resources, + projection, ) self._actor_pools[actor_pool_id].setup() - logger.debug("Created actor pool %s with resources: %s", actor_pool_id, total_resource_request) + logger.debug( + "Created actor pool %s with %s actors, each with resources: %s", + actor_pool_id, + num_actors, + actor_resource_request, + ) yield actor_pool_id # NOTE: Ensure that teardown always occurs regardless of any errors that occur during actor pool setup or execution finally: logger.debug("Tearing down actor pool: %s", actor_pool_id) - self._release_resources(total_resource_request) + self._resources.release(resources) self._actor_pools[actor_pool_id].teardown() del self._actor_pools[actor_pool_id] + def _create_resource_release_callback(self, resources: AcquiredResources) -> Callable[[futures.Future], None]: + """ + This higher order function is used so that the `resources` released by the callback + are from the ones stored in the variable 
at the creation of the callback instead of during its call. + """ + return lambda _: self._resources.release(resources) + def _physical_plan_to_partitions( self, execution_id: str, @@ -401,11 +553,9 @@ def _physical_plan_to_partitions( else: # next_task is a task to run. - task_admitted = self._attempt_admit_task( - next_step.resource_request, - ) + resources = self._resources.try_acquire(next_step.resource_request) - if not task_admitted: + if resources is None: # Insufficient resources; await some tasks. logger.debug( "execution[%s] Skipping to wait on dispatched tasks: insufficient resources", @@ -437,7 +587,7 @@ def _physical_plan_to_partitions( next_step.partial_metadatas, ) - self._release_resources(next_step.resource_request) + self._resources.release(resources) next_step.set_result(materialized_results) @@ -466,9 +616,7 @@ def _physical_plan_to_partitions( next_step.partial_metadatas, ) - resource_request = next_step.resource_request - - future.add_done_callback(lambda _: self._release_resources(resource_request)) + future.add_done_callback(self._create_resource_release_callback(resources)) # Register the inflight task assert ( @@ -522,44 +670,6 @@ def _physical_plan_to_partitions( if exec_id == execution_id: del self._inflight_futures[(exec_id, task_id)] - def _check_resource_requests(self, resource_request: ResourceRequest) -> None: - """Validates that the requested ResourceRequest is possible to run locally""" - - if resource_request.num_cpus is not None and resource_request.num_cpus > self.num_cpus: - raise RuntimeError(f"Requested {resource_request.num_cpus} CPUs but found only {self.num_cpus} available") - if resource_request.num_gpus is not None and resource_request.num_gpus > self.num_gpus: - raise RuntimeError(f"Requested {resource_request.num_gpus} GPUs but found only {self.num_gpus} available") - if resource_request.memory_bytes is not None and resource_request.memory_bytes > self.total_bytes_memory: - raise RuntimeError( - f"Requested {resource_request.memory_bytes} bytes of memory but found only {self.total_bytes_memory} available" - ) - - def _attempt_admit_task( - self, - resource_request: ResourceRequest, - ) -> bool: - self._check_resource_requests(resource_request) - - with self._resource_accounting_lock: - memory_okay = (resource_request.memory_bytes or 0) <= self._available_bytes_memory - cpus_okay = (resource_request.num_cpus or 0) <= self._available_cpus - gpus_okay = (resource_request.num_gpus or 0) <= self._available_gpus - all_okay = all((cpus_okay, gpus_okay, memory_okay)) - - # Update resource accounting if we have the resources (this is considered as the task being "admitted") - if all_okay: - self._available_bytes_memory -= resource_request.memory_bytes or 0 - self._available_cpus -= resource_request.num_cpus or 0.0 - self._available_gpus -= resource_request.num_gpus or 0.0 - - return all_okay - - def _release_resources(self, resource_request: ResourceRequest) -> None: - with self._resource_accounting_lock: - self._available_bytes_memory += resource_request.memory_bytes or 0 - self._available_cpus += resource_request.num_cpus or 0.0 - self._available_gpus += resource_request.num_gpus or 0.0 - def build_partitions( self, instruction_stack: list[Instruction], diff --git a/src/common/resource-request/Cargo.toml b/src/common/resource-request/Cargo.toml index d72d63b796..edec66e790 100644 --- a/src/common/resource-request/Cargo.toml +++ b/src/common/resource-request/Cargo.toml @@ -1,4 +1,5 @@ [dependencies] +common-error = {path = "../error"} 
common-hashable-float-wrapper = {path = "../hashable-float-wrapper"} common-py-serde = {path = "../py-serde"} pyo3 = {workspace = true, optional = true} diff --git a/src/common/resource-request/src/lib.rs b/src/common/resource-request/src/lib.rs index 0b27d4a054..f79c2cf84d 100644 --- a/src/common/resource-request/src/lib.rs +++ b/src/common/resource-request/src/lib.rs @@ -3,6 +3,7 @@ use std::{ ops::Add, }; +use common_error::{DaftError, DaftResult}; use common_hashable_float_wrapper::FloatWrapper; use common_py_serde::impl_bincode_py_state_serialization; #[cfg(feature = "python")] @@ -19,52 +20,59 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct ResourceRequest { - pub num_cpus: Option, - pub num_gpus: Option, - pub memory_bytes: Option, + num_cpus: Option, + num_gpus: Option, + memory_bytes: Option, } impl ResourceRequest { - #[must_use] - pub fn new_internal( + pub fn try_new_internal( num_cpus: Option, num_gpus: Option, memory_bytes: Option, - ) -> Self { - Self { + ) -> DaftResult { + if let Some(num_gpus) = num_gpus { + if num_gpus < 0.0 { + return Err(DaftError::ValueError(format!( + "ResourceRequest num_gpus must be nonnegative, got {}", + num_gpus + ))); + } + + if num_gpus > 1.0 && num_gpus.fract() != 0.0 { + return Err(DaftError::ValueError(format!( + "ResourceRequest num_gpus greater than 1 must be an integer, got {}", + num_gpus + ))); + } + } + + Ok(Self { num_cpus, num_gpus, memory_bytes, - } + }) } #[must_use] pub fn default_cpu() -> Self { - Self::new_internal(Some(1.0), None, None) + Self::try_new_internal(Some(1.0), None, None).unwrap() } - #[must_use] - pub fn or_num_cpus(&self, num_cpus: Option) -> Self { - Self { - num_cpus: self.num_cpus.or(num_cpus), - ..self.clone() - } + pub fn or_num_cpus(&self, num_cpus: Option) -> DaftResult { + Self::try_new_internal(self.num_cpus.or(num_cpus), self.num_gpus, self.memory_bytes) } - #[must_use] - pub fn or_num_gpus(&self, num_gpus: Option) -> Self { - Self { - num_gpus: self.num_gpus.or(num_gpus), - ..self.clone() - } + pub fn or_num_gpus(&self, num_gpus: Option) -> DaftResult { + Self::try_new_internal(self.num_cpus, self.num_gpus.or(num_gpus), self.memory_bytes) } - #[must_use] - pub fn or_memory_bytes(&self, memory_bytes: Option) -> Self { - Self { - memory_bytes: self.memory_bytes.or(memory_bytes), - ..self.clone() - } + pub fn or_memory_bytes(&self, memory_bytes: Option) -> DaftResult { + Self::try_new_internal( + self.num_cpus, + self.num_gpus, + self.memory_bytes.or(memory_bytes), + ) } #[must_use] @@ -113,7 +121,7 @@ impl ResourceRequest { let max_num_cpus = lift(float_max, self.num_cpus, other.num_cpus); let max_num_gpus = lift(float_max, self.num_gpus, other.num_gpus); let max_memory_bytes = lift(std::cmp::max, self.memory_bytes, other.memory_bytes); - Self::new_internal(max_num_cpus, max_num_gpus, max_memory_bytes) + Self::try_new_internal(max_num_cpus, max_num_gpus, max_memory_bytes).unwrap() } pub fn max_all>( @@ -124,9 +132,8 @@ impl ResourceRequest { .fold(Self::default(), |acc, e| acc.max(e.as_ref())) } - #[must_use] - pub fn multiply(&self, factor: f64) -> Self { - Self::new_internal( + pub fn multiply(&self, factor: f64) -> DaftResult { + Self::try_new_internal( self.num_cpus.map(|x| x * factor), self.num_gpus.map(|x| x * factor), self.memory_bytes.map(|x| x * (factor as usize)), @@ -135,9 +142,9 @@ impl ResourceRequest { } impl Add for &ResourceRequest { - type Output = 
ResourceRequest; + type Output = DaftResult; fn add(self, other: Self) -> Self::Output { - Self::Output::new_internal( + ResourceRequest::try_new_internal( lift(Add::add, self.num_cpus, other.num_cpus), lift(Add::add, self.num_gpus, other.num_gpus), lift(Add::add, self.memory_bytes, other.memory_bytes), @@ -146,8 +153,8 @@ impl Add for &ResourceRequest { } impl Add for ResourceRequest { - type Output = Self; - fn add(self, other: Self) -> Self { + type Output = DaftResult; + fn add(self, other: Self) -> Self::Output { &self + &other } } @@ -184,9 +191,12 @@ fn float_max(left: f64, right: f64) -> f64 { #[pymethods] impl ResourceRequest { #[new] - #[must_use] - pub fn new(num_cpus: Option, num_gpus: Option, memory_bytes: Option) -> Self { - Self::new_internal(num_cpus, num_gpus, memory_bytes) + pub fn new( + num_cpus: Option, + num_gpus: Option, + memory_bytes: Option, + ) -> PyResult { + Ok(Self::try_new_internal(num_cpus, num_gpus, memory_bytes)?) } /// Take a field-wise max of the list of resource requests. @@ -211,36 +221,24 @@ impl ResourceRequest { Ok(self.memory_bytes) } - #[must_use] - pub fn with_num_cpus(&self, num_cpus: Option) -> Self { - Self { - num_cpus, - ..self.clone() - } + pub fn with_num_cpus(&self, num_cpus: Option) -> DaftResult { + Self::try_new_internal(num_cpus, self.num_gpus, self.memory_bytes) } - #[must_use] - pub fn with_num_gpus(&self, num_gpus: Option) -> Self { - Self { - num_gpus, - ..self.clone() - } + pub fn with_num_gpus(&self, num_gpus: Option) -> DaftResult { + Self::try_new_internal(self.num_cpus, num_gpus, self.memory_bytes) } - #[must_use] - pub fn with_memory_bytes(&self, memory_bytes: Option) -> Self { - Self { - memory_bytes, - ..self.clone() - } + pub fn with_memory_bytes(&self, memory_bytes: Option) -> DaftResult { + Self::try_new_internal(self.num_cpus, self.num_gpus, memory_bytes) } - fn __add__(&self, other: &Self) -> Self { - self + other + fn __add__(&self, other: &Self) -> PyResult { + Ok((self + other)?) } - fn __mul__(&self, factor: f64) -> Self { - self.multiply(factor) + fn __mul__(&self, factor: f64) -> PyResult { + Ok(self.multiply(factor)?) 
} fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 170eace0d6..dace536834 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -615,11 +615,7 @@ mod tests { } fn create_resource_request() -> ResourceRequest { - ResourceRequest { - num_cpus: Some(8.), - num_gpus: Some(1.), - memory_bytes: None, - } + ResourceRequest::try_new_internal(Some(8.), Some(1.), None).unwrap() } #[test] diff --git a/tests/actor_pool/test_actor_cuda_devices.py b/tests/actor_pool/test_actor_cuda_devices.py new file mode 100644 index 0000000000..4f8bf6c410 --- /dev/null +++ b/tests/actor_pool/test_actor_cuda_devices.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +import os +from contextlib import contextmanager + +import pytest +import ray + +import daft +from daft.context import get_context, set_planning_config +from daft.datatype import DataType +from daft.internal.gpu import cuda_visible_devices +from daft.udf import udf + +pytestmark = pytest.mark.skipif( + get_context().daft_execution_config.enable_native_executor is True, + reason="Native executor fails for these tests", +) + + +@pytest.fixture(scope="module") +def enable_actor_pool(): + try: + original_config = get_context().daft_planning_config + + set_planning_config( + config=get_context().daft_planning_config.with_config_values(enable_actor_pool_projections=True) + ) + yield + finally: + set_planning_config(config=original_config) + + +@contextmanager +def reset_runner_with_gpus(num_gpus, monkeypatch): + """If current runner does not have enough GPUs, create a new runner with mocked GPU resources""" + if len(cuda_visible_devices()) < num_gpus: + if get_context().runner_config.name == "ray": + try: + ray.shutdown() + ray.init(num_gpus=num_gpus) + yield + finally: + ray.shutdown() + ray.init() + else: + try: + monkeypatch.setenv("CUDA_VISIBLE_DEVICES", ",".join(str(i) for i in range(num_gpus))) + + # Need to reset runner to recompute resources + original_runner = daft.context.get_context()._runner + daft.context.get_context()._runner = None + yield + finally: + daft.context.get_context()._runner = original_runner + else: + yield + + +@pytest.mark.parametrize("concurrency", [1, 2]) +@pytest.mark.parametrize("num_gpus", [1, 2]) +def test_stateful_udf_cuda_env_var(enable_actor_pool, monkeypatch, concurrency, num_gpus): + with reset_runner_with_gpus(concurrency * num_gpus, monkeypatch): + + @udf(return_dtype=DataType.string(), num_gpus=num_gpus) + class GetCudaVisibleDevices: + def __init__(self): + self.cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"] + + def __call__(self, data): + assert os.environ["CUDA_VISIBLE_DEVICES"] == self.cuda_visible_devices + + import time + + time.sleep(0.1) + + return [self.cuda_visible_devices] * len(data) + + GetCudaVisibleDevices = GetCudaVisibleDevices.with_concurrency(concurrency) + + df = daft.from_pydict({"x": [1, 2, 3, 4]}) + df = df.repartition(4) + df = df.select(GetCudaVisibleDevices(df["x"])) + + result = df.to_pydict() + + unique_visible_devices = set(result["x"]) + assert len(unique_visible_devices) == concurrency + + all_devices = (",".join(unique_visible_devices)).split(",") + assert len(all_devices) == concurrency * num_gpus + + +def test_stateful_udf_fractional_gpu(enable_actor_pool, monkeypatch): + 
with reset_runner_with_gpus(1, monkeypatch): + + @udf(return_dtype=DataType.string(), num_gpus=0.5) + class FractionalGpuUdf: + def __init__(self): + self.cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"] + + def __call__(self, data): + assert os.environ["CUDA_VISIBLE_DEVICES"] == self.cuda_visible_devices + + import time + + time.sleep(0.1) + + return [self.cuda_visible_devices] * len(data) + + FractionalGpuUdf = FractionalGpuUdf.with_concurrency(2) + + df = daft.from_pydict({"x": [1, 2]}) + df = df.into_partitions(2) + df = df.select(FractionalGpuUdf(df["x"])) + + result = df.to_pydict() + + unique_visible_devices = set(result["x"]) + assert len(unique_visible_devices) == 1 + + +@pytest.mark.skipif(get_context().runner_config.name != "py", reason="Test can only be run on PyRunner") +def test_stateful_udf_no_cuda_devices(enable_actor_pool, monkeypatch): + monkeypatch.setattr(daft.internal.gpu, "_raw_device_count_nvml", lambda: 0) + monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False) + + original_runner = daft.context.get_context()._runner + + try: + daft.context.get_context()._runner = None + + @udf(return_dtype=DataType.string(), num_gpus=1) + class UdfWithGpus: + def __init__(self): + pass + + def __call__(self, data): + return [str(i) for i in range(len(data))] + + UdfWithGpus = UdfWithGpus.with_concurrency(1) + + df = daft.from_pydict({"x": [1, 2, 3, 4]}) + df = df.select(UdfWithGpus(df["x"])) + + with pytest.raises(RuntimeError): + df.collect() + finally: + daft.context.get_context()._runner = original_runner + + +@pytest.mark.skipif(get_context().runner_config.name != "py", reason="Test can only be run on PyRunner") +def test_stateful_udf_no_cuda_visible_device_envvar(enable_actor_pool, monkeypatch): + monkeypatch.setattr(daft.internal.gpu, "_raw_device_count_nvml", lambda: 1) + monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False) + + original_runner = daft.context.get_context()._runner + + try: + daft.context.get_context()._runner = None + + @udf(return_dtype=DataType.string(), num_gpus=1) + class UdfWithGpus: + def __init__(self): + pass + + def __call__(self, data): + return [str(i) for i in range(len(data))] + + UdfWithGpus = UdfWithGpus.with_concurrency(1) + + df = daft.from_pydict({"x": [1, 2, 3, 4]}) + df = df.select(UdfWithGpus(df["x"])) + + df.collect() + + UdfWithGpus2 = UdfWithGpus.with_concurrency(2) + + df = daft.from_pydict({"x": [1, 2, 3, 4]}) + df = df.select(UdfWithGpus2(df["x"])) + + with pytest.raises(RuntimeError): + df.collect() + finally: + daft.context.get_context()._runner = original_runner diff --git a/tests/actor_pool/test_pyactor_pool.py b/tests/actor_pool/test_pyactor_pool.py index f34d91bd7b..81f46cf86c 100644 --- a/tests/actor_pool/test_pyactor_pool.py +++ b/tests/actor_pool/test_pyactor_pool.py @@ -9,7 +9,7 @@ from daft.execution.execution_step import StatefulUDFProject from daft.expressions import ExpressionsProjection from daft.runners.partitioning import PartialPartitionMetadata -from daft.runners.pyrunner import PyActorPool, PyRunner +from daft.runners.pyrunner import AcquiredResources, PyActorPool, PyRunner from daft.table import MicroPartition @@ -25,7 +25,7 @@ def __call__(self, x): def test_pyactor_pool(): projection = ExpressionsProjection([MyStatefulUDF(daft.col("x"))]) - pool = PyActorPool("my-pool", 1, ResourceRequest(num_cpus=1), projection) + pool = PyActorPool("my-pool", 1, [AcquiredResources(num_cpus=1, gpus={}, memory_bytes=0)], projection) initial_partition = MicroPartition.from_pydict({"x": [1, 1, 1]}) ppm = 
PartialPartitionMetadata(num_rows=None, size_bytes=None) instr = StatefulUDFProject(projection=projection) @@ -62,14 +62,20 @@ def test_pyactor_pool(): @pytest.mark.skipif(get_context().runner_config.name != "py", reason="Test can only be run on PyRunner") def test_pyactor_pool_not_enough_resources(): + from copy import deepcopy + cpu_count = multiprocessing.cpu_count() projection = ExpressionsProjection([MyStatefulUDF(daft.col("x"))]) runner = get_context().runner() assert isinstance(runner, PyRunner) + original_resources = deepcopy(runner._resources.available_resources) + with pytest.raises(RuntimeError, match=f"Requested {float(cpu_count + 1)} CPUs but found only"): with runner.actor_pool_context( "my-pool", ResourceRequest(num_cpus=1), ResourceRequest(), cpu_count + 1, projection ) as _: pass + + assert runner._resources.available_resources == original_resources diff --git a/tests/sql/test_sql.py b/tests/sql/test_sql.py index 7df8a44e55..8b8cce43b5 100644 --- a/tests/sql/test_sql.py +++ b/tests/sql/test_sql.py @@ -152,10 +152,13 @@ def test_sql_count_star(): assert actual == expected -GLOBAL_DF = daft.from_pydict({"n": [1, 2, 3]}) +@pytest.fixture +def set_global_df(): + global GLOBAL_DF + GLOBAL_DF = daft.from_pydict({"n": [1, 2, 3]}) -def test_sql_function_sees_caller_tables(): +def test_sql_function_sees_caller_tables(set_global_df): # sees the globals df = daft.sql("SELECT * FROM GLOBAL_DF") assert df.collect().to_pydict() == GLOBAL_DF.collect().to_pydict() @@ -164,20 +167,20 @@ def test_sql_function_sees_caller_tables(): assert df.collect().to_pydict() == df_copy.collect().to_pydict() -def test_sql_function_locals_shadow_globals(): +def test_sql_function_locals_shadow_globals(set_global_df): GLOBAL_DF = None # noqa: F841 with pytest.raises(Exception, match="Table not found"): daft.sql("SELECT * FROM GLOBAL_DF") -def test_sql_function_globals_are_added_to_catalog(): +def test_sql_function_globals_are_added_to_catalog(set_global_df): df = daft.from_pydict({"n": [1], "x": [2]}) res = daft.sql("SELECT * FROM GLOBAL_DF g JOIN df d USING (n)", catalog=SQLCatalog({"df": df})) joined = GLOBAL_DF.join(df, on="n") assert res.collect().to_pydict() == joined.collect().to_pydict() -def test_sql_function_catalog_is_final(): +def test_sql_function_catalog_is_final(set_global_df): df = daft.from_pydict({"a": [1]}) # sanity check to ensure validity of below test assert df.collect().to_pydict() != GLOBAL_DF.collect().to_pydict() @@ -185,12 +188,12 @@ def test_sql_function_catalog_is_final(): assert res.collect().to_pydict() == df.collect().to_pydict() -def test_sql_function_register_globals(): +def test_sql_function_register_globals(set_global_df): with pytest.raises(Exception, match="Table not found"): daft.sql("SELECT * FROM GLOBAL_DF", SQLCatalog({}), register_globals=False) -def test_sql_function_requires_catalog_or_globals(): +def test_sql_function_requires_catalog_or_globals(set_global_df): with pytest.raises(Exception, match="Must supply a catalog"): daft.sql("SELECT * FROM GLOBAL_DF", register_globals=False) diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py index ec867aada7..3cab797498 100644 --- a/tests/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -11,7 +11,7 @@ from daft.context import get_context, set_planning_config from daft.daft import SystemInfo from daft.expressions import col -from daft.internal.gpu import cuda_device_count +from daft.internal.gpu import cuda_visible_devices pytestmark = pytest.mark.skipif( 
context.get_context().daft_execution_config.enable_native_executor is True, @@ -20,7 +20,7 @@ def no_gpu_available() -> bool: - return cuda_device_count() == 0 + return len(cuda_visible_devices()) == 0 DATA = {"id": [i for i in range(100)]} @@ -100,7 +100,7 @@ def test_requesting_too_many_cpus(): def test_requesting_too_many_gpus(): df = daft.from_pydict(DATA) - my_udf_parametrized = my_udf.override_options(num_gpus=cuda_device_count() + 1) + my_udf_parametrized = my_udf.override_options(num_gpus=len(cuda_visible_devices()) + 1) df = df.with_column("foo", my_udf_parametrized(col("id"))) with pytest.raises(RuntimeError): @@ -270,15 +270,15 @@ def test_with_column_folded_rayrunner_class(enable_actor_pool): @udf(return_dtype=daft.DataType.int64(), num_gpus=1) def assert_num_cuda_visible_devices(c, num_gpus: int = 0): - cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + cuda_visible_devices_env = os.getenv("CUDA_VISIBLE_DEVICES") # Env var not set: program is free to use any number of GPUs - if cuda_visible_devices is None: - result = cuda_device_count() + if cuda_visible_devices_env is None: + result = len(cuda_visible_devices()) # Env var set to empty: program has no access to any GPUs - elif cuda_visible_devices == "": + elif cuda_visible_devices_env == "": result = 0 else: - result = len(cuda_visible_devices.split(",")) + result = len(cuda_visible_devices_env.split(",")) assert result == num_gpus return c @@ -291,7 +291,7 @@ def test_with_column_pyrunner_gpu(): # We set num_gpus=1 on the UDF itself df = df.with_column( "foo", - assert_num_cuda_visible_devices(col("id"), num_gpus=cuda_device_count()), + assert_num_cuda_visible_devices(col("id"), num_gpus=len(cuda_visible_devices())), ) df.collect() @@ -330,3 +330,27 @@ def test_with_column_max_resources_rayrunner_gpu(): ) df.collect() + + +def test_improper_num_gpus(): + with pytest.raises(ValueError, match="DaftError::ValueError"): + + @udf(return_dtype=daft.DataType.int64(), num_gpus=-1) + def foo(c): + return c + + with pytest.raises(ValueError, match="DaftError::ValueError"): + + @udf(return_dtype=daft.DataType.int64(), num_gpus=1.5) + def foo(c): + return c + + @udf(return_dtype=daft.DataType.int64()) + def foo(c): + return c + + with pytest.raises(ValueError, match="DaftError::ValueError"): + foo = foo.override_options(num_gpus=-1) + + with pytest.raises(ValueError, match="DaftError::ValueError"): + foo = foo.override_options(num_gpus=1.5)
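
Reviewer note (not part of the patch): below is a minimal standalone sketch of the GPU-picking policy that the new PyRunnerResources.try_acquire_multiple implements above — whole-GPU requests only take devices that are fully free, while fractional requests (0 < num_gpus < 1) are packed onto the device with the smallest remaining fraction that still fits, mirroring Ray's fractional-GPU behavior. The helper name pick_gpus and the standalone dict of availabilities are illustrative only and do not exist in Daft.

from __future__ import annotations


def pick_gpus(available: dict[str, float], num_gpus: float) -> dict[str, float] | None:
    """Return the GPU fractions to reserve for one request, or None if the request cannot fit."""
    num_gpus = float(num_gpus)
    if num_gpus.is_integer():
        # Whole-GPU requests only claim devices that are fully free.
        chosen: dict[str, float] = {}
        for device, fraction in available.items():
            if num_gpus == 0:
                break
            if fraction == 1.0:
                chosen[device] = 1.0
                num_gpus -= 1.0
        return chosen if num_gpus == 0 else None
    else:
        # Fractional requests go to the device with the lowest remaining fraction that
        # still fits, so fragments pack onto as few GPUs as possible.
        best = None
        for device, fraction in available.items():
            if fraction >= num_gpus and (best is None or fraction < available[best]):
                best = device
        return {best: num_gpus} if best is not None else None


# Usage: GPU "0" already has 0.5 reserved, GPU "1" is free.
available = {"0": 0.5, "1": 1.0}
print(pick_gpus(available, 0.25))  # {'0': 0.25} -- packs onto the partially used device
print(pick_gpus(available, 1.0))   # {'1': 1.0}  -- whole GPUs come only from free devices
print(pick_gpus(available, 2.0))   # None        -- cannot be satisfied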