diff --git a/src/config.rs b/src/config.rs index c2ad61e1..d129dd50 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,3 @@ -//! The Graph Gateway configuration. - use std::{ collections::{BTreeMap, HashSet}, path::{Path, PathBuf}, @@ -26,9 +24,9 @@ pub struct Config { #[serde(default)] pub api_keys: Option, pub attestations: AttestationConfig, - /// List of indexer addresses to block. This should only be used temprorarily. + /// Blocklist applying to indexers. #[serde(default)] - pub blocked_indexers: BTreeMap, + pub blocklist: Vec, /// Chain aliases #[serde(default)] pub chain_aliases: BTreeMap, @@ -53,9 +51,6 @@ pub struct Config { pub trusted_indexers: Vec, /// Check payment state of client (disable for testnets) pub payment_required: bool, - /// POI blocklist - #[serde(default)] - pub poi_blocklist: Vec, /// public API port pub port_api: u16, /// private metrics port @@ -96,11 +91,30 @@ pub enum ApiKeys { Fixed(Vec), } -#[derive(Deserialize)] -pub struct BlockedIndexer { - /// empty array blocks on all deployments - pub deployments: Vec, - pub reason: String, +#[derive(Clone, Deserialize, Serialize)] +#[serde(untagged)] +pub enum BlocklistEntry { + Poi { + deployment: DeploymentId, + info: BlocklistInfo, + public_poi: B256, + block: BlockNumber, + }, + Other { + deployment: DeploymentId, + info: BlocklistInfo, + indexer: Address, + }, +} + +#[derive(Clone, Deserialize, Serialize)] +pub struct BlocklistInfo { + /// Example query (should be minimal to reproduce bad response) + #[serde(default, skip_serializing_if = "Option::is_none")] + query: Option, + /// Bad query response, from the above query executed on indexers with this blocked PoI + #[serde(default, skip_serializing_if = "Option::is_none")] + bad_query_response: Option, } /// Attestation configuration. @@ -128,7 +142,7 @@ pub enum ExchangeRateProvider { /// Kafka configuration. /// /// See [`Config`]'s [`kafka`](struct.Config.html#structfield.kafka). -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct KafkaConfig(BTreeMap); impl Default for KafkaConfig { @@ -171,17 +185,6 @@ pub struct Receipts { pub verifier: Address, } -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct BlockedPoi { - pub public_poi: B256, - pub deployment: DeploymentId, - pub block_number: BlockNumber, - /// Example query (should be minimal to reproduce bad response) - pub query: Option, - /// Bad query response, from the above query executed on indexers with this blocked PoI - pub bad_query_response: Option, -} - /// Load the configuration from a JSON file. pub fn load_from_file(path: &Path) -> anyhow::Result { let config_content = std::fs::read_to_string(path)?; diff --git a/src/main.rs b/src/main.rs index 93c185de..f6679748 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,13 +39,13 @@ use axum::{ use budgets::{Budgeter, USD}; use chains::Chains; use client_query::context::Context; -use config::{ApiKeys, ExchangeRateProvider}; +use config::{ApiKeys, BlocklistEntry, ExchangeRateProvider}; use indexer_client::IndexerClient; use indexing_performance::IndexingPerformance; use middleware::{ legacy_auth_adapter, RequestTracingLayer, RequireAuthorizationLayer, SetRequestIdLayer, }; -use network::subgraph_client::Client as SubgraphClient; +use network::{indexer_blocklist, subgraph_client::Client as SubgraphClient}; use prometheus::{self, Encoder as _}; use receipts::ReceiptSigner; use thegraph_core::{ @@ -109,14 +109,15 @@ async fn main() { } None => Default::default(), }; + let indexer_blocklist = + indexer_blocklist::Blocklist::spawn(conf.blocklist, conf.kafka.clone().into()); let mut network = network::service::spawn( http_client.clone(), network_subgraph_client, + indexer_blocklist.clone(), conf.min_indexer_version, conf.min_graph_node_version, - conf.blocked_indexers, indexer_host_blocklist, - conf.poi_blocklist.clone(), ); let indexing_perf = IndexingPerformance::new(network.clone()); network.wait_until_ready().await; @@ -127,7 +128,6 @@ async fn main() { conf.receipts.verifier, ))); - // Initialize the auth service let auth_service = init_auth_service(http_client.clone(), conf.api_keys, conf.payment_required).await; @@ -157,7 +157,7 @@ async fn main() { reporter, }; - let poi_blocklist: &'static str = serde_json::to_string(&conf.poi_blocklist).unwrap().leak(); + let blocklist: watch::Receiver> = indexer_blocklist.blocklist; // Host metrics on a separate server with a port that isn't open to public requests. tokio::spawn(async move { @@ -225,10 +225,7 @@ async fn main() { .route("/ready", routing::get(|| async { "Ready" })) .route( "/blocklist", - routing::get(move || async move { - let headers = [(reqwest::header::CONTENT_TYPE, "application/json")]; - (headers, poi_blocklist) - }), + routing::get(move || async move { axum::Json(blocklist.borrow().clone()) }), ) .nest("/api", api); diff --git a/src/network.rs b/src/network.rs index 03bb9051..ac9114eb 100644 --- a/src/network.rs +++ b/src/network.rs @@ -4,6 +4,7 @@ use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariable pub mod cost_model; pub mod host_filter; +pub mod indexer_blocklist; mod indexer_processing; pub mod indexing_progress; pub mod poi_filter; diff --git a/src/network/indexer_blocklist.rs b/src/network/indexer_blocklist.rs new file mode 100644 index 00000000..274cedb7 --- /dev/null +++ b/src/network/indexer_blocklist.rs @@ -0,0 +1,212 @@ +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; + +use anyhow::{anyhow, Context as _}; +use futures::StreamExt as _; +use rand::{thread_rng, RngCore as _}; +use rdkafka::{ + consumer::{Consumer as _, StreamConsumer}, + Message, TopicPartitionList, +}; +use thegraph_core::{alloy::primitives::Address, DeploymentId, ProofOfIndexing}; +use tokio::sync::watch; + +use crate::config::BlocklistEntry; + +#[derive(Clone)] +pub struct Blocklist { + pub blocklist: watch::Receiver>, + pub poi: watch::Receiver>>, + pub indexer: watch::Receiver>>, +} + +impl Blocklist { + pub fn spawn(init: Vec, kafka_config: rdkafka::ClientConfig) -> Self { + let (blocklist_tx, blocklist_rx) = watch::channel(Default::default()); + let (poi_tx, poi_rx) = watch::channel(Default::default()); + let (indexer_tx, indexer_rx) = watch::channel(Default::default()); + let mut actor = Actor { + blocklist: blocklist_tx, + poi: poi_tx, + indexer: indexer_tx, + }; + for entry in init { + actor.add_entry(entry); + } + tokio::spawn(async move { + actor.run(kafka_config).await; + }); + Self { + blocklist: blocklist_rx, + poi: poi_rx, + indexer: indexer_rx, + } + } +} + +struct Actor { + blocklist: watch::Sender>, + poi: watch::Sender>>, + indexer: watch::Sender>>, +} + +impl Actor { + async fn run(&mut self, kafka_config: rdkafka::ClientConfig) { + let consumer = match create_consumer(kafka_config).await { + Ok(consumer) => consumer, + Err(blocklist_err) => { + tracing::error!(%blocklist_err); + return; + } + }; + + let mut records: HashMap = Default::default(); + let mut stream = consumer.stream(); + while let Some(msg) = stream.next().await { + let msg = match msg { + Ok(msg) => msg, + Err(blocklist_recv_error) => { + tracing::error!(%blocklist_recv_error); + continue; + } + }; + let key = match msg.key_view::() { + Some(Ok(key)) => key, + result => { + tracing::error!("invalid key: {result:?}"); + continue; + } + }; + match msg.payload().map(serde_json::from_slice::) { + Some(Ok(entry)) => { + records.insert(key.to_string(), entry.clone()); + self.add_entry(entry); + } + None => { + let entry = records.remove(key); + if let Some(entry) = entry { + self.remove_entry(&entry); + } + } + Some(Err(blocklist_deserialize_err)) => { + tracing::error!(%blocklist_deserialize_err); + } + }; + } + tracing::error!("blocklist consumer stopped"); + } + + fn add_entry(&mut self, entry: BlocklistEntry) { + match entry { + BlocklistEntry::Poi { + deployment, + block, + public_poi, + .. + } => { + self.poi.send_modify(move |blocklist| { + blocklist + .entry(deployment) + .or_default() + .push((block, public_poi.into())); + }); + } + BlocklistEntry::Other { + deployment, + indexer, + .. + } => { + self.indexer.send_modify(move |blocklist| { + blocklist.entry(indexer).or_default().insert(deployment); + }); + } + }; + self.blocklist + .send_modify(move |blocklist| blocklist.push(entry)); + } + + fn remove_entry(&mut self, entry: &BlocklistEntry) { + match entry { + BlocklistEntry::Poi { + deployment, + block, + public_poi, + .. + } => { + self.poi.send_modify(|blocklist| { + if let Some(entry) = blocklist.get_mut(deployment) { + entry.retain(|value| &(*block, (*public_poi).into()) != value); + } + }); + } + BlocklistEntry::Other { + deployment, + indexer, + .. + } => { + self.indexer.send_modify(|blocklist| { + if let Some(entry) = blocklist.get_mut(indexer) { + entry.remove(deployment); + } + }); + } + }; + fn matching(a: &BlocklistEntry, b: &BlocklistEntry) -> bool { + match (a, b) { + ( + BlocklistEntry::Poi { + deployment, + public_poi, + block, + info: _, + }, + BlocklistEntry::Poi { + deployment: deployment_, + public_poi: public_poi_, + block: block_, + info: _, + }, + ) => { + (deployment == deployment_) && (public_poi == public_poi_) && (block == block_) + } + ( + BlocklistEntry::Other { + indexer, + deployment, + info: _, + }, + BlocklistEntry::Other { + indexer: indexer_, + deployment: deployment_, + info: _, + }, + ) => (indexer == indexer_) && (deployment == deployment_), + _ => false, + } + } + self.blocklist + .send_modify(|blocklist| blocklist.retain(|value| !matching(entry, value))); + } +} + +async fn create_consumer( + mut kafka_config: rdkafka::ClientConfig, +) -> anyhow::Result { + let topic = "gateway_blocklist"; + let group_id = format!("gateway-{:x}", thread_rng().next_u64()); + let consumer: StreamConsumer = kafka_config.set("group.id", group_id).create()?; + let metadata = consumer + .fetch_metadata(Some(topic), Duration::from_secs(30)) + .with_context(|| anyhow!("fetch {topic} metadata"))?; + anyhow::ensure!(!metadata.topics().is_empty()); + let topic_info = &metadata.topics()[0]; + let mut assignment = TopicPartitionList::new(); + for partition in topic_info.partitions() { + assignment.add_partition_offset(topic, partition.id(), rdkafka::Offset::Beginning)?; + } + tracing::debug!(?assignment); + consumer.assign(&assignment)?; + Ok(consumer) +} diff --git a/src/network/indexer_processing.rs b/src/network/indexer_processing.rs index 6c710001..1b1f5e74 100644 --- a/src/network/indexer_processing.rs +++ b/src/network/indexer_processing.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use custom_debug::CustomDebug; use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId}; @@ -6,7 +6,6 @@ use tracing::Instrument; use url::Url; use crate::{ - config::BlockedIndexer, errors::UnavailableReason, network::{indexing_progress::IndexingProgressResolver, service::InternalState}, }; @@ -156,7 +155,13 @@ pub async fn process_info( return (*indexer_id, Err(err)); } - let blocklist = state.indexer_blocklist.get(&*indexer.id); + let blocklist = state + .indexer_blocklist + .borrow() + .get(&*indexer.id) + .cloned() + .unwrap_or_default(); + // Resolve the indexer's indexings information let indexings = process_indexer_indexings( state, @@ -191,29 +196,19 @@ async fn process_indexer_indexings( state: &InternalState, url: &Url, indexings: HashMap, - blocklist: Option<&BlockedIndexer>, + blocklist: HashSet, ) -> HashMap> { let mut indexer_indexings: HashMap, _>> = indexings .into_iter() .map(|(id, info)| (id, Ok(info.into()))) .collect(); - match blocklist { - None => (), - Some(blocklist) if blocklist.deployments.is_empty() => { - for entry in indexer_indexings.values_mut() { - *entry = Err(UnavailableReason::Blocked(blocklist.reason.clone())); - } - } - Some(blocklist) => { - for deployment in &blocklist.deployments { - indexer_indexings.insert( - *deployment, - Err(UnavailableReason::Blocked(blocklist.reason.clone())), - ); - } - } - }; + for deployment in blocklist { + indexer_indexings.insert( + deployment, + Err(UnavailableReason::Blocked("missing data".to_string())), + ); + } // ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2 let status_url = url.join("status").unwrap(); diff --git a/src/network/poi_filter.rs b/src/network/poi_filter.rs index c1e17f23..cfadd9fd 100644 --- a/src/network/poi_filter.rs +++ b/src/network/poi_filter.rs @@ -6,14 +6,14 @@ use std::{ use serde_with::serde_as; use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing}; use thegraph_graphql_http::http_client::ReqwestExt; -use tokio::time::Instant; +use tokio::{sync::watch, time::Instant}; use url::Url; use super::GraphQlRequest; pub struct PoiFilter { http: reqwest::Client, - blocklist: HashMap>, + blocklist: watch::Receiver>>, cache: parking_lot::RwLock>, } @@ -25,7 +25,7 @@ struct IndexerEntry { impl PoiFilter { pub fn new( http: reqwest::Client, - blocklist: HashMap>, + blocklist: watch::Receiver>>, ) -> Self { Self { http, @@ -39,8 +39,9 @@ impl PoiFilter { status_url: &Url, deployments: &[DeploymentId], ) -> HashSet { - let requests: Vec<(DeploymentId, BlockNumber)> = self - .blocklist + let blocklist = self.blocklist.borrow().clone(); + + let requests: Vec<(DeploymentId, BlockNumber)> = blocklist .iter() .filter(|(deployment, _)| deployments.contains(deployment)) .flat_map(|(deployment, entries)| { @@ -51,7 +52,7 @@ impl PoiFilter { deployments .iter() - .filter(|deployment| match self.blocklist.get(deployment) { + .filter(|deployment| match blocklist.get(deployment) { None => false, Some(blocklist) => blocklist.iter().any(|(block, poi)| { pois.get(&(**deployment, *block)) @@ -208,6 +209,7 @@ mod tests { alloy::{hex, primitives::FixedBytes}, DeploymentId, }; + use tokio::sync::watch; use url::Url; use crate::init_logging; @@ -255,7 +257,7 @@ mod tests { }); let blocklist = HashMap::from([(deployment, vec![(0, bad_poi.into())])]); - let poi_filter = super::PoiFilter::new(reqwest::Client::new(), blocklist); + let poi_filter = super::PoiFilter::new(reqwest::Client::new(), watch::channel(blocklist).1); let status_url = indexer_url.join("status").unwrap(); let assert_blocked = |blocked: Vec| async { diff --git a/src/network/service.rs b/src/network/service.rs index 626e4803..a337a0de 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -3,7 +3,7 @@ //! query processing pipeline use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{HashMap, HashSet}, time::Duration, }; @@ -18,6 +18,7 @@ use tokio::{sync::watch, time::MissedTickBehavior}; use super::{ cost_model::CostModelResolver, host_filter::HostFilter, + indexer_blocklist, indexer_processing::{self, IndexerRawInfo}, indexing_progress::IndexingProgressResolver, poi_filter::PoiFilter, @@ -28,10 +29,7 @@ use super::{ version_filter::{MinimumVersionRequirements, VersionFilter}, DeploymentError, SubgraphError, }; -use crate::{ - config::{BlockedIndexer, BlockedPoi}, - errors::UnavailableReason, -}; +use crate::errors::UnavailableReason; /// Subgraph resolution information returned by the [`NetworkService`]. pub struct ResolvedSubgraphInfo { @@ -164,30 +162,13 @@ impl NetworkService { pub fn spawn( http: reqwest::Client, subgraph_client: SubgraphClient, + indexer_blocklist: indexer_blocklist::Blocklist, min_indexer_service_version: Version, min_graph_node_version: Version, - indexer_blocklist: BTreeMap, indexer_host_blocklist: HashSet, - poi_blocklist: Vec, ) -> NetworkService { - let poi_blocklist = poi_blocklist - .iter() - .map(|entry| &entry.deployment) - .collect::>() - .into_iter() - .map(|deployment| { - ( - *deployment, - poi_blocklist - .iter() - .filter(|entry| &entry.deployment == deployment) - .map(|entry| (entry.block_number, entry.public_poi.into())) - .collect::>(), - ) - }) - .collect(); let internal_state = InternalState { - indexer_blocklist, + indexer_blocklist: indexer_blocklist.indexer, indexer_host_filter: HostFilter::new(indexer_host_blocklist) .expect("failed to create host resolver"), indexer_version_filter: VersionFilter::new( @@ -197,7 +178,7 @@ pub fn spawn( graph_node: min_graph_node_version, }, ), - indexer_poi_filer: PoiFilter::new(http.clone(), poi_blocklist), + indexer_poi_filer: PoiFilter::new(http.clone(), indexer_blocklist.poi), indexing_progress_resolver: IndexingProgressResolver::new(http.clone()), cost_model_resolver: CostModelResolver::new(http.clone()), }; @@ -207,7 +188,7 @@ pub fn spawn( } pub struct InternalState { - pub indexer_blocklist: BTreeMap, + pub indexer_blocklist: watch::Receiver>>, pub indexer_host_filter: HostFilter, pub indexer_version_filter: VersionFilter, pub indexer_poi_filer: PoiFilter,