diff --git a/.sqlx/query-d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d.json b/.sqlx/query-349bb2ab33307fc52d25d74c8fe736dbfc396116aa2a93bb379029672701cef8.json similarity index 60% rename from .sqlx/query-d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d.json rename to .sqlx/query-349bb2ab33307fc52d25d74c8fe736dbfc396116aa2a93bb379029672701cef8.json index 6041e7596..0c4418172 100644 --- a/.sqlx/query-d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d.json +++ b/.sqlx/query-349bb2ab33307fc52d25d74c8fe736dbfc396116aa2a93bb379029672701cef8.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT MAX(id), SUM(value)\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND sender_address = $2\n ", + "query": "\n SELECT MAX(id), SUM(value)\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND sender_address = $2\n ", "describe": { "columns": [ { @@ -25,5 +25,5 @@ null ] }, - "hash": "d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d" + "hash": "349bb2ab33307fc52d25d74c8fe736dbfc396116aa2a93bb379029672701cef8" } diff --git a/.sqlx/query-a39cf8e5d4b670583e8a2b0af20c270c3db62668e2df5a2b7e7a2fac0ef69405.json b/.sqlx/query-a39cf8e5d4b670583e8a2b0af20c270c3db62668e2df5a2b7e7a2fac0ef69405.json new file mode 100644 index 000000000..30e417eb6 --- /dev/null +++ b/.sqlx/query-a39cf8e5d4b670583e8a2b0af20c270c3db62668e2df5a2b7e7a2fac0ef69405.json @@ -0,0 +1,41 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE scalar_tap_latest_ravs\n SET is_last = true\n WHERE allocation_id = $1 AND sender_address = $2\n RETURNING *\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "allocation_id", + "type_info": "Bpchar" + }, + { + "ordinal": 1, + "name": "sender_address", + "type_info": "Bpchar" + }, + { + "ordinal": 2, + "name": "rav", + "type_info": "Json" + }, + { + "ordinal": 3, + "name": "is_last", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar" + ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "a39cf8e5d4b670583e8a2b0af20c270c3db62668e2df5a2b7e7a2fac0ef69405" +} diff --git a/Cargo.lock b/Cargo.lock index 3584e144c..009937fa9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3025,9 +3025,11 @@ dependencies = [ "rstest 0.18.2", "serde", "serde_json", + "serde_yaml", "sqlx", "tap_aggregator", "tap_core 0.6.0 (git+https://github.com/semiotic-ai/timeline-aggregation-protocol.git?rev=882ca394444b451538908b9996bf7d45869a1bb9)", + "tempfile", "thiserror", "tokio", "toolshed", @@ -5271,6 +5273,19 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "serde_yaml" +version = "0.9.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a49e178e4452f45cb61d0cd8cebc1b0fafd3e41929e996cef79aa3aca91f574" +dependencies = [ + "indexmap 2.0.2", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "service" version = "0.1.0" @@ -6567,6 +6582,12 @@ dependencies = [ "void", ] +[[package]] +name = "unsafe-libyaml" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa" + [[package]] name = "untrusted" version = "0.7.1" diff --git a/migrations/20230915230734_tap_ravs.up.sql b/migrations/20230915230734_tap_ravs.up.sql index 0e142d303..9e4fc11ce 100644 --- a/migrations/20230915230734_tap_ravs.up.sql +++ b/migrations/20230915230734_tap_ravs.up.sql @@ -2,5 +2,6 @@ CREATE TABLE IF NOT EXISTS scalar_tap_latest_ravs ( allocation_id CHAR(40) NOT NULL, sender_address CHAR(40) NOT NULL, rav JSON NOT NULL, + is_last BOOLEAN DEFAULT FALSE NOT NULL, PRIMARY KEY (allocation_id, sender_address) ); diff --git a/tap_agent/Cargo.toml b/tap_agent/Cargo.toml index 52bf9864e..5561cdb68 100644 --- a/tap_agent/Cargo.toml +++ b/tap_agent/Cargo.toml @@ -25,6 +25,7 @@ log = "0.4.19" reqwest = "0.11.20" serde = "1.0.188" serde_json = "1.0.104" +serde_yaml = "0.9.25" sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio", "bigdecimal", "rust_decimal"] } tap_aggregator = "0.1.6" tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol.git", rev = "882ca394444b451538908b9996bf7d45869a1bb9" } @@ -47,3 +48,4 @@ ethers-signers = "2.0.8" faux = "0.1.10" indexer-common = { path = "../common", features = ["mock"] } rstest = "0.18.1" +tempfile = "3.8.0" diff --git a/tap_agent/src/agent.rs b/tap_agent/src/agent.rs index 956fae443..5e39d5a98 100644 --- a/tap_agent/src/agent.rs +++ b/tap_agent/src/agent.rs @@ -1,9 +1,10 @@ use std::time::Duration; -use crate::{config, database, tap::managers}; use alloy_primitives::Address; use indexer_common::prelude::{escrow_accounts, indexer_allocations, SubgraphClient}; +use crate::{aggregator_endpoints, config, database, tap::managers}; + pub async fn start_agent(config: &'static config::Cli) { let pgpool = database::connect(&config.postgres).await; @@ -38,6 +39,11 @@ pub async fn start_agent(config: &'static config::Cli) { Duration::from_secs(config.escrow_subgraph.escrow_syncing_interval), ); + // TODO: replace with a proper implementation once the gateway registry is ready + let sender_aggregator_endpoints = aggregator_endpoints::load_aggregator_endpoints( + config.tap.sender_aggregator_endpoints_file.clone(), + ); + let _managers = managers::TapManagers::new( config, pgpool, @@ -50,6 +56,7 @@ pub async fn start_agent(config: &'static config::Cli) { String::from("0"), // TODO: tap eip712 verifying contract config Address::ZERO, + sender_aggregator_endpoints, ) .await; } diff --git a/tap_agent/src/aggregator_endpoints.rs b/tap_agent/src/aggregator_endpoints.rs new file mode 100644 index 000000000..3f2e4e453 --- /dev/null +++ b/tap_agent/src/aggregator_endpoints.rs @@ -0,0 +1,44 @@ +/// Load a hashmap of sender addresses and their corresponding aggregator endpoints +/// from a yaml file. We're using serde_yaml. +use std::collections::HashMap; +use std::fs::File; +use std::io::BufReader; +use std::path::PathBuf; + +use alloy_primitives::Address; + +/// Load a hashmap of sender addresses and their corresponding aggregator endpoints +/// from a yaml file. We're using serde_yaml. +pub fn load_aggregator_endpoints(file_path: PathBuf) -> HashMap { + let file = File::open(file_path).unwrap(); + let reader = BufReader::new(file); + let endpoints: HashMap = serde_yaml::from_reader(reader).unwrap(); + endpoints +} + +#[cfg(test)] +mod tests { + use std::{io::Write, str::FromStr}; + + use super::*; + + /// Test that we can load the aggregator endpoints from a yaml file. + /// The test is going to create a temporary yaml file using tempfile, load it, and + /// check that the endpoints are loaded correctly. + #[test] + fn test_load_aggregator_endpoints() { + let named_temp_file = tempfile::NamedTempFile::new().unwrap(); + let mut temp_file = named_temp_file.reopen().unwrap(); + let yaml = r#" + 0xdeadbeefcafebabedeadbeefcafebabedeadbeef: https://example.com/aggregate-receipts + 0x0123456789abcdef0123456789abcdef01234567: https://other.example.com/aggregate-receipts + "#; + temp_file.write_all(yaml.as_bytes()).unwrap(); + let endpoints = load_aggregator_endpoints(named_temp_file.path().to_path_buf()); + assert_eq!( + endpoints + .get(&Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap()), + Some(&"https://example.com/aggregate-receipts".to_string()) + ); + } +} diff --git a/tap_agent/src/config.rs b/tap_agent/src/config.rs index 9c63dfd42..984e3f0ff 100644 --- a/tap_agent/src/config.rs +++ b/tap_agent/src/config.rs @@ -1,4 +1,5 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier: Apache-2.0 +use std::path::PathBuf; + use alloy_primitives::Address; use clap::{command, Args, Parser, ValueEnum}; use dotenvy::dotenv; @@ -261,8 +262,8 @@ pub struct Tap { long, value_name = "rav-request-trigger-value", env = "RAV_REQUEST_TRIGGER_VALUE", - help = "Value of unaggregated fees that triggers a RAV request (in GRT).", - default_value_t = 10 + help = "Value of unaggregated fees that triggers a RAV request (in GRT wei).", + default_value_t = 10_000_000_000_000_000_000 // 10 GRT )] pub rav_request_trigger_value: u64, #[clap( @@ -271,9 +272,18 @@ pub struct Tap { env = "RAV_REQUEST_TIMESTAMP_BUFFER", help = "Buffer (in ns) to add between the current time and the timestamp of the \ last unaggregated fee when triggering a RAV request.", - default_value_t = 1000 + default_value_t = 1_000_000_000 // 1 second )] pub rav_request_timestamp_buffer_ns: u64, + + // TODO: Remove this whenever the the gateway registry is ready + #[clap( + long, + value_name = "sender-aggregator-endpoints", + env = "SENDER_AGGREGATOR_ENDPOINTS", + help = "YAML file with a map of sender addresses to aggregator endpoints." + )] + pub sender_aggregator_endpoints_file: PathBuf, } /// Sets up tracing, allows log level to be set from the environment variables diff --git a/tap_agent/src/database.rs b/tap_agent/src/database.rs index a2701843f..ef212231a 100644 --- a/tap_agent/src/database.rs +++ b/tap_agent/src/database.rs @@ -1,10 +1,11 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier: -// Apache-2.0 TODO: DEDUPLICATE -use crate::config; -use sqlx::{postgres::PgPoolOptions, PgPool}; +// TODO: DEDUPLICATE use std::time::Duration; + +use sqlx::{postgres::PgPoolOptions, PgPool}; use tracing::debug; +use crate::config; + pub async fn connect(config: &config::Postgres) -> PgPool { let url = format!( "postgresql://{}:{}@{}:{}/{}", diff --git a/tap_agent/src/main.rs b/tap_agent/src/main.rs index d7a6f7cbf..db890dd65 100644 --- a/tap_agent/src/main.rs +++ b/tap_agent/src/main.rs @@ -1,11 +1,12 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier: Apache-2.0 -use crate::config::Cli; use anyhow::Result; use lazy_static::lazy_static; use log::{debug, info}; use tokio::signal::unix::{signal, SignalKind}; +use crate::config::Cli; + mod agent; +mod aggregator_endpoints; mod config; mod database; mod tap; diff --git a/tap_agent/src/tap/escrow_adapter.rs b/tap_agent/src/tap/escrow_adapter.rs index 977f25d67..66b48757f 100644 --- a/tap_agent/src/tap/escrow_adapter.rs +++ b/tap_agent/src/tap/escrow_adapter.rs @@ -1,10 +1,9 @@ +use std::{collections::HashMap, sync::Arc}; + use alloy_primitives::Address; use async_trait::async_trait; use ethereum_types::U256; use eventuals::Eventual; - -/// TODO: Implement the escrow adapter. This is only a basic mock implementation. -use std::{collections::HashMap, sync::Arc}; use tap_core::adapters::escrow_adapter::EscrowAdapter as EscrowAdapterTrait; use thiserror::Error; use tokio::sync::RwLock; diff --git a/tap_agent/src/tap/manager.rs b/tap_agent/src/tap/manager.rs index d1b772995..740f178b8 100644 --- a/tap_agent/src/tap/manager.rs +++ b/tap_agent/src/tap/manager.rs @@ -1,21 +1,12 @@ use std::{collections::HashMap, sync::Arc}; -use super::managers::NewReceiptNotification; -use crate::{ - config::{self}, - tap::{ - escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, - receipt_checks_adapter::ReceiptChecksAdapter, - receipt_storage_adapter::ReceiptStorageAdapter, - }, -}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use ethereum_types::U256; use eventuals::Eventual; use indexer_common::subgraph_client::SubgraphClient; use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; -use log::error; +use log::{error, warn}; use sqlx::PgPool; use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; use tap_core::{ @@ -23,6 +14,17 @@ use tap_core::{ receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_manager::RAVRequest, tap_receipt::get_full_list_of_checks, }; +use tokio::sync::Mutex; + +use super::managers::NewReceiptNotification; +use crate::{ + config::{self}, + tap::{ + escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, + receipt_checks_adapter::ReceiptChecksAdapter, + receipt_storage_adapter::ReceiptStorageAdapter, + }, +}; type TapManager = tap_core::tap_manager::Manager< EscrowAdapter, @@ -31,28 +33,45 @@ type TapManager = tap_core::tap_manager::Manager< RAVStorageAdapter, >; -pub struct Manager { +#[derive(Default)] +struct UnaggregatedFees { + pub value: u128, + pub last_id: u64, +} + +#[derive(Debug, PartialEq)] +enum State { + Running, + LastRavPending, + Finished, +} + +struct Inner { pgpool: PgPool, - tap_manager: Arc, + tap_manager: TapManager, allocation_id: Address, sender: Address, sender_aggregator_endpoint: String, - unaggregated_fees: u128, - unaggregated_fees_last_id: u64, - rav_requester_task: Option>, + unaggregated_fees: Arc>, + state: Arc>, config: &'static config::Cli, } +pub struct Manager { + inner: Arc, + rav_requester_task: Mutex>>, +} + impl Manager { pub fn new( config: &'static config::Cli, allocation_id: Address, sender: Address, - // sender_aggregator_endpoint: String, escrow_accounts: Eventual>, escrow_subgraph: &'static SubgraphClient, pgpool: PgPool, tap_eip712_domain_separator: Eip712Domain, + sender_aggregator_endpoint: String, ) -> Self { let escrow_adapter = EscrowAdapter::new(escrow_accounts.clone()); let receipt_checks_adapter = ReceiptChecksAdapter::new( @@ -78,28 +97,42 @@ impl Manager { 0, ); Self { - pgpool, - tap_manager: Arc::new(tap_manager), - allocation_id, - sender, - // sender_aggregator_endpoint, - sender_aggregator_endpoint: "".to_string(), - unaggregated_fees: 0, - unaggregated_fees_last_id: 0, - rav_requester_task: None, - config, + inner: Arc::new(Inner { + pgpool, + tap_manager, + allocation_id, + sender, + sender_aggregator_endpoint, + unaggregated_fees: Arc::new(Mutex::new(UnaggregatedFees::default())), + state: Arc::new(Mutex::new(State::Running)), + config, + }), + rav_requester_task: Mutex::new(None), } } - pub fn handle_new_receipt_notification( - &mut self, + pub async fn handle_new_receipt_notification( + &self, new_receipt_notification: NewReceiptNotification, ) { + // If we're in the last rav pending state, we don't want to process any new receipts. + if *self.inner.state.lock().await == State::LastRavPending { + error!( + "Received a new receipt notification for allocation {} and sender {} while \ + the last RAV is pending. This should not have happened since this allocation \ + and/or sender is not eligible anymore.", + self.inner.allocation_id, self.inner.sender + ); + return; + } + + let mut unaggregated_fees = self.inner.unaggregated_fees.lock().await; + // Else we already processed that receipt, most likely from pulling the receipts // from the database. - if new_receipt_notification.id > self.unaggregated_fees_last_id { - self.unaggregated_fees = self - .unaggregated_fees + if new_receipt_notification.id > unaggregated_fees.last_id { + unaggregated_fees.value = unaggregated_fees + .value .checked_add(new_receipt_notification.value) .unwrap_or_else(|| { // This should never happen, but if it does, we want to know about it. @@ -107,108 +140,194 @@ impl Manager { "Overflow when adding receipt value {} to total unaggregated fees {} for \ allocation {} and sender {}. Setting total unaggregated fees to u128::MAX.", new_receipt_notification.value, - self.unaggregated_fees, + unaggregated_fees.value, new_receipt_notification.allocation_id, new_receipt_notification.sender_address ); u128::MAX }); - self.unaggregated_fees_last_id = new_receipt_notification.id; + unaggregated_fees.last_id = new_receipt_notification.id; - if self.unaggregated_fees >= self.config.tap.rav_request_trigger_value.into() - && self.rav_requester_task.is_none() + let mut rav_requester_task = self.rav_requester_task.lock().await; + // TODO: consider making the trigger per sender, instead of per (sender, allocation). + if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into() + && rav_requester_task.is_none() { - let rav_requester_task = tokio::spawn(Self::rav_requester( - self.tap_manager.clone(), - self.config.tap.rav_request_timestamp_buffer_ns, - self.sender_aggregator_endpoint.clone(), - )); - self.rav_requester_task = Some(rav_requester_task); + *rav_requester_task = Some(tokio::spawn(Self::rav_requester(self.inner.clone()))); } } } + pub async fn start_last_rav_request(&self) { + let mut rav_requester_task = self.rav_requester_task.lock().await; + if rav_requester_task.is_none() { + *rav_requester_task = Some(tokio::spawn(Self::rav_requester(self.inner.clone()))); + } + } + /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager /// with the latest unaggregated fees from the database. - pub async fn update_unaggregated_fees(&mut self) -> Result<(), anyhow::Error> { - self.tap_manager.remove_obsolete_receipts().await?; + pub async fn update_unaggregated_fees(&self) -> Result<(), anyhow::Error> { + Self::update_unaggregated_fees_static(&self.inner).await + } + + /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager + /// with the latest unaggregated fees from the database. + async fn update_unaggregated_fees_static(inner: &Inner) -> Result<(), anyhow::Error> { + inner.tap_manager.remove_obsolete_receipts().await?; let res = sqlx::query!( r#" - SELECT MAX(id), SUM(value) - FROM scalar_tap_receipts - WHERE allocation_id = $1 AND sender_address = $2 - "#, - self.allocation_id + SELECT MAX(id), SUM(value) + FROM scalar_tap_receipts + WHERE allocation_id = $1 AND sender_address = $2 + "#, + inner + .allocation_id .to_string() .strip_prefix("0x") .unwrap() .to_owned(), - self.sender + inner + .sender .to_string() .strip_prefix("0x") .unwrap() .to_owned() ) - .fetch_optional(&self.pgpool) + .fetch_optional(&inner.pgpool) .await?; + let mut unaggregated_fees = inner.unaggregated_fees.lock().await; + match res { Some(res) => { - self.unaggregated_fees_last_id = res.max.unwrap_or(0).try_into()?; - self.unaggregated_fees = res.sum.unwrap_or(0.into()).to_string().parse::()?; + unaggregated_fees.last_id = res.max.unwrap_or(0).try_into()?; + unaggregated_fees.value = + res.sum.unwrap_or(0.into()).to_string().parse::()?; } None => { - self.unaggregated_fees_last_id = 0; - self.unaggregated_fees = 0; + unaggregated_fees.last_id = 0; + unaggregated_fees.value = 0; } } Ok(()) } - async fn rav_requester( - manager: Arc, - rav_request_timestamp_buffer_ns: u64, - sender_aggregator_endpoint: String, - ) { - let RAVRequest { - valid_receipts, - previous_rav, - invalid_receipts: _, - expected_rav, - } = manager - .create_rav_request(rav_request_timestamp_buffer_ns) - .await - .unwrap(); - - // TODO: Request compression and response decompression. Also a fancy user agent? - let client = HttpClientBuilder::default() - .build(sender_aggregator_endpoint) - .unwrap(); - - let response: JsonRpcResponse> = client - .request( - "aggregate_receipts", - rpc_params!( - "0.0", // TODO: Set the version in a smarter place. + /// Request a RAV from the sender's TAP aggregator. + /// Will remove the aggregated receipts from the database if successful. + async fn rav_requester(inner: Arc) { + // Wrapping everything in a makeshift "try" block + (async { + loop { + // TODO: limit the number of receipts to aggregate per request. + let RAVRequest { valid_receipts, - previous_rav - ), - ) - .await - .unwrap(); - - if let Some(warnings) = response.warnings { - for warning in warnings { - error!("Warning from sender's TAP aggregator: {:?}", warning); + previous_rav, + invalid_receipts: _, + expected_rav, + } = inner + .tap_manager + .create_rav_request(inner.config.tap.rav_request_timestamp_buffer_ns) + .await?; + + // TODO: Request compression and response decompression. Also a fancy user agent? + let client = + HttpClientBuilder::default().build(&inner.sender_aggregator_endpoint)?; + + // TODO: Add a timeout. + let response: JsonRpcResponse> = + client + .request( + "aggregate_receipts", + rpc_params!( + "0.0", // TODO: Set the version in a smarter place. + valid_receipts, + previous_rav + ), + ) + .await?; + + if let Some(warnings) = response.warnings { + warn!("Warnings from sender's TAP aggregator: {:?}", warnings); + } + + inner + .tap_manager + .verify_and_store_rav(expected_rav, response.data) + .await?; + + // TODO: Handle invalid receipts + + // This is not the fastest way to do this, but it's the easiest. + // Note: we rely on the unaggregated_fees lock to make sure we don't miss any receipt + // notifications during this. + // TODO: If needed, faster alternative? + Self::update_unaggregated_fees_static(&inner).await?; + + let unaggregated_fees = inner.unaggregated_fees.lock().await; + if unaggregated_fees.value < inner.config.tap.rav_request_trigger_value.into() { + break; + } else { + // Go right back to requesting a RAV and warn the user. + warn!( + "Unaggregated fees for allocation {} and sender {} are {} right \ + after the RAV request. This is a sign that the TAP agent can't keep \ + up with the rate of new receipts. Consider increasing the \ + `rav_request_trigger_value` in the TAP agent config. It could also be \ + a sign that the sender's TAP aggregator is too slow.", + inner.allocation_id, inner.sender, unaggregated_fees.value + ); + } } - } - manager - .verify_and_store_rav(expected_rav, response.data) - .await - .unwrap(); + let mut state = inner.state.lock().await; + if *state == State::LastRavPending { + // Mark the last RAV as last in the DB as a cue for the indexer-agent. + let updated_rows = sqlx::query!( + r#" + UPDATE scalar_tap_latest_ravs + SET is_last = true + WHERE allocation_id = $1 AND sender_address = $2 + RETURNING * + "#, + inner + .allocation_id + .to_string() + .strip_prefix("0x") + .unwrap() + .to_owned(), + inner + .sender + .to_string() + .strip_prefix("0x") + .unwrap() + .to_owned(), + ) + .fetch_all(&inner.pgpool) + .await?; + + // Make sure exactly one row was updated. + if updated_rows.len() != 1 { + anyhow::bail!( + "Expected exactly one row to be updated in the latest RAVs table, \ + but {} were updated.", + updated_rows.len() + ); + } + + *state = State::Finished; + }; + anyhow::Ok(()) + }) + .await + .unwrap_or_else(|e| { + error!( + "Error while requesting a RAV for allocation {} and sender {}: {:?}", + inner.allocation_id, inner.sender, e + ); + }); } } @@ -216,7 +335,8 @@ impl Manager { impl Drop for Manager { fn drop(&mut self) { // Cancel the rav_requester task. - if let Some(rav_requester_task) = self.rav_requester_task.take() { + let mut rav_requester_task = self.rav_requester_task.blocking_lock(); + if let Some(rav_requester_task) = rav_requester_task.take() { rav_requester_task.abort(); } } diff --git a/tap_agent/src/tap/managers.rs b/tap_agent/src/tap/managers.rs index 9e8cf875e..9efcae6fd 100644 --- a/tap_agent/src/tap/managers.rs +++ b/tap_agent/src/tap/managers.rs @@ -1,20 +1,21 @@ use crate::config; -use super::manager::Manager; +use std::{collections::HashMap, str::FromStr, sync::Arc}; + use alloy_primitives::Address; use alloy_sol_types::{eip712_domain, Eip712Domain}; use anyhow::anyhow; use anyhow::Result; use ethereum_types::U256; -use eventuals::Eventual; -use indexer_common::prelude::Allocation; -use indexer_common::subgraph_client::SubgraphClient; +use eventuals::{Eventual, EventualExt, PipeHandle}; +use indexer_common::prelude::{Allocation, SubgraphClient}; use log::{error, warn}; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; -use std::{collections::HashMap, str::FromStr, sync::Arc}; use tokio::sync::{Mutex, RwLock}; +use super::manager::Manager; + #[derive(Deserialize)] pub struct NewReceiptNotification { pub id: u64, @@ -25,8 +26,9 @@ pub struct NewReceiptNotification { } pub struct TapManagers { - inner: Arc, + _inner: Arc, new_receipts_watcher: Option>, + _eligible_allocations_senders_pipe: PipeHandle, } #[derive(Clone)] @@ -34,11 +36,12 @@ struct Inner { config: &'static config::Cli, pgpool: PgPool, /// Map of (allocation_id, sender_address) to Manager. - managers: Arc>>>, + managers: Arc>>, indexer_allocations: Eventual>, escrow_accounts: Eventual>, escrow_subgraph: &'static SubgraphClient, tap_eip712_domain_separator: Eip712Domain, + sender_aggregator_endpoints: HashMap, } impl TapManagers { @@ -51,6 +54,7 @@ impl TapManagers { tap_eip712_chain_id: u64, tap_eip712_version: String, tap_eip712_verifying_contract: Address, + sender_aggregator_endpoints: HashMap, ) -> Self { let tap_eip712_domain_separator = eip712_domain! { name: "TAP", @@ -66,6 +70,7 @@ impl TapManagers { escrow_accounts, escrow_subgraph, tap_eip712_domain_separator, + sender_aggregator_endpoints, }); Self::update_managers( @@ -75,6 +80,11 @@ impl TapManagers { .value() .await .expect("Should get indexer allocations from Eventual"), + inner + .escrow_accounts + .value() + .await + .expect("Should get escrow accounts from Eventual"), ) .await .expect("Should be able to update managers"); @@ -125,7 +135,7 @@ impl TapManagers { if let std::collections::hash_map::Entry::Vacant(e) = managers_write.entry((allocation_id, sender)) { - e.insert(RwLock::new(Manager::new( + e.insert(Manager::new( config, allocation_id, sender, @@ -133,7 +143,12 @@ impl TapManagers { inner.escrow_subgraph, inner.pgpool.clone(), inner.tap_eip712_domain_separator.clone(), - ))); + inner + .sender_aggregator_endpoints + .get(&sender) + .unwrap() + .clone(), + )); } }); @@ -141,8 +156,6 @@ impl TapManagers { // database. for manager in managers_write.values() { manager - .write() - .await .update_unaggregated_fees() .await .expect("should be able to update unaggregated_fees"); @@ -155,15 +168,34 @@ impl TapManagers { inner.managers.clone(), ))); + // Start the eligible_allocations_senders_pipe that watches for changes in eligible senders + // and allocations and updates the managers accordingly. + let inner_clone = inner.clone(); + let eligible_allocations_senders_pipe = eventuals::join(( + inner.indexer_allocations.clone(), + inner.escrow_accounts.clone(), + )) + .pipe_async(move |(indexer_allocations, escrow_accounts)| { + let inner = inner_clone.clone(); + async move { + Self::update_managers(&inner, indexer_allocations, escrow_accounts) + .await + .unwrap_or_else(|e| { + error!("Error while updating managers: {:?}", e); + }); + } + }); + Self { - inner, + _inner: inner, new_receipts_watcher, + _eligible_allocations_senders_pipe: eligible_allocations_senders_pipe, } } async fn new_receipts_watcher( pglistener: Arc>, - managers: Arc>>>, + managers: Arc>>, ) { let mut pglistener = pglistener.lock().await; @@ -183,17 +215,18 @@ impl TapManagers { new_receipt_notification.allocation_id, new_receipt_notification.sender_address, )) { - Some(manager) => manager - .write() - .await - .handle_new_receipt_notification(new_receipt_notification), + Some(manager) => { + manager + .handle_new_receipt_notification(new_receipt_notification) + .await + } None => warn!( "No manager found for allocation_id {} and sender_address {} to process \ new receipt notification. This should not happen.", new_receipt_notification.allocation_id, new_receipt_notification.sender_address ), - } + }; } } } @@ -201,58 +234,50 @@ impl TapManagers { async fn update_managers( inner: &Inner, indexer_allocations: HashMap, + escrow_accounts: HashMap, ) -> Result<()> { let eligible_allocations: Vec
= indexer_allocations.keys().copied().collect(); - let senders: Vec
= inner - .escrow_accounts - .value() - .await - .map_err(|e| anyhow!("Could not get escrow accounts from eventual: {:?}", e))? - .keys() - .copied() - .collect(); + let senders: Vec
= escrow_accounts.keys().copied().collect(); let mut managers_write = inner.managers.write().await; - // TODO: remove managers for allocations that are not eligible anymore. - // Create managers for all currently eligible (allocation, sender) - for allocation_id in eligible_allocations { + for allocation_id in &eligible_allocations { for sender in &senders { - managers_write.insert( - (allocation_id, *sender), - RwLock::new(Manager::new( + // Only create a manager if it doesn't exist yet. + if let std::collections::hash_map::Entry::Vacant(e) = + managers_write.entry((*allocation_id, *sender)) + { + e.insert(Manager::new( inner.config, - allocation_id, + *allocation_id, *sender, inner.escrow_accounts.clone(), inner.escrow_subgraph, inner.pgpool.clone(), inner.tap_eip712_domain_separator.clone(), - )), - ); + inner + .sender_aggregator_endpoints + .get(sender) + .ok_or_else(|| { + anyhow!("No sender_aggregator_endpoint found for sender {}", sender) + })? + .clone(), + )); + } } } - Ok(()) - } - - async fn eligible_allocations_watcher(inner: Arc) { - let mut subscription = inner.indexer_allocations.subscribe(); - - loop { - match subscription.next().await { - Ok(indexer_allocations) => { - Self::update_managers(&inner, indexer_allocations) - .await - .unwrap_or_else(|e| { - error!("Error updating managers: {:?}", e); - }); - } - Err(e) => { - error!("Could not get indexer allocations from eventual: {:?}", e); - } + // Trigger a last rav request for all managers that correspond to ineligible + // (allocations, sender). + for ((allocation_id, sender), manager) in managers_write.iter() { + if !eligible_allocations.contains(allocation_id) || !senders.contains(sender) { + manager.start_last_rav_request().await } } + + // TODO: remove managers that are finished. Ideally done in another async task? + + Ok(()) } } diff --git a/tap_agent/src/tap/receipt_checks_adapter.rs b/tap_agent/src/tap/receipt_checks_adapter.rs index 2e5f103ac..fd86c3f83 100644 --- a/tap_agent/src/tap/receipt_checks_adapter.rs +++ b/tap_agent/src/tap/receipt_checks_adapter.rs @@ -1,4 +1,5 @@ -// Copyright 2023-, Semiotic AI, Inc. SPDX-License-Identifier: Apache-2.0 +use std::{collections::HashMap, sync::Arc, time::Duration}; + use alloy_primitives::Address; use async_trait::async_trait; use ethereum_types::U256; @@ -6,8 +7,6 @@ use eventuals::{timer, Eventual, EventualExt}; use indexer_common::subgraph_client::SubgraphClient; use serde_json::json; use sqlx::PgPool; - -use std::{collections::HashMap, sync::Arc, time::Duration}; use tap_core::adapters::receipt_checks_adapter::ReceiptChecksAdapter as ReceiptChecksAdapterTrait; use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt}; use thiserror::Error; @@ -158,6 +157,7 @@ impl ReceiptChecksAdapter { ) -> Eventual { #[derive(serde::Deserialize)] struct AllocationResponse { + #[allow(dead_code)] id: String, } diff --git a/tap_agent/src/tap/receipt_storage_adapter.rs b/tap_agent/src/tap/receipt_storage_adapter.rs index 3cf8c7d3c..ed266b029 100644 --- a/tap_agent/src/tap/receipt_storage_adapter.rs +++ b/tap_agent/src/tap/receipt_storage_adapter.rs @@ -1,11 +1,11 @@ -// Copyright 2023-, Semiotic AI, Inc. SPDX-License-Identifier: Apache-2.0 -use alloy_primitives::Address; -use async_trait::async_trait; -use sqlx::{postgres::types::PgRange, types::BigDecimal, PgPool}; use std::{ num::TryFromIntError, ops::{Bound, RangeBounds}, }; + +use alloy_primitives::Address; +use async_trait::async_trait; +use sqlx::{postgres::types::PgRange, types::BigDecimal, PgPool}; use tap_core::tap_receipt::ReceivedReceipt; use tap_core::{ adapters::receipt_storage_adapter::ReceiptStorageAdapter as ReceiptStorageAdapterTrait, diff --git a/tap_agent/src/tap/test_utils.rs b/tap_agent/src/tap/test_utils.rs index 966f50a1c..6fef3b033 100644 --- a/tap_agent/src/tap/test_utils.rs +++ b/tap_agent/src/tap/test_utils.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + use alloy_primitives::Address; use alloy_sol_types::{eip712_domain, Eip712Domain}; use anyhow::Result; @@ -5,7 +7,6 @@ use ethers_signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer} use lazy_static::lazy_static; use serde_json; use sqlx::{types::BigDecimal, PgPool}; -use std::str::FromStr; use tap_core::receipt_aggregate_voucher::ReceiptAggregateVoucher; use tap_core::tap_manager::{SignedRAV, SignedReceipt}; use tap_core::tap_receipt::{get_full_list_of_checks, ReceivedReceipt};