Skip to content

Commit

Permalink
fix resource freeing when unable to acquire
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinzwang committed Oct 5, 2024
1 parent 6db6229 commit 821b8e1
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 0 deletions.
4 changes: 4 additions & 0 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ def actor_pool_context(
resources = [self._attempt_admit_task(actor_resource_request) for _ in range(num_actors)]

if any(r is None for r in resources):
for r in resources:
if r is not None:
self._release_resources(r)

raise RuntimeError(
f"Not enough resources available to admit {num_actors} actors, each with resource request: {actor_resource_request}"
)
Expand Down
File renamed without changes.
6 changes: 6 additions & 0 deletions tests/actor_pool/test_pyactor_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,20 @@ def test_pyactor_pool():

@pytest.mark.skipif(get_context().runner_config.name != "py", reason="Test can only be run on PyRunner")
def test_pyactor_pool_not_enough_resources():
from copy import deepcopy

cpu_count = multiprocessing.cpu_count()
projection = ExpressionsProjection([MyStatefulUDF(daft.col("x"))])

runner = get_context().runner()
assert isinstance(runner, PyRunner)

original_resources = deepcopy(runner._available_resources)

with pytest.raises(RuntimeError, match=f"Not enough resources available to admit {cpu_count + 1} actors"):
with runner.actor_pool_context(
"my-pool", ResourceRequest(num_cpus=1), ResourceRequest(), cpu_count + 1, projection
) as _:
pass

assert runner._available_resources == original_resources

0 comments on commit 821b8e1

Please sign in to comment.