diff --git a/.sqlx/query-be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263.json b/.sqlx/query-0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65.json similarity index 71% rename from .sqlx/query-be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263.json rename to .sqlx/query-0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65.json index 530511f1..29a15220 100644 --- a/.sqlx/query-be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263.json +++ b/.sqlx/query-0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO scalar_tap_receipts (allocation_id, sender_address, timestamp_ns, value, receipt)\n VALUES ($1, $2, $3, $4, $5)\n ", + "query": "\n INSERT INTO scalar_tap_receipts (allocation_id, signer_address, timestamp_ns, value, receipt)\n VALUES ($1, $2, $3, $4, $5)\n ", "describe": { "columns": [], "parameters": { @@ -14,5 +14,5 @@ }, "nullable": [] }, - "hash": "be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263" + "hash": "0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65" } diff --git a/.sqlx/query-1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc.json b/.sqlx/query-0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b.json similarity index 78% rename from .sqlx/query-1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc.json rename to .sqlx/query-0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b.json index 47709ffb..7fb87cc8 100644 --- a/.sqlx/query-1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc.json +++ b/.sqlx/query-0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO scalar_tap_receipts (\n allocation_id, sender_address, timestamp_ns, value, receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n RETURNING id\n ", + "query": "\n INSERT INTO scalar_tap_receipts (\n allocation_id, signer_address, timestamp_ns, value, receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n RETURNING id\n ", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc" + "hash": "0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b" } diff --git a/.sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json b/.sqlx/query-21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae.json similarity index 78% rename from .sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json rename to .sqlx/query-21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae.json index 83116833..dc9952a3 100644 --- a/.sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json +++ b/.sqlx/query-21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO scalar_tap_receipts_invalid (\n allocation_id,\n sender_address,\n timestamp_ns,\n value,\n received_receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n ", + "query": "\n INSERT INTO scalar_tap_receipts_invalid (\n allocation_id,\n signer_address,\n timestamp_ns,\n value,\n received_receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n ", "describe": { "columns": [], "parameters": { @@ -14,5 +14,5 @@ }, "nullable": [] }, - "hash": "cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7" + "hash": "21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae" } diff --git a/.sqlx/query-2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49.json b/.sqlx/query-2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49.json new file mode 100644 index 00000000..b256905f --- /dev/null +++ b/.sqlx/query-2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[]))\n AND $3::numrange @> timestamp_ns\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar", + "TextArray", + "NumRange" + ] + }, + "nullable": [] + }, + "hash": "2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49" +} diff --git a/.sqlx/query-aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e.json b/.sqlx/query-40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6.json similarity index 70% rename from .sqlx/query-aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e.json rename to .sqlx/query-40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6.json index 07dfbd7b..3cf83c82 100644 --- a/.sqlx/query-aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e.json +++ b/.sqlx/query-40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT id, receipt\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns\n ", + "query": "\n SELECT id, receipt\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[]))\n AND $3::numrange @> timestamp_ns\n ", "describe": { "columns": [ { @@ -17,7 +17,7 @@ "parameters": { "Left": [ "Bpchar", - "Bpchar", + "TextArray", "NumRange" ] }, @@ -26,5 +26,5 @@ false ] }, - "hash": "aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e" + "hash": "40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6" } diff --git a/.sqlx/query-6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b.json b/.sqlx/query-47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49.json similarity index 59% rename from .sqlx/query-6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b.json rename to .sqlx/query-47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49.json index e878a43f..caad4737 100644 --- a/.sqlx/query-6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b.json +++ b/.sqlx/query-47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH rav AS (\n SELECT \n rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns \n FROM \n scalar_tap_ravs \n WHERE \n allocation_id = $1 \n AND sender_address = $2\n ) \n SELECT \n MAX(id), \n SUM(value) \n FROM \n scalar_tap_receipts \n WHERE \n allocation_id = $1 \n AND sender_address = $2 \n AND CASE WHEN (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) IS NOT NULL THEN timestamp_ns > (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) ELSE TRUE END\n ", + "query": "\n WITH rav AS (\n SELECT \n rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns \n FROM \n scalar_tap_ravs \n WHERE \n allocation_id = $1 \n AND sender_address = $2\n ) \n SELECT \n MAX(id), \n SUM(value) \n FROM \n scalar_tap_receipts \n WHERE \n allocation_id = $1 \n AND signer_address IN (SELECT unnest($3::text[]))\n AND CASE WHEN (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) IS NOT NULL THEN timestamp_ns > (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) ELSE TRUE END\n ", "describe": { "columns": [ { @@ -17,7 +17,8 @@ "parameters": { "Left": [ "Bpchar", - "Bpchar" + "Bpchar", + "TextArray" ] }, "nullable": [ @@ -25,5 +26,5 @@ null ] }, - "hash": "6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b" + "hash": "47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49" } diff --git a/.sqlx/query-f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a.json b/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json similarity index 59% rename from .sqlx/query-f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a.json rename to .sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json index 1aed43b9..8d1a90b1 100644 --- a/.sqlx/query-f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a.json +++ b/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT DISTINCT allocation_id, sender_address\n FROM scalar_tap_receipts\n ", + "query": "\n SELECT DISTINCT allocation_id, signer_address\n FROM scalar_tap_receipts\n ", "describe": { "columns": [ { @@ -10,7 +10,7 @@ }, { "ordinal": 1, - "name": "sender_address", + "name": "signer_address", "type_info": "Bpchar" } ], @@ -22,5 +22,5 @@ false ] }, - "hash": "f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a" + "hash": "778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb" } diff --git a/.sqlx/query-c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4.json b/.sqlx/query-c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4.json deleted file mode 100644 index 7596f9d3..00000000 --- a/.sqlx/query-c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bpchar", - "Bpchar", - "NumRange" - ] - }, - "nullable": [] - }, - "hash": "c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4" -} diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index a7bcb4b1..660a2714 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -1,30 +1,109 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; use alloy_primitives::Address; use anyhow::Result; use ethers_core::types::U256; use eventuals::{timer, Eventual, EventualExt}; use serde::Deserialize; +use thiserror::Error; use tokio::time::sleep; use tracing::{error, warn}; use crate::prelude::{Query, SubgraphClient}; +#[derive(Error, Debug)] +pub enum EscrowAccountsError { + #[error("No signer found for sender {sender}")] + NoSignerFound { sender: Address }, + #[error("No balance found for sender {sender}")] + NoBalanceFound { sender: Address }, + #[error("No sender found for signer {signer}")] + NoSenderFound { signer: Address }, +} + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct EscrowAccounts { + senders_balances: HashMap, + signers_to_senders: HashMap, + senders_to_signers: HashMap>, +} + +impl EscrowAccounts { + pub fn new( + senders_balances: HashMap, + senders_to_signers: HashMap>, + ) -> Self { + let signers_to_senders = senders_to_signers + .iter() + .flat_map(|(sender, signers)| signers.iter().map(move |signer| (*signer, *sender))) + .collect(); + + Self { + senders_balances, + signers_to_senders, + senders_to_signers, + } + } + + pub fn get_signers_for_sender( + &self, + sender: &Address, + ) -> Result, EscrowAccountsError> { + self.senders_to_signers + .get(sender) + .filter(|signers| !signers.is_empty()) + .ok_or(EscrowAccountsError::NoSignerFound { + sender: sender.to_owned(), + }) + .map(|signers| signers.to_owned()) + } + + pub fn get_sender_for_signer(&self, signer: &Address) -> Result { + self.signers_to_senders + .get(signer) + .ok_or(EscrowAccountsError::NoSenderFound { + signer: signer.to_owned(), + }) + .copied() + } + + pub fn get_balance_for_sender(&self, sender: &Address) -> Result { + self.senders_balances + .get(sender) + .ok_or(EscrowAccountsError::NoBalanceFound { + sender: sender.to_owned(), + }) + .copied() + } + + pub fn get_balance_for_signer(&self, signer: &Address) -> Result { + self.get_sender_for_signer(signer) + .and_then(|sender| self.get_balance_for_sender(&sender)) + } + + pub fn get_senders(&self) -> HashSet
{ + self.senders_balances.keys().copied().collect() + } +} + pub fn escrow_accounts( escrow_subgraph: &'static SubgraphClient, indexer_address: Address, interval: Duration, -) -> Eventual> { + reject_thawing_signers: bool, +) -> Eventual { // Types for deserializing the network subgraph response #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct EscrowAccountsResponse { escrow_accounts: Vec, } - // These 2 structs are used to deserialize the response from the escrow subgraph. // Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal // representation. This is why we deserialize them as strings below. #[derive(Deserialize)] @@ -38,50 +117,105 @@ pub fn escrow_accounts( #[serde(rename_all = "camelCase")] struct Sender { id: Address, + authorized_signers: Vec, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct AuthorizedSigner { + id: Address, } + // thawEndTimestamp == 0 means that the signer is not thawing. This also means + // that we don't wait for the thawing period to end before stopping serving + // queries for this signer. + // isAuthorized == true means that the signer is still authorized to sign + // payments in the name of the sender. + let query = if reject_thawing_signers { + r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + authorizedSigners( + where: {thawEndTimestamp: "0", isAuthorized: true} + ) { + id + } + } + } + } + "# + } else { + r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + authorizedSigners( + where: {isAuthorized: true} + ) { + id + } + } + } + } + "# + }; + timer(interval).map_with_retry( move |_| async move { let response = escrow_subgraph .query::(Query::new_with_variables( - r#" - query ($indexer: ID!) { - escrowAccounts(where: {receiver_: {id: $indexer}}) { - balance - totalAmountThawing - sender { - id - } - } - } - "#, + query, [("indexer", format!("{:x?}", indexer_address).into())], )) .await .map_err(|e| e.to_string())?; - response.map_err(|e| e.to_string()).and_then(|data| { - data.escrow_accounts - .iter() - .map(|account| { - let balance = U256::checked_sub( - U256::from_dec_str(&account.balance)?, - U256::from_dec_str(&account.total_amount_thawing)?, - ) - .unwrap_or_else(|| { - warn!( - "Balance minus total amount thawing underflowed for account {}. \ + let response = response.map_err(|e| e.to_string())?; + + let senders_balances = response + .escrow_accounts + .iter() + .map(|account| { + let balance = U256::checked_sub( + U256::from_dec_str(&account.balance)?, + U256::from_dec_str(&account.total_amount_thawing)?, + ) + .unwrap_or_else(|| { + warn!( + "Balance minus total amount thawing underflowed for account {}. \ Setting balance to 0, no queries will be served for this sender.", - account.sender.id - ); - U256::from(0) - }); - - Ok((account.sender.id, balance)) - }) - .collect::, anyhow::Error>>() - .map_err(|e| format!("{}", e)) - }) + account.sender.id + ); + U256::from(0) + }); + + Ok((account.sender.id, balance)) + }) + .collect::, anyhow::Error>>() + .map_err(|e| format!("{}", e))?; + + let senders_to_signers = response + .escrow_accounts + .iter() + .map(|account| { + let sender = account.sender.id; + let signers = account + .sender + .authorized_signers + .iter() + .map(|signer| signer.id) + .collect(); + (sender, signers) + }) + .collect(); + + Ok(EscrowAccounts::new(senders_balances, senders_to_signers)) }, move |err: String| { error!( @@ -96,6 +230,7 @@ pub fn escrow_accounts( #[cfg(test)] mod tests { + use test_log::test; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -104,7 +239,20 @@ mod tests { use super::*; - #[tokio::test] + #[test] + fn test_new_escrow_accounts() { + let escrow_accounts = EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + ); + + assert_eq!( + escrow_accounts.signers_to_senders, + test_vectors::ESCROW_ACCOUNTS_SIGNERS_TO_SENDERS.to_owned() + ) + } + + #[test(tokio::test)] async fn test_current_accounts() { // Set up a mock escrow subgraph let mock_server = MockServer::start().await; @@ -134,11 +282,15 @@ mod tests { escrow_subgraph, *test_vectors::INDEXER_ADDRESS, Duration::from_secs(60), + true, ); assert_eq!( accounts.value().await.unwrap(), - *test_vectors::ESCROW_ACCOUNTS + EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + ) ); } } diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 37b00183..671a74c9 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -262,6 +262,7 @@ impl IndexerService { escrow_subgraph, options.config.indexer.indexer_address, Duration::from_secs(options.config.escrow_subgraph.syncing_interval), + true, // Reject thawing signers eagerly ); // Establish Database connection necessary for serving indexer management diff --git a/common/src/tap_manager.rs b/common/src/tap_manager.rs index 42559b5a..28da0b3c 100644 --- a/common/src/tap_manager.rs +++ b/common/src/tap_manager.rs @@ -11,12 +11,12 @@ use std::{collections::HashMap, str::FromStr, sync::Arc}; use tap_core::tap_manager::SignedReceipt; use tracing::error; -use crate::prelude::Allocation; +use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation}; #[derive(Clone)] pub struct TapManager { indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, pgpool: PgPool, domain_separator: Arc, } @@ -25,7 +25,7 @@ impl TapManager { pub fn new( pgpool: PgPool, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, domain_separator: Eip712Domain, ) -> Self { Self { @@ -67,23 +67,22 @@ impl TapManager { anyhow!(e) })?; - if !self - .escrow_accounts - .value_immediate() - .unwrap_or_default() - .get(&receipt_signer) - .map_or(false, |balance| balance > &U256::zero()) + let escrow_accounts = self.escrow_accounts.value_immediate().unwrap_or_default(); + + if !escrow_accounts + .get_balance_for_signer(&receipt_signer) + .map_or(false, |balance| balance > U256::zero()) { - return Err(anyhow!( + anyhow::bail!( "Receipt sender `{}` is not eligible for this indexer", - receipt_signer - )); + receipt_signer, + ); } // TODO: consider doing this in another async task to avoid slowing down the paid query flow. sqlx::query!( r#" - INSERT INTO scalar_tap_receipts (allocation_id, sender_address, timestamp_ns, value, receipt) + INSERT INTO scalar_tap_receipts (allocation_id, signer_address, timestamp_ns, value, receipt) VALUES ($1, $2, $3, $4, $5) "#, format!("{:?}", allocation_id) @@ -117,7 +116,7 @@ mod test { use keccak_hash::H256; use sqlx::postgres::PgListener; - use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER}; + use crate::test_vectors::{self, create_signed_receipt}; use super::*; @@ -159,8 +158,10 @@ mod test { )); // Mock escrow accounts - let escrow_accounts = - Eventual::from_value(HashMap::from_iter(vec![(TAP_SENDER.1, U256::from(123))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + )); let tap_manager = TapManager::new( pgpool.clone(), diff --git a/common/src/test_vectors.rs b/common/src/test_vectors.rs index c6b7eaaf..9608349a 100644 --- a/common/src/test_vectors.rs +++ b/common/src/test_vectors.rs @@ -102,14 +102,35 @@ pub const ESCROW_QUERY_RESPONSE: &str = r#" "balance": "34", "totalAmountThawing": "10", "sender": { - "id": "0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1" + "id": "0x9858EfFD232B4033E47d90003D41EC34EcaEda94", + "authorizedSigners": [ + { + "id": "0x533661F0fb14d2E8B26223C86a610Dd7D2260892" + }, + { + "id": "0x2740f6fA9188cF53ffB6729DDD21575721dE92ce" + } + ] } }, { "balance": "42", "totalAmountThawing": "0", "sender": { - "id": "0x22d491bde2303f2f43325b2108d26f1eaba1e32b" + "id": "0x22d491bde2303f2f43325b2108d26f1eaba1e32b", + "authorizedSigners": [ + { + "id": "0x245059163ff6ee14279aa7b35ea8f0fdb967df6e" + } + ] + } + }, + { + "balance": "2987", + "totalAmountThawing": "12", + "sender": { + "id": "0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002", + "authorizedSigners": [] } } ] @@ -240,12 +261,48 @@ lazy_static! { ), ]); - pub static ref ESCROW_ACCOUNTS: HashMap = HashMap::from([ - (Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1").unwrap(), U256::from(24)), + pub static ref ESCROW_ACCOUNTS_BALANCES: HashMap = HashMap::from([ + (Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), U256::from(24)), // TAP_SENDER (Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), U256::from(42)), + (Address::from_str("0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002").unwrap(), U256::from(2975)), + ]); + + /// Maps signers back to their senders + pub static ref ESCROW_ACCOUNTS_SIGNERS_TO_SENDERS: HashMap = HashMap::from([ + ( + Address::from_str("0x533661F0fb14d2E8B26223C86a610Dd7D2260892").unwrap(), // TAP_SIGNER + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + ), + ( + Address::from_str("0x2740f6fA9188cF53ffB6729DDD21575721dE92ce").unwrap(), + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + ), + ( + Address::from_str("0x245059163ff6ee14279aa7b35ea8f0fdb967df6e").unwrap(), + Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), + ), + ]); + + pub static ref ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS: HashMap> = HashMap::from([ + ( + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + vec![ + Address::from_str("0x533661F0fb14d2E8B26223C86a610Dd7D2260892").unwrap(), // TAP_SIGNER + Address::from_str("0x2740f6fA9188cF53ffB6729DDD21575721dE92ce").unwrap(), + ], + ), + ( + Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), + vec![Address::from_str("0x245059163ff6ee14279aa7b35ea8f0fdb967df6e").unwrap()], + ), + ( + Address::from_str("0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002").unwrap(), + vec![], + ), ]); - /// Fixture to generate a wallet and address + /// Fixture to generate a wallet and address. + /// Address: 0x9858EfFD232B4033E47d90003D41EC34EcaEda94 pub static ref TAP_SENDER: (LocalWallet, Address) = { let wallet: LocalWallet = MnemonicBuilder::::default() .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") @@ -256,6 +313,18 @@ lazy_static! { (wallet, Address::from_slice(address.as_bytes())) }; + /// Fixture to generate a wallet and address. + /// Address: 0x533661F0fb14d2E8B26223C86a610Dd7D2260892 + pub static ref TAP_SIGNER: (LocalWallet, Address) = { + let wallet: LocalWallet = MnemonicBuilder::::default() + .phrase("rude pipe parade travel organ vendor card festival magnet novel forget refuse keep draft tool") + .build() + .unwrap(); + let address = wallet.address(); + + (wallet, Address::from_slice(address.as_bytes())) + }; + pub static ref TAP_EIP712_DOMAIN: Eip712Domain = eip712_domain! { name: "TAP", version: "1", @@ -264,14 +333,14 @@ lazy_static! { }; } -/// Function to generate a signed receipt using the TAP_SENDER wallet. +/// Function to generate a signed receipt using the TAP_SIGNER wallet. pub async fn create_signed_receipt( allocation_id: Address, nonce: u64, timestamp_ns: u64, value: u128, ) -> SignedReceipt { - let (wallet, _) = &*self::TAP_SENDER; + let (wallet, _) = &*self::TAP_SIGNER; EIP712SignedMessage::new( &self::TAP_EIP712_DOMAIN, diff --git a/migrations/20230912220523_tap_receipts.up.sql b/migrations/20230912220523_tap_receipts.up.sql index a95a4a09..1f303901 100644 --- a/migrations/20230912220523_tap_receipts.up.sql +++ b/migrations/20230912220523_tap_receipts.up.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS scalar_tap_receipts ( id BIGSERIAL PRIMARY KEY, -- id being SERIAL is important for the function of tap-agent allocation_id CHAR(40) NOT NULL, - sender_address CHAR(40) NOT NULL, + signer_address CHAR(40) NOT NULL, timestamp_ns NUMERIC(20) NOT NULL, -- signature CHAR(130) NOT NULL, value NUMERIC(39) NOT NULL, @@ -12,7 +12,7 @@ CREATE FUNCTION scalar_tap_receipt_notify() RETURNS trigger AS $$ BEGIN - PERFORM pg_notify('scalar_tap_receipt_notification', format('{"id": %s, "allocation_id": "%s", "sender_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.sender_address, NEW.timestamp_ns, NEW.value)); + PERFORM pg_notify('scalar_tap_receipt_notification', format('{"id": %s, "allocation_id": "%s", "signer_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.signer_address, NEW.timestamp_ns, NEW.value)); RETURN NEW; END; $$ LANGUAGE 'plpgsql'; @@ -29,7 +29,7 @@ CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_re CREATE TABLE IF NOT EXISTS scalar_tap_receipts_invalid ( id BIGSERIAL PRIMARY KEY, allocation_id CHAR(40) NOT NULL, - sender_address CHAR(40) NOT NULL, + signer_address CHAR(40) NOT NULL, timestamp_ns NUMERIC(20) NOT NULL, value NUMERIC(39) NOT NULL, received_receipt JSON NOT NULL diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index d1ecde26..1db7a90b 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -65,6 +65,7 @@ pub async fn start_agent(config: &'static config::Cli) -> SenderAllocationRelati escrow_subgraph, config.ethereum.indexer_address, Duration::from_millis(config.escrow_subgraph.escrow_syncing_interval_ms), + false, ); // TODO: replace with a proper implementation once the gateway registry contract is ready diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index b5957145..0b725774 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -5,8 +5,8 @@ use std::{collections::HashMap, sync::Arc}; use alloy_primitives::Address; use async_trait::async_trait; -use ethereum_types::U256; use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; use tap_core::adapters::escrow_adapter::EscrowAdapter as EscrowAdapterTrait; use thiserror::Error; use tokio::sync::RwLock; @@ -21,18 +21,40 @@ use tokio::sync::RwLock; /// receipt checks only when we need to send a RAV request. #[derive(Clone)] pub struct EscrowAdapter { - escrow_accounts: Eventual>, + escrow_accounts: Eventual, sender_pending_fees: Arc>>, } #[derive(Debug, Error)] pub enum AdapterError { - #[error("Error in EscrowAdapter: {error}")] - AdapterError { error: String }, + #[error("Could not get escrow accounts from eventual")] + EscrowEventualError { error: String }, + + #[error("Could not get available escrow for sender")] + AvailableEscrowError(#[from] indexer_common::escrow_accounts::EscrowAccountsError), + + #[error("Sender {sender} escrow balance is too large to fit in u128, could not get available escrow.")] + BalanceTooLarge { sender: Address }, + + #[error("Sender {sender} does not have enough escrow to subtract {fees} from {balance}.")] + NotEnoughEscrow { + sender: Address, + fees: u128, + balance: u128, + }, +} + +// Conversion from eventuals::error::Closed to AdapterError::EscrowEventualError +impl From for AdapterError { + fn from(e: eventuals::error::Closed) -> Self { + AdapterError::EscrowEventualError { + error: format!("{:?}", e), + } + } } impl EscrowAdapter { - pub fn new(escrow_accounts: Eventual>) -> Self { + pub fn new(escrow_accounts: Eventual) -> Self { Self { escrow_accounts, sender_pending_fees: Arc::new(RwLock::new(HashMap::new())), @@ -45,30 +67,16 @@ impl EscrowAdapterTrait for EscrowAdapter { type AdapterError = AdapterError; async fn get_available_escrow(&self, sender: Address) -> Result { - let balance = self - .escrow_accounts - .value() - .await - .map_err(|e| AdapterError::AdapterError { - error: format!("Could not get escrow balance from eventual: {:?}.", e), - })? - .get(&sender) - .ok_or(AdapterError::AdapterError { - error: format!( - "Sender {} not found in escrow balances map, could not get available escrow.", - sender - ) - .to_string(), - })? - .to_owned(); - let balance: u128 = balance.try_into().map_err(|_| AdapterError::AdapterError { - error: format!( - "Sender {} escrow balance is too large to fit in u128, \ - could not get available escrow.", - sender - ) - .to_string(), - })?; + let escrow_accounts = self.escrow_accounts.value().await?; + + let sender = escrow_accounts.get_sender_for_signer(&sender)?; + + let balance = escrow_accounts.get_balance_for_sender(&sender)?.to_owned(); + let balance: u128 = balance + .try_into() + .map_err(|_| AdapterError::BalanceTooLarge { + sender: sender.to_owned(), + })?; let fees = self .sender_pending_fees @@ -81,16 +89,19 @@ impl EscrowAdapterTrait for EscrowAdapter { } async fn subtract_escrow(&self, sender: Address, value: u128) -> Result<(), AdapterError> { + let escrow_accounts = self.escrow_accounts.value().await?; + let current_available_escrow = self.get_available_escrow(sender).await?; + + let sender = escrow_accounts.get_sender_for_signer(&sender)?; + let mut fees_write = self.sender_pending_fees.write().await; - let fees = fees_write.entry(sender).or_insert(0); + let fees = fees_write.entry(sender.to_owned()).or_insert(0); if current_available_escrow < value { - return Err(AdapterError::AdapterError { - error: format!( - "Sender {} does not have enough escrow to subtract {} from {}.", - sender, value, *fees - ) - .to_string(), + return Err(AdapterError::NotEnoughEscrow { + sender: sender.to_owned(), + fees: value, + balance: current_available_escrow, }); } *fees += value; @@ -100,29 +111,32 @@ impl EscrowAdapterTrait for EscrowAdapter { #[cfg(test)] mod test { - use super::*; - use ethereum_types::U256; + use std::vec; - use std::str::FromStr; + use crate::tap::test_utils::{SENDER, SIGNER}; + + use super::*; #[tokio::test] async fn test_subtract_escrow() { - let sender = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabadeadbeef").unwrap(); - let escrow_accounts: Eventual> = - Eventual::from_value(HashMap::from([(sender, U256::from(1000))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let sender_pending_fees = Arc::new(RwLock::new(HashMap::new())); - sender_pending_fees.write().await.insert(sender, 500); + sender_pending_fees.write().await.insert(SENDER.1, 500); let adapter = EscrowAdapter { escrow_accounts, sender_pending_fees, }; adapter - .subtract_escrow(sender, 500) + .subtract_escrow(SIGNER.1, 500) .await .expect("Subtract escrow."); let available_escrow = adapter - .get_available_escrow(sender) + .get_available_escrow(SIGNER.1) .await .expect("Get available escrow."); assert_eq!(available_escrow, 0); @@ -130,23 +144,25 @@ mod test { #[tokio::test] async fn test_subtract_escrow_overflow() { - let sender = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabadeadbeef").unwrap(); - let escrow_accounts: Eventual> = - Eventual::from_value(HashMap::from([(sender, U256::from(1000))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let sender_pending_fees = Arc::new(RwLock::new(HashMap::new())); - sender_pending_fees.write().await.insert(sender, 500); + sender_pending_fees.write().await.insert(SENDER.1, 500); let adapter = EscrowAdapter { escrow_accounts, sender_pending_fees, }; adapter - .subtract_escrow(sender, 250) + .subtract_escrow(SIGNER.1, 250) .await .expect("Subtract escrow."); - assert!(adapter.subtract_escrow(sender, 251).await.is_err()); + assert!(adapter.subtract_escrow(SIGNER.1, 251).await.is_err()); let available_escrow = adapter - .get_available_escrow(sender) + .get_available_escrow(SIGNER.1) .await .expect("Get available escrow."); assert_eq!(available_escrow, 250); diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index 07e01cb1..11f2f647 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -1,6 +1,11 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use alloy_primitives::Address; +use anyhow::anyhow; +use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; + mod escrow_adapter; mod rav_storage_adapter; mod receipt_checks_adapter; @@ -10,3 +15,19 @@ pub mod sender_allocation_relationships_manager; #[cfg(test)] pub mod test_utils; + +async fn signers_trimmed( + escrow_accounts: &Eventual, + sender: Address, +) -> Result, anyhow::Error> { + let signers = escrow_accounts + .value() + .await + .map_err(|e| anyhow!("Error while getting escrow accounts: {:?}", e))? + .get_signers_for_sender(&sender)? + .iter() + .map(|s| s.to_string().trim_start_matches("0x").to_owned()) + .collect::>(); + + Ok(signers) +} diff --git a/tap-agent/src/tap/rav_storage_adapter.rs b/tap-agent/src/tap/rav_storage_adapter.rs index 3db3312c..9e0b0a94 100644 --- a/tap-agent/src/tap/rav_storage_adapter.rs +++ b/tap-agent/src/tap/rav_storage_adapter.rs @@ -96,7 +96,7 @@ impl RAVStorageAdapter { #[cfg(test)] mod test { use super::*; - use crate::tap::test_utils::{create_rav, ALLOCATION_ID, SENDER}; + use crate::tap::test_utils::{create_rav, ALLOCATION_ID, SENDER, SIGNER}; use tap_core::adapters::rav_storage_adapter::RAVStorageAdapter as RAVStorageAdapterTrait; #[sqlx::test(migrations = "../migrations")] @@ -108,7 +108,7 @@ mod test { // Insert a rav let mut new_rav = create_rav( *ALLOCATION_ID, - SENDER.0.clone(), + SIGNER.0.clone(), timestamp_ns, value_aggregate, ) @@ -127,7 +127,7 @@ mod test { for i in 0..3 { new_rav = create_rav( *ALLOCATION_ID, - SENDER.0.clone(), + SIGNER.0.clone(), timestamp_ns + i, value_aggregate - (i as u128), ) diff --git a/tap-agent/src/tap/receipt_checks_adapter.rs b/tap-agent/src/tap/receipt_checks_adapter.rs index e0f9c406..ec3687fa 100644 --- a/tap-agent/src/tap/receipt_checks_adapter.rs +++ b/tap-agent/src/tap/receipt_checks_adapter.rs @@ -7,6 +7,7 @@ use alloy_primitives::Address; use async_trait::async_trait; use ethereum_types::U256; use eventuals::{timer, Eventual, EventualExt}; +use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::subgraph_client::{Query, SubgraphClient}; use sqlx::PgPool; use tap_core::adapters::receipt_checks_adapter::ReceiptChecksAdapter as ReceiptChecksAdapterTrait; @@ -20,7 +21,7 @@ use crate::config; pub struct ReceiptChecksAdapter { query_appraisals: Option>>>, allocation_id: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, tap_allocation_redeemed: Eventual, } @@ -30,7 +31,7 @@ impl ReceiptChecksAdapter { _pgpool: PgPool, query_appraisals: Option>>>, allocation_id: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, sender_id: Address, ) -> Self { @@ -125,8 +126,9 @@ impl ReceiptChecksAdapterTrait for ReceiptChecksAdapter { })?; Ok(escrow_accounts - .get(&sender_id) - .map_or(false, |balance| *balance > U256::from(0))) + .get_balance_for_signer(&sender_id) + .map(|balance| balance > U256::from(0)) + .unwrap_or(false)) } } diff --git a/tap-agent/src/tap/receipt_storage_adapter.rs b/tap-agent/src/tap/receipt_storage_adapter.rs index 2b144a03..71d10e8b 100644 --- a/tap-agent/src/tap/receipt_storage_adapter.rs +++ b/tap-agent/src/tap/receipt_storage_adapter.rs @@ -8,7 +8,8 @@ use std::{ use alloy_primitives::Address; use async_trait::async_trait; - +use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; use sqlx::{postgres::types::PgRange, types::BigDecimal, PgPool}; use tap_core::adapters::receipt_storage_adapter::ReceiptStorageAdapter as ReceiptStorageAdapterTrait; use tap_core::{ @@ -18,12 +19,14 @@ use tap_core::{ use thiserror::Error; use tracing::error; -#[derive(Debug)] +use crate::tap::signers_trimmed; + pub struct ReceiptStorageAdapter { pgpool: PgPool, allocation_id: Address, sender: Address, required_checks: Vec, + escrow_accounts: Eventual, } #[derive(Debug, Error)] @@ -100,17 +103,24 @@ impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { // TODO: Make use of this limit in this function _receipts_limit: Option, ) -> Result, Self::AdapterError> { + let signers = signers_trimmed(&self.escrow_accounts, self.sender) + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("{:?}.", e), + })?; + let records = sqlx::query!( r#" SELECT id, receipt FROM scalar_tap_receipts - WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns + WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[])) + AND $3::numrange @> timestamp_ns "#, self.allocation_id .to_string() .trim_start_matches("0x") .to_owned(), - self.sender.to_string().trim_start_matches("0x").to_owned(), + &signers, rangebounds_to_pgrange(timestamp_range_ns) ) .fetch_all(&self.pgpool) @@ -140,16 +150,23 @@ impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { &self, timestamp_ns: R, ) -> Result<(), Self::AdapterError> { + let signers = signers_trimmed(&self.escrow_accounts, self.sender) + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("{:?}.", e), + })?; + sqlx::query!( r#" DELETE FROM scalar_tap_receipts - WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns + WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[])) + AND $3::numrange @> timestamp_ns "#, self.allocation_id .to_string() .trim_start_matches("0x") .to_owned(), - self.sender.to_string().trim_start_matches("0x").to_owned(), + &signers, rangebounds_to_pgrange(timestamp_ns) ) .execute(&self.pgpool) @@ -164,24 +181,29 @@ impl ReceiptStorageAdapter { allocation_id: Address, sender: Address, required_checks: Vec, + escrow_accounts: Eventual, ) -> Self { Self { pgpool, allocation_id, sender, required_checks, + escrow_accounts, } } } #[cfg(test)] mod test { + use std::collections::HashMap; + use super::*; use crate::tap::test_utils::{ create_received_receipt, store_receipt, ALLOCATION_ID, ALLOCATION_ID_IRRELEVANT, SENDER, - SENDER_IRRELEVANT, TAP_EIP712_DOMAIN_SEPARATOR, + SENDER_IRRELEVANT, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; use anyhow::Result; + use serde_json::Value; use sqlx::PgPool; use tap_core::tap_receipt::get_full_list_of_checks; @@ -191,9 +213,12 @@ mod test { /// retrieve_receipts_in_timestamp_range. async fn retrieve_range_and_check + Send>( storage_adapter: &ReceiptStorageAdapter, + escrow_accounts: &Eventual, received_receipt_vec: &[(u64, ReceivedReceipt)], range: R, ) -> Result<()> { + let escrow_accounts_snapshot = escrow_accounts.value().await.unwrap(); + // Filtering the received receipts by timestamp range let received_receipt_vec: Vec<(u64, ReceivedReceipt)> = received_receipt_vec .iter() @@ -201,11 +226,14 @@ mod test { range.contains(&received_receipt.signed_receipt().message.timestamp_ns) && (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (received_receipt - .signed_receipt() - .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) - .unwrap() - == storage_adapter.sender) + && (escrow_accounts_snapshot + .get_sender_for_signer( + &received_receipt + .signed_receipt() + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap(), + ) + .map_or(false, |v| v == storage_adapter.sender)) }) .cloned() .collect(); @@ -243,9 +271,12 @@ mod test { async fn remove_range_and_check + Send>( storage_adapter: &ReceiptStorageAdapter, + escrow_accounts: &Eventual, received_receipt_vec: &[ReceivedReceipt], range: R, ) -> Result<()> { + let escrow_accounts_snapshot = escrow_accounts.value().await.unwrap(); + // Storing the receipts let mut received_receipt_id_vec = Vec::new(); for received_receipt in received_receipt_vec.iter() { @@ -269,11 +300,14 @@ mod test { .filter(|(_, received_receipt)| { if (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (received_receipt - .signed_receipt() - .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) - .unwrap() - == storage_adapter.sender) + && (escrow_accounts_snapshot + .get_sender_for_signer( + &received_receipt + .signed_receipt() + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap(), + ) + .map_or(false, |v| v == storage_adapter.sender)) { !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) } else { @@ -345,11 +379,17 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let storage_adapter = ReceiptStorageAdapter::new( pgpool.clone(), *ALLOCATION_ID, SENDER.1, get_full_list_of_checks(), + escrow_accounts.clone(), ); // Creating 10 receipts with timestamps 42 to 51 @@ -358,7 +398,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -371,7 +411,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID_IRRELEVANT, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -413,7 +453,7 @@ mod test { { $( assert!( - retrieve_range_and_check(&storage_adapter, &received_receipt_vec, $arg) + retrieve_range_and_check(&storage_adapter, &escrow_accounts, &received_receipt_vec, $arg) .await .is_ok()); )+ @@ -483,8 +523,18 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { - let storage_adapter = - ReceiptStorageAdapter::new(pgpool, *ALLOCATION_ID, SENDER.1, get_full_list_of_checks()); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + + let storage_adapter = ReceiptStorageAdapter::new( + pgpool, + *ALLOCATION_ID, + SENDER.1, + get_full_list_of_checks(), + escrow_accounts.clone(), + ); // Creating 10 receipts with timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); @@ -492,7 +542,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -505,7 +555,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID_IRRELEVANT, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -531,7 +581,7 @@ mod test { { $( assert!( - remove_range_and_check(&storage_adapter, &received_receipt_vec, $arg) + remove_range_and_check(&storage_adapter, &escrow_accounts, &received_receipt_vec, $arg) .await.is_ok() ); ) + diff --git a/tap-agent/src/tap/sender_allocation_relationship.rs b/tap-agent/src/tap/sender_allocation_relationship.rs index a253167c..64db68a2 100644 --- a/tap-agent/src/tap/sender_allocation_relationship.rs +++ b/tap-agent/src/tap/sender_allocation_relationship.rs @@ -1,14 +1,14 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::{anyhow, ensure}; -use ethereum_types::U256; + use eventuals::Eventual; -use indexer_common::prelude::SubgraphClient; +use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; use sqlx::{types::BigDecimal, PgPool}; use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; @@ -30,7 +30,7 @@ use crate::{ tap::{ escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, receipt_checks_adapter::ReceiptChecksAdapter, - receipt_storage_adapter::ReceiptStorageAdapter, + receipt_storage_adapter::ReceiptStorageAdapter, signers_trimmed, }, }; @@ -66,6 +66,7 @@ struct Inner { unaggregated_fees: Arc>, state: Arc>, config: &'static config::Cli, + escrow_accounts: Eventual, } /// A SenderAllocationRelationship is the relationship between the indexer and the sender in the @@ -88,7 +89,7 @@ impl SenderAllocationRelationship { pgpool: PgPool, allocation_id: Address, sender: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, escrow_adapter: EscrowAdapter, tap_eip712_domain_separator: Eip712Domain, @@ -118,6 +119,7 @@ impl SenderAllocationRelationship { allocation_id, sender, required_checks.clone(), + escrow_accounts.clone(), ); let rav_storage_adapter = RAVStorageAdapter::new(pgpool.clone(), allocation_id, sender); let tap_manager = TapManager::new( @@ -139,6 +141,7 @@ impl SenderAllocationRelationship { unaggregated_fees: Arc::new(Mutex::new(UnaggregatedFees::default())), state: Arc::new(Mutex::new(State::Running)), config, + escrow_accounts, }), rav_requester_task: Arc::new(Mutex::new(None)), } @@ -175,7 +178,7 @@ impl SenderAllocationRelationship { new_receipt_notification.value, unaggregated_fees.value, new_receipt_notification.allocation_id, - new_receipt_notification.sender_address + self.inner.sender ); u128::MAX }); @@ -210,6 +213,8 @@ impl SenderAllocationRelationship { async fn update_unaggregated_fees_static(inner: &Inner) -> Result<(), anyhow::Error> { inner.tap_manager.remove_obsolete_receipts().await?; + let signers = signers_trimmed(&inner.escrow_accounts, inner.sender).await?; + // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? let res = sqlx::query!( r#" @@ -229,7 +234,7 @@ impl SenderAllocationRelationship { scalar_tap_receipts WHERE allocation_id = $1 - AND sender_address = $2 + AND signer_address IN (SELECT unnest($3::text[])) AND CASE WHEN ( SELECT timestamp_ns :: NUMERIC @@ -247,7 +252,8 @@ impl SenderAllocationRelationship { .to_string() .trim_start_matches("0x") .to_owned(), - inner.sender.to_string().trim_start_matches("0x").to_owned() + inner.sender.to_string().trim_start_matches("0x").to_owned(), + &signers ) .fetch_one(&inner.pgpool) .await?; @@ -447,7 +453,7 @@ impl SenderAllocationRelationship { r#" INSERT INTO scalar_tap_receipts_invalid ( allocation_id, - sender_address, + signer_address, timestamp_ns, value, received_receipt @@ -525,6 +531,8 @@ impl Drop for SenderAllocationRelationship { #[cfg(test)] mod tests { + use std::collections::HashMap; + use indexer_common::subgraph_client::DeploymentDetails; use serde_json::json; use tap_aggregator::server::run_server; @@ -537,7 +545,7 @@ mod tests { use super::*; use crate::tap::test_utils::{ create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID, INDEXER, - SENDER, TAP_EIP712_DOMAIN_SEPARATOR, + SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; const DUMMY_URL: &str = "http://localhost:1234"; @@ -567,9 +575,10 @@ mod tests { DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), ))); - let (mut escrow_accounts_writer, escrow_accounts_eventual) = - Eventual::>::new(); - escrow_accounts_writer.write(HashMap::from([(SENDER.1, 1000.into())])); + let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); @@ -600,7 +609,7 @@ mod tests { // Add receipts to the database. for i in 1..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -639,13 +648,13 @@ mod tests { // Add the RAV to the database. // This RAV has timestamp 4. The sender_allocation_relatioship should only consider receipts // with a timestamp greater than 4. - let signed_rav = create_rav(*ALLOCATION_ID, SENDER.0.clone(), 4, 10).await; + let signed_rav = create_rav(*ALLOCATION_ID, SIGNER.0.clone(), 4, 10).await; store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); // Add receipts to the database. for i in 1..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -682,7 +691,7 @@ mod tests { let mut expected_unaggregated_fees = 0u128; for i in 10..20 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -710,7 +719,7 @@ mod tests { // table let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 10, timestamp_ns: 19, value: 19, @@ -733,7 +742,7 @@ mod tests { // Send a new receipt notification. let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 30, timestamp_ns: 20, value: 20, @@ -757,7 +766,7 @@ mod tests { // Send a new receipt notification that has a lower ID than the previous one. let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 25, timestamp_ns: 19, value: 19, @@ -783,7 +792,7 @@ mod tests { // Start a TAP aggregator server. let (handle, aggregator_endpoint) = run_server( 0, - SENDER.0.clone(), + SIGNER.0.clone(), TAP_EIP712_DOMAIN_SEPARATOR.clone(), 100 * 1024, 100 * 1024, @@ -818,7 +827,7 @@ mod tests { // Add receipts to the database. for i in 0..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -845,7 +854,7 @@ mod tests { // Start a TAP aggregator server. let (handle, aggregator_endpoint) = run_server( 0, - SENDER.0.clone(), + SIGNER.0.clone(), TAP_EIP712_DOMAIN_SEPARATOR.clone(), 100 * 1024, 100 * 1024, @@ -887,14 +896,14 @@ mod tests { let value = (i + 10) as u128; let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, value, i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, value, i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); sender_allocation_relatioship .handle_new_receipt_notification(NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: i, timestamp_ns: i + 1, value, @@ -967,7 +976,7 @@ mod tests { let value = (i + 10) as u128; let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -975,7 +984,7 @@ mod tests { sender_allocation_relatioship .handle_new_receipt_notification(NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: i, timestamp_ns: i + 1, value, diff --git a/tap-agent/src/tap/sender_allocation_relationships_manager.rs b/tap-agent/src/tap/sender_allocation_relationships_manager.rs index 6832a0fb..dd2a67a2 100644 --- a/tap-agent/src/tap/sender_allocation_relationships_manager.rs +++ b/tap-agent/src/tap/sender_allocation_relationships_manager.rs @@ -1,14 +1,15 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashSet; use std::{collections::HashMap, str::FromStr, sync::Arc}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::anyhow; use anyhow::Result; -use ethereum_types::U256; use eventuals::{Eventual, EventualExt, PipeHandle}; +use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; @@ -23,7 +24,7 @@ use crate::config; pub struct NewReceiptNotification { pub id: u64, pub allocation_id: Address, - pub sender_address: Address, + pub signer_address: Address, pub timestamp_ns: u64, pub value: u128, } @@ -42,7 +43,7 @@ struct Inner { sender_allocation_relationships: Arc>>, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, escrow_adapter: EscrowAdapter, tap_eip712_domain_separator: Eip712Domain, @@ -54,7 +55,7 @@ impl SenderAllocationRelationshipsManager { config: &'static config::Cli, pgpool: PgPool, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, tap_eip712_domain_separator: Eip712Domain, sender_aggregator_endpoints: HashMap, @@ -73,6 +74,12 @@ impl SenderAllocationRelationshipsManager { sender_aggregator_endpoints, }); + let escrow_accounts_snapshot = inner + .escrow_accounts + .value() + .await + .expect("Should get escrow accounts from Eventual"); + Self::update_sender_allocation_relationships( &inner, inner @@ -80,11 +87,7 @@ impl SenderAllocationRelationshipsManager { .value() .await .expect("Should get indexer allocations from Eventual"), - inner - .escrow_accounts - .value() - .await - .expect("Should get escrow accounts from Eventual"), + escrow_accounts_snapshot.get_senders(), ) .await .expect("Should be able to update sender_allocation_relationships"); @@ -111,7 +114,7 @@ impl SenderAllocationRelationshipsManager { // still need to get aggregated. sqlx::query!( r#" - SELECT DISTINCT allocation_id, sender_address + SELECT DISTINCT allocation_id, signer_address FROM scalar_tap_receipts "# ) @@ -122,8 +125,11 @@ impl SenderAllocationRelationshipsManager { .for_each(|row| { let allocation_id = Address::from_str(&row.allocation_id) .expect("allocation_id should be a valid address"); - let sender = Address::from_str(&row.sender_address) - .expect("sender_address should be a valid address"); + let signer = Address::from_str(&row.signer_address) + .expect("signer_address should be a valid address"); + let sender = escrow_accounts_snapshot + .get_sender_for_signer(&signer) + .expect("should be able to get sender from signer"); // Only create a SenderAllocationRelationship if it doesn't exist yet. if let std::collections::hash_map::Entry::Vacant(e) = @@ -162,6 +168,7 @@ impl SenderAllocationRelationshipsManager { let new_receipts_watcher_handle = tokio::spawn(Self::new_receipts_watcher( pglistener, inner.sender_allocation_relationships.clone(), + inner.escrow_accounts.clone(), )); // Start the eligible_allocations_senders_pipe that watches for changes in eligible senders @@ -177,7 +184,7 @@ impl SenderAllocationRelationshipsManager { Self::update_sender_allocation_relationships( &inner, indexer_allocations, - escrow_accounts, + escrow_accounts.get_senders(), ) .await .unwrap_or_else(|e| { @@ -203,6 +210,7 @@ impl SenderAllocationRelationshipsManager { sender_allocation_relationships: Arc< RwLock>, >, + escrow_accounts: Eventual, ) { loop { // TODO: recover from errors or shutdown the whole program? @@ -216,11 +224,29 @@ impl SenderAllocationRelationshipsManager { NewReceiptNotification", ); - if let Some(sender_allocation_relationship) = - sender_allocation_relationships.read().await.get(&( - new_receipt_notification.allocation_id, - new_receipt_notification.sender_address, - )) + let sender_address = escrow_accounts + .value() + .await + .expect("should be able to get escrow accounts") + .get_sender_for_signer(&new_receipt_notification.signer_address); + + let sender_address = match sender_address { + Ok(sender_address) => sender_address, + Err(_) => { + error!( + "No sender address found for receipt signer address {}. \ + This should not happen.", + new_receipt_notification.signer_address + ); + // TODO: save the receipt in the failed receipts table? + continue; + } + }; + + if let Some(sender_allocation_relationship) = sender_allocation_relationships + .read() + .await + .get(&(new_receipt_notification.allocation_id, sender_address)) { sender_allocation_relationship .handle_new_receipt_notification(new_receipt_notification) @@ -228,9 +254,9 @@ impl SenderAllocationRelationshipsManager { } else { warn!( "No sender_allocation_relationship found for allocation_id {} and \ - sender_address {} to process new receipt notification. This should not \ - happen.", - new_receipt_notification.allocation_id, new_receipt_notification.sender_address + sender_address {} to process new receipt notification. This should not \ + happen.", + new_receipt_notification.allocation_id, sender_address ); } } @@ -239,10 +265,9 @@ impl SenderAllocationRelationshipsManager { async fn update_sender_allocation_relationships( inner: &Inner, indexer_allocations: HashMap, - escrow_accounts: HashMap, + senders: HashSet
, ) -> Result<()> { let eligible_allocations: Vec
= indexer_allocations.keys().copied().collect(); - let senders: Vec
= escrow_accounts.keys().copied().collect(); let mut sender_allocation_relationships_write = inner.sender_allocation_relationships.write().await; @@ -303,6 +328,9 @@ impl Drop for SenderAllocationRelationshipsManager { #[cfg(test)] mod tests { + use std::vec; + + use ethereum_types::U256; use indexer_common::{ prelude::{AllocationStatus, SubgraphDeployment}, subgraph_client::DeploymentDetails, @@ -316,7 +344,7 @@ mod tests { use crate::tap::{ sender_allocation_relationship::State, - test_utils::{INDEXER, SENDER, TAP_EIP712_DOMAIN_SEPARATOR}, + test_utils::{INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR}, }; use super::*; @@ -341,8 +369,8 @@ mod tests { indexer_allocations_writer.write(HashMap::new()); let (mut escrow_accounts_writer, escrow_accounts_eventual) = - Eventual::>::new(); - escrow_accounts_writer.write(HashMap::new()); + Eventual::::new(); + escrow_accounts_writer.write(EscrowAccounts::default()); // Mock escrow subgraph. let mock_server = MockServer::start().await; @@ -405,7 +433,10 @@ mod tests { )])); // Add an escrow account to the escrow_accounts Eventual. - escrow_accounts_writer.write(HashMap::from([(SENDER.1, U256::from_str("1000").unwrap())])); + escrow_accounts_writer.write(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); // Wait for the SenderAllocationRelationship to be created. tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; @@ -419,7 +450,7 @@ mod tests { .contains_key(&(allocation_id, SENDER.1))); // Remove the escrow account from the escrow_accounts Eventual. - escrow_accounts_writer.write(HashMap::new()); + escrow_accounts_writer.write(EscrowAccounts::default()); // Wait a bit tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; diff --git a/tap-agent/src/tap/test_utils.rs b/tap-agent/src/tap/test_utils.rs index 1df900e2..c538d17b 100644 --- a/tap-agent/src/tap/test_utils.rs +++ b/tap-agent/src/tap/test_utils.rs @@ -22,7 +22,8 @@ lazy_static! { Address::from_str("0xbcdebcdebcdebcdebcdebcdebcdebcdebcdebcde").unwrap(); pub static ref SENDER: (LocalWallet, Address) = wallet(0); pub static ref SENDER_IRRELEVANT: (LocalWallet, Address) = wallet(1); - pub static ref INDEXER: (LocalWallet, Address) = wallet(2); + pub static ref SIGNER: (LocalWallet, Address) = wallet(2); + pub static ref INDEXER: (LocalWallet, Address) = wallet(3); pub static ref TAP_EIP712_DOMAIN_SEPARATOR: Eip712Domain = eip712_domain! { name: "TAP", version: "1", @@ -47,7 +48,7 @@ pub fn wallet(index: u32) -> (LocalWallet, Address) { /// given `query_id` and `value` pub async fn create_received_receipt( allocation_id: &Address, - sender_wallet: &LocalWallet, + signer_wallet: &LocalWallet, nonce: u64, timestamp_ns: u64, value: u128, @@ -61,7 +62,7 @@ pub async fn create_received_receipt( timestamp_ns, value, }, - sender_wallet, + signer_wallet, ) .await .unwrap(); @@ -71,7 +72,7 @@ pub async fn create_received_receipt( /// Fixture to generate a RAV using the wallet from `keys()` pub async fn create_rav( allocation_id: Address, - sender_wallet: LocalWallet, + signer_wallet: LocalWallet, timestamp_ns: u64, value_aggregate: u128, ) -> SignedRAV { @@ -82,7 +83,7 @@ pub async fn create_rav( timestamp_ns, value_aggregate, }, - &sender_wallet, + &signer_wallet, ) .await .unwrap() @@ -92,7 +93,7 @@ pub async fn store_receipt(pgpool: &PgPool, signed_receipt: SignedReceipt) -> Re let record = sqlx::query!( r#" INSERT INTO scalar_tap_receipts ( - allocation_id, sender_address, timestamp_ns, value, receipt + allocation_id, signer_address, timestamp_ns, value, receipt ) VALUES ($1, $2, $3, $4, $5) RETURNING id