From 2c9c642871b82ce8da33bd49db757fab90c54171 Mon Sep 17 00:00:00 2001 From: Manuel Fuchs Date: Thu, 15 Dec 2022 12:03:32 +0100 Subject: [PATCH] Add `CommandExt` for working with `std::process::Command` output streams (#535) --- CHANGELOG.md | 4 + libherokubuildpack/Cargo.toml | 5 +- libherokubuildpack/README.md | 4 + libherokubuildpack/src/command.rs | 185 ++++++++++++++++++++++++ libherokubuildpack/src/lib.rs | 4 + libherokubuildpack/src/write.rs | 225 ++++++++++++++++++++++++++++++ 6 files changed, 426 insertions(+), 1 deletion(-) create mode 100644 libherokubuildpack/src/command.rs create mode 100644 libherokubuildpack/src/write.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d7c102b..26f673e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ separate changelogs for each crate were used. If you need to refer to these old - libcnb: Drop the use of the `stacker` crate when recursively removing layer directories. ([#517](https://github.com/heroku/libcnb.rs/pull/517)) - libcnb-cargo: Updated to Clap v4. ([#511](https://github.com/heroku/libcnb.rs/pull/511)) +## Added + +- libherokubuildpack: Add `command` and `write` modules for working with `std::process::Command` output streams. ([#535](https://github.com/heroku/libcnb.rs/pull/535)) + ## [0.11.1] 2022-09-29 ### Fixed diff --git a/libherokubuildpack/Cargo.toml b/libherokubuildpack/Cargo.toml index a964d82f..015841cf 100644 --- a/libherokubuildpack/Cargo.toml +++ b/libherokubuildpack/Cargo.toml @@ -15,7 +15,7 @@ include = ["src/**/*", "LICENSE", "README.md"] all-features = true [features] -default = ["download", "digest", "error", "log", "tar", "toml", "fs"] +default = ["command", "download", "digest", "error", "log", "tar", "toml", "fs", "write"] download = ["dep:ureq", "dep:thiserror"] digest = ["dep:sha2"] error = ["log", "dep:libcnb"] @@ -23,8 +23,11 @@ log = ["dep:termcolor"] tar = ["dep:tar", "dep:flate2"] toml = ["dep:toml"] fs = ["dep:pathdiff"] +command = ["write", "dep:crossbeam-utils"] +write = [] [dependencies] +crossbeam-utils = { version = "0.8.2", optional = true } flate2 = { version = "1.0.24", optional = true } libcnb = { workspace = true, optional = true } pathdiff = { version = "0.2.1", optional = true } diff --git a/libherokubuildpack/README.md b/libherokubuildpack/README.md index 67f0d5a4..91f75470 100644 --- a/libherokubuildpack/README.md +++ b/libherokubuildpack/README.md @@ -14,6 +14,8 @@ uses Cargo features to allow opt-out of certain modules if they're not needed. The feature names line up with the modules in this crate. All features are enabled by default. +* **command** - + Enabled helpers to work with `std::process::Command`. * **download** - Enables helpers to download files over HTTP. * **digest** - @@ -28,6 +30,8 @@ The feature names line up with the modules in this crate. All features are enabl Enables helpers for working with TOML data. * **fs** - Enables helpers for filesystem related tasks. +* **write** - + Enables `std::io::Write` proxy implementations. [Docs]: https://img.shields.io/docsrs/libherokubuildpack [docs.rs]: https://docs.rs/libherokubuildpack/latest/libherokubuildpack/ diff --git a/libherokubuildpack/src/command.rs b/libherokubuildpack/src/command.rs new file mode 100644 index 00000000..ac37e420 --- /dev/null +++ b/libherokubuildpack/src/command.rs @@ -0,0 +1,185 @@ +use crate::write::tee; +use crossbeam_utils::thread::ScopedJoinHandle; +use std::io::Write; +use std::{io, process, thread}; +use std::{mem, panic}; + +/// Extension trait for [`process::Command`] that adds functions for use within buildpacks. +pub trait CommandExt { + /// Spawns the command process and sends the output of stdout and stderr to the given writers. + /// + /// This allows for additional flexibility when dealing with these output streams compared to + /// functionality that the stock [`process::Command`] provides. See the [`write`](crate::write) + /// module for [`std::io::Write`] implementations designed for common buildpack tasks. + /// + /// This function will redirect the output unbuffered and in parallel for both streams. This + /// means that it can be used to output data from these streams while the command is running, + /// providing a live view into the process' output. This function will block until both streams + /// have been closed. + /// + /// # Example: + /// ```no_run + /// use libherokubuildpack::command::CommandExt; + /// use libherokubuildpack::write::tee; + /// use std::fs; + /// use std::process::Command; + /// + /// let logfile = fs::File::open("log.txt").unwrap(); + /// let exit_status = Command::new("date") + /// .spawn_and_write_streams(tee(std::io::stdout(), &logfile), std::io::stderr()) + /// .and_then(|mut child| child.wait()) + /// .unwrap(); + /// ``` + fn spawn_and_write_streams( + &mut self, + stdout_write: OW, + stderr_write: EW, + ) -> io::Result; + + /// Spawns the command process and sends the output of stdout and stderr to the given writers. + /// + /// In addition to what [`spawn_and_write_streams`](Self::spawn_and_write_streams) does, this + /// function captures stdout and stderr as `Vec` and returns them after waiting for the + /// process to finish. This function is meant as a drop-in replacement for existing + /// `Command:output` calls. + /// + /// # Example: + /// ```no_run + /// use libherokubuildpack::command::CommandExt; + /// use libherokubuildpack::write::tee; + /// use std::fs; + /// use std::process::Command; + /// + /// let logfile = fs::File::open("log.txt").unwrap(); + /// let output = Command::new("date") + /// .output_and_write_streams(tee(std::io::stdout(), &logfile), std::io::stderr()) + /// .unwrap(); + /// + /// // Return value can be used as with Command::output, but the streams will also be written to + /// // the given writers. + /// println!( + /// "Process exited with {}, stdout: {:?}, stderr: {:?}", + /// output.status, output.stdout, output.stderr + /// ); + /// ``` + fn output_and_write_streams( + &mut self, + stdout_write: OW, + stderr_write: EW, + ) -> io::Result; +} + +impl CommandExt for process::Command { + fn spawn_and_write_streams( + &mut self, + stdout_write: OW, + stderr_write: EW, + ) -> io::Result { + self.stdout(process::Stdio::piped()) + .stderr(process::Stdio::piped()) + .spawn() + .and_then(|child| write_child_process_output(child, stdout_write, stderr_write)) + } + + fn output_and_write_streams( + &mut self, + stdout_write: OW, + stderr_write: EW, + ) -> io::Result { + let mut stdout_buffer = vec![]; + let mut stderr_buffer = vec![]; + + self.spawn_and_write_streams( + tee(&mut stdout_buffer, stdout_write), + tee(&mut stderr_buffer, stderr_write), + ) + .and_then(|mut child| child.wait()) + .map(|status| process::Output { + status, + stdout: stdout_buffer, + stderr: stderr_buffer, + }) + } +} + +fn write_child_process_output( + mut child: process::Child, + mut stdout_writer: OW, + mut stderr_writer: EW, +) -> io::Result { + // Copying the data to the writers happens in separate threads for stdout and stderr to ensure + // they're processed in parallel. Example: imagine the caller uses io::stdout() and io::stderr() + // as the writers so that the user can follow along with the command's output. If we copy stdout + // first and then stderr afterwards, interleaved stdout and stderr messages will no longer be + // interleaved (stderr output is always printed after stdout has been closed). + // + // The rust compiler currently cannot figure out how long a thread will run (doesn't take the + // almost immediate join calls into account) and therefore requires that data used in a thread + // lives forever. To avoid requiring 'static lifetimes for the writers, we use crossbeam's + // scoped threads here. This enables writers that write, for example, to a mutable buffer. + unwind_panic(crossbeam_utils::thread::scope(|scope| { + let stdout_copy_thread = mem::take(&mut child.stdout) + .map(|mut stdout| scope.spawn(move |_| std::io::copy(&mut stdout, &mut stdout_writer))); + + let stderr_copy_thread = mem::take(&mut child.stderr) + .map(|mut stderr| scope.spawn(move |_| std::io::copy(&mut stderr, &mut stderr_writer))); + + let stdout_copy_result = stdout_copy_thread.map_or_else(|| Ok(0), join_and_unwind_panic); + let stderr_copy_result = stderr_copy_thread.map_or_else(|| Ok(0), join_and_unwind_panic); + + // Return the first error from either Result, or the child process value + stdout_copy_result.and(stderr_copy_result).map(|_| child) + })) +} + +fn join_and_unwind_panic(h: ScopedJoinHandle) -> T { + unwind_panic(h.join()) +} + +fn unwind_panic(t: thread::Result) -> T { + match t { + Ok(value) => value, + Err(err) => panic::resume_unwind(err), + } +} + +#[cfg(test)] +mod test { + use crate::command::CommandExt; + use std::process::Command; + + #[test] + #[cfg(unix)] + fn test_spawn_and_write_streams() { + let mut stdout_buf = vec![]; + let mut stderr_buf = vec![]; + + Command::new("echo") + .args(["-n", "Hello World!"]) + .spawn_and_write_streams(&mut stdout_buf, &mut stderr_buf) + .and_then(|mut child| child.wait()) + .unwrap(); + + assert_eq!(stdout_buf, "Hello World!".as_bytes()); + assert_eq!(stderr_buf, Vec::::new()); + } + + #[test] + #[cfg(unix)] + fn test_output_and_write_streams() { + let mut stdout_buf = vec![]; + let mut stderr_buf = vec![]; + + let output = Command::new("echo") + .args(["-n", "Hello World!"]) + .output_and_write_streams(&mut stdout_buf, &mut stderr_buf) + .unwrap(); + + assert_eq!(stdout_buf, "Hello World!".as_bytes()); + assert_eq!(stderr_buf, Vec::::new()); + + assert_eq!(output.status.code(), Some(0)); + assert_eq!(output.stdout, "Hello World!".as_bytes()); + assert_eq!(output.stderr, Vec::::new()); + } +} diff --git a/libherokubuildpack/src/lib.rs b/libherokubuildpack/src/lib.rs index e1276bdd..35693e9b 100644 --- a/libherokubuildpack/src/lib.rs +++ b/libherokubuildpack/src/lib.rs @@ -7,6 +7,8 @@ // This lint is too noisy and enforces a style that reduces readability in many cases. #![allow(clippy::module_name_repetitions)] +#[cfg(feature = "command")] +pub mod command; #[cfg(feature = "digest")] pub mod digest; #[cfg(feature = "download")] @@ -21,3 +23,5 @@ pub mod log; pub mod tar; #[cfg(feature = "toml")] pub mod toml; +#[cfg(feature = "write")] +pub mod write; diff --git a/libherokubuildpack/src/write.rs b/libherokubuildpack/src/write.rs new file mode 100644 index 00000000..d62de446 --- /dev/null +++ b/libherokubuildpack/src/write.rs @@ -0,0 +1,225 @@ +use std::fmt::{Debug, Formatter}; +use std::io; +use std::mem; +use std::sync::Arc; + +/// Constructs a writer that buffers written data until given marker byte is encountered and +/// then applies the given mapping function to the data before passing the result to the wrapped +/// writer. +/// +/// See the [`mappers`] module for a collection of commonly used mappers. +pub fn mapped) -> Vec) + Sync + Send + 'static>( + w: W, + marker_byte: u8, + f: F, +) -> MappedWrite { + MappedWrite { + inner: w, + marker_byte, + buffer: vec![], + mapping_fn: Arc::new(f), + } +} + +/// Constructs a writer that buffers written data until an ASCII/UTF-8 newline byte (`0x0A`) is +/// encountered and then applies the given mapping function to the data before passing the result to +/// the wrapped writer. +/// +/// See the [`mappers`] module for a collection of commonly used mappers. +pub fn line_mapped) -> Vec) + Sync + Send + 'static>( + w: W, + f: F, +) -> MappedWrite { + mapped(w, NEWLINE_ASCII_BYTE, f) +} + +/// Constructs a writer that writes to two other writers. Similar to the UNIX `tee` command. +pub fn tee(a: A, b: B) -> TeeWrite { + TeeWrite { + inner_a: a, + inner_b: b, + } +} + +/// A mapped writer that was created with the [`mapped`] or [`line_mapped`] function. +#[derive(Clone)] +pub struct MappedWrite { + inner: W, + marker_byte: u8, + buffer: Vec, + mapping_fn: Arc) -> Vec) + Sync + Send>, +} + +/// A tee writer that was created with the [`tee`] function. +#[derive(Debug, Clone)] +pub struct TeeWrite { + inner_a: A, + inner_b: B, +} + +impl MappedWrite { + fn map_and_write_current_buffer(&mut self) -> io::Result<()> { + self.inner + .write_all(&(self.mapping_fn)(mem::take(&mut self.buffer))) + } +} + +impl io::Write for MappedWrite { + fn write(&mut self, buf: &[u8]) -> io::Result { + for byte in buf { + self.buffer.push(*byte); + + if *byte == self.marker_byte { + self.map_and_write_current_buffer()?; + } + } + + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl Drop for MappedWrite { + fn drop(&mut self) { + // Drop implementations must not panic. We intentionally ignore the potential error here. + let _result = self.map_and_write_current_buffer(); + } +} + +impl Debug for MappedWrite { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MappedWrite") + .field("inner", &self.inner) + .field("marker_byte", &self.marker_byte) + .field("buffer", &self.buffer) + .finish() + } +} + +impl io::Write for TeeWrite { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner_a.write_all(buf)?; + self.inner_b.write_all(buf)?; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner_a.flush()?; + self.inner_b.flush() + } +} + +const NEWLINE_ASCII_BYTE: u8 = 0x0Au8; + +#[cfg(test)] +mod test { + use super::tee; + use crate::write::line_mapped; + + #[test] + fn test_tee_write() { + let mut a = vec![]; + let mut b = vec![]; + + let mut input = "foo bar baz".as_bytes(); + std::io::copy(&mut input, &mut tee(&mut a, &mut b)).unwrap(); + + assert_eq!(a, "foo bar baz".as_bytes()); + assert_eq!(a, b); + } + + #[test] + fn test_mapped_write() { + let mut output = vec![]; + + let mut input = "foo\nbar\nbaz".as_bytes(); + std::io::copy( + &mut input, + &mut line_mapped(&mut output, |line| line.repeat(2)), + ) + .unwrap(); + + assert_eq!(output, "foo\nfoo\nbar\nbar\nbazbaz".as_bytes()); + } +} + +/// Mapper functions for use with the [`mapped`] and [`line_mapped`] functions. +pub mod mappers { + /// Adds a prefix. + /// + /// # Example + /// ```no_run + /// use libherokubuildpack::write::line_mapped; + /// use libherokubuildpack::write::mappers::add_prefix; + /// use libherokubuildpack::command::CommandExt; + /// use std::process::Command; + /// + /// Command::new("date") + /// .spawn_and_write_streams( + /// line_mapped( + /// std::io::stdout(), + /// add_prefix("date stdout> "), + /// ), + /// std::io::stderr(), + /// ) + /// .and_then(|mut child| child.wait()) + /// .unwrap(); + /// ``` + pub fn add_prefix>>(prefix: P) -> impl Fn(Vec) -> Vec { + let prefix = prefix.into(); + + move |mut input| { + let mut result = prefix.clone(); + result.append(&mut input); + result + } + } + + /// Allows mapping the data as an UTF-8 string that was lossy converted from the data to be mapped. + /// + /// # Example + /// ```no_run + /// use libherokubuildpack::write::line_mapped; + /// use libherokubuildpack::write::mappers::map_utf8_lossy; + /// use libherokubuildpack::command::CommandExt; + /// use std::process::Command; + /// + /// Command::new("date") + /// .spawn_and_write_streams( + /// line_mapped( + /// std::io::stdout(), + /// map_utf8_lossy(|string| string.replace("foo", "bar")), + /// ), + /// std::io::stderr(), + /// ) + /// .and_then(|mut child| child.wait()) + /// .unwrap(); + /// ``` + pub fn map_utf8_lossy String>(f: F) -> impl Fn(Vec) -> Vec { + move |input| f(String::from_utf8_lossy(&input).to_string()).into_bytes() + } + + #[cfg(test)] + mod test { + use super::add_prefix; + use super::map_utf8_lossy; + + #[test] + fn test_add_prefix() { + let result = (add_prefix(">> "))(String::from("Hello World!").into_bytes()); + assert_eq!(result, String::from(">> Hello World!").into_bytes()); + } + + #[test] + fn test_map_utf8_lossy() { + let result = (map_utf8_lossy(|input| input.replace("foo", "bar")))( + String::from("foo = foo").into_bytes(), + ); + + assert_eq!(result, String::from("bar = bar").into_bytes()); + } + } +}