From 66c87c7cbc25e2834db248a20606aac19f527cbe Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 14 Nov 2024 14:14:48 +0100 Subject: [PATCH 1/5] Report non-success exits as errors When the child process exits due to a signal, or exits with a non-zero exit code, report this as an error to AppSignal. Closes #2. This commit implements this functionality as opt-in, with an `--error` flag. This will probably be changed to opt-out once #5 is implemented, which will provide a name to group by. Set the hostname, digest, and exit information as tags on the error sample. --- src/cli.rs | 26 ++++++++ src/error.rs | 177 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 7 ++ src/signals.rs | 38 +++++++++++ 4 files changed, 248 insertions(+) create mode 100644 src/error.rs diff --git a/src/cli.rs b/src/cli.rs index ca4e4e6..27ac74c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,7 @@ use std::ffi::OsString; use crate::check_in::{CheckInConfig, CronConfig, HeartbeatConfig}; +use crate::error::ErrorConfig; use crate::log::{LogConfig, LogOrigin}; use ::log::warn; @@ -43,6 +44,13 @@ pub struct Cli { #[arg(long, value_name = "GROUP")] log: Option, + /// The action name to use to group errors by. + /// + /// If this option is not set, errors will not be sent to AppSignal when + /// a process exits with a non-zero exit code. + #[arg(long, value_name = "ACTION", requires = "api_key")] + error: Option, + /// The log source API key to use to send logs. /// /// If this option is not set, logs will be sent to the default @@ -252,6 +260,24 @@ impl Cli { digest, } } + + pub fn error(&self) -> Option { + self.error.as_ref().map(|action| { + let api_key = self.api_key.as_ref().unwrap().clone(); + let endpoint = self.endpoint.clone(); + let action = action.clone(); + let hostname = self.hostname.clone(); + let digest = self.digest.clone(); + + ErrorConfig { + api_key, + endpoint, + action, + hostname, + digest, + } + }) + } } #[cfg(test)] diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..dcf036b --- /dev/null +++ b/src/error.rs @@ -0,0 +1,177 @@ +use std::collections::BTreeMap; +use std::os::unix::process::ExitStatusExt; +use std::process::ExitStatus; + +use reqwest::Body; +use serde::Serialize; + +use crate::client::client; +use crate::package::NAME; +use crate::signals::signal_name; +use crate::timestamp::Timestamp; + +pub struct ErrorConfig { + pub api_key: String, + pub endpoint: String, + pub action: String, + pub hostname: String, + pub digest: String, +} + +impl ErrorConfig { + pub fn request( + &self, + timestamp: &mut impl Timestamp, + exit: &ExitStatus, + ) -> Result { + let url = format!("{}/errors", self.endpoint); + + client() + .post(url) + .query(&[("api_key", &self.api_key)]) + .header("Content-Type", "application/json") + .body(ErrorBody::from_config(&self, timestamp, exit)) + .build() + } +} + +#[derive(Serialize)] +pub struct ErrorBody { + pub timestamp: u64, + pub action: String, + pub namespace: String, + pub error: ErrorBodyError, + pub tags: BTreeMap, +} + +impl ErrorBody { + pub fn from_config( + config: &ErrorConfig, + timestamp: &mut impl Timestamp, + exit: &ExitStatus, + ) -> Self { + ErrorBody { + timestamp: timestamp.as_secs(), + action: config.action.clone(), + namespace: "process".to_string(), + error: ErrorBodyError::from_exit(exit), + tags: exit_tags(exit) + .into_iter() + .chain([ + ("hostname".to_string(), config.hostname.clone()), + (format!("{}-digest", NAME), config.digest.clone()), + ]) + .collect(), + } + } +} + +impl From for Body { + fn from(body: ErrorBody) -> Self { + Body::from(serde_json::to_string(&body).unwrap()) + } +} + +#[derive(Serialize)] +pub struct ErrorBodyError { + pub name: String, + pub message: String, +} + +impl ErrorBodyError { + pub fn from_exit(exit: &ExitStatus) -> Self { + if let Some(code) = exit.code() { + ErrorBodyError { + name: "NonZeroExit".to_string(), + message: format!("Process exited with code {}", code), + } + } else if let Some(signal) = exit.signal() { + ErrorBodyError { + name: "SignalExit".to_string(), + message: format!("Process exited with signal {}", signal_name(signal)), + } + } else { + ErrorBodyError { + name: "UnknownExit".to_string(), + message: "Process exited with unknown status".to_string(), + } + } + } +} + +fn exit_tags(exit: &ExitStatus) -> BTreeMap { + if let Some(code) = exit.code() { + [ + ("exit_code".to_string(), format!("{}", code)), + ("exit_kind".to_string(), "code".to_string()), + ] + .into() + } else if let Some(signal) = exit.signal() { + [ + ("exit_signal".to_string(), signal_name(signal)), + ("exit_kind".to_string(), "signal".to_string()), + ] + .into() + } else { + [("exit_kind".to_string(), "unknown".to_string())].into() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::timestamp::tests::{timestamp, EXPECTED_SECS}; + + fn error_config() -> ErrorConfig { + ErrorConfig { + api_key: "some_api_key".to_string(), + endpoint: "https://some-endpoint.com".to_string(), + hostname: "some-hostname".to_string(), + digest: "some-digest".to_string(), + action: "some-action".to_string(), + } + } + + #[test] + fn error_config_request() { + let config = error_config(); + // `ExitStatus::from_raw` expects a wait status, not an exit status. + // The wait status for exit code `n` is represented by `n << 8`. + let exit = ExitStatus::from_raw(42 << 8); + + let request = config.request(&mut timestamp(), &exit).unwrap(); + + assert_eq!(request.method().as_str(), "POST"); + assert_eq!( + request.url().as_str(), + "https://some-endpoint.com/errors?api_key=some_api_key" + ); + assert_eq!( + request.headers().get("Content-Type").unwrap(), + "application/json" + ); + assert_eq!( + String::from_utf8_lossy(request.body().unwrap().as_bytes().unwrap()), + format!( + concat!( + "{{", + r#""timestamp":{},"#, + r#""action":"some-action","#, + r#""namespace":"process","#, + r#""error":{{"#, + r#""name":"NonZeroExit","#, + r#""message":"Process exited with code 42""#, + r#"}},"#, + r#""tags":{{"#, + r#""{}-digest":"some-digest","#, + r#""exit_code":"42","#, + r#""exit_kind":"code","#, + r#""hostname":"some-hostname""#, + r#"}}"#, + "}}" + ), + EXPECTED_SECS, NAME + ) + ); + } +} diff --git a/src/main.rs b/src/main.rs index 8b74597..aa9eaa1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod check_in; mod cli; +mod error; mod log; mod client; @@ -103,6 +104,12 @@ async fn start(cli: Cli) -> Result> { cron.request(&mut SystemTimestamp, CronKind::Finish), )); } + } else { + if let Some(error) = cli.error() { + tasks.spawn(send_request( + error.request(&mut SystemTimestamp, &exit_status), + )); + } } if let Some(heartbeat) = heartbeat { diff --git a/src/signals.rs b/src/signals.rs index 66fd666..6995e7a 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -68,3 +68,41 @@ pub fn signal_stream() -> io::Result> { Ok(signals.map(|(signal, _)| signal)) } + +// A mapping of signal numbers to signal names. Uses `libc` constants to +// correctly map non-portable signals to their names across platforms. +// For an unknown signal, the signal number is returned as a string. +pub fn signal_name(signal: i32) -> String { + match signal { + libc::SIGABRT => "SIGABRT".to_owned(), + libc::SIGALRM => "SIGALRM".to_owned(), + libc::SIGBUS => "SIGBUS".to_owned(), + libc::SIGCHLD => "SIGCHLD".to_owned(), + libc::SIGCONT => "SIGCONT".to_owned(), + libc::SIGFPE => "SIGFPE".to_owned(), + libc::SIGHUP => "SIGHUP".to_owned(), + libc::SIGILL => "SIGILL".to_owned(), + libc::SIGINT => "SIGINT".to_owned(), + libc::SIGIO => "SIGIO".to_owned(), + libc::SIGKILL => "SIGKILL".to_owned(), + libc::SIGPIPE => "SIGPIPE".to_owned(), + libc::SIGPROF => "SIGPROF".to_owned(), + libc::SIGQUIT => "SIGQUIT".to_owned(), + libc::SIGSEGV => "SIGSEGV".to_owned(), + libc::SIGSTOP => "SIGSTOP".to_owned(), + libc::SIGSYS => "SIGSYS".to_owned(), + libc::SIGTERM => "SIGTERM".to_owned(), + libc::SIGTRAP => "SIGTRAP".to_owned(), + libc::SIGTSTP => "SIGTSTP".to_owned(), + libc::SIGTTIN => "SIGTTIN".to_owned(), + libc::SIGTTOU => "SIGTTOU".to_owned(), + libc::SIGURG => "SIGURG".to_owned(), + libc::SIGUSR1 => "SIGUSR1".to_owned(), + libc::SIGUSR2 => "SIGUSR2".to_owned(), + libc::SIGVTALRM => "SIGVTALRM".to_owned(), + libc::SIGWINCH => "SIGWINCH".to_owned(), + libc::SIGXCPU => "SIGXCPU".to_owned(), + libc::SIGXFSZ => "SIGXFSZ".to_owned(), + signal => format!("{}", signal), + } +} From 6f02bf995ac42d50f9fdaf575ca9e78745fb53bf Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 14 Nov 2024 18:00:16 +0100 Subject: [PATCH 2/5] Add last lines of output to error message When `--error` is set, use the last lines of standard output and standard error as the error message. This honors the `--no-stdout` and `--no-stderr` options -- if set, standard output or standard error won't be used for either logs or error messages. A separate message processing loop is used to keep the last ten lines of output and use them as the message. --- src/cli.rs | 38 +++++++- src/error.rs | 43 +++++---- src/main.rs | 260 ++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 263 insertions(+), 78 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 27ac74c..c2d776a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -79,11 +79,19 @@ pub struct Cli { #[arg(long, value_name = "IDENTIFIER", requires = "api_key")] cron: Option, - /// Do not send standard output as logs. + /// Do not send standard output. + /// + /// Do not send standard output as logs, and do not use the last + /// lines of standard output as part of the error message when + /// `--error` is set. #[arg(long)] no_stdout: bool, - /// Do not send standard error as logs. + /// Do not send standard error. + /// + /// Do not send standard error as logs, and do not use the last + /// lines of standard error as part of the error message when + /// `--error` is set. #[arg(long)] no_stderr: bool, @@ -246,7 +254,7 @@ impl Cli { .unwrap() .clone(); let endpoint = self.endpoint.clone(); - let origin = LogOrigin::from_args(self.no_log, self.no_stdout, self.no_stderr); + let origin = self.log_origin(); let group = self.log.clone().unwrap_or_else(|| "process".to_string()); let hostname = self.hostname.clone(); let digest: String = self.digest.clone(); @@ -278,6 +286,30 @@ impl Cli { } }) } + + fn log_origin(&self) -> LogOrigin { + LogOrigin::from_args(self.no_log, self.no_stdout, self.no_stderr) + } + + pub fn should_pipe_stderr(&self) -> bool { + // If `--error` is set, we need to pipe stderr for the error message, + // even if we're not sending logs, unless `--no-stderr` is set. + if self.error.is_some() { + return !self.no_stderr; + } + + self.log_origin().is_err() + } + + pub fn should_pipe_stdout(&self) -> bool { + // If `--error` is set, we need to pipe stdout for the error message, + // even if we're not sending logs, unless `--no-stdout` is set. + if self.error.is_some() { + return !self.no_stdout; + } + + self.log_origin().is_out() + } } #[cfg(test)] diff --git a/src/error.rs b/src/error.rs index dcf036b..2c02e5a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,6 +23,7 @@ impl ErrorConfig { &self, timestamp: &mut impl Timestamp, exit: &ExitStatus, + lines: impl IntoIterator, ) -> Result { let url = format!("{}/errors", self.endpoint); @@ -30,7 +31,7 @@ impl ErrorConfig { .post(url) .query(&[("api_key", &self.api_key)]) .header("Content-Type", "application/json") - .body(ErrorBody::from_config(&self, timestamp, exit)) + .body(ErrorBody::from_config(&self, timestamp, exit, lines)) .build() } } @@ -49,12 +50,13 @@ impl ErrorBody { config: &ErrorConfig, timestamp: &mut impl Timestamp, exit: &ExitStatus, + lines: impl IntoIterator, ) -> Self { ErrorBody { timestamp: timestamp.as_secs(), action: config.action.clone(), namespace: "process".to_string(), - error: ErrorBodyError::from_exit(exit), + error: ErrorBodyError::new(exit, lines), tags: exit_tags(exit) .into_iter() .chain([ @@ -79,23 +81,24 @@ pub struct ErrorBodyError { } impl ErrorBodyError { - pub fn from_exit(exit: &ExitStatus) -> Self { - if let Some(code) = exit.code() { - ErrorBodyError { - name: "NonZeroExit".to_string(), - message: format!("Process exited with code {}", code), - } + pub fn new(exit: &ExitStatus, lines: impl IntoIterator) -> Self { + let (name, exit_context) = if let Some(code) = exit.code() { + ("NonZeroExit".to_string(), format!("code {}", code)) } else if let Some(signal) = exit.signal() { - ErrorBodyError { - name: "SignalExit".to_string(), - message: format!("Process exited with signal {}", signal_name(signal)), - } + ( + "SignalExit".to_string(), + format!("signal {}", signal_name(signal)), + ) } else { - ErrorBodyError { - name: "UnknownExit".to_string(), - message: "Process exited with unknown status".to_string(), - } - } + ("UnknownExit".to_string(), "unknown status".to_string()) + }; + + let mut lines = lines.into_iter().collect::>(); + lines.push(format!("[Process exited with {}]", exit_context)); + + let message = lines.join("\n"); + + ErrorBodyError { name, message } } } @@ -137,9 +140,11 @@ mod tests { let config = error_config(); // `ExitStatus::from_raw` expects a wait status, not an exit status. // The wait status for exit code `n` is represented by `n << 8`. + // See `__WEXITSTATUS` in `glibc/bits/waitstatus.h` for reference. let exit = ExitStatus::from_raw(42 << 8); + let lines = vec!["line 1".to_string(), "line 2".to_string()]; - let request = config.request(&mut timestamp(), &exit).unwrap(); + let request = config.request(&mut timestamp(), &exit, lines).unwrap(); assert_eq!(request.method().as_str(), "POST"); assert_eq!( @@ -160,7 +165,7 @@ mod tests { r#""namespace":"process","#, r#""error":{{"#, r#""name":"NonZeroExit","#, - r#""message":"Process exited with code 42""#, + r#""message":"line 1\nline 2\n[Process exited with code 42]""#, r#"}},"#, r#""tags":{{"#, r#""{}-digest":"some-digest","#, diff --git a/src/main.rs b/src/main.rs index aa9eaa1..6ee8520 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,8 @@ use crate::signals::{has_terminating_intent, signal_stream}; use crate::timestamp::SystemTimestamp; use ::log::{debug, error, trace}; +use error::ErrorConfig; +use std::collections::VecDeque; use std::os::unix::process::ExitStatusExt; use std::process::{exit, ExitStatus, Stdio}; use std::{ @@ -30,6 +32,7 @@ use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; use tokio::process::{Child, Command}; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use tokio::time::{interval, Duration, MissedTickBehavior}; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; @@ -55,16 +58,20 @@ fn main() { } } -#[tokio::main] -async fn start(cli: Cli) -> Result> { - let cron = cli.cron(); - let log = cli.log(); - - let tasks = TaskTracker::new(); +fn spawn_child( + cli: &Cli, + tasks: &TaskTracker, +) -> io::Result<( + Child, + Option>, + Option>, +)> { + let should_stdout = cli.should_pipe_stdout(); + let should_stderr = cli.should_pipe_stderr(); - let mut child = command(&cli.command, &log).spawn()?; + let mut child = command(&cli.command, should_stdout, should_stderr).spawn()?; - let stdout = if log.origin.is_out() { + let stdout = if should_stdout { let (sender, receiver) = unbounded_channel(); tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender)); Some(receiver) @@ -72,7 +79,7 @@ async fn start(cli: Cli) -> Result> { None }; - let stderr = if log.origin.is_err() { + let stderr = if should_stderr { let (sender, receiver) = unbounded_channel(); tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender)); Some(receiver) @@ -80,6 +87,22 @@ async fn start(cli: Cli) -> Result> { None }; + Ok((child, stdout, stderr)) +} + +#[tokio::main] +async fn start(cli: Cli) -> Result> { + let cron = cli.cron(); + let log = cli.log(); + let error = cli.error(); + + let tasks = TaskTracker::new(); + + let (child, stdout, stderr) = spawn_child(&cli, &tasks)?; + + let (log_stdout, error_stdout) = maybe_spawn_tee(stdout); + let (log_stderr, error_stderr) = maybe_spawn_tee(stderr); + if let Some(cron) = cron.as_ref() { tasks.spawn(send_request( cron.request(&mut SystemTimestamp, CronKind::Start), @@ -92,7 +115,15 @@ async fn start(cli: Cli) -> Result> { token }); - tasks.spawn(log_loop(log, stdout, stderr)); + tasks.spawn(log_loop(log, log_stdout, log_stderr)); + + let error_message = if error.is_some() { + let (sender, receiver) = oneshot::channel(); + tasks.spawn(error_message_loop(sender, error_stdout, error_stderr)); + Some(receiver) + } else { + None + }; let exit_status = forward_signals_and_wait(child).await?; @@ -105,9 +136,11 @@ async fn start(cli: Cli) -> Result> { )); } } else { - if let Some(error) = cli.error() { - tasks.spawn(send_request( - error.request(&mut SystemTimestamp, &exit_status), + if let Some(error) = error { + tasks.spawn(send_error_request( + error, + exit_status.clone(), + error_message.unwrap(), )); } } @@ -165,17 +198,17 @@ async fn start(cli: Cli) -> Result> { } } -fn command(argv: &[String], log: &LogConfig) -> Command { +fn command(argv: &[String], should_stdout: bool, should_stderr: bool) -> Command { let mut command = Command::new(argv[0].clone()); for arg in argv[1..].iter() { command.arg(arg); } - if log.origin.is_out() { + if should_stdout { command.stdout(Stdio::piped()); } - if log.origin.is_err() { + if should_stderr { command.stderr(Stdio::piped()); } @@ -193,7 +226,7 @@ fn command(argv: &[String], log: &LogConfig) -> Command { async fn pipe_lines( from: impl AsyncRead + Unpin + Send + 'static, mut to: impl Write + Send + 'static, - sender: UnboundedSender>, + sender: UnboundedSender, ) { let mut from = BufReader::new(from).lines(); @@ -205,7 +238,7 @@ async fn pipe_lines( break; } - if let Err(err) = sender.send(Some(line)) { + if let Err(err) = sender.send(line) { debug!("error sending line: {}", err); break; }; @@ -217,33 +250,6 @@ async fn pipe_lines( } } } - - if let Err(err) = sender.send(None) { - debug!("error sending EOF: {}", err); - } -} - -async fn send_request(request: Result) { - let request = match request { - Ok(request) => request, - Err(err) => { - debug!("error creating request: {}", err); - return; - } - }; - - match client().execute(request.try_clone().unwrap()).await { - Ok(response) => { - if !response.status().is_success() { - debug!("request failed with status: {}", response.status()); - } else { - trace!("request successful: {}", request.url()); - } - } - Err(err) => { - debug!("error sending request: {:?}", err); - } - }; } async fn heartbeat_loop(config: HeartbeatConfig, cancel: CancellationToken) { @@ -264,24 +270,19 @@ async fn heartbeat_loop(config: HeartbeatConfig, cancel: CancellationToken) { } } -async fn maybe_recv(receiver: &mut Option>) -> Option { - match receiver { - Some(receiver) => receiver.recv().await, - None => None, - } -} +const LOG_MESSAGES_BATCH_SIZE: usize = 100; async fn log_loop( log: LogConfig, - mut stdout: Option>>, - mut stderr: Option>>, + mut stdout: Option>, + mut stderr: Option>, ) { - let mut timestamp = MonotonicTimestamp::new(SystemTimestamp); - if stdout.is_none() && stderr.is_none() { return; } + let mut timestamp = MonotonicTimestamp::new(SystemTimestamp); + let mut messages = Vec::new(); let mut interval = interval(Duration::from_secs(10)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -289,7 +290,7 @@ async fn log_loop( let tasks = TaskTracker::new(); loop { - if messages.len() >= 100 { + if messages.len() >= LOG_MESSAGES_BATCH_SIZE { let request = log.request(std::mem::take(&mut messages)); tasks.spawn(send_request(request)); interval.reset(); @@ -348,6 +349,63 @@ async fn log_loop( tasks.wait().await; } +const ERROR_MESSAGE_LINES: usize = 10; + +async fn error_message_loop( + sender: oneshot::Sender>, + mut stdout: Option>, + mut stderr: Option>, +) { + let mut lines = VecDeque::with_capacity(ERROR_MESSAGE_LINES); + + loop { + select! { + Some(maybe_line) = maybe_recv(&mut stdout) => { + match maybe_line { + None => { + stdout = None; + if stderr.is_none() { + break; + } + } + Some(line) => { + if lines.len() >= ERROR_MESSAGE_LINES { + lines.pop_front(); + } + + lines.push_back(line); + } + } + } + + Some(maybe_line) = maybe_recv(&mut stderr) => { + match maybe_line { + None => { + stderr = None; + if stdout.is_none() { + break; + } + } + Some(line) => { + if lines.len() >= ERROR_MESSAGE_LINES { + lines.pop_front(); + } + + lines.push_back(line); + } + } + } + + else => break + } + } + + match sender.send(lines.into()) { + Err(_) => debug!("error sending error message"), + _ => (), + }; +} + async fn forward_signals_and_wait(mut child: Child) -> io::Result { use nix::sys::signal::kill; use nix::unistd::Pid; @@ -376,3 +434,93 @@ async fn forward_signals_and_wait(mut child: Child) -> io::Result { } } } + +async fn send_error_request( + error: ErrorConfig, + exit_status: ExitStatus, + receiver: oneshot::Receiver>, +) { + let lines = match receiver.await { + Ok(lines) => lines, + Err(_) => { + debug!("error receiving error message"); + VecDeque::new() + } + }; + + send_request(error.request(&mut SystemTimestamp, &exit_status, lines)).await; +} + +async fn maybe_recv(receiver: &mut Option>) -> Option> { + match receiver { + Some(receiver) => Some(receiver.recv().await), + None => None, + } +} + +fn maybe_spawn_tee( + receiver: Option>, +) -> (Option>, Option>) { + match receiver { + Some(receiver) => { + let (first_receiver, second_receiver) = spawn_tee(receiver); + (Some(first_receiver), Some(second_receiver)) + } + None => (None, None), + } +} + +fn spawn_tee( + receiver: UnboundedReceiver, +) -> (UnboundedReceiver, UnboundedReceiver) { + let (first_sender, first_receiver) = unbounded_channel(); + let (second_sender, second_receiver) = unbounded_channel(); + + tokio::spawn(tee(receiver, first_sender, second_sender)); + + (first_receiver, second_receiver) +} + +// An utility function that takes an unbounded receiver and returns two +// unbounded receivers that will receive the same items. The items must +// implement the `Clone` trait. +async fn tee( + mut receiver: UnboundedReceiver, + first_sender: UnboundedSender, + second_sender: UnboundedSender, +) { + while let Some(item) = receiver.recv().await { + if let Err(err) = first_sender.send(item.clone()) { + debug!("error sending item to first receiver: {}", err); + break; + } + + if let Err(err) = second_sender.send(item) { + debug!("error sending item to second receiver: {}", err); + break; + } + } +} + +async fn send_request(request: Result) { + let request = match request { + Ok(request) => request, + Err(err) => { + debug!("error creating request: {}", err); + return; + } + }; + + match client().execute(request.try_clone().unwrap()).await { + Ok(response) => { + if !response.status().is_success() { + debug!("request failed with status: {}", response.status()); + } else { + trace!("request successful: {}", request.url()); + } + } + Err(err) => { + debug!("error sending request: {:?}", err); + } + }; +} From f0bd71d936b6e348ceb1855288e7314e4bb46747 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 14 Nov 2024 18:16:18 +0100 Subject: [PATCH 3/5] Rename `signals.rs` to `signal.rs` This matches the naming of the upstream crates. --- src/error.rs | 2 +- src/main.rs | 4 ++-- src/{signals.rs => signal.rs} | 0 3 files changed, 3 insertions(+), 3 deletions(-) rename src/{signals.rs => signal.rs} (100%) diff --git a/src/error.rs b/src/error.rs index 2c02e5a..9dfc7bd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use serde::Serialize; use crate::client::client; use crate::package::NAME; -use crate::signals::signal_name; +use crate::signal::signal_name; use crate::timestamp::Timestamp; pub struct ErrorConfig { diff --git a/src/main.rs b/src/main.rs index 6ee8520..a45fade 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ mod client; mod exit; mod ndjson; mod package; -mod signals; +mod signal; mod timestamp; use crate::check_in::{CronKind, HeartbeatConfig}; @@ -15,7 +15,7 @@ use crate::cli::Cli; use crate::client::client; use crate::log::{LogConfig, LogMessage, LogSeverity}; use crate::package::NAME; -use crate::signals::{has_terminating_intent, signal_stream}; +use crate::signal::{has_terminating_intent, signal_stream}; use crate::timestamp::SystemTimestamp; use ::log::{debug, error, trace}; diff --git a/src/signals.rs b/src/signal.rs similarity index 100% rename from src/signals.rs rename to src/signal.rs From 9f6009bd588aeb1858eafbeaa175d9f3ce958451 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 14 Nov 2024 18:34:17 +0100 Subject: [PATCH 4/5] Move utility functions to modules Move the utility functions for channels (`maybe_spawn_tee` and `maybe_recv`) to a separate `channel` module, and move the `send_request` utility function to the `client` module. Rearrange the main module so it reads "from top to bottom". --- src/channel.rs | 71 +++++++++++++++++++++++ src/client.rs | 25 ++++++++ src/main.rs | 154 +++++++++++++------------------------------------ 3 files changed, 137 insertions(+), 113 deletions(-) create mode 100644 src/channel.rs diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..021504c --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,71 @@ +use std::future::Future; + +use ::log::debug; + +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +pub async fn maybe_recv(receiver: &mut Option>) -> Option> { + match receiver { + Some(receiver) => Some(receiver.recv().await), + None => None, + } +} + +pub fn maybe_spawn_tee( + receiver: Option>, +) -> (Option>, Option>) { + match receiver { + Some(receiver) => { + let (first_receiver, second_receiver) = spawn_tee(receiver); + (Some(first_receiver), Some(second_receiver)) + } + None => (None, None), + } +} + +// An utility function that takes an unbounded receiver and returns two +// unbounded receivers that will receive the same items, spawning a task +// to read from the given receiver and write to the returned receivers. +// The items must implement the `Clone` trait. +pub fn spawn_tee( + receiver: UnboundedReceiver, +) -> (UnboundedReceiver, UnboundedReceiver) { + let (future, first_receiver, second_receiver) = tee(receiver); + + tokio::spawn(future); + + (first_receiver, second_receiver) +} + +fn tee( + receiver: UnboundedReceiver, +) -> ( + impl Future, + UnboundedReceiver, + UnboundedReceiver, +) { + let (first_sender, first_receiver) = unbounded_channel(); + let (second_sender, second_receiver) = unbounded_channel(); + + let future = tee_loop(receiver, first_sender, second_sender); + + (future, first_receiver, second_receiver) +} + +async fn tee_loop( + mut receiver: UnboundedReceiver, + first_sender: UnboundedSender, + second_sender: UnboundedSender, +) { + while let Some(item) = receiver.recv().await { + if let Err(err) = first_sender.send(item.clone()) { + debug!("error sending item to first receiver: {}", err); + break; + } + + if let Err(err) = second_sender.send(item) { + debug!("error sending item to second receiver: {}", err); + break; + } + } +} diff --git a/src/client.rs b/src/client.rs index 2ca17de..c414f80 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,7 @@ use reqwest::{Client, ClientBuilder}; +use ::log::{debug, trace}; + use crate::package::{NAME, VERSION}; pub fn client() -> Client { @@ -8,3 +10,26 @@ pub fn client() -> Client { .build() .unwrap() } + +pub async fn send_request(request: Result) { + let request = match request { + Ok(request) => request, + Err(err) => { + debug!("error creating request: {}", err); + return; + } + }; + + match client().execute(request.try_clone().unwrap()).await { + Ok(response) => { + if !response.status().is_success() { + debug!("request failed with status: {}", response.status()); + } else { + trace!("request successful: {}", request.url()); + } + } + Err(err) => { + debug!("error sending request: {:?}", err); + } + }; +} diff --git a/src/main.rs b/src/main.rs index a45fade..8257c2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod cli; mod error; mod log; +mod channel; mod client; mod exit; mod ndjson; @@ -10,9 +11,10 @@ mod package; mod signal; mod timestamp; +use crate::channel::{maybe_recv, maybe_spawn_tee}; use crate::check_in::{CronKind, HeartbeatConfig}; use crate::cli::Cli; -use crate::client::client; +use crate::client::send_request; use crate::log::{LogConfig, LogMessage, LogSeverity}; use crate::package::NAME; use crate::signal::{has_terminating_intent, signal_stream}; @@ -58,38 +60,6 @@ fn main() { } } -fn spawn_child( - cli: &Cli, - tasks: &TaskTracker, -) -> io::Result<( - Child, - Option>, - Option>, -)> { - let should_stdout = cli.should_pipe_stdout(); - let should_stderr = cli.should_pipe_stderr(); - - let mut child = command(&cli.command, should_stdout, should_stderr).spawn()?; - - let stdout = if should_stdout { - let (sender, receiver) = unbounded_channel(); - tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender)); - Some(receiver) - } else { - None - }; - - let stderr = if should_stderr { - let (sender, receiver) = unbounded_channel(); - tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender)); - Some(receiver) - } else { - None - }; - - Ok((child, stdout, stderr)) -} - #[tokio::main] async fn start(cli: Cli) -> Result> { let cron = cli.cron(); @@ -198,25 +168,36 @@ async fn start(cli: Cli) -> Result> { } } -fn command(argv: &[String], should_stdout: bool, should_stderr: bool) -> Command { - let mut command = Command::new(argv[0].clone()); - for arg in argv[1..].iter() { - command.arg(arg); - } +fn spawn_child( + cli: &Cli, + tasks: &TaskTracker, +) -> io::Result<( + Child, + Option>, + Option>, +)> { + let should_stdout = cli.should_pipe_stdout(); + let should_stderr = cli.should_pipe_stderr(); - if should_stdout { - command.stdout(Stdio::piped()); - } + let mut child = command(&cli.command, should_stdout, should_stderr).spawn()?; - if should_stderr { - command.stderr(Stdio::piped()); - } + let stdout = if should_stdout { + let (sender, receiver) = unbounded_channel(); + tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender)); + Some(receiver) + } else { + None + }; - unsafe { - command.pre_exec(exit::exit_with_parent); - } + let stderr = if should_stderr { + let (sender, receiver) = unbounded_channel(); + tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender)); + Some(receiver) + } else { + None + }; - command + Ok((child, stdout, stderr)) } // Pipes lines from an asynchronous reader to a synchronous writer, returning @@ -451,76 +432,23 @@ async fn send_error_request( send_request(error.request(&mut SystemTimestamp, &exit_status, lines)).await; } -async fn maybe_recv(receiver: &mut Option>) -> Option> { - match receiver { - Some(receiver) => Some(receiver.recv().await), - None => None, +fn command(argv: &[String], should_stdout: bool, should_stderr: bool) -> Command { + let mut command = Command::new(argv[0].clone()); + for arg in argv[1..].iter() { + command.arg(arg); } -} -fn maybe_spawn_tee( - receiver: Option>, -) -> (Option>, Option>) { - match receiver { - Some(receiver) => { - let (first_receiver, second_receiver) = spawn_tee(receiver); - (Some(first_receiver), Some(second_receiver)) - } - None => (None, None), + if should_stdout { + command.stdout(Stdio::piped()); } -} -fn spawn_tee( - receiver: UnboundedReceiver, -) -> (UnboundedReceiver, UnboundedReceiver) { - let (first_sender, first_receiver) = unbounded_channel(); - let (second_sender, second_receiver) = unbounded_channel(); - - tokio::spawn(tee(receiver, first_sender, second_sender)); - - (first_receiver, second_receiver) -} - -// An utility function that takes an unbounded receiver and returns two -// unbounded receivers that will receive the same items. The items must -// implement the `Clone` trait. -async fn tee( - mut receiver: UnboundedReceiver, - first_sender: UnboundedSender, - second_sender: UnboundedSender, -) { - while let Some(item) = receiver.recv().await { - if let Err(err) = first_sender.send(item.clone()) { - debug!("error sending item to first receiver: {}", err); - break; - } - - if let Err(err) = second_sender.send(item) { - debug!("error sending item to second receiver: {}", err); - break; - } + if should_stderr { + command.stderr(Stdio::piped()); } -} -async fn send_request(request: Result) { - let request = match request { - Ok(request) => request, - Err(err) => { - debug!("error creating request: {}", err); - return; - } - }; + unsafe { + command.pre_exec(exit::exit_with_parent); + } - match client().execute(request.try_clone().unwrap()).await { - Ok(response) => { - if !response.status().is_success() { - debug!("request failed with status: {}", response.status()); - } else { - trace!("request successful: {}", request.url()); - } - } - Err(err) => { - debug!("error sending request: {:?}", err); - } - }; + command } From 33a78377c2252bd6de7b6d3ea5a493e084f739f8 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 14 Nov 2024 18:45:43 +0100 Subject: [PATCH 5/5] Make Clippy happy Some linter-suggested fixes. --- src/error.rs | 2 +- src/main.rs | 30 +++++++++++++----------------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/src/error.rs b/src/error.rs index 9dfc7bd..6416e0c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -31,7 +31,7 @@ impl ErrorConfig { .post(url) .query(&[("api_key", &self.api_key)]) .header("Content-Type", "application/json") - .body(ErrorBody::from_config(&self, timestamp, exit, lines)) + .body(ErrorBody::from_config(self, timestamp, exit, lines)) .build() } } diff --git a/src/main.rs b/src/main.rs index 8257c2b..cab60cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -105,14 +105,12 @@ async fn start(cli: Cli) -> Result> { cron.request(&mut SystemTimestamp, CronKind::Finish), )); } - } else { - if let Some(error) = error { - tasks.spawn(send_error_request( - error, - exit_status.clone(), - error_message.unwrap(), - )); - } + } else if let Some(error) = error { + tasks.spawn(send_error_request( + error, + exit_status, + error_message.unwrap(), + )); } if let Some(heartbeat) = heartbeat { @@ -168,14 +166,13 @@ async fn start(cli: Cli) -> Result> { } } -fn spawn_child( - cli: &Cli, - tasks: &TaskTracker, -) -> io::Result<( +type SpawnedChild = ( Child, Option>, Option>, -)> { +); + +fn spawn_child(cli: &Cli, tasks: &TaskTracker) -> io::Result { let should_stdout = cli.should_pipe_stdout(); let should_stderr = cli.should_pipe_stderr(); @@ -381,10 +378,9 @@ async fn error_message_loop( } } - match sender.send(lines.into()) { - Err(_) => debug!("error sending error message"), - _ => (), - }; + if sender.send(lines).is_err() { + debug!("error sending error message"); + } } async fn forward_signals_and_wait(mut child: Child) -> io::Result {