Skip to content

Commit

Permalink
Dump data to /logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 29, 2024
1 parent b8a53ce commit 2dc0e4c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
38 changes: 25 additions & 13 deletions daft/runners/ray_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
METRICS_ACTOR_NAMESPACE = "daft"


@dataclasses.dataclass(frozen=True)
@dataclasses.dataclass
class TaskMetric:
task_id: str
stage_id: int | None
Expand All @@ -29,17 +29,25 @@ class TaskMetric:
end: float | None


@dataclasses.dataclass
class ExecutionMetrics:
@dataclasses.dataclass(frozen=True)
class StartExecutionMetrics:
    """Holds the per-task metrics recorded for a given execution ID.

    The instance is frozen, so ``task_metrics`` cannot be rebound, but the
    dict itself is still mutated in place as tasks are recorded.
    """

    # Maps task ID -> the TaskMetric stored for that task.
    # `dict` is used directly as the factory instead of a redundant `lambda: {}`.
    task_metrics: dict[str, TaskMetric] = dataclasses.field(default_factory=dict)


@dataclasses.dataclass
class EndExecutionMetrics:
    """Holds the task end times recorded for a given execution ID."""

    # Maps task ID -> the `end` value reported when that task finished.
    # `dict` is used directly as the factory instead of a redundant `lambda: {}`.
    task_ends: dict[str, float] = dataclasses.field(default_factory=dict)


@ray.remote(num_cpus=0)
class _MetricsActor:
def __init__(self):
self.execution_metrics: dict[str, ExecutionMetrics] = defaultdict(lambda: ExecutionMetrics())
self.start_execution_metrics: dict[str, StartExecutionMetrics] = defaultdict(lambda: StartExecutionMetrics())
self.end_execution_metrics: dict[str, EndExecutionMetrics] = defaultdict(lambda: EndExecutionMetrics())
self.execution_node_and_worker_ids: dict[str, dict[str, set[str]]] = defaultdict(
lambda: defaultdict(lambda: set())
)
Expand All @@ -52,7 +60,7 @@ def mark_task_start(
self.execution_node_and_worker_ids[execution_id][node_id_trunc].add(worker_id_trunc)

# Update task info
self.execution_metrics[execution_id].task_metrics[task_id] = TaskMetric(
self.start_execution_metrics[execution_id].task_metrics[task_id] = TaskMetric(
task_id=task_id,
stage_id=stage_id,
start=start,
Expand All @@ -62,19 +70,23 @@ def mark_task_start(
)

def mark_task_end(self, execution_id: str, task_id: str, end: float):
self.execution_metrics[execution_id].task_metrics[task_id] = dataclasses.replace(
self.execution_metrics[execution_id].task_metrics[task_id],
end=end,
)
self.end_execution_metrics[execution_id].task_ends[task_id] = end

def collect_metrics(self, execution_id: str) -> tuple[list[TaskMetric], dict[str, set[str]]]:
"""Collect the metrics associated with this execution, cleaning up the memory used for this execution ID"""
execution_metrics = self.execution_metrics[execution_id]
task_metrics = list(execution_metrics.task_metrics.values())
# Data about the available nodes and worker IDs in those nodes
node_data = self.execution_node_and_worker_ids[execution_id]

# Data about task
start_execution_metrics = self.start_execution_metrics[execution_id]
task_metrics = list(start_execution_metrics.task_metrics.values())
for tm in task_metrics:
if tm.task_id in self.end_execution_metrics[execution_id].task_ends:
tm.end = self.end_execution_metrics[execution_id].task_ends[tm.task_id]

# Clean up the stats for this execution
del self.execution_metrics[execution_id]
del self.start_execution_metrics[execution_id]
del self.end_execution_metrics[execution_id]
del self.execution_node_and_worker_ids[execution_id]

return task_metrics, node_data
Expand Down Expand Up @@ -131,7 +143,7 @@ def get_metrics_actor(execution_id: str) -> MetricsActorHandle:
"""Retrieves a handle to the Actor for a given job_id"""
with _metrics_actor_lock:
actor = _MetricsActor.options( # type: ignore[attr-defined]
name="METRICS_ACTOR_NAME",
name="daft_metrics_actor",
namespace=METRICS_ACTOR_NAMESPACE,
get_if_exists=True,
lifetime="detached",
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/ray_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

# We add the trace by default to the latest session logs of the Ray Runner
# This lets us access our logs via the Ray dashboard when running Ray jobs
DEFAULT_RAY_LOGS_LOCATION = pathlib.Path("/tmp") / "ray" / "session_latest"
DEFAULT_RAY_LOGS_LOCATION = pathlib.Path("/tmp") / "ray" / "session_latest" / "logs"
DEFAULT_DAFT_TRACE_LOCATION = DEFAULT_RAY_LOGS_LOCATION / "daft"

# IDs and names for the visualized data
Expand Down

0 comments on commit 2dc0e4c

Please sign in to comment.