Skip to content

Commit

Permalink
[BUG] Add proper resources to Ray stateful UDF actor
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinzwang committed Oct 2, 2024
1 parent fe4553f commit 464a7a1
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 8 deletions.
1 change: 1 addition & 0 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def actor_pool_project(
with get_context().runner().actor_pool_context(
actor_pool_name,
actor_resource_request,
task_resource_request,
num_actors,
projection,
) as actor_pool_id:
Expand Down
15 changes: 11 additions & 4 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,20 +337,27 @@ def run_iter_tables(

@contextlib.contextmanager
def actor_pool_context(
self, name: str, resource_request: ResourceRequest, num_actors: int, projection: ExpressionsProjection
self,
name: str,
actor_resource_request: ResourceRequest,
_: ResourceRequest,
num_actors: int,
projection: ExpressionsProjection,
) -> Iterator[str]:
actor_pool_id = f"py_actor_pool-{name}"

total_resource_request = resource_request * num_actors
total_resource_request = actor_resource_request * num_actors
admitted = self._attempt_admit_task(total_resource_request)

if not admitted:
raise RuntimeError(
f"Not enough resources available to admit {num_actors} actors, each with resource request: {resource_request}"
f"Not enough resources available to admit {num_actors} actors, each with resource request: {actor_resource_request}"
)

try:
self._actor_pools[actor_pool_id] = PyActorPool(actor_pool_id, num_actors, resource_request, projection)
self._actor_pools[actor_pool_id] = PyActorPool(
actor_pool_id, num_actors, actor_resource_request, projection
)
self._actor_pools[actor_pool_id].setup()
logger.debug("Created actor pool %s with resources: %s", actor_pool_id, total_resource_request)
yield actor_pool_id
Expand Down
16 changes: 14 additions & 2 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,8 +986,12 @@ def __init__(
self._projection = projection

def setup(self) -> None:
ray_options = _get_ray_task_options(self._resource_request_per_actor)

self._actors = [
DaftRayActor.options(name=f"rank={rank}-{self._id}").remote(self._execution_config, self._projection) # type: ignore
DaftRayActor.options(name=f"rank={rank}-{self._id}", **ray_options).remote( # type: ignore
self._execution_config, self._projection
)
for rank in range(self._num_actors)
]

Expand Down Expand Up @@ -1155,8 +1159,16 @@ def run_iter_tables(

@contextlib.contextmanager
def actor_pool_context(
self, name: str, resource_request: ResourceRequest, num_actors: PartID, projection: ExpressionsProjection
self,
name: str,
actor_resource_request: ResourceRequest,
task_resource_request: ResourceRequest,
num_actors: PartID,
projection: ExpressionsProjection,
) -> Iterator[str]:
# Ray executes each actor's method calls serially, so an actor must reserve the sum of its own resources and the resources of the task it runs
resource_request = actor_resource_request + task_resource_request

execution_config = get_context().daft_execution_config
if self.ray_client_mode:
try:
Expand Down
3 changes: 2 additions & 1 deletion daft/runners/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def run_iter_tables(
def actor_pool_context(
self,
name: str,
resource_request: ResourceRequest,
actor_resource_request: ResourceRequest,
task_resource_request: ResourceRequest,
num_actors: int,
projection: ExpressionsProjection,
) -> Iterator[str]:
Expand Down
4 changes: 3 additions & 1 deletion tests/actor_pool/test_pyactor_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,7 @@ def test_pyactor_pool_not_enough_resources():
assert isinstance(runner, PyRunner)

with pytest.raises(RuntimeError, match=f"Requested {float(cpu_count + 1)} CPUs but found only"):
with runner.actor_pool_context("my-pool", ResourceRequest(num_cpus=1), cpu_count + 1, projection) as _:
with runner.actor_pool_context(
"my-pool", ResourceRequest(num_cpus=1), ResourceRequest(), cpu_count + 1, projection
) as _:
pass

0 comments on commit 464a7a1

Please sign in to comment.