Skip to content

Commit

Permalink
Wrap bar management in barkeeper module
Browse files Browse the repository at this point in the history
  • Loading branch information
jzbor committed Jul 8, 2024
1 parent a964112 commit 0f9229b
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 93 deletions.
137 changes: 137 additions & 0 deletions src/barkeeper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::{fmt::Write, time::Duration};


struct BarMessageWriter(String, indicatif::ProgressBar);
struct BarPrintWriter(String, indicatif::ProgressBar);


pub trait StateTracker {
fn set_njobs(&self, njobs: usize);
fn start(&self);
fn for_threads(&self, nthreads: usize) -> Vec<impl ThreadStateTracker>;
}

pub trait ThreadStateTracker {
fn out(&mut self) -> &mut impl Write;
fn status(&mut self) -> &mut impl Write;
fn job_completed(&self);
fn start(&self);
fn set_prefix(&self, prefix: String);
}

pub struct Barkeeper {
mp: indicatif::MultiProgress,
bar: indicatif::ProgressBar,
}

pub struct ThreadBarkeeper {
message_writer: BarMessageWriter,
print_writer: BarPrintWriter,
mp: indicatif::MultiProgress,
bar: indicatif::ProgressBar,
main_bar: indicatif::ProgressBar,
}


impl Barkeeper {
pub fn new() -> Self {
let mp = indicatif::MultiProgress::new();
let bar_style = indicatif::ProgressStyle::with_template("[{elapsed}] {wide_bar} {pos}/{len}").unwrap();
let bar = indicatif::ProgressBar::new(1);
bar.set_style(bar_style);

Barkeeper { mp, bar }
}
}

impl StateTracker for Barkeeper {
fn set_njobs(&self, njobs: usize) {
self.bar.set_length(njobs as u64)
}

fn start(&self) {
self.mp.add(self.bar.clone());
self.bar.tick();
self.bar.enable_steady_tick(Duration::from_millis(75));
}

fn for_threads(&self, nthreads: usize) -> Vec<ThreadBarkeeper> {
(0..nthreads).map(|_| {
let bar = indicatif::ProgressBar::new(1);
bar.set_style(indicatif::ProgressStyle::with_template("{spinner} {prefix:.cyan} {wide_msg}").unwrap());

ThreadBarkeeper {
message_writer: BarMessageWriter(String::new(), bar.clone()),
print_writer: BarPrintWriter(String::new(), bar.clone()),
mp: self.mp.clone(),
main_bar: self.bar.clone(),
bar,
}

}).collect()
}
}

impl ThreadStateTracker for ThreadBarkeeper {
fn out(&mut self) -> &mut impl Write {
&mut self.print_writer
}

fn status(&mut self) -> &mut impl Write {
&mut self.message_writer
}

fn start(&self) {
self.mp.add(self.bar.clone());
self.bar.tick();
self.bar.enable_steady_tick(Duration::from_millis(75));
}

fn job_completed(&self) {
self.main_bar.inc(1)
}

fn set_prefix(&self, prefix: String) {
self.bar.set_prefix(prefix)
}
}


impl Write for BarMessageWriter {
fn write_str(&mut self, s: &str) -> std::fmt::Result {
for c in s.chars() {
self.write_char(c)?
}
Ok(())
}

fn write_char(&mut self, c: char) -> std::fmt::Result {
if c == '\n' {
let msg = self.0.clone();
self.1.set_message(msg);
self.0 = String::new();
} else {
self.0.push(c);
}
Ok(())
}
}

impl Write for BarPrintWriter {
fn write_str(&mut self, s: &str) -> std::fmt::Result {
for c in s.chars() {
self.write_char(c)?
}
Ok(())
}

fn write_char(&mut self, c: char) -> std::fmt::Result {
if c == '\n' {
self.1.println(&self.0);
self.0.clear();
} else {
self.0.push(c)
}
Ok(())
}
}
13 changes: 7 additions & 6 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::Arc;
use handlebars::Handlebars;
use serde::{Deserialize, Serialize};

