From 990149a048477c494909649f6afa6f211a70227b Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Thu, 7 Nov 2024 20:05:21 -0800 Subject: [PATCH] [DOCS] Changing docs for UDF (#2880) 1. Improves UDF documentation by adding API pages for `StatefulUDF` and `StatelessUDF`, with lots of docstrings and examples ~2. Moves our `daft.udf` module to `daft.udfs` instead, which avoids a naming conflict with `daft.udf` which is our decorator. This might be a breaking change...~ (Reverted) --------- Co-authored-by: Jay Chia --- daft/dataframe/dataframe.py | 3 + daft/expressions/expressions.py | 8 +- daft/udf.py | 264 ++++++++++++++++---- docs/source/api_docs/udf.rst | 30 ++- tests/actor_pool/test_actor_cuda_devices.py | 2 +- tests/test_resource_requests.py | 36 +-- 6 files changed, 272 insertions(+), 71 deletions(-) diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 52f0f7458e..15e74dddab 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -3083,10 +3083,13 @@ def map_groups(self, udf: Expression) -> "DataFrame": Example: >>> import daft, statistics + >>> >>> df = daft.from_pydict({"group": ["a", "a", "a", "b", "b", "b"], "data": [1, 20, 30, 4, 50, 600]}) + >>> >>> @daft.udf(return_dtype=daft.DataType.float64()) ... def std_dev(data): ... return [statistics.stdev(data.to_pylist())] + >>> >>> df = df.groupby("group").map_groups(std_dev(df["data"])) >>> df.show() ╭───────┬────────────────────╮ diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index 03b64b24c2..48ec445584 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -1001,7 +1001,7 @@ def apply(self, func: Callable, return_dtype: DataType) -> Expression: Returns: Expression: New expression after having run the function on the expression """ - from daft.udf import StatelessUDF + from daft.udf import CommonUDFArgs, StatelessUDF def batch_func(self_series): return [func(x) for x in self_series.to_pylist()] @@ -1015,8 +1015,10 @@ def batch_func(self_series): name=name, func=batch_func, return_dtype=return_dtype, - resource_request=None, - batch_size=None, + common_args=CommonUDFArgs( + resource_request=None, + batch_size=None, + ), )(self) def is_null(self) -> Expression: diff --git a/daft/udf.py b/daft/udf.py index c662dc6ced..fdc238b980 100644 --- a/daft/udf.py +++ b/daft/udf.py @@ -3,7 +3,6 @@ import dataclasses import functools import inspect -from abc import abstractmethod from typing import Any, Callable, Dict, Optional, Tuple, Union from daft.context import get_context @@ -170,13 +169,10 @@ def get_args_for_slice(start: int, end: int): @dataclasses.dataclass -class UDF: +class CommonUDFArgs: resource_request: ResourceRequest | None batch_size: int | None - @abstractmethod - def __call__(self, *args, **kwargs) -> Expression: ... - def override_options( self, *, @@ -184,33 +180,7 @@ def override_options( num_gpus: float | None = _UnsetMarker, memory_bytes: int | None = _UnsetMarker, batch_size: int | None = _UnsetMarker, - ) -> UDF: - """Replace the resource requests for running each instance of your UDF. - - For instance, if your UDF requires 4 CPUs to run, you can configure it like so: - - >>> import daft - >>> - >>> @daft.udf(return_dtype=daft.DataType.string()) - ... def example_stateless_udf(inputs): - ... # You will have access to 4 CPUs here if you configure your UDF correctly! - ... return inputs - >>> - >>> # Parametrize the UDF to run with 4 CPUs - >>> example_stateless_udf_4CPU = example_stateless_udf.override_options(num_cpus=4) - >>> - >>> df = daft.from_pydict({"foo": [1, 2, 3]}) - >>> df = df.with_column("bar", example_stateless_udf_4CPU(df["foo"])) - - Args: - num_cpus: Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your - machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). - num_gpus: Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating - the appropriate GPU to each UDF using `CUDA_VISIBLE_DEVICES`. - memory_bytes: Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, - this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution. - batch_size: Enables batching of the input into batches of at most this size. Results between batches are concatenated. - """ + ) -> CommonUDFArgs: result = self # Any changes to resource request @@ -255,7 +225,10 @@ class PartialStatefulUDF: @dataclasses.dataclass -class StatelessUDF(UDF): +class StatelessUDF: + """A Stateless UDF is produced by calling `@udf` over a Python function""" + + common_args: CommonUDFArgs name: str func: UserProvidedPythonFunction return_dtype: DataType @@ -269,18 +242,53 @@ def __post_init__(self): functools.update_wrapper(self, self.func) def __call__(self, *args, **kwargs) -> Expression: - bound_args = BoundUDFArgs(self.bind_func(*args, **kwargs)) + """Call the UDF using some input Expressions, producing a new Expression that can be used by a DataFrame. + Args: + *args: Positional arguments to be passed to the UDF. These can be either Expressions or Python values. + **kwargs: Keyword arguments to be passed to the UDF. These can be either Expressions or Python values. + + Returns: + Expression: A new Expression representing the UDF call, which can be used in DataFrame operations. + + .. NOTE:: + When passing arguments to the UDF, you can use a mix of Expressions (e.g., df["column"]) and Python values. + Expressions will be evaluated for each row, while Python values will be passed as-is to the UDF. + + Example: + >>> import daft + >>> @daft.udf(return_dtype=daft.DataType.float64()) + ... def multiply_and_add(x: daft.Series, y: float, z: float): + ... return x.to_arrow().to_numpy() * y + z + >>> + >>> df = daft.from_pydict({"x": [1, 2, 3]}) + >>> df = df.with_column("result", multiply_and_add(df["x"], 2.0, z=1.5)) + >>> df.show() + ╭───────┬─────────╮ + │ x ┆ result │ + │ --- ┆ --- │ + │ Int64 ┆ Float64 │ + ╞═══════╪═════════╡ + │ 1 ┆ 3.5 │ + ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ + │ 2 ┆ 5.5 │ + ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ + │ 3 ┆ 7.5 │ + ╰───────┴─────────╯ + + (Showing first 3 of 3 rows) + """ + bound_args = BoundUDFArgs(self._bind_func(*args, **kwargs)) expressions = list(bound_args.expressions().values()) return Expression.stateless_udf( name=self.name, partial=PartialStatelessUDF(self.func, self.return_dtype, bound_args), expressions=expressions, return_dtype=self.return_dtype, - resource_request=self.resource_request, - batch_size=self.batch_size, + resource_request=self.common_args.resource_request, + batch_size=self.common_args.batch_size, ) - def bind_func(self, *args, **kwargs) -> inspect.BoundArguments: + def _bind_func(self, *args, **kwargs) -> inspect.BoundArguments: sig = inspect.signature(self.func) bound_args = sig.bind(*args, **kwargs) bound_args.apply_defaults() @@ -289,9 +297,86 @@ def bind_func(self, *args, **kwargs) -> inspect.BoundArguments: def __hash__(self) -> int: return hash((self.func, self.return_dtype)) + def override_options( + self, + *, + num_cpus: float | None = _UnsetMarker, + num_gpus: float | None = _UnsetMarker, + memory_bytes: int | None = _UnsetMarker, + batch_size: int | None = _UnsetMarker, + ) -> StatelessUDF: + """Replace the resource requests for running each instance of your UDF. + + For instance, if your UDF requires 4 CPUs to run, you can configure it like so: + + >>> import daft + >>> + >>> @daft.udf(return_dtype=daft.DataType.string()) + ... def example_stateless_udf(inputs): + ... # You will have access to 4 CPUs here if you configure your UDF correctly! + ... return inputs + >>> + >>> # Parametrize the UDF to run with 4 CPUs + >>> example_stateless_udf_4CPU = example_stateless_udf.override_options(num_cpus=4) + >>> + >>> df = daft.from_pydict({"foo": [1, 2, 3]}) + >>> df = df.with_column("bar", example_stateless_udf_4CPU(df["foo"])) + + Args: + num_cpus: Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your + machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). + num_gpus: Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating + the appropriate GPU to each UDF using `CUDA_VISIBLE_DEVICES`. + memory_bytes: Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, + this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution. + batch_size: Enables batching of the input into batches of at most this size. Results between batches are concatenated. + """ + new_common_args = self.common_args.override_options( + num_cpus=num_cpus, num_gpus=num_gpus, memory_bytes=memory_bytes, batch_size=batch_size + ) + return dataclasses.replace(self, common_args=new_common_args) + @dataclasses.dataclass -class StatefulUDF(UDF): +class StatefulUDF: + """A StatefulUDF is produced by calling `@udf` over a Python class, allowing for maintaining state between calls: it can be further parametrized at runtime with custom concurrency, resources, and init args. + + Example of a Stateful UDF: + >>> import daft + >>> + >>> @daft.udf(return_dtype=daft.DataType.string()) + ... class MyStatefulUdf: + ... def __init__(self, prefix: str = "Goodbye"): + ... self.prefix = prefix + ... + ... def __call__(self, name: daft.Series) -> list: + ... return [f"{self.prefix}, {n}!" for n in name.to_pylist()] + >>> + >>> MyHelloStatefulUdf = MyStatefulUdf.with_init_args(prefix="Hello") + >>> + >>> df = daft.from_pydict({"name": ["Alice", "Bob", "Charlie"]}) + >>> df = df.with_column("greeting", MyHelloStatefulUdf(df["name"])) + >>> df.show() + ╭─────────┬─────────────────╮ + │ name ┆ greeting │ + │ --- ┆ --- │ + │ Utf8 ┆ Utf8 │ + ╞═════════╪═════════════════╡ + │ Alice ┆ Hello, Alice! │ + ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ + │ Bob ┆ Hello, Bob! │ + ├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ + │ Charlie ┆ Hello, Charlie! │ + ╰─────────┴─────────────────╯ + + (Showing first 3 of 3 rows) + + The state (in this case, the prefix) is maintained across calls to the UDF. Most commonly, this state is + used for things such as ML models which should be downloaded and loaded into memory once for multiple + invocations. + """ + + common_args: CommonUDFArgs name: str cls: type return_dtype: DataType @@ -307,6 +392,46 @@ def __post_init__(self): functools.update_wrapper(self, self.cls) def __call__(self, *args, **kwargs) -> Expression: + """Call the UDF using some input Expressions, producing a new Expression that can be used by a DataFrame. + Args: + *args: Positional arguments to be passed to the UDF. These can be either Expressions or Python values. + **kwargs: Keyword arguments to be passed to the UDF. These can be either Expressions or Python values. + + Returns: + Expression: A new Expression representing the UDF call, which can be used in DataFrame operations. + + .. NOTE:: + When passing arguments to the UDF, you can use a mix of Expressions (e.g., df["column"]) and Python values. + Expressions will be evaluated for each row, while Python values will be passed as-is to the UDF. + + Example: + >>> import daft + >>> + >>> @daft.udf(return_dtype=daft.DataType.float64()) + ... class MultiplyAndAdd: + ... def __init__(self, multiplier: float = 2.0): + ... self.multiplier = multiplier + ... + ... def __call__(self, x: daft.Series, z: float) -> list: + ... return [val * self.multiplier + z for val in x.to_pylist()] + >>> + >>> df = daft.from_pydict({"x": [1, 2, 3]}) + >>> df = df.with_column("result", MultiplyAndAdd(df["x"], z=1.5)) + >>> df.show() + ╭───────┬─────────╮ + │ x ┆ result │ + │ --- ┆ --- │ + │ Int64 ┆ Float64 │ + ╞═══════╪═════════╡ + │ 1 ┆ 3.5 │ + ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ + │ 2 ┆ 5.5 │ + ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ + │ 3 ┆ 7.5 │ + ╰───────┴─────────╯ + + (Showing first 3 of 3 rows) + """ # Validate that the UDF has a concurrency set, if running with actor pool projections if get_context().daft_planning_config.enable_actor_pool_projections: if self.concurrency is None: @@ -330,16 +455,16 @@ def __call__(self, *args, **kwargs) -> Expression: "initialization arguments using `.with_init_args(...)`." ) - bound_args = BoundUDFArgs(self.bind_func(*args, **kwargs)) + bound_args = BoundUDFArgs(self._bind_func(*args, **kwargs)) expressions = list(bound_args.expressions().values()) return Expression.stateful_udf( name=self.name, partial=PartialStatefulUDF(self.cls, self.return_dtype, bound_args), expressions=expressions, return_dtype=self.return_dtype, - resource_request=self.resource_request, + resource_request=self.common_args.resource_request, init_args=self.init_args, - batch_size=self.batch_size, + batch_size=self.common_args.batch_size, concurrency=self.concurrency, ) @@ -364,7 +489,7 @@ def with_concurrency(self, concurrency: int) -> StatefulUDF: return dataclasses.replace(self, concurrency=concurrency) def with_init_args(self, *args, **kwargs) -> StatefulUDF: - """Replace initialization arguments for the Stateful UDF when calling __init__ at runtime + """Replace initialization arguments for the Stateful UDF when calling `__init__` at runtime on each instance of the UDF. Example: @@ -409,7 +534,46 @@ def with_init_args(self, *args, **kwargs) -> StatefulUDF: ) return dataclasses.replace(self, init_args=(args, kwargs)) - def bind_func(self, *args, **kwargs) -> inspect.BoundArguments: + def override_options( + self, + *, + num_cpus: float | None = _UnsetMarker, + num_gpus: float | None = _UnsetMarker, + memory_bytes: int | None = _UnsetMarker, + batch_size: int | None = _UnsetMarker, + ) -> StatefulUDF: + """Replace the resource requests for running each instance of your UDF. + + For instance, if your UDF requires 4 CPUs to run, you can configure it like so: + + >>> import daft + >>> + >>> @daft.udf(return_dtype=daft.DataType.string()) + ... def example_stateless_udf(inputs): + ... # You will have access to 4 CPUs here if you configure your UDF correctly! + ... return inputs + >>> + >>> # Parametrize the UDF to run with 4 CPUs + >>> example_stateless_udf_4CPU = example_stateless_udf.override_options(num_cpus=4) + >>> + >>> df = daft.from_pydict({"foo": [1, 2, 3]}) + >>> df = df.with_column("bar", example_stateless_udf_4CPU(df["foo"])) + + Args: + num_cpus: Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your + machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). + num_gpus: Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating + the appropriate GPU to each UDF using `CUDA_VISIBLE_DEVICES`. + memory_bytes: Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, + this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution. + batch_size: Enables batching of the input into batches of at most this size. Results between batches are concatenated. + """ + new_common_args = self.common_args.override_options( + num_cpus=num_cpus, num_gpus=num_gpus, memory_bytes=memory_bytes, batch_size=batch_size + ) + return dataclasses.replace(self, common_args=new_common_args) + + def _bind_func(self, *args, **kwargs) -> inspect.BoundArguments: sig = inspect.signature(self.cls.__call__) bound_args = sig.bind( # Placeholder for `self` @@ -432,7 +596,7 @@ def udf( memory_bytes: int | None = None, batch_size: int | None = None, ) -> Callable[[UserProvidedPythonFunction | type], StatelessUDF | StatefulUDF]: - """Decorator to convert a Python function into a UDF + """`@udf` Decorator to convert a Python function/class into a `StatelessUDF` or `StatefulUDF` respectively UDFs allow users to run arbitrary Python code on the outputs of Expressions. @@ -566,16 +730,20 @@ def _udf(f: UserProvidedPythonFunction | type) -> StatelessUDF | StatefulUDF: name=name, cls=f, return_dtype=return_dtype, - resource_request=resource_request, - batch_size=batch_size, + common_args=CommonUDFArgs( + resource_request=resource_request, + batch_size=batch_size, + ), ) else: return StatelessUDF( name=name, func=f, return_dtype=return_dtype, - resource_request=resource_request, - batch_size=batch_size, + common_args=CommonUDFArgs( + resource_request=resource_request, + batch_size=batch_size, + ), ) return _udf diff --git a/docs/source/api_docs/udf.rst b/docs/source/api_docs/udf.rst index 21b7d4767b..1e095eaadc 100644 --- a/docs/source/api_docs/udf.rst +++ b/docs/source/api_docs/udf.rst @@ -1,4 +1,32 @@ User Defined Functions (UDFs) ============================= -.. autofunction:: daft.udf +User-Defined Functions (UDFs) are a mechanism to run Python code on the data that lives in a DataFrame. + +A UDF can be used just like :doc:`Expressions <../user_guide/expressions>`, allowing users to express computation that +should be executed by Daft lazily. + +To write a UDF, you should use the :func:`@udf ` decorator, which can decorate either a Python +function or a Python class, producing either a :class:`StatelessUDF ` or +:class:`StatefulUDF ` respectively. + +For more details, please consult the :doc:`UDF User Guide <../user_guide/udf>` + +.. currentmodule:: daft + +Creating UDFs +============= + +.. autofunction:: + udf + +Using UDFs +========== + +.. autoclass:: daft.udf.StatelessUDF + :members: + :special-members: __call__ + +.. autoclass:: daft.udf.StatefulUDF + :members: + :special-members: __call__ diff --git a/tests/actor_pool/test_actor_cuda_devices.py b/tests/actor_pool/test_actor_cuda_devices.py index 4da1f15fa1..ae676d30e9 100644 --- a/tests/actor_pool/test_actor_cuda_devices.py +++ b/tests/actor_pool/test_actor_cuda_devices.py @@ -7,10 +7,10 @@ import ray import daft +from daft import udf from daft.context import get_context, set_planning_config from daft.datatype import DataType from daft.internal.gpu import cuda_visible_devices -from daft.udf import udf pytestmark = pytest.mark.skipif( get_context().runner_config.name == "native", reason="Native runner does not support GPU tests yet" diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py index 5f76a8ef24..708ee665dd 100644 --- a/tests/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -37,38 +37,38 @@ def my_udf(c): def test_partial_resource_request_overrides(): new_udf = my_udf.override_options(num_cpus=1.0) - assert new_udf.resource_request.num_cpus == 1.0 - assert new_udf.resource_request.num_gpus is None - assert new_udf.resource_request.memory_bytes is None + assert new_udf.common_args.resource_request.num_cpus == 1.0 + assert new_udf.common_args.resource_request.num_gpus is None + assert new_udf.common_args.resource_request.memory_bytes is None new_udf = new_udf.override_options(num_gpus=8.0) - assert new_udf.resource_request.num_cpus == 1.0 - assert new_udf.resource_request.num_gpus == 8.0 - assert new_udf.resource_request.memory_bytes is None + assert new_udf.common_args.resource_request.num_cpus == 1.0 + assert new_udf.common_args.resource_request.num_gpus == 8.0 + assert new_udf.common_args.resource_request.memory_bytes is None new_udf = new_udf.override_options(num_gpus=None) - assert new_udf.resource_request.num_cpus == 1.0 - assert new_udf.resource_request.num_gpus is None - assert new_udf.resource_request.memory_bytes is None + assert new_udf.common_args.resource_request.num_cpus == 1.0 + assert new_udf.common_args.resource_request.num_gpus is None + assert new_udf.common_args.resource_request.memory_bytes is None new_udf = new_udf.override_options(memory_bytes=100) - assert new_udf.resource_request.num_cpus == 1.0 - assert new_udf.resource_request.num_gpus is None - assert new_udf.resource_request.memory_bytes == 100 + assert new_udf.common_args.resource_request.num_cpus == 1.0 + assert new_udf.common_args.resource_request.num_gpus is None + assert new_udf.common_args.resource_request.memory_bytes == 100 def test_resource_request_pickle_roundtrip(): new_udf = my_udf.override_options(num_cpus=1.0) - assert new_udf.resource_request.num_cpus == 1.0 - assert new_udf.resource_request.num_gpus is None - assert new_udf.resource_request.memory_bytes is None + assert new_udf.common_args.resource_request.num_cpus == 1.0 + assert new_udf.common_args.resource_request.num_gpus is None + assert new_udf.common_args.resource_request.memory_bytes is None assert new_udf == copy.deepcopy(new_udf) new_udf = new_udf.override_options(num_gpus=8.0) - assert new_udf.resource_request.num_cpus == 1.0 - assert new_udf.resource_request.num_gpus == 8.0 - assert new_udf.resource_request.memory_bytes is None + assert new_udf.common_args.resource_request.num_cpus == 1.0 + assert new_udf.common_args.resource_request.num_gpus == 8.0 + assert new_udf.common_args.resource_request.memory_bytes is None assert new_udf == copy.deepcopy(new_udf)