Skip to content

Commit

Permalink
Move utility functions to modules
Browse files Browse the repository at this point in the history
Move the utility functions for channels (`maybe_spawn_tee` and
`maybe_recv`) to a separate `channel` module, and move the
`send_request` utility function to the `client` module.

Rearrange the main module so it reads "from top to bottom".
  • Loading branch information
unflxw committed Nov 14, 2024
1 parent f0bd71d commit 9f6009b
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 113 deletions.
71 changes: 71 additions & 0 deletions src/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::future::Future;

use ::log::debug;

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

pub async fn maybe_recv<T>(receiver: &mut Option<UnboundedReceiver<T>>) -> Option<Option<T>> {
match receiver {
Some(receiver) => Some(receiver.recv().await),
None => None,
}
}

pub fn maybe_spawn_tee<T: Clone + Send + 'static>(
receiver: Option<UnboundedReceiver<T>>,
) -> (Option<UnboundedReceiver<T>>, Option<UnboundedReceiver<T>>) {
match receiver {
Some(receiver) => {
let (first_receiver, second_receiver) = spawn_tee(receiver);
(Some(first_receiver), Some(second_receiver))
}
None => (None, None),
}
}

// An utility function that takes an unbounded receiver and returns two
// unbounded receivers that will receive the same items, spawning a task
// to read from the given receiver and write to the returned receivers.
// The items must implement the `Clone` trait.
pub fn spawn_tee<T: Clone + Send + 'static>(
receiver: UnboundedReceiver<T>,
) -> (UnboundedReceiver<T>, UnboundedReceiver<T>) {
let (future, first_receiver, second_receiver) = tee(receiver);

tokio::spawn(future);

(first_receiver, second_receiver)
}

fn tee<T: Clone + Send + 'static>(
receiver: UnboundedReceiver<T>,
) -> (
impl Future<Output = ()>,
UnboundedReceiver<T>,
UnboundedReceiver<T>,
) {
let (first_sender, first_receiver) = unbounded_channel();
let (second_sender, second_receiver) = unbounded_channel();

let future = tee_loop(receiver, first_sender, second_sender);

(future, first_receiver, second_receiver)
}

async fn tee_loop<T: Clone + Send + 'static>(
mut receiver: UnboundedReceiver<T>,
first_sender: UnboundedSender<T>,
second_sender: UnboundedSender<T>,
) {
while let Some(item) = receiver.recv().await {
if let Err(err) = first_sender.send(item.clone()) {
debug!("error sending item to first receiver: {}", err);
break;
}

if let Err(err) = second_sender.send(item) {
debug!("error sending item to second receiver: {}", err);
break;
}
}
}
25 changes: 25 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use reqwest::{Client, ClientBuilder};

use ::log::{debug, trace};

use crate::package::{NAME, VERSION};

pub fn client() -> Client {
Expand All @@ -8,3 +10,26 @@ pub fn client() -> Client {
.build()
.unwrap()
}

pub async fn send_request(request: Result<reqwest::Request, reqwest::Error>) {
let request = match request {
Ok(request) => request,
Err(err) => {
debug!("error creating request: {}", err);
return;
}
};

match client().execute(request.try_clone().unwrap()).await {
Ok(response) => {
if !response.status().is_success() {
debug!("request failed with status: {}", response.status());
} else {
trace!("request successful: {}", request.url());
}
}
Err(err) => {
debug!("error sending request: {:?}", err);
}
};
}
154 changes: 41 additions & 113 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ mod cli;
mod error;
mod log;

mod channel;
mod client;
mod exit;
mod ndjson;
mod package;
mod signal;
mod timestamp;

use crate::channel::{maybe_recv, maybe_spawn_tee};
use crate::check_in::{CronKind, HeartbeatConfig};
use crate::cli::Cli;
use crate::client::client;
use crate::client::send_request;
use crate::log::{LogConfig, LogMessage, LogSeverity};
use crate::package::NAME;
use crate::signal::{has_terminating_intent, signal_stream};
Expand Down Expand Up @@ -58,38 +60,6 @@ fn main() {
}
}

fn spawn_child(
cli: &Cli,
tasks: &TaskTracker,
) -> io::Result<(
Child,
Option<UnboundedReceiver<String>>,
Option<UnboundedReceiver<String>>,
)> {
let should_stdout = cli.should_pipe_stdout();
let should_stderr = cli.should_pipe_stderr();

let mut child = command(&cli.command, should_stdout, should_stderr).spawn()?;

let stdout = if should_stdout {
let (sender, receiver) = unbounded_channel();
tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender));
Some(receiver)
} else {
None
};

