From 3cef6146f779fd8b533972c024e4f15d38dcaeed Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Fri, 1 Nov 2024 13:52:38 -0700
Subject: [PATCH] [FEAT] Add better detection of Ray Job environment (#3148)
When running in a Ray Job, without the user invoking any Ray commands or
`ray.init()` explicitly, the `ray.is_initialized()` function returns
False.
This means that Daft "does not know" that it is running inside of a Ray
cluster, and thus will not default to using the RayRunner. This can lead
to unexpected behavior when using `daft-launcher` because a user must
know to call `daft.context.set_runner_ray()`.
This PR changes that behavior by attempting to look up the `$RAY_JOB_ID`
environment variable, as a heuristic to tell whether or not it is
currently running inside of a Ray job.
To test, I just ran a Ray job and called `daft.context.get_context()`
after initializing a Daft dataframe
---------
Co-authored-by: EC2 Default User
Co-authored-by: Jay Chia
---
daft/context.py | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/daft/context.py b/daft/context.py
index b21fa09b60..a7f1948bf3 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -57,6 +57,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:
)
ray_is_initialized = False
+ ray_is_in_job = False
in_ray_worker = False
try:
import ray
@@ -66,6 +67,10 @@ def _get_runner_config_from_env() -> _RunnerConfig:
# Check if running inside a Ray worker
if ray._private.worker.global_worker.mode == ray.WORKER_MODE:
in_ray_worker = True
+ # In a Ray job, Ray might not be initialized yet but we can pick up an environment variable as a heuristic here
+ elif os.getenv("RAY_JOB_ID") is not None:
+ ray_is_in_job = True
+
except ImportError:
pass
@@ -89,7 +94,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:
raise ValueError(f"Unsupported DAFT_RUNNER variable: {runner_from_envvar}")
# Retrieve the runner from current initialized Ray environment, only if not running in a Ray worker
- elif ray_is_initialized and not in_ray_worker:
+ elif not in_ray_worker and (ray_is_initialized or ray_is_in_job):
return _RayRunnerConfig(
address=None, # No address supplied, use the existing connection
max_task_backlog=task_backlog,