From aaf279e4816ae373ed83dfab339d321b054fccca Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 21 Nov 2023 13:00:03 -0800 Subject: [PATCH] [CHORE] enable refresh on tqdm total updates (#1654) * We currently were only using a background thread when updating the done tasks. Now we also check if new tasks are enqueued to perform a refresh. --- daft/runners/progress_bar.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index 3a6ff6210d..51a474b470 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import time from typing import Any from tqdm.auto import tqdm @@ -13,6 +14,7 @@ def __init__(self, use_ray_tqdm: bool, show_tasks_bar: bool = False, disable: bo self.use_ray_tqdm = use_ray_tqdm self.show_tasks_bar = show_tasks_bar self.tqdm_mod = tqdm + self._maxinterval = 5.0 self.pbars: dict[int, tqdm] = dict() self.disable = ( disable @@ -25,7 +27,12 @@ def _make_new_bar(self, stage_id: int, name: str): self.pbars[stage_id] = self.tqdm_mod(total=1, desc=name, position=len(self.pbars)) else: self.pbars[stage_id] = self.tqdm_mod( - total=1, desc=name, position=len(self.pbars), leave=False, mininterval=1.0 + total=1, + desc=name, + position=len(self.pbars), + leave=False, + mininterval=1.0, + maxinterval=self._maxinterval, ) def mark_task_start(self, step: PartitionTask[Any]) -> None: @@ -46,6 +53,10 @@ def mark_task_start(self, step: PartitionTask[Any]) -> None: else: pb = self.pbars[stage_id] pb.total += 1 + if hasattr(pb, "last_print_t"): + dt = time.time() - pb.last_print_t + if dt >= self._maxinterval: + pb.refresh() def mark_task_done(self, step: PartitionTask[Any]) -> None: if self.disable: