diff --git a/Cargo.lock b/Cargo.lock index f2cf02140..94b20ccf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5553,6 +5553,7 @@ version = "2.0.0-rc.1" dependencies = [ "actix", "async-jsonrpc-client", + "chrono", "ctrlc", "env_logger", "envy", diff --git a/Justfile b/Justfile index 295859514..825638c8e 100644 --- a/Justfile +++ b/Justfile @@ -26,17 +26,7 @@ versions: cargo clippy -- --version # additional clippy lints -export CLIPPY_LINTS := '-D warnings - -D clippy::cast-lossless - -D clippy::cast-possible-truncation - -D clippy::cast-possible-wrap - -D clippy::cast-precision-loss - -D clippy::cast-sign-loss - -D clippy::checked-conversions - -A clippy::upper-case-acronyms - -A clippy::uninlined-format-args - -A renamed_and_removed_lints -' +export CLIPPY_LINTS := '-D warnings -D clippy::cast-lossless -D clippy::cast-possible-truncation -D clippy::cast-possible-wrap -D clippy::cast-precision-loss -D clippy::cast-sign-loss -D clippy::checked-conversions -A clippy::upper-case-acronyms -A clippy::uninlined-format-args -A renamed_and_removed_lints' # run clippy clippy +flags="": diff --git a/bridges/centralized-ethereum/Cargo.toml b/bridges/centralized-ethereum/Cargo.toml index 48daf541b..ca7fc3442 100644 --- a/bridges/centralized-ethereum/Cargo.toml +++ b/bridges/centralized-ethereum/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] actix = { version = "0.13.0", default-features = false } async-jsonrpc-client = { git = "https://github.com/witnet/async-jsonrpc-client", features = ["tcp"], branch = "fix-tcp-leak" } +chrono = "0.4.38" ctrlc = "3.1.3" env_logger = "0.9.0" envy = "0.4" diff --git a/bridges/centralized-ethereum/src/actors/dr_database.rs b/bridges/centralized-ethereum/src/actors/dr_database.rs index 67046d5eb..d6f36f917 100644 --- a/bridges/centralized-ethereum/src/actors/dr_database.rs +++ b/bridges/centralized-ethereum/src/actors/dr_database.rs @@ -179,6 +179,13 @@ impl Message for SetDrState { type Result = Result<(), ()>; } +/// Count number of data requests in given state +pub struct CountDrsPerState; + +impl Message for CountDrsPerState { + type Result = Result<(u32, u32, u32, u32), ()>; +} + impl Handler for DrDatabase { type Result = (); @@ -274,6 +281,26 @@ impl Handler for DrDatabase { } } +impl Handler for DrDatabase { + type Result = Result<(u32, u32, u32, u32), ()>; + + fn handle(&mut self, _msg: CountDrsPerState, _ctx: &mut Self::Context) -> Self::Result { + Ok(self.dr.iter().fold( + (0u32, 0u32, 0u32, 0u32), + |(mut drs_new, mut drs_pending, mut drs_finished, mut drs_dismissed), + (_dr_id, dr_info)| { + match dr_info.dr_state { + DrState::New => drs_new += 1, + DrState::Pending => drs_pending += 1, + DrState::Finished => drs_finished += 1, + DrState::Dismissed => drs_dismissed += 1, + }; + (drs_new, drs_pending, drs_finished, drs_dismissed) + }, + )) + } +} + /// Required trait for being able to retrieve DrDatabase address from system registry impl actix::Supervised for DrDatabase {} diff --git a/bridges/centralized-ethereum/src/actors/dr_reporter.rs b/bridges/centralized-ethereum/src/actors/dr_reporter.rs index 17b7f9f4d..43dc58749 100644 --- a/bridges/centralized-ethereum/src/actors/dr_reporter.rs +++ b/bridges/centralized-ethereum/src/actors/dr_reporter.rs @@ -653,7 +653,6 @@ mod tests { ]) }) .collect(); - let verbose = true; let params_one = unwrap_batch(batch_results[0].clone()); wrb_contract_abi @@ -661,7 +660,7 @@ mod tests { .and_then(|function| function.encode_input(¶ms_one.into_tokens())) .expect("encode args failed"); - let params_batch = (batch_results, verbose); + let params_batch = batch_results; wrb_contract_abi .function("reportResultBatch") .and_then(|function| function.encode_input(¶ms_batch.into_tokens())) @@ -682,15 +681,11 @@ mod tests { .parse() .unwrap(), topics: vec![ - "0x00e9413c6321ec446a267b7ebf5bb108663f2ef58b35c4f6e18905ac8f205cb2" + "0x4df64445edc775fba59db44b8001852fb1b777eea88fd54f04572dd114e3ff7f" .parse() .unwrap(), ], - data: web3::types::Bytes(vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 248, 117, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 232, 36, 130, 44, 106, 92, - 40, 222, 53, 104, 223, 153, 96, 77, 104, 233, 253, 156, 140, - ]), + data: web3::types::Bytes(hex::decode("0000000000000000000000000000000000000000000000000000000000001b58000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000146e6f7420696e20506f7374656420737461747573000000000000000000000000").unwrap()), block_hash: None, block_number: None, transaction_hash: None, @@ -700,13 +695,9 @@ mod tests { log_type: None, removed: None, }; - assert_eq!( parse_batch_report_error_log(&wrb_contract_abi, log_posted_result), - Some(( - U256::from(63605), - String::from("WitnetOracle: query not in Posted status"), - )) + Some((U256::from(7_000), String::from("not in Posted status"),)) ); } diff --git a/bridges/centralized-ethereum/src/actors/dr_sender/tests.rs b/bridges/centralized-ethereum/src/actors/dr_sender/tests.rs index eb4a8fa30..b6f578809 100644 --- a/bridges/centralized-ethereum/src/actors/dr_sender/tests.rs +++ b/bridges/centralized-ethereum/src/actors/dr_sender/tests.rs @@ -80,8 +80,8 @@ fn deserialize_dr_collateral_one_nanowit() { assert_eq!(total_value, 20_000_000); let dro_bytes = dro.to_pb_bytes().unwrap(); - let err = deserialize_and_validate_dr_bytes(&dro_bytes, 1, total_value).unwrap_err(); - assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 224]); + let err = deserialize_and_validate_dr_bytes(&dro_bytes, total_value, 1).unwrap_err(); + assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 225]); } #[test] @@ -97,8 +97,8 @@ fn deserialize_dr_value_overflow() { }; let dro_bytes = dro.to_pb_bytes().unwrap(); - let err = deserialize_and_validate_dr_bytes(&dro_bytes, 0, 1).unwrap_err(); - assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 224]); + let err = deserialize_and_validate_dr_bytes(&dro_bytes, 1, 1).unwrap_err(); + assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 225]); } #[test] diff --git a/bridges/centralized-ethereum/src/actors/eth_poller.rs b/bridges/centralized-ethereum/src/actors/eth_poller.rs index ce0c72972..3a357e2a0 100644 --- a/bridges/centralized-ethereum/src/actors/eth_poller.rs +++ b/bridges/centralized-ethereum/src/actors/eth_poller.rs @@ -111,7 +111,7 @@ impl EthPoller { ); last_dr_id = skip_first; } - while last_dr_id < next_dr_id { + while last_dr_id + 1 < next_dr_id { let init_index = usize::try_from(last_dr_id + 1).unwrap(); let last_index = match next_dr_id.cmp(&(last_dr_id + max_batch_size)) { std::cmp::Ordering::Greater => { diff --git a/bridges/centralized-ethereum/src/actors/mod.rs b/bridges/centralized-ethereum/src/actors/mod.rs index 94a2b84d2..0f80bbc0c 100644 --- a/bridges/centralized-ethereum/src/actors/mod.rs +++ b/bridges/centralized-ethereum/src/actors/mod.rs @@ -12,3 +12,6 @@ pub mod eth_poller; /// wit_poller actor module pub mod wit_poller; + +/// watch_dog actor module +pub mod watch_dog; diff --git a/bridges/centralized-ethereum/src/actors/watch_dog.rs b/bridges/centralized-ethereum/src/actors/watch_dog.rs new file mode 100644 index 000000000..be680e027 --- /dev/null +++ b/bridges/centralized-ethereum/src/actors/watch_dog.rs @@ -0,0 +1,524 @@ +use crate::{ + actors::dr_database::{CountDrsPerState, DrDatabase}, + config::Config, +}; +use actix::prelude::*; +use chrono::{NaiveTime, Timelike, Utc}; +use core::fmt; +use std::{ + convert::TryFrom, + sync::Arc, + time::{Duration, Instant}, +}; +use web3::{ + contract::{self, Contract}, + transports::Http, + types::H160, +}; +use witnet_net::client::tcp::{jsonrpc, JsonRpcClient}; +use witnet_node::utils::stop_system_if_panicking; + +/// EthPoller actor reads periodically new requests from the WRB Contract and includes them +/// in the DrDatabase +#[derive(Default)] +pub struct WatchDog { + /// JSON WIT/RPC client connection to Wit/node + pub wit_client: Option>, + /// JSON WIT/RPC socket address + pub wit_jsonrpc_socket: String, + /// Bridge UTXO min value threshold + pub wit_utxo_min_value_threshold: u64, + /// Web3 object + pub eth_jsonrpc_url: String, + /// Web3 signer address + pub eth_account: H160, + /// WitOracle bridge contract + pub eth_contract: Option>>, + /// Polling period for global status + pub polling_rate_minutes: u32, + /// Instant at which the actor is created + pub start_ts: Option, + /// Eth balance upon first metric report: + pub start_eth_balance: Option, + /// Wit balance upon last refund + pub start_wit_balance: Option, + /// Past data request cumulative counters: + pub drs_history: Option<(u32, u32, u32)>, +} + +impl Drop for WatchDog { + fn drop(&mut self) { + log::trace!("Dropping WatchDog"); + stop_system_if_panicking("WatchDog"); + } +} + +/// Make actor from EthPoller +impl Actor for WatchDog { + /// Every actor has to provide execution Context in which it can run. + type Context = Context; + + /// Method to be executed when the actor is started + fn started(&mut self, ctx: &mut Self::Context) { + log::debug!("WatchDog actor has been started!"); + + self.watch_global_status(None, None, None, ctx); + } +} + +#[derive(Debug, PartialEq)] +enum WatchDogStatus { + EvmBalanceLeak, + EvmDisconnect, + EvmErrors, + EvmSyncing, + WitAlmostSynced, + WitBalanceLow, + WitErrors, + WitDisconnect, + WitSyncing, + WitUtxosLow, + WitWaitingConsensus, + UpAndRestarted, + UpAndRunning, +} + +impl fmt::Display for WatchDogStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WatchDogStatus::EvmBalanceLeak => write!(f, "evm-balance-leak"), + WatchDogStatus::EvmDisconnect => write!(f, "evm-disconnect"), + WatchDogStatus::EvmErrors => write!(f, "evm-errors"), + WatchDogStatus::EvmSyncing => write!(f, "evm-syncing"), + WatchDogStatus::WitAlmostSynced => write!(f, "wit-almost-synced"), + WatchDogStatus::WitBalanceLow => write!(f, "wit-balance-low"), + WatchDogStatus::WitDisconnect => write!(f, "wit-disconnect"), + WatchDogStatus::WitErrors => write!(f, "wit-errors"), + WatchDogStatus::WitSyncing => write!(f, "wit-syncing"), + WatchDogStatus::WitUtxosLow => write!(f, "wit-utxos-low"), + WatchDogStatus::WitWaitingConsensus => write!(f, "wit-waiting-consensus"), + WatchDogStatus::UpAndRestarted => write!(f, "up-and-restarted"), + WatchDogStatus::UpAndRunning => write!(f, "up-and-running"), + } + } +} + +/// Required trait for being able to retrieve WatchDog address from system registry +impl actix::Supervised for WatchDog {} +impl SystemService for WatchDog {} + +impl WatchDog { + /// Initialize from config + pub fn from_config(config: &Config, eth_contract: Arc>) -> Self { + Self { + wit_client: JsonRpcClient::start(config.witnet_jsonrpc_socket.to_string().as_str()) + .ok(), + wit_jsonrpc_socket: config.witnet_jsonrpc_socket.to_string(), + wit_utxo_min_value_threshold: config.witnet_utxo_min_value_threshold, + eth_account: config.eth_from, + eth_contract: Some(eth_contract), + eth_jsonrpc_url: config.eth_jsonrpc_url.clone(), + polling_rate_minutes: config.watch_dog_polling_rate_minutes, + start_ts: Some(Instant::now()), + start_eth_balance: None, + start_wit_balance: None, + drs_history: None, + } + } + + fn watch_global_status( + &mut self, + eth_balance: Option, + wit_balance: Option, + drs_history: Option<(u32, u32, u32)>, + ctx: &mut Context, + ) { + if self.start_eth_balance.is_none() && eth_balance.is_some() { + self.start_eth_balance = eth_balance; + } + if let Some(wit_balance) = wit_balance { + if wit_balance > self.start_wit_balance.unwrap_or_default() { + self.start_wit_balance = Some(wit_balance); + log::warn!("Wit account refunded to {} $WIT", wit_balance); + } + } + if self.drs_history.is_none() && drs_history.is_some() { + self.drs_history = drs_history; + } + let start_eth_balance = self.start_eth_balance; + let start_wit_balance = self.start_wit_balance; + let wit_client = self.wit_client.clone(); + let wit_jsonrpc_socket = self.wit_jsonrpc_socket.clone(); + let mut wit_next_balance = wit_balance; + let wit_utxo_min_value_threshold = self.wit_utxo_min_value_threshold; + let eth_jsonrpc_url = self.eth_jsonrpc_url.clone(); + let eth_account = self.eth_account; + let eth_contract = self.eth_contract.clone().unwrap(); + let eth_contract_address = eth_contract.address(); + let running_secs = + u32::try_from(self.start_ts.unwrap().elapsed().as_secs()).unwrap_or_default(); + let mut drs_history = drs_history.unwrap_or_default(); + + let fut = async move { + let mut status = WatchDogStatus::UpAndRunning; + + let dr_database = DrDatabase::from_registry(); + let (drs_new, drs_pending, drs_finished, drs_dismissed) = + dr_database.send(CountDrsPerState).await.unwrap().unwrap(); + let total_queries = drs_new + drs_pending + drs_finished + drs_dismissed; + + let mut metrics: String = "{".to_string(); + + metrics.push_str(&format!( + "\"drsCurrentlyPending\": {}, ", + drs_new + drs_pending + )); + + drs_history = if drs_history != (0u32, 0u32, 0u32) { + let daily_queries = (f64::from(total_queries - drs_history.2) + / f64::from(running_secs)) + * 86400_f64; + metrics.push_str(&format!("\"drsDailyQueries\": {:.1}, ", daily_queries)); + + let last_dismissed = drs_dismissed - drs_history.1; + metrics.push_str(&format!("\"drsLastDismissed\": {last_dismissed}, ")); + + let last_reported = drs_finished - drs_history.0; + metrics.push_str(&format!("\"drsLastReported\": {last_reported}, ")); + + // preserve the number of total queries counted upon bridge launch, + // so average queries per day can be averaged: + (drs_finished, drs_dismissed, drs_history.2) + } else { + status = WatchDogStatus::UpAndRestarted; + (drs_finished, drs_dismissed, total_queries) + }; + metrics.push_str(&format!("\"drsTotalQueries\": {total_queries}, ")); + + let eth_balance = match ( + eth_balance, + check_eth_account_balance(ð_jsonrpc_url, eth_account).await, + ) { + (Some(eth_balance), Ok(Some(new_balance))) => { + if status == WatchDogStatus::UpAndRunning && new_balance < eth_balance { + status = WatchDogStatus::EvmBalanceLeak + } + Some(new_balance) + } + (_, Ok(new_balance)) => new_balance, + (_, Err(err)) => { + if status == WatchDogStatus::UpAndRunning { + status = err; + } + None + } + }; + + let eth_contract_class: Option = match eth_contract + .query("class", (), None, contract::Options::default(), None) + .await + { + Ok(version) => Some(version), + Err(err) => { + log::error!( + "Fail to read class() from contract at {:?}: {:?}", + eth_contract_address, + err.to_string() + ); + if status == WatchDogStatus::UpAndRunning { + status = WatchDogStatus::EvmErrors; + } + None + } + }; + + let eth_contract_version: Option = match eth_contract + .query("version", (), None, contract::Options::default(), None) + .await + { + Ok(version) => Some(version), + Err(web3::contract::Error::InterfaceUnsupported) => None, + Err(err) => { + log::error!( + "Fail to read version() from contract at {:?}: {:?}", + eth_contract_address, + err.to_string() + ); + if status == WatchDogStatus::UpAndRunning { + status = WatchDogStatus::EvmErrors; + } + None + } + }; + + metrics.push_str(&format!("\"evmAccount\": \"{eth_account}\", ")); + if eth_balance.is_some() { + let eth_balance = eth_balance.unwrap(); + metrics.push_str(&format!("\"evmBalance\": {:.5}, ", eth_balance)); + metrics.push_str(&format!("\"evmContract\": \"{eth_contract_address}\", ")); + if let Some(eth_contract_class) = eth_contract_class { + if let Some(eth_contract_version) = eth_contract_version { + metrics.push_str(&format!( + "\"evmContractVersion\": \"{}:{}\", ", + eth_contract_class, eth_contract_version + )); + } else { + metrics.push_str(&format!( + "\"evmContractVersion\": {:?}, ", + eth_contract_class + )); + } + } + if let Some(start_eth_balance) = start_eth_balance { + let eth_hourly_earnings = + ((eth_balance - start_eth_balance) / f64::from(running_secs)) * 3600_f64; + metrics.push_str(&format!( + "\"evmHourlyEarnings\": {:.5}, ", + eth_hourly_earnings + )); + } + } + + if let Some(wit_client) = wit_client { + if let Err(err) = check_wit_connection_status(&wit_client).await { + status = err; + } + + let (wit_account, wit_balance, wit_utxos_above_threshold) = + match fetch_wit_info(&wit_client, wit_utxo_min_value_threshold).await { + Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => { + (wit_account, wit_balance, wit_utxos_above_threshold) + } + Err(err) => { + if status == WatchDogStatus::UpAndRunning { + status = err; + } + (None, None, None) + } + }; + + if wit_account.is_some() { + metrics.push_str(&format!("\"witAccount\": {:?}, ", wit_account.unwrap())); + } + if wit_balance.is_some() { + let wit_balance = wit_balance.unwrap(); + metrics.push_str(&format!("\"witBalance\": {:.5}, ", wit_balance)); + if let Some(start_wit_balance) = start_wit_balance { + let wit_hourly_expenditure = ((start_wit_balance - wit_balance) + / f64::from(running_secs)) + * 3600_f64; + metrics.push_str(&format!( + "\"witHourlyExpenditure\": {:.1}, ", + wit_hourly_expenditure + )); + if wit_hourly_expenditure > 0.0 + && wit_balance / wit_hourly_expenditure < 72.0 + && status == WatchDogStatus::UpAndRunning + { + status = WatchDogStatus::WitBalanceLow; + } + } + } + metrics.push_str(&format!("\"witNodeSocket\": \"{wit_jsonrpc_socket}\", ")); + if let Some(wit_utxos_above_threshold) = wit_utxos_above_threshold { + metrics.push_str(&format!( + "\"witUtxosAboveThreshold\": {}, ", + wit_utxos_above_threshold + )); + if wit_utxos_above_threshold < 10 && status == WatchDogStatus::UpAndRunning { + status = WatchDogStatus::WitUtxosLow; + } + } + + wit_next_balance = wit_balance; + } + + metrics.push_str(&format!("\"runningSecs\": {running_secs}, ")); + metrics.push_str(&format!("\"status\": \"{}\"", status)); + metrics.push('}'); + + log::info!("{metrics}"); + + (eth_balance, wit_next_balance, Some(drs_history)) + }; + + ctx.spawn(fut.into_actor(self).then( + move |(eth_balance, wit_balance, drs_history), act, ctx| { + let time_now = Utc::now().time(); + let period_minutes = act.polling_rate_minutes; + let time_next_minute = + period_minutes * (time_now.minute().div_euclid(period_minutes) + 1); + let time_next = if time_next_minute >= 60 { + NaiveTime::from_hms_opt(time_now.hour() + 1, time_next_minute - 60, 0) + } else { + NaiveTime::from_hms_opt(time_now.hour(), time_next_minute, 0) + }; + let dur = if let Some(time_next) = time_next { + let num_nanosecs = (time_next - time_now).num_nanoseconds(); + if let Some(num_nanosecs) = num_nanosecs { + Duration::from_nanos(num_nanosecs.unsigned_abs()) + } else { + Duration::from_secs(u64::from(period_minutes * 60)) + } + } else { + Duration::from_secs(u64::from(period_minutes * 60)) + }; + // Schedule next iteration only when finished, + // as to avoid multiple tasks running in parallel + ctx.run_later(dur, move |act, ctx| { + act.watch_global_status(eth_balance, wit_balance, drs_history, ctx); + }); + actix::fut::ready(()) + }, + )); + } +} + +async fn check_eth_account_balance( + eth_jsonrpc_url: &str, + eth_account: H160, +) -> Result, WatchDogStatus> { + let web3_http = web3::transports::Http::new(eth_jsonrpc_url) + .map_err(|_e| WatchDogStatus::EvmDisconnect) + .unwrap(); + + let web3 = web3::Web3::new(web3_http); + match web3.eth().syncing().await { + Ok(syncing) => match syncing { + web3::types::SyncState::NotSyncing => { + match web3.eth().balance(eth_account, None).await { + Ok(eth_balance) => { + let eth_balance: f64 = eth_balance.to_string().parse().unwrap_or_default(); + Ok(Some(eth_balance / 1000000000000000000.0)) + } + _ => Ok(None), + } + } + web3::types::SyncState::Syncing(_) => Err(WatchDogStatus::EvmSyncing), + }, + Err(e) => { + log::debug!("check_eth_account_balance => {}", e); + + Err(WatchDogStatus::EvmErrors) + } + } +} + +async fn check_wit_connection_status( + wit_client: &Addr, +) -> Result<(), WatchDogStatus> { + let req = jsonrpc::Request::method("syncStatus").timeout(Duration::from_secs(5)); + let res = wit_client.send(req).await; + match res { + Ok(Ok(result)) => { + if let Some(node_state) = result["node_state"].as_str() { + match node_state { + "Synced" => Ok(()), + "AlmostSynced" => Err(WatchDogStatus::WitAlmostSynced), + "WaitingConsensus" => Err(WatchDogStatus::WitWaitingConsensus), + _ => Err(WatchDogStatus::WitSyncing), + } + } else { + log::debug!("check_wit_connection_status => unknown node_state"); + Err(WatchDogStatus::WitErrors) + } + } + Ok(Err(err)) => { + log::debug!("check_wit_connection_status => {}", err); + Err(WatchDogStatus::WitDisconnect) + } + Err(err) => { + log::debug!("check_wit_connection_status => {}", err); + Err(WatchDogStatus::WitDisconnect) + } + } +} + +async fn fetch_wit_info( + wit_client: &Addr, + wit_utxos_min_threshold: u64, +) -> Result<(Option, Option, Option), WatchDogStatus> { + let req = jsonrpc::Request::method("getPkh").timeout(Duration::from_secs(5)); + let res = wit_client.send(req).await; + let wit_account = match res { + Ok(Ok(res)) => match serde_json::from_value::(res) { + Ok(pkh) => Some(pkh), + Err(_) => None, + }, + Ok(Err(_)) => None, + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + }; + + let wit_account_balance = match wit_account.clone() { + Some(wit_account) => { + let req = jsonrpc::Request::method("getBalance") + .timeout(Duration::from_secs(5)) + .params(wit_account) + .expect("getBalance wrong params"); + let res = wit_client.send(req).await; + let res = match res { + Ok(res) => res, + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + }; + match res { + Ok(value) => match value.get("total") { + Some(value) => value.as_f64().map(|value| value / 1000000000.0), + None => None, + }, + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + } + } + None => None, + }; + + let wit_utxos_above_threshold = match wit_account.clone() { + Some(wit_account) => { + let req = jsonrpc::Request::method("getUtxoInfo") + .timeout(Duration::from_secs(5)) + .params(wit_account) + .expect("getUtxoInfo wrong params"); + let res = wit_client.send(req).await; + let res = match res { + Ok(res) => res, + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + }; + match res { + Ok(utxo_info) => { + if let Some(utxos) = utxo_info["utxos"].as_array() { + let mut counter: u64 = u64::default(); + for utxo in utxos { + if let Some(value) = utxo["value"].as_u64() { + if value >= wit_utxos_min_threshold { + counter += 1; + } + } + } + + Some(counter) + } else { + None + } + } + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + } + } + None => None, + }; + + Ok((wit_account, wit_account_balance, wit_utxos_above_threshold)) +} diff --git a/bridges/centralized-ethereum/src/config.rs b/bridges/centralized-ethereum/src/config.rs index 20acc11f7..2d1bb8775 100644 --- a/bridges/centralized-ethereum/src/config.rs +++ b/bridges/centralized-ethereum/src/config.rs @@ -37,6 +37,12 @@ pub struct Config { /// Address of the WitnetRequestsBoard contract pub eth_witnet_oracle: H160, + /// Let the dog out? + pub watch_dog_enabled: bool, + /// Watch dog polling rate + #[serde(default = "default_watch_dog_polling_rate_minutes")] + pub watch_dog_polling_rate_minutes: u32, + /// Minimum collateral required on data requests read from the WitnetOracle contract pub witnet_dr_min_collateral_nanowits: u64, /// Maximium data request transaction fee assumed by the bridge @@ -53,6 +59,8 @@ pub struct Config { pub witnet_jsonrpc_socket: SocketAddr, /// Running in the witnet testnet? pub witnet_testnet: bool, + /// Bridge UTXO min value threshold + pub witnet_utxo_min_value_threshold: u64, /// Storage #[serde(deserialize_with = "nested_toml_if_using_envy")] @@ -69,6 +77,10 @@ fn default_max_batch_size() -> u16 { 256 } +fn default_watch_dog_polling_rate_minutes() -> u32 { + 15 +} + /// Gas limits for some methods. If missing, let the client estimate #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] diff --git a/bridges/centralized-ethereum/src/main.rs b/bridges/centralized-ethereum/src/main.rs index aab040692..c0f687bb7 100644 --- a/bridges/centralized-ethereum/src/main.rs +++ b/bridges/centralized-ethereum/src/main.rs @@ -7,7 +7,7 @@ use structopt::StructOpt; use witnet_centralized_ethereum_bridge::{ actors::{ dr_database::DrDatabase, dr_reporter::DrReporter, dr_sender::DrSender, - eth_poller::EthPoller, wit_poller::WitPoller, + eth_poller::EthPoller, watch_dog::WatchDog, wit_poller::WitPoller, }, check_ethereum_node_running, check_witnet_node_running, config, create_wrb_contract, }; @@ -83,6 +83,7 @@ fn run(callback: fn()) -> Result<(), String> { check_ethereum_node_running(&config.eth_jsonrpc_url) .await .expect("ethereum node not running"); + check_witnet_node_running(&config.witnet_jsonrpc_socket.to_string()) .await .expect("witnet node not running"); @@ -94,6 +95,7 @@ fn run(callback: fn()) -> Result<(), String> { // Web3 contract using HTTP transport with an Ethereum client let (web3, wrb_contract) = create_wrb_contract(&config.eth_jsonrpc_url, config.eth_witnet_oracle); + let wrb_contract = Arc::new(wrb_contract); // Start EthPoller actor @@ -102,19 +104,20 @@ fn run(callback: fn()) -> Result<(), String> { SystemRegistry::set(eth_poller_addr); // Start DrReporter actor - let dr_reporter_addr = DrReporter::from_config(&config, web3, wrb_contract).start(); + let dr_reporter_addr = + DrReporter::from_config(&config, web3.clone(), wrb_contract.clone()).start(); SystemRegistry::set(dr_reporter_addr); // Start Json-RPC actor connected to Witnet node let node_client = JsonRpcClient::start(&config.witnet_jsonrpc_socket.to_string()) - .expect("Json-RPC Client actor failed to started"); + .expect("JSON WIT/RPC node client failed to start"); // Start WitPoller actor let wit_poller_addr = WitPoller::from_config(&config, node_client.clone()).start(); SystemRegistry::set(wit_poller_addr); // Start DrSender actor - let dr_sender_addr = DrSender::from_config(&config, node_client).start(); + let dr_sender_addr = DrSender::from_config(&config, node_client.clone()).start(); SystemRegistry::set(dr_sender_addr); // Initialize Storage Manager @@ -124,6 +127,12 @@ fn run(callback: fn()) -> Result<(), String> { .db_path .clone_from(&config.storage.db_path); storage_mngr::start_from_config(node_config); + + // Start WatchDog actor + if config.watch_dog_enabled { + let watch_dog_addr = WatchDog::from_config(&config, wrb_contract.clone()).start(); + SystemRegistry::set(watch_dog_addr); + } }); // Run system diff --git a/bridges/wrb_abi.json b/bridges/wrb_abi.json index 6301e2155..543139cf4 100644 --- a/bridges/wrb_abi.json +++ b/bridges/wrb_abi.json @@ -308,5 +308,18 @@ ], "stateMutability": "view", "type": "function" + }, + { + "inputs": [], + "name": "version", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "stateMutability": "view", + "type": "function" } ] \ No newline at end of file diff --git a/data_structures/src/staking/helpers.rs b/data_structures/src/staking/helpers.rs index 855456d81..d00e26617 100644 --- a/data_structures/src/staking/helpers.rs +++ b/data_structures/src/staking/helpers.rs @@ -25,7 +25,6 @@ pub type Power = u64; /// The resulting type for all the fallible functions in this module. pub type StakesResult = Result>; - /// Pairs a stake key and the stake data it refers. #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] pub struct StakeEntry diff --git a/data_structures/src/staking/mod.rs b/data_structures/src/staking/mod.rs index 5ed43f0ce..8da41b2f0 100644 --- a/data_structures/src/staking/mod.rs +++ b/data_structures/src/staking/mod.rs @@ -12,12 +12,11 @@ pub mod stakes; /// Module re-exporting virtually every submodule on a single level to ease importing of everything /// staking-related. pub mod prelude { - pub use crate::capabilities::*; - pub use super::errors::*; pub use super::helpers::*; pub use super::stake::*; pub use super::stakes::*; + pub use crate::capabilities::*; } #[cfg(test)] diff --git a/witnet_centralized_ethereum_bridge.toml b/witnet_centralized_ethereum_bridge.toml index 3a47efa82..bce7b8976 100644 --- a/witnet_centralized_ethereum_bridge.toml +++ b/witnet_centralized_ethereum_bridge.toml @@ -25,18 +25,23 @@ eth_txs_timeout_ms = 900000 # Address of the WitnetRequestsBoard deployed contract eth_witnet_oracle = "0x77703aE126B971c9946d562F41Dd47071dA00777" +# Let the dog out? +watch_dog_enabled = true + +# Polling period for checking and tracing global status +watch_dog_polling_rate_minutes = 1 # Minimum collateral required on data requests read from the WitnetOracle contract -witnet_dr_min_collateral_nanowits = 20000000000 +witnet_dr_min_collateral_nanowits = 20_000_000_000 # Maximium data request transaction fee assumed by the bridge -witnet_dr_max_fee_nanowits = 100000 +witnet_dr_max_fee_nanowits = 100_000 # Maximum data request result size (in bytes) will accept to report witnet_dr_max_result_size = 64 # Maximum data request value that the bridge will accept to relay -witnet_dr_max_value_nanowits = 100000000000 +witnet_dr_max_value_nanowits = 100_000_000_000 # Polling period for checking resolution of data requests in the Witnet blockchain witnet_dr_txs_polling_rate_ms = 45000 @@ -50,6 +55,8 @@ witnet_jsonrpc_socket = "127.0.0.1:21338" # Running in the witnet testnet? witnet_testnet = false +# Bridge UTXO min value threshold +witnet_utxo_min_value_threshold = 2_000_000_000 [eth_gas_limits] # Gas limits for some methods. @@ -59,3 +66,4 @@ witnet_testnet = false [storage] # Path of the folder where RocksDB storage files will be written to. db_path = ".witnet_bridge/storage" +