From a7da3f91d99486e32258048579cab3f854550165 Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Fri, 12 Jan 2024 15:49:10 -0800 Subject: [PATCH] fix: support sender signers Signed-off-by: Alexis Asseman --- common/src/escrow_accounts.rs | 179 ++++++++++++++---- .../indexer_service/http/indexer_service.rs | 1 + common/src/tap_manager.rs | 34 ++-- common/src/test_vectors.rs | 83 +++++++- migrations/20230912220523_tap_receipts.up.sql | 6 +- tap-agent/src/agent.rs | 1 + tap-agent/src/tap/escrow_adapter.rs | 99 +++++++--- tap-agent/src/tap/mod.rs | 23 +++ tap-agent/src/tap/rav_storage_adapter.rs | 6 +- tap-agent/src/tap/receipt_checks_adapter.rs | 10 +- tap-agent/src/tap/receipt_storage_adapter.rs | 110 ++++++++--- .../src/tap/sender_allocation_relationship.rs | 63 +++--- ...sender_allocation_relationships_manager.rs | 91 ++++++--- tap-agent/src/tap/test_utils.rs | 13 +- 14 files changed, 541 insertions(+), 178 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index a7bcb4b1c..d8067f793 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -13,18 +13,48 @@ use tracing::{error, warn}; use crate::prelude::{Query, SubgraphClient}; +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct EscrowAccounts { + pub balances: HashMap, + pub signers_to_senders: HashMap, + pub senders_to_signers: HashMap>, +} + +impl EscrowAccounts { + pub fn new( + balances: HashMap, + senders_to_signers: HashMap>, + ) -> Self { + let signers_to_senders = senders_to_signers + .iter() + .flat_map(|(sender, signers)| { + signers + .iter() + .map(move |signer| (*signer, *sender)) + .collect::>() + }) + .collect(); + + Self { + balances, + signers_to_senders, + senders_to_signers, + } + } +} + pub fn escrow_accounts( escrow_subgraph: &'static SubgraphClient, indexer_address: Address, interval: Duration, -) -> Eventual> { + reject_thawing_signers: bool, +) -> Eventual { // Types for deserializing the network subgraph response #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct EscrowAccountsResponse { escrow_accounts: Vec, } - // These 2 structs are used to deserialize the response from the escrow subgraph. // Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal // representation. This is why we deserialize them as strings below. #[derive(Deserialize)] @@ -38,50 +68,109 @@ pub fn escrow_accounts( #[serde(rename_all = "camelCase")] struct Sender { id: Address, + authorized_signers: Vec, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct AuthorizedSigner { + id: Address, } + // thawEndTimestamp == 0 means that the signer is not thawing. This also means + // that we don't wait for the thawing period to end before stopping serving + // queries for this signer. + // isAuthorized == true means that the signer is still authorized to sign + // payments in the name of the sender. + let query_no_thawing_signers = r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + authorizedSigners( + where: {thawEndTimestamp: "0", isAuthorized: true} + ) { + id + } + } + } + } + "#; + + let query_with_thawing_signers = r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + authorizedSigners( + where: {isAuthorized: true} + ) { + id + } + } + } + } + "#; + + let query = if reject_thawing_signers { + query_no_thawing_signers + } else { + query_with_thawing_signers + }; + timer(interval).map_with_retry( move |_| async move { let response = escrow_subgraph .query::(Query::new_with_variables( - r#" - query ($indexer: ID!) { - escrowAccounts(where: {receiver_: {id: $indexer}}) { - balance - totalAmountThawing - sender { - id - } - } - } - "#, + query, [("indexer", format!("{:x?}", indexer_address).into())], )) .await .map_err(|e| e.to_string())?; - response.map_err(|e| e.to_string()).and_then(|data| { - data.escrow_accounts - .iter() - .map(|account| { - let balance = U256::checked_sub( - U256::from_dec_str(&account.balance)?, - U256::from_dec_str(&account.total_amount_thawing)?, - ) - .unwrap_or_else(|| { - warn!( - "Balance minus total amount thawing underflowed for account {}. \ + let response = response.map_err(|e| e.to_string())?; + + let balances = response + .escrow_accounts + .iter() + .map(|account| { + let balance = U256::checked_sub( + U256::from_dec_str(&account.balance)?, + U256::from_dec_str(&account.total_amount_thawing)?, + ) + .unwrap_or_else(|| { + warn!( + "Balance minus total amount thawing underflowed for account {}. \ Setting balance to 0, no queries will be served for this sender.", - account.sender.id - ); - U256::from(0) - }); - - Ok((account.sender.id, balance)) - }) - .collect::, anyhow::Error>>() - .map_err(|e| format!("{}", e)) - }) + account.sender.id + ); + U256::from(0) + }); + + Ok((account.sender.id, balance)) + }) + .collect::, anyhow::Error>>() + .map_err(|e| format!("{}", e))?; + + let senders_to_signers = response + .escrow_accounts + .iter() + .map(|account| { + let sender = account.sender.id; + let signers = account + .sender + .authorized_signers + .iter() + .map(|signer| signer.id) + .collect(); + (sender, signers) + }) + .collect(); + + Ok(EscrowAccounts::new(balances, senders_to_signers)) }, move |err: String| { error!( @@ -96,6 +185,7 @@ pub fn escrow_accounts( #[cfg(test)] mod tests { + use test_log::test; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -104,7 +194,20 @@ mod tests { use super::*; - #[tokio::test] + #[test] + fn test_new_escrow_accounts() { + let escrow_accounts = EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + ); + + assert_eq!( + escrow_accounts.signers_to_senders, + test_vectors::ESCROW_ACCOUNTS_SIGNERS_TO_SENDERS.to_owned() + ) + } + + #[test(tokio::test)] async fn test_current_accounts() { // Set up a mock escrow subgraph let mock_server = MockServer::start().await; @@ -134,11 +237,15 @@ mod tests { escrow_subgraph, *test_vectors::INDEXER_ADDRESS, Duration::from_secs(60), + true, ); assert_eq!( accounts.value().await.unwrap(), - *test_vectors::ESCROW_ACCOUNTS + EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + ) ); } } diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 37b001834..671a74c99 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -262,6 +262,7 @@ impl IndexerService { escrow_subgraph, options.config.indexer.indexer_address, Duration::from_secs(options.config.escrow_subgraph.syncing_interval), + true, // Reject thawing signers eagerly ); // Establish Database connection necessary for serving indexer management diff --git a/common/src/tap_manager.rs b/common/src/tap_manager.rs index 42559b5a6..07cb60b8e 100644 --- a/common/src/tap_manager.rs +++ b/common/src/tap_manager.rs @@ -11,12 +11,12 @@ use std::{collections::HashMap, str::FromStr, sync::Arc}; use tap_core::tap_manager::SignedReceipt; use tracing::error; -use crate::prelude::Allocation; +use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation}; #[derive(Clone)] pub struct TapManager { indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, pgpool: PgPool, domain_separator: Arc, } @@ -25,7 +25,7 @@ impl TapManager { pub fn new( pgpool: PgPool, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, domain_separator: Eip712Domain, ) -> Self { Self { @@ -67,11 +67,21 @@ impl TapManager { anyhow!(e) })?; - if !self - .escrow_accounts - .value_immediate() - .unwrap_or_default() + let escrow_accounts = self.escrow_accounts.value_immediate().unwrap_or_default(); + + let receipt_sender = escrow_accounts + .signers_to_senders .get(&receipt_signer) + .ok_or_else(|| { + anyhow!( + "Receipt signer `{}` is not eligible for this indexer", + receipt_signer + ) + })?; + + if !escrow_accounts + .balances + .get(receipt_sender) .map_or(false, |balance| balance > &U256::zero()) { return Err(anyhow!( @@ -83,7 +93,7 @@ impl TapManager { // TODO: consider doing this in another async task to avoid slowing down the paid query flow. sqlx::query!( r#" - INSERT INTO scalar_tap_receipts (allocation_id, sender_address, timestamp_ns, value, receipt) + INSERT INTO scalar_tap_receipts (allocation_id, signer_address, timestamp_ns, value, receipt) VALUES ($1, $2, $3, $4, $5) "#, format!("{:?}", allocation_id) @@ -117,7 +127,7 @@ mod test { use keccak_hash::H256; use sqlx::postgres::PgListener; - use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER}; + use crate::test_vectors::{self, create_signed_receipt}; use super::*; @@ -159,8 +169,10 @@ mod test { )); // Mock escrow accounts - let escrow_accounts = - Eventual::from_value(HashMap::from_iter(vec![(TAP_SENDER.1, U256::from(123))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + )); let tap_manager = TapManager::new( pgpool.clone(), diff --git a/common/src/test_vectors.rs b/common/src/test_vectors.rs index c6b7eaaf3..9608349a7 100644 --- a/common/src/test_vectors.rs +++ b/common/src/test_vectors.rs @@ -102,14 +102,35 @@ pub const ESCROW_QUERY_RESPONSE: &str = r#" "balance": "34", "totalAmountThawing": "10", "sender": { - "id": "0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1" + "id": "0x9858EfFD232B4033E47d90003D41EC34EcaEda94", + "authorizedSigners": [ + { + "id": "0x533661F0fb14d2E8B26223C86a610Dd7D2260892" + }, + { + "id": "0x2740f6fA9188cF53ffB6729DDD21575721dE92ce" + } + ] } }, { "balance": "42", "totalAmountThawing": "0", "sender": { - "id": "0x22d491bde2303f2f43325b2108d26f1eaba1e32b" + "id": "0x22d491bde2303f2f43325b2108d26f1eaba1e32b", + "authorizedSigners": [ + { + "id": "0x245059163ff6ee14279aa7b35ea8f0fdb967df6e" + } + ] + } + }, + { + "balance": "2987", + "totalAmountThawing": "12", + "sender": { + "id": "0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002", + "authorizedSigners": [] } } ] @@ -240,12 +261,48 @@ lazy_static! { ), ]); - pub static ref ESCROW_ACCOUNTS: HashMap = HashMap::from([ - (Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1").unwrap(), U256::from(24)), + pub static ref ESCROW_ACCOUNTS_BALANCES: HashMap = HashMap::from([ + (Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), U256::from(24)), // TAP_SENDER (Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), U256::from(42)), + (Address::from_str("0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002").unwrap(), U256::from(2975)), + ]); + + /// Maps signers back to their senders + pub static ref ESCROW_ACCOUNTS_SIGNERS_TO_SENDERS: HashMap = HashMap::from([ + ( + Address::from_str("0x533661F0fb14d2E8B26223C86a610Dd7D2260892").unwrap(), // TAP_SIGNER + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + ), + ( + Address::from_str("0x2740f6fA9188cF53ffB6729DDD21575721dE92ce").unwrap(), + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + ), + ( + Address::from_str("0x245059163ff6ee14279aa7b35ea8f0fdb967df6e").unwrap(), + Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), + ), + ]); + + pub static ref ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS: HashMap> = HashMap::from([ + ( + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + vec![ + Address::from_str("0x533661F0fb14d2E8B26223C86a610Dd7D2260892").unwrap(), // TAP_SIGNER + Address::from_str("0x2740f6fA9188cF53ffB6729DDD21575721dE92ce").unwrap(), + ], + ), + ( + Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), + vec![Address::from_str("0x245059163ff6ee14279aa7b35ea8f0fdb967df6e").unwrap()], + ), + ( + Address::from_str("0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002").unwrap(), + vec![], + ), ]); - /// Fixture to generate a wallet and address + /// Fixture to generate a wallet and address. + /// Address: 0x9858EfFD232B4033E47d90003D41EC34EcaEda94 pub static ref TAP_SENDER: (LocalWallet, Address) = { let wallet: LocalWallet = MnemonicBuilder::::default() .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") @@ -256,6 +313,18 @@ lazy_static! { (wallet, Address::from_slice(address.as_bytes())) }; + /// Fixture to generate a wallet and address. + /// Address: 0x533661F0fb14d2E8B26223C86a610Dd7D2260892 + pub static ref TAP_SIGNER: (LocalWallet, Address) = { + let wallet: LocalWallet = MnemonicBuilder::::default() + .phrase("rude pipe parade travel organ vendor card festival magnet novel forget refuse keep draft tool") + .build() + .unwrap(); + let address = wallet.address(); + + (wallet, Address::from_slice(address.as_bytes())) + }; + pub static ref TAP_EIP712_DOMAIN: Eip712Domain = eip712_domain! { name: "TAP", version: "1", @@ -264,14 +333,14 @@ lazy_static! { }; } -/// Function to generate a signed receipt using the TAP_SENDER wallet. +/// Function to generate a signed receipt using the TAP_SIGNER wallet. pub async fn create_signed_receipt( allocation_id: Address, nonce: u64, timestamp_ns: u64, value: u128, ) -> SignedReceipt { - let (wallet, _) = &*self::TAP_SENDER; + let (wallet, _) = &*self::TAP_SIGNER; EIP712SignedMessage::new( &self::TAP_EIP712_DOMAIN, diff --git a/migrations/20230912220523_tap_receipts.up.sql b/migrations/20230912220523_tap_receipts.up.sql index a95a4a096..1f303901d 100644 --- a/migrations/20230912220523_tap_receipts.up.sql +++ b/migrations/20230912220523_tap_receipts.up.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS scalar_tap_receipts ( id BIGSERIAL PRIMARY KEY, -- id being SERIAL is important for the function of tap-agent allocation_id CHAR(40) NOT NULL, - sender_address CHAR(40) NOT NULL, + signer_address CHAR(40) NOT NULL, timestamp_ns NUMERIC(20) NOT NULL, -- signature CHAR(130) NOT NULL, value NUMERIC(39) NOT NULL, @@ -12,7 +12,7 @@ CREATE FUNCTION scalar_tap_receipt_notify() RETURNS trigger AS $$ BEGIN - PERFORM pg_notify('scalar_tap_receipt_notification', format('{"id": %s, "allocation_id": "%s", "sender_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.sender_address, NEW.timestamp_ns, NEW.value)); + PERFORM pg_notify('scalar_tap_receipt_notification', format('{"id": %s, "allocation_id": "%s", "signer_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.signer_address, NEW.timestamp_ns, NEW.value)); RETURN NEW; END; $$ LANGUAGE 'plpgsql'; @@ -29,7 +29,7 @@ CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_re CREATE TABLE IF NOT EXISTS scalar_tap_receipts_invalid ( id BIGSERIAL PRIMARY KEY, allocation_id CHAR(40) NOT NULL, - sender_address CHAR(40) NOT NULL, + signer_address CHAR(40) NOT NULL, timestamp_ns NUMERIC(20) NOT NULL, value NUMERIC(39) NOT NULL, received_receipt JSON NOT NULL diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index d1ecde26b..1db7a90b7 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -65,6 +65,7 @@ pub async fn start_agent(config: &'static config::Cli) -> SenderAllocationRelati escrow_subgraph, config.ethereum.indexer_address, Duration::from_millis(config.escrow_subgraph.escrow_syncing_interval_ms), + false, ); // TODO: replace with a proper implementation once the gateway registry contract is ready diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index b5957145d..afd9b8288 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -5,8 +5,8 @@ use std::{collections::HashMap, sync::Arc}; use alloy_primitives::Address; use async_trait::async_trait; -use ethereum_types::U256; use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; use tap_core::adapters::escrow_adapter::EscrowAdapter as EscrowAdapterTrait; use thiserror::Error; use tokio::sync::RwLock; @@ -21,7 +21,7 @@ use tokio::sync::RwLock; /// receipt checks only when we need to send a RAV request. #[derive(Clone)] pub struct EscrowAdapter { - escrow_accounts: Eventual>, + escrow_accounts: Eventual, sender_pending_fees: Arc>>, } @@ -32,7 +32,7 @@ pub enum AdapterError { } impl EscrowAdapter { - pub fn new(escrow_accounts: Eventual>) -> Self { + pub fn new(escrow_accounts: Eventual) -> Self { Self { escrow_accounts, sender_pending_fees: Arc::new(RwLock::new(HashMap::new())), @@ -45,14 +45,29 @@ impl EscrowAdapterTrait for EscrowAdapter { type AdapterError = AdapterError; async fn get_available_escrow(&self, sender: Address) -> Result { - let balance = self - .escrow_accounts - .value() - .await - .map_err(|e| AdapterError::AdapterError { - error: format!("Could not get escrow balance from eventual: {:?}.", e), - })? - .get(&sender) + let escrow_accounts = + self.escrow_accounts + .value() + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("Could not get escrow accounts from eventual: {:?}.", e), + })?; + + let sender = + escrow_accounts + .signers_to_senders + .get(&sender) + .ok_or(AdapterError::AdapterError { + error: format!( + "Sender {} not found for receipt signer, could not get available escrow.", + sender + ) + .to_string(), + })?; + + let balance = escrow_accounts + .balances + .get(sender) .ok_or(AdapterError::AdapterError { error: format!( "Sender {} not found in escrow balances map, could not get available escrow.", @@ -74,16 +89,37 @@ impl EscrowAdapterTrait for EscrowAdapter { .sender_pending_fees .read() .await - .get(&sender) + .get(sender) .copied() .unwrap_or(0); Ok(balance - fees) } async fn subtract_escrow(&self, sender: Address, value: u128) -> Result<(), AdapterError> { + let escrow_accounts = + self.escrow_accounts + .value() + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("Could not get escrow accounts from eventual: {:?}.", e), + })?; + let current_available_escrow = self.get_available_escrow(sender).await?; + + let sender = + escrow_accounts + .signers_to_senders + .get(&sender) + .ok_or(AdapterError::AdapterError { + error: format!( + "Sender {} not found for receipt signer, could not get available escrow.", + sender + ) + .to_string(), + })?; + let mut fees_write = self.sender_pending_fees.write().await; - let fees = fees_write.entry(sender).or_insert(0); + let fees = fees_write.entry(sender.to_owned()).or_insert(0); if current_available_escrow < value { return Err(AdapterError::AdapterError { error: format!( @@ -100,29 +136,32 @@ impl EscrowAdapterTrait for EscrowAdapter { #[cfg(test)] mod test { - use super::*; - use ethereum_types::U256; + use std::vec; + + use crate::tap::test_utils::{SENDER, SIGNER}; - use std::str::FromStr; + use super::*; #[tokio::test] async fn test_subtract_escrow() { - let sender = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabadeadbeef").unwrap(); - let escrow_accounts: Eventual> = - Eventual::from_value(HashMap::from([(sender, U256::from(1000))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let sender_pending_fees = Arc::new(RwLock::new(HashMap::new())); - sender_pending_fees.write().await.insert(sender, 500); + sender_pending_fees.write().await.insert(SENDER.1, 500); let adapter = EscrowAdapter { escrow_accounts, sender_pending_fees, }; adapter - .subtract_escrow(sender, 500) + .subtract_escrow(SIGNER.1, 500) .await .expect("Subtract escrow."); let available_escrow = adapter - .get_available_escrow(sender) + .get_available_escrow(SIGNER.1) .await .expect("Get available escrow."); assert_eq!(available_escrow, 0); @@ -130,23 +169,25 @@ mod test { #[tokio::test] async fn test_subtract_escrow_overflow() { - let sender = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabadeadbeef").unwrap(); - let escrow_accounts: Eventual> = - Eventual::from_value(HashMap::from([(sender, U256::from(1000))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let sender_pending_fees = Arc::new(RwLock::new(HashMap::new())); - sender_pending_fees.write().await.insert(sender, 500); + sender_pending_fees.write().await.insert(SENDER.1, 500); let adapter = EscrowAdapter { escrow_accounts, sender_pending_fees, }; adapter - .subtract_escrow(sender, 250) + .subtract_escrow(SIGNER.1, 250) .await .expect("Subtract escrow."); - assert!(adapter.subtract_escrow(sender, 251).await.is_err()); + assert!(adapter.subtract_escrow(SIGNER.1, 251).await.is_err()); let available_escrow = adapter - .get_available_escrow(sender) + .get_available_escrow(SIGNER.1) .await .expect("Get available escrow."); assert_eq!(available_escrow, 250); diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index 07e01cb15..f40e5a0d8 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -1,6 +1,11 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use alloy_primitives::Address; +use anyhow::anyhow; +use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; + mod escrow_adapter; mod rav_storage_adapter; mod receipt_checks_adapter; @@ -10,3 +15,21 @@ pub mod sender_allocation_relationships_manager; #[cfg(test)] pub mod test_utils; + +async fn signers_trimmed( + escrow_accounts: &Eventual, + sender: Address, +) -> Result, anyhow::Error> { + let signers = escrow_accounts + .value() + .await + .map_err(|e| anyhow!("Error while getting escrow accounts: {:?}", e))? + .senders_to_signers + .get(&sender) + .ok_or(anyhow!("No signers found for sender {}.", sender))? + .iter() + .map(|s| s.to_string().trim_start_matches("0x").to_owned()) + .collect::>(); + + Ok(signers) +} diff --git a/tap-agent/src/tap/rav_storage_adapter.rs b/tap-agent/src/tap/rav_storage_adapter.rs index 3db3312c0..9e0b0a942 100644 --- a/tap-agent/src/tap/rav_storage_adapter.rs +++ b/tap-agent/src/tap/rav_storage_adapter.rs @@ -96,7 +96,7 @@ impl RAVStorageAdapter { #[cfg(test)] mod test { use super::*; - use crate::tap::test_utils::{create_rav, ALLOCATION_ID, SENDER}; + use crate::tap::test_utils::{create_rav, ALLOCATION_ID, SENDER, SIGNER}; use tap_core::adapters::rav_storage_adapter::RAVStorageAdapter as RAVStorageAdapterTrait; #[sqlx::test(migrations = "../migrations")] @@ -108,7 +108,7 @@ mod test { // Insert a rav let mut new_rav = create_rav( *ALLOCATION_ID, - SENDER.0.clone(), + SIGNER.0.clone(), timestamp_ns, value_aggregate, ) @@ -127,7 +127,7 @@ mod test { for i in 0..3 { new_rav = create_rav( *ALLOCATION_ID, - SENDER.0.clone(), + SIGNER.0.clone(), timestamp_ns + i, value_aggregate - (i as u128), ) diff --git a/tap-agent/src/tap/receipt_checks_adapter.rs b/tap-agent/src/tap/receipt_checks_adapter.rs index e0f9c406a..105637743 100644 --- a/tap-agent/src/tap/receipt_checks_adapter.rs +++ b/tap-agent/src/tap/receipt_checks_adapter.rs @@ -7,6 +7,7 @@ use alloy_primitives::Address; use async_trait::async_trait; use ethereum_types::U256; use eventuals::{timer, Eventual, EventualExt}; +use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::subgraph_client::{Query, SubgraphClient}; use sqlx::PgPool; use tap_core::adapters::receipt_checks_adapter::ReceiptChecksAdapter as ReceiptChecksAdapterTrait; @@ -20,7 +21,7 @@ use crate::config; pub struct ReceiptChecksAdapter { query_appraisals: Option>>>, allocation_id: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, tap_allocation_redeemed: Eventual, } @@ -30,7 +31,7 @@ impl ReceiptChecksAdapter { _pgpool: PgPool, query_appraisals: Option>>>, allocation_id: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, sender_id: Address, ) -> Self { @@ -125,8 +126,11 @@ impl ReceiptChecksAdapterTrait for ReceiptChecksAdapter { })?; Ok(escrow_accounts + .signers_to_senders .get(&sender_id) - .map_or(false, |balance| *balance > U256::from(0))) + .and_then(|sender| escrow_accounts.balances.get(sender)) + .map(|balance| *balance > U256::from(0)) + .unwrap_or(false)) } } diff --git a/tap-agent/src/tap/receipt_storage_adapter.rs b/tap-agent/src/tap/receipt_storage_adapter.rs index 2b144a038..f54a03f93 100644 --- a/tap-agent/src/tap/receipt_storage_adapter.rs +++ b/tap-agent/src/tap/receipt_storage_adapter.rs @@ -8,7 +8,8 @@ use std::{ use alloy_primitives::Address; use async_trait::async_trait; - +use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; use sqlx::{postgres::types::PgRange, types::BigDecimal, PgPool}; use tap_core::adapters::receipt_storage_adapter::ReceiptStorageAdapter as ReceiptStorageAdapterTrait; use tap_core::{ @@ -18,12 +19,14 @@ use tap_core::{ use thiserror::Error; use tracing::error; -#[derive(Debug)] +use crate::tap::signers_trimmed; + pub struct ReceiptStorageAdapter { pgpool: PgPool, allocation_id: Address, sender: Address, required_checks: Vec, + escrow_accounts: Eventual, } #[derive(Debug, Error)] @@ -100,17 +103,24 @@ impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { // TODO: Make use of this limit in this function _receipts_limit: Option, ) -> Result, Self::AdapterError> { + let signers = signers_trimmed(&self.escrow_accounts, self.sender) + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("{:?}.", e), + })?; + let records = sqlx::query!( r#" SELECT id, receipt FROM scalar_tap_receipts - WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns + WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[])) + AND $3::numrange @> timestamp_ns "#, self.allocation_id .to_string() .trim_start_matches("0x") .to_owned(), - self.sender.to_string().trim_start_matches("0x").to_owned(), + &signers, rangebounds_to_pgrange(timestamp_range_ns) ) .fetch_all(&self.pgpool) @@ -140,16 +150,23 @@ impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { &self, timestamp_ns: R, ) -> Result<(), Self::AdapterError> { + let signers = signers_trimmed(&self.escrow_accounts, self.sender) + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("{:?}.", e), + })?; + sqlx::query!( r#" DELETE FROM scalar_tap_receipts - WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns + WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[])) + AND $3::numrange @> timestamp_ns "#, self.allocation_id .to_string() .trim_start_matches("0x") .to_owned(), - self.sender.to_string().trim_start_matches("0x").to_owned(), + &signers, rangebounds_to_pgrange(timestamp_ns) ) .execute(&self.pgpool) @@ -164,24 +181,29 @@ impl ReceiptStorageAdapter { allocation_id: Address, sender: Address, required_checks: Vec, + escrow_accounts: Eventual, ) -> Self { Self { pgpool, allocation_id, sender, required_checks, + escrow_accounts, } } } #[cfg(test)] mod test { + use std::collections::HashMap; + use super::*; use crate::tap::test_utils::{ create_received_receipt, store_receipt, ALLOCATION_ID, ALLOCATION_ID_IRRELEVANT, SENDER, - SENDER_IRRELEVANT, TAP_EIP712_DOMAIN_SEPARATOR, + SENDER_IRRELEVANT, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; use anyhow::Result; + use serde_json::Value; use sqlx::PgPool; use tap_core::tap_receipt::get_full_list_of_checks; @@ -191,9 +213,17 @@ mod test { /// retrieve_receipts_in_timestamp_range. async fn retrieve_range_and_check + Send>( storage_adapter: &ReceiptStorageAdapter, + escrow_accounts: &Eventual, received_receipt_vec: &[(u64, ReceivedReceipt)], range: R, ) -> Result<()> { + let signers_to_senders = escrow_accounts + .value() + .await + .unwrap() + .signers_to_senders + .to_owned(); + // Filtering the received receipts by timestamp range let received_receipt_vec: Vec<(u64, ReceivedReceipt)> = received_receipt_vec .iter() @@ -201,11 +231,14 @@ mod test { range.contains(&received_receipt.signed_receipt().message.timestamp_ns) && (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (received_receipt - .signed_receipt() - .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) - .unwrap() - == storage_adapter.sender) + && (signers_to_senders + .get( + &received_receipt + .signed_receipt() + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap(), + ) + .map_or(false, |v| *v == storage_adapter.sender)) }) .cloned() .collect(); @@ -243,9 +276,17 @@ mod test { async fn remove_range_and_check + Send>( storage_adapter: &ReceiptStorageAdapter, + escrow_accounts: &Eventual, received_receipt_vec: &[ReceivedReceipt], range: R, ) -> Result<()> { + let signers_to_senders = escrow_accounts + .value() + .await + .unwrap() + .signers_to_senders + .to_owned(); + // Storing the receipts let mut received_receipt_id_vec = Vec::new(); for received_receipt in received_receipt_vec.iter() { @@ -269,11 +310,14 @@ mod test { .filter(|(_, received_receipt)| { if (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (received_receipt - .signed_receipt() - .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) - .unwrap() - == storage_adapter.sender) + && (signers_to_senders + .get( + &received_receipt + .signed_receipt() + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap(), + ) + .map_or(false, |v| *v == storage_adapter.sender)) { !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) } else { @@ -345,11 +389,17 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let storage_adapter = ReceiptStorageAdapter::new( pgpool.clone(), *ALLOCATION_ID, SENDER.1, get_full_list_of_checks(), + escrow_accounts.clone(), ); // Creating 10 receipts with timestamps 42 to 51 @@ -358,7 +408,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -371,7 +421,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID_IRRELEVANT, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -413,7 +463,7 @@ mod test { { $( assert!( - retrieve_range_and_check(&storage_adapter, &received_receipt_vec, $arg) + retrieve_range_and_check(&storage_adapter, &escrow_accounts, &received_receipt_vec, $arg) .await .is_ok()); )+ @@ -483,8 +533,18 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { - let storage_adapter = - ReceiptStorageAdapter::new(pgpool, *ALLOCATION_ID, SENDER.1, get_full_list_of_checks()); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + + let storage_adapter = ReceiptStorageAdapter::new( + pgpool, + *ALLOCATION_ID, + SENDER.1, + get_full_list_of_checks(), + escrow_accounts.clone(), + ); // Creating 10 receipts with timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); @@ -492,7 +552,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -505,7 +565,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID_IRRELEVANT, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -531,7 +591,7 @@ mod test { { $( assert!( - remove_range_and_check(&storage_adapter, &received_receipt_vec, $arg) + remove_range_and_check(&storage_adapter, &escrow_accounts, &received_receipt_vec, $arg) .await.is_ok() ); ) + diff --git a/tap-agent/src/tap/sender_allocation_relationship.rs b/tap-agent/src/tap/sender_allocation_relationship.rs index a253167c6..64db68a2e 100644 --- a/tap-agent/src/tap/sender_allocation_relationship.rs +++ b/tap-agent/src/tap/sender_allocation_relationship.rs @@ -1,14 +1,14 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::{anyhow, ensure}; -use ethereum_types::U256; + use eventuals::Eventual; -use indexer_common::prelude::SubgraphClient; +use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; use sqlx::{types::BigDecimal, PgPool}; use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; @@ -30,7 +30,7 @@ use crate::{ tap::{ escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, receipt_checks_adapter::ReceiptChecksAdapter, - receipt_storage_adapter::ReceiptStorageAdapter, + receipt_storage_adapter::ReceiptStorageAdapter, signers_trimmed, }, }; @@ -66,6 +66,7 @@ struct Inner { unaggregated_fees: Arc>, state: Arc>, config: &'static config::Cli, + escrow_accounts: Eventual, } /// A SenderAllocationRelationship is the relationship between the indexer and the sender in the @@ -88,7 +89,7 @@ impl SenderAllocationRelationship { pgpool: PgPool, allocation_id: Address, sender: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, escrow_adapter: EscrowAdapter, tap_eip712_domain_separator: Eip712Domain, @@ -118,6 +119,7 @@ impl SenderAllocationRelationship { allocation_id, sender, required_checks.clone(), + escrow_accounts.clone(), ); let rav_storage_adapter = RAVStorageAdapter::new(pgpool.clone(), allocation_id, sender); let tap_manager = TapManager::new( @@ -139,6 +141,7 @@ impl SenderAllocationRelationship { unaggregated_fees: Arc::new(Mutex::new(UnaggregatedFees::default())), state: Arc::new(Mutex::new(State::Running)), config, + escrow_accounts, }), rav_requester_task: Arc::new(Mutex::new(None)), } @@ -175,7 +178,7 @@ impl SenderAllocationRelationship { new_receipt_notification.value, unaggregated_fees.value, new_receipt_notification.allocation_id, - new_receipt_notification.sender_address + self.inner.sender ); u128::MAX }); @@ -210,6 +213,8 @@ impl SenderAllocationRelationship { async fn update_unaggregated_fees_static(inner: &Inner) -> Result<(), anyhow::Error> { inner.tap_manager.remove_obsolete_receipts().await?; + let signers = signers_trimmed(&inner.escrow_accounts, inner.sender).await?; + // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? let res = sqlx::query!( r#" @@ -229,7 +234,7 @@ impl SenderAllocationRelationship { scalar_tap_receipts WHERE allocation_id = $1 - AND sender_address = $2 + AND signer_address IN (SELECT unnest($3::text[])) AND CASE WHEN ( SELECT timestamp_ns :: NUMERIC @@ -247,7 +252,8 @@ impl SenderAllocationRelationship { .to_string() .trim_start_matches("0x") .to_owned(), - inner.sender.to_string().trim_start_matches("0x").to_owned() + inner.sender.to_string().trim_start_matches("0x").to_owned(), + &signers ) .fetch_one(&inner.pgpool) .await?; @@ -447,7 +453,7 @@ impl SenderAllocationRelationship { r#" INSERT INTO scalar_tap_receipts_invalid ( allocation_id, - sender_address, + signer_address, timestamp_ns, value, received_receipt @@ -525,6 +531,8 @@ impl Drop for SenderAllocationRelationship { #[cfg(test)] mod tests { + use std::collections::HashMap; + use indexer_common::subgraph_client::DeploymentDetails; use serde_json::json; use tap_aggregator::server::run_server; @@ -537,7 +545,7 @@ mod tests { use super::*; use crate::tap::test_utils::{ create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID, INDEXER, - SENDER, TAP_EIP712_DOMAIN_SEPARATOR, + SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; const DUMMY_URL: &str = "http://localhost:1234"; @@ -567,9 +575,10 @@ mod tests { DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), ))); - let (mut escrow_accounts_writer, escrow_accounts_eventual) = - Eventual::>::new(); - escrow_accounts_writer.write(HashMap::from([(SENDER.1, 1000.into())])); + let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); @@ -600,7 +609,7 @@ mod tests { // Add receipts to the database. for i in 1..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -639,13 +648,13 @@ mod tests { // Add the RAV to the database. // This RAV has timestamp 4. The sender_allocation_relatioship should only consider receipts // with a timestamp greater than 4. - let signed_rav = create_rav(*ALLOCATION_ID, SENDER.0.clone(), 4, 10).await; + let signed_rav = create_rav(*ALLOCATION_ID, SIGNER.0.clone(), 4, 10).await; store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); // Add receipts to the database. for i in 1..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -682,7 +691,7 @@ mod tests { let mut expected_unaggregated_fees = 0u128; for i in 10..20 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -710,7 +719,7 @@ mod tests { // table let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 10, timestamp_ns: 19, value: 19, @@ -733,7 +742,7 @@ mod tests { // Send a new receipt notification. let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 30, timestamp_ns: 20, value: 20, @@ -757,7 +766,7 @@ mod tests { // Send a new receipt notification that has a lower ID than the previous one. let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 25, timestamp_ns: 19, value: 19, @@ -783,7 +792,7 @@ mod tests { // Start a TAP aggregator server. let (handle, aggregator_endpoint) = run_server( 0, - SENDER.0.clone(), + SIGNER.0.clone(), TAP_EIP712_DOMAIN_SEPARATOR.clone(), 100 * 1024, 100 * 1024, @@ -818,7 +827,7 @@ mod tests { // Add receipts to the database. for i in 0..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -845,7 +854,7 @@ mod tests { // Start a TAP aggregator server. let (handle, aggregator_endpoint) = run_server( 0, - SENDER.0.clone(), + SIGNER.0.clone(), TAP_EIP712_DOMAIN_SEPARATOR.clone(), 100 * 1024, 100 * 1024, @@ -887,14 +896,14 @@ mod tests { let value = (i + 10) as u128; let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, value, i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, value, i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); sender_allocation_relatioship .handle_new_receipt_notification(NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: i, timestamp_ns: i + 1, value, @@ -967,7 +976,7 @@ mod tests { let value = (i + 10) as u128; let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -975,7 +984,7 @@ mod tests { sender_allocation_relatioship .handle_new_receipt_notification(NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: i, timestamp_ns: i + 1, value, diff --git a/tap-agent/src/tap/sender_allocation_relationships_manager.rs b/tap-agent/src/tap/sender_allocation_relationships_manager.rs index 6832a0fbf..f9ebe44a4 100644 --- a/tap-agent/src/tap/sender_allocation_relationships_manager.rs +++ b/tap-agent/src/tap/sender_allocation_relationships_manager.rs @@ -1,14 +1,15 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashSet; use std::{collections::HashMap, str::FromStr, sync::Arc}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::anyhow; use anyhow::Result; -use ethereum_types::U256; use eventuals::{Eventual, EventualExt, PipeHandle}; +use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; @@ -23,7 +24,7 @@ use crate::config; pub struct NewReceiptNotification { pub id: u64, pub allocation_id: Address, - pub sender_address: Address, + pub signer_address: Address, pub timestamp_ns: u64, pub value: u128, } @@ -42,7 +43,7 @@ struct Inner { sender_allocation_relationships: Arc>>, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, escrow_adapter: EscrowAdapter, tap_eip712_domain_separator: Eip712Domain, @@ -54,7 +55,7 @@ impl SenderAllocationRelationshipsManager { config: &'static config::Cli, pgpool: PgPool, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, tap_eip712_domain_separator: Eip712Domain, sender_aggregator_endpoints: HashMap, @@ -73,6 +74,12 @@ impl SenderAllocationRelationshipsManager { sender_aggregator_endpoints, }); + let escrow_accounts_snapshot = inner + .escrow_accounts + .value() + .await + .expect("Should get escrow accounts from Eventual"); + Self::update_sender_allocation_relationships( &inner, inner @@ -80,11 +87,7 @@ impl SenderAllocationRelationshipsManager { .value() .await .expect("Should get indexer allocations from Eventual"), - inner - .escrow_accounts - .value() - .await - .expect("Should get escrow accounts from Eventual"), + escrow_accounts_snapshot.balances.keys().copied().collect(), ) .await .expect("Should be able to update sender_allocation_relationships"); @@ -111,7 +114,7 @@ impl SenderAllocationRelationshipsManager { // still need to get aggregated. sqlx::query!( r#" - SELECT DISTINCT allocation_id, sender_address + SELECT DISTINCT allocation_id, signer_address FROM scalar_tap_receipts "# ) @@ -122,8 +125,13 @@ impl SenderAllocationRelationshipsManager { .for_each(|row| { let allocation_id = Address::from_str(&row.allocation_id) .expect("allocation_id should be a valid address"); - let sender = Address::from_str(&row.sender_address) - .expect("sender_address should be a valid address"); + let signer = Address::from_str(&row.signer_address) + .expect("signer_address should be a valid address"); + let sender = escrow_accounts_snapshot + .signers_to_senders + .get(&signer) + .copied() + .expect("should be able to get sender from signer"); // Only create a SenderAllocationRelationship if it doesn't exist yet. if let std::collections::hash_map::Entry::Vacant(e) = @@ -162,6 +170,7 @@ impl SenderAllocationRelationshipsManager { let new_receipts_watcher_handle = tokio::spawn(Self::new_receipts_watcher( pglistener, inner.sender_allocation_relationships.clone(), + inner.escrow_accounts.clone(), )); // Start the eligible_allocations_senders_pipe that watches for changes in eligible senders @@ -177,7 +186,7 @@ impl SenderAllocationRelationshipsManager { Self::update_sender_allocation_relationships( &inner, indexer_allocations, - escrow_accounts, + escrow_accounts.balances.keys().copied().collect(), ) .await .unwrap_or_else(|e| { @@ -203,6 +212,7 @@ impl SenderAllocationRelationshipsManager { sender_allocation_relationships: Arc< RwLock>, >, + escrow_accounts: Eventual, ) { loop { // TODO: recover from errors or shutdown the whole program? @@ -216,11 +226,31 @@ impl SenderAllocationRelationshipsManager { NewReceiptNotification", ); - if let Some(sender_allocation_relationship) = - sender_allocation_relationships.read().await.get(&( - new_receipt_notification.allocation_id, - new_receipt_notification.sender_address, - )) + let sender_address = escrow_accounts + .value() + .await + .expect("should be able to get escrow accounts") + .signers_to_senders + .get(&new_receipt_notification.signer_address) + .copied(); + + let sender_address = match sender_address { + Some(sender_address) => sender_address, + None => { + error!( + "No sender address found for receipt signer address {}. \ + This should not happen.", + new_receipt_notification.signer_address + ); + // TODO: save the receipt in the failed receipts table? + continue; + } + }; + + if let Some(sender_allocation_relationship) = sender_allocation_relationships + .read() + .await + .get(&(new_receipt_notification.allocation_id, sender_address)) { sender_allocation_relationship .handle_new_receipt_notification(new_receipt_notification) @@ -228,9 +258,9 @@ impl SenderAllocationRelationshipsManager { } else { warn!( "No sender_allocation_relationship found for allocation_id {} and \ - sender_address {} to process new receipt notification. This should not \ - happen.", - new_receipt_notification.allocation_id, new_receipt_notification.sender_address + sender_address {} to process new receipt notification. This should not \ + happen.", + new_receipt_notification.allocation_id, sender_address ); } } @@ -239,10 +269,9 @@ impl SenderAllocationRelationshipsManager { async fn update_sender_allocation_relationships( inner: &Inner, indexer_allocations: HashMap, - escrow_accounts: HashMap, + senders: HashSet
, ) -> Result<()> { let eligible_allocations: Vec
= indexer_allocations.keys().copied().collect(); - let senders: Vec
= escrow_accounts.keys().copied().collect(); let mut sender_allocation_relationships_write = inner.sender_allocation_relationships.write().await; @@ -303,6 +332,9 @@ impl Drop for SenderAllocationRelationshipsManager { #[cfg(test)] mod tests { + use std::vec; + + use ethereum_types::U256; use indexer_common::{ prelude::{AllocationStatus, SubgraphDeployment}, subgraph_client::DeploymentDetails, @@ -316,7 +348,7 @@ mod tests { use crate::tap::{ sender_allocation_relationship::State, - test_utils::{INDEXER, SENDER, TAP_EIP712_DOMAIN_SEPARATOR}, + test_utils::{INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR}, }; use super::*; @@ -341,8 +373,8 @@ mod tests { indexer_allocations_writer.write(HashMap::new()); let (mut escrow_accounts_writer, escrow_accounts_eventual) = - Eventual::>::new(); - escrow_accounts_writer.write(HashMap::new()); + Eventual::::new(); + escrow_accounts_writer.write(EscrowAccounts::default()); // Mock escrow subgraph. let mock_server = MockServer::start().await; @@ -405,7 +437,10 @@ mod tests { )])); // Add an escrow account to the escrow_accounts Eventual. - escrow_accounts_writer.write(HashMap::from([(SENDER.1, U256::from_str("1000").unwrap())])); + escrow_accounts_writer.write(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); // Wait for the SenderAllocationRelationship to be created. tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; @@ -419,7 +454,7 @@ mod tests { .contains_key(&(allocation_id, SENDER.1))); // Remove the escrow account from the escrow_accounts Eventual. - escrow_accounts_writer.write(HashMap::new()); + escrow_accounts_writer.write(EscrowAccounts::default()); // Wait a bit tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; diff --git a/tap-agent/src/tap/test_utils.rs b/tap-agent/src/tap/test_utils.rs index 1df900e2b..c538d17b7 100644 --- a/tap-agent/src/tap/test_utils.rs +++ b/tap-agent/src/tap/test_utils.rs @@ -22,7 +22,8 @@ lazy_static! { Address::from_str("0xbcdebcdebcdebcdebcdebcdebcdebcdebcdebcde").unwrap(); pub static ref SENDER: (LocalWallet, Address) = wallet(0); pub static ref SENDER_IRRELEVANT: (LocalWallet, Address) = wallet(1); - pub static ref INDEXER: (LocalWallet, Address) = wallet(2); + pub static ref SIGNER: (LocalWallet, Address) = wallet(2); + pub static ref INDEXER: (LocalWallet, Address) = wallet(3); pub static ref TAP_EIP712_DOMAIN_SEPARATOR: Eip712Domain = eip712_domain! { name: "TAP", version: "1", @@ -47,7 +48,7 @@ pub fn wallet(index: u32) -> (LocalWallet, Address) { /// given `query_id` and `value` pub async fn create_received_receipt( allocation_id: &Address, - sender_wallet: &LocalWallet, + signer_wallet: &LocalWallet, nonce: u64, timestamp_ns: u64, value: u128, @@ -61,7 +62,7 @@ pub async fn create_received_receipt( timestamp_ns, value, }, - sender_wallet, + signer_wallet, ) .await .unwrap(); @@ -71,7 +72,7 @@ pub async fn create_received_receipt( /// Fixture to generate a RAV using the wallet from `keys()` pub async fn create_rav( allocation_id: Address, - sender_wallet: LocalWallet, + signer_wallet: LocalWallet, timestamp_ns: u64, value_aggregate: u128, ) -> SignedRAV { @@ -82,7 +83,7 @@ pub async fn create_rav( timestamp_ns, value_aggregate, }, - &sender_wallet, + &signer_wallet, ) .await .unwrap() @@ -92,7 +93,7 @@ pub async fn store_receipt(pgpool: &PgPool, signed_receipt: SignedReceipt) -> Re let record = sqlx::query!( r#" INSERT INTO scalar_tap_receipts ( - allocation_id, sender_address, timestamp_ns, value, receipt + allocation_id, signer_address, timestamp_ns, value, receipt ) VALUES ($1, $2, $3, $4, $5) RETURNING id