From eac59267c3263324895c8a822447bb92491c5907 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 23 Oct 2023 20:52:01 +0200 Subject: [PATCH 1/6] refactor: only support a query URL in SubgraphClient --- common/src/allocations/monitor.rs | 16 +++---- common/src/attestations/dispute_manager.rs | 16 +++---- common/src/escrow_accounts.rs | 15 +++--- common/src/subgraph_client.rs | 56 +++++++--------------- service/src/main.rs | 6 +-- 5 files changed, 39 insertions(+), 70 deletions(-) diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index caee3fb3..514a1104 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -189,6 +189,7 @@ pub fn indexer_allocations( #[cfg(test)] mod test { + use reqwest::Url; use serde_json::json; use wiremock::{ matchers::{body_string_contains, method, path}, @@ -202,17 +203,14 @@ mod test { async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) { // Set up a mock network subgraph let mock_server = MockServer::start().await; - let network_subgraph_endpoint = SubgraphClient::local_deployment_endpoint( + let network_subgraph_endpoint = Url::parse(&format!( + "{}/subgraphs/id/{}", &mock_server.uri(), - &test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT, - ) - .unwrap(); - let network_subgraph = SubgraphClient::new( - Some(&mock_server.uri()), - Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT), - network_subgraph_endpoint.as_ref(), - ) + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + )) .unwrap(); + let network_subgraph = + SubgraphClient::new("network-subgraph", network_subgraph_endpoint.as_ref()).unwrap(); // Mock result for current epoch requests mock_server diff --git a/common/src/attestations/dispute_manager.rs b/common/src/attestations/dispute_manager.rs index 5a15c10d..2a03f367 100644 --- a/common/src/attestations/dispute_manager.rs +++ b/common/src/attestations/dispute_manager.rs @@ -81,6 +81,7 @@ pub fn dispute_manager( #[cfg(test)] mod test { + use reqwest::Url; use serde_json::json; use wiremock::{ matchers::{method, path}, @@ -97,17 +98,14 @@ mod test { async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) { // Set up a mock network subgraph let mock_server = MockServer::start().await; - let network_subgraph_endpoint = SubgraphClient::local_deployment_endpoint( + let network_subgraph_endpoint = Url::parse(&format!( + "{}/subgraphs/id/{}", &mock_server.uri(), - &test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT, - ) - .unwrap(); - let network_subgraph = SubgraphClient::new( - Some(&mock_server.uri()), - Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT), - network_subgraph_endpoint.as_ref(), - ) + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + )) .unwrap(); + let network_subgraph = + SubgraphClient::new("network-subgraph", network_subgraph_endpoint.as_ref()).unwrap(); // Mock result for current epoch requests mock_server diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 1d878847..e5c902ef 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -115,6 +115,7 @@ pub fn escrow_accounts( #[cfg(test)] mod tests { + use reqwest::Url; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -126,18 +127,14 @@ mod tests { async fn test_current_accounts() { // Set up a mock escrow subgraph let mock_server = MockServer::start().await; - let escrow_subgraph_endpoint = SubgraphClient::local_deployment_endpoint( + let escrow_subgraph_endpoint = Url::parse(&format!( + "{}/subgraphs/id/{}", &mock_server.uri(), - &test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT, - ) + *test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT + )) .unwrap(); let escrow_subgraph = Box::leak(Box::new( - SubgraphClient::new( - Some(&mock_server.uri()), - Some(&test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT), - escrow_subgraph_endpoint.as_ref(), - ) - .unwrap(), + SubgraphClient::new("escrow-subgraph", escrow_subgraph_endpoint.as_ref()).unwrap(), )); let mock = Mock::given(method("POST")) diff --git a/common/src/subgraph_client.rs b/common/src/subgraph_client.rs index 23fce374..1b00f22b 100644 --- a/common/src/subgraph_client.rs +++ b/common/src/subgraph_client.rs @@ -8,7 +8,6 @@ use graphql::http::Response; use reqwest::{header, Client, Url}; use serde::de::Deserialize; use serde_json::Value; -use toolshed::thegraph::DeploymentId; /// Network subgraph query wrapper /// @@ -20,48 +19,32 @@ pub struct SubgraphClient { } impl SubgraphClient { - pub fn new( - graph_node_query_endpoint: Option<&str>, - deployment: Option<&DeploymentId>, - subgraph_url: &str, - ) -> Result { - // TODO: Check indexing status of the local subgraph deployment - // if the deployment is healthy and synced, use local_subgraoh_endpoint - let _local_subgraph_endpoint = match (graph_node_query_endpoint, deployment) { - (Some(endpoint), Some(id)) => Some(Self::local_deployment_endpoint(endpoint, id)?), - _ => None, - }; - - let subgraph_url = Url::parse(subgraph_url) - .map_err(|e| anyhow!("Could not parse subgraph url `{}`: {}", subgraph_url, e))?; + pub fn new(name: &str, query_url: &str) -> Result { + let query_url = Url::parse(query_url).map_err(|e| { + anyhow!( + "Could not parse `{}` subgraph query URL `{}`: {}", + name, + query_url, + e + ) + })?; let client = reqwest::Client::builder() .user_agent("indexer-common") .build() - .expect("Could not build a client for the Graph Node query endpoint"); + .map_err(|err| { + anyhow!( + "Could not build a client for `{name}` subgraph query URL `{query_url}`: {err}" + ) + }) + .expect("Building subgraph client"); Ok(Self { client, - subgraph_url: Arc::new(subgraph_url), + subgraph_url: Arc::new(query_url), }) } - pub fn local_deployment_endpoint( - graph_node_query_endpoint: &str, - deployment: &DeploymentId, - ) -> Result { - Url::parse(graph_node_query_endpoint) - .and_then(|u| u.join("/subgraphs/id/")) - .and_then(|u| u.join(&deployment.to_string())) - .map_err(|e| { - anyhow!( - "Could not parse Graph Node query endpoint for subgraph deployment `{}`: {}", - deployment, - e - ) - }) - } - pub async fn query Deserialize<'de>>( &self, body: &Value, @@ -117,12 +100,7 @@ mod test { } fn network_subgraph_client() -> SubgraphClient { - SubgraphClient::new( - Some(GRAPH_NODE_STATUS_ENDPOINT), - Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT), - NETWORK_SUBGRAPH_URL, - ) - .unwrap() + SubgraphClient::new("network-subgraph", NETWORK_SUBGRAPH_URL).unwrap() } #[tokio::test] diff --git a/service/src/main.rs b/service/src/main.rs index e706cbd3..aa105094 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -70,8 +70,7 @@ async fn main() -> Result<(), std::io::Error> { // no problem. let network_subgraph = Box::leak(Box::new( SubgraphClient::new( - Some(&config.indexer_infrastructure.graph_node_query_endpoint), - config.network_subgraph.network_subgraph_deployment.as_ref(), + "network-subgraph", &config.network_subgraph.network_subgraph_endpoint, ) .expect("Failed to set up network subgraph client"), @@ -108,8 +107,7 @@ async fn main() -> Result<(), std::io::Error> { let escrow_subgraph = Box::leak(Box::new( SubgraphClient::new( - Some(&config.indexer_infrastructure.graph_node_query_endpoint), - config.escrow_subgraph.escrow_subgraph_deployment.as_ref(), + "escrow-subgraph", &config.escrow_subgraph.escrow_subgraph_endpoint, ) .expect("Failed to set up escrow subgraph client"), From aad4f52bbf8a1521f9a16c49ed4ba18c13d88524 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Tue, 24 Oct 2023 16:34:57 +0200 Subject: [PATCH 2/6] feat: support prioritizing local subgraph if synced and healthy This is how the existing indexer-common does it - it uses the local network subgraph if it's synced and healthy, otherwise falls back to a remote subgraph URL. --- common/src/allocations/monitor.rs | 19 +- common/src/attestations/dispute_manager.rs | 18 +- common/src/escrow_accounts.rs | 19 +- common/src/lib.rs | 2 +- common/src/subgraph_client.rs | 126 ------------- common/src/subgraph_client/client.rs | 202 +++++++++++++++++++++ common/src/subgraph_client/mod.rs | 4 + common/src/subgraph_client/monitor.rs | 95 ++++++++++ service/src/main.rs | 33 +++- 9 files changed, 362 insertions(+), 156 deletions(-) delete mode 100644 common/src/subgraph_client.rs create mode 100644 common/src/subgraph_client/client.rs create mode 100644 common/src/subgraph_client/mod.rs create mode 100644 common/src/subgraph_client/monitor.rs diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index 514a1104..c26d08a1 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -189,28 +189,29 @@ pub fn indexer_allocations( #[cfg(test)] mod test { - use reqwest::Url; use serde_json::json; use wiremock::{ matchers::{body_string_contains, method, path}, Mock, MockServer, ResponseTemplate, }; - use crate::{prelude::SubgraphClient, test_vectors}; + use crate::{prelude::SubgraphClient, subgraph_client::DeploymentDetails, test_vectors}; use super::*; async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) { // Set up a mock network subgraph let mock_server = MockServer::start().await; - let network_subgraph_endpoint = Url::parse(&format!( - "{}/subgraphs/id/{}", - &mock_server.uri(), - *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT - )) + let network_subgraph = SubgraphClient::new( + None, + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + &mock_server.uri(), + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + )) + .unwrap(), + ) .unwrap(); - let network_subgraph = - SubgraphClient::new("network-subgraph", network_subgraph_endpoint.as_ref()).unwrap(); // Mock result for current epoch requests mock_server diff --git a/common/src/attestations/dispute_manager.rs b/common/src/attestations/dispute_manager.rs index 2a03f367..5a071b6d 100644 --- a/common/src/attestations/dispute_manager.rs +++ b/common/src/attestations/dispute_manager.rs @@ -81,7 +81,6 @@ pub fn dispute_manager( #[cfg(test)] mod test { - use reqwest::Url; use serde_json::json; use wiremock::{ matchers::{method, path}, @@ -90,6 +89,7 @@ mod test { use crate::{ prelude::SubgraphClient, + subgraph_client::DeploymentDetails, test_vectors::{self, DISPUTE_MANAGER_ADDRESS}, }; @@ -98,14 +98,16 @@ mod test { async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) { // Set up a mock network subgraph let mock_server = MockServer::start().await; - let network_subgraph_endpoint = Url::parse(&format!( - "{}/subgraphs/id/{}", - &mock_server.uri(), - *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT - )) + let network_subgraph = SubgraphClient::new( + None, + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + &mock_server.uri(), + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + )) + .unwrap(), + ) .unwrap(); - let network_subgraph = - SubgraphClient::new("network-subgraph", network_subgraph_endpoint.as_ref()).unwrap(); // Mock result for current epoch requests mock_server diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index e5c902ef..3d91cd4d 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -115,10 +115,10 @@ pub fn escrow_accounts( #[cfg(test)] mod tests { - use reqwest::Url; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use crate::prelude::DeploymentDetails; use crate::test_vectors; use super::*; @@ -127,14 +127,17 @@ mod tests { async fn test_current_accounts() { // Set up a mock escrow subgraph let mock_server = MockServer::start().await; - let escrow_subgraph_endpoint = Url::parse(&format!( - "{}/subgraphs/id/{}", - &mock_server.uri(), - *test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT - )) - .unwrap(); let escrow_subgraph = Box::leak(Box::new( - SubgraphClient::new("escrow-subgraph", escrow_subgraph_endpoint.as_ref()).unwrap(), + SubgraphClient::new( + None, + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + &mock_server.uri(), + *test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT + )) + .unwrap(), + ) + .unwrap(), )); let mock = Mock::given(method("POST")) diff --git a/common/src/lib.rs b/common/src/lib.rs index 7bbc5e08..67c4aabe 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -20,6 +20,6 @@ pub mod prelude { dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers, }; pub use super::escrow_accounts::escrow_accounts; - pub use super::subgraph_client::SubgraphClient; + pub use super::subgraph_client::{DeploymentDetails, SubgraphClient}; pub use super::tap_manager::TapManager; } diff --git a/common/src/subgraph_client.rs b/common/src/subgraph_client.rs deleted file mode 100644 index 1b00f22b..00000000 --- a/common/src/subgraph_client.rs +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::sync::Arc; - -use anyhow::anyhow; -use graphql::http::Response; -use reqwest::{header, Client, Url}; -use serde::de::Deserialize; -use serde_json::Value; - -/// Network subgraph query wrapper -/// -/// This is Arc internally, so it can be cloned and shared between threads. -#[derive(Debug, Clone)] -pub struct SubgraphClient { - client: Client, // it is Arc - subgraph_url: Arc, -} - -impl SubgraphClient { - pub fn new(name: &str, query_url: &str) -> Result { - let query_url = Url::parse(query_url).map_err(|e| { - anyhow!( - "Could not parse `{}` subgraph query URL `{}`: {}", - name, - query_url, - e - ) - })?; - - let client = reqwest::Client::builder() - .user_agent("indexer-common") - .build() - .map_err(|err| { - anyhow!( - "Could not build a client for `{name}` subgraph query URL `{query_url}`: {err}" - ) - }) - .expect("Building subgraph client"); - - Ok(Self { - client, - subgraph_url: Arc::new(query_url), - }) - } - - pub async fn query Deserialize<'de>>( - &self, - body: &Value, - ) -> Result, reqwest::Error> { - self.client - .post(Url::clone(&self.subgraph_url)) - .json(body) - .header(header::CONTENT_TYPE, "application/json") - .send() - .await - .and_then(|response| response.error_for_status())? - .json::>() - .await - } -} - -#[cfg(test)] -mod test { - use serde_json::json; - use wiremock::matchers::{method, path}; - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::test_vectors; - - use super::*; - - const GRAPH_NODE_STATUS_ENDPOINT: &str = "http://localhost:8000/"; - const NETWORK_SUBGRAPH_URL: &str = - "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"; - - async fn mock_graph_node_server() -> MockServer { - let mock_server = MockServer::start().await; - let mock = Mock::given(method("POST")) - .and(path(format!( - "/subgraphs/id/{}", - *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT - ))) - .respond_with(ResponseTemplate::new(200).set_body_raw( - r#" - { - "data": { - "graphNetwork": { - "currentEpoch": 960 - } - } - } - "#, - "application/json", - )); - mock_server.register(mock).await; - - mock_server - } - - fn network_subgraph_client() -> SubgraphClient { - SubgraphClient::new("network-subgraph", NETWORK_SUBGRAPH_URL).unwrap() - } - - #[tokio::test] - async fn test_network_query() { - let _mock_server = mock_graph_node_server().await; - - // Check that the response is valid JSON - let result = network_subgraph_client() - .query::(&json!({ - "query": r#" - query { - graphNetwork(id: 1) { - currentEpoch - } - } - "#, - })) - .await - .unwrap(); - - assert!(result.data.is_some()); - } -} diff --git a/common/src/subgraph_client/client.rs b/common/src/subgraph_client/client.rs new file mode 100644 index 00000000..f61179de --- /dev/null +++ b/common/src/subgraph_client/client.rs @@ -0,0 +1,202 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use eventuals::Eventual; +use graphql::http::Response; +use log::warn; +use reqwest::{header, Url}; +use serde::de::Deserialize; +use serde_json::Value; +use toolshed::thegraph::DeploymentId; + +use super::monitor::{monitor_deployment_status, DeploymentStatus}; + +#[derive(Debug, Clone)] +pub struct DeploymentDetails { + pub deployment: Option, + pub status_url: Option, + pub query_url: Url, +} + +impl DeploymentDetails { + pub fn for_graph_node( + graph_node_base_url: &str, + deployment: DeploymentId, + ) -> Result { + Ok(Self { + deployment: Some(deployment), + status_url: Some(Url::parse(&format!("{graph_node_base_url}/status"))?), + query_url: Url::parse(&format!("{graph_node_base_url}/subgraphs/id/{deployment}"))?, + }) + } + + pub fn for_query_url(query_url: &str) -> Result { + Ok(Self { + deployment: None, + status_url: None, + query_url: Url::parse(query_url)?, + }) + } +} + +struct DeploymentClient { + pub status: Option>, + pub query_url: Url, +} + +impl DeploymentClient { + pub fn new(details: DeploymentDetails) -> Self { + Self { + status: details + .deployment + .zip(details.status_url) + .map(|(deployment, url)| monitor_deployment_status(deployment, url)), + query_url: details.query_url, + } + } + + pub async fn query Deserialize<'de>>( + &self, + body: &Value, + ) -> Result, anyhow::Error> { + if let Some(ref status) = self.status { + let deployment_status = status.value().await.expect("reading deployment status"); + + if !deployment_status.synced || &deployment_status.health != "healthy" { + return Err(anyhow!( + "Deployment `{}` is not ready or healthy to be queried", + self.query_url + )); + } + } + + Ok(reqwest::Client::new() + .post(self.query_url.as_ref()) + .json(body) + .header(header::USER_AGENT, "indexer-common") + .header(header::CONTENT_TYPE, "application/json") + .send() + .await + .and_then(|response| response.error_for_status())? + .json::>() + .await?) + } +} + +/// Network subgraph query wrapper +/// +/// This is Arc internally, so it can be cloned and shared between threads. +pub struct SubgraphClient { + local_client: Option, + remote_client: DeploymentClient, +} + +impl SubgraphClient { + pub fn new( + local_deployment: Option, + remote_deployment: DeploymentDetails, + ) -> Result { + let local_client = local_deployment.map(DeploymentClient::new); + let remote_client = DeploymentClient::new(remote_deployment); + + Ok(Self { + local_client, + remote_client, + }) + } + + pub async fn query Deserialize<'de>>( + &self, + body: &Value, + ) -> Result, anyhow::Error> { + // Try the local client first; if that fails, log the error and move on + // to the remote client + if let Some(ref local_client) = self.local_client { + match local_client.query(body).await { + Ok(response) => return Ok(response), + Err(err) => warn!( + "Failed to query local subgraph deployment `{}`, trying remote deployment next: {}", + local_client.query_url, err + ), + } + } + + // Try the remote client + self.remote_client.query(body).await.map_err(|err| { + warn!( + "Failed to query remote subgraph deployment `{}`: {}", + self.remote_client.query_url, err + ); + + err + }) + } +} + +#[cfg(test)] +mod test { + use serde_json::json; + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + use crate::test_vectors::{self}; + + use super::*; + + const NETWORK_SUBGRAPH_URL: &str = + "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"; + + async fn mock_graph_node_server() -> MockServer { + let mock_server = MockServer::start().await; + let mock = Mock::given(method("POST")) + .and(path(format!( + "/subgraphs/id/{}", + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + ))) + .respond_with(ResponseTemplate::new(200).set_body_raw( + r#" + { + "data": { + "graphNetwork": { + "currentEpoch": 960 + } + } + } + "#, + "application/json", + )); + mock_server.register(mock).await; + + mock_server + } + + fn network_subgraph_client() -> SubgraphClient { + SubgraphClient::new( + None, + DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(), + ) + .unwrap() + } + + #[tokio::test] + async fn test_network_query() { + let _mock_server = mock_graph_node_server().await; + + // Check that the response is valid JSON + let result = network_subgraph_client() + .query::(&json!({ + "query": r#" + query { + graphNetwork(id: 1) { + currentEpoch + } + } + "#, + })) + .await + .unwrap(); + + assert!(result.data.is_some()); + } +} diff --git a/common/src/subgraph_client/mod.rs b/common/src/subgraph_client/mod.rs new file mode 100644 index 00000000..b3f0b3f7 --- /dev/null +++ b/common/src/subgraph_client/mod.rs @@ -0,0 +1,4 @@ +mod client; +mod monitor; + +pub use client::{DeploymentDetails, SubgraphClient}; diff --git a/common/src/subgraph_client/monitor.rs b/common/src/subgraph_client/monitor.rs new file mode 100644 index 00000000..e4780d10 --- /dev/null +++ b/common/src/subgraph_client/monitor.rs @@ -0,0 +1,95 @@ +use std::time::Duration; + +use eventuals::{timer, Eventual, EventualExt}; +use graphql::http::Response; +use log::warn; +use reqwest::{header, Url}; +use serde::Deserialize; +use serde_json::{json, Value}; +use tokio::time::sleep; +use toolshed::thegraph::DeploymentId; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct DeploymentStatusResponse { + indexing_statuses: Option>, +} + +#[derive(Clone, Deserialize, Eq, PartialEq)] +pub struct DeploymentStatus { + pub synced: bool, + pub health: String, +} + +async fn query Deserialize<'de>>( + url: Url, + body: &Value, +) -> Result, reqwest::Error> { + reqwest::Client::new() + .post(url) + .json(body) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await + .and_then(|response| response.error_for_status())? + .json::>() + .await +} + +pub fn monitor_deployment_status( + deployment: DeploymentId, + status_url: Url, +) -> Eventual { + timer(Duration::from_secs(30)).map_with_retry( + move |_| { + let status_url = status_url.clone(); + + async move { + let body = json!({ + "query": r#" + query indexingStatuses($ids: [ID!]!) { + indexingStatuses(deployments: $ids) { + synced + health + } + } + "#, + "variables": { + "ids": [deployment.to_string()] + } + }); + + let response = query::(status_url, &body) + .await + .map_err(|e| { + format!("Failed to query status of deployment `{deployment}`: {e}") + })?; + + if let Some(errors) = response.errors { + warn!( + "Errors encountered querying the deployment status for `{}`: {}", + deployment, + errors + .into_iter() + .map(|e| e.message) + .collect::>() + .join(", ") + ); + } + + response + .data + .and_then(|data| data.indexing_statuses) + .and_then(|data| data.get(0).map(Clone::clone)) + .ok_or_else(|| format!("Deployment `{}` not found", deployment)) + } + }, + move |err: String| async move { + warn!( + "Error querying deployment status for `{}`: {}", + deployment, err + ); + sleep(Duration::from_secs(15)).await + }, + ) +} diff --git a/service/src/main.rs b/service/src/main.rs index aa105094..fea30cfc 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -12,6 +12,7 @@ use indexer_common::{ prelude::{ attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, SubgraphClient, }, + subgraph_client::DeploymentDetails, tap_manager::TapManager, }; @@ -70,8 +71,21 @@ async fn main() -> Result<(), std::io::Error> { // no problem. let network_subgraph = Box::leak(Box::new( SubgraphClient::new( - "network-subgraph", - &config.network_subgraph.network_subgraph_endpoint, + config + .network_subgraph + .network_subgraph_deployment + .map(|deployment| { + DeploymentDetails::for_graph_node( + &config.indexer_infrastructure.graph_node_query_endpoint, + deployment, + ) + }) + .transpose() + .expect( + "Failed to parse graph node query endpoint and network subgraph deployment", + ), + DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint) + .expect("Failed to parse network subgraph endpoint"), ) .expect("Failed to set up network subgraph client"), )); @@ -107,8 +121,19 @@ async fn main() -> Result<(), std::io::Error> { let escrow_subgraph = Box::leak(Box::new( SubgraphClient::new( - "escrow-subgraph", - &config.escrow_subgraph.escrow_subgraph_endpoint, + config + .escrow_subgraph + .escrow_subgraph_deployment + .map(|deployment| { + DeploymentDetails::for_graph_node( + &config.indexer_infrastructure.graph_node_query_endpoint, + deployment, + ) + }) + .transpose() + .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), + DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint) + .expect("Failed to parse escrow subgraph endpoint"), ) .expect("Failed to set up escrow subgraph client"), )); From 460c6bdd462632b2877a955982589258698d9953 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Tue, 24 Oct 2023 16:40:14 +0200 Subject: [PATCH 3/6] chore: add missing license headers --- common/src/subgraph_client/mod.rs | 3 +++ common/src/subgraph_client/monitor.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/common/src/subgraph_client/mod.rs b/common/src/subgraph_client/mod.rs index b3f0b3f7..36dca750 100644 --- a/common/src/subgraph_client/mod.rs +++ b/common/src/subgraph_client/mod.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + mod client; mod monitor; diff --git a/common/src/subgraph_client/monitor.rs b/common/src/subgraph_client/monitor.rs index e4780d10..6a2dd8bd 100644 --- a/common/src/subgraph_client/monitor.rs +++ b/common/src/subgraph_client/monitor.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + use std::time::Duration; use eventuals::{timer, Eventual, EventualExt}; From 9b6ff55b2d8f2cd1a38feb30c55bc939a7f8fd0a Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Wed, 25 Oct 2023 10:09:40 +0200 Subject: [PATCH 4/6] chore: fix outdated comment --- common/src/subgraph_client/client.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/common/src/subgraph_client/client.rs b/common/src/subgraph_client/client.rs index f61179de..5d820d91 100644 --- a/common/src/subgraph_client/client.rs +++ b/common/src/subgraph_client/client.rs @@ -84,9 +84,7 @@ impl DeploymentClient { } } -/// Network subgraph query wrapper -/// -/// This is Arc internally, so it can be cloned and shared between threads. +/// Client for a subgraph that can fall back from a local deployment to a remote query URL pub struct SubgraphClient { local_client: Option, remote_client: DeploymentClient, From 3e86e97ff66d65e5f4a1f329d5f0801ad934c633 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Wed, 25 Oct 2023 11:02:30 +0200 Subject: [PATCH 5/6] test: add tests for monitoring the status of deployments --- common/src/subgraph_client/monitor.rs | 164 +++++++++++++++++++++++++- 1 file changed, 163 insertions(+), 1 deletion(-) diff --git a/common/src/subgraph_client/monitor.rs b/common/src/subgraph_client/monitor.rs index 6a2dd8bd..177d96da 100644 --- a/common/src/subgraph_client/monitor.rs +++ b/common/src/subgraph_client/monitor.rs @@ -18,7 +18,7 @@ struct DeploymentStatusResponse { indexing_statuses: Option>, } -#[derive(Clone, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] pub struct DeploymentStatus { pub synced: bool, pub health: String, @@ -96,3 +96,165 @@ pub fn monitor_deployment_status( }, ) } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + use super::*; + + #[tokio::test] + async fn test_parses_synced_and_healthy_response() { + let mock_server = MockServer::start().await; + let status_url: Url = mock_server + .uri() + .parse::() + .unwrap() + .join("/status") + .unwrap(); + let deployment = + DeploymentId::from_str("QmAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); + + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [ + { + "synced": true, + "health": "healthy" + } + ] + } + }))) + .mount(&mock_server) + .await; + + let status = monitor_deployment_status(deployment, status_url); + + assert_eq!( + status.value().await.unwrap(), + DeploymentStatus { + synced: true, + health: "healthy".to_string() + } + ); + } + + #[tokio::test] + async fn test_parses_not_synced_and_healthy_response() { + let mock_server = MockServer::start().await; + let status_url: Url = mock_server + .uri() + .parse::() + .unwrap() + .join("/status") + .unwrap(); + let deployment = + DeploymentId::from_str("QmAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); + + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [ + { + "synced": false, + "health": "healthy" + } + ] + } + }))) + .mount(&mock_server) + .await; + + let status = monitor_deployment_status(deployment, status_url); + + assert_eq!( + status.value().await.unwrap(), + DeploymentStatus { + synced: false, + health: "healthy".to_string() + } + ); + } + + #[tokio::test] + async fn test_parses_synced_and_unhealthy_response() { + let mock_server = MockServer::start().await; + let status_url: Url = mock_server + .uri() + .parse::() + .unwrap() + .join("/status") + .unwrap(); + let deployment = + DeploymentId::from_str("QmAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); + + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [ + { + "synced": true, + "health": "unhealthy" + } + ] + } + }))) + .mount(&mock_server) + .await; + + let status = monitor_deployment_status(deployment, status_url); + + assert_eq!( + status.value().await.unwrap(), + DeploymentStatus { + synced: true, + health: "unhealthy".to_string() + } + ); + } + + #[tokio::test] + async fn test_parses_synced_and_failed_response() { + let mock_server = MockServer::start().await; + let status_url: Url = mock_server + .uri() + .parse::() + .unwrap() + .join("/status") + .unwrap(); + let deployment = + DeploymentId::from_str("QmAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); + + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [ + { + "synced": true, + "health": "failed" + } + ] + } + }))) + .mount(&mock_server) + .await; + + let status = monitor_deployment_status(deployment, status_url); + + assert_eq!( + status.value().await.unwrap(), + DeploymentStatus { + synced: true, + health: "failed".to_string() + } + ); + } +} From 0b551850c70e7c1ba3ae34dc4d5aaf9bdda65863 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Wed, 25 Oct 2023 14:37:06 +0200 Subject: [PATCH 6/6] test: add tests for subgraph client fallback logic --- common/src/subgraph_client/client.rs | 218 +++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) diff --git a/common/src/subgraph_client/client.rs b/common/src/subgraph_client/client.rs index 5d820d91..d893a6c2 100644 --- a/common/src/subgraph_client/client.rs +++ b/common/src/subgraph_client/client.rs @@ -134,6 +134,8 @@ impl SubgraphClient { #[cfg(test)] mod test { + use std::str::FromStr; + use serde_json::json; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -197,4 +199,220 @@ mod test { assert!(result.data.is_some()); } + + #[tokio::test] + async fn test_uses_local_deployment_if_healthy_and_synced() { + let deployment = + DeploymentId::from_str("QmAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); + + let mock_server_local = MockServer::start().await; + mock_server_local + .register( + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [ + { + "synced": true, + "health": "healthy" + } + ] + } + }))), + ) + .await; + mock_server_local + .register( + Mock::given(method("POST")) + .and(path(&format!("/subgraphs/id/{}", deployment))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "user": { + "name": "local" + } + } + }))), + ) + .await; + + let mock_server_remote = MockServer::start().await; + mock_server_remote + .register( + Mock::given(method("POST")) + .and(path(&format!("/subgraphs/id/{}", deployment))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "user": { + "name": "remote" + } + } + }))), + ) + .await; + + // Create the subgraph client + let client = SubgraphClient::new( + Some(DeploymentDetails::for_graph_node(&mock_server_local.uri(), deployment).unwrap()), + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + mock_server_remote.uri(), + deployment + )) + .unwrap(), + ) + .unwrap(); + + // Query the subgraph + let response: Response = client + .query(&json!({ "query": "{ user(id: 1} { name } }"})) + .await + .unwrap(); + + assert_eq!(response.data, Some(json!({ "user": { "name": "local" } }))); + } + + #[tokio::test] + async fn test_uses_query_url_if_local_deployment_is_unhealthy() { + let deployment = + DeploymentId::from_str("QmAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); + + let mock_server_local = MockServer::start().await; + mock_server_local + .register( + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [ + { + "synced": true, + "health": "unhealthy" + } + ] + } + }))), + ) + .await; + mock_server_local + .register( + Mock::given(method("POST")) + .and(path(&format!("/subgraphs/id/{}", deployment))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "user": { + "name": "local" + } + } + }))), + ) + .await; + + let mock_server_remote = MockServer::start().await; + mock_server_remote + .register( + Mock::given(method("POST")) + .and(path(&format!("/subgraphs/id/{}", deployment))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "user": { + "name": "remote" + } + } + }))), + ) + .await; + + // Create the subgraph client + let client = SubgraphClient::new( + Some(DeploymentDetails::for_graph_node(&mock_server_local.uri(), deployment).unwrap()), + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + mock_server_remote.uri(), + deployment + )) + .unwrap(), + ) + .unwrap(); + + // Query the subgraph + let response: Response = client + .query(&json!({ "query": "{ user(id: 1} { name } }"})) + .await + .unwrap(); + + assert_eq!(response.data, Some(json!({ "user": { "name": "remote" } }))); + } + + #[tokio::test] + async fn test_uses_query_url_if_local_deployment_is_not_synced() { + let deployment = + DeploymentId::from_str("QmAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); + + let mock_server_local = MockServer::start().await; + mock_server_local + .register( + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [ + { + "synced": false, + "health": "healthy" + } + ] + } + }))), + ) + .await; + mock_server_local + .register( + Mock::given(method("POST")) + .and(path(&format!("/subgraphs/id/{}", deployment))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "user": { + "name": "local" + } + } + }))), + ) + .await; + + let mock_server_remote = MockServer::start().await; + mock_server_remote + .register( + Mock::given(method("POST")) + .and(path(&format!("/subgraphs/id/{}", deployment))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "user": { + "name": "remote" + } + } + }))), + ) + .await; + + // Create the subgraph client + let client = SubgraphClient::new( + Some(DeploymentDetails::for_graph_node(&mock_server_local.uri(), deployment).unwrap()), + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + mock_server_remote.uri(), + deployment + )) + .unwrap(), + ) + .unwrap(); + + // Query the subgraph + let response: Response = client + .query(&json!({ "query": "{ user(id: 1} { name } }"})) + .await + .unwrap(); + + assert_eq!(response.data, Some(json!({ "user": { "name": "remote" } }))); + } }