let stderr = if should_stderr {
let (sender, receiver) = unbounded_channel();
tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender));
Some(receiver)
} else {
None
};

Ok((child, stdout, stderr))
}

#[tokio::main]
async fn start(cli: Cli) -> Result<i32, Box<dyn std::error::Error>> {
let cron = cli.cron();
Expand Down Expand Up @@ -198,25 +168,36 @@ async fn start(cli: Cli) -> Result<i32, Box<dyn std::error::Error>> {
}
}

fn command(argv: &[String], should_stdout: bool, should_stderr: bool) -> Command {
let mut command = Command::new(argv[0].clone());
for arg in argv[1..].iter() {
command.arg(arg);
}
fn spawn_child(
cli: &Cli,
tasks: &TaskTracker,
) -> io::Result<(
Child,
Option<UnboundedReceiver<String>>,
Option<UnboundedReceiver<String>>,
)> {
let should_stdout = cli.should_pipe_stdout();
let should_stderr = cli.should_pipe_stderr();

if should_stdout {
command.stdout(Stdio::piped());
}
let mut child = command(&cli.command, should_stdout, should_stderr).spawn()?;

if should_stderr {
command.stderr(Stdio::piped());
}
let stdout = if should_stdout {
let (sender, receiver) = unbounded_channel();
tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender));
Some(receiver)
} else {
None
};

unsafe {
command.pre_exec(exit::exit_with_parent);
}
let stderr = if should_stderr {
let (sender, receiver) = unbounded_channel();
tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender));
Some(receiver)
} else {
None
};

command
Ok((child, stdout, stderr))
}

// Pipes lines from an asynchronous reader to a synchronous writer, returning
Expand Down Expand Up @@ -451,76 +432,23 @@ async fn send_error_request(
send_request(error.request(&mut SystemTimestamp, &exit_status, lines)).await;
}

async fn maybe_recv<T>(receiver: &mut Option<UnboundedReceiver<T>>) -> Option<Option<T>> {
match receiver {
Some(receiver) => Some(receiver.recv().await),
None => None,
fn command(argv: &[String], should_stdout: bool, should_stderr: bool) -> Command {
let mut command = Command::new(argv[0].clone());
for arg in argv[1..].iter() {
command.arg(arg);
}
}

fn maybe_spawn_tee<T: Clone + Send + 'static>(
receiver: Option<UnboundedReceiver<T>>,
) -> (Option<UnboundedReceiver<T>>, Option<UnboundedReceiver<T>>) {
match receiver {
Some(receiver) => {
let (first_receiver, second_receiver) = spawn_tee(receiver);
(Some(first_receiver), Some(second_receiver))
}
None => (None, None),
if should_stdout {
command.stdout(Stdio::piped());
}
}

fn spawn_tee<T: Clone + Send + 'static>(
receiver: UnboundedReceiver<T>,
) -> (UnboundedReceiver<T>, UnboundedReceiver<T>) {
let (first_sender, first_receiver) = unbounded_channel();
let (second_sender, second_receiver) = unbounded_channel();

tokio::spawn(tee(receiver, first_sender, second_sender));

(first_receiver, second_receiver)
}

// An utility function that takes an unbounded receiver and returns two
// unbounded receivers that will receive the same items. The items must
// implement the `Clone` trait.
async fn tee<T: Clone + Send + 'static>(
mut receiver: UnboundedReceiver<T>,
first_sender: UnboundedSender<T>,
second_sender: UnboundedSender<T>,
) {
while let Some(item) = receiver.recv().await {
if let Err(err) = first_sender.send(item.clone()) {
debug!("error sending item to first receiver: {}", err);
break;
}

if let Err(err) = second_sender.send(item) {
debug!("error sending item to second receiver: {}", err);
break;
}
if should_stderr {
command.stderr(Stdio::piped());
}
}

async fn send_request(request: Result<reqwest::Request, reqwest::Error>) {
let request = match request {
Ok(request) => request,
Err(err) => {
debug!("error creating request: {}", err);
return;
}
};
unsafe {
command.pre_exec(exit::exit_with_parent);
}

match client().execute(request.try_clone().unwrap()).await {
Ok(response) => {
if !response.status().is_success() {
debug!("request failed with status: {}", response.status());
} else {
trace!("request successful: {}", request.url());
}
}
Err(err) => {
debug!("error sending request: {:?}", err);
}
};
command
}

0 comments on commit 9f6009b

Please sign in to comment.