diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py index cf33efc2a0..02a86dc3d0 100644 --- a/daft/execution/execution_step.py +++ b/daft/execution/execution_step.py @@ -82,6 +82,9 @@ def __str__(self) -> str: def __repr__(self) -> str: return self.__str__() + def name(self) -> str: + return f"{'-'.join(i.__class__.__name__ for i in self.instructions)} [Stage:{self.stage_id}]" + class PartitionTaskBuilder(Generic[PartitionT]): """Builds a PartitionTask by adding instructions to its pipeline.""" diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index 37a84d18fb..725ed6302b 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -56,7 +56,7 @@ def mark_task_start(self, step: PartitionTask[Any]) -> None: stage_id = step.stage_id if stage_id not in self.pbars: - name = "-".join(i.__class__.__name__ for i in step.instructions) + name = step.name() self._make_new_bar(stage_id, name) else: pb = self.pbars[stage_id] diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 7eacf32778..afcbcc3f39 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -651,9 +651,7 @@ def _build_partitions( daft_execution_config_objref: ray.ObjectRef, task: PartitionTask[ray.ObjectRef] ) -> list[ray.ObjectRef]: """Run a PartitionTask and return the resulting list of partitions.""" - ray_options: dict[str, Any] = { - "num_returns": task.num_results + 1, - } + ray_options: dict[str, Any] = {"num_returns": task.num_results + 1, "name": task.name()} if task.resource_request is not None: ray_options = {**ray_options, **_get_ray_task_options(task.resource_request)}