diff --git a/.sqlx/query-7a48651e528b87c6a618534806e70c2c494f3ba0774097652df984248869c20d.json b/.sqlx/query-7ff23083cf2afaf2c6b9693736053136baee1406f48f61db2b7bc504825a6fc3.json similarity index 66% rename from .sqlx/query-7a48651e528b87c6a618534806e70c2c494f3ba0774097652df984248869c20d.json rename to .sqlx/query-7ff23083cf2afaf2c6b9693736053136baee1406f48f61db2b7bc504825a6fc3.json index 3bfe8040..f50105ee 100644 --- a/.sqlx/query-7a48651e528b87c6a618534806e70c2c494f3ba0774097652df984248869c20d.json +++ b/.sqlx/query-7ff23083cf2afaf2c6b9693736053136baee1406f48f61db2b7bc504825a6fc3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE scalar_tap_ravs\n SET final = true\n WHERE allocation_id = $1 AND sender_address = $2\n RETURNING *\n ", + "query": "\n UPDATE scalar_tap_ravs\n SET final = true\n WHERE allocation_id = $1 AND sender_address = $2\n RETURNING *\n ", "describe": { "columns": [ { @@ -37,5 +37,5 @@ false ] }, - "hash": "7a48651e528b87c6a618534806e70c2c494f3ba0774097652df984248869c20d" + "hash": "7ff23083cf2afaf2c6b9693736053136baee1406f48f61db2b7bc504825a6fc3" } diff --git a/tap-agent/src/tap/sender_allocation_relationship.rs b/tap-agent/src/tap/sender_allocation_relationship.rs index 64db68a2..232dcff9 100644 --- a/tap-agent/src/tap/sender_allocation_relationship.rs +++ b/tap-agent/src/tap/sender_allocation_relationship.rs @@ -5,7 +5,7 @@ use std::{str::FromStr, sync::Arc, time::Duration}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; -use anyhow::{anyhow, ensure}; +use anyhow::{anyhow, ensure, Result}; use eventuals::Eventual; use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; @@ -18,10 +18,7 @@ use tap_core::{ tap_manager::RAVRequest, tap_receipt::{ReceiptCheck, ReceivedReceipt}, }; -use tokio::{ - sync::{Mutex, MutexGuard}, - task::JoinHandle, -}; +use tokio::sync::Mutex; use tracing::{error, warn}; use super::sender_allocation_relationships_manager::NewReceiptNotification; @@ -79,7 +76,8 @@ struct Inner { /// - Requesting the last RAV from the sender's TAP aggregator (on SenderAllocationRelationship EOL) pub struct SenderAllocationRelationship { inner: Arc, - rav_requester_task: Arc>>>, + rav_requester_task: tokio::task::JoinHandle<()>, + rav_requester_notify: Arc, } impl SenderAllocationRelationship { @@ -131,19 +129,29 @@ impl SenderAllocationRelationship { required_checks, 0, ); + + let inner = Arc::new(Inner { + pgpool, + tap_manager, + allocation_id, + sender, + sender_aggregator_endpoint, + unaggregated_fees: Arc::new(Mutex::new(UnaggregatedFees::default())), + state: Arc::new(Mutex::new(State::Running)), + config, + escrow_accounts, + }); + + let rav_requester_notify = Arc::new(tokio::sync::Notify::new()); + let rav_requester_task = tokio::spawn(Self::rav_requester( + inner.clone(), + rav_requester_notify.clone(), + )); + Self { - inner: Arc::new(Inner { - pgpool, - tap_manager, - allocation_id, - sender, - sender_aggregator_endpoint, - unaggregated_fees: Arc::new(Mutex::new(UnaggregatedFees::default())), - state: Arc::new(Mutex::new(State::Running)), - config, - escrow_accounts, - }), - rav_requester_task: Arc::new(Mutex::new(None)), + inner, + rav_requester_task, + rav_requester_notify, } } @@ -184,33 +192,27 @@ impl SenderAllocationRelationship { }); unaggregated_fees.last_id = new_receipt_notification.id; - let mut rav_requester_task = self.rav_requester_task.lock().await; // TODO: consider making the trigger per sender, instead of per (sender, allocation). - if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into() - && !Self::rav_requester_task_is_running(&rav_requester_task) - { - *rav_requester_task = Some(tokio::spawn(Self::rav_requester(self.inner.clone()))); + if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into() { + self.rav_requester_notify.notify_waiters(); } } } pub async fn start_last_rav_request(&self) { *(self.inner.state.lock().await) = State::LastRavPending; - let mut rav_requester_task = self.rav_requester_task.lock().await; - if !Self::rav_requester_task_is_running(&rav_requester_task) { - *rav_requester_task = Some(tokio::spawn(Self::rav_requester(self.inner.clone()))); - } + self.rav_requester_notify.notify_one(); } /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager /// with the latest unaggregated fees from the database. - pub async fn update_unaggregated_fees(&self) -> Result<(), anyhow::Error> { + pub async fn update_unaggregated_fees(&self) -> Result<()> { Self::update_unaggregated_fees_static(&self.inner).await } /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager /// with the latest unaggregated fees from the database. - async fn update_unaggregated_fees_static(inner: &Inner) -> Result<(), anyhow::Error> { + async fn update_unaggregated_fees_static(inner: &Inner) -> Result<()> { inner.tap_manager.remove_obsolete_receipts().await?; let signers = signers_trimmed(&inner.escrow_accounts, inner.sender).await?; @@ -279,107 +281,25 @@ impl SenderAllocationRelationship { /// Request a RAV from the sender's TAP aggregator. /// Will remove the aggregated receipts from the database if successful. - async fn rav_requester(inner: Arc) { - Self::rav_requester_try(&inner).await.unwrap_or_else(|e| { - error!( - "Error while requesting a RAV for allocation {} and sender {}: {:?}", - inner.allocation_id, inner.sender, e - ); - }); - } - - async fn rav_requester_try(inner: &Arc) -> anyhow::Result<()> { + async fn rav_requester(inner: Arc, notifications: Arc) { loop { - let RAVRequest { - valid_receipts, - previous_rav, - invalid_receipts, - expected_rav, - } = inner - .tap_manager - .create_rav_request( - inner.config.tap.rav_request_timestamp_buffer_ms * 1_000_000, - // TODO: limit the number of receipts to aggregate per request. - None, - ) - .await?; + // Wait for a RAV request notification. + notifications.notified().await; - // Invalid receipts - if !invalid_receipts.is_empty() { - warn!( - "Found {} invalid receipts for allocation {} and sender {}.", - invalid_receipts.len(), - inner.allocation_id, - inner.sender + Self::rav_requester_try(&inner).await.unwrap_or_else(|e| { + error!( + "Error while requesting a RAV for allocation {} and sender {}: {:?}", + inner.allocation_id, inner.sender, e ); + }); + } + } - // Save invalid receipts to the database for logs. - // TODO: consider doing that in a spawned task? - Self::store_invalid_receipts(inner, &invalid_receipts).await?; - } - - // TODO: Request compression and response decompression. Also a fancy user agent? - let client = HttpClientBuilder::default() - .request_timeout(Duration::from_secs( - inner.config.tap.rav_request_timeout_secs, - )) - .build(&inner.sender_aggregator_endpoint)?; - - let response: JsonRpcResponse> = client - .request( - "aggregate_receipts", - rpc_params!( - "0.0", // TODO: Set the version in a smarter place. - valid_receipts, - previous_rav - ), - ) - .await?; - - if let Some(warnings) = response.warnings { - warn!("Warnings from sender's TAP aggregator: {:?}", warnings); - } - - match inner - .tap_manager - .verify_and_store_rav(expected_rav.clone(), response.data.clone()) - .await - { - Ok(_) => {} - - // Adapter errors are local software errors. Shouldn't be a problem with the sender. - Err(tap_core::Error::AdapterError { source_error: e }) => { - anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e) - } - - // The 3 errors below signal an invalid RAV, which should be about problems with the - // sender. The sender could be malicious. - Err( - e @ tap_core::Error::InvalidReceivedRAV { - expected_rav: _, - received_rav: _, - } - | e @ tap_core::Error::SignatureError(_) - | e @ tap_core::Error::InvalidRecoveredSigner { address: _ }, - ) => { - Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string()) - .await?; - anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e); - } - - // All relevant errors should be handled above. If we get here, we forgot to handle - // an error case. - Err(e) => { - anyhow::bail!("Error while verifying and storing RAV: {:?}", e); - } - } - - // This is not the fastest way to do this, but it's the easiest. - // Note: we rely on the unaggregated_fees lock to make sure we don't miss any receipt - // notifications during this. - // TODO: If needed, faster alternative? - Self::update_unaggregated_fees_static(inner).await?; + async fn rav_requester_try(inner: &Arc) -> Result<()> { + loop { + Self::rav_requester_single(inner).await?; + // Check if we need to request another RAV immediately. let unaggregated_fees = inner.unaggregated_fees.lock().await; if unaggregated_fees.value < inner.config.tap.rav_request_trigger_value.into() { break; @@ -398,56 +318,127 @@ impl SenderAllocationRelationship { let mut state = inner.state.lock().await; if *state == State::LastRavPending { - // Mark the last RAV as last in the DB as a cue for the indexer-agent. - let updated_rows = sqlx::query!( - r#" - UPDATE scalar_tap_ravs - SET final = true - WHERE allocation_id = $1 AND sender_address = $2 - RETURNING * - "#, - inner - .allocation_id - .to_string() - .trim_start_matches("0x") - .to_owned(), - inner.sender.to_string().trim_start_matches("0x").to_owned(), - ) - .fetch_all(&inner.pgpool) - .await?; - - // Make sure exactly one row was updated. - if updated_rows.len() != 1 { - anyhow::bail!( - "Expected exactly one row to be updated in the latest RAVs table, \ - but {} were updated.", - updated_rows.len() - ); - } + // Mark the last RAV as final in the DB as a cue for the indexer-agent. + Self::mark_rav_final(inner).await?; *state = State::Finished; }; anyhow::Ok(()) } - fn rav_requester_task_is_running( - rav_requester_task_lock: &MutexGuard<'_, Option>>, - ) -> bool { - if let Some(handle) = rav_requester_task_lock.as_ref() { - !handle.is_finished() - } else { - false + async fn rav_requester_single(inner: &Arc) -> Result<()> { + let RAVRequest { + valid_receipts, + previous_rav, + invalid_receipts, + expected_rav, + } = inner + .tap_manager + .create_rav_request( + inner.config.tap.rav_request_timestamp_buffer_ms * 1_000_000, + // TODO: limit the number of receipts to aggregate per request. + None, + ) + .await?; + if !invalid_receipts.is_empty() { + warn!( + "Found {} invalid receipts for allocation {} and sender {}.", + invalid_receipts.len(), + inner.allocation_id, + inner.sender + ); + + // Save invalid receipts to the database for logs. + // TODO: consider doing that in a spawned task? + Self::store_invalid_receipts(inner, &invalid_receipts).await?; } + let client = HttpClientBuilder::default() + .request_timeout(Duration::from_secs( + inner.config.tap.rav_request_timeout_secs, + )) + .build(&inner.sender_aggregator_endpoint)?; + let response: JsonRpcResponse> = client + .request( + "aggregate_receipts", + rpc_params!( + "0.0", // TODO: Set the version in a smarter place. + valid_receipts, + previous_rav + ), + ) + .await?; + if let Some(warnings) = response.warnings { + warn!("Warnings from sender's TAP aggregator: {:?}", warnings); + } + match inner + .tap_manager + .verify_and_store_rav(expected_rav.clone(), response.data.clone()) + .await + { + Ok(_) => {} + + // Adapter errors are local software errors. Shouldn't be a problem with the sender. + Err(tap_core::Error::AdapterError { source_error: e }) => { + anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e) + } + + // The 3 errors below signal an invalid RAV, which should be about problems with the + // sender. The sender could be malicious. + Err( + e @ tap_core::Error::InvalidReceivedRAV { + expected_rav: _, + received_rav: _, + } + | e @ tap_core::Error::SignatureError(_) + | e @ tap_core::Error::InvalidRecoveredSigner { address: _ }, + ) => { + Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string()) + .await?; + anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e); + } + + // All relevant errors should be handled above. If we get here, we forgot to handle + // an error case. + Err(e) => { + anyhow::bail!("Error while verifying and storing RAV: {:?}", e); + } + } + Self::update_unaggregated_fees_static(inner).await?; + Ok(()) + } + + async fn mark_rav_final(inner: &Arc) -> Result<()> { + let updated_rows = sqlx::query!( + r#" + UPDATE scalar_tap_ravs + SET final = true + WHERE allocation_id = $1 AND sender_address = $2 + RETURNING * + "#, + inner + .allocation_id + .to_string() + .trim_start_matches("0x") + .to_owned(), + inner.sender.to_string().trim_start_matches("0x").to_owned(), + ) + .fetch_all(&inner.pgpool) + .await?; + if updated_rows.len() != 1 { + anyhow::bail!( + "Expected exactly one row to be updated in the latest RAVs table, \ + but {} were updated.", + updated_rows.len() + ); + }; + Ok(()) } pub async fn state(&self) -> State { *self.inner.state.lock().await } - async fn store_invalid_receipts( - inner: &Inner, - receipts: &[ReceivedReceipt], - ) -> anyhow::Result<()> { + async fn store_invalid_receipts(inner: &Inner, receipts: &[ReceivedReceipt]) -> Result<()> { for received_receipt in receipts.iter() { sqlx::query!( r#" @@ -483,7 +474,7 @@ impl SenderAllocationRelationship { expected_rav: &ReceiptAggregateVoucher, rav: &EIP712SignedMessage, reason: &str, - ) -> anyhow::Result<()> { + ) -> Result<()> { sqlx::query!( r#" INSERT INTO scalar_tap_rav_requests_failed ( @@ -517,14 +508,7 @@ impl Drop for SenderAllocationRelationship { /// Trying to make sure the RAV requester task is dropped when the SenderAllocationRelationship /// is dropped. fn drop(&mut self) { - let rav_requester_task = self.rav_requester_task.clone(); - - tokio::spawn(async move { - let mut rav_requester_task = rav_requester_task.lock().await; - if let Some(rav_requester_task) = rav_requester_task.take() { - rav_requester_task.abort(); - } - }); + self.rav_requester_task.abort(); } } @@ -917,12 +901,17 @@ mod tests { } // Wait for the RAV requester to finish. - while SenderAllocationRelationship::rav_requester_task_is_running( - &sender_allocation_relatioship - .rav_requester_task + for _ in 0..100 { + if sender_allocation_relatioship + .inner + .unaggregated_fees .lock() - .await, - ) { + .await + .value + < trigger_value + { + break; + } tokio::time::sleep(std::time::Duration::from_millis(100)).await; } @@ -998,12 +987,17 @@ mod tests { } // Wait for the RAV requester to finish. - while SenderAllocationRelationship::rav_requester_task_is_running( - &sender_allocation_relatioship - .rav_requester_task + for _ in 0..100 { + if sender_allocation_relatioship + .inner + .unaggregated_fees .lock() - .await, - ) { + .await + .value + < trigger_value + { + break; + } tokio::time::sleep(std::time::Duration::from_millis(100)).await; }