diff --git a/.sqlx/query-4b2b09179ed5f7acc2b04b5ced93613210267f47350d4aabdd91b108479fbf59.json b/.sqlx/query-4b2b09179ed5f7acc2b04b5ced93613210267f47350d4aabdd91b108479fbf59.json new file mode 100644 index 00000000..f4413c05 --- /dev/null +++ b/.sqlx/query-4b2b09179ed5f7acc2b04b5ced93613210267f47350d4aabdd91b108479fbf59.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT \n signer_address,\n (\n SELECT ARRAY \n (\n SELECT DISTINCT allocation_id\n FROM scalar_tap_receipts\n WHERE signer_address = signer_address\n )\n ) AS allocation_ids\n FROM scalar_tap_receipts\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "signer_address", + "type_info": "Bpchar" + }, + { + "ordinal": 1, + "name": "allocation_ids", + "type_info": "BpcharArray" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + null + ] + }, + "hash": "4b2b09179ed5f7acc2b04b5ced93613210267f47350d4aabdd91b108479fbf59" +} diff --git a/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json b/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json deleted file mode 100644 index 8d1a90b1..00000000 --- a/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT DISTINCT allocation_id, signer_address\n FROM scalar_tap_receipts\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "allocation_id", - "type_info": "Bpchar" - }, - { - "ordinal": 1, - "name": "signer_address", - "type_info": "Bpchar" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false - ] - }, - "hash": "778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb" -} diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index 8789138e..345e5696 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -9,11 +9,10 @@ use indexer_common::prelude::{ }; use crate::{ - aggregator_endpoints, config, database, - tap::sender_allocation_relationships_manager::SenderAllocationRelationshipsManager, + aggregator_endpoints, config, database, tap::sender_accounts_manager::SenderAccountsManager, }; -pub async fn start_agent(config: &'static config::Cli) -> SenderAllocationRelationshipsManager { +pub async fn start_agent(config: &'static config::Cli) -> SenderAccountsManager { let pgpool = database::connect(&config.postgres).await; let http_client = reqwest::Client::new(); @@ -80,7 +79,7 @@ pub async fn start_agent(config: &'static config::Cli) -> SenderAllocationRelati verifying_contract: config.receipts.receipts_verifier_address, }; - SenderAllocationRelationshipsManager::new( + SenderAccountsManager::new( config, pgpool, indexer_allocations, diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index ab4e9998..7afd0d03 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -11,8 +11,10 @@ mod escrow_adapter; mod rav_storage_adapter; mod receipt_checks_adapter; mod receipt_storage_adapter; -mod sender_allocation_relationship; -pub mod sender_allocation_relationships_manager; +mod sender_account; +pub mod sender_accounts_manager; +mod sender_allocation; +mod unaggregated_receipts; #[cfg(test)] pub mod test_utils; diff --git a/tap-agent/src/tap/rav_storage_adapter.rs b/tap-agent/src/tap/rav_storage_adapter.rs index 7bdbd898..3a464f19 100644 --- a/tap-agent/src/tap/rav_storage_adapter.rs +++ b/tap-agent/src/tap/rav_storage_adapter.rs @@ -91,18 +91,18 @@ impl RAVStorageAdapter { #[cfg(test)] mod test { use super::*; - use crate::tap::test_utils::{create_rav, ALLOCATION_ID, SENDER, SIGNER}; + use crate::tap::test_utils::{create_rav, ALLOCATION_ID_0, SENDER, SIGNER}; use tap_core::adapters::rav_storage_adapter::RAVStorageAdapter as RAVStorageAdapterTrait; #[sqlx::test(migrations = "../migrations")] async fn update_and_retrieve_rav(pool: PgPool) { let timestamp_ns = u64::MAX - 10; let value_aggregate = u128::MAX; - let rav_storage_adapter = RAVStorageAdapter::new(pool.clone(), *ALLOCATION_ID, SENDER.1); + let rav_storage_adapter = RAVStorageAdapter::new(pool.clone(), *ALLOCATION_ID_0, SENDER.1); // Insert a rav let mut new_rav = create_rav( - *ALLOCATION_ID, + *ALLOCATION_ID_0, SIGNER.0.clone(), timestamp_ns, value_aggregate, @@ -121,7 +121,7 @@ mod test { // Update the RAV 3 times in quick succession for i in 0..3 { new_rav = create_rav( - *ALLOCATION_ID, + *ALLOCATION_ID_0, SIGNER.0.clone(), timestamp_ns + i, value_aggregate - (i as u128), diff --git a/tap-agent/src/tap/receipt_storage_adapter.rs b/tap-agent/src/tap/receipt_storage_adapter.rs index ffff9572..64cb0392 100644 --- a/tap-agent/src/tap/receipt_storage_adapter.rs +++ b/tap-agent/src/tap/receipt_storage_adapter.rs @@ -194,7 +194,7 @@ mod test { use super::*; use crate::tap::test_utils::{ - create_received_receipt, store_receipt, ALLOCATION_ID, ALLOCATION_ID_IRRELEVANT, SENDER, + create_received_receipt, store_receipt, ALLOCATION_ID_0, ALLOCATION_ID_IRRELEVANT, SENDER, SENDER_IRRELEVANT, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; use anyhow::Result; @@ -381,7 +381,7 @@ mod test { let storage_adapter = ReceiptStorageAdapter::new( pgpool.clone(), - *ALLOCATION_ID, + *ALLOCATION_ID_0, SENDER.1, get_full_list_of_checks(), escrow_accounts.clone(), @@ -392,7 +392,7 @@ mod test { for i in 0..10 { received_receipt_vec.push( create_received_receipt( - &ALLOCATION_ID, + &ALLOCATION_ID_0, &SIGNER.0, i + 684, i + 42, @@ -416,7 +416,7 @@ mod test { ); received_receipt_vec.push( create_received_receipt( - &ALLOCATION_ID, + &ALLOCATION_ID_0, &SENDER_IRRELEVANT.0, i + 684, i + 42, @@ -525,7 +525,7 @@ mod test { let storage_adapter = ReceiptStorageAdapter::new( pgpool, - *ALLOCATION_ID, + *ALLOCATION_ID_0, SENDER.1, get_full_list_of_checks(), escrow_accounts.clone(), @@ -536,7 +536,7 @@ mod test { for i in 0..10 { received_receipt_vec.push( create_received_receipt( - &ALLOCATION_ID, + &ALLOCATION_ID_0, &SIGNER.0, i + 684, i + 42, @@ -560,7 +560,7 @@ mod test { ); received_receipt_vec.push( create_received_receipt( - &ALLOCATION_ID, + &ALLOCATION_ID_0, &SENDER_IRRELEVANT.0, i + 684, i + 42, diff --git a/tap-agent/src/tap/sender_account.rs b/tap-agent/src/tap/sender_account.rs new file mode 100644 index 00000000..d84f7507 --- /dev/null +++ b/tap-agent/src/tap/sender_account.rs @@ -0,0 +1,917 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + cmp::max, + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; + +use alloy_sol_types::Eip712Domain; +use anyhow::{anyhow, Result}; +use eventuals::Eventual; +use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; +use sqlx::PgPool; +use thegraph::types::Address; +use tokio::{ + select, + sync::{Mutex, MutexGuard, Notify}, + task::JoinSet, + time, +}; +use tracing::{error, warn}; + +use crate::config::{self}; +use crate::tap::{ + escrow_adapter::EscrowAdapter, sender_accounts_manager::NewReceiptNotification, + sender_allocation::SenderAllocation, unaggregated_receipts::UnaggregatedReceipts, +}; + +pub struct Inner { + config: &'static config::Cli, + pgpool: PgPool, + allocations_active: Mutex>>, + allocations_ineligible: Mutex>>, + sender: Address, + sender_aggregator_endpoint: String, + unaggregated_fees: Mutex, +} + +/// A SenderAccount manages the receipts accounting between the indexer and the sender across +/// multiple allocations. +/// +/// Manages the lifecycle of Scalar TAP for the SenderAccount, including: +/// - Monitoring new receipts and keeping track of the cumulative unaggregated fees across +/// allocations. +/// - Requesting RAVs from the sender's TAP aggregator once the cumulative unaggregated fees reach a +/// certain threshold. +/// - Requesting the last RAV from the sender's TAP aggregator for all EOL allocations. +pub struct SenderAccount { + inner: Arc, + escrow_accounts: Eventual, + escrow_subgraph: &'static SubgraphClient, + escrow_adapter: EscrowAdapter, + tap_eip712_domain_separator: Eip712Domain, + rav_requester_task: tokio::task::JoinHandle<()>, + rav_requester_notify: Arc, + rav_requester_finalize_task: tokio::task::JoinHandle<()>, + rav_requester_finalize_notify: Arc, +} + +impl SenderAccount { + #[allow(clippy::too_many_arguments)] + pub fn new( + config: &'static config::Cli, + pgpool: PgPool, + sender_id: Address, + escrow_accounts: Eventual, + escrow_subgraph: &'static SubgraphClient, + escrow_adapter: EscrowAdapter, + tap_eip712_domain_separator: Eip712Domain, + sender_aggregator_endpoint: String, + ) -> Self { + let inner = Arc::new(Inner { + config, + pgpool, + allocations_active: Mutex::new(HashMap::new()), + allocations_ineligible: Mutex::new(HashMap::new()), + sender: sender_id, + sender_aggregator_endpoint, + unaggregated_fees: Mutex::new(UnaggregatedReceipts::default()), + }); + + let rav_requester_notify = Arc::new(Notify::new()); + let rav_requester_finalize_notify = Arc::new(Notify::new()); + + Self { + inner: inner.clone(), + escrow_accounts, + escrow_subgraph, + escrow_adapter, + tap_eip712_domain_separator, + rav_requester_task: tokio::spawn(Self::rav_requester( + inner.clone(), + rav_requester_notify.clone(), + )), + rav_requester_notify, + rav_requester_finalize_task: tokio::spawn(Self::rav_requester_finalize( + inner.clone(), + rav_requester_finalize_notify.clone(), + )), + rav_requester_finalize_notify, + } + } + + /// Update the sender's allocations to match the target allocations. + pub async fn update_allocations(&self, target_allocations: HashSet
) { + let mut allocations_active = self.inner.allocations_active.lock().await; + let mut allocations_ineligible = self.inner.allocations_ineligible.lock().await; + + // Move allocations that are no longer to be active to the ineligible map. + let mut allocations_to_move = Vec::new(); + for allocation_id in allocations_active.keys() { + if !target_allocations.contains(allocation_id) { + allocations_to_move.push(*allocation_id); + } + } + for allocation_id in &allocations_to_move { + allocations_ineligible.insert( + *allocation_id, + allocations_active.remove(allocation_id).unwrap(), + ); + } + + // If we moved any allocations to the ineligible map, notify the RAV requester finalize + // task. + if !allocations_to_move.is_empty() { + self.rav_requester_finalize_notify.notify_waiters(); + } + + // Add new allocations. + for allocation_id in target_allocations { + if !allocations_active.contains_key(&allocation_id) + && !allocations_ineligible.contains_key(&allocation_id) + { + allocations_active.insert( + allocation_id, + Arc::new( + SenderAllocation::new( + self.inner.config, + self.inner.pgpool.clone(), + allocation_id, + self.inner.sender, + self.escrow_accounts.clone(), + self.escrow_subgraph, + self.escrow_adapter.clone(), + self.tap_eip712_domain_separator.clone(), + self.inner.sender_aggregator_endpoint.clone(), + ) + .await, + ), + ); + } + } + } + + pub async fn handle_new_receipt_notification( + &self, + new_receipt_notification: NewReceiptNotification, + ) { + let mut unaggregated_fees = self.inner.unaggregated_fees.lock().await; + + // Else we already processed that receipt, most likely from pulling the receipts + // from the database. + if new_receipt_notification.id > unaggregated_fees.last_id { + if let Some(allocation) = self + .inner + .allocations_active + .lock() + .await + .get(&new_receipt_notification.allocation_id) + { + // Add the receipt value to the allocation's unaggregated fees value. + allocation.fees_add(new_receipt_notification.value).await; + // Add the receipt value to the sender's unaggregated fees value. + Self::fees_add( + self.inner.clone(), + &mut unaggregated_fees, + new_receipt_notification.value, + ); + + unaggregated_fees.last_id = new_receipt_notification.id; + + // Check if we need to trigger a RAV request. + if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into() + { + self.rav_requester_notify.notify_waiters(); + } + } else { + error!( + "Received a new receipt notification for allocation {} that doesn't exist \ + or is ineligible for sender {}.", + new_receipt_notification.allocation_id, self.inner.sender + ); + } + } + } + + async fn rav_requester(inner: Arc, notif_value_trigger: Arc) { + loop { + notif_value_trigger.notified().await; + + Self::rav_requester_single(inner.clone()).await; + + // Check if we already need to send another RAV request. + let unaggregated_fees = inner.unaggregated_fees.lock().await; + if unaggregated_fees.value >= inner.config.tap.rav_request_trigger_value.into() { + // If so, "self-notify" to trigger another RAV request. + notif_value_trigger.notify_one(); + + warn!( + "Sender {} has {} unaggregated fees immediately after a RAV request, which is + over the trigger value. Triggering another RAV request.", + inner.sender, unaggregated_fees.value, + ); + } + } + } + + async fn rav_requester_finalize(inner: Arc, notif_finalize_allocations: Arc) { + loop { + // Wait for either 5 minutes or a notification that we need to try to finalize + // allocation receipts. + select! { + _ = time::sleep(Duration::from_secs(300)) => (), + _ = notif_finalize_allocations.notified() => () + } + + // Get a quick snapshot of the current finalizing allocations. They are + // Arcs, so it should be cheap. + let allocations_finalizing = inner + .allocations_ineligible + .lock() + .await + .values() + .cloned() + .collect::>(); + + for allocation in allocations_finalizing { + match allocation.rav_requester_single().await { + Ok(_) => { + if let Err(e) = allocation.mark_rav_final().await { + error!( + "Error while marking allocation {} as final for sender {}: {}", + allocation.get_allocation_id(), + inner.sender, + e + ); + } + + // Remove the allocation from the finalizing map. + inner + .allocations_ineligible + .lock() + .await + .remove(&allocation.get_allocation_id()); + } + Err(e) => { + error!( + "Error while requesting final RAV for sender {} and allocation {}: {}", + inner.sender, + allocation.get_allocation_id(), + e + ) + } + } + } + } + } + + /// Does a single RAV request for the sender's allocation with the highest unaggregated fees + async fn rav_requester_single(inner: Arc) { + let heaviest_allocation = match Self::get_heaviest_allocation(inner.clone()).await { + Ok(a) => a, + Err(e) => { + error!( + "Error while getting allocation with most unaggregated fees: {}", + e + ); + return; + } + }; + + if let Err(e) = heaviest_allocation.rav_requester_single().await { + error!( + "Error while requesting RAV for sender {} and allocation {}: {}", + inner.sender, + heaviest_allocation.get_allocation_id(), + e + ); + return; + }; + + if let Err(e) = Self::recompute_unaggregated_fees_static(inner.clone()).await { + error!( + "Error while recomputing unaggregated fees for sender {}: {}", + inner.sender, e + ); + } + } + + /// Returns the allocation with the highest unaggregated fees value. + async fn get_heaviest_allocation(inner: Arc) -> Result> { + // Get a quick snapshot of the current allocations. They are Arcs, so it should be cheap, + // and we don't want to hold the lock for too long. + let allocations: Vec<_> = inner + .allocations_active + .lock() + .await + .values() + .cloned() + .collect(); + + // Get the fees for each allocation in parallel. This is required because the + // SenderAllocation's fees is behind a Mutex. + let mut set = JoinSet::new(); + for allocation in allocations { + set.spawn(async move { + ( + allocation.clone(), + allocation.get_unaggregated_fees().await.value, + ) + }); + } + + // Find the allocation with the highest fees. Doing it "manually" because we can't get an + // iterator from the JoinSet, and collecting into a Vec doesn't make it much simpler + // anyway. + let mut heaviest_allocation = (None, 0u128); + while let Some(res) = set.join_next().await { + let (allocation, fees) = res?; + if fees > heaviest_allocation.1 { + heaviest_allocation = (Some(allocation), fees); + } + } + + heaviest_allocation + .0 + .ok_or(anyhow!("Heaviest allocation is None")) + } + + pub async fn recompute_unaggregated_fees(&self) -> Result<()> { + Self::recompute_unaggregated_fees_static(self.inner.clone()).await + } + + /// Recompute the sender's total unaggregated fees value and last receipt ID. + async fn recompute_unaggregated_fees_static(inner: Arc) -> Result<()> { + // Similar pattern to get_heaviest_allocation(). + let allocations: Vec<_> = inner + .allocations_active + .lock() + .await + .values() + .cloned() + .collect(); + + let mut set = JoinSet::new(); + for allocation in allocations { + set.spawn(async move { allocation.get_unaggregated_fees().await }); + } + + // Added benefit to this lock is that it pauses the handle_new_receipt_notification() calls + // while we're recomputing the unaggregated fees value. Hopefully this is a very short + // pause overall. + let mut unaggregated_fees = inner.unaggregated_fees.lock().await; + + // Recompute the sender's total unaggregated fees value and last receipt ID, because new + // receipts might have been added to the DB in the meantime. + *unaggregated_fees = UnaggregatedReceipts::default(); // Reset to 0. + while let Some(uf) = set.join_next().await { + let uf = uf?; + Self::fees_add(inner.clone(), &mut unaggregated_fees, uf.value); + unaggregated_fees.last_id = max(unaggregated_fees.last_id, uf.last_id); + } + + Ok(()) + } + + /// Safe add the fees to the unaggregated fees value, log an error if there is an overflow and + /// set the unaggregated fees value to u128::MAX. + fn fees_add( + inner: Arc, + unaggregated_fees: &mut MutexGuard<'_, UnaggregatedReceipts>, + value: u128, + ) { + unaggregated_fees.value = unaggregated_fees + .value + .checked_add(value) + .unwrap_or_else(|| { + // This should never happen, but if it does, we want to know about it. + error!( + "Overflow when adding receipt value {} to total unaggregated fees {} for \ + sender {}. Setting total unaggregated fees to u128::MAX.", + value, unaggregated_fees.value, inner.sender + ); + u128::MAX + }); + } +} + +// Abort tasks on Drop +impl Drop for SenderAccount { + fn drop(&mut self) { + self.rav_requester_task.abort(); + self.rav_requester_finalize_task.abort(); + } +} + +#[cfg(test)] +impl SenderAccount { + pub async fn _tests_get_allocations_active( + &self, + ) -> MutexGuard<'_, HashMap>> { + self.inner.allocations_active.lock().await + } + + pub async fn _tests_get_allocations_ineligible( + &self, + ) -> MutexGuard<'_, HashMap>> { + self.inner.allocations_ineligible.lock().await + } +} + +#[cfg(test)] +mod tests { + + use alloy_primitives::hex::ToHex; + use indexer_common::subgraph_client::DeploymentDetails; + use serde_json::json; + use tap_aggregator::server::run_server; + use tap_core::tap_manager::SignedRAV; + use wiremock::{ + matchers::{body_string_contains, method}, + Mock, MockServer, ResponseTemplate, + }; + + use crate::tap::test_utils::{ + create_received_receipt, store_receipt, ALLOCATION_ID_0, ALLOCATION_ID_1, ALLOCATION_ID_2, + INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, + }; + + use super::*; + + const DUMMY_URL: &str = "http://localhost:1234"; + + async fn create_sender_with_allocations( + pgpool: PgPool, + sender_aggregator_endpoint: String, + escrow_subgraph_endpoint: &str, + ) -> SenderAccount { + let config = Box::leak(Box::new(config::Cli { + config: None, + ethereum: config::Ethereum { + indexer_address: INDEXER.1, + }, + tap: config::Tap { + rav_request_trigger_value: 100, + rav_request_timestamp_buffer_ms: 1, + rav_request_timeout_secs: 5, + ..Default::default() + }, + ..Default::default() + })); + + let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), + ))); + + let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + + let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); + + let sender = SenderAccount::new( + config, + pgpool, + SENDER.1, + escrow_accounts_eventual, + escrow_subgraph, + escrow_adapter, + TAP_EIP712_DOMAIN_SEPARATOR.clone(), + sender_aggregator_endpoint, + ); + + sender + .update_allocations(HashSet::from([ + *ALLOCATION_ID_0, + *ALLOCATION_ID_1, + *ALLOCATION_ID_2, + ])) + .await; + sender.recompute_unaggregated_fees().await.unwrap(); + + sender + } + + /// Test that the sender_account correctly ignores new receipt notifications with + /// an ID lower than the last receipt ID processed (be it from the DB or from a prior receipt + /// notification). + #[sqlx::test(migrations = "../migrations")] + async fn test_handle_new_receipt_notification(pgpool: PgPool) { + // Add receipts to the database. Before creating the sender and allocation so that it loads + // the receipts from the DB. + let mut expected_unaggregated_fees = 0u128; + for i in 10..20 { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + expected_unaggregated_fees += u128::from(i); + } + + let sender = + create_sender_with_allocations(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Check that the allocation's unaggregated fees are correct. + assert_eq!( + sender + .inner + .allocations_active + .lock() + .await + .get(&*ALLOCATION_ID_0) + .unwrap() + .get_unaggregated_fees() + .await + .value, + expected_unaggregated_fees + ); + + // Check that the sender's unaggregated fees are correct. + assert_eq!( + sender.inner.unaggregated_fees.lock().await.value, + expected_unaggregated_fees + ); + + // Send a new receipt notification that has a lower ID than the last loaded from the DB. + // The last ID in the DB should be 10, since we added 10 receipts to the empty receipts + // table + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: 10, + timestamp_ns: 19, + value: 19, + }; + sender + .handle_new_receipt_notification(new_receipt_notification) + .await; + + // Check that the allocation's unaggregated fees have *not* increased. + assert_eq!( + sender + .inner + .allocations_active + .lock() + .await + .get(&*ALLOCATION_ID_0) + .unwrap() + .get_unaggregated_fees() + .await + .value, + expected_unaggregated_fees + ); + + // Check that the unaggregated fees have *not* increased. + assert_eq!( + sender.inner.unaggregated_fees.lock().await.value, + expected_unaggregated_fees + ); + + // Send a new receipt notification. + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: 30, + timestamp_ns: 20, + value: 20, + }; + sender + .handle_new_receipt_notification(new_receipt_notification) + .await; + expected_unaggregated_fees += 20; + + // Check that the allocation's unaggregated fees are correct. + assert_eq!( + sender + .inner + .allocations_active + .lock() + .await + .get(&*ALLOCATION_ID_0) + .unwrap() + .get_unaggregated_fees() + .await + .value, + expected_unaggregated_fees + ); + + // Check that the unaggregated fees are correct. + assert_eq!( + sender.inner.unaggregated_fees.lock().await.value, + expected_unaggregated_fees + ); + + // Send a new receipt notification that has a lower ID than the previous one. + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: 25, + timestamp_ns: 19, + value: 19, + }; + sender + .handle_new_receipt_notification(new_receipt_notification) + .await; + + // Check that the allocation's unaggregated fees have *not* increased. + assert_eq!( + sender + .inner + .allocations_active + .lock() + .await + .get(&*ALLOCATION_ID_0) + .unwrap() + .get_unaggregated_fees() + .await + .value, + expected_unaggregated_fees + ); + + // Check that the unaggregated fees have *not* increased. + assert_eq!( + sender.inner.unaggregated_fees.lock().await.value, + expected_unaggregated_fees + ); + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_rav_requester_auto(pgpool: PgPool) { + // Start a TAP aggregator server. + let (handle, aggregator_endpoint) = run_server( + 0, + SIGNER.0.clone(), + TAP_EIP712_DOMAIN_SEPARATOR.clone(), + 100 * 1024, + 100 * 1024, + 1, + ) + .await + .unwrap(); + + // Start a mock graphql server using wiremock + let mock_server = MockServer::start().await; + + // Mock result for TAP redeem txs for (allocation, sender) pair. + mock_server + .register( + Mock::given(method("POST")) + .and(body_string_contains("transactions")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(json!({ "data": { "transactions": []}})), + ), + ) + .await; + + // Create a sender_account. + let sender_account = create_sender_with_allocations( + pgpool.clone(), + "http://".to_owned() + &aggregator_endpoint.to_string(), + &mock_server.uri(), + ) + .await; + + // Add receipts to the database and call the `handle_new_receipt_notification` method + // correspondingly. + let mut total_value = 0; + let mut trigger_value = 0; + for i in 0..10 { + // These values should be enough to trigger a RAV request at i == 7 since we set the + // `rav_request_trigger_value` to 100. + let value = (i + 10) as u128; + + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, value, i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + sender_account + .handle_new_receipt_notification(NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: i, + timestamp_ns: i + 1, + value, + }) + .await; + + total_value += value; + if total_value >= 100 && trigger_value == 0 { + trigger_value = total_value; + } + } + + // Wait for the RAV requester to finish. + for _ in 0..100 { + if sender_account.inner.unaggregated_fees.lock().await.value < trigger_value { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + // Get the latest RAV from the database. + let latest_rav = sqlx::query!( + r#" + SELECT rav + FROM scalar_tap_ravs + WHERE allocation_id = $1 AND sender_address = $2 + "#, + ALLOCATION_ID_0.encode_hex::(), + SENDER.1.encode_hex::() + ) + .fetch_optional(&pgpool) + .await + .map(|r| r.map(|r| r.rav)) + .unwrap(); + + let latest_rav = latest_rav + .map(|r| serde_json::from_value::(r).unwrap()) + .unwrap(); + + // Check that the latest RAV value is correct. + assert!(latest_rav.message.value_aggregate >= trigger_value); + + // Check that the allocation's unaggregated fees value is reduced. + assert!( + sender_account + .inner + .allocations_active + .lock() + .await + .get(&*ALLOCATION_ID_0) + .unwrap() + .get_unaggregated_fees() + .await + .value + <= trigger_value + ); + + // Check that the sender's unaggregated fees value is reduced. + assert!(sender_account.inner.unaggregated_fees.lock().await.value <= trigger_value); + + // Reset the total value and trigger value. + total_value = sender_account.inner.unaggregated_fees.lock().await.value; + trigger_value = 0; + + // Add more receipts + for i in 10..20 { + let value = (i + 10) as u128; + + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + + sender_account + .handle_new_receipt_notification(NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: i, + timestamp_ns: i + 1, + value, + }) + .await; + + total_value += value; + if total_value >= 100 && trigger_value == 0 { + trigger_value = total_value; + } + } + + // Wait for the RAV requester to finish. + for _ in 0..100 { + if sender_account.inner.unaggregated_fees.lock().await.value < trigger_value { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + // Get the latest RAV from the database. + let latest_rav = sqlx::query!( + r#" + SELECT rav + FROM scalar_tap_ravs + WHERE allocation_id = $1 AND sender_address = $2 + "#, + ALLOCATION_ID_0.encode_hex::(), + SENDER.1.encode_hex::() + ) + .fetch_optional(&pgpool) + .await + .map(|r| r.map(|r| r.rav)) + .unwrap(); + + let latest_rav = latest_rav + .map(|r| serde_json::from_value::(r).unwrap()) + .unwrap(); + + // Check that the latest RAV value is correct. + + assert!(latest_rav.message.value_aggregate >= trigger_value); + + // Check that the allocation's unaggregated fees value is reduced. + assert!( + sender_account + .inner + .allocations_active + .lock() + .await + .get(&*ALLOCATION_ID_0) + .unwrap() + .get_unaggregated_fees() + .await + .value + <= trigger_value + ); + + // Check that the unaggregated fees value is reduced. + assert!(sender_account.inner.unaggregated_fees.lock().await.value <= trigger_value); + + // Stop the TAP aggregator server. + handle.stop().unwrap(); + handle.stopped().await; + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_sender_unaggregated_fees(pgpool: PgPool) { + // Create a sender_account. + let sender_account = Arc::new( + create_sender_with_allocations(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await, + ); + + // Closure that adds a number of receipts to an allocation. + let add_receipts = |allocation_id: Address, iterations: u64| { + let sender_account = sender_account.clone(); + + async move { + let mut total_value = 0; + for i in 0..iterations { + let value = (i + 10) as u128; + + let id = sender_account.inner.unaggregated_fees.lock().await.last_id + 1; + + sender_account + .handle_new_receipt_notification(NewReceiptNotification { + allocation_id, + signer_address: SIGNER.1, + id, + timestamp_ns: i + 1, + value, + }) + .await; + + total_value += value; + } + + assert_eq!( + sender_account + .inner + .allocations_active + .lock() + .await + .get(&allocation_id) + .unwrap() + .get_unaggregated_fees() + .await + .value, + total_value + ); + + total_value + } + }; + + // Add receipts to the database for allocation_0 + let total_value_0 = add_receipts(*ALLOCATION_ID_0, 9).await; + + // Add receipts to the database for allocation_1 + let total_value_1 = add_receipts(*ALLOCATION_ID_1, 10).await; + + // Add receipts to the database for allocation_2 + let total_value_2 = add_receipts(*ALLOCATION_ID_2, 8).await; + + // Get the heaviest allocation. + let heaviest_allocation = + SenderAccount::get_heaviest_allocation(sender_account.inner.clone()) + .await + .unwrap(); + + // Check that the heaviest allocation is correct. + assert_eq!(heaviest_allocation.get_allocation_id(), *ALLOCATION_ID_1); + + // Check that the sender's unaggregated fees value is correct. + assert_eq!( + sender_account.inner.unaggregated_fees.lock().await.value, + total_value_0 + total_value_1 + total_value_2 + ); + } +} diff --git a/tap-agent/src/tap/sender_allocation_relationships_manager.rs b/tap-agent/src/tap/sender_accounts_manager.rs similarity index 61% rename from tap-agent/src/tap/sender_allocation_relationships_manager.rs rename to tap-agent/src/tap/sender_accounts_manager.rs index 2a1441e6..a0deb204 100644 --- a/tap-agent/src/tap/sender_allocation_relationships_manager.rs +++ b/tap-agent/src/tap/sender_accounts_manager.rs @@ -16,9 +16,9 @@ use thegraph::types::Address; use tokio::sync::RwLock; use tracing::{error, warn}; -use super::escrow_adapter::EscrowAdapter; -use super::sender_allocation_relationship::SenderAllocationRelationship; use crate::config; +use crate::tap::escrow_adapter::EscrowAdapter; +use crate::tap::sender_account::SenderAccount; #[derive(Deserialize, Debug)] pub struct NewReceiptNotification { @@ -29,7 +29,7 @@ pub struct NewReceiptNotification { pub value: u128, } -pub struct SenderAllocationRelationshipsManager { +pub struct SenderAccountsManager { _inner: Arc, new_receipts_watcher_handle: tokio::task::JoinHandle<()>, _eligible_allocations_senders_pipe: PipeHandle, @@ -39,9 +39,8 @@ pub struct SenderAllocationRelationshipsManager { struct Inner { config: &'static config::Cli, pgpool: PgPool, - /// Map of (allocation_id, sender_address) to SenderAllocationRelationship. - sender_allocation_relationships: - Arc>>, + /// Map of sender_address to SenderAllocation. + sender_accounts: Arc>>, indexer_allocations: Eventual>, escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, @@ -50,7 +49,7 @@ struct Inner { sender_aggregator_endpoints: HashMap, } -impl SenderAllocationRelationshipsManager { +impl SenderAccountsManager { pub async fn new( config: &'static config::Cli, pgpool: PgPool, @@ -65,7 +64,7 @@ impl SenderAllocationRelationshipsManager { let inner = Arc::new(Inner { config, pgpool, - sender_allocation_relationships: Arc::new(RwLock::new(HashMap::new())), + sender_accounts: Arc::new(RwLock::new(HashMap::new())), indexer_allocations, escrow_accounts, escrow_subgraph, @@ -74,27 +73,9 @@ impl SenderAllocationRelationshipsManager { sender_aggregator_endpoints, }); - let escrow_accounts_snapshot = inner - .escrow_accounts - .value() - .await - .expect("Should get escrow accounts from Eventual"); - - Self::update_sender_allocation_relationships( - &inner, - inner - .indexer_allocations - .value() - .await - .expect("Should get indexer allocations from Eventual"), - escrow_accounts_snapshot.get_senders(), - ) - .await - .expect("Should be able to update sender_allocation_relationships"); - // Listen to pg_notify events. We start it before updating the unaggregated_fees for all - // SenderAllocationRelationship instances, so that we don't miss any receipts. PG will - // buffer the notifications until we start consuming them with `new_receipts_watcher`. + // SenderAccount instances, so that we don't miss any receipts. PG will buffer the\ + // notifications until we start consuming them with `new_receipts_watcher`. let mut pglistener = PgListener::connect_with(&inner.pgpool.clone()) .await .unwrap(); @@ -106,73 +87,104 @@ impl SenderAllocationRelationshipsManager { 'scalar_tap_receipt_notification'", ); - let mut sender_allocation_relationships_write_lock = - inner.sender_allocation_relationships.write().await; + let escrow_accounts_snapshot = inner + .escrow_accounts + .value() + .await + .expect("Should get escrow accounts from Eventual"); + + let mut sender_accounts_write_lock = inner.sender_accounts.write().await; - // Create SenderAllocationRelationship instances for all outstanding receipts in the - // database, because they may be linked to allocations that are not eligible anymore, but + // Create Sender and SenderAllocation instances for all outstanding receipts in the + // database because they may be linked to allocations that are not eligible anymore, but // still need to get aggregated. - sqlx::query!( + let unfinalized_allocations_in_db = sqlx::query!( r#" - SELECT DISTINCT allocation_id, signer_address + SELECT DISTINCT + signer_address, + ( + SELECT ARRAY + ( + SELECT DISTINCT allocation_id + FROM scalar_tap_receipts + WHERE signer_address = signer_address + ) + ) AS allocation_ids FROM scalar_tap_receipts "# ) .fetch_all(&inner.pgpool) .await - .unwrap() - .into_iter() - .for_each(|row| { - let allocation_id = Address::from_str(&row.allocation_id) - .expect("allocation_id should be a valid address"); - let signer = Address::from_str(&row.signer_address) + .expect("should be able to fetch unfinalized allocations from the database"); + + for row in unfinalized_allocations_in_db { + let allocation_ids = row + .allocation_ids + .expect("all receipts should have an allocation_id") + .iter() + .map(|allocation_id| { + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address") + }) + .collect::>(); + let signer_id = Address::from_str(&row.signer_address) .expect("signer_address should be a valid address"); - let sender = escrow_accounts_snapshot - .get_sender_for_signer(&signer) + let sender_id = escrow_accounts_snapshot + .get_sender_for_signer(&signer_id) .expect("should be able to get sender from signer"); - // Only create a SenderAllocationRelationship if it doesn't exist yet. - if let std::collections::hash_map::Entry::Vacant(e) = - sender_allocation_relationships_write_lock.entry((allocation_id, sender)) - { - e.insert(SenderAllocationRelationship::new( + let sender = sender_accounts_write_lock + .entry(sender_id) + .or_insert(SenderAccount::new( config, inner.pgpool.clone(), - allocation_id, - sender, + sender_id, inner.escrow_accounts.clone(), inner.escrow_subgraph, inner.escrow_adapter.clone(), inner.tap_eip712_domain_separator.clone(), inner .sender_aggregator_endpoints - .get(&sender) - .unwrap() + .get(&sender_id) + .expect("should be able to get sender_aggregator_endpoint for sender") .clone(), )); - } - }); - // Update the unaggregated_fees for all SenderAllocationRelationship instances by pulling - // the receipts from the database. - for sender_allocation_relationship in sender_allocation_relationships_write_lock.values() { - sender_allocation_relationship - .update_unaggregated_fees() + // Update sender's allocations + sender.update_allocations(allocation_ids.clone()).await; + sender + .recompute_unaggregated_fees() .await - .expect("should be able to update unaggregated_fees"); + .expect("should be able to recompute unaggregated fees"); } - drop(sender_allocation_relationships_write_lock); + drop(sender_accounts_write_lock); + + // Update senders and allocations based on the current state of the network. + // It is important to do this after creating the Sender and SenderAllocation instances based + // on the receipts in the database, because now all ineligible allocation and/or sender that + // we created above will be set for receipt finalization. + Self::update_sender_accounts( + &inner, + inner + .indexer_allocations + .value() + .await + .expect("Should get indexer allocations from Eventual"), + escrow_accounts_snapshot.get_senders(), + ) + .await + .expect("Should be able to update_sender_accounts"); // Start the new_receipts_watcher task that will consume from the `pglistener` let new_receipts_watcher_handle = tokio::spawn(Self::new_receipts_watcher( pglistener, - inner.sender_allocation_relationships.clone(), + inner.sender_accounts.clone(), inner.escrow_accounts.clone(), )); // Start the eligible_allocations_senders_pipe that watches for changes in eligible senders - // and allocations and updates the SenderAllocationRelationship instances accordingly. + // and allocations and updates the SenderAccount instances accordingly. let inner_clone = inner.clone(); let eligible_allocations_senders_pipe = eventuals::join(( inner.indexer_allocations.clone(), @@ -181,17 +193,14 @@ impl SenderAllocationRelationshipsManager { .pipe_async(move |(indexer_allocations, escrow_accounts)| { let inner = inner_clone.clone(); async move { - Self::update_sender_allocation_relationships( + Self::update_sender_accounts( &inner, indexer_allocations, escrow_accounts.get_senders(), ) .await .unwrap_or_else(|e| { - error!( - "Error while updating sender_allocation_relationships: {:?}", - e - ); + error!("Error while updating sender_accounts: {:?}", e); }); } }); @@ -204,12 +213,10 @@ impl SenderAllocationRelationshipsManager { } /// Continuously listens for new receipt notifications from Postgres and forwards them to the - /// corresponding SenderAllocationRelationship. + /// corresponding SenderAccount. async fn new_receipts_watcher( mut pglistener: PgListener, - sender_allocation_relationships: Arc< - RwLock>, - >, + sender_accounts: Arc>>, escrow_accounts: Eventual, ) { loop { @@ -243,81 +250,74 @@ impl SenderAllocationRelationshipsManager { } }; - if let Some(sender_allocation_relationship) = sender_allocation_relationships - .read() - .await - .get(&(new_receipt_notification.allocation_id, sender_address)) - { - sender_allocation_relationship + if let Some(sender_account) = sender_accounts.read().await.get(&sender_address) { + sender_account .handle_new_receipt_notification(new_receipt_notification) .await; } else { warn!( - "No sender_allocation_relationship found for allocation_id {} and \ - sender_address {} to process new receipt notification. This should not \ - happen.", - new_receipt_notification.allocation_id, sender_address + "No sender_allocation_manager found for sender_address {} to process new \ + receipt notification. This should not happen.", + sender_address ); } } } - async fn update_sender_allocation_relationships( + async fn update_sender_accounts( inner: &Inner, indexer_allocations: HashMap, - senders: HashSet
, + target_senders: HashSet
, ) -> Result<()> { - let eligible_allocations: Vec
= indexer_allocations.keys().copied().collect(); - let mut sender_allocation_relationships_write = - inner.sender_allocation_relationships.write().await; - - // Create SenderAllocationRelationship instances for all currently eligible - // (allocation, sender) - for allocation_id in &eligible_allocations { - for sender in &senders { - // Only create a SenderAllocationRelationship if it doesn't exist yet. - if let std::collections::hash_map::Entry::Vacant(e) = - sender_allocation_relationships_write.entry((*allocation_id, *sender)) - { - e.insert(SenderAllocationRelationship::new( - inner.config, - inner.pgpool.clone(), - *allocation_id, - *sender, - inner.escrow_accounts.clone(), - inner.escrow_subgraph, - inner.escrow_adapter.clone(), - inner.tap_eip712_domain_separator.clone(), - inner - .sender_aggregator_endpoints - .get(sender) - .ok_or_else(|| { - anyhow!("No sender_aggregator_endpoint found for sender {}", sender) - })? - .clone(), - )); - } + let eligible_allocations: HashSet
= indexer_allocations.keys().copied().collect(); + let mut sender_accounts_write = inner.sender_accounts.write().await; + + // For all Senders that are not in the target_senders HashSet, set all their allocations to + // ineligible. That will trigger a finalization of all their receipts. + for (sender_id, sender_account) in sender_accounts_write.iter_mut() { + if !target_senders.contains(sender_id) { + sender_account.update_allocations(HashSet::new()).await; } } - // Trigger a last rav request for all SenderAllocationRelationship instances that correspond - // to ineligible (allocations, sender). - for ((allocation_id, sender), sender_allocation_relatioship) in - sender_allocation_relationships_write.iter() - { - if !eligible_allocations.contains(allocation_id) || !senders.contains(sender) { - sender_allocation_relatioship.start_last_rav_request().await - } + // Get or create SenderAccount instances for all currently eligible + // senders. + for sender_id in &target_senders { + let sender = sender_accounts_write + .entry(*sender_id) + .or_insert(SenderAccount::new( + inner.config, + inner.pgpool.clone(), + *sender_id, + inner.escrow_accounts.clone(), + inner.escrow_subgraph, + inner.escrow_adapter.clone(), + inner.tap_eip712_domain_separator.clone(), + inner + .sender_aggregator_endpoints + .get(sender_id) + .ok_or_else(|| { + anyhow!( + "No sender_aggregator_endpoint found for sender {}", + sender_id + ) + })? + .clone(), + )); + + // Update sender's allocations + sender + .update_allocations(eligible_allocations.clone()) + .await; } - // TODO: remove SenderAllocationRelationship instances that are finished. Ideally done in - // another async task? + // TODO: remove Sender instances that are finished. Ideally done in another async task? Ok(()) } } -impl Drop for SenderAllocationRelationshipsManager { +impl Drop for SenderAccountsManager { fn drop(&mut self) { // Abort the notification watcher on drop. Otherwise it may panic because the PgPool could // get dropped before. (Observed in tests) @@ -342,15 +342,12 @@ mod tests { Mock, MockServer, ResponseTemplate, }; - use crate::tap::{ - sender_allocation_relationship::State, - test_utils::{INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR}, - }; + use crate::tap::test_utils::{INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR}; use super::*; #[sqlx::test(migrations = "../migrations")] - async fn test_sender_allocation_relatioship_creation_and_eol(pgpool: PgPool) { + async fn test_sender_account_creation_and_eol(pgpool: PgPool) { let config = Box::leak(Box::new(config::Cli { config: None, ethereum: config::Ethereum { @@ -390,7 +387,7 @@ mod tests { DeploymentDetails::for_query_url(&mock_server.uri()).unwrap(), ))); - let sender_allocation_relatioships = SenderAllocationRelationshipsManager::new( + let sender_account = SenderAccountsManager::new( config, pgpool.clone(), indexer_allocations_eventual, @@ -436,16 +433,16 @@ mod tests { HashMap::from([(SENDER.1, vec![SIGNER.1])]), )); - // Wait for the SenderAllocationRelationship to be created. + // Wait for the SenderAccount to be created. tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Check that the SenderAllocationRelationship was created. - assert!(sender_allocation_relatioships + // Check that the SenderAccount was created. + assert!(sender_account ._inner - .sender_allocation_relationships + .sender_accounts .write() .await - .contains_key(&(allocation_id, SENDER.1))); + .contains_key(&SENDER.1)); // Remove the escrow account from the escrow_accounts Eventual. escrow_accounts_writer.write(EscrowAccounts::default()); @@ -453,18 +450,26 @@ mod tests { // Wait a bit tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Check that the SenderAllocationRelationship state is last_rav_pending - assert_eq!( - sender_allocation_relatioships - ._inner - .sender_allocation_relationships - .read() - .await - .get(&(allocation_id, SENDER.1)) - .unwrap() - .state() - .await, - State::LastRavPending - ); + // Check that the Sender's allocation moved from active to ineligible. + assert!(sender_account + ._inner + .sender_accounts + .read() + .await + .get(&SENDER.1) + .unwrap() + ._tests_get_allocations_active() + .await + .is_empty()); + assert!(sender_account + ._inner + .sender_accounts + .read() + .await + .get(&SENDER.1) + .unwrap() + ._tests_get_allocations_ineligible() + .await + .contains_key(&allocation_id)); } } diff --git a/tap-agent/src/tap/sender_allocation.rs b/tap-agent/src/tap/sender_allocation.rs new file mode 100644 index 00000000..8a604c6a --- /dev/null +++ b/tap-agent/src/tap/sender_allocation.rs @@ -0,0 +1,576 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{str::FromStr, sync::Arc, time::Duration}; + +use alloy_primitives::hex::ToHex; +use alloy_sol_types::Eip712Domain; +use anyhow::{anyhow, ensure, Result}; +use eventuals::Eventual; +use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; +use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; +use sqlx::{types::BigDecimal, PgPool}; +use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; +use tap_core::{ + eip_712_signed_message::EIP712SignedMessage, + receipt_aggregate_voucher::ReceiptAggregateVoucher, + tap_manager::RAVRequest, + tap_receipt::{ReceiptCheck, ReceivedReceipt}, +}; +use thegraph::types::Address; +use tokio::sync::Mutex; +use tracing::{error, warn}; + +use crate::{ + config::{self}, + tap::{ + escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, + receipt_checks_adapter::ReceiptChecksAdapter, + receipt_storage_adapter::ReceiptStorageAdapter, signers_trimmed, + unaggregated_receipts::UnaggregatedReceipts, + }, +}; + +type TapManager = tap_core::tap_manager::Manager< + EscrowAdapter, + ReceiptChecksAdapter, + ReceiptStorageAdapter, + RAVStorageAdapter, +>; + +/// Manages unaggregated fees and the TAP lifecyle for a specific (allocation, sender) pair. +pub struct SenderAllocation { + pgpool: PgPool, + tap_manager: TapManager, + allocation_id: Address, + sender: Address, + sender_aggregator_endpoint: String, + unaggregated_fees: Arc>, + config: &'static config::Cli, + escrow_accounts: Eventual, + rav_request_guard: tokio::sync::Mutex<()>, +} + +impl SenderAllocation { + #[allow(clippy::too_many_arguments)] + pub async fn new( + config: &'static config::Cli, + pgpool: PgPool, + allocation_id: Address, + sender: Address, + escrow_accounts: Eventual, + escrow_subgraph: &'static SubgraphClient, + escrow_adapter: EscrowAdapter, + tap_eip712_domain_separator: Eip712Domain, + sender_aggregator_endpoint: String, + ) -> Self { + let required_checks = vec![ + ReceiptCheck::CheckUnique, + ReceiptCheck::CheckAllocationId, + ReceiptCheck::CheckTimestamp, + // ReceiptCheck::CheckValue, + ReceiptCheck::CheckSignature, + ReceiptCheck::CheckAndReserveEscrow, + ]; + + let receipt_checks_adapter = ReceiptChecksAdapter::new( + config, + pgpool.clone(), + // TODO: Implement query appraisals. + None, + allocation_id, + escrow_accounts.clone(), + escrow_subgraph, + sender, + ); + let receipt_storage_adapter = ReceiptStorageAdapter::new( + pgpool.clone(), + allocation_id, + sender, + required_checks.clone(), + escrow_accounts.clone(), + ); + let rav_storage_adapter = RAVStorageAdapter::new(pgpool.clone(), allocation_id, sender); + let tap_manager = TapManager::new( + tap_eip712_domain_separator.clone(), + escrow_adapter, + receipt_checks_adapter, + rav_storage_adapter, + receipt_storage_adapter, + required_checks, + 0, + ); + + let sender_allocation = Self { + pgpool, + tap_manager, + allocation_id, + sender, + sender_aggregator_endpoint, + unaggregated_fees: Arc::new(Mutex::new(UnaggregatedReceipts::default())), + config, + escrow_accounts, + rav_request_guard: tokio::sync::Mutex::new(()), + }; + + sender_allocation + .update_unaggregated_fees() + .await + .map_err(|e| { + error!( + "Error while updating unaggregated fees for allocation {}: {}", + allocation_id, e + ) + }) + .ok(); + + sender_allocation + } + + /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager + /// with the latest unaggregated fees from the database. + async fn update_unaggregated_fees(&self) -> Result<()> { + self.tap_manager.remove_obsolete_receipts().await?; + + let signers = signers_trimmed(&self.escrow_accounts, self.sender).await?; + + // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? + let res = sqlx::query!( + r#" + WITH rav AS ( + SELECT + rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns + FROM + scalar_tap_ravs + WHERE + allocation_id = $1 + AND sender_address = $2 + ) + SELECT + MAX(id), + SUM(value) + FROM + scalar_tap_receipts + WHERE + allocation_id = $1 + AND signer_address IN (SELECT unnest($3::text[])) + AND CASE WHEN ( + SELECT + timestamp_ns :: NUMERIC + FROM + rav + ) IS NOT NULL THEN timestamp_ns > ( + SELECT + timestamp_ns :: NUMERIC + FROM + rav + ) ELSE TRUE END + "#, + self.allocation_id.encode_hex::(), + self.sender.encode_hex::(), + &signers + ) + .fetch_one(&self.pgpool) + .await?; + + let mut unaggregated_fees = self.unaggregated_fees.lock().await; + + ensure!( + res.sum.is_none() == res.max.is_none(), + "Exactly one of SUM(value) and MAX(id) is null. This should not happen." + ); + + unaggregated_fees.last_id = res.max.unwrap_or(0).try_into()?; + unaggregated_fees.value = res + .sum + .unwrap_or(BigDecimal::from(0)) + .to_string() + .parse::()?; + + // TODO: check if we need to run a RAV request here. + + Ok(()) + } + + /// Request a RAV from the sender's TAP aggregator. Only one RAV request will be running at a + /// time through the use of an internal guard. + pub async fn rav_requester_single(&self) -> Result<()> { + // Making extra sure that only one RAV request is running at a time. + let _guard = self.rav_request_guard.lock().await; + + let RAVRequest { + valid_receipts, + previous_rav, + invalid_receipts, + expected_rav, + } = self + .tap_manager + .create_rav_request( + self.config.tap.rav_request_timestamp_buffer_ms * 1_000_000, + // TODO: limit the number of receipts to aggregate per request. + None, + ) + .await?; + if !invalid_receipts.is_empty() { + warn!( + "Found {} invalid receipts for allocation {} and sender {}.", + invalid_receipts.len(), + self.allocation_id, + self.sender + ); + + // Save invalid receipts to the database for logs. + // TODO: consider doing that in a spawned task? + Self::store_invalid_receipts(self, &invalid_receipts).await?; + } + let client = HttpClientBuilder::default() + .request_timeout(Duration::from_secs( + self.config.tap.rav_request_timeout_secs, + )) + .build(&self.sender_aggregator_endpoint)?; + let response: JsonRpcResponse> = client + .request( + "aggregate_receipts", + rpc_params!( + "0.0", // TODO: Set the version in a smarter place. + valid_receipts, + previous_rav + ), + ) + .await?; + if let Some(warnings) = response.warnings { + warn!("Warnings from sender's TAP aggregator: {:?}", warnings); + } + match self + .tap_manager + .verify_and_store_rav(expected_rav.clone(), response.data.clone()) + .await + { + Ok(_) => {} + + // Adapter errors are local software errors. Shouldn't be a problem with the sender. + Err(tap_core::Error::AdapterError { source_error: e }) => { + anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e) + } + + // The 3 errors below signal an invalid RAV, which should be about problems with the + // sender. The sender could be malicious. + Err( + e @ tap_core::Error::InvalidReceivedRAV { + expected_rav: _, + received_rav: _, + } + | e @ tap_core::Error::SignatureError(_) + | e @ tap_core::Error::InvalidRecoveredSigner { address: _ }, + ) => { + Self::store_failed_rav(self, &expected_rav, &response.data, &e.to_string()).await?; + anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e); + } + + // All relevant errors should be handled above. If we get here, we forgot to handle + // an error case. + Err(e) => { + anyhow::bail!("Error while verifying and storing RAV: {:?}", e); + } + } + Self::update_unaggregated_fees(self).await?; + Ok(()) + } + + pub async fn mark_rav_final(&self) -> Result<()> { + let updated_rows = sqlx::query!( + r#" + UPDATE scalar_tap_ravs + SET final = true + WHERE allocation_id = $1 AND sender_address = $2 + RETURNING * + "#, + self.allocation_id.encode_hex::(), + self.sender.encode_hex::(), + ) + .fetch_all(&self.pgpool) + .await?; + if updated_rows.len() != 1 { + anyhow::bail!( + "Expected exactly one row to be updated in the latest RAVs table, \ + but {} were updated.", + updated_rows.len() + ); + }; + Ok(()) + } + + async fn store_invalid_receipts(&self, receipts: &[ReceivedReceipt]) -> Result<()> { + for received_receipt in receipts.iter() { + sqlx::query!( + r#" + INSERT INTO scalar_tap_receipts_invalid ( + allocation_id, + signer_address, + timestamp_ns, + value, + received_receipt + ) + VALUES ($1, $2, $3, $4, $5) + "#, + self.allocation_id.encode_hex::(), + self.sender.encode_hex::(), + BigDecimal::from(received_receipt.signed_receipt().message.timestamp_ns), + BigDecimal::from_str(&received_receipt.signed_receipt().message.value.to_string())?, + serde_json::to_value(received_receipt)? + ) + .execute(&self.pgpool) + .await + .map_err(|e| anyhow!("Failed to store failed receipt: {:?}", e))?; + } + + Ok(()) + } + + async fn store_failed_rav( + &self, + expected_rav: &ReceiptAggregateVoucher, + rav: &EIP712SignedMessage, + reason: &str, + ) -> Result<()> { + sqlx::query!( + r#" + INSERT INTO scalar_tap_rav_requests_failed ( + allocation_id, + sender_address, + expected_rav, + rav_response, + reason + ) + VALUES ($1, $2, $3, $4, $5) + "#, + self.allocation_id.encode_hex::(), + self.sender.encode_hex::(), + serde_json::to_value(expected_rav)?, + serde_json::to_value(rav)?, + reason + ) + .execute(&self.pgpool) + .await + .map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?; + + Ok(()) + } + + /// Safe add the fees to the unaggregated fees value, log an error if there is an overflow and + /// set the unaggregated fees value to u128::MAX. + pub async fn fees_add(&self, fees: u128) { + let mut unaggregated_fees = self.unaggregated_fees.lock().await; + unaggregated_fees.value = unaggregated_fees + .value + .checked_add(fees) + .unwrap_or_else(|| { + // This should never happen, but if it does, we want to know about it. + error!( + "Overflow when adding receipt value {} to total unaggregated fees {} for \ + allocation {} and sender {}. Setting total unaggregated fees to u128::MAX.", + fees, unaggregated_fees.value, self.allocation_id, self.sender + ); + u128::MAX + }); + } + + pub async fn get_unaggregated_fees(&self) -> UnaggregatedReceipts { + self.unaggregated_fees.lock().await.clone() + } + + pub fn get_allocation_id(&self) -> Address { + self.allocation_id + } +} + +#[cfg(test)] +mod tests { + + use std::collections::HashMap; + + use indexer_common::subgraph_client::DeploymentDetails; + use serde_json::json; + use tap_aggregator::server::run_server; + + use wiremock::{ + matchers::{body_string_contains, method}, + Mock, MockServer, ResponseTemplate, + }; + + use super::*; + use crate::tap::test_utils::{ + create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID_0, INDEXER, + SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, + }; + + const DUMMY_URL: &str = "http://localhost:1234"; + + async fn create_sender_allocation( + pgpool: PgPool, + sender_aggregator_endpoint: String, + escrow_subgraph_endpoint: &str, + ) -> SenderAllocation { + let config = Box::leak(Box::new(config::Cli { + config: None, + ethereum: config::Ethereum { + indexer_address: INDEXER.1, + }, + tap: config::Tap { + rav_request_trigger_value: 100, + rav_request_timestamp_buffer_ms: 1, + rav_request_timeout_secs: 5, + ..Default::default() + }, + ..Default::default() + })); + + let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), + ))); + + let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + + let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); + + SenderAllocation::new( + config, + pgpool.clone(), + *ALLOCATION_ID_0, + SENDER.1, + escrow_accounts_eventual, + escrow_subgraph, + escrow_adapter, + TAP_EIP712_DOMAIN_SEPARATOR.clone(), + sender_aggregator_endpoint, + ) + .await + } + + /// Test that the sender_allocation correctly updates the unaggregated fees from the + /// database when there is no RAV in the database. + /// + /// The sender_allocation should consider all receipts found for the allocation and + /// sender. + #[sqlx::test(migrations = "../migrations")] + async fn test_update_unaggregated_fees_no_rav(pgpool: PgPool) { + let sender_allocation = + create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Add receipts to the database. + for i in 1..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + // Let the sender_allocation update the unaggregated fees from the database. + sender_allocation.update_unaggregated_fees().await.unwrap(); + + // Check that the unaggregated fees are correct. + assert_eq!( + sender_allocation.unaggregated_fees.lock().await.value, + 45u128 + ); + } + + /// Test that the sender_allocation correctly updates the unaggregated fees from the + /// database when there is a RAV in the database as well as receipts which timestamp are lesser + /// and greater than the RAV's timestamp. + /// + /// The sender_allocation should only consider receipts with a timestamp greater + /// than the RAV's timestamp. + #[sqlx::test(migrations = "../migrations")] + async fn test_update_unaggregated_fees_with_rav(pgpool: PgPool) { + let sender_allocation = + create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Add the RAV to the database. + // This RAV has timestamp 4. The sender_allocation should only consider receipts + // with a timestamp greater than 4. + let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10).await; + store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); + + // Add receipts to the database. + for i in 1..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + // Let the sender_allocation update the unaggregated fees from the database. + sender_allocation.update_unaggregated_fees().await.unwrap(); + + // Check that the unaggregated fees are correct. + assert_eq!( + sender_allocation.unaggregated_fees.lock().await.value, + 35u128 + ); + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_rav_requester_manual(pgpool: PgPool) { + // Start a TAP aggregator server. + let (handle, aggregator_endpoint) = run_server( + 0, + SIGNER.0.clone(), + TAP_EIP712_DOMAIN_SEPARATOR.clone(), + 100 * 1024, + 100 * 1024, + 1, + ) + .await + .unwrap(); + + // Start a mock graphql server using wiremock + let mock_server = MockServer::start().await; + + // Mock result for TAP redeem txs for (allocation, sender) pair. + mock_server + .register( + Mock::given(method("POST")) + .and(body_string_contains("transactions")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(json!({ "data": { "transactions": []}})), + ), + ) + .await; + + // Create a sender_allocation. + let sender_allocation = create_sender_allocation( + pgpool.clone(), + "http://".to_owned() + &aggregator_endpoint.to_string(), + &mock_server.uri(), + ) + .await; + + // Add receipts to the database. + for i in 0..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + // Let the sender_allocation update the unaggregated fees from the database. + sender_allocation.update_unaggregated_fees().await.unwrap(); + + // Trigger a RAV request manually. + sender_allocation.rav_requester_single().await.unwrap(); + + // Stop the TAP aggregator server. + handle.stop().unwrap(); + handle.stopped().await; + } +} diff --git a/tap-agent/src/tap/sender_allocation_relationship.rs b/tap-agent/src/tap/sender_allocation_relationship.rs deleted file mode 100644 index 198dc6f5..00000000 --- a/tap-agent/src/tap/sender_allocation_relationship.rs +++ /dev/null @@ -1,1024 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::{str::FromStr, sync::Arc, time::Duration}; - -use alloy_primitives::hex::ToHex; -use alloy_sol_types::Eip712Domain; -use anyhow::{anyhow, ensure, Result}; -use thegraph::types::Address; - -use eventuals::Eventual; -use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; -use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; -use sqlx::{types::BigDecimal, PgPool}; -use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; -use tap_core::{ - eip_712_signed_message::EIP712SignedMessage, - receipt_aggregate_voucher::ReceiptAggregateVoucher, - tap_manager::RAVRequest, - tap_receipt::{ReceiptCheck, ReceivedReceipt}, -}; -use tokio::sync::Mutex; -use tracing::{error, warn}; - -use super::sender_allocation_relationships_manager::NewReceiptNotification; -use crate::{ - config::{self}, - tap::{ - escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, - receipt_checks_adapter::ReceiptChecksAdapter, - receipt_storage_adapter::ReceiptStorageAdapter, signers_trimmed, - }, -}; - -type TapManager = tap_core::tap_manager::Manager< - EscrowAdapter, - ReceiptChecksAdapter, - ReceiptStorageAdapter, - RAVStorageAdapter, ->; - -#[derive(Default, Debug)] -struct UnaggregatedFees { - pub value: u128, - /// The ID of the last receipt value added to the unaggregated fees value. - /// This is used to make sure we don't process the same receipt twice. Relies on the fact that - /// the receipts IDs are SERIAL in the database. - pub last_id: u64, -} - -#[derive(Debug, PartialEq, Clone, Copy)] -pub enum State { - Running, - LastRavPending, - Finished, -} - -struct Inner { - pgpool: PgPool, - tap_manager: TapManager, - allocation_id: Address, - sender: Address, - sender_aggregator_endpoint: String, - unaggregated_fees: Arc>, - state: Arc>, - config: &'static config::Cli, - escrow_accounts: Eventual, -} - -/// A SenderAllocationRelationship is the relationship between the indexer and the sender in the -/// context of a single allocation. -/// -/// Manages the lifecycle of Scalar TAP for the SenderAllocationRelationship, including: -/// - Monitoring new receipts and keeping track of the unaggregated fees. -/// - Requesting RAVs from the sender's TAP aggregator once the unaggregated fees reach a certain -/// threshold. -/// - Requesting the last RAV from the sender's TAP aggregator (on SenderAllocationRelationship EOL) -pub struct SenderAllocationRelationship { - inner: Arc, - rav_requester_task: tokio::task::JoinHandle<()>, - rav_requester_notify: Arc, -} - -impl SenderAllocationRelationship { - #[allow(clippy::too_many_arguments)] - pub fn new( - config: &'static config::Cli, - pgpool: PgPool, - allocation_id: Address, - sender: Address, - escrow_accounts: Eventual, - escrow_subgraph: &'static SubgraphClient, - escrow_adapter: EscrowAdapter, - tap_eip712_domain_separator: Eip712Domain, - sender_aggregator_endpoint: String, - ) -> Self { - let required_checks = vec![ - ReceiptCheck::CheckUnique, - ReceiptCheck::CheckAllocationId, - ReceiptCheck::CheckTimestamp, - // ReceiptCheck::CheckValue, - ReceiptCheck::CheckSignature, - ReceiptCheck::CheckAndReserveEscrow, - ]; - - let receipt_checks_adapter = ReceiptChecksAdapter::new( - config, - pgpool.clone(), - // TODO: Implement query appraisals. - None, - allocation_id, - escrow_accounts.clone(), - escrow_subgraph, - sender, - ); - let receipt_storage_adapter = ReceiptStorageAdapter::new( - pgpool.clone(), - allocation_id, - sender, - required_checks.clone(), - escrow_accounts.clone(), - ); - let rav_storage_adapter = RAVStorageAdapter::new(pgpool.clone(), allocation_id, sender); - let tap_manager = TapManager::new( - tap_eip712_domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - rav_storage_adapter, - receipt_storage_adapter, - required_checks, - 0, - ); - - let inner = Arc::new(Inner { - pgpool, - tap_manager, - allocation_id, - sender, - sender_aggregator_endpoint, - unaggregated_fees: Arc::new(Mutex::new(UnaggregatedFees::default())), - state: Arc::new(Mutex::new(State::Running)), - config, - escrow_accounts, - }); - - let rav_requester_notify = Arc::new(tokio::sync::Notify::new()); - let rav_requester_task = tokio::spawn(Self::rav_requester( - inner.clone(), - rav_requester_notify.clone(), - )); - - Self { - inner, - rav_requester_task, - rav_requester_notify, - } - } - - pub async fn handle_new_receipt_notification( - &self, - new_receipt_notification: NewReceiptNotification, - ) { - // If we're in the last rav pending state or finished, we don't want to process any new - // receipts. - if self.state().await != State::Running { - error!( - "Received a new receipt notification for now ineligible allocation {} and \ - sender {}.", - self.inner.allocation_id, self.inner.sender - ); - return; - } - - let mut unaggregated_fees = self.inner.unaggregated_fees.lock().await; - - // Else we already processed that receipt, most likely from pulling the receipts - // from the database. - if new_receipt_notification.id > unaggregated_fees.last_id { - unaggregated_fees.value = unaggregated_fees - .value - .checked_add(new_receipt_notification.value) - .unwrap_or_else(|| { - // This should never happen, but if it does, we want to know about it. - error!( - "Overflow when adding receipt value {} to total unaggregated fees {} for \ - allocation {} and sender {}. Setting total unaggregated fees to u128::MAX.", - new_receipt_notification.value, - unaggregated_fees.value, - new_receipt_notification.allocation_id, - self.inner.sender - ); - u128::MAX - }); - unaggregated_fees.last_id = new_receipt_notification.id; - - // TODO: consider making the trigger per sender, instead of per (sender, allocation). - if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into() { - self.rav_requester_notify.notify_waiters(); - } - } - } - - pub async fn start_last_rav_request(&self) { - *(self.inner.state.lock().await) = State::LastRavPending; - self.rav_requester_notify.notify_one(); - } - - /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager - /// with the latest unaggregated fees from the database. - pub async fn update_unaggregated_fees(&self) -> Result<()> { - Self::update_unaggregated_fees_static(&self.inner).await - } - - /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager - /// with the latest unaggregated fees from the database. - async fn update_unaggregated_fees_static(inner: &Inner) -> Result<()> { - inner.tap_manager.remove_obsolete_receipts().await?; - - let signers = signers_trimmed(&inner.escrow_accounts, inner.sender).await?; - - // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? - let res = sqlx::query!( - r#" - WITH rav AS ( - SELECT - rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns - FROM - scalar_tap_ravs - WHERE - allocation_id = $1 - AND sender_address = $2 - ) - SELECT - MAX(id), - SUM(value) - FROM - scalar_tap_receipts - WHERE - allocation_id = $1 - AND signer_address IN (SELECT unnest($3::text[])) - AND CASE WHEN ( - SELECT - timestamp_ns :: NUMERIC - FROM - rav - ) IS NOT NULL THEN timestamp_ns > ( - SELECT - timestamp_ns :: NUMERIC - FROM - rav - ) ELSE TRUE END - "#, - inner.allocation_id.encode_hex::(), - inner.sender.encode_hex::(), - &signers - ) - .fetch_one(&inner.pgpool) - .await?; - - let mut unaggregated_fees = inner.unaggregated_fees.lock().await; - - ensure!( - res.sum.is_none() == res.max.is_none(), - "Exactly one of SUM(value) and MAX(id) is null. This should not happen." - ); - - unaggregated_fees.last_id = res.max.unwrap_or(0).try_into()?; - unaggregated_fees.value = res - .sum - .unwrap_or(BigDecimal::from(0)) - .to_string() - .parse::()?; - - // TODO: check if we need to run a RAV request here. - - Ok(()) - } - - /// Request a RAV from the sender's TAP aggregator. - /// Will remove the aggregated receipts from the database if successful. - async fn rav_requester(inner: Arc, notifications: Arc) { - loop { - // Wait for a RAV request notification. - notifications.notified().await; - - Self::rav_requester_try(&inner).await.unwrap_or_else(|e| { - error!( - "Error while requesting a RAV for allocation {} and sender {}: {:?}", - inner.allocation_id, inner.sender, e - ); - }); - } - } - - async fn rav_requester_try(inner: &Arc) -> Result<()> { - loop { - Self::rav_requester_single(inner).await?; - - // Check if we need to request another RAV immediately. - let unaggregated_fees = inner.unaggregated_fees.lock().await; - if unaggregated_fees.value < inner.config.tap.rav_request_trigger_value.into() { - break; - } else { - // Go right back to requesting a RAV and warn the user. - warn!( - "Unaggregated fees for allocation {} and sender {} are {} right \ - after the RAV request. This is a sign that the TAP agent can't keep \ - up with the rate of new receipts. Consider increasing the \ - `rav_request_trigger_value` in the TAP agent config. It could also be \ - a sign that the sender's TAP aggregator is too slow.", - inner.allocation_id, inner.sender, unaggregated_fees.value - ); - } - } - - let mut state = inner.state.lock().await; - if *state == State::LastRavPending { - // Mark the last RAV as final in the DB as a cue for the indexer-agent. - Self::mark_rav_final(inner).await?; - - *state = State::Finished; - }; - anyhow::Ok(()) - } - - async fn rav_requester_single(inner: &Arc) -> Result<()> { - let RAVRequest { - valid_receipts, - previous_rav, - invalid_receipts, - expected_rav, - } = inner - .tap_manager - .create_rav_request( - inner.config.tap.rav_request_timestamp_buffer_ms * 1_000_000, - // TODO: limit the number of receipts to aggregate per request. - None, - ) - .await?; - if !invalid_receipts.is_empty() { - warn!( - "Found {} invalid receipts for allocation {} and sender {}.", - invalid_receipts.len(), - inner.allocation_id, - inner.sender - ); - - // Save invalid receipts to the database for logs. - // TODO: consider doing that in a spawned task? - Self::store_invalid_receipts(inner, &invalid_receipts).await?; - } - let client = HttpClientBuilder::default() - .request_timeout(Duration::from_secs( - inner.config.tap.rav_request_timeout_secs, - )) - .build(&inner.sender_aggregator_endpoint)?; - let response: JsonRpcResponse> = client - .request( - "aggregate_receipts", - rpc_params!( - "0.0", // TODO: Set the version in a smarter place. - valid_receipts, - previous_rav - ), - ) - .await?; - if let Some(warnings) = response.warnings { - warn!("Warnings from sender's TAP aggregator: {:?}", warnings); - } - match inner - .tap_manager - .verify_and_store_rav(expected_rav.clone(), response.data.clone()) - .await - { - Ok(_) => {} - - // Adapter errors are local software errors. Shouldn't be a problem with the sender. - Err(tap_core::Error::AdapterError { source_error: e }) => { - anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e) - } - - // The 3 errors below signal an invalid RAV, which should be about problems with the - // sender. The sender could be malicious. - Err( - e @ tap_core::Error::InvalidReceivedRAV { - expected_rav: _, - received_rav: _, - } - | e @ tap_core::Error::SignatureError(_) - | e @ tap_core::Error::InvalidRecoveredSigner { address: _ }, - ) => { - Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string()) - .await?; - anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e); - } - - // All relevant errors should be handled above. If we get here, we forgot to handle - // an error case. - Err(e) => { - anyhow::bail!("Error while verifying and storing RAV: {:?}", e); - } - } - Self::update_unaggregated_fees_static(inner).await?; - Ok(()) - } - - async fn mark_rav_final(inner: &Arc) -> Result<()> { - let updated_rows = sqlx::query!( - r#" - UPDATE scalar_tap_ravs - SET final = true - WHERE allocation_id = $1 AND sender_address = $2 - RETURNING * - "#, - inner.allocation_id.encode_hex::(), - inner.sender.encode_hex::(), - ) - .fetch_all(&inner.pgpool) - .await?; - if updated_rows.len() != 1 { - anyhow::bail!( - "Expected exactly one row to be updated in the latest RAVs table, \ - but {} were updated.", - updated_rows.len() - ); - }; - Ok(()) - } - - pub async fn state(&self) -> State { - *self.inner.state.lock().await - } - - async fn store_invalid_receipts(inner: &Inner, receipts: &[ReceivedReceipt]) -> Result<()> { - for received_receipt in receipts.iter() { - sqlx::query!( - r#" - INSERT INTO scalar_tap_receipts_invalid ( - allocation_id, - signer_address, - timestamp_ns, - value, - received_receipt - ) - VALUES ($1, $2, $3, $4, $5) - "#, - inner.allocation_id.encode_hex::(), - inner.sender.encode_hex::(), - BigDecimal::from(received_receipt.signed_receipt().message.timestamp_ns), - BigDecimal::from_str(&received_receipt.signed_receipt().message.value.to_string())?, - serde_json::to_value(received_receipt)? - ) - .execute(&inner.pgpool) - .await - .map_err(|e| anyhow!("Failed to store failed receipt: {:?}", e))?; - } - - Ok(()) - } - - async fn store_failed_rav( - inner: &Inner, - expected_rav: &ReceiptAggregateVoucher, - rav: &EIP712SignedMessage, - reason: &str, - ) -> Result<()> { - sqlx::query!( - r#" - INSERT INTO scalar_tap_rav_requests_failed ( - allocation_id, - sender_address, - expected_rav, - rav_response, - reason - ) - VALUES ($1, $2, $3, $4, $5) - "#, - inner.allocation_id.encode_hex::(), - inner.sender.encode_hex::(), - serde_json::to_value(expected_rav)?, - serde_json::to_value(rav)?, - reason - ) - .execute(&inner.pgpool) - .await - .map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?; - - Ok(()) - } -} - -impl Drop for SenderAllocationRelationship { - /// Trying to make sure the RAV requester task is dropped when the SenderAllocationRelationship - /// is dropped. - fn drop(&mut self) { - self.rav_requester_task.abort(); - } -} - -#[cfg(test)] -mod tests { - - use std::collections::HashMap; - - use indexer_common::subgraph_client::DeploymentDetails; - use serde_json::json; - use tap_aggregator::server::run_server; - use tap_core::tap_manager::SignedRAV; - use wiremock::{ - matchers::{body_string_contains, method}, - Mock, MockServer, ResponseTemplate, - }; - - use super::*; - use crate::tap::test_utils::{ - create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID, INDEXER, - SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, - }; - - const DUMMY_URL: &str = "http://localhost:1234"; - - async fn create_sender_allocation_relationship( - pgpool: PgPool, - sender_aggregator_endpoint: String, - escrow_subgraph_endpoint: &str, - ) -> SenderAllocationRelationship { - let config = Box::leak(Box::new(config::Cli { - config: None, - ethereum: config::Ethereum { - indexer_address: INDEXER.1, - }, - tap: config::Tap { - rav_request_trigger_value: 100, - rav_request_timestamp_buffer_ms: 1, - rav_request_timeout_secs: 5, - ..Default::default() - }, - ..Default::default() - })); - - let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( - reqwest::Client::new(), - None, - DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), - ))); - - let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( - HashMap::from([(SENDER.1, 1000.into())]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); - - let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); - - SenderAllocationRelationship::new( - config, - pgpool.clone(), - *ALLOCATION_ID, - SENDER.1, - escrow_accounts_eventual, - escrow_subgraph, - escrow_adapter, - TAP_EIP712_DOMAIN_SEPARATOR.clone(), - sender_aggregator_endpoint, - ) - } - - /// Test that the sender_allocation_relatioship correctly updates the unaggregated fees from the - /// database when there is no RAV in the database. - /// - /// The sender_allocation_relatioship should consider all receipts found for the allocation and - /// sender. - #[sqlx::test(migrations = "../migrations")] - async fn test_update_unaggregated_fees_no_rav(pgpool: PgPool) { - let sender_allocation_relatioship = - create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) - .await; - - // Add receipts to the database. - for i in 1..10 { - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - } - - // Let the sender_allocation_relatioship update the unaggregated fees from the database. - sender_allocation_relatioship - .update_unaggregated_fees() - .await - .unwrap(); - - // Check that the unaggregated fees are correct. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - 45u128 - ); - } - - /// Test that the sender_allocation_relatioship correctly updates the unaggregated fees from the - /// database when there is a RAV in the database as well as receipts which timestamp are lesser - /// and greater than the RAV's timestamp. - /// - /// The sender_allocation_relatioship should only consider receipts with a timestamp greater - /// than the RAV's timestamp. - #[sqlx::test(migrations = "../migrations")] - async fn test_update_unaggregated_fees_with_rav(pgpool: PgPool) { - let sender_allocation_relatioship = - create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) - .await; - - // Add the RAV to the database. - // This RAV has timestamp 4. The sender_allocation_relatioship should only consider receipts - // with a timestamp greater than 4. - let signed_rav = create_rav(*ALLOCATION_ID, SIGNER.0.clone(), 4, 10).await; - store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); - - // Add receipts to the database. - for i in 1..10 { - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - } - - // Let the sender_allocation_relatioship update the unaggregated fees from the database. - sender_allocation_relatioship - .update_unaggregated_fees() - .await - .unwrap(); - - // Check that the unaggregated fees are correct. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - 35u128 - ); - } - - /// Test that the sender_allocation_relatioship correctly ignores new receipt notifications with - /// an ID lower than the last receipt ID processed (be it from the DB or from a prior receipt - /// notification). - #[sqlx::test(migrations = "../migrations")] - async fn test_handle_new_receipt_notification(pgpool: PgPool) { - let sender_allocation_relatioship = - create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) - .await; - - // Add receipts to the database. - let mut expected_unaggregated_fees = 0u128; - for i in 10..20 { - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - expected_unaggregated_fees += u128::from(i); - } - - sender_allocation_relatioship - .update_unaggregated_fees() - .await - .unwrap(); - - // Check that the unaggregated fees are correct. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - expected_unaggregated_fees - ); - - // Send a new receipt notification that has a lower ID than the last loaded from the DB. - // The last ID in the DB should be 10, since we added 10 receipts to the empty receipts - // table - let new_receipt_notification = NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: 10, - timestamp_ns: 19, - value: 19, - }; - sender_allocation_relatioship - .handle_new_receipt_notification(new_receipt_notification) - .await; - - // Check that the unaggregated fees have *not* increased. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - expected_unaggregated_fees - ); - - // Send a new receipt notification. - let new_receipt_notification = NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: 30, - timestamp_ns: 20, - value: 20, - }; - sender_allocation_relatioship - .handle_new_receipt_notification(new_receipt_notification) - .await; - expected_unaggregated_fees += 20; - - // Check that the unaggregated fees are correct. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - expected_unaggregated_fees - ); - - // Send a new receipt notification that has a lower ID than the previous one. - let new_receipt_notification = NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: 25, - timestamp_ns: 19, - value: 19, - }; - sender_allocation_relatioship - .handle_new_receipt_notification(new_receipt_notification) - .await; - - // Check that the unaggregated fees have *not* increased. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - expected_unaggregated_fees - ); - } - - #[sqlx::test(migrations = "../migrations")] - async fn test_rav_requester_manual(pgpool: PgPool) { - // Start a TAP aggregator server. - let (handle, aggregator_endpoint) = run_server( - 0, - SIGNER.0.clone(), - TAP_EIP712_DOMAIN_SEPARATOR.clone(), - 100 * 1024, - 100 * 1024, - 1, - ) - .await - .unwrap(); - - // Start a mock graphql server using wiremock - let mock_server = MockServer::start().await; - - // Mock result for TAP redeem txs for (allocation, sender) pair. - mock_server - .register( - Mock::given(method("POST")) - .and(body_string_contains("transactions")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(json!({ "data": { "transactions": []}})), - ), - ) - .await; - - // Create a sender_allocation_relatioship. - let sender_allocation_relatioship = create_sender_allocation_relationship( - pgpool.clone(), - "http://".to_owned() + &aggregator_endpoint.to_string(), - &mock_server.uri(), - ) - .await; - - // Add receipts to the database. - for i in 0..10 { - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - } - - // Let the sender_allocation_relatioship update the unaggregated fees from the database. - sender_allocation_relatioship - .update_unaggregated_fees() - .await - .unwrap(); - - // Trigger a RAV request manually. - SenderAllocationRelationship::rav_requester_try(&sender_allocation_relatioship.inner) - .await - .unwrap(); - - // Stop the TAP aggregator server. - handle.stop().unwrap(); - handle.stopped().await; - } - - #[sqlx::test(migrations = "../migrations")] - async fn test_rav_requester_auto(pgpool: PgPool) { - // Start a TAP aggregator server. - let (handle, aggregator_endpoint) = run_server( - 0, - SIGNER.0.clone(), - TAP_EIP712_DOMAIN_SEPARATOR.clone(), - 100 * 1024, - 100 * 1024, - 1, - ) - .await - .unwrap(); - - // Start a mock graphql server using wiremock - let mock_server = MockServer::start().await; - - // Mock result for TAP redeem txs for (allocation, sender) pair. - mock_server - .register( - Mock::given(method("POST")) - .and(body_string_contains("transactions")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(json!({ "data": { "transactions": []}})), - ), - ) - .await; - - // Create a sender_allocation_relatioship. - let sender_allocation_relatioship = create_sender_allocation_relationship( - pgpool.clone(), - "http://".to_owned() + &aggregator_endpoint.to_string(), - &mock_server.uri(), - ) - .await; - - // Add receipts to the database and call the `handle_new_receipt_notification` method - // correspondingly. - let mut total_value = 0; - let mut trigger_value = 0; - for i in 0..10 { - // These values should be enough to trigger a RAV request at i == 7 since we set the - // `rav_request_trigger_value` to 100. - let value = (i + 10) as u128; - - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, value, i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - sender_allocation_relatioship - .handle_new_receipt_notification(NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: i, - timestamp_ns: i + 1, - value, - }) - .await; - - total_value += value; - if total_value >= 100 && trigger_value == 0 { - trigger_value = total_value; - } - } - - // Wait for the RAV requester to finish. - for _ in 0..100 { - if sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value - < trigger_value - { - break; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - // Get the latest RAV from the database. - let latest_rav = sqlx::query!( - r#" - SELECT rav - FROM scalar_tap_ravs - WHERE allocation_id = $1 AND sender_address = $2 - "#, - ALLOCATION_ID.encode_hex::(), - SENDER.1.encode_hex::() - ) - .fetch_optional(&pgpool) - .await - .map(|r| r.map(|r| r.rav)) - .unwrap(); - - let latest_rav = latest_rav - .map(|r| serde_json::from_value::(r).unwrap()) - .unwrap(); - - // Check that the latest RAV value is correct. - assert!(latest_rav.message.value_aggregate >= trigger_value); - - // Check that the unaggregated fees value is reduced. - assert!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value - <= trigger_value - ); - - // Reset the total value and trigger value. - total_value = sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value; - trigger_value = 0; - - // Add more receipts - for i in 10..20 { - let value = (i + 10) as u128; - - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - - sender_allocation_relatioship - .handle_new_receipt_notification(NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: i, - timestamp_ns: i + 1, - value, - }) - .await; - - total_value += value; - if total_value >= 100 && trigger_value == 0 { - trigger_value = total_value; - } - } - - // Wait for the RAV requester to finish. - for _ in 0..100 { - if sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value - < trigger_value - { - break; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - // Get the latest RAV from the database. - let latest_rav = sqlx::query!( - r#" - SELECT rav - FROM scalar_tap_ravs - WHERE allocation_id = $1 AND sender_address = $2 - "#, - ALLOCATION_ID.encode_hex::(), - SENDER.1.encode_hex::() - ) - .fetch_optional(&pgpool) - .await - .map(|r| r.map(|r| r.rav)) - .unwrap(); - - let latest_rav = latest_rav - .map(|r| serde_json::from_value::(r).unwrap()) - .unwrap(); - - // Check that the latest RAV value is correct. - - assert!(latest_rav.message.value_aggregate >= trigger_value); - - // Check that the unaggregated fees value is reduced. - assert!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value - <= trigger_value - ); - - // Stop the TAP aggregator server. - handle.stop().unwrap(); - handle.stopped().await; - } -} diff --git a/tap-agent/src/tap/test_utils.rs b/tap-agent/src/tap/test_utils.rs index 0edf3b74..d0cf8b7d 100644 --- a/tap-agent/src/tap/test_utils.rs +++ b/tap-agent/src/tap/test_utils.rs @@ -17,8 +17,12 @@ use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt use thegraph::types::Address; lazy_static! { - pub static ref ALLOCATION_ID: Address = + pub static ref ALLOCATION_ID_0: Address = Address::from_str("0xabababababababababababababababababababab").unwrap(); + pub static ref ALLOCATION_ID_1: Address = + Address::from_str("0xbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc").unwrap(); + pub static ref ALLOCATION_ID_2: Address = + Address::from_str("0xcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd").unwrap(); pub static ref ALLOCATION_ID_IRRELEVANT: Address = Address::from_str("0xbcdebcdebcdebcdebcdebcdebcdebcdebcdebcde").unwrap(); pub static ref SENDER: (LocalWallet, Address) = wallet(0); diff --git a/tap-agent/src/tap/unaggregated_receipts.rs b/tap-agent/src/tap/unaggregated_receipts.rs new file mode 100644 index 00000000..0511152c --- /dev/null +++ b/tap-agent/src/tap/unaggregated_receipts.rs @@ -0,0 +1,11 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Default, Debug, Clone)] +pub struct UnaggregatedReceipts { + pub value: u128, + /// The ID of the last receipt value added to the unaggregated fees value. + /// This is used to make sure we don't process the same receipt twice. Relies on the fact that + /// the receipts IDs are SERIAL in the database. + pub last_id: u64, +}