From 106856ef5417387b723345a2fa5c6a521d1c4ea2 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Sat, 26 Oct 2024 18:26:54 -0700 Subject: [PATCH] Separate view of loop and async tasks --- daft/runners/ray_tracing.py | 88 +++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/daft/runners/ray_tracing.py b/daft/runners/ray_tracing.py index d6a673cc20..2aca7ab533 100644 --- a/daft/runners/ray_tracing.py +++ b/daft/runners/ray_tracing.py @@ -62,38 +62,14 @@ def ray_tracer(job_id: str): # Retrieve metrics from the metrics actor metrics = ray.get(metrics_actor.collect.remote()) for metric in metrics: - f.write( - json.dumps( - { - "id": metric.task_id, - "category": "task", - "name": "task_remote_execution", - "ph": "b", - "pid": 2, - "tid": 1, - "ts": (metric.start - tracer_start) * 1000 * 1000, - } - ) - ) - f.write(",\n") - if metric.end is not None: - f.write( - json.dumps( - { - "id": metric.task_id, - "category": "task", - "name": "task_remote_execution", - "ph": "e", - "pid": 2, - "tid": 1, - "ts": (metric.end - tracer_start) * 1000 * 1000, - } - ) - ) - f.write(",\n") + runner_tracer.write_task_metric(metric) # Add the final touches to the file - f.write(json.dumps({"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "Scheduler"}})) + f.write( + json.dumps( + {"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "Scheduler"}, "sort_index": 1} + ) + ) f.write(",\n") f.write( json.dumps( @@ -101,7 +77,17 @@ def ray_tracer(job_id: str): ) ) f.write(",\n") - f.write(json.dumps({"name": "process_name", "ph": "M", "pid": 2, "args": {"name": "Ray Tasks"}})) + f.write( + json.dumps( + { + "name": "process_name", + "ph": "M", + "pid": 2, + "args": {"name": "Tasks (Grouped by Stage ID)"}, + "sort_index": 2, + } + ) + ) f.write("\n]") else: runner_tracer = RunnerTracer(None, tracer_start) @@ -113,18 +99,44 @@ def __init__(self, file: TextIO | None, start: float): self._file = file self._start = start - def _write_event(self, event: dict[str, Any]): + def _write_event(self, event: dict[str, Any], ts: int | None = None): if self._file is not None: + ts = int((time.time() - self._start) * 1000 * 1000) if ts is None else ts self._file.write( json.dumps( { **event, - "ts": int((time.time() - self._start) * 1000 * 1000), + "ts": ts, } ) ) self._file.write(",\n") + def write_task_metric(self, metric: ray_metrics.TaskMetric): + self._write_event( + { + "id": metric.task_id, + "category": "task", + "name": "task_remote_execution", + "ph": "b", + "pid": 2, + "tid": 1, + }, + ts=int((metric.start - self._start) * 1000 * 1000), + ) + if metric.end is not None: + self._write_event( + { + "id": metric.task_id, + "category": "task", + "name": "task_remote_execution", + "ph": "e", + "pid": 2, + "tid": 1, + }, + ts=int((metric.end - self._start) * 1000 * 1000), + ) + @contextlib.contextmanager def dispatch_wave(self, wave_num: int): self._write_event( @@ -371,7 +383,7 @@ def task_created(self, task_id: str, stage_id: int, resource_request: ResourceRe "stage_id": stage_id, "instructions": instructions, }, - "pid": 1, + "pid": 2, "tid": 1, } ) @@ -383,7 +395,7 @@ def task_dispatched(self, task_id: str): "category": "task", "name": "task_dispatch", "ph": "b", - "pid": 1, + "pid": 2, "tid": 1, } ) @@ -395,7 +407,7 @@ def task_not_ready(self, task_id: str): "category": "task", "name": "task_awaited_not_ready", "ph": "n", - "pid": 1, + "pid": 2, "tid": 1, } ) @@ -407,7 +419,7 @@ def task_received_as_ready(self, task_id: str, stage_id: int): "category": "task", "name": "task_dispatch", "ph": "e", - "pid": 1, + "pid": 2, "tid": 1, } ) @@ -417,7 +429,7 @@ def task_received_as_ready(self, task_id: str, stage_id: int): "category": "task", "name": f"task_execution.stage-{stage_id}", "ph": "e", - "pid": 1, + "pid": 2, "tid": 1, } )