diff --git a/bridges/centralized-ethereum/src/actors/mod.rs b/bridges/centralized-ethereum/src/actors/mod.rs index 94a2b84d2..0f80bbc0c 100644 --- a/bridges/centralized-ethereum/src/actors/mod.rs +++ b/bridges/centralized-ethereum/src/actors/mod.rs @@ -12,3 +12,6 @@ pub mod eth_poller; /// wit_poller actor module pub mod wit_poller; + +/// watch_dog actor module +pub mod watch_dog; diff --git a/bridges/centralized-ethereum/src/actors/watch_dog.rs b/bridges/centralized-ethereum/src/actors/watch_dog.rs new file mode 100644 index 000000000..1c6633148 --- /dev/null +++ b/bridges/centralized-ethereum/src/actors/watch_dog.rs @@ -0,0 +1,364 @@ +use crate::{ + actors::dr_database::{CountDrsPerState, DrDatabase}, + config::Config, +}; +use actix::prelude::*; +use async_jsonrpc_client::{transports::tcp::TcpSocket, Transport}; +use futures_util::compat::Compat01As03; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use web3::{ + contract::Contract, + transports::Http, + types::{H160, U256}, +}; +use witnet_net::client::tcp::{jsonrpc, JsonRpcClient}; +use witnet_node::utils::stop_system_if_panicking; + +/// EthPoller actor reads periodically new requests from the WRB Contract and includes them +/// in the DrDatabase +#[derive(Default)] +pub struct WatchDog { + /// JSON RPC connection to Wit/node + pub wit_jsonrpc_socket: String, + /// Bridge UTXO min value threshold + pub wit_utxo_min_value_threshold: u64, + /// Web3 object + pub eth_jsonrpc_url: String, + /// Web3 signer address + pub eth_account: H160, + /// WitOracle bridge contract + pub eth_contract: Option>>, + /// Polling period for global status + pub polling_rate_ms: u64, + /// Instant at which the actor is created + pub start_ts: Option, + /// Eth balance upon first metric report: + pub start_eth_balance: Option, + /// Wit balance upon last refund + pub start_wit_balance: Option, +} + +#[derive(Serialize, Deserialize)] +struct WatchDogOutput { + pub running_secs: u64, +} + +impl Drop for WatchDog { + fn drop(&mut self) { + log::trace!("Dropping WatchDog"); + stop_system_if_panicking("WatchDog"); + } +} + +/// Make actor from EthPoller +impl Actor for WatchDog { + /// Every actor has to provide execution Context in which it can run. + type Context = Context; + + /// Method to be executed when the actor is started + fn started(&mut self, ctx: &mut Self::Context) { + log::debug!("WatchDog actor has been started!"); + + self.watch_global_status(None, None, ctx, Duration::from_millis(self.polling_rate_ms)); + } +} + +/// Required trait for being able to retrieve WatchDog address from system registry +impl actix::Supervised for WatchDog {} +impl SystemService for WatchDog {} + +impl WatchDog { + /// Initialize from config + pub fn from_config(config: &Config, eth_contract: Arc>) -> Self { + Self { + wit_jsonrpc_socket: config.witnet_jsonrpc_socket.to_string(), + wit_utxo_min_value_threshold: config.witnet_utxo_min_value_threshold, + eth_account: config.eth_from, + eth_contract: Some(eth_contract), + eth_jsonrpc_url: config.eth_jsonrpc_url.clone(), + polling_rate_ms: config.watch_dog_polling_rate_ms, + start_ts: Some(Instant::now()), + start_eth_balance: None, + start_wit_balance: None, + } + } + + fn watch_global_status( + &mut self, + eth_balance: Option, + wit_balance: Option, + ctx: &mut Context, + period: Duration, + ) { + if self.start_eth_balance.is_none() && eth_balance.is_some() { + self.start_eth_balance = eth_balance; + } + if let Some(wit_balance) = wit_balance { + if wit_balance > self.start_wit_balance.unwrap_or_default() { + self.start_wit_balance = Some(wit_balance); + log::warn!("Wit account refunded to {} $WIT", wit_balance); + } + } + let start_eth_balance = self.start_eth_balance; + let start_wit_balance = self.start_wit_balance; + let wit_jsonrpc_socket = self.wit_jsonrpc_socket.clone(); + let wit_utxo_min_value_threshold = self.wit_utxo_min_value_threshold; + let eth_jsonrpc_url = self.eth_jsonrpc_url.clone(); + let eth_account = self.eth_account; + let eth_contract_address = self.eth_contract.clone().unwrap().address(); + let running_secs = self.start_ts.unwrap().elapsed().as_secs(); + + let fut = async move { + let mut status = "up-and-running".to_string(); + + if let Err(err) = check_wit_connection_status(&wit_jsonrpc_socket).await { + status = err; + } + let wit_client = JsonRpcClient::start(&wit_jsonrpc_socket) + .expect("cannot start JSON/WIT connection"); + let wit_account = match fetch_wit_account(&wit_client).await { + Ok(pkh) => pkh, + Err(err) => { + if status.eq("up-and-running") { + status = err; + } + None + } + }; + + let wit_balance = match wit_account.clone() { + Some(pkh) => match fetch_wit_account_balance(&wit_client, pkh.as_str()).await { + Ok(wit_balance) => wit_balance, + Err(err) => { + if status.eq("up-and-running") { + status = err; + } + None + } + }, + None => None, + }; + + let wit_utxos_above_threshold = match wit_account.clone() { + Some(pkh) => { + match fetch_wit_account_count_utxos_above( + &wit_client, + pkh.as_str(), + wit_utxo_min_value_threshold, + ) + .await + { + Ok(wit_utxos_above_threshold) => wit_utxos_above_threshold, + Err(err) => { + if status.eq("up-and-running") { + status = err; + } + None + } + } + } + None => None, + }; + + let eth_balance = match check_eth_account_balance(ð_jsonrpc_url, eth_account).await { + Ok(Some(eth_balance)) => { + let eth_balance: f64 = eth_balance.to_string().parse().unwrap_or_default(); + //Some(Unit::Wei(ð_balance.to_string()).to_eth_str().unwrap_or_default()), + Some(eth_balance / 1000000000000000000.0) + } + Ok(None) => None, + Err(err) => { + if status.eq("up-and-running") { + status = err; + } + None + } + }; + + let dr_database = DrDatabase::from_registry(); + let (_, drs_pending, drs_finished, _) = + dr_database.send(CountDrsPerState).await.unwrap().unwrap(); + + let mut metrics: String = "{".to_string(); + metrics.push_str(&format!("\"drsFinished\": {drs_finished}, ")); + metrics.push_str(&format!("\"drsPending\": {drs_pending}, ")); + metrics.push_str(&format!("\"evmAccount\": \"{eth_account}\", ")); + if eth_balance.is_some() { + let eth_balance = eth_balance.unwrap(); + metrics.push_str(&format!("\"evmBalance\": {:.5}, ", eth_balance)); + metrics.push_str(&format!("\"evmContract\": \"{eth_contract_address}\", ")); + if let Some(start_eth_balance) = start_eth_balance { + let eth_hourly_earnings = + ((eth_balance - start_eth_balance) / running_secs as f64) * 3600_f64; + metrics.push_str(&format!( + "\"evmHourlyEarnings\": {:.5}, ", + eth_hourly_earnings + )); + } + } + if wit_account.is_some() { + metrics.push_str(&format!("\"witAccount\": {:?}, ", wit_account.unwrap())); + } + if wit_balance.is_some() { + let wit_balance = wit_balance.unwrap(); + metrics.push_str(&format!("\"witBalance\": {:.5}, ", wit_balance)); + if let Some(start_wit_balance) = start_wit_balance { + let wit_hourly_expenditure = + ((start_wit_balance - wit_balance) / running_secs as f64) * 3600_f64; + metrics.push_str(&format!( + "\"witHourlyExpenditure\": {:.1}, ", + wit_hourly_expenditure + )); + } + } + metrics.push_str(&format!("\"witNodeSocket\": \"{}\", ", wit_jsonrpc_socket)); + if wit_utxos_above_threshold.is_some() { + metrics.push_str(&format!( + "\"witUtxosAboveThreshold\": {}, ", + wit_utxos_above_threshold.unwrap() + )); + } + metrics.push_str(&format!("\"runningSecs\": {running_secs}, ")); + metrics.push_str(&format!("\"status\": \"{status}\"")); + metrics.push_str("}}"); + log::info!("{metrics}"); + + (eth_balance, wit_balance) + }; + + ctx.spawn( + fut.into_actor(self) + .then(move |(eth_balance, wit_balance), _act, ctx| { + // Schedule next iteration only when finished, + // as to avoid multiple tasks running in parallel + ctx.run_later(period, move |act, ctx| { + act.watch_global_status(eth_balance, wit_balance, ctx, period); + }); + actix::fut::ready(()) + }), + ); + } +} + +async fn check_eth_account_balance( + eth_jsonrpc_url: &str, + eth_account: H160, +) -> Result, String> { + let web3_http = web3::transports::Http::new(eth_jsonrpc_url) + .map_err(|_e| "evm-disconnect".to_string()) + .unwrap(); + + let web3 = web3::Web3::new(web3_http); + match web3.eth().syncing().await { + Ok(syncing) => match syncing { + web3::types::SyncState::NotSyncing => { + match web3.eth().balance(eth_account, None).await { + Ok(balance) => Ok(Some(balance)), + _ => Ok(None), + } + } + web3::types::SyncState::Syncing(_) => Err("evm-syncing".to_string()), + }, + Err(_e) => Err("evm-errors".to_string()), + } +} + +async fn check_wit_connection_status(wit_jsonrpc_socket: &str) -> Result<(), String> { + let (_handle, wit_client) = TcpSocket::new(wit_jsonrpc_socket).unwrap(); + let wit_client = Arc::new(wit_client); + let res = wit_client.execute("syncStatus", json!(null)); + let res = Compat01As03::new(res); + let res = tokio::time::timeout(Duration::from_secs(5), res).await; + + match res { + Ok(Ok(_)) => Ok(()), + Ok(Err(_)) => Err("wit-syncing".to_string()), + Err(_elapse) => Err("wit-disconnect".to_string()), + } +} + +async fn fetch_wit_account(wit_client: &Addr) -> Result, String> { + let req = jsonrpc::Request::method("getPkh").timeout(Duration::from_secs(5)); + let res = wit_client.send(req).await; + match res { + Ok(Ok(res)) => match serde_json::from_value::(res) { + Ok(pkh) => Ok(Some(pkh)), + Err(_) => Ok(None), + }, + Ok(Err(_)) => Ok(None), + Err(_) => Err("wit-errors-getPkh".to_string()), + } +} + +async fn fetch_wit_account_balance( + wit_client: &Addr, + wit_account: &str, +) -> Result, String> { + let req = jsonrpc::Request::method("getBalance") + .timeout(Duration::from_secs(5)) + .params(vec![wit_account, "true"]) + .expect("getBalance wrong params"); + + let res = wit_client.send(req).await; + let res = match res { + Ok(res) => res, + Err(_) => { + return Err("wit-errors-getBalance".to_string()); + } + }; + + match res { + Ok(value) => match value.get("total") { + Some(value) => match value.as_f64() { + Some(value) => Ok(Some(value / 1000000000.0)), + None => Ok(None), + }, + None => Ok(None), + }, + Err(_) => Err("wit-errors-getBalance".to_string()), + } +} + +async fn fetch_wit_account_count_utxos_above( + wit_client: &Addr, + wit_account: &str, + threshold: u64, +) -> Result, String> { + let req = jsonrpc::Request::method("getUtxoInfo") + .timeout(Duration::from_secs(5)) + .params(wit_account) + .expect("getUtxoInfo wrong params"); + + let res = wit_client.send(req).await; + let res = match res { + Ok(res) => res, + Err(_) => { + return Err("wit-errors-getUtxoInfo".to_string()); + } + }; + + match res { + Ok(utxo_info) => { + if let Some(utxos) = utxo_info["utxos"].as_array() { + let mut counter: u64 = u64::default(); + for utxo in utxos { + if let Some(value) = utxo["value"].as_u64() { + if value >= threshold { + counter += 1; + } + } + } + + Ok(Some(counter)) + } else { + Ok(None) + } + } + Err(_) => Err("wit-errors-getUtxoInfo".to_string()), + } +} diff --git a/bridges/centralized-ethereum/src/main.rs b/bridges/centralized-ethereum/src/main.rs index aab040692..c0f687bb7 100644 --- a/bridges/centralized-ethereum/src/main.rs +++ b/bridges/centralized-ethereum/src/main.rs @@ -7,7 +7,7 @@ use structopt::StructOpt; use witnet_centralized_ethereum_bridge::{ actors::{ dr_database::DrDatabase, dr_reporter::DrReporter, dr_sender::DrSender, - eth_poller::EthPoller, wit_poller::WitPoller, + eth_poller::EthPoller, watch_dog::WatchDog, wit_poller::WitPoller, }, check_ethereum_node_running, check_witnet_node_running, config, create_wrb_contract, }; @@ -83,6 +83,7 @@ fn run(callback: fn()) -> Result<(), String> { check_ethereum_node_running(&config.eth_jsonrpc_url) .await .expect("ethereum node not running"); + check_witnet_node_running(&config.witnet_jsonrpc_socket.to_string()) .await .expect("witnet node not running"); @@ -94,6 +95,7 @@ fn run(callback: fn()) -> Result<(), String> { // Web3 contract using HTTP transport with an Ethereum client let (web3, wrb_contract) = create_wrb_contract(&config.eth_jsonrpc_url, config.eth_witnet_oracle); + let wrb_contract = Arc::new(wrb_contract); // Start EthPoller actor @@ -102,19 +104,20 @@ fn run(callback: fn()) -> Result<(), String> { SystemRegistry::set(eth_poller_addr); // Start DrReporter actor - let dr_reporter_addr = DrReporter::from_config(&config, web3, wrb_contract).start(); + let dr_reporter_addr = + DrReporter::from_config(&config, web3.clone(), wrb_contract.clone()).start(); SystemRegistry::set(dr_reporter_addr); // Start Json-RPC actor connected to Witnet node let node_client = JsonRpcClient::start(&config.witnet_jsonrpc_socket.to_string()) - .expect("Json-RPC Client actor failed to started"); + .expect("JSON WIT/RPC node client failed to start"); // Start WitPoller actor let wit_poller_addr = WitPoller::from_config(&config, node_client.clone()).start(); SystemRegistry::set(wit_poller_addr); // Start DrSender actor - let dr_sender_addr = DrSender::from_config(&config, node_client).start(); + let dr_sender_addr = DrSender::from_config(&config, node_client.clone()).start(); SystemRegistry::set(dr_sender_addr); // Initialize Storage Manager @@ -124,6 +127,12 @@ fn run(callback: fn()) -> Result<(), String> { .db_path .clone_from(&config.storage.db_path); storage_mngr::start_from_config(node_config); + + // Start WatchDog actor + if config.watch_dog_enabled { + let watch_dog_addr = WatchDog::from_config(&config, wrb_contract.clone()).start(); + SystemRegistry::set(watch_dog_addr); + } }); // Run system