Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port subgraph indexer-service to the indexer service framework #104

Merged
merged 20 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
204b4c3
feat: update subgraph client to latest toolshed/graphql-http
Jannis Nov 30, 2023
247b893
feat: update allocations monitor to latest subgraph client
Jannis Nov 30, 2023
76eb891
feat: update escrow accounts to latest subgraph client
Jannis Nov 30, 2023
4a5b1a0
feat: update dispute manager to latest subgraph client
Jannis Nov 30, 2023
9406235
feat: reuse reqwest client in indexer service framework
Jannis Nov 30, 2023
45c6458
chore: fix formatting in tap manager
Jannis Nov 30, 2023
7cf89b1
feat: update tap agent to latest subgraph client
Jannis Nov 30, 2023
ad40ba1
feat(service): adopt the indexer service framework
Jannis Nov 30, 2023
b3f8d40
fix(common): pass lowercase indexer address to escrow accounts query
Jannis Dec 7, 2023
89518cc
fix(common): correctly serve requests at /<namespace>/id/:id
Jannis Dec 7, 2023
46f2ddf
fix(common): handle free query auth tokens correctly
Jannis Dec 7, 2023
695a576
fix(common): use sepeparate graph network and chain IDs in config
Jannis Dec 7, 2023
9ca1a80
feat(common): improve how responses are attested to and finalized
Jannis Dec 7, 2023
2679ea3
feat(service): finish implementing the new subgraph service
Jannis Dec 7, 2023
0434d96
fix(common): Use TAP as the name in the receipt EIP-712 domain
Jannis Dec 7, 2023
e592875
chore(service): remove config structs covered by indexer framework
Jannis Dec 11, 2023
0cd8675
fix(common): drop unnecessary lifetime and reference
Jannis Dec 11, 2023
6d520f9
feat(subgraph_client): add raw query method
Jannis Dec 11, 2023
138a5e6
feat: add serving /network and /escrow subgraphs back in
Jannis Dec 11, 2023
f7cdb11
fix(service): avoid unnecessary error conversion
Jannis Dec 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
516 changes: 216 additions & 300 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ sqlx = { version = "0.7.1", features = [
] }
tokio = { version = "1.32.0", features = ["full", "macros", "rt"] }
thegraph = { git = "https://github.com/edgeandnode/toolshed", branch = "main" }
graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main" }
graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main", features = [
"http-reqwest",
] }
tap_core = "0.7.0"
axum = { version = "0.6.20", default_features = true, features = ["headers"] }
thiserror = "1.0.49"
Expand Down
76 changes: 26 additions & 50 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use alloy_primitives::Address;
use anyhow::anyhow;
use eventuals::{timer, Eventual, EventualExt};
use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;
use tracing::warn;

use crate::prelude::SubgraphClient;
use crate::prelude::{Query, SubgraphClient};

use super::Allocation;

