Skip to content

Commit

Permalink
Separate view of loop and async tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 27, 2024
1 parent 8e6c238 commit 106856e
Showing 1 changed file with 50 additions and 38 deletions.
88 changes: 50 additions & 38 deletions daft/runners/ray_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,46 +62,32 @@ def ray_tracer(job_id: str):
# Retrieve metrics from the metrics actor
metrics = ray.get(metrics_actor.collect.remote())
for metric in metrics:
f.write(
json.dumps(
{
"id": metric.task_id,
"category": "task",
"name": "task_remote_execution",
"ph": "b",
"pid": 2,
"tid": 1,
"ts": (metric.start - tracer_start) * 1000 * 1000,
}
)
)
f.write(",\n")
if metric.end is not None:
f.write(
json.dumps(
{
"id": metric.task_id,
"category": "task",
"name": "task_remote_execution",
"ph": "e",
"pid": 2,
"tid": 1,
"ts": (metric.end - tracer_start) * 1000 * 1000,
}
)
)
f.write(",\n")
runner_tracer.write_task_metric(metric)

# Add the final touches to the file
f.write(json.dumps({"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "Scheduler"}}))
f.write(
json.dumps(
{"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "Scheduler"}, "sort_index": 1}
)
)
f.write(",\n")
f.write(
json.dumps(
{"name": "thread_name", "ph": "M", "pid": 1, "tid": 1, "args": {"name": "_run_plan dispatch loop"}}
)
)
f.write(",\n")
f.write(json.dumps({"name": "process_name", "ph": "M", "pid": 2, "args": {"name": "Ray Tasks"}}))
f.write(
json.dumps(
{
"name": "process_name",
"ph": "M",
"pid": 2,
"args": {"name": "Tasks (Grouped by Stage ID)"},
"sort_index": 2,
}
)
)
f.write("\n]")
else:
runner_tracer = RunnerTracer(None, tracer_start)
Expand All @@ -113,18 +99,44 @@ def __init__(self, file: TextIO | None, start: float):
self._file = file
self._start = start

def _write_event(self, event: dict[str, Any]):
def _write_event(self, event: dict[str, Any], ts: int | None = None):
if self._file is not None:
ts = int((time.time() - self._start) * 1000 * 1000) if ts is None else ts
self._file.write(
json.dumps(
{
**event,
"ts": int((time.time() - self._start) * 1000 * 1000),
"ts": ts,
}
)
)
self._file.write(",\n")

def write_task_metric(self, metric: ray_metrics.TaskMetric):
self._write_event(
{
"id": metric.task_id,
"category": "task",
"name": "task_remote_execution",
"ph": "b",
"pid": 2,
"tid": 1,
},
ts=int((metric.start - self._start) * 1000 * 1000),
)
if metric.end is not None:
self._write_event(
{
"id": metric.task_id,
"category": "task",
"name": "task_remote_execution",
"ph": "e",
"pid": 2,
"tid": 1,
},
ts=int((metric.end - self._start) * 1000 * 1000),
)

@contextlib.contextmanager
def dispatch_wave(self, wave_num: int):
self._write_event(
Expand Down Expand Up @@ -371,7 +383,7 @@ def task_created(self, task_id: str, stage_id: int, resource_request: ResourceRe
"stage_id": stage_id,
"instructions": instructions,
},
"pid": 1,
"pid": 2,
"tid": 1,
}
)
Expand All @@ -383,7 +395,7 @@ def task_dispatched(self, task_id: str):
"category": "task",
"name": "task_dispatch",
"ph": "b",
"pid": 1,
"pid": 2,
"tid": 1,
}
)
Expand All @@ -395,7 +407,7 @@ def task_not_ready(self, task_id: str):
"category": "task",
"name": "task_awaited_not_ready",
"ph": "n",
"pid": 1,
"pid": 2,
"tid": 1,
}
)
Expand All @@ -407,7 +419,7 @@ def task_received_as_ready(self, task_id: str, stage_id: int):
"category": "task",
"name": "task_dispatch",
"ph": "e",
"pid": 1,
"pid": 2,
"tid": 1,
}
)
Expand All @@ -417,7 +429,7 @@ def task_received_as_ready(self, task_id: str, stage_id: int):
"category": "task",
"name": f"task_execution.stage-{stage_id}",
"ph": "e",
"pid": 1,
"pid": 2,
"tid": 1,
}
)
Expand Down

0 comments on commit 106856e

Please sign in to comment.