Skip to content

Commit

Permalink
Simplify metrics actor state
Browse files: browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 27, 2024
1 parent 8401121 commit 6252ae1
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions daft/runners/ray_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class TaskMetric:
class ExecutionMetrics:
"""Holds the metrics for a given execution ID"""

task_start_info: dict[str, TaskMetric] = dataclasses.field(default_factory=lambda: {})
task_ends: dict[str, float] = dataclasses.field(default_factory=lambda: {})
task_metrics: dict[str, TaskMetric] = dataclasses.field(default_factory=lambda: {})


@ray.remote(num_cpus=0)
Expand All @@ -53,7 +52,7 @@ def mark_task_start(
self.execution_node_and_worker_ids[execution_id][node_id_trunc].add(worker_id_trunc)

Check warning on line 52 in daft/runners/ray_metrics.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_metrics.py#L51-L52

Added lines #L51 - L52 were not covered by tests

# Update task info
self.execution_metrics[execution_id].task_start_info[task_id] = TaskMetric(
self.execution_metrics[execution_id].task_metrics[task_id] = TaskMetric(

Check warning on line 55 in daft/runners/ray_metrics.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_metrics.py#L55

Added line #L55 was not covered by tests
task_id=task_id,
stage_id=stage_id,
start=start,
Expand All @@ -63,24 +62,22 @@ def mark_task_start(
)

def mark_task_end(self, execution_id: str, task_id: str, end: float):
self.execution_metrics[execution_id].task_ends[task_id] = end
self.execution_metrics[execution_id].task_metrics[task_id] = dataclasses.replace(

Check warning on line 65 in daft/runners/ray_metrics.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_metrics.py#L65

Added line #L65 was not covered by tests
self.execution_metrics[execution_id].task_metrics[task_id],
end=end,
)

def collect_metrics(self, execution_id: str) -> tuple[list[TaskMetric], dict[str, set[str]]]:
"""Collect the metrics associated with this execution, cleaning up the memory used for this execution ID"""
execution_metrics = self.execution_metrics[execution_id]
data = [
dataclasses.replace(
execution_metrics.task_start_info[task_id], end=execution_metrics.task_ends.get(task_id)
)
for task_id in execution_metrics.task_start_info
]
task_metrics = list(execution_metrics.task_metrics.values())
node_data = self.execution_node_and_worker_ids[execution_id]

Check warning on line 74 in daft/runners/ray_metrics.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_metrics.py#L72-L74

Added lines #L72 - L74 were not covered by tests

# Clean up the stats for this execution
del self.execution_metrics[execution_id]
del self.execution_node_and_worker_ids[execution_id]

Check warning on line 78 in daft/runners/ray_metrics.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_metrics.py#L77-L78

Added lines #L77 - L78 were not covered by tests

return data, node_data
return task_metrics, node_data

Check warning on line 80 in daft/runners/ray_metrics.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_metrics.py#L80

Added line #L80 was not covered by tests


@dataclasses.dataclass(frozen=True)
Expand Down

0 comments on commit 6252ae1

Please sign in to comment.