diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..021504c --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,71 @@ +use std::future::Future; + +use ::log::debug; + +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +pub async fn maybe_recv(receiver: &mut Option>) -> Option> { + match receiver { + Some(receiver) => Some(receiver.recv().await), + None => None, + } +} + +pub fn maybe_spawn_tee( + receiver: Option>, +) -> (Option>, Option>) { + match receiver { + Some(receiver) => { + let (first_receiver, second_receiver) = spawn_tee(receiver); + (Some(first_receiver), Some(second_receiver)) + } + None => (None, None), + } +} + +// An utility function that takes an unbounded receiver and returns two +// unbounded receivers that will receive the same items, spawning a task +// to read from the given receiver and write to the returned receivers. +// The items must implement the `Clone` trait. +pub fn spawn_tee( + receiver: UnboundedReceiver, +) -> (UnboundedReceiver, UnboundedReceiver) { + let (future, first_receiver, second_receiver) = tee(receiver); + + tokio::spawn(future); + + (first_receiver, second_receiver) +} + +fn tee( + receiver: UnboundedReceiver, +) -> ( + impl Future, + UnboundedReceiver, + UnboundedReceiver, +) { + let (first_sender, first_receiver) = unbounded_channel(); + let (second_sender, second_receiver) = unbounded_channel(); + + let future = tee_loop(receiver, first_sender, second_sender); + + (future, first_receiver, second_receiver) +} + +async fn tee_loop( + mut receiver: UnboundedReceiver, + first_sender: UnboundedSender, + second_sender: UnboundedSender, +) { + while let Some(item) = receiver.recv().await { + if let Err(err) = first_sender.send(item.clone()) { + debug!("error sending item to first receiver: {}", err); + break; + } + + if let Err(err) = second_sender.send(item) { + debug!("error sending item to second receiver: {}", err); + break; + } + } +} diff --git a/src/cli.rs b/src/cli.rs index ca4e4e6..c2d776a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,7 @@ use std::ffi::OsString; use crate::check_in::{CheckInConfig, CronConfig, HeartbeatConfig}; +use crate::error::ErrorConfig; use crate::log::{LogConfig, LogOrigin}; use ::log::warn; @@ -43,6 +44,13 @@ pub struct Cli { #[arg(long, value_name = "GROUP")] log: Option, + /// The action name to use to group errors by. + /// + /// If this option is not set, errors will not be sent to AppSignal when + /// a process exits with a non-zero exit code. + #[arg(long, value_name = "ACTION", requires = "api_key")] + error: Option, + /// The log source API key to use to send logs. /// /// If this option is not set, logs will be sent to the default @@ -71,11 +79,19 @@ pub struct Cli { #[arg(long, value_name = "IDENTIFIER", requires = "api_key")] cron: Option, - /// Do not send standard output as logs. + /// Do not send standard output. + /// + /// Do not send standard output as logs, and do not use the last + /// lines of standard output as part of the error message when + /// `--error` is set. #[arg(long)] no_stdout: bool, - /// Do not send standard error as logs. + /// Do not send standard error. + /// + /// Do not send standard error as logs, and do not use the last + /// lines of standard error as part of the error message when + /// `--error` is set. #[arg(long)] no_stderr: bool, @@ -238,7 +254,7 @@ impl Cli { .unwrap() .clone(); let endpoint = self.endpoint.clone(); - let origin = LogOrigin::from_args(self.no_log, self.no_stdout, self.no_stderr); + let origin = self.log_origin(); let group = self.log.clone().unwrap_or_else(|| "process".to_string()); let hostname = self.hostname.clone(); let digest: String = self.digest.clone(); @@ -252,6 +268,48 @@ impl Cli { digest, } } + + pub fn error(&self) -> Option { + self.error.as_ref().map(|action| { + let api_key = self.api_key.as_ref().unwrap().clone(); + let endpoint = self.endpoint.clone(); + let action = action.clone(); + let hostname = self.hostname.clone(); + let digest = self.digest.clone(); + + ErrorConfig { + api_key, + endpoint, + action, + hostname, + digest, + } + }) + } + + fn log_origin(&self) -> LogOrigin { + LogOrigin::from_args(self.no_log, self.no_stdout, self.no_stderr) + } + + pub fn should_pipe_stderr(&self) -> bool { + // If `--error` is set, we need to pipe stderr for the error message, + // even if we're not sending logs, unless `--no-stderr` is set. + if self.error.is_some() { + return !self.no_stderr; + } + + self.log_origin().is_err() + } + + pub fn should_pipe_stdout(&self) -> bool { + // If `--error` is set, we need to pipe stdout for the error message, + // even if we're not sending logs, unless `--no-stdout` is set. + if self.error.is_some() { + return !self.no_stdout; + } + + self.log_origin().is_out() + } } #[cfg(test)] diff --git a/src/client.rs b/src/client.rs index 2ca17de..c414f80 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,7 @@ use reqwest::{Client, ClientBuilder}; +use ::log::{debug, trace}; + use crate::package::{NAME, VERSION}; pub fn client() -> Client { @@ -8,3 +10,26 @@ pub fn client() -> Client { .build() .unwrap() } + +pub async fn send_request(request: Result) { + let request = match request { + Ok(request) => request, + Err(err) => { + debug!("error creating request: {}", err); + return; + } + }; + + match client().execute(request.try_clone().unwrap()).await { + Ok(response) => { + if !response.status().is_success() { + debug!("request failed with status: {}", response.status()); + } else { + trace!("request successful: {}", request.url()); + } + } + Err(err) => { + debug!("error sending request: {:?}", err); + } + }; +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..6416e0c --- /dev/null +++ b/src/error.rs @@ -0,0 +1,182 @@ +use std::collections::BTreeMap; +use std::os::unix::process::ExitStatusExt; +use std::process::ExitStatus; + +use reqwest::Body; +use serde::Serialize; + +use crate::client::client; +use crate::package::NAME; +use crate::signal::signal_name; +use crate::timestamp::Timestamp; + +pub struct ErrorConfig { + pub api_key: String, + pub endpoint: String, + pub action: String, + pub hostname: String, + pub digest: String, +} + +impl ErrorConfig { + pub fn request( + &self, + timestamp: &mut impl Timestamp, + exit: &ExitStatus, + lines: impl IntoIterator, + ) -> Result { + let url = format!("{}/errors", self.endpoint); + + client() + .post(url) + .query(&[("api_key", &self.api_key)]) + .header("Content-Type", "application/json") + .body(ErrorBody::from_config(self, timestamp, exit, lines)) + .build() + } +} + +#[derive(Serialize)] +pub struct ErrorBody { + pub timestamp: u64, + pub action: String, + pub namespace: String, + pub error: ErrorBodyError, + pub tags: BTreeMap, +} + +impl ErrorBody { + pub fn from_config( + config: &ErrorConfig, + timestamp: &mut impl Timestamp, + exit: &ExitStatus, + lines: impl IntoIterator, + ) -> Self { + ErrorBody { + timestamp: timestamp.as_secs(), + action: config.action.clone(), + namespace: "process".to_string(), + error: ErrorBodyError::new(exit, lines), + tags: exit_tags(exit) + .into_iter() + .chain([ + ("hostname".to_string(), config.hostname.clone()), + (format!("{}-digest", NAME), config.digest.clone()), + ]) + .collect(), + } + } +} + +impl From for Body { + fn from(body: ErrorBody) -> Self { + Body::from(serde_json::to_string(&body).unwrap()) + } +} + +#[derive(Serialize)] +pub struct ErrorBodyError { + pub name: String, + pub message: String, +} + +impl ErrorBodyError { + pub fn new(exit: &ExitStatus, lines: impl IntoIterator) -> Self { + let (name, exit_context) = if let Some(code) = exit.code() { + ("NonZeroExit".to_string(), format!("code {}", code)) + } else if let Some(signal) = exit.signal() { + ( + "SignalExit".to_string(), + format!("signal {}", signal_name(signal)), + ) + } else { + ("UnknownExit".to_string(), "unknown status".to_string()) + }; + + let mut lines = lines.into_iter().collect::>(); + lines.push(format!("[Process exited with {}]", exit_context)); + + let message = lines.join("\n"); + + ErrorBodyError { name, message } + } +} + +fn exit_tags(exit: &ExitStatus) -> BTreeMap { + if let Some(code) = exit.code() { + [ + ("exit_code".to_string(), format!("{}", code)), + ("exit_kind".to_string(), "code".to_string()), + ] + .into() + } else if let Some(signal) = exit.signal() { + [ + ("exit_signal".to_string(), signal_name(signal)), + ("exit_kind".to_string(), "signal".to_string()), + ] + .into() + } else { + [("exit_kind".to_string(), "unknown".to_string())].into() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::timestamp::tests::{timestamp, EXPECTED_SECS}; + + fn error_config() -> ErrorConfig { + ErrorConfig { + api_key: "some_api_key".to_string(), + endpoint: "https://some-endpoint.com".to_string(), + hostname: "some-hostname".to_string(), + digest: "some-digest".to_string(), + action: "some-action".to_string(), + } + } + + #[test] + fn error_config_request() { + let config = error_config(); + // `ExitStatus::from_raw` expects a wait status, not an exit status. + // The wait status for exit code `n` is represented by `n << 8`. + // See `__WEXITSTATUS` in `glibc/bits/waitstatus.h` for reference. + let exit = ExitStatus::from_raw(42 << 8); + let lines = vec!["line 1".to_string(), "line 2".to_string()]; + + let request = config.request(&mut timestamp(), &exit, lines).unwrap(); + + assert_eq!(request.method().as_str(), "POST"); + assert_eq!( + request.url().as_str(), + "https://some-endpoint.com/errors?api_key=some_api_key" + ); + assert_eq!( + request.headers().get("Content-Type").unwrap(), + "application/json" + ); + assert_eq!( + String::from_utf8_lossy(request.body().unwrap().as_bytes().unwrap()), + format!( + concat!( + "{{", + r#""timestamp":{},"#, + r#""action":"some-action","#, + r#""namespace":"process","#, + r#""error":{{"#, + r#""name":"NonZeroExit","#, + r#""message":"line 1\nline 2\n[Process exited with code 42]""#, + r#"}},"#, + r#""tags":{{"#, + r#""{}-digest":"some-digest","#, + r#""exit_code":"42","#, + r#""exit_kind":"code","#, + r#""hostname":"some-hostname""#, + r#"}}"#, + "}}" + ), + EXPECTED_SECS, NAME + ) + ); + } +} diff --git a/src/main.rs b/src/main.rs index 8b74597..cab60cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,28 @@ mod check_in; mod cli; +mod error; mod log; +mod channel; mod client; mod exit; mod ndjson; mod package; -mod signals; +mod signal; mod timestamp; +use crate::channel::{maybe_recv, maybe_spawn_tee}; use crate::check_in::{CronKind, HeartbeatConfig}; use crate::cli::Cli; -use crate::client::client; +use crate::client::send_request; use crate::log::{LogConfig, LogMessage, LogSeverity}; use crate::package::NAME; -use crate::signals::{has_terminating_intent, signal_stream}; +use crate::signal::{has_terminating_intent, signal_stream}; use crate::timestamp::SystemTimestamp; use ::log::{debug, error, trace}; +use error::ErrorConfig; +use std::collections::VecDeque; use std::os::unix::process::ExitStatusExt; use std::process::{exit, ExitStatus, Stdio}; use std::{ @@ -29,6 +34,7 @@ use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; use tokio::process::{Child, Command}; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use tokio::time::{interval, Duration, MissedTickBehavior}; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; @@ -58,26 +64,14 @@ fn main() { async fn start(cli: Cli) -> Result> { let cron = cli.cron(); let log = cli.log(); + let error = cli.error(); let tasks = TaskTracker::new(); - let mut child = command(&cli.command, &log).spawn()?; + let (child, stdout, stderr) = spawn_child(&cli, &tasks)?; - let stdout = if log.origin.is_out() { - let (sender, receiver) = unbounded_channel(); - tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender)); - Some(receiver) - } else { - None - }; - - let stderr = if log.origin.is_err() { - let (sender, receiver) = unbounded_channel(); - tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender)); - Some(receiver) - } else { - None - }; + let (log_stdout, error_stdout) = maybe_spawn_tee(stdout); + let (log_stderr, error_stderr) = maybe_spawn_tee(stderr); if let Some(cron) = cron.as_ref() { tasks.spawn(send_request( @@ -91,7 +85,15 @@ async fn start(cli: Cli) -> Result> { token }); - tasks.spawn(log_loop(log, stdout, stderr)); + tasks.spawn(log_loop(log, log_stdout, log_stderr)); + + let error_message = if error.is_some() { + let (sender, receiver) = oneshot::channel(); + tasks.spawn(error_message_loop(sender, error_stdout, error_stderr)); + Some(receiver) + } else { + None + }; let exit_status = forward_signals_and_wait(child).await?; @@ -103,6 +105,12 @@ async fn start(cli: Cli) -> Result> { cron.request(&mut SystemTimestamp, CronKind::Finish), )); } + } else if let Some(error) = error { + tasks.spawn(send_error_request( + error, + exit_status, + error_message.unwrap(), + )); } if let Some(heartbeat) = heartbeat { @@ -158,25 +166,35 @@ async fn start(cli: Cli) -> Result> { } } -fn command(argv: &[String], log: &LogConfig) -> Command { - let mut command = Command::new(argv[0].clone()); - for arg in argv[1..].iter() { - command.arg(arg); - } +type SpawnedChild = ( + Child, + Option>, + Option>, +); - if log.origin.is_out() { - command.stdout(Stdio::piped()); - } +fn spawn_child(cli: &Cli, tasks: &TaskTracker) -> io::Result { + let should_stdout = cli.should_pipe_stdout(); + let should_stderr = cli.should_pipe_stderr(); - if log.origin.is_err() { - command.stderr(Stdio::piped()); - } + let mut child = command(&cli.command, should_stdout, should_stderr).spawn()?; - unsafe { - command.pre_exec(exit::exit_with_parent); - } + let stdout = if should_stdout { + let (sender, receiver) = unbounded_channel(); + tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender)); + Some(receiver) + } else { + None + }; - command + let stderr = if should_stderr { + let (sender, receiver) = unbounded_channel(); + tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender)); + Some(receiver) + } else { + None + }; + + Ok((child, stdout, stderr)) } // Pipes lines from an asynchronous reader to a synchronous writer, returning @@ -186,7 +204,7 @@ fn command(argv: &[String], log: &LogConfig) -> Command { async fn pipe_lines( from: impl AsyncRead + Unpin + Send + 'static, mut to: impl Write + Send + 'static, - sender: UnboundedSender>, + sender: UnboundedSender, ) { let mut from = BufReader::new(from).lines(); @@ -198,7 +216,7 @@ async fn pipe_lines( break; } - if let Err(err) = sender.send(Some(line)) { + if let Err(err) = sender.send(line) { debug!("error sending line: {}", err); break; }; @@ -210,33 +228,6 @@ async fn pipe_lines( } } } - - if let Err(err) = sender.send(None) { - debug!("error sending EOF: {}", err); - } -} - -async fn send_request(request: Result) { - let request = match request { - Ok(request) => request, - Err(err) => { - debug!("error creating request: {}", err); - return; - } - }; - - match client().execute(request.try_clone().unwrap()).await { - Ok(response) => { - if !response.status().is_success() { - debug!("request failed with status: {}", response.status()); - } else { - trace!("request successful: {}", request.url()); - } - } - Err(err) => { - debug!("error sending request: {:?}", err); - } - }; } async fn heartbeat_loop(config: HeartbeatConfig, cancel: CancellationToken) { @@ -257,24 +248,19 @@ async fn heartbeat_loop(config: HeartbeatConfig, cancel: CancellationToken) { } } -async fn maybe_recv(receiver: &mut Option>) -> Option { - match receiver { - Some(receiver) => receiver.recv().await, - None => None, - } -} +const LOG_MESSAGES_BATCH_SIZE: usize = 100; async fn log_loop( log: LogConfig, - mut stdout: Option>>, - mut stderr: Option>>, + mut stdout: Option>, + mut stderr: Option>, ) { - let mut timestamp = MonotonicTimestamp::new(SystemTimestamp); - if stdout.is_none() && stderr.is_none() { return; } + let mut timestamp = MonotonicTimestamp::new(SystemTimestamp); + let mut messages = Vec::new(); let mut interval = interval(Duration::from_secs(10)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -282,7 +268,7 @@ async fn log_loop( let tasks = TaskTracker::new(); loop { - if messages.len() >= 100 { + if messages.len() >= LOG_MESSAGES_BATCH_SIZE { let request = log.request(std::mem::take(&mut messages)); tasks.spawn(send_request(request)); interval.reset(); @@ -341,6 +327,62 @@ async fn log_loop( tasks.wait().await; } +const ERROR_MESSAGE_LINES: usize = 10; + +async fn error_message_loop( + sender: oneshot::Sender>, + mut stdout: Option>, + mut stderr: Option>, +) { + let mut lines = VecDeque::with_capacity(ERROR_MESSAGE_LINES); + + loop { + select! { + Some(maybe_line) = maybe_recv(&mut stdout) => { + match maybe_line { + None => { + stdout = None; + if stderr.is_none() { + break; + } + } + Some(line) => { + if lines.len() >= ERROR_MESSAGE_LINES { + lines.pop_front(); + } + + lines.push_back(line); + } + } + } + + Some(maybe_line) = maybe_recv(&mut stderr) => { + match maybe_line { + None => { + stderr = None; + if stdout.is_none() { + break; + } + } + Some(line) => { + if lines.len() >= ERROR_MESSAGE_LINES { + lines.pop_front(); + } + + lines.push_back(line); + } + } + } + + else => break + } + } + + if sender.send(lines).is_err() { + debug!("error sending error message"); + } +} + async fn forward_signals_and_wait(mut child: Child) -> io::Result { use nix::sys::signal::kill; use nix::unistd::Pid; @@ -369,3 +411,40 @@ async fn forward_signals_and_wait(mut child: Child) -> io::Result { } } } + +async fn send_error_request( + error: ErrorConfig, + exit_status: ExitStatus, + receiver: oneshot::Receiver>, +) { + let lines = match receiver.await { + Ok(lines) => lines, + Err(_) => { + debug!("error receiving error message"); + VecDeque::new() + } + }; + + send_request(error.request(&mut SystemTimestamp, &exit_status, lines)).await; +} + +fn command(argv: &[String], should_stdout: bool, should_stderr: bool) -> Command { + let mut command = Command::new(argv[0].clone()); + for arg in argv[1..].iter() { + command.arg(arg); + } + + if should_stdout { + command.stdout(Stdio::piped()); + } + + if should_stderr { + command.stderr(Stdio::piped()); + } + + unsafe { + command.pre_exec(exit::exit_with_parent); + } + + command +} diff --git a/src/signals.rs b/src/signal.rs similarity index 63% rename from src/signals.rs rename to src/signal.rs index 66fd666..6995e7a 100644 --- a/src/signals.rs +++ b/src/signal.rs @@ -68,3 +68,41 @@ pub fn signal_stream() -> io::Result> { Ok(signals.map(|(signal, _)| signal)) } + +// A mapping of signal numbers to signal names. Uses `libc` constants to +// correctly map non-portable signals to their names across platforms. +// For an unknown signal, the signal number is returned as a string. +pub fn signal_name(signal: i32) -> String { + match signal { + libc::SIGABRT => "SIGABRT".to_owned(), + libc::SIGALRM => "SIGALRM".to_owned(), + libc::SIGBUS => "SIGBUS".to_owned(), + libc::SIGCHLD => "SIGCHLD".to_owned(), + libc::SIGCONT => "SIGCONT".to_owned(), + libc::SIGFPE => "SIGFPE".to_owned(), + libc::SIGHUP => "SIGHUP".to_owned(), + libc::SIGILL => "SIGILL".to_owned(), + libc::SIGINT => "SIGINT".to_owned(), + libc::SIGIO => "SIGIO".to_owned(), + libc::SIGKILL => "SIGKILL".to_owned(), + libc::SIGPIPE => "SIGPIPE".to_owned(), + libc::SIGPROF => "SIGPROF".to_owned(), + libc::SIGQUIT => "SIGQUIT".to_owned(), + libc::SIGSEGV => "SIGSEGV".to_owned(), + libc::SIGSTOP => "SIGSTOP".to_owned(), + libc::SIGSYS => "SIGSYS".to_owned(), + libc::SIGTERM => "SIGTERM".to_owned(), + libc::SIGTRAP => "SIGTRAP".to_owned(), + libc::SIGTSTP => "SIGTSTP".to_owned(), + libc::SIGTTIN => "SIGTTIN".to_owned(), + libc::SIGTTOU => "SIGTTOU".to_owned(), + libc::SIGURG => "SIGURG".to_owned(), + libc::SIGUSR1 => "SIGUSR1".to_owned(), + libc::SIGUSR2 => "SIGUSR2".to_owned(), + libc::SIGVTALRM => "SIGVTALRM".to_owned(), + libc::SIGWINCH => "SIGWINCH".to_owned(), + libc::SIGXCPU => "SIGXCPU".to_owned(), + libc::SIGXFSZ => "SIGXFSZ".to_owned(), + signal => format!("{}", signal), + } +}