From ccd5878ce2ebf500307cb0e56b6308dc7c20cd41 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Wed, 25 Oct 2023 00:43:48 -0700 Subject: [PATCH] [BUG] Re-raise exceptions in rayrunner (#1522) Previously prior to this change, if an error occurs during execution in the RayRunner: 1. The "run_plan" thread will error out and die before sending any information on the return queue 2. The Scheduler actor will block forever on the next result on the return queue via a `.get()` This change re-raises those errors so that it surfaces in the scheduler, and prevents that issue from occurring. Co-authored-by: Jay Chia --- daft/runners/ray_runner.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 5b36cfacfe..b485d0ceb9 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -423,7 +423,7 @@ def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration: result = self.results_by_df[result_uuid].get() # If there are no more results, delete the thread. - if isinstance(result, StopIteration): + if isinstance(result, (StopIteration, Exception)): self.threads_by_df[result_uuid].join() del self.threads_by_df[result_uuid] @@ -570,6 +570,11 @@ def _run_plan( except StopIteration as e: self.results_by_df[result_uuid].put(e) + # Ensure that all Exceptions are correctly propagated to the consumer before reraising to kill thread + except Exception as e: + self.results_by_df[result_uuid].put(e) + raise + @ray.remote(num_cpus=1) class SchedulerActor(Scheduler): @@ -665,6 +670,9 @@ def run_iter(self, builder: LogicalPlanBuilder, results_buffer_size: int | None if isinstance(result, StopIteration): return + elif isinstance(result, Exception): + raise result + yield result def run_iter_tables(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[Table]: