diff --git a/Cargo.lock b/Cargo.lock index ecdef04e..b27adc4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4703,9 +4703,9 @@ dependencies = [ [[package]] name = "thegraph-graphql-http" -version = "0.2.4" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae3b675ae2fd6e213fa1b428ba44009b309338b6e9b7e6205a674ccecd5d67d4" +checksum = "1381fb2b4952a416d0dd31373ddda85d908daa59e845ed1225c1f29eed3c47fb" dependencies = [ "async-trait", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 708c3e17..4e3e109f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,9 +54,7 @@ thegraph-core = { version = "0.9.0", features = [ "attestation", "serde", ] } -thegraph-graphql-http = { version = "0.2.1", features = [ - "http-client-reqwest", -] } +thegraph-graphql-http = { version = "0.3.1", features = ["reqwest"] } thiserror = "2.0.2" tokio = { version = "1.38.0", features = [ "macros", diff --git a/src/network/cost_model.rs b/src/network/cost_model.rs index 2e29f4e2..ce05fec8 100644 --- a/src/network/cost_model.rs +++ b/src/network/cost_model.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use anyhow::anyhow; use thegraph_core::DeploymentId; use thegraph_graphql_http::http_client::ReqwestExt; use url::Url; @@ -48,7 +47,8 @@ impl CostModelResolver { url: &Url, deployments: &[DeploymentId], ) -> anyhow::Result> { - let url = url.join("cost").map_err(|_| anyhow!("invalid URL"))?; + // ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2 + let url = url.join("cost").unwrap(); let query = r#" query costModels($deployments: [String!]!) { diff --git a/src/network/indexer_processing.rs b/src/network/indexer_processing.rs index c0628636..6c710001 100644 --- a/src/network/indexer_processing.rs +++ b/src/network/indexer_processing.rs @@ -215,6 +215,9 @@ async fn process_indexer_indexings( } }; + // ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2 + let status_url = url.join("status").unwrap(); + // Keep track of the healthy indexers, so we efficiently resolve the indexer's indexings thar // are not marked as unhealthy in a previous resolution step let mut healthy_indexer_indexings = indexer_indexings.keys().copied().collect::>(); @@ -222,7 +225,7 @@ async fn process_indexer_indexings( // Check if the indexer's indexings should be blocked by POI let blocked_indexings_by_poi = state .indexer_poi_filer - .blocked_deployments(url, &healthy_indexer_indexings) + .blocked_deployments(&status_url, &healthy_indexer_indexings) .await; // Remove the blocked indexings from the healthy indexers list @@ -247,7 +250,7 @@ async fn process_indexer_indexings( // Resolve the indexer's indexing progress information let mut indexing_progress = resolve_indexer_progress( &state.indexing_progress_resolver, - url, + &status_url, &healthy_indexer_indexings, ) .await; @@ -304,10 +307,10 @@ async fn process_indexer_indexings( /// Resolve the indexer's progress information. async fn resolve_indexer_progress( resolver: &IndexingProgressResolver, - url: &Url, + status_url: &Url, indexings: &[DeploymentId], ) -> HashMap> { - let mut progress_info = resolver.resolve(url, indexings).await; + let mut progress_info = resolver.resolve(status_url, indexings).await; // Get the progress information for each indexing indexings diff --git a/src/network/indexing_progress.rs b/src/network/indexing_progress.rs index f75d700d..61dfab5b 100644 --- a/src/network/indexing_progress.rs +++ b/src/network/indexing_progress.rs @@ -29,12 +29,11 @@ impl IndexingProgressResolver { pub async fn resolve( &self, - url: &Url, + status_url: &Url, deployments: &[DeploymentId], ) -> HashMap { - let url_string = url.to_string(); + let url_string = status_url.to_string(); - let status_url = url.join("status").unwrap(); let results = send_requests(&self.http, status_url, deployments).await; let mut outer_cache = self.cache.read(); @@ -56,7 +55,7 @@ impl IndexingProgressResolver { async fn send_requests( http: &reqwest::Client, - status_url: Url, + status_url: &Url, indexings: &[DeploymentId], ) -> HashMap { let request_batches = indexings.chunks(100); diff --git a/src/network/poi_filter.rs b/src/network/poi_filter.rs index 57470a72..275057a2 100644 --- a/src/network/poi_filter.rs +++ b/src/network/poi_filter.rs @@ -36,7 +36,7 @@ impl PoiFilter { pub async fn blocked_deployments( &self, - url: &Url, + status_url: &Url, deployments: &[DeploymentId], ) -> HashSet { let requests: Vec<(DeploymentId, BlockNumber)> = self @@ -47,7 +47,7 @@ impl PoiFilter { entries.iter().map(|(block, _)| (*deployment, *block)) }) .collect(); - let pois = self.resolve(url, requests).await; + let pois = self.resolve(status_url, requests).await; deployments .iter() @@ -66,10 +66,10 @@ impl PoiFilter { /// Fetch public PoIs, such that results are cached indefinitely and refreshed every 20 minutes. async fn resolve( &self, - url: &Url, + status_url: &Url, requests: Vec<(DeploymentId, BlockNumber)>, ) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> { - let url_string = url.to_string(); + let url_string = status_url.to_string(); let mut results: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = Default::default(); let mut refresh = false; @@ -98,7 +98,7 @@ impl PoiFilter { return results; } let fetched: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = - send_requests(&self.http, url, updates).await; + send_requests(&self.http, status_url, updates).await; { let now = Instant::now(); let mut cache = self.cache.write(); @@ -163,15 +163,16 @@ async fn send_request( block { number } } }"#; + let request = GraphQlRequest { + document: query.to_string(), + variables: serde_json::json!({ "requests": pois.iter().map(|(deployment, block)| serde_json::json!({ + "deployment": deployment, + "blockNumber": block, + })).collect::>() }), + }; let response = http .post(status_url) - .send_graphql::(GraphQlRequest { - document: query.to_string(), - variables: serde_json::json!({ "requests": pois.iter().map(|(deployment, block)| serde_json::json!({ - "deployment": deployment, - "blockNumber": block, - })).collect::>() }), - }) + .send_graphql::(request) .await??; Ok(response.public_proofs_of_indexing) } @@ -197,57 +198,82 @@ pub struct PartialBlockPtr { #[cfg(test)] mod tests { - use thegraph_core::{deployment_id, proof_of_indexing as poi}; - - use super::Response; - - #[test] - fn deserialize_public_pois_response() { - //* Given - let response = r#"{ - "publicProofsOfIndexing": [ - { - "deployment": "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH", - "proofOfIndexing": "0xba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287", - "block": { - "number": "123" - } - }, - { - "deployment": "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw", - "block": { - "number": "456" + use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, + }; + + use thegraph_core::{ + alloy::{hex, primitives::FixedBytes}, + DeploymentId, + }; + use url::Url; + + use crate::init_logging; + + #[tokio::test] + async fn poi_filter() { + init_logging("poi_filter", false); + + type ResponsePoi = Arc>>; + let deployment: DeploymentId = "QmUzRg2HHMpbgf6Q4VHKNDbtBEJnyp5JWCh2gUX9AV6jXv" + .parse() + .unwrap(); + let bad_poi = + hex!("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").into(); + let response_poi: ResponsePoi = Arc::new(parking_lot::Mutex::new(bad_poi)); + let request_count = Arc::new(parking_lot::Mutex::new(0)); + + let route = { + let response_poi = response_poi.clone(); + let request_count = request_count.clone(); + axum::routing::post(move || async move { + *request_count.lock() += 1; + axum::Json(serde_json::json!({ + "data": { + "publicProofsOfIndexing": [ + { + "deployment": deployment, + "proofOfIndexing": response_poi.lock().to_string(), + "block": { "number": "0" } + } + ] } - } - ] - }"#; + })) + }) + }; + let app = axum::Router::new().route("/status", route); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let indexer_url: Url = + format!("http://127.0.0.1:{}", listener.local_addr().unwrap().port()) + .parse() + .unwrap(); + eprintln!("listening on {indexer_url}"); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); - //* When - let response = serde_json::from_str::(response); - - //* Then - let response = response.expect("deserialization failed"); - - assert_eq!(response.public_proofs_of_indexing.len(), 2); - assert_eq!( - response.public_proofs_of_indexing[0].deployment, - deployment_id!("QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH") - ); - assert_eq!( - response.public_proofs_of_indexing[0].proof_of_indexing, - Some(poi!( - "ba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287" - )) - ); - assert_eq!(response.public_proofs_of_indexing[0].block.number, 123); - assert_eq!( - response.public_proofs_of_indexing[1].deployment, - deployment_id!("QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw") - ); - assert_eq!( - response.public_proofs_of_indexing[1].proof_of_indexing, - None - ); - assert_eq!(response.public_proofs_of_indexing[1].block.number, 456); + let blocklist = HashMap::from([(deployment, vec![(0, bad_poi.into())])]); + let poi_filter = super::PoiFilter::new(reqwest::Client::new(), blocklist); + + let status_url = indexer_url.join("status").unwrap(); + let assert_blocked = |blocked: Vec| async { + let result = poi_filter + .blocked_deployments(&status_url, &[deployment]) + .await; + assert_eq!(result, HashSet::from_iter(blocked)); + }; + + assert_blocked(vec![deployment]).await; + assert_eq!(*request_count.lock(), 1); + *response_poi.lock() = + hex!("1111111111111111111111111111111111111111111111111111111111111111").into(); + assert_blocked(vec![deployment]).await; + assert_eq!(*request_count.lock(), 1); + tokio::time::pause(); + tokio::time::advance(Duration::from_secs(20 * 60)).await; + assert_blocked(vec![]).await; + assert_eq!(*request_count.lock(), 2); } }