diff --git a/.sqlx/query-019bf1fa6c038ad1027e2b67a949d67ae9dd5fbc3f3e0091f127c95264319e63.json b/.sqlx/query-019bf1fa6c038ad1027e2b67a949d67ae9dd5fbc3f3e0091f127c95264319e63.json new file mode 100644 index 00000000..c7feed38 --- /dev/null +++ b/.sqlx/query-019bf1fa6c038ad1027e2b67a949d67ae9dd5fbc3f3e0091f127c95264319e63.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT sender_address FROM scalar_tap_denylist\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sender_address", + "type_info": "Bpchar" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "019bf1fa6c038ad1027e2b67a949d67ae9dd5fbc3f3e0091f127c95264319e63" +} diff --git a/.sqlx/query-48e85bc0089528df458970d0aeee72f26fad65b8e3cbb706bc17bb7140e47192.json b/.sqlx/query-48e85bc0089528df458970d0aeee72f26fad65b8e3cbb706bc17bb7140e47192.json new file mode 100644 index 00000000..77adeb10 --- /dev/null +++ b/.sqlx/query-48e85bc0089528df458970d0aeee72f26fad65b8e3cbb706bc17bb7140e47192.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO scalar_tap_denylist (sender_address)\n VALUES ($1)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar" + ] + }, + "nullable": [] + }, + "hash": "48e85bc0089528df458970d0aeee72f26fad65b8e3cbb706bc17bb7140e47192" +} diff --git a/.sqlx/query-b5eeb6b9501c2f2ca883040d9f62cc59876e13468b43b3fb5860592918e647f0.json b/.sqlx/query-b5eeb6b9501c2f2ca883040d9f62cc59876e13468b43b3fb5860592918e647f0.json new file mode 100644 index 00000000..e0803947 --- /dev/null +++ b/.sqlx/query-b5eeb6b9501c2f2ca883040d9f62cc59876e13468b43b3fb5860592918e647f0.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM scalar_tap_denylist\n WHERE sender_address = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar" + ] + }, + "nullable": [] + }, + "hash": "b5eeb6b9501c2f2ca883040d9f62cc59876e13468b43b3fb5860592918e647f0" +} diff --git a/Cargo.lock b/Cargo.lock index 7f2f8be9..6a06de8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3028,6 +3028,7 @@ dependencies = [ "thegraph", "thiserror", "tokio", + "tokio-util", "tower", "tower_governor", "tracing", @@ -6472,9 +6473,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", diff --git a/common/Cargo.toml b/common/Cargo.toml index 79587efd..d67461ea 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -45,6 +45,7 @@ autometrics = { version = "0.6.0", features = ["prometheus-exporter"] } tracing = "0.1.40" tower = "0.4.13" tower_governor = "0.1.0" +tokio-util = "0.7.10" [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 671a74c9..e515c8e6 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -288,7 +288,8 @@ impl IndexerService { chain_id: options.config.scalar.chain_id, verifying_contract: options.config.scalar.receipts_verifier_address, }, - ); + ) + .await; let state = Arc::new(IndexerServiceState { config: options.config.clone(), diff --git a/common/src/tap_manager.rs b/common/src/tap_manager.rs index 28da0b3c..35074555 100644 --- a/common/src/tap_manager.rs +++ b/common/src/tap_manager.rs @@ -6,9 +6,12 @@ use alloy_sol_types::Eip712Domain; use anyhow::anyhow; use ethers_core::types::U256; use eventuals::Eventual; +use sqlx::postgres::PgListener; use sqlx::{types::BigDecimal, PgPool}; +use std::collections::HashSet; use std::{collections::HashMap, str::FromStr, sync::Arc}; use tap_core::tap_manager::SignedReceipt; +use tokio::sync::RwLock; use tracing::error; use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation}; @@ -19,20 +22,51 @@ pub struct TapManager { escrow_accounts: Eventual, pgpool: PgPool, domain_separator: Arc, + sender_denylist: Arc>>, + _sender_denylist_watcher_handle: Arc>, + sender_denylist_watcher_cancel_token: tokio_util::sync::CancellationToken, } impl TapManager { - pub fn new( + pub async fn new( pgpool: PgPool, indexer_allocations: Eventual>, escrow_accounts: Eventual, domain_separator: Eip712Domain, ) -> Self { + // Listen to pg_notify events. We start it before updating the sender_denylist so that we + // don't miss any updates. PG will buffer the notifications until we start consuming them. + let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); + pglistener + .listen("scalar_tap_deny_notification") + .await + .expect( + "should be able to subscribe to Postgres Notify events on the channel \ + 'scalar_tap_deny_notification'", + ); + + // Fetch the denylist from the DB + let sender_denylist = Arc::new(RwLock::new(HashSet::new())); + Self::sender_denylist_reload(pgpool.clone(), sender_denylist.clone()) + .await + .expect("should be able to fetch the sender_denylist from the DB on startup"); + + let sender_denylist_watcher_cancel_token = tokio_util::sync::CancellationToken::new(); + let sender_denylist_watcher_handle = Arc::new(tokio::spawn(Self::sender_denylist_watcher( + pgpool.clone(), + pglistener, + sender_denylist.clone(), + sender_denylist_watcher_cancel_token.clone(), + ))); + Self { indexer_allocations, escrow_accounts, pgpool, domain_separator: Arc::new(domain_separator), + sender_denylist, + _sender_denylist_watcher_handle: sender_denylist_watcher_handle, + sender_denylist_watcher_cancel_token, } } @@ -67,18 +101,32 @@ impl TapManager { anyhow!(e) })?; - let escrow_accounts = self.escrow_accounts.value_immediate().unwrap_or_default(); + let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default(); - if !escrow_accounts - .get_balance_for_signer(&receipt_signer) + // We bail if the receipt signer does not have a corresponding sender in the escrow + // accounts. + let receipt_sender = escrow_accounts_snapshot.get_sender_for_signer(&receipt_signer)?; + + // Check that the sender has a non-zero balance -- more advanced accounting is done in + // `tap-agent`. + if !escrow_accounts_snapshot + .get_balance_for_sender(&receipt_sender) .map_or(false, |balance| balance > U256::zero()) { anyhow::bail!( - "Receipt sender `{}` is not eligible for this indexer", + "Receipt sender `{}` does not have a sufficient balance", receipt_signer, ); } + // Check that the sender is not denylisted + if self.sender_denylist.read().await.contains(&receipt_sender) { + anyhow::bail!( + "Received a receipt from a denylisted sender: {}", + receipt_signer + ); + } + // TODO: consider doing this in another async task to avoid slowing down the paid query flow. sqlx::query!( r#" @@ -105,6 +153,96 @@ impl TapManager { Ok(()) } + + async fn sender_denylist_reload( + pgpool: PgPool, + denylist_rwlock: Arc>>, + ) -> anyhow::Result<()> { + // Fetch the denylist from the DB + let sender_denylist = sqlx::query!( + r#" + SELECT sender_address FROM scalar_tap_denylist + "# + ) + .fetch_all(&pgpool) + .await? + .iter() + .map(|row| Address::from_str(&row.sender_address)) + .collect::, _>>()?; + + *(denylist_rwlock.write().await) = sender_denylist; + + Ok(()) + } + + async fn sender_denylist_watcher( + pgpool: PgPool, + mut pglistener: PgListener, + denylist: Arc>>, + cancel_token: tokio_util::sync::CancellationToken, + ) { + #[derive(serde::Deserialize)] + struct DenylistNotification { + tg_op: String, + sender_address: Address, + } + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + break; + } + + pg_notification = pglistener.recv() => { + let pg_notification = pg_notification.expect( + "should be able to receive Postgres Notify events on the channel \ + 'scalar_tap_deny_notification'", + ); + + let denylist_notification: DenylistNotification = + serde_json::from_str(pg_notification.payload()).expect( + "should be able to deserialize the Postgres Notify event payload as a \ + DenylistNotification", + ); + + match denylist_notification.tg_op.as_str() { + "INSERT" => { + denylist + .write() + .await + .insert(denylist_notification.sender_address); + } + "DELETE" => { + denylist + .write() + .await + .remove(&denylist_notification.sender_address); + } + // UPDATE and TRUNCATE are not expected to happen. Reload the entire denylist. + _ => { + error!( + "Received an unexpected denylist table notification: {}. Reloading entire \ + denylist.", + denylist_notification.tg_op + ); + + Self::sender_denylist_reload(pgpool.clone(), denylist.clone()) + .await + .expect("should be able to reload the sender denylist") + } + } + } + } + } + } +} + +impl Drop for TapManager { + fn drop(&mut self) { + // Clean shutdown for the sender_denylist_watcher + // Though since it's not a critical task, we don't wait for it to finish (join). + self.sender_denylist_watcher_cancel_token.cancel(); + } } #[cfg(test)] @@ -116,23 +254,14 @@ mod test { use keccak_hash::H256; use sqlx::postgres::PgListener; - use crate::test_vectors::{self, create_signed_receipt}; + use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER}; use super::*; - #[sqlx::test(migrations = "../migrations")] - async fn test_verify_and_store_receipt(pgpool: PgPool) { - // Listen to pg_notify events - let mut listener = PgListener::connect_with(&pgpool).await.unwrap(); - listener - .listen("scalar_tap_receipt_notification") - .await - .unwrap(); + const ALLOCATION_ID: &str = "0xdeadbeefcafebabedeadbeefcafebabedeadbeef"; - let allocation_id = - Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(); - let signed_receipt = - create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await; + async fn new_tap_manager(pgpool: PgPool) -> TapManager { + let allocation_id = Address::from_str(ALLOCATION_ID).unwrap(); // Mock allocation let allocation = Allocation { @@ -163,12 +292,29 @@ mod test { test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), )); - let tap_manager = TapManager::new( + TapManager::new( pgpool.clone(), indexer_allocations, escrow_accounts, test_vectors::TAP_EIP712_DOMAIN.to_owned(), - ); + ) + .await + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_verify_and_store_receipt(pgpool: PgPool) { + // Listen to pg_notify events + let mut listener = PgListener::connect_with(&pgpool).await.unwrap(); + listener + .listen("scalar_tap_receipt_notification") + .await + .unwrap(); + + let allocation_id = Address::from_str(ALLOCATION_ID).unwrap(); + let signed_receipt = + create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await; + + let tap_manager = new_tap_manager(pgpool.clone()).await; tap_manager .verify_and_store_receipt(signed_receipt.clone()) @@ -196,4 +342,84 @@ mod test { assert_eq!(notification_payload["timestamp_ns"], u64::MAX); assert!(notification_payload["id"].is_u64()); } + + #[sqlx::test(migrations = "../migrations")] + async fn test_sender_denylist(pgpool: PgPool) { + // Add the sender to the denylist + sqlx::query!( + r#" + INSERT INTO scalar_tap_denylist (sender_address) + VALUES ($1) + "#, + TAP_SENDER.1.to_string().trim_start_matches("0x").to_owned() + ) + .execute(&pgpool) + .await + .unwrap(); + + let allocation_id = Address::from_str(ALLOCATION_ID).unwrap(); + let signed_receipt = + create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await; + + let tap_manager = new_tap_manager(pgpool.clone()).await; + + // Check that the receipt is rejected + assert!(tap_manager + .verify_and_store_receipt(signed_receipt.clone()) + .await + .is_err()); + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_sender_denylist_updates(pgpool: PgPool) { + let allocation_id = Address::from_str(ALLOCATION_ID).unwrap(); + let signed_receipt = + create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await; + + let tap_manager = new_tap_manager(pgpool.clone()).await; + + // Check that the receipt is valid + tap_manager + .verify_and_store_receipt(signed_receipt.clone()) + .await + .unwrap(); + + // Add the sender to the denylist + sqlx::query!( + r#" + INSERT INTO scalar_tap_denylist (sender_address) + VALUES ($1) + "#, + TAP_SENDER.1.to_string().trim_start_matches("0x").to_owned() + ) + .execute(&pgpool) + .await + .unwrap(); + + // Check that the receipt is rejected + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + assert!(tap_manager + .verify_and_store_receipt(signed_receipt.clone()) + .await + .is_err()); + + // Remove the sender from the denylist + sqlx::query!( + r#" + DELETE FROM scalar_tap_denylist + WHERE sender_address = $1 + "#, + TAP_SENDER.1.to_string().trim_start_matches("0x").to_owned() + ) + .execute(&pgpool) + .await + .unwrap(); + + // Check that the receipt is valid again + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tap_manager + .verify_and_store_receipt(signed_receipt.clone()) + .await + .unwrap(); + } } diff --git a/migrations/20231118024433_tap_denylist.down.sql b/migrations/20231118024433_tap_denylist.down.sql new file mode 100644 index 00000000..71c8ca03 --- /dev/null +++ b/migrations/20231118024433_tap_denylist.down.sql @@ -0,0 +1,5 @@ +DROP TRIGGER IF EXISTS deny_update ON scalar_tap_deny CASCADE; + +DROP FUNCTION IF EXISTS scalar_tap_deny_notify() CASCADE; + +DROP TABLE IF EXISTS scalar_tap_denylist CASCADE; diff --git a/migrations/20231118024433_tap_denylist.up.sql b/migrations/20231118024433_tap_denylist.up.sql new file mode 100644 index 00000000..befa52db --- /dev/null +++ b/migrations/20231118024433_tap_denylist.up.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS scalar_tap_denylist ( + sender_address CHAR(40) PRIMARY KEY +); + +CREATE FUNCTION scalar_tap_deny_notify() +RETURNS trigger AS +$$ +BEGIN + IF TG_OP = 'DELETE' THEN + PERFORM pg_notify('scalar_tap_deny_notification', format('{"tg_op": "DELETE", "sender_address": "%s"}', OLD.sender_address)); + RETURN OLD; + ELSIF TG_OP = 'INSERT' THEN + PERFORM pg_notify('scalar_tap_deny_notification', format('{"tg_op": "INSERT", "sender_address": "%s"}', NEW.sender_address)); + RETURN NEW; + ELSE -- UPDATE OR TRUNCATE, should never happen + PERFORM pg_notify('scalar_tap_deny_notification', format('{"tg_op": "%s", "sender_address": null}', TG_OP, NEW.sender_address)); + RETURN NEW; + END IF; +END; +$$ LANGUAGE 'plpgsql'; + +CREATE TRIGGER deny_update AFTER INSERT OR UPDATE OR DELETE + ON scalar_tap_denylist + FOR EACH ROW EXECUTE PROCEDURE scalar_tap_deny_notify();