diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index 514a1104c..c26d08a13 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -189,28 +189,29 @@ pub fn indexer_allocations( #[cfg(test)] mod test { - use reqwest::Url; use serde_json::json; use wiremock::{ matchers::{body_string_contains, method, path}, Mock, MockServer, ResponseTemplate, }; - use crate::{prelude::SubgraphClient, test_vectors}; + use crate::{prelude::SubgraphClient, subgraph_client::DeploymentDetails, test_vectors}; use super::*; async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) { // Set up a mock network subgraph let mock_server = MockServer::start().await; - let network_subgraph_endpoint = Url::parse(&format!( - "{}/subgraphs/id/{}", - &mock_server.uri(), - *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT - )) + let network_subgraph = SubgraphClient::new( + None, + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + &mock_server.uri(), + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + )) + .unwrap(), + ) .unwrap(); - let network_subgraph = - SubgraphClient::new("network-subgraph", network_subgraph_endpoint.as_ref()).unwrap(); // Mock result for current epoch requests mock_server diff --git a/common/src/attestations/dispute_manager.rs b/common/src/attestations/dispute_manager.rs index 2a03f367d..5a071b6d5 100644 --- a/common/src/attestations/dispute_manager.rs +++ b/common/src/attestations/dispute_manager.rs @@ -81,7 +81,6 @@ pub fn dispute_manager( #[cfg(test)] mod test { - use reqwest::Url; use serde_json::json; use wiremock::{ matchers::{method, path}, @@ -90,6 +89,7 @@ mod test { use crate::{ prelude::SubgraphClient, + subgraph_client::DeploymentDetails, test_vectors::{self, DISPUTE_MANAGER_ADDRESS}, }; @@ -98,14 +98,16 @@ mod test { async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) { // Set up a mock network subgraph let mock_server = MockServer::start().await; - let network_subgraph_endpoint = Url::parse(&format!( - "{}/subgraphs/id/{}", - &mock_server.uri(), - *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT - )) + let network_subgraph = SubgraphClient::new( + None, + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + &mock_server.uri(), + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + )) + .unwrap(), + ) .unwrap(); - let network_subgraph = - SubgraphClient::new("network-subgraph", network_subgraph_endpoint.as_ref()).unwrap(); // Mock result for current epoch requests mock_server diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index e5c902efd..3d91cd4d4 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -115,10 +115,10 @@ pub fn escrow_accounts( #[cfg(test)] mod tests { - use reqwest::Url; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use crate::prelude::DeploymentDetails; use crate::test_vectors; use super::*; @@ -127,14 +127,17 @@ mod tests { async fn test_current_accounts() { // Set up a mock escrow subgraph let mock_server = MockServer::start().await; - let escrow_subgraph_endpoint = Url::parse(&format!( - "{}/subgraphs/id/{}", - &mock_server.uri(), - *test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT - )) - .unwrap(); let escrow_subgraph = Box::leak(Box::new( - SubgraphClient::new("escrow-subgraph", escrow_subgraph_endpoint.as_ref()).unwrap(), + SubgraphClient::new( + None, + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + &mock_server.uri(), + *test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT + )) + .unwrap(), + ) + .unwrap(), )); let mock = Mock::given(method("POST")) diff --git a/common/src/lib.rs b/common/src/lib.rs index 7bbc5e08f..67c4aabe2 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -20,6 +20,6 @@ pub mod prelude { dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers, }; pub use super::escrow_accounts::escrow_accounts; - pub use super::subgraph_client::SubgraphClient; + pub use super::subgraph_client::{DeploymentDetails, SubgraphClient}; pub use super::tap_manager::TapManager; } diff --git a/common/src/subgraph_client.rs b/common/src/subgraph_client.rs deleted file mode 100644 index 1b00f22b7..000000000 --- a/common/src/subgraph_client.rs +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::sync::Arc; - -use anyhow::anyhow; -use graphql::http::Response; -use reqwest::{header, Client, Url}; -use serde::de::Deserialize; -use serde_json::Value; - -/// Network subgraph query wrapper -/// -/// This is Arc internally, so it can be cloned and shared between threads. -#[derive(Debug, Clone)] -pub struct SubgraphClient { - client: Client, // it is Arc - subgraph_url: Arc, -} - -impl SubgraphClient { - pub fn new(name: &str, query_url: &str) -> Result { - let query_url = Url::parse(query_url).map_err(|e| { - anyhow!( - "Could not parse `{}` subgraph query URL `{}`: {}", - name, - query_url, - e - ) - })?; - - let client = reqwest::Client::builder() - .user_agent("indexer-common") - .build() - .map_err(|err| { - anyhow!( - "Could not build a client for `{name}` subgraph query URL `{query_url}`: {err}" - ) - }) - .expect("Building subgraph client"); - - Ok(Self { - client, - subgraph_url: Arc::new(query_url), - }) - } - - pub async fn query Deserialize<'de>>( - &self, - body: &Value, - ) -> Result, reqwest::Error> { - self.client - .post(Url::clone(&self.subgraph_url)) - .json(body) - .header(header::CONTENT_TYPE, "application/json") - .send() - .await - .and_then(|response| response.error_for_status())? - .json::>() - .await - } -} - -#[cfg(test)] -mod test { - use serde_json::json; - use wiremock::matchers::{method, path}; - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::test_vectors; - - use super::*; - - const GRAPH_NODE_STATUS_ENDPOINT: &str = "http://localhost:8000/"; - const NETWORK_SUBGRAPH_URL: &str = - "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"; - - async fn mock_graph_node_server() -> MockServer { - let mock_server = MockServer::start().await; - let mock = Mock::given(method("POST")) - .and(path(format!( - "/subgraphs/id/{}", - *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT - ))) - .respond_with(ResponseTemplate::new(200).set_body_raw( - r#" - { - "data": { - "graphNetwork": { - "currentEpoch": 960 - } - } - } - "#, - "application/json", - )); - mock_server.register(mock).await; - - mock_server - } - - fn network_subgraph_client() -> SubgraphClient { - SubgraphClient::new("network-subgraph", NETWORK_SUBGRAPH_URL).unwrap() - } - - #[tokio::test] - async fn test_network_query() { - let _mock_server = mock_graph_node_server().await; - - // Check that the response is valid JSON - let result = network_subgraph_client() - .query::(&json!({ - "query": r#" - query { - graphNetwork(id: 1) { - currentEpoch - } - } - "#, - })) - .await - .unwrap(); - - assert!(result.data.is_some()); - } -} diff --git a/common/src/subgraph_client/client.rs b/common/src/subgraph_client/client.rs new file mode 100644 index 000000000..f61179de9 --- /dev/null +++ b/common/src/subgraph_client/client.rs @@ -0,0 +1,202 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use eventuals::Eventual; +use graphql::http::Response; +use log::warn; +use reqwest::{header, Url}; +use serde::de::Deserialize; +use serde_json::Value; +use toolshed::thegraph::DeploymentId; + +use super::monitor::{monitor_deployment_status, DeploymentStatus}; + +#[derive(Debug, Clone)] +pub struct DeploymentDetails { + pub deployment: Option, + pub status_url: Option, + pub query_url: Url, +} + +impl DeploymentDetails { + pub fn for_graph_node( + graph_node_base_url: &str, + deployment: DeploymentId, + ) -> Result { + Ok(Self { + deployment: Some(deployment), + status_url: Some(Url::parse(&format!("{graph_node_base_url}/status"))?), + query_url: Url::parse(&format!("{graph_node_base_url}/subgraphs/id/{deployment}"))?, + }) + } + + pub fn for_query_url(query_url: &str) -> Result { + Ok(Self { + deployment: None, + status_url: None, + query_url: Url::parse(query_url)?, + }) + } +} + +struct DeploymentClient { + pub status: Option>, + pub query_url: Url, +} + +impl DeploymentClient { + pub fn new(details: DeploymentDetails) -> Self { + Self { + status: details + .deployment + .zip(details.status_url) + .map(|(deployment, url)| monitor_deployment_status(deployment, url)), + query_url: details.query_url, + } + } + + pub async fn query Deserialize<'de>>( + &self, + body: &Value, + ) -> Result, anyhow::Error> { + if let Some(ref status) = self.status { + let deployment_status = status.value().await.expect("reading deployment status"); + + if !deployment_status.synced || &deployment_status.health != "healthy" { + return Err(anyhow!( + "Deployment `{}` is not ready or healthy to be queried", + self.query_url + )); + } + } + + Ok(reqwest::Client::new() + .post(self.query_url.as_ref()) + .json(body) + .header(header::USER_AGENT, "indexer-common") + .header(header::CONTENT_TYPE, "application/json") + .send() + .await + .and_then(|response| response.error_for_status())? + .json::>() + .await?) + } +} + +/// Network subgraph query wrapper +/// +/// This is Arc internally, so it can be cloned and shared between threads. +pub struct SubgraphClient { + local_client: Option, + remote_client: DeploymentClient, +} + +impl SubgraphClient { + pub fn new( + local_deployment: Option, + remote_deployment: DeploymentDetails, + ) -> Result { + let local_client = local_deployment.map(DeploymentClient::new); + let remote_client = DeploymentClient::new(remote_deployment); + + Ok(Self { + local_client, + remote_client, + }) + } + + pub async fn query Deserialize<'de>>( + &self, + body: &Value, + ) -> Result, anyhow::Error> { + // Try the local client first; if that fails, log the error and move on + // to the remote client + if let Some(ref local_client) = self.local_client { + match local_client.query(body).await { + Ok(response) => return Ok(response), + Err(err) => warn!( + "Failed to query local subgraph deployment `{}`, trying remote deployment next: {}", + local_client.query_url, err + ), + } + } + + // Try the remote client + self.remote_client.query(body).await.map_err(|err| { + warn!( + "Failed to query remote subgraph deployment `{}`: {}", + self.remote_client.query_url, err + ); + + err + }) + } +} + +#[cfg(test)] +mod test { + use serde_json::json; + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + use crate::test_vectors::{self}; + + use super::*; + + const NETWORK_SUBGRAPH_URL: &str = + "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"; + + async fn mock_graph_node_server() -> MockServer { + let mock_server = MockServer::start().await; + let mock = Mock::given(method("POST")) + .and(path(format!( + "/subgraphs/id/{}", + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + ))) + .respond_with(ResponseTemplate::new(200).set_body_raw( + r#" + { + "data": { + "graphNetwork": { + "currentEpoch": 960 + } + } + } + "#, + "application/json", + )); + mock_server.register(mock).await; + + mock_server + } + + fn network_subgraph_client() -> SubgraphClient { + SubgraphClient::new( + None, + DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(), + ) + .unwrap() + } + + #[tokio::test] + async fn test_network_query() { + let _mock_server = mock_graph_node_server().await; + + // Check that the response is valid JSON + let result = network_subgraph_client() + .query::(&json!({ + "query": r#" + query { + graphNetwork(id: 1) { + currentEpoch + } + } + "#, + })) + .await + .unwrap(); + + assert!(result.data.is_some()); + } +} diff --git a/common/src/subgraph_client/mod.rs b/common/src/subgraph_client/mod.rs new file mode 100644 index 000000000..b3f0b3f7c --- /dev/null +++ b/common/src/subgraph_client/mod.rs @@ -0,0 +1,4 @@ +mod client; +mod monitor; + +pub use client::{DeploymentDetails, SubgraphClient}; diff --git a/common/src/subgraph_client/monitor.rs b/common/src/subgraph_client/monitor.rs new file mode 100644 index 000000000..e4780d10d --- /dev/null +++ b/common/src/subgraph_client/monitor.rs @@ -0,0 +1,95 @@ +use std::time::Duration; + +use eventuals::{timer, Eventual, EventualExt}; +use graphql::http::Response; +use log::warn; +use reqwest::{header, Url}; +use serde::Deserialize; +use serde_json::{json, Value}; +use tokio::time::sleep; +use toolshed::thegraph::DeploymentId; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct DeploymentStatusResponse { + indexing_statuses: Option>, +} + +#[derive(Clone, Deserialize, Eq, PartialEq)] +pub struct DeploymentStatus { + pub synced: bool, + pub health: String, +} + +async fn query Deserialize<'de>>( + url: Url, + body: &Value, +) -> Result, reqwest::Error> { + reqwest::Client::new() + .post(url) + .json(body) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await + .and_then(|response| response.error_for_status())? + .json::>() + .await +} + +pub fn monitor_deployment_status( + deployment: DeploymentId, + status_url: Url, +) -> Eventual { + timer(Duration::from_secs(30)).map_with_retry( + move |_| { + let status_url = status_url.clone(); + + async move { + let body = json!({ + "query": r#" + query indexingStatuses($ids: [ID!]!) { + indexingStatuses(deployments: $ids) { + synced + health + } + } + "#, + "variables": { + "ids": [deployment.to_string()] + } + }); + + let response = query::(status_url, &body) + .await + .map_err(|e| { + format!("Failed to query status of deployment `{deployment}`: {e}") + })?; + + if let Some(errors) = response.errors { + warn!( + "Errors encountered querying the deployment status for `{}`: {}", + deployment, + errors + .into_iter() + .map(|e| e.message) + .collect::>() + .join(", ") + ); + } + + response + .data + .and_then(|data| data.indexing_statuses) + .and_then(|data| data.get(0).map(Clone::clone)) + .ok_or_else(|| format!("Deployment `{}` not found", deployment)) + } + }, + move |err: String| async move { + warn!( + "Error querying deployment status for `{}`: {}", + deployment, err + ); + sleep(Duration::from_secs(15)).await + }, + ) +} diff --git a/service/src/main.rs b/service/src/main.rs index aa1050942..fea30cfc3 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -12,6 +12,7 @@ use indexer_common::{ prelude::{ attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, SubgraphClient, }, + subgraph_client::DeploymentDetails, tap_manager::TapManager, }; @@ -70,8 +71,21 @@ async fn main() -> Result<(), std::io::Error> { // no problem. let network_subgraph = Box::leak(Box::new( SubgraphClient::new( - "network-subgraph", - &config.network_subgraph.network_subgraph_endpoint, + config + .network_subgraph + .network_subgraph_deployment + .map(|deployment| { + DeploymentDetails::for_graph_node( + &config.indexer_infrastructure.graph_node_query_endpoint, + deployment, + ) + }) + .transpose() + .expect( + "Failed to parse graph node query endpoint and network subgraph deployment", + ), + DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint) + .expect("Failed to parse network subgraph endpoint"), ) .expect("Failed to set up network subgraph client"), )); @@ -107,8 +121,19 @@ async fn main() -> Result<(), std::io::Error> { let escrow_subgraph = Box::leak(Box::new( SubgraphClient::new( - "escrow-subgraph", - &config.escrow_subgraph.escrow_subgraph_endpoint, + config + .escrow_subgraph + .escrow_subgraph_deployment + .map(|deployment| { + DeploymentDetails::for_graph_node( + &config.indexer_infrastructure.graph_node_query_endpoint, + deployment, + ) + }) + .transpose() + .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), + DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint) + .expect("Failed to parse escrow subgraph endpoint"), ) .expect("Failed to set up escrow subgraph client"), ));