Skip to content

Commit

Permalink
refactor(tap-agent): split rav_requester_try
Browse files Browse the repository at this point in the history
Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Jan 18, 2024
1 parent f265d36 commit 161dced
Showing 1 changed file with 116 additions and 122 deletions.
238 changes: 116 additions & 122 deletions tap-agent/src/tap/sender_allocation_relationship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{str::FromStr, sync::Arc, time::Duration};

use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use anyhow::{anyhow, ensure};
use anyhow::{anyhow, ensure, Result};

use eventuals::Eventual;
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
Expand Down Expand Up @@ -206,13 +206,13 @@ impl SenderAllocationRelationship {

/// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager
/// with the latest unaggregated fees from the database.
pub async fn update_unaggregated_fees(&self) -> Result<(), anyhow::Error> {
pub async fn update_unaggregated_fees(&self) -> Result<()> {
Self::update_unaggregated_fees_static(&self.inner).await
}

/// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager
/// with the latest unaggregated fees from the database.
async fn update_unaggregated_fees_static(inner: &Inner) -> Result<(), anyhow::Error> {
async fn update_unaggregated_fees_static(inner: &Inner) -> Result<()> {
inner.tap_manager.remove_obsolete_receipts().await?;

let signers = signers_trimmed(&inner.escrow_accounts, inner.sender).await?;
Expand Down Expand Up @@ -295,98 +295,11 @@ impl SenderAllocationRelationship {
}
}

async fn rav_requester_try(inner: &Arc<Inner>) -> anyhow::Result<()> {
async fn rav_requester_try(inner: &Arc<Inner>) -> Result<()> {
loop {
let RAVRequest {
valid_receipts,
previous_rav,
invalid_receipts,
expected_rav,
} = inner
.tap_manager
.create_rav_request(
inner.config.tap.rav_request_timestamp_buffer_ms * 1_000_000,
// TODO: limit the number of receipts to aggregate per request.
None,
)
.await?;

// Invalid receipts
if !invalid_receipts.is_empty() {
warn!(
"Found {} invalid receipts for allocation {} and sender {}.",
invalid_receipts.len(),
inner.allocation_id,
inner.sender
);

// Save invalid receipts to the database for logs.
// TODO: consider doing that in a spawned task?
Self::store_invalid_receipts(inner, &invalid_receipts).await?;
}

// TODO: Request compression and response decompression. Also a fancy user agent?
let client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(
inner.config.tap.rav_request_timeout_secs,
))
.build(&inner.sender_aggregator_endpoint)?;

let response: JsonRpcResponse<EIP712SignedMessage<ReceiptAggregateVoucher>> = client
.request(
"aggregate_receipts",
rpc_params!(
"0.0", // TODO: Set the version in a smarter place.
valid_receipts,
previous_rav
),
)
.await?;

if let Some(warnings) = response.warnings {
warn!("Warnings from sender's TAP aggregator: {:?}", warnings);
}

match inner
.tap_manager
.verify_and_store_rav(expected_rav.clone(), response.data.clone())
.await
{
Ok(_) => {}

// Adapter errors are local software errors. Shouldn't be a problem with the sender.
Err(tap_core::Error::AdapterError { source_error: e }) => {
anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e)
}

// The 3 errors below signal an invalid RAV, which should be about problems with the
// sender. The sender could be malicious.
Err(
e @ tap_core::Error::InvalidReceivedRAV {
expected_rav: _,
received_rav: _,
}
| e @ tap_core::Error::SignatureError(_)
| e @ tap_core::Error::InvalidRecoveredSigner { address: _ },
) => {
Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string())
.await?;
anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e);
}

// All relevant errors should be handled above. If we get here, we forgot to handle
// an error case.
Err(e) => {
anyhow::bail!("Error while verifying and storing RAV: {:?}", e);
}
}

// This is not the fastest way to do this, but it's the easiest.
// Note: we rely on the unaggregated_fees lock to make sure we don't miss any receipt
// notifications during this.
// TODO: If needed, faster alternative?
Self::update_unaggregated_fees_static(inner).await?;
Self::rav_requester_single(inner).await?;

// Check if we need to request another RAV immediately.
let unaggregated_fees = inner.unaggregated_fees.lock().await;
if unaggregated_fees.value < inner.config.tap.rav_request_trigger_value.into() {
break;
Expand All @@ -405,46 +318,127 @@ impl SenderAllocationRelationship {

let mut state = inner.state.lock().await;
if *state == State::LastRavPending {
// Mark the last RAV as last in the DB as a cue for the indexer-agent.
let updated_rows = sqlx::query!(
r#"
UPDATE scalar_tap_ravs
SET final = true
WHERE allocation_id = $1 AND sender_address = $2
RETURNING *
"#,
inner
.allocation_id
.to_string()
.trim_start_matches("0x")
.to_owned(),
inner.sender.to_string().trim_start_matches("0x").to_owned(),
// Mark the last RAV as final in the DB as a cue for the indexer-agent.
Self::mark_rav_final(inner).await?;

*state = State::Finished;
};
anyhow::Ok(())
}

async fn rav_requester_single(inner: &Arc<Inner>) -> Result<()> {
let RAVRequest {
valid_receipts,
previous_rav,
invalid_receipts,
expected_rav,
} = inner
.tap_manager
.create_rav_request(
inner.config.tap.rav_request_timestamp_buffer_ms * 1_000_000,
// TODO: limit the number of receipts to aggregate per request.
None,
)
.fetch_all(&inner.pgpool)
.await?;
if !invalid_receipts.is_empty() {
warn!(
"Found {} invalid receipts for allocation {} and sender {}.",
invalid_receipts.len(),
inner.allocation_id,
inner.sender
);

// Make sure exactly one row was updated.
if updated_rows.len() != 1 {
anyhow::bail!(
"Expected exactly one row to be updated in the latest RAVs table, \
but {} were updated.",
updated_rows.len()
);
// Save invalid receipts to the database for logs.
// TODO: consider doing that in a spawned task?
Self::store_invalid_receipts(inner, &invalid_receipts).await?;
}
let client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(
inner.config.tap.rav_request_timeout_secs,
))
.build(&inner.sender_aggregator_endpoint)?;
let response: JsonRpcResponse<EIP712SignedMessage<ReceiptAggregateVoucher>> = client
.request(
"aggregate_receipts",
rpc_params!(
"0.0", // TODO: Set the version in a smarter place.
valid_receipts,
previous_rav
),
)
.await?;
if let Some(warnings) = response.warnings {
warn!("Warnings from sender's TAP aggregator: {:?}", warnings);
}
match inner
.tap_manager
.verify_and_store_rav(expected_rav.clone(), response.data.clone())
.await
{
Ok(_) => {}

// Adapter errors are local software errors. Shouldn't be a problem with the sender.
Err(tap_core::Error::AdapterError { source_error: e }) => {
anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e)
}

*state = State::Finished;
// The 3 errors below signal an invalid RAV, which should be about problems with the
// sender. The sender could be malicious.
Err(
e @ tap_core::Error::InvalidReceivedRAV {
expected_rav: _,
received_rav: _,
}
| e @ tap_core::Error::SignatureError(_)
| e @ tap_core::Error::InvalidRecoveredSigner { address: _ },
) => {
Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string())
.await?;
anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e);
}

// All relevant errors should be handled above. If we get here, we forgot to handle
// an error case.
Err(e) => {
anyhow::bail!("Error while verifying and storing RAV: {:?}", e);
}
}
Self::update_unaggregated_fees_static(inner).await?;
Ok(())
}

async fn mark_rav_final(inner: &Arc<Inner>) -> Result<()> {
let updated_rows = sqlx::query!(
r#"
UPDATE scalar_tap_ravs
SET final = true
WHERE allocation_id = $1 AND sender_address = $2
RETURNING *
"#,
inner
.allocation_id
.to_string()
.trim_start_matches("0x")
.to_owned(),
inner.sender.to_string().trim_start_matches("0x").to_owned(),
)
.fetch_all(&inner.pgpool)
.await?;
if updated_rows.len() != 1 {
anyhow::bail!(
"Expected exactly one row to be updated in the latest RAVs table, \
but {} were updated.",
updated_rows.len()
);
};
anyhow::Ok(())
Ok(())
}

pub async fn state(&self) -> State {
*self.inner.state.lock().await
}

async fn store_invalid_receipts(
inner: &Inner,
receipts: &[ReceivedReceipt],
) -> anyhow::Result<()> {
async fn store_invalid_receipts(inner: &Inner, receipts: &[ReceivedReceipt]) -> Result<()> {
for received_receipt in receipts.iter() {
sqlx::query!(
r#"
Expand Down Expand Up @@ -480,7 +474,7 @@ impl SenderAllocationRelationship {
expected_rav: &ReceiptAggregateVoucher,
rav: &EIP712SignedMessage<ReceiptAggregateVoucher>,
reason: &str,
) -> anyhow::Result<()> {
) -> Result<()> {
sqlx::query!(
r#"
INSERT INTO scalar_tap_rav_requests_failed (
Expand Down

0 comments on commit 161dced

Please sign in to comment.