Skip to content

Commit

Permalink
test PoI filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 9, 2024
1 parent b05fb4b commit d8efaee
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 77 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ thegraph-core = { version = "0.9.0", features = [
"attestation",
"serde",
] }
thegraph-graphql-http = { version = "0.2.1", features = [
"http-client-reqwest",
] }
thegraph-graphql-http = { version = "0.3.1", features = ["reqwest"] }
thiserror = "2.0.2"
tokio = { version = "1.38.0", features = [
"macros",
Expand Down
4 changes: 2 additions & 2 deletions src/network/cost_model.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;

use anyhow::anyhow;
use thegraph_core::DeploymentId;
use thegraph_graphql_http::http_client::ReqwestExt;
use url::Url;
Expand Down Expand Up @@ -48,7 +47,8 @@ impl CostModelResolver {
url: &Url,
deployments: &[DeploymentId],
) -> anyhow::Result<HashMap<DeploymentId, String>> {
let url = url.join("cost").map_err(|_| anyhow!("invalid URL"))?;
// ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2
let url = url.join("cost").unwrap();

let query = r#"
query costModels($deployments: [String!]!) {
Expand Down
11 changes: 7 additions & 4 deletions src/network/indexer_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,17 @@ async fn process_indexer_indexings(
}
};

// ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2
let status_url = url.join("status").unwrap();

// Keep track of the healthy indexers, so we efficiently resolve the indexer's indexings that
// are not marked as unhealthy in a previous resolution step
let mut healthy_indexer_indexings = indexer_indexings.keys().copied().collect::<Vec<_>>();

// Check if the indexer's indexings should be blocked by POI
let blocked_indexings_by_poi = state
.indexer_poi_filer
.blocked_deployments(url, &healthy_indexer_indexings)
.blocked_deployments(&status_url, &healthy_indexer_indexings)
.await;

// Remove the blocked indexings from the healthy indexers list
Expand All @@ -247,7 +250,7 @@ async fn process_indexer_indexings(
// Resolve the indexer's indexing progress information
let mut indexing_progress = resolve_indexer_progress(
&state.indexing_progress_resolver,
url,
&status_url,
&healthy_indexer_indexings,
)
.await;
Expand Down Expand Up @@ -304,10 +307,10 @@ async fn process_indexer_indexings(
/// Resolve the indexer's progress information.
async fn resolve_indexer_progress(
resolver: &IndexingProgressResolver,
url: &Url,
status_url: &Url,
indexings: &[DeploymentId],
) -> HashMap<DeploymentId, Result<IndexingProgress, UnavailableReason>> {
let mut progress_info = resolver.resolve(url, indexings).await;
let mut progress_info = resolver.resolve(status_url, indexings).await;

// Get the progress information for each indexing
indexings
Expand Down
7 changes: 3 additions & 4 deletions src/network/indexing_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ impl IndexingProgressResolver {

pub async fn resolve(
&self,
url: &Url,
status_url: &Url,
deployments: &[DeploymentId],
) -> HashMap<DeploymentId, IndexingProgressInfo> {
let url_string = url.to_string();
let url_string = status_url.to_string();

let status_url = url.join("status").unwrap();
let results = send_requests(&self.http, status_url, deployments).await;

let mut outer_cache = self.cache.read();
Expand All @@ -56,7 +55,7 @@ impl IndexingProgressResolver {

async fn send_requests(
http: &reqwest::Client,
status_url: Url,
status_url: &Url,
indexings: &[DeploymentId],
) -> HashMap<DeploymentId, IndexingProgressInfo> {
let request_batches = indexings.chunks(100);
Expand Down
150 changes: 88 additions & 62 deletions src/network/poi_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl PoiFilter {

pub async fn blocked_deployments(
&self,
url: &Url,
status_url: &Url,
deployments: &[DeploymentId],
) -> HashSet<DeploymentId> {
let requests: Vec<(DeploymentId, BlockNumber)> = self
Expand All @@ -47,7 +47,7 @@ impl PoiFilter {
entries.iter().map(|(block, _)| (*deployment, *block))
})
.collect();
let pois = self.resolve(url, requests).await;
let pois = self.resolve(status_url, requests).await;

deployments
.iter()
Expand All @@ -66,10 +66,10 @@ impl PoiFilter {
/// Fetch public PoIs, such that results are cached indefinitely and refreshed every 20 minutes.
async fn resolve(
&self,
url: &Url,
status_url: &Url,
requests: Vec<(DeploymentId, BlockNumber)>,
) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> {
let url_string = url.to_string();
let url_string = status_url.to_string();

let mut results: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = Default::default();
let mut refresh = false;
Expand Down Expand Up @@ -98,7 +98,7 @@ impl PoiFilter {
return results;
}
let fetched: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> =
send_requests(&self.http, url, updates).await;
send_requests(&self.http, status_url, updates).await;
{
let now = Instant::now();
let mut cache = self.cache.write();
Expand Down Expand Up @@ -163,15 +163,16 @@ async fn send_request(
block { number }
}
}"#;
let request = GraphQlRequest {
document: query.to_string(),
variables: serde_json::json!({ "requests": pois.iter().map(|(deployment, block)| serde_json::json!({
"deployment": deployment,
"blockNumber": block,
})).collect::<Vec<_>>() }),
};
let response = http
.post(status_url)
.send_graphql::<Response>(GraphQlRequest {
document: query.to_string(),
variables: serde_json::json!({ "requests": pois.iter().map(|(deployment, block)| serde_json::json!({
"deployment": deployment,
"blockNumber": block,
})).collect::<Vec<_>>() }),
})
.send_graphql::<Response>(request)
.await??;
Ok(response.public_proofs_of_indexing)
}
Expand All @@ -197,57 +198,82 @@ pub struct PartialBlockPtr {

#[cfg(test)]
mod tests {
use thegraph_core::{deployment_id, proof_of_indexing as poi};

use super::Response;

#[test]
fn deserialize_public_pois_response() {
//* Given
let response = r#"{
"publicProofsOfIndexing": [
{
"deployment": "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH",
"proofOfIndexing": "0xba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287",
"block": {
"number": "123"
}
},
{
"deployment": "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw",
"block": {
"number": "456"
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use thegraph_core::{
alloy::{hex, primitives::FixedBytes},
DeploymentId,
};
use url::Url;

use crate::init_logging;

#[tokio::test]
async fn poi_filter() {
init_logging("poi_filter", false);

type ResponsePoi = Arc<parking_lot::Mutex<FixedBytes<32>>>;
let deployment: DeploymentId = "QmUzRg2HHMpbgf6Q4VHKNDbtBEJnyp5JWCh2gUX9AV6jXv"
.parse()
.unwrap();
let bad_poi =
hex!("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").into();
let response_poi: ResponsePoi = Arc::new(parking_lot::Mutex::new(bad_poi));
let request_count = Arc::new(parking_lot::Mutex::new(0));

let route = {
let response_poi = response_poi.clone();
let request_count = request_count.clone();
axum::routing::post(move || async move {
*request_count.lock() += 1;
axum::Json(serde_json::json!({
"data": {
"publicProofsOfIndexing": [
{
"deployment": deployment,
"proofOfIndexing": response_poi.lock().to_string(),
"block": { "number": "0" }
}
]
}
}
]
}"#;
}))
})
};
let app = axum::Router::new().route("/status", route);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let indexer_url: Url =
format!("http://127.0.0.1:{}", listener.local_addr().unwrap().port())
.parse()
.unwrap();
eprintln!("listening on {indexer_url}");
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});

//* When
let response = serde_json::from_str::<Response>(response);

//* Then
let response = response.expect("deserialization failed");

assert_eq!(response.public_proofs_of_indexing.len(), 2);
assert_eq!(
response.public_proofs_of_indexing[0].deployment,
deployment_id!("QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH")
);
assert_eq!(
response.public_proofs_of_indexing[0].proof_of_indexing,
Some(poi!(
"ba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287"
))
);
assert_eq!(response.public_proofs_of_indexing[0].block.number, 123);
assert_eq!(
response.public_proofs_of_indexing[1].deployment,
deployment_id!("QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw")
);
assert_eq!(
response.public_proofs_of_indexing[1].proof_of_indexing,
None
);
assert_eq!(response.public_proofs_of_indexing[1].block.number, 456);
let blocklist = HashMap::from([(deployment, vec![(0, bad_poi.into())])]);
let poi_filter = super::PoiFilter::new(reqwest::Client::new(), blocklist);

let status_url = indexer_url.join("status").unwrap();
let assert_blocked = |blocked: Vec<DeploymentId>| async {
let result = poi_filter
.blocked_deployments(&status_url, &[deployment])
.await;
assert_eq!(result, HashSet::from_iter(blocked));
};

assert_blocked(vec![deployment]).await;
assert_eq!(*request_count.lock(), 1);
*response_poi.lock() =
hex!("1111111111111111111111111111111111111111111111111111111111111111").into();
assert_blocked(vec![deployment]).await;
assert_eq!(*request_count.lock(), 1);
tokio::time::pause();
tokio::time::advance(Duration::from_secs(20 * 60)).await;
assert_blocked(vec![]).await;
assert_eq!(*request_count.lock(), 2);
}
}

0 comments on commit d8efaee

Please sign in to comment.