diff --git a/common/src/attestations/dispute_manager.rs b/common/src/attestations/dispute_manager.rs new file mode 100644 index 000000000..796e76fc4 --- /dev/null +++ b/common/src/attestations/dispute_manager.rs @@ -0,0 +1,138 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use alloy_primitives::Address; +use eventuals::{timer, Eventual, EventualExt}; +use log::warn; +use serde::Deserialize; +use serde_json::json; +use tokio::time::sleep; + +use crate::network_subgraph::NetworkSubgraph; + +pub fn dispute_manager( + network_subgraph: &'static NetworkSubgraph, + graph_network_id: u64, + interval: Duration, +) -> Eventual
{ + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct DisputeManagerResponse { + graph_network: Option, + } + + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct GraphNetwork { + dispute_manager: Address, + } + + timer(interval).map_with_retry( + move |_| async move { + let response = network_subgraph + .query::(&json!({ + "query": r#" + query network($id: ID!) { + graphNetwork(id: $id) { + disputeManager + } + } + "#, + "variables": { + "id": graph_network_id + } + })) + .await + .map_err(|e| e.to_string())?; + + if let Some(errors) = response.errors { + warn!( + "Errors encountered querying the dispute manager for network {}: {}", + graph_network_id, + errors + .into_iter() + .map(|e| e.message) + .collect::>() + .join(", ") + ); + } + + response + .data + .and_then(|data| data.graph_network) + .map(|network| network.dispute_manager) + .ok_or_else(|| { + format!("Network {} not found in network subgraph", graph_network_id) + }) + }, + move |err: String| { + warn!( + "Failed to query dispute manager for network {}: {}", + graph_network_id, err, + ); + + // Sleep for a bit before we retry + sleep(interval.div_f32(2.0)) + }, + ) +} + +#[cfg(test)] +mod test { + use serde_json::json; + use wiremock::{ + matchers::{method, path}, + Mock, MockServer, ResponseTemplate, + }; + + use crate::{ + prelude::NetworkSubgraph, + test_vectors::{self, DISPUTE_MANAGER_ADDRESS}, + }; + + use super::*; + + async fn setup_mock_network_subgraph() -> (&'static NetworkSubgraph, MockServer) { + // Set up a mock network subgraph + let mock_server = MockServer::start().await; + let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint( + &mock_server.uri(), + &test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT, + ); + let network_subgraph = NetworkSubgraph::new( + Some(&mock_server.uri()), + Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT), + network_subgraph_endpoint.as_ref(), + ); + + // Mock result for current epoch requests + mock_server + .register( + Mock::given(method("POST")) + .and(path(format!( + "/subgraphs/id/{}", + *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT + ))) + .respond_with(ResponseTemplate::new(200).set_body_json( + json!({ "data": { "graphNetwork": { "disputeManager": *DISPUTE_MANAGER_ADDRESS }}}), + )), + ) + .await; + + (Box::leak(Box::new(network_subgraph)), mock_server) + } + + #[test_log::test(tokio::test)] + async fn test_parses_dispute_manager_from_network_subgraph_correctly() { + let (network_subgraph, _mock_server) = setup_mock_network_subgraph().await; + + let dispute_manager = dispute_manager(network_subgraph, 1, Duration::from_secs(60)); + + assert_eq!( + dispute_manager.value().await.unwrap(), + *DISPUTE_MANAGER_ADDRESS + ); + } +} diff --git a/common/src/attestations/mod.rs b/common/src/attestations/mod.rs index 3ab13f4dd..62c50a6c0 100644 --- a/common/src/attestations/mod.rs +++ b/common/src/attestations/mod.rs @@ -3,3 +3,4 @@ pub mod signer; pub mod signers; +pub mod dispute_manager; \ No newline at end of file diff --git a/common/src/attestations/signers.rs b/common/src/attestations/signers.rs index 28609d91f..ca230e955 100644 --- a/common/src/attestations/signers.rs +++ b/common/src/attestations/signers.rs @@ -3,7 +3,7 @@ use alloy_primitives::Address; use ethers_core::types::U256; -use eventuals::{Eventual, EventualExt}; +use eventuals::{join, Eventual, EventualExt}; use log::warn; use std::collections::HashMap; use std::sync::Arc; @@ -16,7 +16,7 @@ pub fn attestation_signers( indexer_allocations: Eventual>, indexer_mnemonic: String, chain_id: U256, - dispute_manager: Address, + dispute_manager: Eventual
, ) -> Eventual> { let attestation_signers_map: &'static Mutex> = Box::leak(Box::new(Mutex::new(HashMap::new()))); @@ -25,7 +25,7 @@ pub fn attestation_signers( // Whenever the indexer's active or recently closed allocations change, make sure // we have attestation signers for all of them - indexer_allocations.map(move |allocations| { + join((indexer_allocations, dispute_manager)).map(move |(allocations, dispute_manager)| { let indexer_mnemonic = indexer_mnemonic.clone(); async move { let mut signers = attestation_signers_map.lock().await; @@ -72,12 +72,15 @@ mod tests { #[tokio::test] async fn test_attestation_signers_update_with_allocations() { let (mut allocations_writer, allocations) = Eventual::>::new(); + let (mut dispute_manager_writer, dispute_manager) = Eventual::
::new(); + + dispute_manager_writer.write(*DISPUTE_MANAGER_ADDRESS); let signers = attestation_signers( allocations, (*INDEXER_OPERATOR_MNEMONIC).to_string(), U256::from(1), - *DISPUTE_MANAGER_ADDRESS, + dispute_manager, ); let mut signers = signers.subscribe(); diff --git a/common/src/lib.rs b/common/src/lib.rs index 1d84f956d..980e182ff 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -14,6 +14,8 @@ pub mod prelude { pub use super::allocations::{ monitor::indexer_allocations, Allocation, AllocationStatus, SubgraphDeployment, }; - pub use super::attestations::{signer::AttestationSigner, signers::attestation_signers}; + pub use super::attestations::{ + dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers, + }; pub use super::network_subgraph::NetworkSubgraph; } diff --git a/service/src/main.rs b/service/src/main.rs index e91f36b00..ff3df1ce4 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,7 +1,6 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use alloy_primitives::Address; use alloy_sol_types::eip712_domain; use axum::Server; use dotenvy::dotenv; @@ -10,7 +9,9 @@ use std::{net::SocketAddr, str::FromStr, time::Duration}; use toolshed::thegraph::DeploymentId; use tracing::info; -use indexer_common::prelude::{attestation_signers, indexer_allocations, NetworkSubgraph}; +use indexer_common::prelude::{ + attestation_signers, dispute_manager, indexer_allocations, NetworkSubgraph, +}; use util::{package_version, shutdown_signal}; @@ -86,13 +87,17 @@ async fn main() -> Result<(), std::io::Error> { Duration::from_secs(config.network_subgraph.allocation_syncing_interval), ); + // TODO: Chain ID should be a config + let graph_network_id = 1; + + let dispute_manager = + dispute_manager(network_subgraph, graph_network_id, Duration::from_secs(60)); + let attestation_signers = attestation_signers( indexer_allocations.clone(), config.ethereum.mnemonic.clone(), - // TODO: Chain ID should be a config - U256::from(1), - // TODO: Dispute manager address should be a config - Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(), + U256::from(graph_network_id), + dispute_manager, ); // Establish Database connection necessary for serving indexer management