diff --git a/src/barkeeper.rs b/src/barkeeper.rs new file mode 100644 index 0000000..4d8dda3 --- /dev/null +++ b/src/barkeeper.rs @@ -0,0 +1,137 @@ +use std::{fmt::Write, time::Duration}; + + +struct BarMessageWriter(String, indicatif::ProgressBar); +struct BarPrintWriter(String, indicatif::ProgressBar); + + +pub trait StateTracker { + fn set_njobs(&self, njobs: usize); + fn start(&self); + fn for_threads(&self, nthreads: usize) -> Vec; +} + +pub trait ThreadStateTracker { + fn out(&mut self) -> &mut impl Write; + fn status(&mut self) -> &mut impl Write; + fn job_completed(&self); + fn start(&self); + fn set_prefix(&self, prefix: String); +} + +pub struct Barkeeper { + mp: indicatif::MultiProgress, + bar: indicatif::ProgressBar, +} + +pub struct ThreadBarkeeper { + message_writer: BarMessageWriter, + print_writer: BarPrintWriter, + mp: indicatif::MultiProgress, + bar: indicatif::ProgressBar, + main_bar: indicatif::ProgressBar, +} + + +impl Barkeeper { + pub fn new() -> Self { + let mp = indicatif::MultiProgress::new(); + let bar_style = indicatif::ProgressStyle::with_template("[{elapsed}] {wide_bar} {pos}/{len}").unwrap(); + let bar = indicatif::ProgressBar::new(1); + bar.set_style(bar_style); + + Barkeeper { mp, bar } + } +} + +impl StateTracker for Barkeeper { + fn set_njobs(&self, njobs: usize) { + self.bar.set_length(njobs as u64) + } + + fn start(&self) { + self.mp.add(self.bar.clone()); + self.bar.tick(); + self.bar.enable_steady_tick(Duration::from_millis(75)); + } + + fn for_threads(&self, nthreads: usize) -> Vec { + (0..nthreads).map(|_| { + let bar = indicatif::ProgressBar::new(1); + bar.set_style(indicatif::ProgressStyle::with_template("{spinner} {prefix:.cyan} {wide_msg}").unwrap()); + + ThreadBarkeeper { + message_writer: BarMessageWriter(String::new(), bar.clone()), + print_writer: BarPrintWriter(String::new(), bar.clone()), + mp: self.mp.clone(), + main_bar: self.bar.clone(), + bar, + } + + }).collect() + } +} + +impl ThreadStateTracker for ThreadBarkeeper { + fn out(&mut self) -> &mut impl Write { + &mut self.print_writer + } + + fn status(&mut self) -> &mut impl Write { + &mut self.message_writer + } + + fn start(&self) { + self.mp.add(self.bar.clone()); + self.bar.tick(); + self.bar.enable_steady_tick(Duration::from_millis(75)); + } + + fn job_completed(&self) { + self.main_bar.inc(1) + } + + fn set_prefix(&self, prefix: String) { + self.bar.set_prefix(prefix) + } +} + + +impl Write for BarMessageWriter { + fn write_str(&mut self, s: &str) -> std::fmt::Result { + for c in s.chars() { + self.write_char(c)? + } + Ok(()) + } + + fn write_char(&mut self, c: char) -> std::fmt::Result { + if c == '\n' { + let msg = self.0.clone(); + self.1.set_message(msg); + self.0 = String::new(); + } else { + self.0.push(c); + } + Ok(()) + } +} + +impl Write for BarPrintWriter { + fn write_str(&mut self, s: &str) -> std::fmt::Result { + for c in s.chars() { + self.write_char(c)? + } + Ok(()) + } + + fn write_char(&mut self, c: char) -> std::fmt::Result { + if c == '\n' { + self.1.println(&self.0); + self.0.clear(); + } else { + self.0.push(c) + } + Ok(()) + } +} diff --git a/src/job.rs b/src/job.rs index 5d40075..cc9654f 100644 --- a/src/job.rs +++ b/src/job.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use handlebars::Handlebars; use serde::{Deserialize, Serialize}; +use crate::barkeeper::ThreadStateTracker; use crate::error::*; use crate::queue::JobState; use crate::Options; @@ -177,11 +178,11 @@ impl JobDescription { } impl InnerJobRealization { - pub fn run(&self, status_writer: &mut impl Write, log_writer: &mut impl Write, options: &Options) -> ZinnResult { + pub fn run(&self, tracker: &mut impl ThreadStateTracker, options: &Options) -> ZinnResult { // skip if dry run if options.dry_run { if options.trace { - let _ = writeln!(log_writer, "{}", self.cmd()); + let _ = writeln!(tracker.out(), "{}", self.cmd()); } return Ok(JobState::Finished); } @@ -218,7 +219,7 @@ impl InnerJobRealization { // print out trace if options.trace { - let _ = writeln!(log_writer, "{}", self.cmd()); + let _ = writeln!(tracker.out(), "{}", self.cmd()); } let (io_reader, io_writer) = os_pipe::pipe()?; @@ -234,17 +235,17 @@ impl InnerJobRealization { let mut last_line: Option = None; for line in BufReader::new(io_reader).lines().map_while(Result::ok) { - let _ = writeln!(status_writer, "{}", line); + let _ = writeln!(tracker.status(), "{}", line); if options.verbose { if let Some(line) = last_line.take() { - let _ = writeln!(log_writer, "{}: {}", self, line); + let _ = writeln!(tracker.out(), "{}: {}", self, line); } last_line = Some(line); } } if let Some(line) = last_line.take() { - let _ = writeln!(log_writer, "{}: {}", self, line); + let _ = writeln!(tracker.out(), "{}: {}", self, line); } let status = process.wait()?; diff --git a/src/main.rs b/src/main.rs index 5fa4183..2b3bc74 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,27 +1,25 @@ #![doc = include_str!("../README.md")] +use barkeeper::{StateTracker, ThreadStateTracker}; use clap::Parser; use handlebars::Handlebars; use queue::Queue; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::error::Error; -use std::time::Duration; use std::{fs, thread}; -use indicatif::MultiProgress; -use indicatif::ProgressBar; -use indicatif::ProgressStyle; use error::*; use job::*; +mod barkeeper; +mod constants; mod error; +mod hbextensions; mod job; -mod worker; mod queue; -mod hbextensions; -mod constants; +mod worker; const DOCS_URL: &str = "https://jzbor.de/zinn/zinn"; @@ -177,16 +175,8 @@ fn main() { } // setup bars - let mp = MultiProgress::new(); - let main_bar_style = ProgressStyle::with_template("[{elapsed}] {wide_bar} {pos}/{len}").unwrap(); - let main_bar = ProgressBar::new(zinnfile.jobs.len() as u64); - main_bar.set_style(main_bar_style); - let mut bars: Vec<_> = (0..nthreads).map(|_| { - let bar = ProgressBar::new(10000000); - bar.set_style(ProgressStyle::with_template("{spinner} {prefix:.cyan} {wide_msg}").unwrap()); - mp.add(bar.clone()); - bar - }).collect(); + let barkeeper = barkeeper::Barkeeper::new(); + let mut thread_barkeepers = barkeeper.for_threads(nthreads); // feed the queue let queue = Queue::new(); @@ -202,29 +192,26 @@ fn main() { queue.enqueue(job); } - main_bar.set_length(queue.len() as u64); + barkeeper.set_njobs(queue.len()); // start worker bars - for bar in &mut bars { - bar.tick(); - bar.enable_steady_tick(Duration::from_millis(75)); + for tb in &thread_barkeepers { + tb.start(); } // start the threads let threads: Vec<_> = (0..nthreads).map(|_| { - let main_bar = main_bar.clone(); let queue = queue.clone(); - let bar = bars.pop().unwrap(); + let tb = thread_barkeepers.pop().unwrap(); let options = args.options(); thread::spawn(move || { - worker::run_worker(queue, bar, main_bar, options) + worker::run_worker(queue, tb, options) }) }).collect(); // enable the main bar - mp.add(main_bar.clone()); - main_bar.enable_steady_tick(Duration::from_millis(75)); + barkeeper.start(); // wait for the work to be completed queue.done(); diff --git a/src/worker.rs b/src/worker.rs index ab5295f..c682bb0 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,75 +1,31 @@ use std::fmt::Write; -use indicatif::ProgressBar; - +use crate::barkeeper::ThreadStateTracker; use crate::queue::{JobState, Queue}; use crate::Options; -struct BarMessageWriter(String, ProgressBar); -struct BarPrintWriter(String, ProgressBar); - -impl Write for BarMessageWriter { - fn write_str(&mut self, s: &str) -> std::fmt::Result { - for c in s.chars() { - self.write_char(c)? - } - Ok(()) - } - - fn write_char(&mut self, c: char) -> std::fmt::Result { - if c == '\n' { - let msg = self.0.clone(); - self.1.set_message(msg); - self.0 = String::new(); - } else { - self.0.push(c); - } - Ok(()) - } -} - -impl Write for BarPrintWriter { - fn write_str(&mut self, s: &str) -> std::fmt::Result { - for c in s.chars() { - self.write_char(c)? - } - Ok(()) - } - - fn write_char(&mut self, c: char) -> std::fmt::Result { - if c == '\n' { - self.1.println(&self.0); - self.0.clear(); - } else { - self.0.push(c) - } - Ok(()) - } -} - -pub fn run_worker(queue: Queue, bar: ProgressBar, main_bar: ProgressBar, options: Options) { - let mut status_writer = BarMessageWriter(String::new(), bar.clone()); - let mut log_writer = BarPrintWriter(String::new(), bar.clone()); - +pub fn run_worker(queue: Queue, mut tracker: impl ThreadStateTracker, options: Options) { loop { - bar.set_prefix("waiting..."); - bar.set_message(""); + tracker.set_prefix(String::from("waiting...")); + let _ = writeln!(tracker.status(), ""); + if let Some(job) = queue.fetch() { - bar.set_prefix(job.to_string()); - if let Ok(state) = job.run(&mut status_writer, &mut log_writer, &options) { - match state { - JobState::Finished => bar.println(console::style(format!("=> DONE {}", job)).green().to_string()), - JobState::Skipped => bar.println(console::style(format!("=> SKIPPED {}", job)).yellow().to_string()), - JobState::Failed => bar.println(console::style(format!("=> FAILED {}", job)).red().to_string()), + tracker.set_prefix(job.to_string()); + if let Ok(state) = job.run(&mut tracker, &options) { + let msg = match state { + JobState::Finished => console::style(format!("=> DONE {}", job)).green().to_string(), + JobState::Skipped => console::style(format!("=> SKIPPED {}", job)).yellow().to_string(), + JobState::Failed => console::style(format!("=> FAILED {}", job)).red().to_string(), _ => panic!("Invalid job state after run: {:?}", state), - - } + }; + let _ = writeln!(tracker.out(), "{}", msg); queue.finished(job); - } else if let Err(e) = job.run(&mut status_writer, &mut log_writer, &options) { - bar.println(console::style(format!("=> FAILED {}: {}", job, e)).red().to_string()); + } else if let Err(e) = job.run(&mut tracker, &options) { + let msg = console::style(format!("=> FAILED {}: {}", job, e)).red().to_string(); + let _ = writeln!(tracker.out(), "{}", msg); queue.failed(job); } - main_bar.inc(1); + tracker.job_completed(); } else { break; }