Skip to content

Commit

Permalink
bug fix when cluster has no workers
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Dec 18, 2023
1 parent e4e8528 commit a64b40c
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,14 @@ def _ray_num_cpus_provider(ttl_seconds: int = 1) -> Generator[int, None, None]:
>>> next(p)
"""
last_checked_time = time.time()
last_num_cpus_queried = int(ray.cluster_resources()["CPU"])
last_num_cpus_queried = int(ray.cluster_resources().get("CPU", 0))
while True:
currtime = time.time()
if currtime - last_checked_time < ttl_seconds:
yield last_num_cpus_queried
else:
last_checked_time = currtime
last_num_cpus_queried = int(ray.cluster_resources()["CPU"])
last_num_cpus_queried = int(ray.cluster_resources().get("CPU", 0))
yield last_num_cpus_queried


Expand Down Expand Up @@ -526,7 +526,7 @@ def place_in_queue(item):
while is_active(): # Loop: Dispatch (get tasks -> batch dispatch).
tasks_to_dispatch: list[PartitionTask] = []

cores: int = next(num_cpus_provider) - self.reserved_cores
cores: int = max(next(num_cpus_provider) - self.reserved_cores, 0)
max_inflight_tasks = cores + self.max_task_backlog
dispatches_allowed = max_inflight_tasks - len(inflight_tasks)
dispatches_allowed = min(cores, dispatches_allowed)
Expand Down

0 comments on commit a64b40c

Please sign in to comment.