Skip to content

Commit

Permalink
Add stage view
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 27, 2024
1 parent 9ea1737 commit a10aa80
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions daft/runners/ray_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ def ray_tracer(job_id: str):
for metric in metrics:
runner_tracer.write_task_metric(metric, parsed_task_node_locations.get(metric.task_id))

stage_id_to_start_end: dict[int, tuple[float, float]] = {}
for metric in metrics:
if metric.stage_id not in stage_id_to_start_end:
stage_id_to_start_end[metric.stage_id] = (metric.start, metric.end)
else:
old_start, old_end = stage_id_to_start_end[metric.stage_id]
stage_id_to_start_end[metric.stage_id] = (min(old_start, metric.start), max(old_end, metric.end))
runner_tracer.write_stages(stage_id_to_start_end)

# Add the final touches to the file
f.write(
json.dumps(
Expand Down Expand Up @@ -164,6 +173,28 @@ def _write_event(self, event: dict[str, Any], ts: int | None = None) -> int:
self._file.write(",\n")
return ts

def write_stages(self, stages: dict[int, tuple[float, float]]):
for stage_id in stages:
start, end = stages[stage_id]
self._write_event(
{
"name": f"stage-{stage_id}",
"ph": "B",
"pid": 2,
"tid": stage_id,
},
ts=int((start - self._start) * 1000 * 1000),
)
self._write_event(
{
"name": f"stage-{stage_id}",
"ph": "E",
"pid": 2,
"tid": stage_id,
},
ts=int((end - self._start) * 1000 * 1000),
)

def write_task_metric(self, metric: ray_metrics.TaskMetric, node_id_worker_id: tuple[int, int] | None):
# Write to the Async view (will group by the stage ID)
self._write_event(
Expand Down

0 comments on commit a10aa80

Please sign in to comment.