diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 5b36cfacfe..b485d0ceb9 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -423,7 +423,7 @@ def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration: result = self.results_by_df[result_uuid].get() # If there are no more results, delete the thread. - if isinstance(result, StopIteration): + if isinstance(result, (StopIteration, Exception)): self.threads_by_df[result_uuid].join() del self.threads_by_df[result_uuid] @@ -570,6 +570,11 @@ def _run_plan( except StopIteration as e: self.results_by_df[result_uuid].put(e) + # Ensure that all Exceptions are correctly propagated to the consumer before reraising to kill thread + except Exception as e: + self.results_by_df[result_uuid].put(e) + raise + @ray.remote(num_cpus=1) class SchedulerActor(Scheduler): @@ -665,6 +670,9 @@ def run_iter(self, builder: LogicalPlanBuilder, results_buffer_size: int | None if isinstance(result, StopIteration): return + elif isinstance(result, Exception): + raise result + yield result def run_iter_tables(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[Table]: