Skip to content

Commit

Permalink
Rearrange main.rs and add comments
Browse files Browse the repository at this point in the history
Add comments to explain the reasoning behind certain quirks in how
this works.

Rearrange `main.rs` so it reads "from top to bottom", that is, from
the main function to the smaller helper functions.
  • Loading branch information
unflxw committed Nov 12, 2024
1 parent 8b339e8 commit e881c62
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 105 deletions.
208 changes: 104 additions & 104 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,110 @@ fn main() {
}
}

#[tokio::main]
async fn start(cli: Cli) -> Result<i32, Box<dyn std::error::Error>> {
let cron = cli.cron();
let log = cli.log();

let tasks = TaskTracker::new();

let mut child = command(&cli.command, &log).spawn()?;

let stdout = if log.origin.is_out() {
let (sender, receiver) = unbounded_channel();
tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender));
Some(receiver)
} else {
None
};

let stderr = if log.origin.is_err() {
let (sender, receiver) = unbounded_channel();
tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender));
Some(receiver)
} else {
None
};

if let Some(cron) = cron.as_ref() {
tasks.spawn(send_request(
cron.request(&mut SystemTimestamp, CronKind::Start),
));
}

let heartbeat = cli.heartbeat().map(|config| {
let token = CancellationToken::new();
tasks.spawn(heartbeat_loop(config, token.clone()));
token
});

tasks.spawn(log_loop(log, stdout, stderr));

let exit_status = forward_signals_and_wait(child).await?;

debug!("command exited with: {}", exit_status);

if exit_status.success() {
if let Some(cron) = cron.as_ref() {
tasks.spawn(send_request(
cron.request(&mut SystemTimestamp, CronKind::Finish),
));
}
}

if let Some(heartbeat) = heartbeat {
heartbeat.cancel();
}

tasks.close();

if !tasks.is_empty() {
debug!("waiting for {} tasks to complete", tasks.len());

// Calling `forward_signals_and_wait` earlier set a signal handler for those signals,
// overriding their default behaviour, which is to cause the process to terminate.
// After `forward_signals_and_wait` finishes, those signal handlers are still set.
//
// While we wait for the tasks to complete, we need to continue to listen to those
// signal handlers.
//
// This allows for the wrapper process to be terminated by certain signals both before
// and after the child process' lifetime.
//
// See https://docs.rs/tokio/latest/tokio/signal/unix/struct.Signal.html#caveats
// for reference.
let mut signals = signal_stream()?;

loop {
select! {
biased;

_ = tasks.wait() => {
break;
}

Some(signal) = signals.next() => {
if has_terminating_intent(&signal) {
debug!("received terminating signal after child: {}", signal);
return Ok(128 + signal as i32);
} else {
trace!("ignoring non-terminating signal after child: {}", signal);
}
}
}
}
}

if let Some(code) = exit_status.code() {
Ok(code)
} else {
match exit_status.signal() {
Some(signal) => Ok(128 + signal),
None => Err("command exited without code or signal".into()),
}
}
}

fn command(argv: &[String], log: &LogConfig) -> Command {
let mut command = Command::new(argv[0].clone());
for arg in argv[1..].iter() {
Expand Down Expand Up @@ -265,107 +369,3 @@ async fn forward_signals_and_wait(mut child: Child) -> io::Result<ExitStatus> {
}
}
}

#[tokio::main]
async fn start(cli: Cli) -> Result<i32, Box<dyn std::error::Error>> {
let cron = cli.cron();
let log = cli.log();

let tasks = TaskTracker::new();

let mut child = command(&cli.command, &log).spawn()?;

let stdout = if log.origin.is_out() {
let (sender, receiver) = unbounded_channel();
tasks.spawn(pipe_lines(child.stdout.take().unwrap(), stdout(), sender));
Some(receiver)
} else {
None
};

let stderr = if log.origin.is_err() {
let (sender, receiver) = unbounded_channel();
tasks.spawn(pipe_lines(child.stderr.take().unwrap(), stderr(), sender));
Some(receiver)
} else {
None
};

if let Some(cron) = cron.as_ref() {
tasks.spawn(send_request(
cron.request(&mut SystemTimestamp, CronKind::Start),
));
}

let heartbeat = cli.heartbeat().map(|config| {
let token = CancellationToken::new();
tasks.spawn(heartbeat_loop(config, token.clone()));
token
});

tasks.spawn(log_loop(log, stdout, stderr));

let exit_status = forward_signals_and_wait(child).await?;

debug!("command exited with: {}", exit_status);

if exit_status.success() {
if let Some(cron) = cron.as_ref() {
tasks.spawn(send_request(
cron.request(&mut SystemTimestamp, CronKind::Finish),
));
}
}

if let Some(heartbeat) = heartbeat {
heartbeat.cancel();
}

tasks.close();

if !tasks.is_empty() {
debug!("waiting for {} tasks to complete", tasks.len());

// Calling `forward_signals_and_wait` earlier set a signal handler for those signals,
// overriding their default behaviour, which is to cause the process to terminate.
// After `forward_signals_and_wait` finishes, those signal handlers are still set.
//
// While we wait for the tasks to complete, we need to continue to listen to those
// signal handlers.
//
// This allows for the wrapper process to be terminated by certain signals both before
// and after the child process' lifetime.
//
// See https://docs.rs/tokio/latest/tokio/signal/unix/struct.Signal.html#caveats
// for reference.
let mut signals = signal_stream()?;

loop {
select! {
biased;

_ = tasks.wait() => {
break;
}

Some(signal) = signals.next() => {
if has_terminating_intent(&signal) {
debug!("received terminating signal after child: {}", signal);
return Ok(128 + signal as i32);
} else {
trace!("ignoring non-terminating signal after child: {}", signal);
}
}
}
}
}

if let Some(code) = exit_status.code() {
Ok(code)
} else {
match exit_status.signal() {
Some(signal) => Ok(128 + signal),
None => Err("command exited without code or signal".into()),
}
}
}
3 changes: 2 additions & 1 deletion src/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ fn nix_to_tokio(signal: &Signal) -> SignalKind {
//
// This list only includes signals that can be caught and handled by the
// application. Signals that cannot be caught, such as SIGKILL and SIGSTOP,
// are not included.
// are not included. If the wrapper is killed by a `SIGKILL` or `SIGSTOP`,
// the child process will receive a `SIGTERM` signal -- see `exit_with_parent`.
const CHILD_FORWARDABLE_SIGNALS: [Signal; 7] = [
Signal::SIGUSR1,
Signal::SIGUSR2,
Expand Down
9 changes: 9 additions & 0 deletions src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ impl Timestamp for SystemTimestamp {

const MONOTONIC_GAP: Duration = Duration::from_millis(1);

// This works around an issue with the logging feature, where timestamps only
// have millisecond precision. This can cause issues when multiple logs are
// written in the same millisecond, as they will have the same timestamp and
// they will be displayed out of order in the UI.

// The monotonic timestamp prevents this issue by ensuring that the
// timestamps returned between two successive calls are at least one
// millisecond apart. This means, however, that the timestamps may not
// accurately reflect the times at which the logs were written.
pub struct MonotonicTimestamp<T: Timestamp> {
last: Option<Duration>,
source: T,
Expand Down

0 comments on commit e881c62

Please sign in to comment.