[FEAT] Add better detection of Ray Job environment (#3148)
When running in a Ray Job, without the user invoking any Ray commands or
`ray.init()` explicitly, the `ray.is_initialized()` function returns
False.

This means that Daft "does not know" that it is running inside of a Ray
cluster, and thus will not default to using the RayRunner. This can lead
to unexpected behavior when using `daft-launcher` because a user must
know to call `daft.context.set_runner_ray()`.

This PR changes that behavior by looking up the `RAY_JOB_ID` environment
variable as a heuristic for whether Daft is currently running inside of
a Ray job.
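
A minimal sketch of the heuristic in isolation, for readers who want to see what the check does on its own. The helper name `looks_like_ray_job` and its `env` parameter are hypothetical and exist only for illustration; the commit itself inlines the `os.getenv("RAY_JOB_ID") is not None` check.

```python
import os
from typing import Mapping, Optional


def looks_like_ray_job(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when RAY_JOB_ID is present in the environment.

    Hypothetical helper for illustration only; not part of Daft.
    """
    env = os.environ if env is None else env
    # Same test as `os.getenv("RAY_JOB_ID") is not None`: only presence
    # matters, so even an empty value counts as "set".
    return env.get("RAY_JOB_ID") is not None


print(looks_like_ray_job({"RAY_JOB_ID": "01000000"}))  # True
print(looks_like_ray_job({}))                          # False
```

Because this is only a heuristic, any process that happens to have `RAY_JOB_ID` set would be treated as running inside a Ray job.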

To test, I ran a Ray job and called `daft.context.get_context()`
after initializing a Daft dataframe.

<img width="1350" alt="image"
src="https://github.com/user-attachments/assets/0a6d8ae4-034a-424d-a3d7-9311d08be454">

---------

Co-authored-by: EC2 Default User <[email protected]>
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
3 people authored Nov 1, 2024
1 parent 9d4adfb commit 3cef614
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion daft/context.py
@@ -57,6 +57,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:
     )
 
     ray_is_initialized = False
+    ray_is_in_job = False
     in_ray_worker = False
     try:
         import ray
@@ -66,6 +67,10 @@ def _get_runner_config_from_env() -> _RunnerConfig:
             # Check if running inside a Ray worker
             if ray._private.worker.global_worker.mode == ray.WORKER_MODE:
                 in_ray_worker = True
+        # In a Ray job, Ray might not be initialized yet but we can pick up an environment variable as a heuristic here
+        elif os.getenv("RAY_JOB_ID") is not None:
+            ray_is_in_job = True
+
     except ImportError:
         pass
 
@@ -89,7 +94,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:
         raise ValueError(f"Unsupported DAFT_RUNNER variable: {runner_from_envvar}")
 
     # Retrieve the runner from current initialized Ray environment, only if not running in a Ray worker
-    elif ray_is_initialized and not in_ray_worker:
+    elif not in_ray_worker and (ray_is_initialized or ray_is_in_job):
         return _RayRunnerConfig(
             address=None, # No address supplied, use the existing connection
             max_task_backlog=task_backlog,
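
To make the effect of the changed condition concrete, the sketch below compares the old and new conditions over every combination of the three flags. The function names `old_rule` and `new_rule` are hypothetical wrappers around the two `elif` conditions from this commit, not Daft APIs.

```python
from itertools import product


def old_rule(ray_is_initialized: bool, in_ray_worker: bool) -> bool:
    # Condition before this commit.
    return ray_is_initialized and not in_ray_worker


def new_rule(ray_is_initialized: bool, ray_is_in_job: bool, in_ray_worker: bool) -> bool:
    # Condition after this commit.
    return not in_ray_worker and (ray_is_initialized or ray_is_in_job)


# Collect every (initialized, in_job, in_worker) combination where the
# two conditions disagree.
changed = [
    (init, job, worker)
    for init, job, worker in product([False, True], repeat=3)
    if old_rule(init, worker) != new_rule(init, job, worker)
]
print(changed)  # [(False, True, False)]
```

The only case that changes is the one the commit message describes: Ray is not initialized, the process is not a Ray worker, and `RAY_JOB_ID` is set. Running inside a Ray worker still prevents the Ray runner from being selected by default.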