diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index fa1f09b0ee..ddc98521e2 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -844,7 +844,7 @@ def place_in_queue(item): if num_returns == 0: break - readies, _ = ray.wait( + readies, not_readies = ray.wait( list(inflight_ref_to_task.keys()), num_returns=num_returns, timeout=timeout, @@ -852,6 +852,10 @@ def place_in_queue(item): ) num_ready += len(readies) + for not_ready in not_readies: + if not_ready in inflight_ref_to_task: + runner_tracer.task_not_ready(inflight_ref_to_task[not_ready]) + for ready in readies: if ready in inflight_ref_to_task: task_id = inflight_ref_to_task[ready] diff --git a/daft/runners/tracer.py b/daft/runners/tracer.py index 935161caf5..70c9aa716d 100644 --- a/daft/runners/tracer.py +++ b/daft/runners/tracer.py @@ -282,6 +282,18 @@ def task_dispatched(self, task_id: str, stage_id: int, resource_request: Resourc } ) + def task_not_ready(self, task_id: str): + self._write_event( + { + "id": task_id, + "category": "task", + "name": "task_awaited_not_ready", + "ph": "n", + "pid": 2, + "tid": 1, + } + ) + def task_ready(self, task_id: str): self._write_event( {