diff --git a/daft/runners/ray_tracing.py b/daft/runners/ray_tracing.py index 2d4db4d32d..f56effdfa8 100644 --- a/daft/runners/ray_tracing.py +++ b/daft/runners/ray_tracing.py @@ -9,6 +9,7 @@ import contextlib import dataclasses import json +import logging import os import pathlib import time @@ -27,6 +28,7 @@ from daft import ResourceRequest from daft.execution.physical_plan import MaterializedPhysicalPlan +logger = logging.getLogger(__name__) # We add the trace by default to the latest session logs of the Ray Runner # This lets us access our logs via the Ray dashboard when running Ray jobs @@ -89,6 +91,16 @@ def write_header(self) -> None: if self.file is not None: self.file.write("[") + def write_metadata(self, event: dict[str, Any]) -> None: + """Write a metadata event to the trace file + + Args: + event: The metadata event to write + """ + if self.file is not None: + self.file.write(json.dumps(event)) + self.file.write(",\n") + def write_event(self, event: dict[str, Any], ts: int | None = None) -> int: """Write a single trace event to the file @@ -114,46 +126,8 @@ def write_event(self, event: dict[str, Any], ts: int | None = None) -> int: self.file.write(",\n") return ts - def write_footer(self, process_meta: list[tuple[int, str]], thread_meta: list[tuple[int, int, str]]) -> None: - """Writes metadata for the file, closing out the file with a footer. - - Args: - process_meta: Pass in custom names for PIDs as a list of (pid, name). - thread_meta: Pass in custom names for threads a a list of (pid, tid, name). - """ + def write_footer(self) -> None: if self.file is not None: - for pid, name in [ - (SCHEDULER_PID, "Scheduler"), - (STAGES_PID, "Stages"), - ] + process_meta: - self.file.write( - json.dumps( - { - "name": "process_name", - "ph": PHASE_METADATA, - "pid": pid, - "args": {"name": name}, - } - ) - ) - self.file.write(",\n") - - for pid, tid, name in [ - (SCHEDULER_PID, 1, "_run_plan dispatch loop"), - ] + thread_meta: - self.file.write( - json.dumps( - { - "name": "thread_name", - "ph": PHASE_METADATA, - "pid": pid, - "tid": tid, - "args": {"name": name}, - } - ) - ) - self.file.write(",\n") - # Remove the trailing comma self.file.seek(self.file.tell() - 2, os.SEEK_SET) self.file.truncate() @@ -172,26 +146,85 @@ def __init__(self, file: TextIO | None): self._writer.write_header() def _write_event(self, event: dict[str, Any], ts: int | None = None) -> int: - return self._writer.write_event(event, ts) + try: + return self._writer.write_event(event, ts) + except (json.JSONDecodeError, TypeError) as e: + logger.exception("Failed to serialize event to JSON: %s", e) + return ts or -1 + except OSError as e: + logger.exception("Failed to write trace event to file: %s", e) + return ts or -1 + + def _write_metadata(self, metadata_event: dict[str, Any]) -> None: + try: + return self._writer.write_metadata(metadata_event) + except (json.JSONDecodeError, TypeError) as e: + logger.exception("Failed to serialize metadata to JSON: %s", e) + return + except OSError as e: + logger.exception("Failed to write trace metadata to file: %s", e) + return def finalize(self, metrics_actor: ray_metrics.MetricsActorHandle) -> None: - # Retrieve metrics from the metrics actor and perform some post-processing + # Retrieve metrics from the metrics actor (blocking call) task_metrics, node_metrics = metrics_actor.collect_metrics() + + # Write out labels for the nodes and other traced events nodes_to_pid_mapping = {node_id: i + NODE_PIDS_START for i, node_id in enumerate(node_metrics)} nodes_workers_to_tid_mapping = { (node_id, worker_id): (pid, tid) for node_id, pid in nodes_to_pid_mapping.items() for tid, worker_id in enumerate(node_metrics[node_id]) } + self._write_process_and_thread_names( + [(pid, f"Node {node_id}") for node_id, pid in nodes_to_pid_mapping.items()], + [(pid, tid, f"Worker {worker_id}") for (_, worker_id), (pid, tid) in nodes_workers_to_tid_mapping.items()], + ) # Write out collected metrics for metric in task_metrics: self._write_task_metric(metric, nodes_workers_to_tid_mapping) self._write_stages() - self._writer.write_footer( - [(pid, f"Node {node_id}") for node_id, pid in nodes_to_pid_mapping.items()], - [(pid, tid, f"Worker {worker_id}") for (_, worker_id), (pid, tid) in nodes_workers_to_tid_mapping.items()], - ) + + # End the file with the appropriate footer + self._writer.write_footer() + + def _write_process_and_thread_names( + self, + process_meta: list[tuple[int, str]], + thread_meta: list[tuple[int, int, str]], + ): + """Writes metadata for the file + + Args: + process_meta: Pass in custom names for PIDs as a list of (pid, name). + thread_meta: Pass in custom names for threads a a list of (pid, tid, name). + """ + for pid, name in [ + (SCHEDULER_PID, "Scheduler"), + (STAGES_PID, "Stages"), + ] + process_meta: + self._write_metadata( + { + "name": "process_name", + "ph": PHASE_METADATA, + "pid": pid, + "args": {"name": name}, + } + ) + + for pid, tid, name in [ + (SCHEDULER_PID, 1, "_run_plan dispatch loop"), + ] + thread_meta: + self._write_metadata( + { + "name": "thread_name", + "ph": PHASE_METADATA, + "pid": pid, + "tid": tid, + "args": {"name": name}, + } + ) def _write_stages(self): for stage_id in self._stage_start_end: