Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: allow for level 2 of Ray tracing which records memory #3590

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def set_execution_config(
default_morsel_size: int | None = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
enable_ray_tracing: bool | None = None,
enable_ray_tracing: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution.

Expand Down Expand Up @@ -395,7 +395,8 @@ def set_execution_config(
default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
shuffle_algorithm: The shuffle algorithm to use. Defaults to "map_reduce". Other options are "pre_shuffle_merge".
pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB
enable_ray_tracing: Enable tracing for Ray. Accessible in `/tmp/ray/session_latest/logs/daft` after the run completes. Defaults to False.
enable_ray_tracing: Tracing level for Ray. Traces are accessible in `/tmp/ray/session_latest/logs/daft` after the run completes. Defaults to 0 (disabled);
set to 1 to enable tracing, or to 2 to additionally record per-task memory statistics. Level 2 requires `memray` to be installed.
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
Expand Down
4 changes: 2 additions & 2 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,7 @@ class PyDaftExecutionConfig:
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
enable_ray_tracing: bool | None = None,
enable_ray_tracing: int | None = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
) -> PyDaftExecutionConfig: ...
Expand Down Expand Up @@ -1783,7 +1783,7 @@ class PyDaftExecutionConfig:
@property
def pre_shuffle_merge_threshold(self) -> int: ...
@property
def enable_ray_tracing(self) -> bool: ...
def enable_ray_tracing(self) -> int: ...

class PyDaftPlanningConfig:
@staticmethod
Expand Down
20 changes: 18 additions & 2 deletions daft/runners/ray_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@

# End Unix timestamp
end: float
memory_stats: TaskMemoryStats | None


# Memory statistics recorded for a single task and carried on an EndTaskEvent via its
# `memory_stats` field (which is None when no statistics were collected).
# Declared frozen, so instances cannot be mutated after construction.
@dataclasses.dataclass(frozen=True)
class TaskMemoryStats:
# Peak memory allocated while the task ran (presumably in bytes — TODO confirm units).
peak_memory_allocated: int
# Total memory allocated over the task's lifetime (presumably in bytes — TODO confirm units).
total_memory_allocated: int
# Total number of allocations made while the task ran.
total_num_allocations: int


class _NodeInfo:
Expand Down Expand Up @@ -123,9 +131,15 @@
)
)

def mark_task_end(self, execution_id: str, task_id: str, end: float):
def mark_task_end(
self,
execution_id: str,
task_id: str,
end: float,
memory_stats: TaskMemoryStats | None,
):
# Add an EndTaskEvent
self._task_events[execution_id].append(EndTaskEvent(task_id=task_id, end=end))
self._task_events[execution_id].append(EndTaskEvent(task_id=task_id, end=end, memory_stats=memory_stats))

Check warning on line 142 in daft/runners/ray_metrics.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_metrics.py#L142

Added line #L142 was not covered by tests

def get_task_events(self, execution_id: str, idx: int) -> tuple[list[TaskEvent], int]:
events = self._task_events[execution_id]
Expand Down Expand Up @@ -177,11 +191,13 @@
self,
task_id: str,
end: float,
memory_stats: TaskMemoryStats | None,
) -> None:
self.actor.mark_task_end.remote(
self.execution_id,
task_id,
end,
memory_stats,
)

def get_task_events(self, idx: int) -> tuple[list[TaskEvent], int]:
Expand Down
62 changes: 57 additions & 5 deletions daft/runners/ray_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dataclasses
import json
import logging
import os
import pathlib
import time
from datetime import datetime
Expand Down Expand Up @@ -50,7 +51,7 @@
# Dump the RayRunner trace if we detect an active Ray session, otherwise we give up and do not write the trace
ray_logs_location = get_log_location()
filepath: pathlib.Path | None
if ray_logs_location.exists() and daft_execution_config.enable_ray_tracing:
if ray_logs_location.exists() and daft_execution_config.enable_ray_tracing > 0:
trace_filename = (
f"trace_RayRunner.{execution_id}.{datetime.replace(datetime.now(), microsecond=0).isoformat()[:-3]}.json"
)
Expand Down Expand Up @@ -255,6 +256,11 @@
"ph": RunnerTracer.PHASE_ASYNC_END,
"pid": 1,
"tid": 2,
"args": {
"memray_peak_memory_allocated": task_event.memory_stats.peak_memory_allocated,
"memray_total_memory_allocated": task_event.memory_stats.total_memory_allocated,
"memray_total_num_allocations": task_event.memory_stats.total_num_allocations,
},
},
ts=end_ts,
)
Expand All @@ -272,6 +278,11 @@
"ph": RunnerTracer.PHASE_DURATION_END,
"pid": node_idx + RunnerTracer.NODE_PIDS_START,
"tid": worker_idx,
"args": {
"memray_peak_memory_allocated": task_event.memory_stats.peak_memory_allocated,
"memray_total_memory_allocated": task_event.memory_stats.total_memory_allocated,
"memray_total_num_allocations": task_event.memory_stats.total_num_allocations,
},
},
ts=end_ts,
)
Expand Down Expand Up @@ -655,7 +666,9 @@
@contextlib.contextmanager
def collect_ray_task_metrics(execution_id: str, task_id: str, stage_id: int, execution_config: PyDaftExecutionConfig):
"""Context manager that will ping the metrics actor to record various execution metrics about a given task."""
if execution_config.enable_ray_tracing:
if execution_config.enable_ray_tracing == 0:
yield
elif execution_config.enable_ray_tracing == 1:

Check warning on line 671 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L671

Added line #L671 was not covered by tests
import time

runtime_context = ray.get_runtime_context()
Expand All @@ -670,7 +683,46 @@
runtime_context.get_assigned_resources(),
runtime_context.get_task_id(),
)
yield
metrics_actor.mark_task_end(task_id, time.time())
try:
yield

Check warning on line 687 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L686-L687

Added lines #L686 - L687 were not covered by tests
finally:
metrics_actor.mark_task_end(task_id, time.time(), memory_stats=None)
elif execution_config.enable_ray_tracing == 2:
import time

Check warning on line 691 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L689-L691

Added lines #L689 - L691 were not covered by tests

import memray
from memray._memray import compute_statistics

Check warning on line 694 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L693-L694

Added lines #L693 - L694 were not covered by tests

tmpdir = "/tmp/ray/session_latest/logs/daft/task_memray_dumps"
os.makedirs(tmpdir, exist_ok=True)
memray_tmpfile = os.path.join(tmpdir, f"task-{task_id}.memray.bin")

Check warning on line 698 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L696-L698

Added lines #L696 - L698 were not covered by tests

runtime_context = ray.get_runtime_context()
metrics_actor = ray_metrics.get_metrics_actor(execution_id)
metrics_actor.mark_task_start(

Check warning on line 702 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L700-L702

Added lines #L700 - L702 were not covered by tests
task_id,
time.time(),
runtime_context.get_node_id(),
runtime_context.get_worker_id(),
stage_id,
runtime_context.get_assigned_resources(),
runtime_context.get_task_id(),
)
try:
with memray.Tracker(memray_tmpfile, native_traces=True, follow_fork=True):
yield

Check warning on line 713 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L711-L713

Added lines #L711 - L713 were not covered by tests
finally:
stats = compute_statistics(memray_tmpfile)
metrics_actor.mark_task_end(

Check warning on line 716 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L715-L716

Added lines #L715 - L716 were not covered by tests
task_id,
time.time(),
ray_metrics.TaskMemoryStats(
peak_memory_allocated=stats.peak_memory_allocated,
total_memory_allocated=stats.total_memory_allocated,
total_num_allocations=stats.total_num_allocations,
),
)
else:
yield
raise RuntimeError(

Check warning on line 726 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L726

Added line #L726 was not covered by tests
f"Unrecognized value for $DAFT_ENABLE_RAY_TRACING. Expected a number from 0 to 2, but received: {execution_config.enable_ray_tracing}"
)
14 changes: 8 additions & 6 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
pub default_morsel_size: usize,
pub shuffle_algorithm: String,
pub pre_shuffle_merge_threshold: usize,
pub enable_ray_tracing: bool,
pub enable_ray_tracing: u32,
}

impl Default for DaftExecutionConfig {
Expand Down Expand Up @@ -80,7 +80,7 @@
default_morsel_size: 128 * 1024,
shuffle_algorithm: "map_reduce".to_string(),
pre_shuffle_merge_threshold: 1024 * 1024 * 1024, // 1GB
enable_ray_tracing: false,
enable_ray_tracing: 0,
}
}
}
Expand Down Expand Up @@ -109,10 +109,12 @@
cfg.enable_native_executor = true;
}
let ray_tracing_env_var_name = "DAFT_ENABLE_RAY_TRACING";
if let Ok(val) = std::env::var(ray_tracing_env_var_name)
&& matches!(val.trim().to_lowercase().as_str(), "1" | "true")
{
cfg.enable_ray_tracing = true;
if let Ok(val) = std::env::var(ray_tracing_env_var_name) {
if let Ok(val) = val.trim().parse::<u32>() {
cfg.enable_ray_tracing = val;
} else {
log::warn!("Invalid value for DAFT_ENABLE_RAY_TRACING. Expected a number from 0 to 2, but received: {}", val.trim());

Check warning on line 116 in src/common/daft-config/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

src/common/daft-config/src/lib.rs#L113-L116

Added lines #L113 - L116 were not covered by tests
}
}
let shuffle_algorithm_env_var_name = "DAFT_SHUFFLE_ALGORITHM";
if let Ok(val) = std::env::var(shuffle_algorithm_env_var_name) {
Expand Down
4 changes: 2 additions & 2 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl PyDaftExecutionConfig {
default_morsel_size: Option<usize>,
shuffle_algorithm: Option<&str>,
pre_shuffle_merge_threshold: Option<usize>,
enable_ray_tracing: Option<bool>,
enable_ray_tracing: Option<u32>,
) -> PyResult<Self> {
let mut config = self.config.as_ref().clone();

Expand Down Expand Up @@ -290,7 +290,7 @@ impl PyDaftExecutionConfig {
}

#[getter]
fn enable_ray_tracing(&self) -> PyResult<bool> {
fn enable_ray_tracing(&self) -> PyResult<u32> {
Ok(self.config.enable_ray_tracing)
}
}
Expand Down
Loading