From 1c0f7803ca5d0a23b0b46f30f410a2aab3850cbf Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 20 Dec 2024 20:41:00 -0800 Subject: [PATCH] chore: Build progress bar only on first update (#3626) Theres 2 progress bars for swordfish, a rust based one thats for terminals, and a python one thats for jupyter notebooks (uses tqdm). This pr makes it so that the tqdm bar is only rendered on first update (the rust based one is already lazy rendered), + increase the refresh rate of both progress bars to 500ms. native https://github.com/user-attachments/assets/c55e51a4-4919-4246-9add-2fe13d305435 py https://github.com/user-attachments/assets/8be184c5-5b48-4fee-9579-deb485f37e07 --------- Co-authored-by: Colin Ho Co-authored-by: Colin Ho --- daft/runners/progress_bar.py | 33 +++++++++++++------- src/daft-local-execution/src/lib.rs | 4 +-- src/daft-local-execution/src/progress_bar.rs | 25 +++------------ 3 files changed, 26 insertions(+), 36 deletions(-) diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index ed4c91c9a3..220fc7a9bf 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -114,25 +114,34 @@ def __init__(self) -> None: self._maxinterval = 5.0 self.tqdm_mod = get_tqdm(False) self.pbars: dict[int, Any] = dict() + self.bar_configs: dict[int, str] = dict() + self.next_id = 0 - def make_new_bar(self, bar_format: str, initial_message: str) -> int: - pbar_id = len(self.pbars) - self.pbars[pbar_id] = self.tqdm_mod( - bar_format=bar_format, - desc=initial_message, - position=pbar_id, - leave=False, - mininterval=1.0, - maxinterval=self._maxinterval, - ) + def make_new_bar(self, bar_format: str) -> int: + pbar_id = self.next_id + self.next_id += 1 + self.bar_configs[pbar_id] = bar_format return pbar_id def update_bar(self, pbar_id: int, message: str) -> None: + if pbar_id not in self.pbars: + if pbar_id not in self.bar_configs: + raise ValueError(f"No bar configuration found for id {pbar_id}") + bar_format = self.bar_configs[pbar_id] + self.pbars[pbar_id] = self.tqdm_mod( + bar_format=bar_format, + position=pbar_id, + leave=False, + mininterval=1.0, + maxinterval=self._maxinterval, + ) + del self.bar_configs[pbar_id] self.pbars[pbar_id].set_description_str(message) def close_bar(self, pbar_id: int) -> None: - self.pbars[pbar_id].close() - del self.pbars[pbar_id] + if pbar_id in self.pbars: + self.pbars[pbar_id].close() + del self.pbars[pbar_id] def close(self) -> None: for p in self.pbars.values(): diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index 654deea901..cd0c146879 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -166,9 +166,7 @@ impl ExecutionRuntimeContext { runtime_stats: Arc, ) -> Option> { if let Some(ref pb_manager) = self.progress_bar_manager { - let pb = pb_manager - .make_new_bar(color, prefix, show_received) - .unwrap(); + let pb = pb_manager.make_new_bar(color, prefix).unwrap(); Some(Arc::new(OperatorProgressBar::new( pb, runtime_stats, diff --git a/src/daft-local-execution/src/progress_bar.rs b/src/daft-local-execution/src/progress_bar.rs index cf7fa17e3a..c5c59b3e06 100644 --- a/src/daft-local-execution/src/progress_bar.rs +++ b/src/daft-local-execution/src/progress_bar.rs @@ -21,7 +21,6 @@ pub trait ProgressBarManager { &self, color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult>; fn close_all(&self) -> DaftResult<()>; @@ -52,8 +51,8 @@ pub struct OperatorProgressBar { } impl OperatorProgressBar { - // 100ms = 100_000_000ns - const UPDATE_INTERVAL: u64 = 100_000_000; + // 500ms = 500_000_000ns + const UPDATE_INTERVAL: u64 = 500_000_000; pub fn new( progress_bar: Box, @@ -146,27 +145,19 @@ impl ProgressBarManager for IndicatifProgressBarManager { &self, color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult> { let template_str = format!( "🗡️ 🐟 {{spinner:.green}} {{prefix:.{color}/bold}} | [{{elapsed_precise}}] {{msg}}", color = color.to_str(), ); - let initial_message = if show_received { - "0 rows received, 0 rows emitted".to_string() - } else { - "0 rows emitted".to_string() - }; - let pb = indicatif::ProgressBar::new_spinner() .with_style( ProgressStyle::default_spinner() .template(template_str.as_str()) .unwrap(), ) - .with_prefix(prefix.to_string()) - .with_message(initial_message); + .with_prefix(prefix.to_string()); self.multi_progress.add(pb.clone()); DaftResult::Ok(Box::new(IndicatifProgressBar(pb))) @@ -263,18 +254,10 @@ mod python { &self, _color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult> { let bar_format = format!("🗡️ 🐟 {prefix}: {{elapsed}} {{desc}}", prefix = prefix); - let initial_message = if show_received { - "0 rows received, 0 rows emitted".to_string() - } else { - "0 rows emitted".to_string() - }; let pb_id = Python::with_gil(|py| { - let pb_id = - self.inner - .call_method1(py, "make_new_bar", (bar_format, initial_message))?; + let pb_id = self.inner.call_method1(py, "make_new_bar", (bar_format,))?; let pb_id = pb_id.extract::(py)?; DaftResult::Ok(pb_id) })?;