Skip to content

Commit

Permalink
feat: store failed receipts and RAVs
Browse files Browse the repository at this point in the history
Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Jan 9, 2024
1 parent 6c08697 commit 53ead40
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 10 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions migrations/20230912220523_tap_receipts.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ DROP TRIGGER IF EXISTS receipt_update ON scalar_tap_receipts CASCADE;
DROP FUNCTION IF EXISTS scalar_tap_receipt_notify() CASCADE;

DROP TABLE IF EXISTS scalar_tap_receipts CASCADE;
DROP TABLE IF EXISTS scalar_tap_receipts_invalid CASCADE;
11 changes: 11 additions & 0 deletions migrations/20230912220523_tap_receipts.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,14 @@ CREATE TRIGGER receipt_update AFTER INSERT OR UPDATE

CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id);
CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns);

-- This table is used to store invalid receipts (receipts that fail at least one of the checks in the tap-agent).
-- Used for logging and debugging purposes.
CREATE TABLE IF NOT EXISTS scalar_tap_receipts_invalid (
id BIGSERIAL PRIMARY KEY,
allocation_id CHAR(40) NOT NULL,
sender_address CHAR(40) NOT NULL,
timestamp_ns NUMERIC(20) NOT NULL,
value NUMERIC(39) NOT NULL,
received_receipt JSON NOT NULL
);
1 change: 1 addition & 0 deletions migrations/20230915230734_tap_ravs.down.sql
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DROP TABLE IF EXISTS scalar_tap_ravs CASCADE;
DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE;
11 changes: 11 additions & 0 deletions migrations/20230915230734_tap_ravs.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,14 @@ CREATE TABLE IF NOT EXISTS scalar_tap_ravs (
final BOOLEAN DEFAULT FALSE NOT NULL,
PRIMARY KEY (allocation_id, sender_address)
);

-- This table is used to store failed RAV requests.
-- Used for logging and debugging purposes.
CREATE TABLE IF NOT EXISTS scalar_tap_rav_requests_failed (
id BIGSERIAL PRIMARY KEY,
allocation_id CHAR(40) NOT NULL,
sender_address CHAR(40) NOT NULL,
expected_rav JSON NOT NULL,
rav_response JSON NOT NULL,
reason TEXT NOT NULL
);
130 changes: 120 additions & 10 deletions tap-agent/src/tap/sender_allocation_relationship.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};

use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use anyhow::ensure;
use anyhow::{anyhow, ensure};
use ethereum_types::U256;
use eventuals::Eventual;
use indexer_common::prelude::SubgraphClient;
Expand All @@ -14,8 +14,9 @@ use sqlx::{types::BigDecimal, PgPool};
use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse;
use tap_core::{
eip_712_signed_message::EIP712SignedMessage,
receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_manager::RAVRequest,
tap_receipt::ReceiptCheck,
receipt_aggregate_voucher::ReceiptAggregateVoucher,
tap_manager::RAVRequest,
tap_receipt::{ReceiptCheck, ReceivedReceipt},
};
use tokio::{
sync::{Mutex, MutexGuard},
Expand Down Expand Up @@ -286,7 +287,7 @@ impl SenderAllocationRelationship {
let RAVRequest {
valid_receipts,
previous_rav,
invalid_receipts: _,
invalid_receipts,
expected_rav,
} = inner
.tap_manager
Expand All @@ -297,6 +298,20 @@ impl SenderAllocationRelationship {
)
.await?;

// Invalid receipts
if !invalid_receipts.is_empty() {
warn!(
"Found {} invalid receipts for allocation {} and sender {}.",
invalid_receipts.len(),
inner.allocation_id,
inner.sender
);

// Save invalid receipts to the database for logs.
// TODO: consider doing that in a spawned task?
Self::store_invalid_receipts(inner, &invalid_receipts).await?;
}

// TODO: Request compression and response decompression. Also a fancy user agent?
let client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(
Expand All @@ -319,12 +334,39 @@ impl SenderAllocationRelationship {
warn!("Warnings from sender's TAP aggregator: {:?}", warnings);
}

inner
match inner
.tap_manager
.verify_and_store_rav(expected_rav, response.data)
.await?;

// TODO: Handle invalid receipts
.verify_and_store_rav(expected_rav.clone(), response.data.clone())
.await
{
Ok(_) => {}

// Adapter errors are local software errors. Shouldn't be a problem with the sender.
Err(tap_core::Error::AdapterError { source_error: e }) => {
anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e)
}

// The 3 errors below signal an invalid RAV, which should be about problems with the
// sender. The sender could be malicious.
Err(
e @ tap_core::Error::InvalidReceivedRAV {
expected_rav: _,
received_rav: _,
}
| e @ tap_core::Error::SignatureError(_)
| e @ tap_core::Error::InvalidRecoveredSigner { address: _ },
) => {
Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string())
.await?;
anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e);
}

// All relevant errors should be handled above. If we get here, we forgot to handle
// an error case.
Err(e) => {
anyhow::bail!("Error while verifying and storing RAV: {:?}", e);
}
}

// This is not the fastest way to do this, but it's the easiest.
// Note: we rely on the unaggregated_fees lock to make sure we don't miss any receipt
Expand Down Expand Up @@ -395,6 +437,74 @@ impl SenderAllocationRelationship {
pub async fn state(&self) -> State {
*self.inner.state.lock().await
}

async fn store_invalid_receipts(
inner: &Inner,
receipts: &[ReceivedReceipt],
) -> anyhow::Result<()> {
for received_receipt in receipts.iter() {
sqlx::query!(
r#"
INSERT INTO scalar_tap_receipts_invalid (
allocation_id,
sender_address,
timestamp_ns,
value,
received_receipt
)
VALUES ($1, $2, $3, $4, $5)
"#,
inner
.allocation_id
.to_string()
.trim_start_matches("0x")
.to_owned(),
inner.sender.to_string().trim_start_matches("0x").to_owned(),
BigDecimal::from(received_receipt.signed_receipt().message.timestamp_ns),
BigDecimal::from_str(&received_receipt.signed_receipt().message.value.to_string())?,
serde_json::to_value(received_receipt)?
)
.execute(&inner.pgpool)
.await
.map_err(|e| anyhow!("Failed to store failed receipt: {:?}", e))?;
}

Ok(())
}

async fn store_failed_rav(
inner: &Inner,
expected_rav: &ReceiptAggregateVoucher,
rav: &EIP712SignedMessage<ReceiptAggregateVoucher>,
reason: &str,
) -> anyhow::Result<()> {
sqlx::query!(
r#"
INSERT INTO scalar_tap_rav_requests_failed (
allocation_id,
sender_address,
expected_rav,
rav_response,
reason
)
VALUES ($1, $2, $3, $4, $5)
"#,
inner
.allocation_id
.to_string()
.trim_start_matches("0x")
.to_owned(),
inner.sender.to_string().trim_start_matches("0x").to_owned(),
serde_json::to_value(expected_rav)?,
serde_json::to_value(rav)?,
reason
)
.execute(&inner.pgpool)
.await
.map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?;

Ok(())
}
}

impl Drop for SenderAllocationRelationship {
Expand Down

0 comments on commit 53ead40

Please sign in to comment.