diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index ed4c91c9a3..220fc7a9bf 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -114,25 +114,34 @@ def __init__(self) -> None: self._maxinterval = 5.0 self.tqdm_mod = get_tqdm(False) self.pbars: dict[int, Any] = dict() + self.bar_configs: dict[int, str] = dict() + self.next_id = 0 - def make_new_bar(self, bar_format: str, initial_message: str) -> int: - pbar_id = len(self.pbars) - self.pbars[pbar_id] = self.tqdm_mod( - bar_format=bar_format, - desc=initial_message, - position=pbar_id, - leave=False, - mininterval=1.0, - maxinterval=self._maxinterval, - ) + def make_new_bar(self, bar_format: str) -> int: + pbar_id = self.next_id + self.next_id += 1 + self.bar_configs[pbar_id] = bar_format return pbar_id def update_bar(self, pbar_id: int, message: str) -> None: + if pbar_id not in self.pbars: + if pbar_id not in self.bar_configs: + raise ValueError(f"No bar configuration found for id {pbar_id}") + bar_format = self.bar_configs[pbar_id] + self.pbars[pbar_id] = self.tqdm_mod( + bar_format=bar_format, + position=pbar_id, + leave=False, + mininterval=1.0, + maxinterval=self._maxinterval, + ) + del self.bar_configs[pbar_id] self.pbars[pbar_id].set_description_str(message) def close_bar(self, pbar_id: int) -> None: - self.pbars[pbar_id].close() - del self.pbars[pbar_id] + if pbar_id in self.pbars: + self.pbars[pbar_id].close() + del self.pbars[pbar_id] def close(self) -> None: for p in self.pbars.values(): diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index 654deea901..cd0c146879 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -166,9 +166,7 @@ impl ExecutionRuntimeContext { runtime_stats: Arc, ) -> Option> { if let Some(ref pb_manager) = self.progress_bar_manager { - let pb = pb_manager - .make_new_bar(color, prefix, show_received) - .unwrap(); + let pb = pb_manager.make_new_bar(color, prefix).unwrap(); Some(Arc::new(OperatorProgressBar::new( pb, runtime_stats, diff --git a/src/daft-local-execution/src/progress_bar.rs b/src/daft-local-execution/src/progress_bar.rs index cf7fa17e3a..c5c59b3e06 100644 --- a/src/daft-local-execution/src/progress_bar.rs +++ b/src/daft-local-execution/src/progress_bar.rs @@ -21,7 +21,6 @@ pub trait ProgressBarManager { &self, color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult>; fn close_all(&self) -> DaftResult<()>; @@ -52,8 +51,8 @@ pub struct OperatorProgressBar { } impl OperatorProgressBar { - // 100ms = 100_000_000ns - const UPDATE_INTERVAL: u64 = 100_000_000; + // 500ms = 500_000_000ns + const UPDATE_INTERVAL: u64 = 500_000_000; pub fn new( progress_bar: Box, @@ -146,27 +145,19 @@ impl ProgressBarManager for IndicatifProgressBarManager { &self, color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult> { let template_str = format!( "🗡️ 🐟 {{spinner:.green}} {{prefix:.{color}/bold}} | [{{elapsed_precise}}] {{msg}}", color = color.to_str(), ); - let initial_message = if show_received { - "0 rows received, 0 rows emitted".to_string() - } else { - "0 rows emitted".to_string() - }; - let pb = indicatif::ProgressBar::new_spinner() .with_style( ProgressStyle::default_spinner() .template(template_str.as_str()) .unwrap(), ) - .with_prefix(prefix.to_string()) - .with_message(initial_message); + .with_prefix(prefix.to_string()); self.multi_progress.add(pb.clone()); DaftResult::Ok(Box::new(IndicatifProgressBar(pb))) @@ -263,18 +254,10 @@ mod python { &self, _color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult> { let bar_format = format!("🗡️ 🐟 {prefix}: {{elapsed}} {{desc}}", prefix = prefix); - let initial_message = if show_received { - "0 rows received, 0 rows emitted".to_string() - } else { - "0 rows emitted".to_string() - }; let pb_id = Python::with_gil(|py| { - let pb_id = - self.inner - .call_method1(py, "make_new_bar", (bar_format, initial_message))?; + let pb_id = self.inner.call_method1(py, "make_new_bar", (bar_format,))?; let pb_id = pb_id.extract::(py)?; DaftResult::Ok(pb_id) })?;