Cleanup logic and make writes log exceptions instead of raising
Jay Chia committed Oct 27, 2024
1 parent 92074e1 commit 873785d
Showing 1 changed file with 78 additions and 45 deletions.
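
In short, the change makes the trace writer treat write failures as non-fatal: JSON-serialization errors and file I/O errors are caught and reported with logger.exception instead of propagating into the dispatch loop. The following is a minimal, self-contained sketch of that pattern only; the helper name safe_write_event and the bare file handle are illustrative, not the module's actual API.

import json
import logging

logger = logging.getLogger(__name__)


def safe_write_event(file_handle, event: dict) -> None:
    """Illustrative helper: serialize and append one trace event, logging (not raising) on failure."""
    try:
        file_handle.write(json.dumps(event))
        file_handle.write(",\n")
    except (TypeError, ValueError):
        # json.dumps raises TypeError (and ValueError for circular references)
        # when the payload is not JSON-serializable.
        logger.exception("Failed to serialize trace event to JSON")
    except OSError:
        # Covers closed handles, full disks, permission errors, etc.
        logger.exception("Failed to write trace event to file")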
daft/runners/ray_tracing.py (123 changes: 78 additions & 45 deletions)
@@ -9,6 +9,7 @@
import contextlib
import dataclasses
import json
import logging
import os
import pathlib
import time
@@ -27,6 +28,7 @@
from daft import ResourceRequest
from daft.execution.physical_plan import MaterializedPhysicalPlan

logger = logging.getLogger(__name__)

# We add the trace by default to the latest session logs of the Ray Runner
# This lets us access our logs via the Ray dashboard when running Ray jobs
@@ -89,6 +91,16 @@ def write_header(self) -> None:
if self.file is not None:
self.file.write("[")

def write_metadata(self, event: dict[str, Any]) -> None:
"""Write a metadata event to the trace file
Args:
event: The metadata event to write
"""
if self.file is not None:
self.file.write(json.dumps(event))
self.file.write(",\n")

def write_event(self, event: dict[str, Any], ts: int | None = None) -> int:
"""Write a single trace event to the file
@@ -114,46 +126,8 @@ def write_event(self, event: dict[str, Any], ts: int | None = None) -> int:
self.file.write(",\n")
return ts

def write_footer(self, process_meta: list[tuple[int, str]], thread_meta: list[tuple[int, int, str]]) -> None:
"""Writes metadata for the file, closing out the file with a footer.
Args:
process_meta: Pass in custom names for PIDs as a list of (pid, name).
thread_meta: Pass in custom names for threads as a list of (pid, tid, name).
"""
def write_footer(self) -> None:
if self.file is not None:
for pid, name in [
(SCHEDULER_PID, "Scheduler"),
(STAGES_PID, "Stages"),
] + process_meta:
self.file.write(
json.dumps(
{
"name": "process_name",
"ph": PHASE_METADATA,
"pid": pid,
"args": {"name": name},
}
)
)
self.file.write(",\n")

for pid, tid, name in [
(SCHEDULER_PID, 1, "_run_plan dispatch loop"),
] + thread_meta:
self.file.write(
json.dumps(
{
"name": "thread_name",
"ph": PHASE_METADATA,
"pid": pid,
"tid": tid,
"args": {"name": name},
}
)
)
self.file.write(",\n")

# Remove the trailing comma
self.file.seek(self.file.tell() - 2, os.SEEK_SET)
self.file.truncate()
@@ -172,26 +146,85 @@ def __init__(self, file: TextIO | None):
self._writer.write_header()

def _write_event(self, event: dict[str, Any], ts: int | None = None) -> int:
return self._writer.write_event(event, ts)
try:
return self._writer.write_event(event, ts)
except (json.JSONDecodeError, TypeError) as e:
logger.exception("Failed to serialize event to JSON: %s", e)
return ts or -1
except OSError as e:
logger.exception("Failed to write trace event to file: %s", e)
return ts or -1

def _write_metadata(self, metadata_event: dict[str, Any]) -> None:
try:
return self._writer.write_metadata(metadata_event)
except (json.JSONDecodeError, TypeError) as e:
logger.exception("Failed to serialize metadata to JSON: %s", e)
return
except OSError as e:
logger.exception("Failed to write trace metadata to file: %s", e)
return

def finalize(self, metrics_actor: ray_metrics.MetricsActorHandle) -> None:
# Retrieve metrics from the metrics actor and perform some post-processing
# Retrieve metrics from the metrics actor (blocking call)
task_metrics, node_metrics = metrics_actor.collect_metrics()

# Write out labels for the nodes and other traced events
nodes_to_pid_mapping = {node_id: i + NODE_PIDS_START for i, node_id in enumerate(node_metrics)}
nodes_workers_to_tid_mapping = {
(node_id, worker_id): (pid, tid)
for node_id, pid in nodes_to_pid_mapping.items()
for tid, worker_id in enumerate(node_metrics[node_id])
}
self._write_process_and_thread_names(
[(pid, f"Node {node_id}") for node_id, pid in nodes_to_pid_mapping.items()],
[(pid, tid, f"Worker {worker_id}") for (_, worker_id), (pid, tid) in nodes_workers_to_tid_mapping.items()],
)

# Write out collected metrics
for metric in task_metrics:
self._write_task_metric(metric, nodes_workers_to_tid_mapping)
self._write_stages()
self._writer.write_footer(
[(pid, f"Node {node_id}") for node_id, pid in nodes_to_pid_mapping.items()],
[(pid, tid, f"Worker {worker_id}") for (_, worker_id), (pid, tid) in nodes_workers_to_tid_mapping.items()],
)

# End the file with the appropriate footer
self._writer.write_footer()

def _write_process_and_thread_names(
self,
process_meta: list[tuple[int, str]],
thread_meta: list[tuple[int, int, str]],
):
"""Writes metadata for the file
Args:
process_meta: Pass in custom names for PIDs as a list of (pid, name).
thread_meta: Pass in custom names for threads as a list of (pid, tid, name).
"""
for pid, name in [
(SCHEDULER_PID, "Scheduler"),
(STAGES_PID, "Stages"),
] + process_meta:
self._write_metadata(
{
"name": "process_name",
"ph": PHASE_METADATA,
"pid": pid,
"args": {"name": name},
}
)

for pid, tid, name in [
(SCHEDULER_PID, 1, "_run_plan dispatch loop"),
] + thread_meta:
self._write_metadata(
{
"name": "thread_name",
"ph": PHASE_METADATA,
"pid": pid,
"tid": tid,
"args": {"name": name},
}
)

def _write_stages(self):
for stage_id in self._stage_start_end:
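For reference on the footer and metadata writes above: the trace file follows the Chrome Trace Event Format, where rows in the viewer are labeled by "process_name" and "thread_name" events written with the metadata phase (PHASE_METADATA, conventionally "M"). The sketch below is illustrative only, assuming that phase value and made-up pid/tid labels; it mirrors the header write, the ",\n"-separated events, and the trailing-comma trim from write_footer.

import json

PHASE_METADATA = "M"  # assumed value; the real module defines this constant
SCHEDULER_PID = 1     # illustrative pid

metadata_events = [
    # Labels a process row in the trace viewer.
    {"name": "process_name", "ph": PHASE_METADATA, "pid": SCHEDULER_PID, "args": {"name": "Scheduler"}},
    # Labels a thread row within that process.
    {"name": "thread_name", "ph": PHASE_METADATA, "pid": SCHEDULER_PID, "tid": 1, "args": {"name": "_run_plan dispatch loop"}},
]

with open("example_trace.json", "w") as f:
    f.write("[")  # header: the writer opens a JSON array
    for event in metadata_events:
        f.write(json.dumps(event))
        f.write(",\n")
    # Mirror write_footer: trim the final ",\n" (2 characters) rather than
    # closing the array; the Trace Event Format treats the closing "]" as optional.
    f.seek(f.tell() - 2)
    f.truncate()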