use crate::barkeeper::ThreadStateTracker;
use crate::error::*;
use crate::queue::JobState;
use crate::Options;
Expand Down Expand Up @@ -177,11 +178,11 @@ impl JobDescription {
}

impl InnerJobRealization {
pub fn run(&self, status_writer: &mut impl Write, log_writer: &mut impl Write, options: &Options) -> ZinnResult<JobState> {
pub fn run(&self, tracker: &mut impl ThreadStateTracker, options: &Options) -> ZinnResult<JobState> {
// skip if dry run
if options.dry_run {
if options.trace {
let _ = writeln!(log_writer, "{}", self.cmd());
let _ = writeln!(tracker.out(), "{}", self.cmd());
}
return Ok(JobState::Finished);
}
Expand Down Expand Up @@ -218,7 +219,7 @@ impl InnerJobRealization {

// print out trace
if options.trace {
let _ = writeln!(log_writer, "{}", self.cmd());
let _ = writeln!(tracker.out(), "{}", self.cmd());
}

let (io_reader, io_writer) = os_pipe::pipe()?;
Expand All @@ -234,17 +235,17 @@ impl InnerJobRealization {
let mut last_line: Option<String> = None;

for line in BufReader::new(io_reader).lines().map_while(Result::ok) {
let _ = writeln!(status_writer, "{}", line);
let _ = writeln!(tracker.status(), "{}", line);

if options.verbose {
if let Some(line) = last_line.take() {
let _ = writeln!(log_writer, "{}: {}", self, line);
let _ = writeln!(tracker.out(), "{}: {}", self, line);
}
last_line = Some(line);
}
}
if let Some(line) = last_line.take() {
let _ = writeln!(log_writer, "{}: {}", self, line);
let _ = writeln!(tracker.out(), "{}: {}", self, line);
}

let status = process.wait()?;
Expand Down
39 changes: 13 additions & 26 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
#![doc = include_str!("../README.md")]

use barkeeper::{StateTracker, ThreadStateTracker};
use clap::Parser;
use handlebars::Handlebars;
use queue::Queue;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::error::Error;
use std::time::Duration;
use std::{fs, thread};
use indicatif::MultiProgress;
use indicatif::ProgressBar;
use indicatif::ProgressStyle;

use error::*;
use job::*;


mod barkeeper;
mod constants;
mod error;
mod hbextensions;
mod job;
mod worker;
mod queue;
mod hbextensions;
mod constants;
mod worker;


const DOCS_URL: &str = "https://jzbor.de/zinn/zinn";
Expand Down Expand Up @@ -177,16 +175,8 @@ fn main() {
}

// setup bars
let mp = MultiProgress::new();
let main_bar_style = ProgressStyle::with_template("[{elapsed}] {wide_bar} {pos}/{len}").unwrap();
let main_bar = ProgressBar::new(zinnfile.jobs.len() as u64);
main_bar.set_style(main_bar_style);
let mut bars: Vec<_> = (0..nthreads).map(|_| {
let bar = ProgressBar::new(10000000);
bar.set_style(ProgressStyle::with_template("{spinner} {prefix:.cyan} {wide_msg}").unwrap());
mp.add(bar.clone());
bar
}).collect();
let barkeeper = barkeeper::Barkeeper::new();
let mut thread_barkeepers = barkeeper.for_threads(nthreads);

// feed the queue
let queue = Queue::new();
Expand All @@ -202,29 +192,26 @@ fn main() {
queue.enqueue(job);
}

main_bar.set_length(queue.len() as u64);
barkeeper.set_njobs(queue.len());

// start worker bars
for bar in &mut bars {
bar.tick();
bar.enable_steady_tick(Duration::from_millis(75));
for tb in &thread_barkeepers {
tb.start();
}

// start the threads
let threads: Vec<_> = (0..nthreads).map(|_| {
let main_bar = main_bar.clone();
let queue = queue.clone();
let bar = bars.pop().unwrap();
let tb = thread_barkeepers.pop().unwrap();
let options = args.options();

thread::spawn(move || {
worker::run_worker(queue, bar, main_bar, options)
worker::run_worker(queue, tb, options)
})
}).collect();

// enable the main bar
mp.add(main_bar.clone());
main_bar.enable_steady_tick(Duration::from_millis(75));
barkeeper.start();

// wait for the work to be completed
queue.done();
Expand Down
78 changes: 17 additions & 61 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,31 @@
use std::fmt::Write;

use indicatif::ProgressBar;

use crate::barkeeper::ThreadStateTracker;
use crate::queue::{JobState, Queue};
use crate::Options;

struct BarMessageWriter(String, ProgressBar);
struct BarPrintWriter(String, ProgressBar);

impl Write for BarMessageWriter {
fn write_str(&mut self, s: &str) -> std::fmt::Result {
for c in s.chars() {
self.write_char(c)?
}
Ok(())
}

fn write_char(&mut self, c: char) -> std::fmt::Result {
if c == '\n' {
let msg = self.0.clone();
self.1.set_message(msg);
self.0 = String::new();
} else {
self.0.push(c);
}
Ok(())
}
}

impl Write for BarPrintWriter {
fn write_str(&mut self, s: &str) -> std::fmt::Result {
for c in s.chars() {
self.write_char(c)?
}
Ok(())
}

fn write_char(&mut self, c: char) -> std::fmt::Result {
if c == '\n' {
self.1.println(&self.0);
self.0.clear();
} else {
self.0.push(c)
}
Ok(())
}
}

pub fn run_worker(queue: Queue, bar: ProgressBar, main_bar: ProgressBar, options: Options) {
let mut status_writer = BarMessageWriter(String::new(), bar.clone());
let mut log_writer = BarPrintWriter(String::new(), bar.clone());

pub fn run_worker(queue: Queue, mut tracker: impl ThreadStateTracker, options: Options) {
loop {
bar.set_prefix("waiting...");
bar.set_message("");
tracker.set_prefix(String::from("waiting..."));
let _ = writeln!(tracker.status(), "");

if let Some(job) = queue.fetch() {
bar.set_prefix(job.to_string());
if let Ok(state) = job.run(&mut status_writer, &mut log_writer, &options) {
match state {
JobState::Finished => bar.println(console::style(format!("=> DONE {}", job)).green().to_string()),
JobState::Skipped => bar.println(console::style(format!("=> SKIPPED {}", job)).yellow().to_string()),
JobState::Failed => bar.println(console::style(format!("=> FAILED {}", job)).red().to_string()),
tracker.set_prefix(job.to_string());
if let Ok(state) = job.run(&mut tracker, &options) {
let msg = match state {
JobState::Finished => console::style(format!("=> DONE {}", job)).green().to_string(),
JobState::Skipped => console::style(format!("=> SKIPPED {}", job)).yellow().to_string(),
JobState::Failed => console::style(format!("=> FAILED {}", job)).red().to_string(),
_ => panic!("Invalid job state after run: {:?}", state),

}
};
let _ = writeln!(tracker.out(), "{}", msg);
queue.finished(job);
} else if let Err(e) = job.run(&mut status_writer, &mut log_writer, &options) {
bar.println(console::style(format!("=> FAILED {}: {}", job, e)).red().to_string());
} else if let Err(e) = job.run(&mut tracker, &options) {
let msg = console::style(format!("=> FAILED {}: {}", job, e)).red().to_string();
let _ = writeln!(tracker.out(), "{}", msg);
queue.failed(job);
}
main_bar.inc(1);
tracker.job_completed();
} else {
break;
}
Expand Down

0 comments on commit 0f9229b

Please sign in to comment.