diff --git a/tests/expressions/test_stateful_udf.py b/tests/expressions/test_stateful_udf.py
new file mode 100644
index 0000000000..0a8c7e05d4
--- /dev/null
+++ b/tests/expressions/test_stateful_udf.py
@@ -0,0 +1,190 @@
+from __future__ import annotations
+
+import pytest
+
+import daft
+from daft import ResourceRequest
+from daft.context import get_actor_context, get_context, set_planning_config
+from daft.datatype import DataType
+from daft.internal.gpu import cuda_visible_devices
+from daft.udf import udf
+
+pytestmark = pytest.mark.skipif(
+    get_context().daft_execution_config.enable_native_executor is True,
+    reason="Native executor fails for these tests",
+)
+
+
+@pytest.fixture(scope="module", autouse=True)
+def enable_actor_pool():
+    try:
+        original_config = get_context().daft_planning_config
+
+        set_planning_config(
+            config=get_context().daft_planning_config.with_config_values(enable_actor_pool_projections=True)
+        )
+        yield
+    finally:
+        set_planning_config(config=original_config)
+
+
+@pytest.mark.parametrize("concurrency", [1, 2, 4])
+def test_stateful_udf_context_rank(concurrency):
+    @udf(return_dtype=DataType.int64())
+    class GetRank:
+        def __init__(self):
+            actor_context = get_actor_context()
+            self._rank = actor_context.rank
+
+        def __call__(self, data):
+            actor_context = get_actor_context()
+
+            assert actor_context.rank == self._rank
+
+            return [self._rank] * len(data)
+
+    GetRank = GetRank.with_concurrency(concurrency)
+
+    df = daft.from_pydict({"x": [1, 2, 3, 4]})
+    df = df.into_partitions(4)
+    df = df.select(GetRank(df["x"]))
+
+    result = df.to_pydict()
+    ranks = set(result["x"])
+    for i in range(concurrency):
+        assert i in ranks, f"rank {i} not found in {ranks}"
+
+
+@pytest.mark.parametrize("concurrency", [1, 2, 4])
+def test_stateful_udf_context_resource_request(concurrency):
+    @udf(return_dtype=DataType.int64(), num_cpus=1, memory_bytes=5_000_000)
+    class TestResourceRequest:
+        def __init__(self, resource_request: ResourceRequest):
+            self.resource_request = resource_request
+
+            actor_context = get_actor_context()
+            assert actor_context.resource_request == self.resource_request
+
+        def __call__(self, data):
+            actor_context = get_actor_context()
+            assert actor_context.resource_request == self.resource_request
+
+            return data
+
+    TestResourceRequest = TestResourceRequest.with_concurrency(concurrency)
+    TestResourceRequest = TestResourceRequest.with_init_args(ResourceRequest(num_cpus=1, memory_bytes=5_000_000))
+
+    df = daft.from_pydict({"x": [1, 2, 3, 4]})
+    df = df.into_partitions(4)
+    df = df.select(TestResourceRequest(df["x"]))
+
+    df.collect()
+
+
+@pytest.mark.parametrize("concurrency", [1, 2])
+@pytest.mark.parametrize("num_gpus", [1, 2])
+def test_stateful_udf_cuda_env_var(concurrency, num_gpus):
+    if concurrency * num_gpus > len(cuda_visible_devices()):
+        pytest.skip("Not enough GPUs available")
+
+    import os
+
+    @udf(return_dtype=DataType.string(), num_gpus=num_gpus)
+    class GetCudaVisibleDevices:
+        def __init__(self):
+            self.cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
+
+        def __call__(self, data):
+            assert os.environ["CUDA_VISIBLE_DEVICES"] == self.cuda_visible_devices
+
+            return [self.cuda_visible_devices] * len(data)
+
+    GetCudaVisibleDevices = GetCudaVisibleDevices.with_concurrency(concurrency)
+
+    df = daft.from_pydict({"x": [1, 2, 3, 4]})
+    df = df.into_partitions(4)
+    df = df.select(GetCudaVisibleDevices(df["x"]))
+
+    result = df.to_pydict()
+
+    unique_visible_devices = set(result["x"])
+    assert len(unique_visible_devices) == concurrency
+
+    all_devices = (",".join(unique_visible_devices)).split(",")
+    assert len(all_devices) == concurrency * num_gpus
+
+
+@pytest.mark.skipif(len(cuda_visible_devices()) < 2, reason="Not enough GPUs available")
+def test_stateful_udf_cuda_env_var_filtered():
+    import os
+
+    os.environ["CUDA_VISIBLE_DEVICES"] = "1"
+
+    @udf(return_dtype=DataType.string(), num_gpus=1)
+    class GetCudaVisibleDevices:
+        def __init__(self):
+            self.cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
+
+        def __call__(self, data):
+            assert os.environ["CUDA_VISIBLE_DEVICES"] == self.cuda_visible_devices
+
+            return [self.cuda_visible_devices] * len(data)
+
+    GetCudaVisibleDevices = GetCudaVisibleDevices.with_concurrency(1)
+
+    df = daft.from_pydict({"x": [1]})
+    df = df.select(GetCudaVisibleDevices(df["x"]))
+
+    result = df.to_pydict()
+    assert result == {"x": ["1"]}
+
+
+@pytest.mark.skipif(len(cuda_visible_devices()) < 3, reason="Not enough GPUs available")
+def test_stateful_udf_cuda_env_var_filtered_multiple_gpus():
+    import os
+
+    os.environ["CUDA_VISIBLE_DEVICES"] = "1,2"
+
+    @udf(return_dtype=DataType.string(), num_gpus=2)
+    class GetCudaVisibleDevices:
+        def __init__(self):
+            self.cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
+
+        def __call__(self, data):
+            assert os.environ["CUDA_VISIBLE_DEVICES"] == self.cuda_visible_devices
+
+            return [self.cuda_visible_devices] * len(data)
+
+    GetCudaVisibleDevices = GetCudaVisibleDevices.with_concurrency(1)
+
+    df = daft.from_pydict({"x": [1]})
+    df = df.select(GetCudaVisibleDevices(df["x"]))
+
+    result = df.to_pydict()
+    assert result == {"x": ["1,2"]} or result == {"x": ["2,1"]}
+
+
+@pytest.mark.skipif(len(cuda_visible_devices()) < 3, reason="Not enough GPUs available")
+def test_stateful_udf_cuda_env_var_filtered_multiple_concurrency():
+    import os
+
+    os.environ["CUDA_VISIBLE_DEVICES"] = "1,2"
+
+    @udf(return_dtype=DataType.string(), num_gpus=1)
+    class GetCudaVisibleDevices:
+        def __init__(self):
+            self.cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
+
+        def __call__(self, data):
+            assert os.environ["CUDA_VISIBLE_DEVICES"] == self.cuda_visible_devices
+
+            return [self.cuda_visible_devices] * len(data)
+
+    GetCudaVisibleDevices = GetCudaVisibleDevices.with_concurrency(2)
+
+    df = daft.from_pydict({"x": [1, 2]})
+    df = df.into_partitions(2)
+    df = df.select(GetCudaVisibleDevices(df["x"])).sort("x")
+
+    result = df.to_pydict()
+    assert result == {"x": ["1", "2"]}
diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py
index ec867aada7..77cd7d8de0 100644
--- a/tests/test_resource_requests.py
+++ b/tests/test_resource_requests.py
@@ -11,7 +11,7 @@
 from daft.context import get_context, set_planning_config
 from daft.daft import SystemInfo
 from daft.expressions import col
