From 9f6009bd588aeb1858eafbeaa175d9f3ce958451 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 14 Nov 2024 18:34:17 +0100 Subject: [PATCH] Move utility functions to modules Move the utility functions for channels (`maybe_spawn_tee` and `maybe_recv`) to a separate `channel` module, and move the `send_request` utility function to the `client` module. Rearrange the main module so it reads "from top to bottom". --- src/channel.rs | 71 +++++++++++++++++++++++ src/client.rs | 25 ++++++++ src/main.rs | 154 +++++++++++++------------------------------------ 3 files changed, 137 insertions(+), 113 deletions(-) create mode 100644 src/channel.rs diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..021504c --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,71 @@ +use std::future::Future; + +use ::log::debug; + +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +pub async fn maybe_recv(receiver: &mut Option>) -> Option> { + match receiver { + Some(receiver) => Some(receiver.recv().await), + None => None, + } +} + +pub fn maybe_spawn_tee( + receiver: Option>, +) -> (Option>, Option>) { + match receiver { + Some(receiver) => { + let (first_receiver, second_receiver) = spawn_tee(receiver); + (Some(first_receiver), Some(second_receiver)) + } + None => (None, None), + } +} + +// An utility function that takes an unbounded receiver and returns two +// unbounded receivers that will receive the same items, spawning a task +// to read from the given receiver and write to the returned receivers. +// The items must implement the `Clone` trait. +pub fn spawn_tee( + receiver: UnboundedReceiver, +) -> (UnboundedReceiver, UnboundedReceiver) { + let (future, first_receiver, second_receiver) = tee(receiver); + + tokio::spawn(future); + + (first_receiver, second_receiver) +} + +fn tee( + receiver: UnboundedReceiver, +) -> ( + impl Future, + UnboundedReceiver, + UnboundedReceiver, +) { + let (first_sender, first_receiver) = unbounded_channel(); + let (second_sender, second_receiver) = unbounded_channel(); + + let future = tee_loop(receiver, first_sender, second_sender); + + (future, first_receiver, second_receiver) +} + +async fn tee_loop( + mut receiver: UnboundedReceiver, + first_sender: UnboundedSender, + second_sender: UnboundedSender, +) { + while let Some(item) = receiver.recv().await { + if let Err(err) = first_sender.send(item.clone()) { + debug!("error sending item to first receiver: {}", err); + break; + } + + if let Err(err) = second_sender.send(item) { + debug!("error sending item to second receiver: {}", err); + break; + } + } +} diff --git a/src/client.rs b/src/client.rs index 2ca17de..c414f80 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,7 @@ use reqwest::{Client, ClientBuilder}; +use ::log::{debug, trace}; + use crate::package::{NAME, VERSION}; pub fn client() -> Client { @@ -8,3 +10,26 @@ pub fn client() -> Client { .build() .unwrap() } + +pub async fn send_request(request: Result) { + let request = match request { + Ok(request) => request, + Err(err) => { + debug!("error creating request: {}", err); + return; + } + }; + + match client().execute(request.try_clone().unwrap()).await { + Ok(response) => { + if !response.status().is_success() { + debug!("request failed with status: {}", response.status()); + } else { + trace!("request successful: {}", request.url()); + } + } + Err(err) => { + debug!("error sending request: {:?}", err); + } + }; +} diff --git a/src/main.rs b/src/main.rs index a45fade..8257c2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod cli; mod error; mod log; +mod channel; mod client; mod exit; mod ndjson; @@ -10,9 +11,10 @@ mod package; mod signal; mod timestamp; +use crate::channel::{maybe_recv, maybe_spawn_tee}; use crate::check_in::{CronKind, HeartbeatConfig}; use crate::cli::Cli; -use crate::client::client; +use crate::client::send_request; use crate::log::{LogConfig, LogMessage, LogSeverity}; use crate::package::NAME; use crate::signal::{has_terminating_intent, signal_stream}; @@ -58,38 +60,6 @@ fn main() { } } -fn spawn_child( - cli: &Cli, - tasks: &TaskTracker, -) -> io::Result<( - Child, - Option>, - Option>, -)> { - let should_stdout = cli.should_pipe_stdout(); - let should_stderr = cli.should_pipe_stderr(); - - let mut child = command(&cli.command, should_stdout, should_stderr).spawn()?; - - let stdout = if should_stdout { - let (sender, receiver) = unbounded_channel(); - tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender)); - Some(receiver) - } else { - None - }; - - let stderr = if should_stderr { - let (sender, receiver) = unbounded_channel(); - tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender)); - Some(receiver) - } else { - None - }; - - Ok((child, stdout, stderr)) -} - #[tokio::main] async fn start(cli: Cli) -> Result> { let cron = cli.cron(); @@ -198,25 +168,36 @@ async fn start(cli: Cli) -> Result> { } } -fn command(argv: &[String], should_stdout: bool, should_stderr: bool) -> Command { - let mut command = Command::new(argv[0].clone()); - for arg in argv[1..].iter() { - command.arg(arg); - } +fn spawn_child( + cli: &Cli, + tasks: &TaskTracker, +) -> io::Result<( + Child, + Option>, + Option>, +)> { + let should_stdout = cli.should_pipe_stdout(); + let should_stderr = cli.should_pipe_stderr(); - if should_stdout { - command.stdout(Stdio::piped()); - } + let mut child = command(&cli.command, should_stdout, should_stderr).spawn()?; - if should_stderr { - command.stderr(Stdio::piped()); - } + let stdout = if should_stdout { + let (sender, receiver) = unbounded_channel(); + tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender)); + Some(receiver) + } else { + None + }; - unsafe { - command.pre_exec(exit::exit_with_parent); - } + let stderr = if should_stderr { + let (sender, receiver) = unbounded_channel(); + tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender)); + Some(receiver) + } else { + None + }; - command + Ok((child, stdout, stderr)) } // Pipes lines from an asynchronous reader to a synchronous writer, returning @@ -451,76 +432,23 @@ async fn send_error_request( send_request(error.request(&mut SystemTimestamp, &exit_status, lines)).await; } -async fn maybe_recv(receiver: &mut Option>) -> Option> { - match receiver { - Some(receiver) => Some(receiver.recv().await), - None => None, +fn command(argv: &[String], should_stdout: bool, should_stderr: bool) -> Command { + let mut command = Command::new(argv[0].clone()); + for arg in argv[1..].iter() { + command.arg(arg); } -} -fn maybe_spawn_tee( - receiver: Option>, -) -> (Option>, Option>) { - match receiver { - Some(receiver) => { - let (first_receiver, second_receiver) = spawn_tee(receiver); - (Some(first_receiver), Some(second_receiver)) - } - None => (None, None), + if should_stdout { + command.stdout(Stdio::piped()); } -} -fn spawn_tee( - receiver: UnboundedReceiver, -) -> (UnboundedReceiver, UnboundedReceiver) { - let (first_sender, first_receiver) = unbounded_channel(); - let (second_sender, second_receiver) = unbounded_channel(); - - tokio::spawn(tee(receiver, first_sender, second_sender)); - - (first_receiver, second_receiver) -} - -// An utility function that takes an unbounded receiver and returns two -// unbounded receivers that will receive the same items. The items must -// implement the `Clone` trait. -async fn tee( - mut receiver: UnboundedReceiver, - first_sender: UnboundedSender, - second_sender: UnboundedSender, -) { - while let Some(item) = receiver.recv().await { - if let Err(err) = first_sender.send(item.clone()) { - debug!("error sending item to first receiver: {}", err); - break; - } - - if let Err(err) = second_sender.send(item) { - debug!("error sending item to second receiver: {}", err); - break; - } + if should_stderr { + command.stderr(Stdio::piped()); } -} -async fn send_request(request: Result) { - let request = match request { - Ok(request) => request, - Err(err) => { - debug!("error creating request: {}", err); - return; - } - }; + unsafe { + command.pre_exec(exit::exit_with_parent); + } - match client().execute(request.try_clone().unwrap()).await { - Ok(response) => { - if !response.status().is_success() { - debug!("request failed with status: {}", response.status()); - } else { - trace!("request successful: {}", request.url()); - } - } - Err(err) => { - debug!("error sending request: {:?}", err); - } - }; + command }