Skip to content

Commit

Permalink
Ok this is very close to what I want. Need to polish up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 27, 2024
1 parent a10aa80 commit c7d2f5b
Showing 1 changed file with 40 additions and 22 deletions.
62 changes: 40 additions & 22 deletions daft/runners/ray_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,7 @@ def ray_tracer(job_id: str):
for metric in metrics:
runner_tracer.write_task_metric(metric, parsed_task_node_locations.get(metric.task_id))

stage_id_to_start_end: dict[int, tuple[float, float]] = {}
for metric in metrics:
if metric.stage_id not in stage_id_to_start_end:
stage_id_to_start_end[metric.stage_id] = (metric.start, metric.end)
else:
old_start, old_end = stage_id_to_start_end[metric.stage_id]
stage_id_to_start_end[metric.stage_id] = (min(old_start, metric.start), max(old_end, metric.end))
runner_tracer.write_stages(stage_id_to_start_end)
runner_tracer.write_stages()

# Add the final touches to the file
f.write(
Expand All @@ -142,7 +135,7 @@ def ray_tracer(job_id: str):
"name": "process_name",
"ph": "M",
"pid": 2,
"args": {"name": "Tasks (Grouped by Stage ID)"},
"args": {"name": "Stages"},
"sort_index": 1,
}
)
Expand Down Expand Up @@ -173,26 +166,39 @@ def _write_event(self, event: dict[str, Any], ts: int | None = None) -> int:
self._file.write(",\n")
return ts

def write_stages(self, stages: dict[int, tuple[float, float]]):
for stage_id in stages:
start, end = stages[stage_id]
def write_stages(self):
for stage_id in self._stage_start_end:
start_ts, end_ts = self._stage_start_end[stage_id]
self._write_event(
{
"name": f"stage-{stage_id}",
"ph": "B",
"pid": 2,
"tid": stage_id,
},
ts=int((start - self._start) * 1000 * 1000),
ts=start_ts,
)

# Add a flow view here to point to the nodes
self._write_event(
{
"name": "stage-to-node-flow",
"id": stage_id,
"ph": "s",
"pid": 2,
"tid": stage_id,
},
ts=start_ts,
)

self._write_event(
{
"name": f"stage-{stage_id}",
"ph": "E",
"pid": 2,
"tid": stage_id,
},
ts=int((end - self._start) * 1000 * 1000),
ts=end_ts,
)

def write_task_metric(self, metric: ray_metrics.TaskMetric, node_id_worker_id: tuple[int, int] | None):
Expand All @@ -203,7 +209,7 @@ def write_task_metric(self, metric: ray_metrics.TaskMetric, node_id_worker_id: t
"category": "task",
"name": "task_remote_execution",
"ph": "b",
"pid": 2,
"pid": 1,
"tid": metric.stage_id,
},
ts=int((metric.start - self._start) * 1000 * 1000),
Expand All @@ -215,7 +221,7 @@ def write_task_metric(self, metric: ray_metrics.TaskMetric, node_id_worker_id: t
"category": "task",
"name": "task_remote_execution",
"ph": "e",
"pid": 2,
"pid": 1,
"tid": metric.stage_id,
},
ts=int((metric.end - self._start) * 1000 * 1000),
Expand All @@ -224,14 +230,26 @@ def write_task_metric(self, metric: ray_metrics.TaskMetric, node_id_worker_id: t
# Write to the node/worker view
if node_id_worker_id is not None:
pid, tid = node_id_worker_id
start_ts = int((metric.start - self._start) * 1000 * 1000)
self._write_event(
{
"name": "task_remote_execution",
"ph": "B",
"pid": pid,
"tid": tid,
},
ts=int((metric.start - self._start) * 1000 * 1000),
ts=start_ts,
)
self._write_event(
{
"name": "stage-to-node-flow",
"id": metric.stage_id,
"ph": "f",
"bp": "e", # enclosed, since the stage "encloses" the execution
"pid": pid,
"tid": tid,
},
ts=start_ts,
)
if metric.end is not None:
self._write_event(
Expand Down Expand Up @@ -490,7 +508,7 @@ def task_created(self, task_id: str, stage_id: int, resource_request: ResourceRe
"stage_id": stage_id,
"instructions": instructions,
},
"pid": 2,
"pid": 1,
"tid": 1,
}
)
Expand All @@ -505,7 +523,7 @@ def task_dispatched(self, task_id: str):
"category": "task",
"name": "task_dispatch",
"ph": "b",
"pid": 2,
"pid": 1,
"tid": 1,
}
)
Expand All @@ -517,7 +535,7 @@ def task_not_ready(self, task_id: str):
"category": "task",
"name": "task_awaited_not_ready",
"ph": "n",
"pid": 2,
"pid": 1,
"tid": 1,
}
)
Expand All @@ -529,7 +547,7 @@ def task_received_as_ready(self, task_id: str, stage_id: int):
"category": "task",
"name": "task_dispatch",
"ph": "e",
"pid": 2,
"pid": 1,
"tid": 1,
}
)
Expand All @@ -539,7 +557,7 @@ def task_received_as_ready(self, task_id: str, stage_id: int):
"category": "task",
"name": f"task_execution.stage-{stage_id}",
"ph": "e",
"pid": 2,
"pid": 1,
"tid": 1,
}
)
Expand Down

0 comments on commit c7d2f5b

Please sign in to comment.