-from daft.internal.gpu import cuda_device_count
+from daft.internal.gpu import cuda_visible_devices
 
 pytestmark = pytest.mark.skipif(
     context.get_context().daft_execution_config.enable_native_executor is True,
@@ -20,7 +20,7 @@
 
 
 def no_gpu_available() -> bool:
-    return cuda_device_count() == 0
+    return len(cuda_visible_devices()) == 0
 
 
 DATA = {"id": [i for i in range(100)]}
@@ -100,7 +100,7 @@ def test_requesting_too_many_cpus():
 
 def test_requesting_too_many_gpus():
     df = daft.from_pydict(DATA)
-    my_udf_parametrized = my_udf.override_options(num_gpus=cuda_device_count() + 1)
+    my_udf_parametrized = my_udf.override_options(num_gpus=len(cuda_visible_devices()) + 1)
     df = df.with_column("foo", my_udf_parametrized(col("id")))
 
     with pytest.raises(RuntimeError):
@@ -270,15 +270,15 @@ def test_with_column_folded_rayrunner_class(enable_actor_pool):
 
 @udf(return_dtype=daft.DataType.int64(), num_gpus=1)
 def assert_num_cuda_visible_devices(c, num_gpus: int = 0):
-    cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
+    cuda_visible_devices_env = os.getenv("CUDA_VISIBLE_DEVICES")
     # Env var not set: program is free to use any number of GPUs
-    if cuda_visible_devices is None:
-        result = cuda_device_count()
+    if cuda_visible_devices_env is None:
+        result = len(cuda_visible_devices())
     # Env var set to empty: program has no access to any GPUs
-    elif cuda_visible_devices == "":
+    elif cuda_visible_devices_env == "":
         result = 0
     else:
-        result = len(cuda_visible_devices.split(","))
+        result = len(cuda_visible_devices_env.split(","))
     assert result == num_gpus
     return c
 
@@ -291,7 +291,7 @@ def test_with_column_pyrunner_gpu():
     # We set num_gpus=1 on the UDF itself
     df = df.with_column(
         "foo",
-        assert_num_cuda_visible_devices(col("id"), num_gpus=cuda_device_count()),
+        assert_num_cuda_visible_devices(col("id"), num_gpus=len(cuda_visible_devices())),
     )
 
     df.collect()