diff --git a/daft/runners/ray_tracing.py b/daft/runners/ray_tracing.py index 3dc24936a8..9ae8e95ed4 100644 --- a/daft/runners/ray_tracing.py +++ b/daft/runners/ray_tracing.py @@ -114,6 +114,15 @@ def ray_tracer(job_id: str): for metric in metrics: runner_tracer.write_task_metric(metric, parsed_task_node_locations.get(metric.task_id)) + stage_id_to_start_end: dict[int, tuple[float, float]] = {} + for metric in metrics: + if metric.stage_id not in stage_id_to_start_end: + stage_id_to_start_end[metric.stage_id] = (metric.start, metric.end) + else: + old_start, old_end = stage_id_to_start_end[metric.stage_id] + stage_id_to_start_end[metric.stage_id] = (min(old_start, metric.start), max(old_end, metric.end)) + runner_tracer.write_stages(stage_id_to_start_end) + # Add the final touches to the file f.write( json.dumps( @@ -164,6 +173,28 @@ def _write_event(self, event: dict[str, Any], ts: int | None = None) -> int: self._file.write(",\n") return ts + def write_stages(self, stages: dict[int, tuple[float, float]]): + for stage_id in stages: + start, end = stages[stage_id] + self._write_event( + { + "name": f"stage-{stage_id}", + "ph": "B", + "pid": 2, + "tid": stage_id, + }, + ts=int((start - self._start) * 1000 * 1000), + ) + self._write_event( + { + "name": f"stage-{stage_id}", + "ph": "E", + "pid": 2, + "tid": stage_id, + }, + ts=int((end - self._start) * 1000 * 1000), + ) + def write_task_metric(self, metric: ray_metrics.TaskMetric, node_id_worker_id: tuple[int, int] | None): # Write to the Async view (will group by the stage ID) self._write_event(