Add metrics actor to record remote execution
Jay Chia committed Oct 26, 2024
1 parent c6ae8dc commit 582c0db
Showing 2 changed files with 126 additions and 18 deletions.
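This commit threads a per-job `job_id` and per-task `task_id` through the Ray remote functions and introduces a named `MetricsActor` that records wall-clock start/end times for each remote task; the driver later collects those timings and folds them into the Chrome-trace file written by `ray_tracing`. The snippet below is a minimal, self-contained sketch of the named-actor pattern the change relies on, not the Daft code itself: the actor class, job name, and task ids are all invented for illustration.

```python
from __future__ import annotations

import time

import ray

ray.init()


@ray.remote(num_cpus=0)
class TimingActor:
    """Stand-in for the commit's MetricsActor: records per-task start/end timestamps."""

    def __init__(self) -> None:
        self.starts: dict[str, float] = {}
        self.ends: dict[str, float] = {}

    def mark_task_start(self, task_id: str, ts: float) -> None:
        self.starts[task_id] = ts

    def mark_task_end(self, task_id: str, ts: float) -> None:
        self.ends[task_id] = ts

    def collect(self) -> list[tuple[str, float, float | None]]:
        return [(tid, self.starts[tid], self.ends.get(tid)) for tid in self.starts]


# Driver side: create (or reuse) a named, namespaced actor scoped to a made-up job id.
actor = TimingActor.options(name="metrics-job-0", namespace="daft", get_if_exists=True).remote()


@ray.remote
def work(task_id: str) -> None:
    # Worker side: resolve the same actor by name and report timings.
    handle = ray.get_actor("metrics-job-0", namespace="daft")
    handle.mark_task_start.remote(task_id, time.time())
    time.sleep(0.1)  # pretend to do work
    # The commit fires and forgets; we block here only so this tiny example is deterministic.
    ray.get(handle.mark_task_end.remote(task_id, time.time()))


ray.get([work.remote(f"task-{i}") for i in range(3)])
print(ray.get(actor.collect.remote()))
```

Because the actor is created with `get_if_exists=True` under a job-scoped name, concurrently running jobs each get their own metrics actor, and repeated lookups from worker tasks all resolve to the same instance.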
81 changes: 70 additions & 11 deletions daft/runners/ray_runner.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import dataclasses
import logging
import threading
import time
@@ -479,57 +480,75 @@ def build_partitions(
# Give the same function different names to aid in profiling data distribution.


@contextlib.contextmanager
def collect_task_metrics(job_id: str, task_id: str):
import time

metrics_actor = ray.get_actor(name=f"{METRICS_ACTOR_NAME}-{job_id}", namespace=METRICS_ACTOR_NAMESPACE)
metrics_actor.mark_task_start.remote(task_id, time.time())
yield
metrics_actor.mark_task_end.remote(task_id, time.time())


@ray_tracing.RayFunctionWrapper.wrap
@ray.remote
def single_partition_pipeline(
job_id: str,
task_id: str,
daft_execution_config: PyDaftExecutionConfig,
instruction_stack: list[Instruction],
partial_metadatas: list[PartitionMetadata],
*inputs: MicroPartition,
) -> list[list[PartitionMetadata] | MicroPartition]:
with execution_config_ctx(
config=daft_execution_config,
):
), collect_task_metrics(job_id, task_id):
return build_partitions(instruction_stack, partial_metadatas, *inputs)


@ray_tracing.RayFunctionWrapper.wrap
@ray.remote
def fanout_pipeline(
job_id: str,
task_id: str,
daft_execution_config: PyDaftExecutionConfig,
instruction_stack: list[Instruction],
partial_metadatas: list[PartitionMetadata],
*inputs: MicroPartition,
) -> list[list[PartitionMetadata] | MicroPartition]:
with execution_config_ctx(config=daft_execution_config):
with execution_config_ctx(config=daft_execution_config), collect_task_metrics(job_id, task_id):
return build_partitions(instruction_stack, partial_metadatas, *inputs)


@ray_tracing.RayFunctionWrapper.wrap
@ray.remote(scheduling_strategy="SPREAD")
def reduce_pipeline(
job_id: str,
task_id: str,
daft_execution_config: PyDaftExecutionConfig,
instruction_stack: list[Instruction],
partial_metadatas: list[PartitionMetadata],
inputs: list,
) -> list[list[PartitionMetadata] | MicroPartition]:
import ray

with execution_config_ctx(config=daft_execution_config):
with execution_config_ctx(config=daft_execution_config), collect_task_metrics(job_id, task_id):
return build_partitions(instruction_stack, partial_metadatas, *ray.get(inputs))


@ray_tracing.RayFunctionWrapper.wrap
@ray.remote(scheduling_strategy="SPREAD")
def reduce_and_fanout(
job_id: str,
task_id: str,
daft_execution_config: PyDaftExecutionConfig,
instruction_stack: list[Instruction],
partial_metadatas: list[PartitionMetadata],
inputs: list,
) -> list[list[PartitionMetadata] | MicroPartition]:
import ray

with execution_config_ctx(config=daft_execution_config):
with execution_config_ctx(config=daft_execution_config), collect_task_metrics(job_id, task_id):
return build_partitions(instruction_stack, partial_metadatas, *ray.get(inputs))


@@ -667,6 +686,14 @@ def _run_plan(
psets: dict[str, ray.ObjectRef],
result_uuid: str,
) -> None:
# Initialize a metrics actor for this execution.
# NOTE: This goes out of scope after _run_plan is completed, and should be garbage-collected
metrics_actor = MetricsActor.options( # type: ignore[attr-defined]
name=f"{METRICS_ACTOR_NAME}-{result_uuid}",
namespace=METRICS_ACTOR_NAMESPACE,
get_if_exists=True,
).remote()

# Get executable tasks from plan scheduler.
results_buffer_size = self.results_buffer_size_by_df[result_uuid]

@@ -693,10 +720,7 @@ def place_in_queue(item):
except Full:
pass

# TODO: Add metrics actor functionality
metrics_actor = None

with profiler(profile_filename), ray_tracing.ray_tracer(metrics_actor) as runner_tracer:
with profiler(profile_filename), ray_tracing.ray_tracer(result_uuid, metrics_actor) as runner_tracer:
ray_wrapper = ray_tracing.RayModuleWrapper(runner_tracer=runner_tracer)
raw_tasks = plan_scheduler.to_partition_tasks(
psets,
@@ -781,7 +805,9 @@ def place_in_queue(item):

for task in tasks_to_dispatch:
if task.actor_pool_id is None:
results = _build_partitions(daft_execution_config, task, runner_tracer=runner_tracer)
results = _build_partitions(
result_uuid, daft_execution_config, task, runner_tracer=runner_tracer
)
else:
actor_pool = self._actor_pools.get(task.actor_pool_id)
assert actor_pool is not None, "Ray actor pool must live for as long as the tasks."
@@ -882,6 +908,8 @@ def actor_pool_context(

SCHEDULER_ACTOR_NAME = "scheduler"
SCHEDULER_ACTOR_NAMESPACE = "daft"
METRICS_ACTOR_NAME = "metrics"
METRICS_ACTOR_NAMESPACE = "daft"


@ray.remote(num_cpus=1)
@@ -891,7 +919,38 @@ def __init__(self, *n, **kw) -> None:
self.reserved_cores = 1


@dataclasses.dataclass(frozen=True)
class TaskMetric:
task_id: str
start: float
end: float | None


@ray.remote(num_cpus=0)
class MetricsActor:
def __init__(self):
self.task_starts: dict[str, float] = {}
self.task_ends: dict[str, float] = {}

def mark_task_start(self, task_id: str, start: float):
self.task_starts[task_id] = start

def mark_task_end(self, task_id: str, end: float):
self.task_ends[task_id] = end

def collect(self) -> list[TaskMetric]:
return [
TaskMetric(
task_id=task_id,
start=self.task_starts[task_id],
end=self.task_ends.get(task_id),
)
for task_id in self.task_starts
]
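
For reference, a driver-side round trip against the actor defined above might look like the sketch below. The job id is a placeholder (the runner passes `result_uuid` here), and the sketch assumes the `METRICS_ACTOR_NAME`/`METRICS_ACTOR_NAMESPACE` constants introduced earlier in this diff; it simply splits `collect()` output into finished and still-running tasks.

```python
# Hedged sketch only; job_id is invented (the runner uses result_uuid for this).
job_id = "00000000-0000-0000-0000-000000000000"
metrics_actor = MetricsActor.options(  # type: ignore[attr-defined]
    name=f"{METRICS_ACTOR_NAME}-{job_id}",
    namespace=METRICS_ACTOR_NAMESPACE,
    get_if_exists=True,
).remote()

task_metrics = ray.get(metrics_actor.collect.remote())
finished = [m for m in task_metrics if m.end is not None]
in_flight = [m for m in task_metrics if m.end is None]
for m in finished:
    print(f"{m.task_id}: {(m.end - m.start) * 1000:.1f} ms")
```

Because `TaskMetric` is a plain frozen dataclass defined at module level, it serializes cleanly when returned across the actor boundary.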


def _build_partitions(
job_id: str,
daft_execution_config_objref: ray.ObjectRef,
task: PartitionTask[ray.ObjectRef],
runner_tracer: RunnerTracer,
@@ -910,7 +969,7 @@ def _build_partitions(
)
build_remote = build_remote.options(**ray_options).with_tracing(runner_tracer, task)
[metadatas_ref, *partitions] = build_remote.remote(
daft_execution_config_objref, task.instructions, task.partial_metadatas, task.inputs
job_id, task.id(), daft_execution_config_objref, task.instructions, task.partial_metadatas, task.inputs
)

else:
@@ -923,7 +982,7 @@ def place_in_queue(item):
ray_options["scheduling_strategy"] = "SPREAD"
build_remote = build_remote.options(**ray_options).with_tracing(runner_tracer, task)
[metadatas_ref, *partitions] = build_remote.remote(
daft_execution_config_objref, task.instructions, task.partial_metadatas, *task.inputs
job_id, task.id(), daft_execution_config_objref, task.instructions, task.partial_metadatas, *task.inputs
)

metadatas_accessor = PartitionMetadataAccessor(metadatas_ref)
63 changes: 56 additions & 7 deletions daft/runners/ray_tracing.py
@@ -26,27 +26,76 @@


@contextlib.contextmanager
def ray_tracer(metrics_actor: ray.actor.ActorHandle | None):
def ray_tracer(job_id: str, metrics_actor: ray.actor.ActorHandle):
# Dump the RayRunner trace if we detect an active Ray session, otherwise we give up and do not write the trace
if pathlib.Path(DEFAULT_RAY_LOGS_LOCATION).exists():
trace_filename = (
f"trace_RayRunner." f"{datetime.replace(datetime.now(), second=0, microsecond=0).isoformat()[:-3]}.json"
f"trace_RayRunner.{job_id}.{datetime.replace(datetime.now(), microsecond=0).isoformat()[:-3]}.json"
)
daft_trace_location = pathlib.Path(DEFAULT_DAFT_TRACE_LOCATION)
daft_trace_location.mkdir(exist_ok=True, parents=True)
filepath = DEFAULT_DAFT_TRACE_LOCATION / trace_filename
else:
filepath = None

tracer_start = time.time()

if filepath is not None:
with open(filepath, "w") as f:
# Initialize the JSON file
f.write("[")

# Yield the tracer
runner_tracer = RunnerTracer(f, metrics_actor)
runner_tracer = RunnerTracer(f, tracer_start)
yield runner_tracer

# Retrieve metrics from the metrics actor
metrics = ray.get(metrics_actor.collect.remote())
for metric in metrics:
if metric.end is not None:
f.write(
json.dumps(
{
"id": metric.task_id,
"category": "task",
"name": "task_remote_execution",
"ph": "b",
"pid": 2,
"tid": 1,
"ts": (metric.start - tracer_start) * 1000 * 1000,
}
)
)
f.write(",\n")
f.write(
json.dumps(
{
"id": metric.task_id,
"category": "task",
"name": "task_remote_execution",
"ph": "e",
"pid": 2,
"tid": 1,
"ts": (metric.end - tracer_start) * 1000 * 1000,
}
)
)
f.write(",\n")
else:
f.write(
json.dumps(
{
"id": metric.task_id,
"category": "task",
"name": "task_remote_execution_start_no_end",
"ph": "n",
"pid": 2,
"tid": 1,
"ts": (metric.start - tracer_start) * 1000 * 1000,
}
)
)

# Add the final touches to the file
f.write(
json.dumps({"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "RayRunner dispatch loop"}})
@@ -55,15 +104,14 @@ def ray_tracer(metrics_actor: ray.actor.ActorHandle | None):
f.write(json.dumps({"name": "process_name", "ph": "M", "pid": 2, "args": {"name": "Ray Task Execution"}}))
f.write("\n]")
else:
runner_tracer = RunnerTracer(None, metrics_actor)
runner_tracer = RunnerTracer(None, tracer_start)
yield runner_tracer
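
The events emitted above follow the Chrome Trace Event Format consumed by Perfetto and chrome://tracing: `ph: "b"`/`"e"` form an async begin/end pair matched by `id` and `category`, `ph: "n"` is an async instant used for tasks that reported a start but never an end, and `ts` is microseconds relative to the tracer's start time. As a rough illustration (task id and timestamps invented), a task that ran for 250 ms would contribute a pair like this:

```python
import json

# Hypothetical values for illustration only.
tracer_start, start, end = 100.00, 100.25, 100.50
pair = [
    {"id": "task-42", "category": "task", "name": "task_remote_execution",
     "ph": "b", "pid": 2, "tid": 1, "ts": (start - tracer_start) * 1000 * 1000},
    {"id": "task-42", "category": "task", "name": "task_remote_execution",
     "ph": "e", "pid": 2, "tid": 1, "ts": (end - tracer_start) * 1000 * 1000},
]
print(",\n".join(json.dumps(event) for event in pair))
# Two events at ts=250000.0 and ts=500000.0 microseconds: a trace viewer renders
# them as one 250 ms "task_remote_execution" span under the "Ray Task Execution"
# process (pid 2), matching the process_name metadata written at the end of the file.
```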


class RunnerTracer:
def __init__(self, file: TextIO | None, metrics_ray_actor: ray.actor.ActorHandle | None):
def __init__(self, file: TextIO | None, start: float):
self._file = file
self._start = time.time()
self._metrics_actor = metrics_ray_actor
self._start = start

def _write_event(self, event: dict[str, Any]):
if self._file is not None:
@@ -277,6 +325,7 @@ def task_created(self, task_id: str, stage_id: int, resource_request: ResourceRe
"name": "task_execution",
"ph": "b",
"args": {
"task_id": task_id,
"resource_request": {
"num_cpus": resource_request.num_cpus,
"num_gpus": resource_request.num_gpus,
