-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: break tools out from monitor into agent-utils
- Loading branch information
Showing
24 changed files
with
392 additions
and
329 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ resolver = "2" | |
|
||
members = [ | ||
"accumulator", | ||
"agent-utils", | ||
"nomad-types", | ||
"nomad-core", | ||
"nomad-base", | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
[package] | ||
name = "agent-utils" | ||
version = "0.1.0" | ||
edition = "2021" | ||
authors = ["James Prestwich <[email protected]>"] | ||
description = "Utils for building better agents" | ||
repository = "https://github.com/nomad-xyz/rust" | ||
license = "MIT OR Apache-2.0" | ||
keywords = ["Ethereum", "Nomad"] | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
tokio = { version = "1.0.1", features = ["rt", "macros"] } | ||
tracing = "0.1.35" | ||
ethers = { git = "https://github.com/gakonst/ethers-rs", branch = "master", default-features = false } | ||
|
||
tracing-subscriber = "0.2.15" | ||
eyre = "0.6.8" | ||
warp = "0.3.2" | ||
async-trait = "0.1.56" | ||
futures-util = "0.3.21" | ||
tracing-test = "0.2.3" | ||
|
||
nomad-core = { path = "../nomad-core", default-features = false } | ||
nomad-xyz-configuration = { version = "0.1.0-rc.25", path = "../configuration" } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
use std::collections::HashMap; | ||
|
||
use tokio::{sync::mpsc, task::JoinHandle}; | ||
|
||
pub type Restartable<Task> = JoinHandle<crate::TaskResult<Task>>; | ||
|
||
pub type Faucet<T> = mpsc::UnboundedReceiver<T>; | ||
pub type Sink<T> = mpsc::UnboundedSender<T>; | ||
|
||
pub type NetworkMap<'a, T> = HashMap<&'a str, T>; | ||
pub type HomeReplicaMap<'a, T> = HashMap<&'a str, std::collections::HashMap<&'a str, T>>; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
use nomad_xyz_configuration::{get_builtin, NomadConfig}; | ||
use tracing::Level; | ||
use tracing_subscriber::EnvFilter; | ||
|
||
pub fn config_from_file() -> Option<NomadConfig> { | ||
std::env::var("CONFIG_PATH") | ||
.ok() | ||
.and_then(|path| NomadConfig::from_file(path).ok()) | ||
} | ||
|
||
pub fn config_from_env() -> Option<NomadConfig> { | ||
std::env::var("RUN_ENV") | ||
.ok() | ||
.and_then(|env| get_builtin(&env)) | ||
.map(ToOwned::to_owned) | ||
} | ||
|
||
pub fn config() -> eyre::Result<NomadConfig> { | ||
config_from_file() | ||
.or_else(config_from_env) | ||
.ok_or_else(|| eyre::eyre!("Unable to load config from file or env")) | ||
} | ||
|
||
pub fn init_tracing() { | ||
let builder = tracing_subscriber::FmtSubscriber::builder() | ||
.with_max_level(Level::INFO) | ||
.with_env_filter(EnvFilter::from_default_env()) | ||
.with_level(true); | ||
if std::env::var("MONITOR_PRETTY").is_ok() { | ||
builder.pretty().init() | ||
} else { | ||
builder.json().init() | ||
} | ||
} | ||
|
||
pub fn networks_from_env() -> Option<Vec<String>> { | ||
std::env::var("MONITOR_NETWORKS") | ||
.ok() | ||
.map(|s| s.split(',').map(ToOwned::to_owned).collect()) | ||
} | ||
|
||
pub fn rpc_from_env(network: &str) -> Option<String> { | ||
std::env::var(format!("{}_CONNECTION_URL", network.to_uppercase())).ok() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
pub mod aliases; | ||
pub mod init; | ||
pub mod macros; | ||
pub mod pipe; | ||
pub mod utils; | ||
|
||
use std::panic; | ||
|
||
pub use aliases::*; | ||
|
||
use tokio::task::JoinHandle; | ||
|
||
#[derive(Debug)] | ||
pub enum TaskResult<T> { | ||
Recoverable { | ||
task: T, | ||
err: eyre::Report, | ||
}, | ||
Unrecoverable { | ||
err: eyre::Report, | ||
worth_logging: bool, | ||
}, | ||
} | ||
|
||
pub trait ProcessStep: std::fmt::Display { | ||
fn spawn(self) -> Restartable<Self> | ||
where | ||
Self: 'static + Send + Sync + Sized; | ||
|
||
/// Run the task until it panics. Errors result in a task restart with the | ||
/// same channels. This means that an error causes the task to lose only | ||
/// the data that is in-scope when it faults. | ||
fn run_until_panic(self) -> JoinHandle<()> | ||
where | ||
Self: 'static + Send + Sync + Sized, | ||
{ | ||
let task_description = format!("{}", self); | ||
tokio::spawn(async move { | ||
let mut handle = self.spawn(); | ||
loop { | ||
let result = handle.await; | ||
|
||
let again = match result { | ||
Ok(TaskResult::Recoverable { task, err }) => { | ||
tracing::warn!( | ||
error = %err, | ||
task = task_description.as_str(), | ||
"Restarting task", | ||
); | ||
task | ||
} | ||
|
||
Ok(TaskResult::Unrecoverable { err, worth_logging }) => { | ||
if worth_logging { | ||
tracing::error!(err = %err, task = task_description.as_str(), "Unrecoverable error encountered"); | ||
} else { | ||
tracing::trace!(err = %err, task = task_description.as_str(), "Unrecoverable error encountered"); | ||
} | ||
break; | ||
} | ||
|
||
Err(e) => { | ||
let panic_res = e.try_into_panic(); | ||
|
||
if panic_res.is_err() { | ||
tracing::trace!( | ||
task = task_description.as_str(), | ||
"Internal task cancelled", | ||
); | ||
break; | ||
} | ||
let p = panic_res.unwrap(); | ||
tracing::error!(task = task_description.as_str(), "Internal task panicked"); | ||
panic::resume_unwind(p); | ||
} | ||
}; | ||
|
||
utils::noisy_sleep(15_000).await; | ||
handle = again.spawn(); | ||
} | ||
}) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use crate::{ProcessStep, TaskResult}; | ||
|
||
struct RecoverableTask; | ||
impl std::fmt::Display for RecoverableTask { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "RecoverableTask") | ||
} | ||
} | ||
|
||
impl ProcessStep for RecoverableTask { | ||
fn spawn(self) -> crate::Restartable<Self> | ||
where | ||
Self: 'static + Send + Sync + Sized, | ||
{ | ||
tokio::spawn(async move { | ||
TaskResult::Recoverable { | ||
task: self, | ||
err: eyre::eyre!("This error was recoverable"), | ||
} | ||
}) | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
#[tracing_test::traced_test] | ||
async fn test_recovery() { | ||
let handle = RecoverableTask.run_until_panic(); | ||
tokio::time::sleep(std::time::Duration::from_secs(2)).await; | ||
handle.abort(); | ||
let result = handle.await; | ||
|
||
assert!(logs_contain("RecoverableTask")); | ||
assert!(logs_contain("Restarting task")); | ||
assert!(logs_contain("This error was recoverable")); | ||
assert!(result.is_err() && result.unwrap_err().is_cancelled()); | ||
} | ||
|
||
struct UnrecoverableTask; | ||
impl std::fmt::Display for UnrecoverableTask { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "UnrecoverableTask") | ||
} | ||
} | ||
|
||
impl ProcessStep for UnrecoverableTask { | ||
fn spawn(self) -> crate::Restartable<Self> | ||
where | ||
Self: 'static + Send + Sync + Sized, | ||
{ | ||
tokio::spawn(async move { | ||
TaskResult::Unrecoverable { | ||
err: eyre::eyre!("This error was unrecoverable"), | ||
worth_logging: true, | ||
} | ||
}) | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
#[tracing_test::traced_test] | ||
async fn test_unrecoverable() { | ||
let handle = UnrecoverableTask.run_until_panic(); | ||
let result = handle.await; | ||
assert!(logs_contain("UnrecoverableTask")); | ||
assert!(logs_contain("Unrecoverable error encountered")); | ||
assert!(logs_contain("This error was unrecoverable")); | ||
assert!(result.is_ok()); | ||
} | ||
|
||
struct PanicTask; | ||
impl std::fmt::Display for PanicTask { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "PanicTask") | ||
} | ||
} | ||
|
||
impl ProcessStep for PanicTask { | ||
fn spawn(self) -> crate::Restartable<Self> | ||
where | ||
Self: 'static + Send + Sync + Sized, | ||
{ | ||
tokio::spawn(async move { panic!("intentional panic :)") }) | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
#[tracing_test::traced_test] | ||
async fn test_panic() { | ||
let handle = PanicTask.run_until_panic(); | ||
let result = handle.await; | ||
assert!(logs_contain("PanicTask")); | ||
assert!(logs_contain("Internal task panicked")); | ||
assert!(result.is_err() && result.unwrap_err().is_panic()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
use std::fmt::Debug; | ||
|
||
use eyre::bail; | ||
|
||
use crate::aliases::*; | ||
|
||
#[derive(Debug)] | ||
pub struct Pipe<T> { | ||
rx: Faucet<T>, | ||
tx: Sink<T>, | ||
contents: Option<T>, | ||
} | ||
|
||
impl<T> Pipe<T> | ||
where | ||
T: Debug + Send + Sync + 'static, | ||
{ | ||
pub fn new(rx: Faucet<T>, tx: Sink<T>, contents: Option<T>) -> Self { | ||
Self { rx, tx, contents } | ||
} | ||
|
||
pub fn read(&self) -> Option<&T> { | ||
self.contents.as_ref() | ||
} | ||
|
||
pub fn finish(&mut self) -> eyre::Result<()> { | ||
if let Some(contents) = self.contents.take() { | ||
self.tx.send(contents)?; | ||
} | ||
Ok(()) | ||
} | ||
|
||
pub async fn next(&mut self) -> eyre::Result<&T> { | ||
self.finish()?; | ||
|
||
self.contents = self.rx.recv().await; | ||
if self.contents.is_none() { | ||
bail!("rx broke") | ||
} | ||
Ok(self.read().expect("checked")) | ||
} | ||
} |
Oops, something went wrong.