diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py
index b2b6ba9d9d..f44a50eb07 100644
--- a/daft/runners/ray_runner.py
+++ b/daft/runners/ray_runner.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import contextlib
+import dataclasses
 import logging
 import threading
 import time
@@ -479,9 +480,21 @@ def build_partitions(
 # Give the same function different names to aid in profiling data distribution.
 
 
+@contextlib.contextmanager
+def collect_task_metrics(job_id: str, task_id: str):
+    import time
+
+    metrics_actor = ray.get_actor(name=f"{METRICS_ACTOR_NAME}-{job_id}", namespace=METRICS_ACTOR_NAMESPACE)
+    metrics_actor.mark_task_start.remote(task_id, time.time())
+    yield
+    metrics_actor.mark_task_end.remote(task_id, time.time())
+
+
 @ray_tracing.RayFunctionWrapper.wrap
 @ray.remote
 def single_partition_pipeline(
+    job_id: str,
+    task_id: str,
     daft_execution_config: PyDaftExecutionConfig,
     instruction_stack: list[Instruction],
     partial_metadatas: list[PartitionMetadata],
@@ -489,25 +502,29 @@
 ) -> list[list[PartitionMetadata] | MicroPartition]:
     with execution_config_ctx(
         config=daft_execution_config,
-    ):
+    ), collect_task_metrics(job_id, task_id):
         return build_partitions(instruction_stack, partial_metadatas, *inputs)
 
 
 @ray_tracing.RayFunctionWrapper.wrap
 @ray.remote
 def fanout_pipeline(
+    job_id: str,
+    task_id: str,
     daft_execution_config: PyDaftExecutionConfig,
     instruction_stack: list[Instruction],
     partial_metadatas: list[PartitionMetadata],
     *inputs: MicroPartition,
 ) -> list[list[PartitionMetadata] | MicroPartition]:
-    with execution_config_ctx(config=daft_execution_config):
+    with execution_config_ctx(config=daft_execution_config), collect_task_metrics(job_id, task_id):
         return build_partitions(instruction_stack, partial_metadatas, *inputs)
 
 
 @ray_tracing.RayFunctionWrapper.wrap
 @ray.remote(scheduling_strategy="SPREAD")
 def reduce_pipeline(
+    job_id: str,
+    task_id: str,
     daft_execution_config: PyDaftExecutionConfig,
     instruction_stack: list[Instruction],
     partial_metadatas: list[PartitionMetadata],
@@ -515,13 +532,15 @@
 ) -> list[list[PartitionMetadata] | MicroPartition]:
     import ray
 
-    with execution_config_ctx(config=daft_execution_config):
+    with execution_config_ctx(config=daft_execution_config), collect_task_metrics(job_id, task_id):
         return build_partitions(instruction_stack, partial_metadatas, *ray.get(inputs))
 
 
 @ray_tracing.RayFunctionWrapper.wrap
 @ray.remote(scheduling_strategy="SPREAD")
 def reduce_and_fanout(
+    job_id: str,
+    task_id: str,
     daft_execution_config: PyDaftExecutionConfig,
     instruction_stack: list[Instruction],
     partial_metadatas: list[PartitionMetadata],
@@ -529,7 +548,7 @@
 ) -> list[list[PartitionMetadata] | MicroPartition]:
     import ray
 
-    with execution_config_ctx(config=daft_execution_config):
+    with execution_config_ctx(config=daft_execution_config), collect_task_metrics(job_id, task_id):
         return build_partitions(instruction_stack, partial_metadatas, *ray.get(inputs))
 
 
@@ -667,6 +686,14 @@ def _run_plan(
         psets: dict[str, ray.ObjectRef],
         result_uuid: str,
     ) -> None:
+        # Initialize a metrics actor for this execution.
+        # NOTE: This goes out of scope after _run_plan is completed, and should be garbage-collected
+        metrics_actor = MetricsActor.options(  # type: ignore[attr-defined]
+            name=f"{METRICS_ACTOR_NAME}-{result_uuid}",
+            namespace=METRICS_ACTOR_NAMESPACE,
+            get_if_exists=True,
+        ).remote()
+
         # Get executable tasks from plan scheduler.
         results_buffer_size = self.results_buffer_size_by_df[result_uuid]
 
