Skip to content

Commit

Permalink
Add instant events for when an awaited task is not ready
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 25, 2024
1 parent 38f343c commit 69092df
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
6 changes: 5 additions & 1 deletion daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,14 +844,18 @@ def place_in_queue(item):
if num_returns == 0:
break

readies, _ = ray.wait(
readies, not_readies = ray.wait(
list(inflight_ref_to_task.keys()),
num_returns=num_returns,
timeout=timeout,
fetch_local=False,
)
num_ready += len(readies)

for not_ready in not_readies:
if not_ready in inflight_ref_to_task:
runner_tracer.task_not_ready(inflight_ref_to_task[not_ready])

for ready in readies:
if ready in inflight_ref_to_task:
task_id = inflight_ref_to_task[ready]
Expand Down
12 changes: 12 additions & 0 deletions daft/runners/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ def task_dispatched(self, task_id: str, stage_id: int, resource_request: Resourc
}
)

def task_not_ready(self, task_id: str):
self._write_event(
{
"id": task_id,
"category": "task",
"name": "task_awaited_not_ready",
"ph": "n",
"pid": 2,
"tid": 1,
}
)

def task_ready(self, task_id: str):
self._write_event(
{
Expand Down

0 comments on commit 69092df

Please sign in to comment.