Skip to content

Commit

Permalink
[PERF] Update number of cores on every iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 9, 2023
1 parent 553a911 commit 0342fd0
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,6 @@ def _run_plan(
# Get executable tasks from plan scheduler.
tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=True)

# Note: For autoscaling clusters, we will probably want to query cores dynamically.
# Keep in mind this call takes about 0.3ms.
cores = int(ray.cluster_resources()["CPU"]) - self.reserved_cores

max_inflight_tasks = cores + self.max_task_backlog

inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict()
Expand All @@ -453,6 +449,9 @@ def _run_plan(
next_step = next(tasks)

while True: # Loop: Dispatch -> await.
# This call takes about 0.3ms and hits a locally in-memory cached record of cluster resources
cores: int = int(ray.cluster_resources()["CPU"]) - self.reserved_cores

while True: # Loop: Dispatch (get tasks -> batch dispatch).
tasks_to_dispatch: list[PartitionTask] = []

Expand Down

0 comments on commit 0342fd0

Please sign in to comment.