Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Build progress bar only on first update #3626

Merged
merged 4 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions daft/runners/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,34 @@
self._maxinterval = 5.0
self.tqdm_mod = get_tqdm(False)
self.pbars: dict[int, Any] = dict()
self.bar_configs: dict[int, str] = dict()
self.next_id = 0

Check warning on line 118 in daft/runners/progress_bar.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/progress_bar.py#L117-L118

Added lines #L117 - L118 were not covered by tests

def make_new_bar(self, bar_format: str, initial_message: str) -> int:
pbar_id = len(self.pbars)
self.pbars[pbar_id] = self.tqdm_mod(
bar_format=bar_format,
desc=initial_message,
position=pbar_id,
leave=False,
mininterval=1.0,
maxinterval=self._maxinterval,
)
def make_new_bar(self, bar_format: str) -> int:
pbar_id = self.next_id
self.next_id += 1
self.bar_configs[pbar_id] = bar_format

Check warning on line 123 in daft/runners/progress_bar.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/progress_bar.py#L121-L123

Added lines #L121 - L123 were not covered by tests
return pbar_id

def update_bar(self, pbar_id: int, message: str) -> None:
if pbar_id not in self.pbars:
if pbar_id not in self.bar_configs:
raise ValueError(f"No bar configuration found for id {pbar_id}")
bar_format = self.bar_configs[pbar_id]
self.pbars[pbar_id] = self.tqdm_mod(

Check warning on line 131 in daft/runners/progress_bar.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/progress_bar.py#L127-L131

Added lines #L127 - L131 were not covered by tests
bar_format=bar_format,
position=pbar_id,
leave=False,
mininterval=1.0,
maxinterval=self._maxinterval,
)
del self.bar_configs[pbar_id]

Check warning on line 138 in daft/runners/progress_bar.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/progress_bar.py#L138

Added line #L138 was not covered by tests
self.pbars[pbar_id].set_description_str(message)

def close_bar(self, pbar_id: int) -> None:
self.pbars[pbar_id].close()
del self.pbars[pbar_id]
if pbar_id in self.pbars:
self.pbars[pbar_id].close()
del self.pbars[pbar_id]

Check warning on line 144 in daft/runners/progress_bar.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/progress_bar.py#L142-L144

Added lines #L142 - L144 were not covered by tests

def close(self) -> None:
for p in self.pbars.values():
Expand Down
4 changes: 1 addition & 3 deletions src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,7 @@ impl ExecutionRuntimeContext {
runtime_stats: Arc<RuntimeStatsContext>,
) -> Option<Arc<OperatorProgressBar>> {
if let Some(ref pb_manager) = self.progress_bar_manager {
let pb = pb_manager
.make_new_bar(color, prefix, show_received)
.unwrap();
let pb = pb_manager.make_new_bar(color, prefix).unwrap();
Some(Arc::new(OperatorProgressBar::new(
pb,
runtime_stats,
Expand Down
25 changes: 4 additions & 21 deletions src/daft-local-execution/src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
&self,
color: ProgressBarColor,
prefix: &str,
show_received: bool,
) -> DaftResult<Box<dyn ProgressBar>>;

fn close_all(&self) -> DaftResult<()>;
Expand Down Expand Up @@ -52,8 +51,8 @@
}

impl OperatorProgressBar {
// 100ms = 100_000_000ns
const UPDATE_INTERVAL: u64 = 100_000_000;
// 500ms = 500_000_000ns
const UPDATE_INTERVAL: u64 = 500_000_000;

pub fn new(
progress_bar: Box<dyn ProgressBar>,
Expand Down Expand Up @@ -146,27 +145,19 @@
&self,
color: ProgressBarColor,
prefix: &str,
show_received: bool,
) -> DaftResult<Box<dyn ProgressBar>> {
let template_str = format!(
"🗡️ 🐟 {{spinner:.green}} {{prefix:.{color}/bold}} | [{{elapsed_precise}}] {{msg}}",
color = color.to_str(),
);

let initial_message = if show_received {
"0 rows received, 0 rows emitted".to_string()
} else {
"0 rows emitted".to_string()
};

let pb = indicatif::ProgressBar::new_spinner()
.with_style(
ProgressStyle::default_spinner()
.template(template_str.as_str())
.unwrap(),
)
.with_prefix(prefix.to_string())
.with_message(initial_message);
.with_prefix(prefix.to_string());

self.multi_progress.add(pb.clone());
DaftResult::Ok(Box::new(IndicatifProgressBar(pb)))
Expand Down Expand Up @@ -263,18 +254,10 @@
&self,
_color: ProgressBarColor,
prefix: &str,
show_received: bool,
) -> DaftResult<Box<dyn ProgressBar>> {
let bar_format = format!("🗡️ 🐟 {prefix}: {{elapsed}} {{desc}}", prefix = prefix);
let initial_message = if show_received {
"0 rows received, 0 rows emitted".to_string()
} else {
"0 rows emitted".to_string()
};
let pb_id = Python::with_gil(|py| {
let pb_id =
self.inner
.call_method1(py, "make_new_bar", (bar_format, initial_message))?;
let pb_id = self.inner.call_method1(py, "make_new_bar", (bar_format,))?;

Check warning on line 260 in src/daft-local-execution/src/progress_bar.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-local-execution/src/progress_bar.rs#L260

Added line #L260 was not covered by tests
let pb_id = pb_id.extract::<usize>(py)?;
DaftResult::Ok(pb_id)
})?;
Expand Down
Loading