diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs new file mode 100644 index 00000000..1d4149fe --- /dev/null +++ b/common/src/escrow_accounts.rs @@ -0,0 +1,165 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::HashMap, time::Duration}; + +use alloy_primitives::Address; +use anyhow::Result; +use ethers_core::types::U256; +use eventuals::{timer, Eventual, EventualExt}; +use log::warn; +use serde::Deserialize; +use serde_json::json; +use tokio::time::sleep; + +use crate::prelude::SubgraphClient; + +pub fn escrow_accounts( + escrow_subgraph: &'static SubgraphClient, + indexer_address: Address, + interval: Duration, +) -> Eventual> { + // Types for deserializing the network subgraph response + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct EscrowAccountsResponse { + escrow_accounts: Vec, + } + // These 2 structs are used to deserialize the response from the escrow subgraph. + // Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal + // representation. This is why we deserialize them as strings below. + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct EscrowAccount { + balance: String, + total_amount_thawing: String, + sender: Sender, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct Sender { + id: Address, + } + + timer(interval).map_with_retry( + move |_| async move { + let response = escrow_subgraph + .query::(&json!({ + "query": r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + } + } + } + "#, + "variables": { + "indexer": indexer_address, + } + } + )) + .await + .map_err(|e| e.to_string())?; + + // If there are any GraphQL errors returned, we'll log them for debugging + if let Some(errors) = response.errors { + warn!( + "Errors encountered fetching escrow accounts for indexer {:?}: {}", + indexer_address, + errors + .into_iter() + .map(|e| e.message) + .collect::>() + .join(", ") + ); + } + + let sender_accounts = response + .data + .map_or(vec![], |data| data.escrow_accounts) + .iter() + .map(|account| { + let balance = U256::checked_sub( + U256::from_dec_str(&account.balance)?, + U256::from_dec_str(&account.total_amount_thawing)?, + ) + .unwrap_or_else(|| { + warn!( + "Balance minus total amount thawing underflowed for account {}. \ + Setting balance to 0, no queries will be served for this sender.", + account.sender.id + ); + U256::from(0) + }); + + Ok((account.sender.id, balance)) + }) + .collect::, anyhow::Error>>() + .map_err(|e| format!("{}", e))?; + + Ok(sender_accounts) + }, + move |err: String| { + warn!( + "Failed to fetch escrow accounts for indexer {:?}: {}", + indexer_address, err + ); + + sleep(interval.div_f32(2.0)) + }, + ) +} + +#[cfg(test)] +mod tests { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + use crate::test_vectors; + + use super::*; + + #[tokio::test] + async fn test_current_accounts() { + // Set up a mock escrow subgraph + let mock_server = MockServer::start().await; + let escrow_subgraph_endpoint = SubgraphClient::local_deployment_endpoint( + &mock_server.uri(), + &test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT, + ) + .unwrap(); + let escrow_subgraph = Box::leak(Box::new( + SubgraphClient::new( + Some(&mock_server.uri()), + Some(&test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT), + escrow_subgraph_endpoint.as_ref(), + ) + .unwrap(), + )); + + let mock = Mock::given(method("POST")) + .and(path(format!( + "/subgraphs/id/{}", + *test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT + ))) + .respond_with( + ResponseTemplate::new(200) + .set_body_raw(test_vectors::ESCROW_QUERY_RESPONSE, "application/json"), + ); + mock_server.register(mock).await; + + let accounts = escrow_accounts( + escrow_subgraph, + *test_vectors::INDEXER_ADDRESS, + Duration::from_secs(60), + ); + + assert_eq!( + accounts.value().await.unwrap(), + *test_vectors::ESCROW_ACCOUNTS + ); + } +} diff --git a/common/src/escrow_monitor.rs b/common/src/escrow_monitor.rs deleted file mode 100644 index df7054de..00000000 --- a/common/src/escrow_monitor.rs +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::collections::HashMap; -use std::sync::Arc; - -use alloy_primitives::Address; -use anyhow::Result; -use ethers_core::types::U256; -use log::{error, info, warn}; -use serde::Deserialize; -use serde_json::json; -use tokio::sync::RwLock; - -use crate::prelude::SubgraphClient; - -#[derive(Debug)] -struct EscrowMonitorInner { - escrow_subgraph: &'static SubgraphClient, - indexer_address: Address, - interval_ms: u64, - sender_accounts: Arc>>, -} - -#[cfg_attr(test, faux::create)] -#[derive(Debug, Clone)] -pub struct EscrowMonitor { - _monitor_handle: Arc>, - inner: Arc, -} - -#[cfg_attr(test, faux::methods)] -impl EscrowMonitor { - pub async fn new( - escrow_subgraph: &'static SubgraphClient, - indexer_address: Address, - interval_ms: u64, - ) -> Result { - let sender_accounts = Arc::new(RwLock::new(HashMap::new())); - - let inner = Arc::new(EscrowMonitorInner { - escrow_subgraph, - indexer_address, - interval_ms, - sender_accounts, - }); - - let inner_clone = inner.clone(); - - let monitor = EscrowMonitor { - _monitor_handle: Arc::new(tokio::spawn(async move { - EscrowMonitor::monitor_loop(&inner_clone).await.unwrap(); - })), - inner, - }; - - Ok(monitor) - } - - async fn current_accounts( - escrow_subgraph: &'static SubgraphClient, - indexer_address: &Address, - ) -> Result> { - // Types for deserializing the network subgraph response - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct EscrowAccountsResponse { - escrow_accounts: Vec, - } - // These 2 structs are used to deserialize the response from the escrow subgraph. - // Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal - // representation. This is why we deserialize them as strings below. - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct EscrowAccount { - balance: String, - total_amount_thawing: String, - sender: Sender, - } - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct Sender { - id: Address, - } - - let response = escrow_subgraph - .query::(&json!({ - "query": r#" - query ($indexer: ID!) { - escrowAccounts(where: {receiver_: {id: $indexer}}) { - balance - totalAmountThawing - sender { - id - } - } - } - "#, - "variables": { - "indexer": indexer_address, - } - } - )) - .await?; - - // If there are any GraphQL errors returned, we'll log them for debugging - if let Some(errors) = response.errors { - warn!( - "Errors encountered fetching escrow accounts for indexer {:?}: {}", - indexer_address, - errors - .into_iter() - .map(|e| e.message) - .collect::>() - .join(", ") - ); - } - - let sender_accounts = response - .data - .map_or(vec![], |data| data.escrow_accounts) - .iter() - .map(|account| { - let balance = U256::checked_sub( - U256::from_str_radix(&account.balance, 16)?, - U256::from_str_radix(&account.total_amount_thawing, 16)?, - ) - .unwrap_or_else(|| { - warn!( - "Balance minus total amount thawing underflowed for account {}. \ - Setting balance to 0, no queries will be served for this sender.", - account.sender.id - ); - U256::from(0) - }); - - Ok((account.sender.id, balance)) - }) - .collect::, anyhow::Error>>()?; - - Ok(sender_accounts) - } - - async fn update_accounts(inner: &Arc) -> Result<(), anyhow::Error> { - *(inner.sender_accounts.write().await) = - Self::current_accounts(inner.escrow_subgraph, &inner.indexer_address).await?; - Ok(()) - } - - async fn monitor_loop(inner: &Arc) -> Result<()> { - loop { - match Self::update_accounts(inner).await { - Ok(_) => { - info!("Updated escrow accounts"); - } - Err(e) => { - error!("Error updating escrow accounts: {}", e); - } - } - - tokio::time::sleep(tokio::time::Duration::from_millis(inner.interval_ms)).await; - } - } - - pub async fn get_accounts(&self) -> tokio::sync::RwLockReadGuard<'_, HashMap> { - self.inner.sender_accounts.read().await - } - - /// Returns true if the given address has a non-zero balance in the escrow contract. - /// - /// Note that this method does not take into account the outstanding TAP balance (Escrow balance - TAP receipts). - pub async fn is_sender_eligible(&self, address: &Address) -> bool { - self.inner.sender_accounts.read().await.get(address) > Some(&U256::from(0)) - } -} - -#[cfg(test)] -mod tests { - use wiremock::matchers::{method, path}; - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::test_vectors; - use crate::test_vectors::{ESCROW_SUBGRAPH_DEPLOYMENT, INDEXER_ADDRESS}; - - use super::*; - - #[tokio::test] - async fn test_current_accounts() { - // Set up a mock escrow subgraph - let mock_server = MockServer::start().await; - let escrow_subgraph_endpoint = SubgraphClient::local_deployment_endpoint( - &mock_server.uri(), - &test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT, - ) - .unwrap(); - let escrow_subgraph = Box::leak(Box::new( - SubgraphClient::new( - Some(&mock_server.uri()), - Some(&test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT), - escrow_subgraph_endpoint.as_ref(), - ) - .unwrap(), - )); - - let mock = Mock::given(method("POST")) - .and(path(format!( - "/subgraphs/id/{}", - *ESCROW_SUBGRAPH_DEPLOYMENT - ))) - .respond_with( - ResponseTemplate::new(200) - .set_body_raw(test_vectors::ESCROW_QUERY_RESPONSE, "application/json"), - ); - mock_server.register(mock).await; - - let accounts = EscrowMonitor::current_accounts(escrow_subgraph, &INDEXER_ADDRESS) - .await - .unwrap(); - - assert_eq!(accounts, *test_vectors::ESCROW_ACCOUNTS); - } -} diff --git a/common/src/lib.rs b/common/src/lib.rs index f15d7c8c..37191e20 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -3,7 +3,7 @@ pub mod allocations; pub mod attestations; -pub mod escrow_monitor; +pub mod escrow_accounts; pub mod graphql; pub mod signature_verification; pub mod subgraph_client; @@ -18,6 +18,6 @@ pub mod prelude { pub use super::attestations::{ dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers, }; - pub use super::escrow_monitor::EscrowMonitor; + pub use super::escrow_accounts::escrow_accounts; pub use super::subgraph_client::SubgraphClient; } diff --git a/service/src/main.rs b/service/src/main.rs index 76ba3126..e27a9c35 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -9,7 +9,7 @@ use std::{net::SocketAddr, str::FromStr, time::Duration}; use tracing::info; use indexer_common::prelude::{ - attestation_signers, dispute_manager, indexer_allocations, EscrowMonitor, SubgraphClient, + attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, SubgraphClient, }; use util::{package_version, shutdown_signal}; @@ -113,18 +113,16 @@ async fn main() -> Result<(), std::io::Error> { .expect("Failed to set up escrow subgraph client"), )); - let escrow_monitor = EscrowMonitor::new( + let escrow_accounts = escrow_accounts( escrow_subgraph, config.ethereum.indexer_address, - config.escrow_subgraph.escrow_syncing_interval, - ) - .await - .expect("Initialize escrow monitor"); + Duration::from_secs(config.escrow_subgraph.escrow_syncing_interval), + ); let tap_manager = tap_manager::TapManager::new( indexer_management_db.clone(), indexer_allocations, - escrow_monitor, + escrow_accounts, // TODO: arguments for eip712_domain should be a config eip712_domain! { name: "TapManager", diff --git a/service/src/tap_manager.rs b/service/src/tap_manager.rs index c85da973..394f5673 100644 --- a/service/src/tap_manager.rs +++ b/service/src/tap_manager.rs @@ -3,8 +3,9 @@ use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; +use ethers_core::types::U256; use eventuals::Eventual; -use indexer_common::prelude::{Allocation, EscrowMonitor}; +use indexer_common::prelude::Allocation; use log::error; use sqlx::{types::BigDecimal, PgPool}; use std::{collections::HashMap, sync::Arc}; @@ -15,7 +16,7 @@ use crate::query_processor::QueryError; #[derive(Clone)] pub struct TapManager { indexer_allocations: Eventual>, - escrow_monitor: EscrowMonitor, + escrow_accounts: Eventual>, pgpool: PgPool, domain_separator: Arc, } @@ -24,12 +25,12 @@ impl TapManager { pub fn new( pgpool: PgPool, indexer_allocations: Eventual>, - escrow_monitor: EscrowMonitor, + escrow_accounts: Eventual>, domain_separator: Eip712Domain, ) -> Self { Self { indexer_allocations, - escrow_monitor, + escrow_accounts, pgpool, domain_separator: Arc::new(domain_separator), } @@ -63,9 +64,11 @@ impl TapManager { QueryError::Other(anyhow::Error::from(e)) })?; if !self - .escrow_monitor - .is_sender_eligible(&receipt_signer) + .escrow_accounts + .value() .await + .map(|accounts| accounts.contains_key(&receipt_signer)) + .unwrap_or(false) { return Err(QueryError::Other(anyhow::Error::msg(format!( "Receipt's sender ({}) is not eligible for this indexer", @@ -112,6 +115,8 @@ mod test { use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt}; use toolshed::thegraph::DeploymentId; + use crate::test_vectors; + use super::*; /// Fixture to generate a wallet and address @@ -189,7 +194,7 @@ mod test { previous_epoch_start_block_hash: None, created_at_block_hash: H256::zero().to_string(), created_at_epoch: 0, - indexer: Address::ZERO, + indexer: *test_vectors::INDEXER_ADDRESS, query_fee_rebates: None, query_fees_collected: None, }; @@ -197,16 +202,14 @@ mod test { vec![(allocation_id, allocation)].into_iter(), )); - // Mock escrow monitor - let mut mock_escrow_monitor = EscrowMonitor::faux(); - faux::when!(mock_escrow_monitor.is_sender_eligible).then_return(true); + // Mock escrow accounts + let escrow_accounts = Eventual::from_value(HashMap::from_iter(vec![( + *test_vectors::INDEXER_ADDRESS, + U256::from(123), + )])); - let tap_manager = TapManager::new( - pgpool.clone(), - indexer_allocations, - mock_escrow_monitor, - domain, - ); + let tap_manager = + TapManager::new(pgpool.clone(), indexer_allocations, escrow_accounts, domain); tap_manager .verify_and_store_receipt(signed_receipt.clone()) diff --git a/service/src/test_vectors.rs b/service/src/test_vectors.rs index fa40c68a..b21c78e7 100644 --- a/service/src/test_vectors.rs +++ b/service/src/test_vectors.rs @@ -1,4 +1,12 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -pub const INDEXER_ADDRESS: &str = "0x1234567890123456789012345678901234567890"; +use std::str::FromStr; + +use alloy_primitives::Address; +use lazy_static::lazy_static; + +lazy_static! { + pub static ref INDEXER_ADDRESS: Address = + Address::from_str("0x1234567890123456789012345678901234567890").unwrap(); +}