Skip to content

Commit

Permalink
Add CommandExt for working with std::process::Command output stre…
Browse files Browse the repository at this point in the history
…ams (#535)
  • Loading branch information
Malax authored Dec 15, 2022
1 parent 39edef1 commit 2c9c642
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ separate changelogs for each crate were used. If you need to refer to these old
- libcnb: Drop the use of the `stacker` crate when recursively removing layer directories. ([#517](https://github.com/heroku/libcnb.rs/pull/517))
- libcnb-cargo: Updated to Clap v4. ([#511](https://github.com/heroku/libcnb.rs/pull/511))

## Added

- libherokubuildpack: Add `command` and `write` modules for working with `std::process::Command` output streams. ([#535](https://github.com/heroku/libcnb.rs/pull/535))

## [0.11.1] 2022-09-29

### Fixed
Expand Down
5 changes: 4 additions & 1 deletion libherokubuildpack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@ include = ["src/**/*", "LICENSE", "README.md"]
all-features = true

[features]
default = ["download", "digest", "error", "log", "tar", "toml", "fs"]
default = ["command", "download", "digest", "error", "log", "tar", "toml", "fs", "write"]
download = ["dep:ureq", "dep:thiserror"]
digest = ["dep:sha2"]
error = ["log", "dep:libcnb"]
log = ["dep:termcolor"]
tar = ["dep:tar", "dep:flate2"]
toml = ["dep:toml"]
fs = ["dep:pathdiff"]
command = ["write", "dep:crossbeam-utils"]
write = []

[dependencies]
crossbeam-utils = { version = "0.8.2", optional = true }
flate2 = { version = "1.0.24", optional = true }
libcnb = { workspace = true, optional = true }
pathdiff = { version = "0.2.1", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions libherokubuildpack/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ uses Cargo features to allow opt-out of certain modules if they're not needed.

The feature names line up with the modules in this crate. All features are enabled by default.

* **command** -
Enabled helpers to work with `std::process::Command`.
* **download** -
Enables helpers to download files over HTTP.
* **digest** -
Expand All @@ -28,6 +30,8 @@ The feature names line up with the modules in this crate. All features are enabl
Enables helpers for working with TOML data.
* **fs** -
Enables helpers for filesystem related tasks.
* **write** -
Enables `std::io::Write` proxy implementations.

[Docs]: https://img.shields.io/docsrs/libherokubuildpack
[docs.rs]: https://docs.rs/libherokubuildpack/latest/libherokubuildpack/
Expand Down
185 changes: 185 additions & 0 deletions libherokubuildpack/src/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
use crate::write::tee;
use crossbeam_utils::thread::ScopedJoinHandle;
use std::io::Write;
use std::{io, process, thread};
use std::{mem, panic};

/// Extension trait for [`process::Command`] that adds functions for use within buildpacks.
pub trait CommandExt {
/// Spawns the command process and sends the output of stdout and stderr to the given writers.
///
/// This allows for additional flexibility when dealing with these output streams compared to
/// functionality that the stock [`process::Command`] provides. See the [`write`](crate::write)
/// module for [`std::io::Write`] implementations designed for common buildpack tasks.
///
/// This function will redirect the output unbuffered and in parallel for both streams. This
/// means that it can be used to output data from these streams while the command is running,
/// providing a live view into the process' output. This function will block until both streams
/// have been closed.
///
/// # Example:
/// ```no_run
/// use libherokubuildpack::command::CommandExt;
/// use libherokubuildpack::write::tee;
/// use std::fs;
/// use std::process::Command;
///
/// let logfile = fs::File::open("log.txt").unwrap();
/// let exit_status = Command::new("date")
/// .spawn_and_write_streams(tee(std::io::stdout(), &logfile), std::io::stderr())
/// .and_then(|mut child| child.wait())
/// .unwrap();
/// ```
fn spawn_and_write_streams<OW: Write + Send, EW: Write + Send>(
&mut self,
stdout_write: OW,
stderr_write: EW,
) -> io::Result<process::Child>;

/// Spawns the command process and sends the output of stdout and stderr to the given writers.
///
/// In addition to what [`spawn_and_write_streams`](Self::spawn_and_write_streams) does, this
/// function captures stdout and stderr as `Vec<u8>` and returns them after waiting for the
/// process to finish. This function is meant as a drop-in replacement for existing
/// `Command:output` calls.
///
/// # Example:
/// ```no_run
/// use libherokubuildpack::command::CommandExt;
/// use libherokubuildpack::write::tee;
/// use std::fs;
/// use std::process::Command;
///
/// let logfile = fs::File::open("log.txt").unwrap();
/// let output = Command::new("date")
/// .output_and_write_streams(tee(std::io::stdout(), &logfile), std::io::stderr())
/// .unwrap();
///
/// // Return value can be used as with Command::output, but the streams will also be written to
/// // the given writers.
/// println!(
/// "Process exited with {}, stdout: {:?}, stderr: {:?}",
/// output.status, output.stdout, output.stderr
/// );
/// ```
fn output_and_write_streams<OW: Write + Send, EW: Write + Send>(
&mut self,
stdout_write: OW,
stderr_write: EW,
) -> io::Result<process::Output>;
}

impl CommandExt for process::Command {
fn spawn_and_write_streams<OW: Write + Send, EW: Write + Send>(
&mut self,
stdout_write: OW,
stderr_write: EW,
) -> io::Result<process::Child> {
self.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.spawn()
.and_then(|child| write_child_process_output(child, stdout_write, stderr_write))
}

fn output_and_write_streams<OW: Write + Send, EW: Write + Send>(
&mut self,
stdout_write: OW,
stderr_write: EW,
) -> io::Result<process::Output> {
let mut stdout_buffer = vec![];
let mut stderr_buffer = vec![];

self.spawn_and_write_streams(
tee(&mut stdout_buffer, stdout_write),
tee(&mut stderr_buffer, stderr_write),
)
.and_then(|mut child| child.wait())
.map(|status| process::Output {
status,
stdout: stdout_buffer,
stderr: stderr_buffer,
})
}
}

fn write_child_process_output<OW: Write + Send, EW: Write + Send>(
mut child: process::Child,
mut stdout_writer: OW,
mut stderr_writer: EW,
) -> io::Result<process::Child> {
// Copying the data to the writers happens in separate threads for stdout and stderr to ensure
// they're processed in parallel. Example: imagine the caller uses io::stdout() and io::stderr()
// as the writers so that the user can follow along with the command's output. If we copy stdout
// first and then stderr afterwards, interleaved stdout and stderr messages will no longer be
// interleaved (stderr output is always printed after stdout has been closed).
//
// The rust compiler currently cannot figure out how long a thread will run (doesn't take the
// almost immediate join calls into account) and therefore requires that data used in a thread
// lives forever. To avoid requiring 'static lifetimes for the writers, we use crossbeam's
// scoped threads here. This enables writers that write, for example, to a mutable buffer.
unwind_panic(crossbeam_utils::thread::scope(|scope| {
let stdout_copy_thread = mem::take(&mut child.stdout)
.map(|mut stdout| scope.spawn(move |_| std::io::copy(&mut stdout, &mut stdout_writer)));

let stderr_copy_thread = mem::take(&mut child.stderr)
.map(|mut stderr| scope.spawn(move |_| std::io::copy(&mut stderr, &mut stderr_writer)));

let stdout_copy_result = stdout_copy_thread.map_or_else(|| Ok(0), join_and_unwind_panic);
let stderr_copy_result = stderr_copy_thread.map_or_else(|| Ok(0), join_and_unwind_panic);

// Return the first error from either Result, or the child process value
stdout_copy_result.and(stderr_copy_result).map(|_| child)
}))
}

fn join_and_unwind_panic<T>(h: ScopedJoinHandle<T>) -> T {
unwind_panic(h.join())
}

fn unwind_panic<T>(t: thread::Result<T>) -> T {
match t {
Ok(value) => value,
Err(err) => panic::resume_unwind(err),
}
}

#[cfg(test)]
mod test {
use crate::command::CommandExt;
use std::process::Command;

#[test]
#[cfg(unix)]
fn test_spawn_and_write_streams() {
let mut stdout_buf = vec![];
let mut stderr_buf = vec![];

Command::new("echo")
.args(["-n", "Hello World!"])
.spawn_and_write_streams(&mut stdout_buf, &mut stderr_buf)
.and_then(|mut child| child.wait())
.unwrap();

assert_eq!(stdout_buf, "Hello World!".as_bytes());
assert_eq!(stderr_buf, Vec::<u8>::new());
}

#[test]
#[cfg(unix)]
fn test_output_and_write_streams() {
let mut stdout_buf = vec![];
let mut stderr_buf = vec![];

let output = Command::new("echo")
.args(["-n", "Hello World!"])
.output_and_write_streams(&mut stdout_buf, &mut stderr_buf)
.unwrap();

assert_eq!(stdout_buf, "Hello World!".as_bytes());
assert_eq!(stderr_buf, Vec::<u8>::new());

assert_eq!(output.status.code(), Some(0));
assert_eq!(output.stdout, "Hello World!".as_bytes());
assert_eq!(output.stderr, Vec::<u8>::new());
}
}
4 changes: 4 additions & 0 deletions libherokubuildpack/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
// This lint is too noisy and enforces a style that reduces readability in many cases.
#![allow(clippy::module_name_repetitions)]

#[cfg(feature = "command")]
pub mod command;
#[cfg(feature = "digest")]
pub mod digest;
#[cfg(feature = "download")]
Expand All @@ -21,3 +23,5 @@ pub mod log;
pub mod tar;
#[cfg(feature = "toml")]
pub mod toml;
#[cfg(feature = "write")]
pub mod write;
Loading

0 comments on commit 2c9c642

Please sign in to comment.