@@ -693,10 +720,7 @@ def place_in_queue(item):
                 except Full:
                     pass
 
-        # TODO: Add metrics actor functionality
-        metrics_actor = None
-
-        with profiler(profile_filename), ray_tracing.ray_tracer(metrics_actor) as runner_tracer:
+        with profiler(profile_filename), ray_tracing.ray_tracer(result_uuid, metrics_actor) as runner_tracer:
             ray_wrapper = ray_tracing.RayModuleWrapper(runner_tracer=runner_tracer)
             raw_tasks = plan_scheduler.to_partition_tasks(
                 psets,
@@ -781,7 +805,9 @@ def place_in_queue(item):
 
                    for task in tasks_to_dispatch:
                        if task.actor_pool_id is None:
-                            results = _build_partitions(daft_execution_config, task, runner_tracer=runner_tracer)
+                            results = _build_partitions(
+                                result_uuid, daft_execution_config, task, runner_tracer=runner_tracer
+                            )
                        else:
                            actor_pool = self._actor_pools.get(task.actor_pool_id)
                            assert actor_pool is not None, "Ray actor pool must live for as long as the tasks."
@@ -882,6 +908,8 @@ def actor_pool_context(
 
 SCHEDULER_ACTOR_NAME = "scheduler"
 SCHEDULER_ACTOR_NAMESPACE = "daft"
+METRICS_ACTOR_NAME = "metrics"
+METRICS_ACTOR_NAMESPACE = "daft"
 
 
 @ray.remote(num_cpus=1)
@@ -891,7 +919,38 @@ def __init__(self, *n, **kw) -> None:
         self.reserved_cores = 1
 
 
+@dataclasses.dataclass(frozen=True)
+class TaskMetric:
+    task_id: str
+    start: float
+    end: float | None
+
+
+@ray.remote(num_cpus=0)
+class MetricsActor:
+    def __init__(self):
+        self.task_starts: dict[str, float] = {}
+        self.task_ends: dict[str, float] = {}
+
+    def mark_task_start(self, task_id: str, start: float):
+        self.task_starts[task_id] = start
+
+    def mark_task_end(self, task_id: str, end: float):
+        self.task_ends[task_id] = end
+
+    def collect(self) -> list[TaskMetric]:
+        return [
+            TaskMetric(
+                task_id=task_id,
+                start=self.task_starts[task_id],
+                end=self.task_ends.get(task_id),
+            )
+            for task_id in self.task_starts
+        ]
+
+
 def _build_partitions(
+    job_id: str,
     daft_execution_config_objref: ray.ObjectRef,
     task: PartitionTask[ray.ObjectRef],
     runner_tracer: RunnerTracer,
@@ -910,7 +969,7 @@ def _build_partitions(
         )
         build_remote = build_remote.options(**ray_options).with_tracing(runner_tracer, task)
         [metadatas_ref, *partitions] = build_remote.remote(
-            daft_execution_config_objref, task.instructions, task.partial_metadatas, task.inputs
+            job_id, task.id(), daft_execution_config_objref, task.instructions, task.partial_metadatas, task.inputs
         )
 
     else:
@@ -923,7 +982,7 @@ def _build_partitions(
             ray_options["scheduling_strategy"] = "SPREAD"
         build_remote = build_remote.options(**ray_options).with_tracing(runner_tracer, task)
         [metadatas_ref, *partitions] = build_remote.remote(
-            daft_execution_config_objref, task.instructions, task.partial_metadatas, *task.inputs
+            job_id, task.id(), daft_execution_config_objref, task.instructions, task.partial_metadatas, *task.inputs
         )
 
     metadatas_accessor = PartitionMetadataAccessor(metadatas_ref)
diff --git a/daft/runners/ray_tracing.py b/daft/runners/ray_tracing.py
index a769ce63f6..a004dc1191 100644
--- a/daft/runners/ray_tracing.py
+++ b/daft/runners/ray_tracing.py
@@ -26,11 +26,11 @@
 
 @contextlib.contextmanager
-def ray_tracer(metrics_actor: ray.actor.ActorHandle | None):
+def ray_tracer(job_id: str, metrics_actor: ray.actor.ActorHandle):
     # Dump the RayRunner trace if we detect an active Ray session, otherwise we give up and do not write the trace
     if pathlib.Path(DEFAULT_RAY_LOGS_LOCATION).exists():
         trace_filename = (
-            f"trace_RayRunner."
-            f"{datetime.replace(datetime.now(), second=0, microsecond=0).isoformat()[:-3]}.json"
+            f"trace_RayRunner.{job_id}.{datetime.replace(datetime.now(), microsecond=0).isoformat()[:-3]}.json"
         )
         daft_trace_location = pathlib.Path(DEFAULT_DAFT_TRACE_LOCATION)
         daft_trace_location.mkdir(exist_ok=True, parents=True)
@@ -38,15 +38,64 @@ def ray_tracer(metrics_actor: ray.actor.ActorHandle | None):
     else:
         filepath = None
 
+    tracer_start = time.time()
+
     if filepath is not None:
         with open(filepath, "w") as f:
             # Initialize the JSON file
             f.write("[")
 
             # Yield the tracer
-            runner_tracer = RunnerTracer(f, metrics_actor)
+            runner_tracer = RunnerTracer(f, tracer_start)
             yield runner_tracer
 
+            # Retrieve metrics from the metrics actor
+            metrics = ray.get(metrics_actor.collect.remote())
+            for metric in metrics:
+                if metric.end is not None:
+                    f.write(
+                        json.dumps(
+                            {
+                                "id": metric.task_id,
+                                "category": "task",
+                                "name": "task_remote_execution",
+                                "ph": "b",
+                                "pid": 2,
+                                "tid": 1,
+                                "ts": (metric.start - tracer_start) * 1000 * 1000,
+                            }
+                        )
+                    )
+                    f.write(",\n")
+                    f.write(
+                        json.dumps(
+                            {
+                                "id": metric.task_id,
+                                "category": "task",
+                                "name": "task_remote_execution",
+                                "ph": "e",
+                                "pid": 2,
+                                "tid": 1,
+                                "ts": (metric.end - tracer_start) * 1000 * 1000,
+                            }
+                        )
+                    )
+                    f.write(",\n")
+                else:
+                    f.write(
+                        json.dumps(
+                            {
+                                "id": metric.task_id,
+                                "category": "task",
+                                "name": "task_remote_execution_start_no_end",
+                                "ph": "n",
+                                "pid": 2,
+                                "tid": 1,
+                                "ts": (metric.start - tracer_start) * 1000 * 1000,
+                            }
+                        )
+                    )
+
             # Add the final touches to the file
             f.write(
                 json.dumps({"name": "process_name", "ph": "M", "pid": 1, "args": {"name": "RayRunner dispatch loop"}})
@@ -55,15 +104,14 @@ def ray_tracer(metrics_actor: ray.actor.ActorHandle | None):
             f.write(json.dumps({"name": "process_name", "ph": "M", "pid": 2, "args": {"name": "Ray Task Execution"}}))
             f.write("\n]")
     else:
-        runner_tracer = RunnerTracer(None, metrics_actor)
+        runner_tracer = RunnerTracer(None, tracer_start)
         yield runner_tracer
 
 
 class RunnerTracer:
-    def __init__(self, file: TextIO | None, metrics_ray_actor: ray.actor.ActorHandle | None):
+    def __init__(self, file: TextIO | None, start: float):
         self._file = file
-        self._start = time.time()
-        self._metrics_actor = metrics_ray_actor
+        self._start = start
 
     def _write_event(self, event: dict[str, Any]):
         if self._file is not None:
@@ -277,6 +325,7 @@ def task_created(self, task_id: str, stage_id: int, resource_request: ResourceRe
                     "name": "task_execution",
                     "ph": "b",
                     "args": {
+                        "task_id": task_id,
                         "resource_request": {
                             "num_cpus": resource_request.num_cpus,
                             "num_gpus": resource_request.num_gpus,