Skip to content

Commit

Permalink
refactor: generalize network subgraph into a subgraph client
Browse files Browse the repository at this point in the history
This way we can also use it for the escrow subgraph and others that we may need
in the future.
  • Loading branch information
Jannis committed Oct 17, 2023
1 parent b10a09d commit 9321ef4
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 58 deletions.
20 changes: 11 additions & 9 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;

use crate::prelude::NetworkSubgraph;
use crate::prelude::SubgraphClient;

use super::Allocation;

async fn current_epoch(
network_subgraph: &'static NetworkSubgraph,
network_subgraph: &'static SubgraphClient,
graph_network_id: u64,
) -> Result<u64, anyhow::Error> {
// Types for deserializing the network subgraph response
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn current_epoch(

/// An always up-to-date list of an indexer's active and recently closed allocations.
pub fn indexer_allocations(
network_subgraph: &'static NetworkSubgraph,
network_subgraph: &'static SubgraphClient,
indexer_address: Address,
graph_network_id: u64,
interval: Duration,
Expand Down Expand Up @@ -195,22 +195,24 @@ mod test {
Mock, MockServer, ResponseTemplate,
};

use crate::{prelude::NetworkSubgraph, test_vectors};
use crate::{prelude::SubgraphClient, test_vectors};

use super::*;

async fn setup_mock_network_subgraph() -> (&'static NetworkSubgraph, MockServer) {
async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) {
// Set up a mock network subgraph
let mock_server = MockServer::start().await;
let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint(
let network_subgraph_endpoint = SubgraphClient::local_deployment_endpoint(
&mock_server.uri(),
&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT,
);
let network_subgraph = NetworkSubgraph::new(
)
.unwrap();
let network_subgraph = SubgraphClient::new(
Some(&mock_server.uri()),
Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT),
network_subgraph_endpoint.as_ref(),
);
)
.unwrap();

// Mock result for current epoch requests
mock_server
Expand Down
18 changes: 10 additions & 8 deletions common/src/attestations/dispute_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;

use crate::network_subgraph::NetworkSubgraph;
use crate::subgraph_client::SubgraphClient;

pub fn dispute_manager(
network_subgraph: &'static NetworkSubgraph,
network_subgraph: &'static SubgraphClient,
graph_network_id: u64,
interval: Duration,
) -> Eventual<Address> {
Expand Down Expand Up @@ -88,24 +88,26 @@ mod test {
};

use crate::{
prelude::NetworkSubgraph,
prelude::SubgraphClient,
test_vectors::{self, DISPUTE_MANAGER_ADDRESS},
};

use super::*;

async fn setup_mock_network_subgraph() -> (&'static NetworkSubgraph, MockServer) {
async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) {
// Set up a mock network subgraph
let mock_server = MockServer::start().await;
let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint(
let network_subgraph_endpoint = SubgraphClient::local_deployment_endpoint(
&mock_server.uri(),
&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT,
);
let network_subgraph = NetworkSubgraph::new(
)
.unwrap();
let network_subgraph = SubgraphClient::new(
Some(&mock_server.uri()),
Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT),
network_subgraph_endpoint.as_ref(),
);
)
.unwrap();

// Mock result for current epoch requests
mock_server
Expand Down
4 changes: 2 additions & 2 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
pub mod allocations;
pub mod attestations;
pub mod graphql;
pub mod network_subgraph;
pub mod signature_verification;
pub mod subgraph_client;

#[cfg(test)]
mod test_vectors;
Expand All @@ -17,5 +17,5 @@ pub mod prelude {
pub use super::attestations::{
dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers,
};
pub use super::network_subgraph::NetworkSubgraph;
pub use super::subgraph_client::SubgraphClient;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::sync::Arc;

use anyhow::anyhow;
use graphql::http::Response;
use reqwest::{header, Client, Url};
use serde::de::Deserialize;
Expand All @@ -13,56 +14,60 @@ use toolshed::thegraph::DeploymentId;
///
/// This is Arc internally, so it can be cloned and shared between threads.
#[derive(Debug, Clone)]
pub struct NetworkSubgraph {
pub struct SubgraphClient {
client: Client, // it is Arc
network_subgraph_url: Arc<Url>,
subgraph_url: Arc<Url>,
}

impl NetworkSubgraph {
impl SubgraphClient {
pub fn new(
graph_node_query_endpoint: Option<&str>,
deployment: Option<&DeploymentId>,
network_subgraph_url: &str,
) -> NetworkSubgraph {
//TODO: Check indexing status of the local network subgraph deployment
//if the deployment is healthy and synced, use local_network_subgraph_endpoint
let _local_network_subgraph_endpoint = match (graph_node_query_endpoint, deployment) {
(Some(endpoint), Some(id)) => {
Some(NetworkSubgraph::local_deployment_endpoint(endpoint, id))
}
subgraph_url: &str,
) -> Result<Self, anyhow::Error> {
// TODO: Check indexing status of the local subgraph deployment
// if the deployment is healthy and synced, use local_subgraoh_endpoint
let _local_subgraph_endpoint = match (graph_node_query_endpoint, deployment) {
(Some(endpoint), Some(id)) => Some(Self::local_deployment_endpoint(endpoint, id)?),
_ => None,
};

let network_subgraph_url =
Url::parse(network_subgraph_url).expect("Could not parse network subgraph url");
let subgraph_url = Url::parse(subgraph_url)
.map_err(|e| anyhow!("Could not parse subgraph url `{}`: {}", subgraph_url, e))?;

let client = reqwest::Client::builder()
.user_agent("indexer-service")
.user_agent("indexer-common")
.build()
.expect("Could not build a client to graph node query endpoint");
.expect("Could not build a client for the Graph Node query endpoint");

NetworkSubgraph {
Ok(Self {
client,
network_subgraph_url: Arc::new(network_subgraph_url),
}
subgraph_url: Arc::new(subgraph_url),
})
}

pub fn local_deployment_endpoint(
graph_node_query_endpoint: &str,
deployment: &DeploymentId,
) -> Url {
) -> Result<Url, anyhow::Error> {
Url::parse(graph_node_query_endpoint)
.and_then(|u| u.join("/subgraphs/id/"))
.and_then(|u| u.join(&deployment.to_string()))
.expect("Could not parse graph node query endpoint for the network subgraph deployment")
.map_err(|e| {
anyhow!(
"Could not parse Graph Node query endpoint for subgraph deployment `{}`: {}",
deployment,
e
)
})
}

pub async fn query<T: for<'de> Deserialize<'de>>(
&self,
body: &Value,
) -> Result<Response<T>, reqwest::Error> {
self.client
.post(Url::clone(&self.network_subgraph_url))
.post(Url::clone(&self.subgraph_url))
.json(body)
.header(header::CONTENT_TYPE, "application/json")
.send()
Expand Down Expand Up @@ -111,20 +116,21 @@ mod test {
mock_server
}

fn network_subgraph() -> NetworkSubgraph {
NetworkSubgraph::new(
fn network_subgraph_client() -> SubgraphClient {
SubgraphClient::new(
Some(GRAPH_NODE_STATUS_ENDPOINT),
Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT),
NETWORK_SUBGRAPH_URL,
)
.unwrap()
}

#[tokio::test]
async fn test_network_query() {
let _mock_server = mock_graph_node_server().await;

// Check that the response is valid JSON
let result = network_subgraph()
let result = network_subgraph_client()
.query::<Value>(&json!({
"query": r#"
query {
Expand Down
27 changes: 15 additions & 12 deletions service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use toolshed::thegraph::DeploymentId;
use tracing::info;

use indexer_common::prelude::{
attestation_signers, dispute_manager, indexer_allocations, NetworkSubgraph,
attestation_signers, dispute_manager, indexer_allocations, SubgraphClient,
};

use util::{package_version, shutdown_signal};
Expand Down Expand Up @@ -68,17 +68,20 @@ async fn main() -> Result<(), std::io::Error> {
// a static lifetime, which avoids having to pass around and clone `Arc`
// objects everywhere. Since the network subgraph is read-only, this is
// no problem.
let network_subgraph = Box::leak(Box::new(NetworkSubgraph::new(
Some(&config.indexer_infrastructure.graph_node_query_endpoint),
config
.network_subgraph
.network_subgraph_deployment
.map(|s| DeploymentId::from_str(&s))
.transpose()
.expect("Failed to parse invalid network subgraph deployment")
.as_ref(),
&config.network_subgraph.network_subgraph_endpoint,
)));
let network_subgraph = Box::leak(Box::new(
SubgraphClient::new(
Some(&config.indexer_infrastructure.graph_node_query_endpoint),
config
.network_subgraph
.network_subgraph_deployment
.map(|s| DeploymentId::from_str(&s))
.transpose()
.expect("Failed to parse invalid network subgraph deployment")
.as_ref(),
&config.network_subgraph.network_subgraph_endpoint,
)
.expect("Failed to set up network subgraph client"),
));

let indexer_allocations = indexer_allocations(
network_subgraph,
Expand Down
6 changes: 3 additions & 3 deletions service/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tower_http::{
};
use tracing::Level;

use indexer_common::prelude::NetworkSubgraph;
use indexer_common::prelude::SubgraphClient;

use crate::{
query_processor::QueryProcessor,
Expand All @@ -38,7 +38,7 @@ pub struct ServerOptions {
pub graph_node_status_endpoint: String,
pub indexer_management_db: PgPool,
pub operator_public_key: String,
pub network_subgraph: &'static NetworkSubgraph,
pub network_subgraph: &'static SubgraphClient,
pub network_subgraph_auth_token: Option<String>,
pub serve_network_subgraph: bool,
}
Expand All @@ -53,7 +53,7 @@ impl ServerOptions {
graph_node_status_endpoint: String,
indexer_management_db: PgPool,
operator_public_key: String,
network_subgraph: &'static NetworkSubgraph,
network_subgraph: &'static SubgraphClient,
network_subgraph_auth_token: Option<String>,
serve_network_subgraph: bool,
) -> Self {
Expand Down

0 comments on commit 9321ef4

Please sign in to comment.