Skip to content

Commit

Permalink
fix: support sender signers
Browse files Browse the repository at this point in the history
Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Jan 12, 2024
1 parent 505fbb1 commit a7da3f9
Show file tree
Hide file tree
Showing 14 changed files with 541 additions and 178 deletions.
179 changes: 143 additions & 36 deletions common/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,48 @@ use tracing::{error, warn};

use crate::prelude::{Query, SubgraphClient};

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct EscrowAccounts {
pub balances: HashMap<Address, U256>,
pub signers_to_senders: HashMap<Address, Address>,
pub senders_to_signers: HashMap<Address, Vec<Address>>,
}

impl EscrowAccounts {
pub fn new(
balances: HashMap<Address, U256>,
senders_to_signers: HashMap<Address, Vec<Address>>,
) -> Self {
let signers_to_senders = senders_to_signers
.iter()
.flat_map(|(sender, signers)| {
signers
.iter()
.map(move |signer| (*signer, *sender))
.collect::<Vec<_>>()
})
.collect();

Self {
balances,
signers_to_senders,
senders_to_signers,
}
}
}

pub fn escrow_accounts(
escrow_subgraph: &'static SubgraphClient,
indexer_address: Address,
interval: Duration,
) -> Eventual<HashMap<Address, U256>> {
reject_thawing_signers: bool,
) -> Eventual<EscrowAccounts> {
// Types for deserializing the network subgraph response
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EscrowAccountsResponse {
escrow_accounts: Vec<EscrowAccount>,
}
// These 2 structs are used to deserialize the response from the escrow subgraph.
// Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal
// representation. This is why we deserialize them as strings below.
#[derive(Deserialize)]
Expand All @@ -38,50 +68,109 @@ pub fn escrow_accounts(
#[serde(rename_all = "camelCase")]
struct Sender {
id: Address,
authorized_signers: Vec<AuthorizedSigner>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct AuthorizedSigner {
id: Address,
}

// thawEndTimestamp == 0 means that the signer is not thawing. This also means
// that we don't wait for the thawing period to end before stopping serving
// queries for this signer.
// isAuthorized == true means that the signer is still authorized to sign
// payments in the name of the sender.
let query_no_thawing_signers = r#"
query ($indexer: ID!) {
escrowAccounts(where: {receiver_: {id: $indexer}}) {
balance
totalAmountThawing
sender {
id
authorizedSigners(
where: {thawEndTimestamp: "0", isAuthorized: true}
) {
id
}
}
}
}
"#;

let query_with_thawing_signers = r#"
query ($indexer: ID!) {
escrowAccounts(where: {receiver_: {id: $indexer}}) {
balance
totalAmountThawing
sender {
id
authorizedSigners(
where: {isAuthorized: true}
) {
id
}
}
}
}
"#;

let query = if reject_thawing_signers {
query_no_thawing_signers
} else {
query_with_thawing_signers
};

timer(interval).map_with_retry(
move |_| async move {
let response = escrow_subgraph
.query::<EscrowAccountsResponse>(Query::new_with_variables(
r#"
query ($indexer: ID!) {
escrowAccounts(where: {receiver_: {id: $indexer}}) {
balance
totalAmountThawing
sender {
id
}
}
}
"#,
query,
[("indexer", format!("{:x?}", indexer_address).into())],
))
.await
.map_err(|e| e.to_string())?;

response.map_err(|e| e.to_string()).and_then(|data| {
data.escrow_accounts
.iter()
.map(|account| {
let balance = U256::checked_sub(
U256::from_dec_str(&account.balance)?,
U256::from_dec_str(&account.total_amount_thawing)?,
)
.unwrap_or_else(|| {
warn!(
"Balance minus total amount thawing underflowed for account {}. \
let response = response.map_err(|e| e.to_string())?;

let balances = response
.escrow_accounts
.iter()
.map(|account| {
let balance = U256::checked_sub(
U256::from_dec_str(&account.balance)?,
U256::from_dec_str(&account.total_amount_thawing)?,
)
.unwrap_or_else(|| {
warn!(
"Balance minus total amount thawing underflowed for account {}. \
Setting balance to 0, no queries will be served for this sender.",
account.sender.id
);
U256::from(0)
});

Ok((account.sender.id, balance))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()
.map_err(|e| format!("{}", e))
})
account.sender.id
);
U256::from(0)
});

Ok((account.sender.id, balance))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()
.map_err(|e| format!("{}", e))?;

let senders_to_signers = response
.escrow_accounts
.iter()
.map(|account| {
let sender = account.sender.id;
let signers = account
.sender
.authorized_signers
.iter()
.map(|signer| signer.id)
.collect();
(sender, signers)
})
.collect();

Ok(EscrowAccounts::new(balances, senders_to_signers))
},
move |err: String| {
error!(
Expand All @@ -96,6 +185,7 @@ pub fn escrow_accounts(

#[cfg(test)]
mod tests {
use test_log::test;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

Expand All @@ -104,7 +194,20 @@ mod tests {

use super::*;

#[tokio::test]
#[test]
fn test_new_escrow_accounts() {
let escrow_accounts = EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
);

assert_eq!(
escrow_accounts.signers_to_senders,
test_vectors::ESCROW_ACCOUNTS_SIGNERS_TO_SENDERS.to_owned()
)
}

#[test(tokio::test)]
async fn test_current_accounts() {
// Set up a mock escrow subgraph
let mock_server = MockServer::start().await;
Expand Down Expand Up @@ -134,11 +237,15 @@ mod tests {
escrow_subgraph,
*test_vectors::INDEXER_ADDRESS,
Duration::from_secs(60),
true,
);

assert_eq!(
accounts.value().await.unwrap(),
*test_vectors::ESCROW_ACCOUNTS
EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
)
);
}
}
1 change: 1 addition & 0 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl IndexerService {
escrow_subgraph,
options.config.indexer.indexer_address,
Duration::from_secs(options.config.escrow_subgraph.syncing_interval),
true, // Reject thawing signers eagerly
);

// Establish Database connection necessary for serving indexer management
Expand Down
34 changes: 23 additions & 11 deletions common/src/tap_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use std::{collections::HashMap, str::FromStr, sync::Arc};
use tap_core::tap_manager::SignedReceipt;
use tracing::error;

use crate::prelude::Allocation;
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};

#[derive(Clone)]
pub struct TapManager {
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<HashMap<Address, U256>>,
escrow_accounts: Eventual<EscrowAccounts>,
pgpool: PgPool,
domain_separator: Arc<Eip712Domain>,
}
Expand All @@ -25,7 +25,7 @@ impl TapManager {
pub fn new(
pgpool: PgPool,
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<HashMap<Address, U256>>,
escrow_accounts: Eventual<EscrowAccounts>,
domain_separator: Eip712Domain,
) -> Self {
Self {
Expand Down Expand Up @@ -67,11 +67,21 @@ impl TapManager {
anyhow!(e)
})?;

if !self
.escrow_accounts
.value_immediate()
.unwrap_or_default()
let escrow_accounts = self.escrow_accounts.value_immediate().unwrap_or_default();

let receipt_sender = escrow_accounts
.signers_to_senders
.get(&receipt_signer)
.ok_or_else(|| {
anyhow!(
"Receipt signer `{}` is not eligible for this indexer",
receipt_signer
)
})?;

if !escrow_accounts
.balances
.get(receipt_sender)
.map_or(false, |balance| balance > &U256::zero())
{
return Err(anyhow!(
Expand All @@ -83,7 +93,7 @@ impl TapManager {
// TODO: consider doing this in another async task to avoid slowing down the paid query flow.
sqlx::query!(
r#"
INSERT INTO scalar_tap_receipts (allocation_id, sender_address, timestamp_ns, value, receipt)
INSERT INTO scalar_tap_receipts (allocation_id, signer_address, timestamp_ns, value, receipt)
VALUES ($1, $2, $3, $4, $5)
"#,
format!("{:?}", allocation_id)
Expand Down Expand Up @@ -117,7 +127,7 @@ mod test {
use keccak_hash::H256;
use sqlx::postgres::PgListener;

use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER};
use crate::test_vectors::{self, create_signed_receipt};

use super::*;

Expand Down Expand Up @@ -159,8 +169,10 @@ mod test {
));

// Mock escrow accounts
let escrow_accounts =
Eventual::from_value(HashMap::from_iter(vec![(TAP_SENDER.1, U256::from(123))]));
let escrow_accounts = Eventual::from_value(EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
));

let tap_manager = TapManager::new(
pgpool.clone(),
Expand Down
Loading

0 comments on commit a7da3f9

Please sign in to comment.