Expand All @@ -22,7 +21,7 @@ async fn current_epoch(
// Types for deserializing the network subgraph response
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphNetworkResponse {
struct GraphNetworkData {
graph_network: Option<GraphNetwork>,
}
#[derive(Deserialize)]
Expand All @@ -33,31 +32,15 @@ async fn current_epoch(

// Query the current epoch
let query = r#"query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } }"#;
let response = network_subgraph
.query::<GraphNetworkResponse>(&json!({
"query": query,
"variables": {
"id": graph_network_id
}
}))
let result = network_subgraph
.query::<GraphNetworkData>(Query::new_with_variables(
query,
[("id", graph_network_id.into())],
))
.await?;

if !response.errors.is_empty() {
warn!(
"Errors encountered identifying current epoch for network {}: {}",
graph_network_id,
response
.errors
.into_iter()
.map(|e| e.message)
.collect::<Vec<_>>()
.join(", ")
);
}

response
.data
.and_then(|data| data.graph_network)
result?
.graph_network
.ok_or_else(|| anyhow!("Network {} not found", graph_network_id))
.map(|network| network.current_epoch)
}
Expand Down Expand Up @@ -138,42 +121,35 @@ pub fn indexer_allocations(
// Query active and recently closed allocations for the indexer,
// using the network subgraph
let response = network_subgraph
.query::<IndexerAllocationsResponse>(&json!({
"query": query,
"variables": {
"indexer": format!("{indexer_address:?}"),
"closedAtEpochThreshold": closed_at_epoch_threshold,
}}))
.query::<IndexerAllocationsResponse>(Query::new_with_variables(
query,
[
("indexer", format!("{indexer_address:?}").into()),
("closedAtEpochThreshold", closed_at_epoch_threshold.into()),
],
))
.await
.map_err(|e| e.to_string())?;

// If there are any GraphQL errors returned, we'll log them for debugging
if !response.errors.is_empty() {
warn!(
"Errors encountered fetching active or recently closed allocations for indexer {:?}: {}",
indexer_address,
response.errors.into_iter().map(|e| e.message).collect::<Vec<_>>().join(", ")
);
}

// Verify that the indexer could be found at all
let indexer = response
.data
.and_then(|data| data.indexer)
.ok_or_else(|| format!("Indexer {:?} could not be found on the network", indexer_address))?;
let indexer = response.map_err(|e| e.to_string()).and_then(|data| {
// Verify that the indexer could be found at all
data.indexer
.ok_or_else(|| format!("Indexer `{indexer_address}` not found on the network"))
})?;

// Pull active and recently closed allocations out of the indexer
let Indexer {
active_allocations,
recently_closed_allocations
recently_closed_allocations,
} = indexer;

Ok(HashMap::from_iter(
active_allocations.into_iter().map(|a| (a.id, a)).chain(
recently_closed_allocations.into_iter().map(|a| (a.id, a)))
active_allocations
.into_iter()
.map(|a| (a.id, a))
.chain(recently_closed_allocations.into_iter().map(|a| (a.id, a))),
))
},

// Need to use string errors here because eventuals `map_with_retry` retries
// errors that can be cloned
move |err: String| {
Expand Down
40 changes: 12 additions & 28 deletions common/src/attestations/dispute_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use std::time::Duration;
use alloy_primitives::Address;
use eventuals::{timer, Eventual, EventualExt};
use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;
use tracing::warn;

use crate::subgraph_client::SubgraphClient;
use crate::subgraph_client::{Query, SubgraphClient};

pub fn dispute_manager(
network_subgraph: &'static SubgraphClient,
Expand All @@ -32,41 +31,26 @@ pub fn dispute_manager(
timer(interval).map_with_retry(
move |_| async move {
let response = network_subgraph
.query::<DisputeManagerResponse>(&json!({
"query": r#"
.query::<DisputeManagerResponse>(Query::new_with_variables(
r#"
query network($id: ID!) {
graphNetwork(id: $id) {
disputeManager
}
}
"#,
"variables": {
"id": graph_network_id
}
}))
[("id", graph_network_id.into())],
))
.await
.map_err(|e| e.to_string())?;

if !response.errors.is_empty() {
warn!(
"Errors encountered querying the dispute manager for network {}: {}",
graph_network_id,
response
.errors
.into_iter()
.map(|e| e.message)
.collect::<Vec<_>>()
.join(", ")
);
}

response
.data
.and_then(|data| data.graph_network)
.map(|network| network.dispute_manager)
.ok_or_else(|| {
format!("Network {} not found in network subgraph", graph_network_id)
})
response.map_err(|e| e.to_string()).and_then(|data| {
data.graph_network
.map(|network| network.dispute_manager)
.ok_or_else(|| {
format!("Network {} not found in network subgraph", graph_network_id)
})
})
},
move |err: String| {
warn!(
Expand Down
72 changes: 26 additions & 46 deletions common/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use anyhow::Result;
use ethers_core::types::U256;
use eventuals::{timer, Eventual, EventualExt};
use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;
use tracing::{error, warn};

use crate::prelude::SubgraphClient;
use crate::prelude::{Query, SubgraphClient};

pub fn escrow_accounts(
escrow_subgraph: &'static SubgraphClient,
Expand Down Expand Up @@ -44,8 +43,8 @@ pub fn escrow_accounts(
timer(interval).map_with_retry(
move |_| async move {
let response = escrow_subgraph
.query::<EscrowAccountsResponse>(&json!({
"query": r#"
.query::<EscrowAccountsResponse>(Query::new_with_variables(
r#"
query ($indexer: ID!) {
escrowAccounts(where: {receiver_: {id: $indexer}}) {
balance
Expand All @@ -56,52 +55,33 @@ pub fn escrow_accounts(
}
}
"#,
"variables": {
"indexer": indexer_address,
}
}
[("indexer", format!("{:x?}", indexer_address).into())],
))
.await
.map_err(|e| e.to_string())?;

// If there are any GraphQL errors returned, we'll log them for debugging
if !response.errors.is_empty() {
error!(
"Errors encountered fetching escrow accounts for indexer {:?}: {}",
indexer_address,
response
.errors
.into_iter()
.map(|e| e.message)
.collect::<Vec<_>>()
.join(", ")
);
}

let sender_accounts = response
.data
.map_or(vec![], |data| data.escrow_accounts)
.iter()
.map(|account| {
let balance = U256::checked_sub(
U256::from_dec_str(&account.balance)?,
U256::from_dec_str(&account.total_amount_thawing)?,
)
.unwrap_or_else(|| {
warn!(
"Balance minus total amount thawing underflowed for account {}. \
Setting balance to 0, no queries will be served for this sender.",
account.sender.id
);
U256::from(0)
});

Ok((account.sender.id, balance))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()
.map_err(|e| format!("{}", e))?;

Ok(sender_accounts)
response.map_err(|e| e.to_string()).and_then(|data| {
data.escrow_accounts
.iter()
.map(|account| {
let balance = U256::checked_sub(
U256::from_dec_str(&account.balance)?,
U256::from_dec_str(&account.total_amount_thawing)?,
)
.unwrap_or_else(|| {
warn!(
"Balance minus total amount thawing underflowed for account {}. \
Setting balance to 0, no queries will be served for this sender.",
account.sender.id
);
U256::from(0)
});

Ok((account.sender.id, balance))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()
.map_err(|e| format!("{}", e))
})
},
move |err: String| {
error!(
Expand Down
5 changes: 5 additions & 0 deletions common/src/indexer_service/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub struct DatabaseConfig {

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubgraphConfig {
#[serde(default)]
pub serve_subgraph: bool,
pub serve_auth_token: Option<String>,

pub deployment: Option<DeploymentId>,
pub query_url: String,
pub syncing_interval: u64,
Expand Down Expand Up @@ -48,6 +52,7 @@ pub struct GraphNodeConfig {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GraphNetworkConfig {
pub id: u64,
pub chain_id: u64,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down
Loading