diff --git a/.sqlx/query-a39cf8e5d4b670583e8a2b0af20c270c3db62668e2df5a2b7e7a2fac0ef69405.json b/.sqlx/query-089809ab6cd948db1adb1cedff47f5b1ce3c8594b3657436a6cd68d683ef8190.json similarity index 65% rename from .sqlx/query-a39cf8e5d4b670583e8a2b0af20c270c3db62668e2df5a2b7e7a2fac0ef69405.json rename to .sqlx/query-089809ab6cd948db1adb1cedff47f5b1ce3c8594b3657436a6cd68d683ef8190.json index 30e417eb6..39c2485ca 100644 --- a/.sqlx/query-a39cf8e5d4b670583e8a2b0af20c270c3db62668e2df5a2b7e7a2fac0ef69405.json +++ b/.sqlx/query-089809ab6cd948db1adb1cedff47f5b1ce3c8594b3657436a6cd68d683ef8190.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE scalar_tap_latest_ravs\n SET is_last = true\n WHERE allocation_id = $1 AND sender_address = $2\n RETURNING *\n ", + "query": "\n UPDATE scalar_tap_latest_ravs\n SET is_last = true\n WHERE allocation_id = $1 AND sender_address = $2\n RETURNING *\n ", "describe": { "columns": [ { @@ -37,5 +37,5 @@ false ] }, - "hash": "a39cf8e5d4b670583e8a2b0af20c270c3db62668e2df5a2b7e7a2fac0ef69405" + "hash": "089809ab6cd948db1adb1cedff47f5b1ce3c8594b3657436a6cd68d683ef8190" } diff --git a/.sqlx/query-0e46530ba12da19cedaebaab66bddcfb586bc049fa2d61740c400ae64d33d3b8.json b/.sqlx/query-0e46530ba12da19cedaebaab66bddcfb586bc049fa2d61740c400ae64d33d3b8.json new file mode 100644 index 000000000..ecc6face2 --- /dev/null +++ b/.sqlx/query-0e46530ba12da19cedaebaab66bddcfb586bc049fa2d61740c400ae64d33d3b8.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH rav AS (\n SELECT \n rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns \n FROM \n scalar_tap_latest_ravs \n WHERE \n allocation_id = $1 \n AND sender_address = $2\n ) \n SELECT \n COUNT(*), \n MAX(id), \n SUM(value) \n FROM \n scalar_tap_receipts \n WHERE \n allocation_id = $1 \n AND sender_address = $2 \n AND CASE WHEN (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) IS NOT NULL THEN timestamp_ns > (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) ELSE TRUE END\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "max", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "sum", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar" + ] + }, + "nullable": [ + null, + null, + null + ] + }, + "hash": "0e46530ba12da19cedaebaab66bddcfb586bc049fa2d61740c400ae64d33d3b8" +} diff --git a/.sqlx/query-26e4dda54e43d15e3c747982c6c021befec9a5efd2f9d39078d9b45f79c1f31a.json b/.sqlx/query-26e4dda54e43d15e3c747982c6c021befec9a5efd2f9d39078d9b45f79c1f31a.json new file mode 100644 index 000000000..c27abc7a9 --- /dev/null +++ b/.sqlx/query-26e4dda54e43d15e3c747982c6c021befec9a5efd2f9d39078d9b45f79c1f31a.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO scalar_tap_latest_ravs (\n allocation_id, sender_address, rav\n )\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar", + "Json" + ] + }, + "nullable": [] + }, + "hash": "26e4dda54e43d15e3c747982c6c021befec9a5efd2f9d39078d9b45f79c1f31a" +} diff --git a/.sqlx/query-349bb2ab33307fc52d25d74c8fe736dbfc396116aa2a93bb379029672701cef8.json b/.sqlx/query-349bb2ab33307fc52d25d74c8fe736dbfc396116aa2a93bb379029672701cef8.json deleted file mode 100644 index 0c4418172..000000000 --- a/.sqlx/query-349bb2ab33307fc52d25d74c8fe736dbfc396116aa2a93bb379029672701cef8.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT MAX(id), SUM(value)\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND sender_address = $2\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "max", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "sum", - "type_info": "Numeric" - } - ], - "parameters": { - "Left": [ - "Bpchar", - "Bpchar" - ] - }, - "nullable": [ - null, - null - ] - }, - "hash": "349bb2ab33307fc52d25d74c8fe736dbfc396116aa2a93bb379029672701cef8" -} diff --git a/Cargo.lock b/Cargo.lock index 009937fa9..2c584d58d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3037,6 +3037,7 @@ dependencies = [ "tower-http 0.4.4", "tracing", "tracing-subscriber", + "wiremock", ] [[package]] diff --git a/tap_agent/Cargo.toml b/tap_agent/Cargo.toml index 5561cdb68..7572b3882 100644 --- a/tap_agent/Cargo.toml +++ b/tap_agent/Cargo.toml @@ -49,3 +49,4 @@ faux = "0.1.10" indexer-common = { path = "../common", features = ["mock"] } rstest = "0.18.1" tempfile = "3.8.0" +wiremock = "0.5.19" diff --git a/tap_agent/src/config.rs b/tap_agent/src/config.rs index 984e3f0ff..7e7c11773 100644 --- a/tap_agent/src/config.rs +++ b/tap_agent/src/config.rs @@ -35,7 +35,7 @@ pub struct Cli { env = "CONFIG", help = "Indexer service configuration file (YAML format)" )] - config: Option, + pub config: Option, } #[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] diff --git a/tap_agent/src/tap/escrow_adapter.rs b/tap_agent/src/tap/escrow_adapter.rs index 66b48757f..75c4e6176 100644 --- a/tap_agent/src/tap/escrow_adapter.rs +++ b/tap_agent/src/tap/escrow_adapter.rs @@ -60,34 +60,21 @@ impl EscrowAdapterTrait for EscrowAdapter { ) .to_string(), })?; + let fees = self .sender_pending_fees .read() .await .get(&sender) .copied() - .ok_or(AdapterError::AdapterError { - error: format!( - "Gateway {} not found in pending fees map, could not get available escrow.", - sender - ) - .to_string(), - })?; + .unwrap_or(0); Ok(balance - fees) } async fn subtract_escrow(&self, sender: Address, value: u128) -> Result<(), AdapterError> { let current_available_escrow = self.get_available_escrow(sender).await?; let mut fees_write = self.sender_pending_fees.write().await; - let fees = fees_write - .get_mut(&sender) - .ok_or(AdapterError::AdapterError { - error: format!( - "Gateway {} not found in pending fees map, could not subtract available escrow.", - sender - ) - .to_string(), - })?; + let fees = fees_write.entry(sender).or_insert(0); if current_available_escrow < value { return Err(AdapterError::AdapterError { error: format!( diff --git a/tap_agent/src/tap/manager.rs b/tap_agent/src/tap/manager.rs index 740f178b8..3bd4f3c3d 100644 --- a/tap_agent/src/tap/manager.rs +++ b/tap_agent/src/tap/manager.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; +use anyhow::ensure; use ethereum_types::U256; use eventuals::Eventual; use indexer_common::subgraph_client::SubgraphClient; @@ -12,7 +13,7 @@ use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; use tap_core::{ eip_712_signed_message::EIP712SignedMessage, receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_manager::RAVRequest, - tap_receipt::get_full_list_of_checks, + tap_receipt::ReceiptCheck, }; use tokio::sync::Mutex; @@ -33,7 +34,7 @@ type TapManager = tap_core::tap_manager::Manager< RAVStorageAdapter, >; -#[derive(Default)] +#[derive(Default, Debug)] struct UnaggregatedFees { pub value: u128, pub last_id: u64, @@ -73,6 +74,14 @@ impl Manager { tap_eip712_domain_separator: Eip712Domain, sender_aggregator_endpoint: String, ) -> Self { + let required_checks = vec![ + ReceiptCheck::CheckUnique, + ReceiptCheck::CheckAllocationId, + ReceiptCheck::CheckTimestamp, + // ReceiptCheck::CheckValue, + ReceiptCheck::CheckSignature, + ReceiptCheck::CheckAndReserveEscrow, + ]; let escrow_adapter = EscrowAdapter::new(escrow_accounts.clone()); let receipt_checks_adapter = ReceiptChecksAdapter::new( config, @@ -84,8 +93,12 @@ impl Manager { escrow_subgraph, sender, ); - let receipt_storage_adapter = - ReceiptStorageAdapter::new(pgpool.clone(), allocation_id, sender); + let receipt_storage_adapter = ReceiptStorageAdapter::new( + pgpool.clone(), + allocation_id, + sender, + required_checks.clone(), + ); let rav_storage_adapter = RAVStorageAdapter::new(pgpool.clone(), allocation_id, sender); let tap_manager = TapManager::new( tap_eip712_domain_separator.clone(), @@ -93,7 +106,7 @@ impl Manager { receipt_checks_adapter, rav_storage_adapter, receipt_storage_adapter, - get_full_list_of_checks(), + required_checks, 0, ); Self { @@ -176,11 +189,38 @@ impl Manager { async fn update_unaggregated_fees_static(inner: &Inner) -> Result<(), anyhow::Error> { inner.tap_manager.remove_obsolete_receipts().await?; + // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? let res = sqlx::query!( r#" - SELECT MAX(id), SUM(value) - FROM scalar_tap_receipts - WHERE allocation_id = $1 AND sender_address = $2 + WITH rav AS ( + SELECT + rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns + FROM + scalar_tap_latest_ravs + WHERE + allocation_id = $1 + AND sender_address = $2 + ) + SELECT + COUNT(*), + MAX(id), + SUM(value) + FROM + scalar_tap_receipts + WHERE + allocation_id = $1 + AND sender_address = $2 + AND CASE WHEN ( + SELECT + timestamp_ns :: NUMERIC + FROM + rav + ) IS NOT NULL THEN timestamp_ns > ( + SELECT + timestamp_ns :: NUMERIC + FROM + rav + ) ELSE TRUE END "#, inner .allocation_id @@ -195,149 +235,396 @@ impl Manager { .unwrap() .to_owned() ) - .fetch_optional(&inner.pgpool) + .fetch_one(&inner.pgpool) .await?; let mut unaggregated_fees = inner.unaggregated_fees.lock().await; - match res { - Some(res) => { - unaggregated_fees.last_id = res.max.unwrap_or(0).try_into()?; - unaggregated_fees.value = - res.sum.unwrap_or(0.into()).to_string().parse::()?; - } - None => { + // `COUNT(*)` will always return a value, so we don't need to check for `None`. + match res.count.unwrap() { + 0 => { unaggregated_fees.last_id = 0; unaggregated_fees.value = 0; } + // If the count is non-zero, then `MAX(id)` and `SUM(value)` will be non-null. + // If they are null, then something is extremely wrong with the database. + _ => { + ensure!( + res.max.is_some(), + "MAX(id) is null but the receipt COUNT(*) is not zero" + ); + ensure!( + res.sum.is_some(), + "SUM(value) is null, but the receipt COUNT(*) is not zero" + ); + + unaggregated_fees.last_id = res.max.unwrap().try_into()?; + unaggregated_fees.value = res.sum.unwrap().to_string().parse::()?; + } } + // TODO: check if we need to run a RAV request here. + Ok(()) } /// Request a RAV from the sender's TAP aggregator. /// Will remove the aggregated receipts from the database if successful. async fn rav_requester(inner: Arc) { - // Wrapping everything in a makeshift "try" block - (async { - loop { - // TODO: limit the number of receipts to aggregate per request. - let RAVRequest { - valid_receipts, - previous_rav, - invalid_receipts: _, - expected_rav, - } = inner - .tap_manager - .create_rav_request(inner.config.tap.rav_request_timestamp_buffer_ns) - .await?; - - // TODO: Request compression and response decompression. Also a fancy user agent? - let client = - HttpClientBuilder::default().build(&inner.sender_aggregator_endpoint)?; - - // TODO: Add a timeout. - let response: JsonRpcResponse> = - client - .request( - "aggregate_receipts", - rpc_params!( - "0.0", // TODO: Set the version in a smarter place. - valid_receipts, - previous_rav - ), - ) - .await?; - - if let Some(warnings) = response.warnings { - warn!("Warnings from sender's TAP aggregator: {:?}", warnings); - } - - inner - .tap_manager - .verify_and_store_rav(expected_rav, response.data) - .await?; - - // TODO: Handle invalid receipts - - // This is not the fastest way to do this, but it's the easiest. - // Note: we rely on the unaggregated_fees lock to make sure we don't miss any receipt - // notifications during this. - // TODO: If needed, faster alternative? - Self::update_unaggregated_fees_static(&inner).await?; - - let unaggregated_fees = inner.unaggregated_fees.lock().await; - if unaggregated_fees.value < inner.config.tap.rav_request_trigger_value.into() { - break; - } else { - // Go right back to requesting a RAV and warn the user. - warn!( - "Unaggregated fees for allocation {} and sender {} are {} right \ - after the RAV request. This is a sign that the TAP agent can't keep \ - up with the rate of new receipts. Consider increasing the \ - `rav_request_trigger_value` in the TAP agent config. It could also be \ - a sign that the sender's TAP aggregator is too slow.", - inner.allocation_id, inner.sender, unaggregated_fees.value - ); - } - } - - let mut state = inner.state.lock().await; - if *state == State::LastRavPending { - // Mark the last RAV as last in the DB as a cue for the indexer-agent. - let updated_rows = sqlx::query!( - r#" - UPDATE scalar_tap_latest_ravs - SET is_last = true - WHERE allocation_id = $1 AND sender_address = $2 - RETURNING * - "#, - inner - .allocation_id - .to_string() - .strip_prefix("0x") - .unwrap() - .to_owned(), - inner - .sender - .to_string() - .strip_prefix("0x") - .unwrap() - .to_owned(), - ) - .fetch_all(&inner.pgpool) - .await?; - - // Make sure exactly one row was updated. - if updated_rows.len() != 1 { - anyhow::bail!( - "Expected exactly one row to be updated in the latest RAVs table, \ - but {} were updated.", - updated_rows.len() - ); - } - - *state = State::Finished; - }; - anyhow::Ok(()) - }) - .await - .unwrap_or_else(|e| { + Self::rav_requester_try(&inner).await.unwrap_or_else(|e| { error!( "Error while requesting a RAV for allocation {} and sender {}: {:?}", inner.allocation_id, inner.sender, e ); }); } + + async fn rav_requester_try(inner: &Arc) -> anyhow::Result<()> { + loop { + // TODO: limit the number of receipts to aggregate per request. + let RAVRequest { + valid_receipts, + previous_rav, + invalid_receipts: _, + expected_rav, + } = inner + .tap_manager + .create_rav_request(inner.config.tap.rav_request_timestamp_buffer_ns) + .await?; + + // TODO: Request compression and response decompression. Also a fancy user agent? + let client = HttpClientBuilder::default().build(&inner.sender_aggregator_endpoint)?; + + // TODO: Add a timeout. + let response: JsonRpcResponse> = client + .request( + "aggregate_receipts", + rpc_params!( + "0.0", // TODO: Set the version in a smarter place. + valid_receipts, + previous_rav + ), + ) + .await?; + + if let Some(warnings) = response.warnings { + warn!("Warnings from sender's TAP aggregator: {:?}", warnings); + } + + inner + .tap_manager + .verify_and_store_rav(expected_rav, response.data) + .await?; + + // TODO: Handle invalid receipts + + // This is not the fastest way to do this, but it's the easiest. + // Note: we rely on the unaggregated_fees lock to make sure we don't miss any receipt + // notifications during this. + // TODO: If needed, faster alternative? + Self::update_unaggregated_fees_static(inner).await?; + + let unaggregated_fees = inner.unaggregated_fees.lock().await; + if unaggregated_fees.value < inner.config.tap.rav_request_trigger_value.into() { + break; + } else { + // Go right back to requesting a RAV and warn the user. + warn!( + "Unaggregated fees for allocation {} and sender {} are {} right \ + after the RAV request. This is a sign that the TAP agent can't keep \ + up with the rate of new receipts. Consider increasing the \ + `rav_request_trigger_value` in the TAP agent config. It could also be \ + a sign that the sender's TAP aggregator is too slow.", + inner.allocation_id, inner.sender, unaggregated_fees.value + ); + } + } + + let mut state = inner.state.lock().await; + if *state == State::LastRavPending { + // Mark the last RAV as last in the DB as a cue for the indexer-agent. + let updated_rows = sqlx::query!( + r#" + UPDATE scalar_tap_latest_ravs + SET is_last = true + WHERE allocation_id = $1 AND sender_address = $2 + RETURNING * + "#, + inner + .allocation_id + .to_string() + .strip_prefix("0x") + .unwrap() + .to_owned(), + inner + .sender + .to_string() + .strip_prefix("0x") + .unwrap() + .to_owned(), + ) + .fetch_all(&inner.pgpool) + .await?; + + // Make sure exactly one row was updated. + if updated_rows.len() != 1 { + anyhow::bail!( + "Expected exactly one row to be updated in the latest RAVs table, \ + but {} were updated.", + updated_rows.len() + ); + } + + *state = State::Finished; + }; + anyhow::Ok(()) + } } -// Destructor -impl Drop for Manager { - fn drop(&mut self) { - // Cancel the rav_requester task. - let mut rav_requester_task = self.rav_requester_task.blocking_lock(); - if let Some(rav_requester_task) = rav_requester_task.take() { - rav_requester_task.abort(); +#[cfg(test)] +mod tests { + + use serde_json::json; + use tap_aggregator::server::run_server; + use wiremock::{ + matchers::{body_string_contains, method, path}, + Mock, MockServer, ResponseTemplate, + }; + + use super::*; + use crate::tap::test_utils::{ + create_rav, create_received_receipt, domain, store_rav, store_receipt, ALLOCATION_ID, + INDEXER, SENDER, + }; + + const DUMMY_URL: &str = "http://localhost:1234"; + + async fn create_manager( + pgpool: PgPool, + sender_aggregator_endpoint: String, + escrow_subgraph_endpoint: &str, + ) -> Manager { + let config = Box::leak(Box::new(config::Cli { + config: None, + ethereum: config::Ethereum { + indexer_address: INDEXER.1, + ..Default::default() + }, + tap: config::Tap { + rav_request_trigger_value: 100, + rav_request_timestamp_buffer_ns: 1000, + ..Default::default() + }, + ..Default::default() + })); + + let escrow_subgraph = Box::leak(Box::new( + SubgraphClient::new(None, None, escrow_subgraph_endpoint).unwrap(), + )); + + let (mut escrow_accounts_writer, escrow_accounts_eventual) = + Eventual::>::new(); + escrow_accounts_writer.write(HashMap::from([(SENDER.1, 1000.into())])); + + Manager::new( + config, + *ALLOCATION_ID, + SENDER.1, + escrow_accounts_eventual, + escrow_subgraph, + pgpool.clone(), + domain(), + sender_aggregator_endpoint, + ) + } + + /// Test that the manager correctly updates the unaggregated fees from the database when there + /// is no RAV in the database. + /// + /// The manager should consider all receipts found for the allocation and sender. + #[sqlx::test(migrations = "../migrations")] + async fn test_update_unaggregated_fees_no_rav(pgpool: PgPool) { + let manager = create_manager(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Add receipts to the database. + for i in 1..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + // Let the manager update the unaggregated fees from the database. + manager.update_unaggregated_fees().await.unwrap(); + + // Check that the unaggregated fees are correct. + assert_eq!(manager.inner.unaggregated_fees.lock().await.value, 45u128); + } + + /// Test that the manager correctly updates the unaggregated fees from the database when there + /// is a RAV in the database as well as receipts which timestamp are lesser and greater than + /// the RAV's timestamp. + /// + /// The manager should only consider receipts with a timestamp greater than the RAV's timestamp. + #[sqlx::test(migrations = "../migrations")] + async fn test_update_unaggregated_fees_with_rav(pgpool: PgPool) { + let manager = create_manager(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Add the RAV to the database. + // This RAV has timestamp 4. The Manager should only consider receipts with a timestamp + // greater than 4. + let signed_rav = create_rav(*ALLOCATION_ID, SENDER.0.clone(), 4, 10).await; + store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); + + // Add receipts to the database. + for i in 1..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + // Let the manager update the unaggregated fees from the database. + manager.update_unaggregated_fees().await.unwrap(); + + // Check that the unaggregated fees are correct. + assert_eq!(manager.inner.unaggregated_fees.lock().await.value, 35u128); + } + + /// Test that the manager correctly ignores new receipt notifications with an ID lower than + /// the last receipt ID processed (be it from the DB or from a prior receipt notification). + #[sqlx::test(migrations = "../migrations")] + async fn test_handle_new_receipt_notification(pgpool: PgPool) { + let manager = create_manager(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Add receipts to the database. + let mut expected_unaggregated_fees = 0u128; + for i in 10..20 { + let receipt = + create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + expected_unaggregated_fees += u128::from(i); + } + + manager.update_unaggregated_fees().await.unwrap(); + + // Check that the unaggregated fees are correct. + assert_eq!( + manager.inner.unaggregated_fees.lock().await.value, + expected_unaggregated_fees + ); + + // Send a new receipt notification that has a lower ID than the last loaded from the DB. + // The last ID in the DB should be 10, since we added 10 receipts to the empty receipts table, + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID, + sender_address: SENDER.1, + id: 10, + timestamp_ns: 19, + value: 19, + }; + manager + .handle_new_receipt_notification(new_receipt_notification) + .await; + + // Check that the unaggregated fees have *not* increased. + assert_eq!( + manager.inner.unaggregated_fees.lock().await.value, + expected_unaggregated_fees + ); + + // Send a new receipt notification. + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID, + sender_address: SENDER.1, + id: 30, + timestamp_ns: 20, + value: 20, + }; + manager + .handle_new_receipt_notification(new_receipt_notification) + .await; + expected_unaggregated_fees += 20; + + // Check that the unaggregated fees are correct. + assert_eq!( + manager.inner.unaggregated_fees.lock().await.value, + expected_unaggregated_fees + ); + + // Send a new receipt notification that has a lower ID than the previous one. + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID, + sender_address: SENDER.1, + id: 25, + timestamp_ns: 19, + value: 19, + }; + manager + .handle_new_receipt_notification(new_receipt_notification) + .await; + + // Check that the unaggregated fees have *not* increased. + assert_eq!( + manager.inner.unaggregated_fees.lock().await.value, + expected_unaggregated_fees + ); + } + + /// Test the RAV requester + #[sqlx::test(migrations = "../migrations")] + async fn test_rav_requester(pgpool: PgPool) { + // Start a TAP aggregator server. + let (handle, aggregator_endpoint) = + run_server(0, SENDER.0.clone(), domain(), 100 * 1024, 100 * 1024, 1) + .await + .unwrap(); + + // Start a mock graphql server using wiremock + let mock_server = MockServer::start().await; + + // Mock result for TAP redeem txs for (allocation, sender) pair. + mock_server + .register( + Mock::given(method("POST")) + .and(path("/escrow-subgraph")) + .and(body_string_contains("transactions")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(json!({ "data": { "transactions": []}})), + ), + ) + .await; + + // Create a manager. + let manager = create_manager( + pgpool.clone(), + "http://".to_owned() + &aggregator_endpoint.to_string(), + (mock_server.uri() + "/escrow-subgraph").as_str(), + ) + .await; + + // Add receipts to the database. + for i in 0..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); } + + // Let the manager update the unaggregated fees from the database. + manager.update_unaggregated_fees().await.unwrap(); + + // Trigger a RAV request manually. + Manager::rav_requester_try(&manager.inner).await.unwrap(); + + // Stop the TAP aggregator server. + handle.stop().unwrap(); + handle.stopped().await; } } diff --git a/tap_agent/src/tap/mod.rs b/tap_agent/src/tap/mod.rs index 5ffc68903..d6b20c7d9 100644 --- a/tap_agent/src/tap/mod.rs +++ b/tap_agent/src/tap/mod.rs @@ -6,5 +6,6 @@ pub mod managers; mod rav_storage_adapter; mod receipt_checks_adapter; mod receipt_storage_adapter; + #[cfg(test)] pub mod test_utils; diff --git a/tap_agent/src/tap/receipt_storage_adapter.rs b/tap_agent/src/tap/receipt_storage_adapter.rs index ed266b029..7f7c15390 100644 --- a/tap_agent/src/tap/receipt_storage_adapter.rs +++ b/tap_agent/src/tap/receipt_storage_adapter.rs @@ -5,11 +5,12 @@ use std::{ use alloy_primitives::Address; use async_trait::async_trait; + use sqlx::{postgres::types::PgRange, types::BigDecimal, PgPool}; -use tap_core::tap_receipt::ReceivedReceipt; +use tap_core::adapters::receipt_storage_adapter::ReceiptStorageAdapter as ReceiptStorageAdapterTrait; use tap_core::{ - adapters::receipt_storage_adapter::ReceiptStorageAdapter as ReceiptStorageAdapterTrait, - tap_manager::SignedReceipt, tap_receipt::get_full_list_of_checks, + tap_manager::SignedReceipt, + tap_receipt::{ReceiptCheck, ReceivedReceipt}, }; use thiserror::Error; @@ -18,6 +19,7 @@ pub struct ReceiptStorageAdapter { pgpool: PgPool, allocation_id: Address, sender: Address, + required_checks: Vec, } #[derive(Debug, Error)] @@ -134,7 +136,7 @@ impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { let id: u64 = record.id.try_into()?; let signed_receipt: SignedReceipt = serde_json::from_value(record.receipt)?; let received_receipt = - ReceivedReceipt::new(signed_receipt, id, &get_full_list_of_checks()); + ReceivedReceipt::new(signed_receipt, id, &self.required_checks); Ok((id, received_receipt)) }) .collect() @@ -193,11 +195,17 @@ impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { } impl ReceiptStorageAdapter { - pub fn new(pgpool: PgPool, allocation_id: Address, sender: Address) -> Self { + pub fn new( + pgpool: PgPool, + allocation_id: Address, + sender: Address, + required_checks: Vec, + ) -> Self { Self { pgpool, allocation_id, sender, + required_checks, } } } @@ -212,6 +220,7 @@ mod test { use anyhow::Result; use serde_json::Value; use sqlx::PgPool; + use tap_core::tap_receipt::get_full_list_of_checks; // #[sqlx::test] // async fn store_receipt(pgpool: PgPool) { @@ -399,7 +408,12 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { - let storage_adapter = ReceiptStorageAdapter::new(pgpool.clone(), *ALLOCATION_ID, SENDER.1); + let storage_adapter = ReceiptStorageAdapter::new( + pgpool.clone(), + *ALLOCATION_ID, + SENDER.1, + get_full_list_of_checks(), + ); // Creating 10 receipts with timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); @@ -532,7 +546,8 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { - let storage_adapter = ReceiptStorageAdapter::new(pgpool, *ALLOCATION_ID, SENDER.1); + let storage_adapter = + ReceiptStorageAdapter::new(pgpool, *ALLOCATION_ID, SENDER.1, get_full_list_of_checks()); // Creating 10 receipts with timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); diff --git a/tap_agent/src/tap/test_utils.rs b/tap_agent/src/tap/test_utils.rs index 6fef3b033..acaaa631b 100644 --- a/tap_agent/src/tap/test_utils.rs +++ b/tap_agent/src/tap/test_utils.rs @@ -19,6 +19,7 @@ lazy_static! { Address::from_str("0xbcdebcdebcdebcdebcdebcdebcdebcdebcdebcde").unwrap(); pub static ref SENDER: (LocalWallet, Address) = wallet(0); pub static ref SENDER_IRRELEVANT: (LocalWallet, Address) = wallet(1); + pub static ref INDEXER: (LocalWallet, Address) = wallet(2); } /// Fixture to generate a wallet and address @@ -121,3 +122,27 @@ pub async fn store_receipt(pgpool: &PgPool, signed_receipt: SignedReceipt) -> Re let id: u64 = record.id.try_into()?; Ok(id) } + +pub async fn store_rav(pgpool: &PgPool, signed_rav: SignedRAV, sender: Address) -> Result<()> { + sqlx::query!( + r#" + INSERT INTO scalar_tap_latest_ravs ( + allocation_id, sender_address, rav + ) + VALUES ($1, $2, $3) + "#, + signed_rav + .message + .allocation_id + .to_string() + .strip_prefix("0x") + .unwrap() + .to_owned(), + sender.to_string().strip_prefix("0x").unwrap().to_owned(), + serde_json::to_value(signed_rav).unwrap(), + ) + .execute(pgpool) + .await?; + + Ok(()) +}