Skip to content

Commit

Permalink
feat(indexer-service): Sender denylist
Browse files Browse the repository at this point in the history
indexer-service will reject all paid queries from denied senders.
Denylist DB table updates are watched through PG notifications.

Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Jan 9, 2024
1 parent 6c08697 commit a2e4621
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 18 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ impl IndexerService {
chain_id: options.config.scalar.chain_id,
verifying_contract: options.config.scalar.receipts_verifier_address,
},
);
)
.await;

let state = Arc::new(IndexerServiceState {
config: options.config.clone(),
Expand Down
237 changes: 220 additions & 17 deletions common/src/tap_manager.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,67 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use crate::prelude::Allocation;
use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use anyhow::anyhow;
use ethers_core::types::U256;
use eventuals::Eventual;
use sqlx::postgres::PgListener;
use sqlx::{types::BigDecimal, PgPool};
use std::collections::HashSet;
use std::{collections::HashMap, str::FromStr, sync::Arc};
use tap_core::tap_manager::SignedReceipt;
use tokio::sync::RwLock;
use tracing::error;

use crate::prelude::Allocation;

#[derive(Clone)]
pub struct TapManager {
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<HashMap<Address, U256>>,
pgpool: PgPool,
domain_separator: Arc<Eip712Domain>,
sender_denylist: Arc<RwLock<HashSet<Address>>>,
_sender_denylist_watcher_handle: Arc<tokio::task::JoinHandle<()>>,
}

impl TapManager {
pub fn new(
pub async fn new(
pgpool: PgPool,
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<HashMap<Address, U256>>,
domain_separator: Eip712Domain,
) -> Self {
// Listen to pg_notify events. We start it before updating the sender_denylist so that we
// don't miss any updates. PG will buffer the notifications until we start consuming them.
let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap();
pglistener
.listen("scalar_tap_deny_notification")
.await
.expect(
"should be able to subscribe to Postgres Notify events on the channel \
'scalar_tap_deny_notification'",
);

// Fetch the denylist from the DB
let sender_denylist = Arc::new(RwLock::new(HashSet::new()));
Self::sender_denylist_reload(pgpool.clone(), sender_denylist.clone())
.await
.expect("should be able to fetch the sender_denylist from the DB on startup");

let sender_denylist_watcher_handle = Arc::new(tokio::spawn(Self::sender_denylist_watcher(
pgpool.clone(),
pglistener,
sender_denylist.clone(),
)));

Self {
indexer_allocations,
escrow_accounts,
pgpool,
domain_separator: Arc::new(domain_separator),
sender_denylist,
_sender_denylist_watcher_handle: sender_denylist_watcher_handle,
}
}

Expand Down Expand Up @@ -67,6 +96,14 @@ impl TapManager {
anyhow!(e)
})?;

// Check that the sender is not denylisted
if self.sender_denylist.read().await.contains(&receipt_signer) {
anyhow::bail!(
"Received a receipt from a denylisted sender: {}",
receipt_signer
);
}

if !self
.escrow_accounts
.value_immediate()
Expand Down Expand Up @@ -106,6 +143,84 @@ impl TapManager {

Ok(())
}

async fn sender_denylist_reload(
pgpool: PgPool,
denylist_rwlock: Arc<RwLock<HashSet<Address>>>,
) -> anyhow::Result<()> {
// Fetch the denylist from the DB
let sender_denylist = sqlx::query!(
r#"
SELECT sender_address FROM scalar_tap_denylist
"#
)
.fetch_all(&pgpool)
.await?
.iter()
.map(|row| Address::from_str(&row.sender_address))
.collect::<Result<HashSet<_>, _>>()?;

*(denylist_rwlock.write().await) = sender_denylist;

Ok(())
}

async fn sender_denylist_watcher(
pgpool: PgPool,
mut pglistener: PgListener,
denylist: Arc<RwLock<HashSet<Address>>>,
) {
#[derive(serde::Deserialize)]
struct DenylistNotification {
tg_op: String,
sender_address: Address,
}

loop {
let pg_notification = pglistener.recv().await.expect(
"should be able to receive Postgres Notify events on the channel \
'scalar_tap_deny_notification'",
);

println!(
"Received a denylist table notification: {}",
pg_notification.payload()
);

let denylist_notification: DenylistNotification =
serde_json::from_str(pg_notification.payload()).expect(
"should be able to deserialize the Postgres Notify event payload as a \
DenylistNotification",
);

match denylist_notification.tg_op.as_str() {
"INSERT" => {
denylist
.write()
.await
.insert(denylist_notification.sender_address);
}
"DELETE" => {
denylist
.write()
.await
.remove(&denylist_notification.sender_address);
}
// UPDATE and TRUNCATE are not expected to happen. Reload the entire denylist.
_ => {
error!(
"Received an unexpected denylist table notification: {}. Reloading entire \
denylist.",
denylist_notification.tg_op
);

Self::sender_denylist_reload(pgpool.clone(), denylist.clone())
.await
.expect("should be able to reload the sender denylist")
}
}
}
}
}

#[cfg(test)]
Expand All @@ -121,19 +236,10 @@ mod test {

use super::*;

#[sqlx::test(migrations = "../migrations")]
async fn test_verify_and_store_receipt(pgpool: PgPool) {
// Listen to pg_notify events
let mut listener = PgListener::connect_with(&pgpool).await.unwrap();
listener
.listen("scalar_tap_receipt_notification")
.await
.unwrap();
const ALLOCATION_ID: &str = "0xdeadbeefcafebabedeadbeefcafebabedeadbeef";

let allocation_id =
Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap();
let signed_receipt =
create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await;
async fn new_tap_manager(pgpool: PgPool) -> TapManager {
let allocation_id = Address::from_str(ALLOCATION_ID).unwrap();

// Mock allocation
let allocation = Allocation {
Expand Down Expand Up @@ -162,12 +268,29 @@ mod test {
let escrow_accounts =
Eventual::from_value(HashMap::from_iter(vec![(TAP_SENDER.1, U256::from(123))]));

let tap_manager = TapManager::new(
TapManager::new(
pgpool.clone(),
indexer_allocations,
escrow_accounts,
test_vectors::TAP_EIP712_DOMAIN.to_owned(),
);
)
.await
}

#[sqlx::test(migrations = "../migrations")]
async fn test_verify_and_store_receipt(pgpool: PgPool) {
// Listen to pg_notify events
let mut listener = PgListener::connect_with(&pgpool).await.unwrap();
listener
.listen("scalar_tap_receipt_notification")
.await
.unwrap();

let allocation_id = Address::from_str(ALLOCATION_ID).unwrap();
let signed_receipt =
create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await;

let tap_manager = new_tap_manager(pgpool.clone()).await;

tap_manager
.verify_and_store_receipt(signed_receipt.clone())
Expand Down Expand Up @@ -195,4 +318,84 @@ mod test {
assert_eq!(notification_payload["timestamp_ns"], u64::MAX);
assert!(notification_payload["id"].is_u64());
}

#[sqlx::test(migrations = "../migrations")]
async fn test_sender_denylist(pgpool: PgPool) {
// Add the sender to the denylist
sqlx::query!(
r#"
INSERT INTO scalar_tap_denylist (sender_address)
VALUES ($1)
"#,
TAP_SENDER.1.to_string().trim_start_matches("0x").to_owned()
)
.execute(&pgpool)
.await
.unwrap();

let allocation_id = Address::from_str(ALLOCATION_ID).unwrap();
let signed_receipt =
create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await;

let tap_manager = new_tap_manager(pgpool.clone()).await;

// Check that the receipt is rejected
assert!(tap_manager
.verify_and_store_receipt(signed_receipt.clone())
.await
.is_err());
}

#[sqlx::test(migrations = "../migrations")]
async fn test_sender_denylist_updates(pgpool: PgPool) {
let allocation_id = Address::from_str(ALLOCATION_ID).unwrap();
let signed_receipt =
create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await;

let tap_manager = new_tap_manager(pgpool.clone()).await;

// Check that the receipt is valid
tap_manager
.verify_and_store_receipt(signed_receipt.clone())
.await
.unwrap();

// Add the sender to the denylist
sqlx::query!(
r#"
INSERT INTO scalar_tap_denylist (sender_address)
VALUES ($1)
"#,
TAP_SENDER.1.to_string().trim_start_matches("0x").to_owned()
)
.execute(&pgpool)
.await
.unwrap();

// Check that the receipt is rejected
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(tap_manager
.verify_and_store_receipt(signed_receipt.clone())
.await
.is_err());

// Remove the sender from the denylist
sqlx::query!(
r#"
DELETE FROM scalar_tap_denylist
WHERE sender_address = $1
"#,
TAP_SENDER.1.to_string().trim_start_matches("0x").to_owned()
)
.execute(&pgpool)
.await
.unwrap();

// Check that the receipt is valid again
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tap_manager
.verify_and_store_receipt(signed_receipt.clone())
.await
.unwrap();
}
}
5 changes: 5 additions & 0 deletions migrations/20231118024433_tap_denylist.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP TRIGGER IF EXISTS deny_update ON scalar_tap_deny CASCADE;

DROP FUNCTION IF EXISTS scalar_tap_deny_notify() CASCADE;

DROP TABLE IF EXISTS scalar_tap_denylist CASCADE;
Loading

0 comments on commit a2e4621

Please sign in to comment.