feat: dynamic blocklist (#1008)
This merges the PoI and indexer blocklists, so that the combined blocklist
is served via `/blocklist`. A Kafka consumer is also added to update
blocklist entries from the `gateway_blocklist` topic.
Theodus authored Dec 17, 2024
1 parent 9031ee7 commit 8c4fc46
Showing 7 changed files with 278 additions and 87 deletions.
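The commit message describes the new read path: the merged blocklist becomes an HTTP resource. A minimal client-side sketch of fetching it (not part of this commit; the host/port are placeholders, and reqwest with its `json` feature plus tokio are assumed):

```rust
// Hypothetical client for the `/blocklist` endpoint added in this commit.
// The URL is a placeholder; `port_api` in the gateway config sets the port.
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The endpoint returns the merged PoI + indexer blocklist as a JSON array.
    let entries: Vec<Value> = reqwest::get("http://localhost:8080/blocklist")
        .await?
        .json()
        .await?;
    println!("{} blocklist entries", entries.len());
    Ok(())
}
```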
51 changes: 27 additions & 24 deletions src/config.rs
@@ -1,5 +1,3 @@
- //! The Graph Gateway configuration.
use std::{
collections::{BTreeMap, HashSet},
path::{Path, PathBuf},
@@ -26,9 +24,9 @@ pub struct Config {
#[serde(default)]
pub api_keys: Option<ApiKeys>,
pub attestations: AttestationConfig,
- /// List of indexer addresses to block. This should only be used temprorarily.
+ /// Blocklist applying to indexers.
#[serde(default)]
- pub blocked_indexers: BTreeMap<Address, BlockedIndexer>,
+ pub blocklist: Vec<BlocklistEntry>,
/// Chain aliases
#[serde(default)]
pub chain_aliases: BTreeMap<String, String>,
@@ -53,9 +51,6 @@ pub struct Config {
pub trusted_indexers: Vec<TrustedIndexer>,
/// Check payment state of client (disable for testnets)
pub payment_required: bool,
- /// POI blocklist
- #[serde(default)]
- pub poi_blocklist: Vec<BlockedPoi>,
/// public API port
pub port_api: u16,
/// private metrics port
@@ -96,11 +91,30 @@ pub enum ApiKeys {
Fixed(Vec<ApiKey>),
}

- #[derive(Deserialize)]
- pub struct BlockedIndexer {
- /// empty array blocks on all deployments
- pub deployments: Vec<DeploymentId>,
- pub reason: String,
+ #[derive(Clone, Deserialize, Serialize)]
+ #[serde(untagged)]
+ pub enum BlocklistEntry {
+ Poi {
+ deployment: DeploymentId,
+ info: BlocklistInfo,
+ public_poi: B256,
+ block: BlockNumber,
+ },
+ Other {
+ deployment: DeploymentId,
+ info: BlocklistInfo,
+ indexer: Address,
+ },
}

+ #[derive(Clone, Deserialize, Serialize)]
+ pub struct BlocklistInfo {
+ /// Example query (should be minimal to reproduce bad response)
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ query: Option<String>,
+ /// Bad query response, from the above query executed on indexers with this blocked PoI
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ bad_query_response: Option<String>,
+ }

/// Attestation configuration.
@@ -128,7 +142,7 @@ pub enum ExchangeRateProvider {
/// Kafka configuration.
///
/// See [`Config`]'s [`kafka`](struct.Config.html#structfield.kafka).
- #[derive(Deserialize)]
+ #[derive(Clone, Deserialize)]
pub struct KafkaConfig(BTreeMap<String, String>);

impl Default for KafkaConfig {
@@ -171,17 +185,6 @@ pub struct Receipts {
pub verifier: Address,
}

- #[derive(Debug, Clone, Deserialize, Serialize)]
- pub struct BlockedPoi {
- pub public_poi: B256,
- pub deployment: DeploymentId,
- pub block_number: BlockNumber,
- /// Example query (should be minimal to reproduce bad response)
- pub query: Option<String>,
- /// Bad query response, from the above query executed on indexers with this blocked PoI
- pub bad_query_response: Option<String>,
- }

/// Load the configuration from a JSON file.
pub fn load_from_file(path: &Path) -> anyhow::Result<Config> {
let config_content = std::fs::read_to_string(path)?;
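Because `BlocklistEntry` is `#[serde(untagged)]`, config files and Kafka payloads carry no variant tag: the variant is inferred from which fields are present. A runnable sketch of that behavior (simplified stand-in types, not the gateway's real `DeploymentId`/`Address`/`B256`):

```rust
use serde::{Deserialize, Serialize};

// Stand-ins for the real types, so this compiles with only serde + serde_json.
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
enum BlocklistEntry {
    Poi { deployment: String, info: BlocklistInfo, public_poi: String, block: u64 },
    Other { deployment: String, info: BlocklistInfo, indexer: String },
}

#[derive(Debug, Deserialize, Serialize)]
struct BlocklistInfo {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    query: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    bad_query_response: Option<String>,
}

fn main() {
    // `public_poi` + `block` present: the Poi variant matches first.
    let poi: BlocklistEntry = serde_json::from_str(
        r#"{"deployment":"QmExample","info":{},"public_poi":"0xabc","block":42}"#,
    )
    .unwrap();
    assert!(matches!(poi, BlocklistEntry::Poi { .. }));

    // No PoI fields, but `indexer` present: falls through to Other.
    let other: BlocklistEntry = serde_json::from_str(
        r#"{"deployment":"QmExample","info":{},"indexer":"0xdef"}"#,
    )
    .unwrap();
    // `None` info fields are skipped on re-serialization.
    assert_eq!(
        serde_json::to_string(&other).unwrap(),
        r#"{"deployment":"QmExample","info":{},"indexer":"0xdef"}"#,
    );
}
```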
17 changes: 7 additions & 10 deletions src/main.rs
@@ -39,13 +39,13 @@ use axum::{
use budgets::{Budgeter, USD};
use chains::Chains;
use client_query::context::Context;
- use config::{ApiKeys, ExchangeRateProvider};
+ use config::{ApiKeys, BlocklistEntry, ExchangeRateProvider};
use indexer_client::IndexerClient;
use indexing_performance::IndexingPerformance;
use middleware::{
legacy_auth_adapter, RequestTracingLayer, RequireAuthorizationLayer, SetRequestIdLayer,
};
- use network::subgraph_client::Client as SubgraphClient;
+ use network::{indexer_blocklist, subgraph_client::Client as SubgraphClient};
use prometheus::{self, Encoder as _};
use receipts::ReceiptSigner;
use thegraph_core::{
@@ -109,14 +109,15 @@ async fn main() {
}
None => Default::default(),
};
+ let indexer_blocklist =
+ indexer_blocklist::Blocklist::spawn(conf.blocklist, conf.kafka.clone().into());
let mut network = network::service::spawn(
http_client.clone(),
network_subgraph_client,
+ indexer_blocklist.clone(),
conf.min_indexer_version,
conf.min_graph_node_version,
- conf.blocked_indexers,
indexer_host_blocklist,
- conf.poi_blocklist.clone(),
);
let indexing_perf = IndexingPerformance::new(network.clone());
network.wait_until_ready().await;
@@ -127,7 +128,6 @@ async fn main() {
conf.receipts.verifier,
)));

- // Initialize the auth service
let auth_service =
init_auth_service(http_client.clone(), conf.api_keys, conf.payment_required).await;

@@ -157,7 +157,7 @@ async fn main() {
reporter,
};

- let poi_blocklist: &'static str = serde_json::to_string(&conf.poi_blocklist).unwrap().leak();
+ let blocklist: watch::Receiver<Vec<BlocklistEntry>> = indexer_blocklist.blocklist;

// Host metrics on a separate server with a port that isn't open to public requests.
tokio::spawn(async move {
@@ -225,10 +225,7 @@ async fn main() {
.route("/ready", routing::get(|| async { "Ready" }))
.route(
"/blocklist",
- routing::get(move || async move {
- let headers = [(reqwest::header::CONTENT_TYPE, "application/json")];
- (headers, poi_blocklist)
- }),
+ routing::get(move || async move { axum::Json(blocklist.borrow().clone()) }),
)
.nest("/api", api);

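The route change above replaces a static, leaked JSON string with a live view of the blocklist. A trimmed-down sketch of the same watch-channel pattern (hypothetical setup, not the gateway's full wiring; assumes axum 0.7 and tokio):

```rust
use axum::{routing::get, Json, Router};
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // The writer half lives in the Kafka consumer task; handlers hold receivers.
    let (tx, rx) = watch::channel(vec!["entry-1".to_string()]);

    let app = Router::new().route(
        "/blocklist",
        // `borrow().clone()` snapshots the current list without blocking the writer.
        get(move || async move { Json(rx.borrow().clone()) }),
    );

    // Updates published through the sender are visible to the next request.
    tx.send_modify(|list| list.push("entry-2".to_string()));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```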
1 change: 1 addition & 0 deletions src/network.rs
@@ -4,6 +4,7 @@ use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariable

pub mod cost_model;
pub mod host_filter;
+ pub mod indexer_blocklist;
mod indexer_processing;
pub mod indexing_progress;
pub mod poi_filter;
212 changes: 212 additions & 0 deletions src/network/indexer_blocklist.rs
@@ -0,0 +1,212 @@
use std::{
collections::{HashMap, HashSet},
time::Duration,
};

use anyhow::{anyhow, Context as _};
use futures::StreamExt as _;
use rand::{thread_rng, RngCore as _};
use rdkafka::{
consumer::{Consumer as _, StreamConsumer},
Message, TopicPartitionList,
};
use thegraph_core::{alloy::primitives::Address, DeploymentId, ProofOfIndexing};
use tokio::sync::watch;

use crate::config::BlocklistEntry;

#[derive(Clone)]
pub struct Blocklist {
pub blocklist: watch::Receiver<Vec<BlocklistEntry>>,
pub poi: watch::Receiver<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
pub indexer: watch::Receiver<HashMap<Address, HashSet<DeploymentId>>>,
}

impl Blocklist {
pub fn spawn(init: Vec<BlocklistEntry>, kafka_config: rdkafka::ClientConfig) -> Self {
let (blocklist_tx, blocklist_rx) = watch::channel(Default::default());
let (poi_tx, poi_rx) = watch::channel(Default::default());
let (indexer_tx, indexer_rx) = watch::channel(Default::default());
let mut actor = Actor {
blocklist: blocklist_tx,
poi: poi_tx,
indexer: indexer_tx,
};
for entry in init {
actor.add_entry(entry);
}
tokio::spawn(async move {
actor.run(kafka_config).await;
});
Self {
blocklist: blocklist_rx,
poi: poi_rx,
indexer: indexer_rx,
}
}
}

struct Actor {
blocklist: watch::Sender<Vec<BlocklistEntry>>,
poi: watch::Sender<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
indexer: watch::Sender<HashMap<Address, HashSet<DeploymentId>>>,
}

impl Actor {
async fn run(&mut self, kafka_config: rdkafka::ClientConfig) {
let consumer = match create_consumer(kafka_config).await {
Ok(consumer) => consumer,
Err(blocklist_err) => {
tracing::error!(%blocklist_err);
return;
}
};

let mut records: HashMap<String, BlocklistEntry> = Default::default();
let mut stream = consumer.stream();
while let Some(msg) = stream.next().await {
let msg = match msg {
Ok(msg) => msg,
Err(blocklist_recv_error) => {
tracing::error!(%blocklist_recv_error);
continue;
}
};
let key = match msg.key_view::<str>() {
Some(Ok(key)) => key,
result => {
tracing::error!("invalid key: {result:?}");
continue;
}
};
match msg.payload().map(serde_json::from_slice::<BlocklistEntry>) {
Some(Ok(entry)) => {
records.insert(key.to_string(), entry.clone());
self.add_entry(entry);
}
None => {
let entry = records.remove(key);
if let Some(entry) = entry {
self.remove_entry(&entry);
}
}
Some(Err(blocklist_deserialize_err)) => {
tracing::error!(%blocklist_deserialize_err);
}
};
}
tracing::error!("blocklist consumer stopped");
}

fn add_entry(&mut self, entry: BlocklistEntry) {
match entry {
BlocklistEntry::Poi {
deployment,
block,
public_poi,
..
} => {
self.poi.send_modify(move |blocklist| {
blocklist
.entry(deployment)
.or_default()
.push((block, public_poi.into()));
});
}
BlocklistEntry::Other {
deployment,
indexer,
..
} => {
self.indexer.send_modify(move |blocklist| {
blocklist.entry(indexer).or_default().insert(deployment);
});
}
};
self.blocklist
.send_modify(move |blocklist| blocklist.push(entry));
}

fn remove_entry(&mut self, entry: &BlocklistEntry) {
match entry {
BlocklistEntry::Poi {
deployment,
block,
public_poi,
..
} => {
self.poi.send_modify(|blocklist| {
if let Some(entry) = blocklist.get_mut(deployment) {
entry.retain(|value| &(*block, (*public_poi).into()) != value);
}
});
}
BlocklistEntry::Other {
deployment,
indexer,
..
} => {
self.indexer.send_modify(|blocklist| {
if let Some(entry) = blocklist.get_mut(indexer) {
entry.remove(deployment);
}
});
}
};
fn matching(a: &BlocklistEntry, b: &BlocklistEntry) -> bool {
match (a, b) {
(
BlocklistEntry::Poi {
deployment,
public_poi,
block,
info: _,
},
BlocklistEntry::Poi {
deployment: deployment_,
public_poi: public_poi_,
block: block_,
info: _,
},
) => {
(deployment == deployment_) && (public_poi == public_poi_) && (block == block_)
}
(
BlocklistEntry::Other {
indexer,
deployment,
info: _,
},
BlocklistEntry::Other {
indexer: indexer_,
deployment: deployment_,
info: _,
},
) => (indexer == indexer_) && (deployment == deployment_),
_ => false,
}
}
self.blocklist
.send_modify(|blocklist| blocklist.retain(|value| !matching(entry, value)));
}
}

async fn create_consumer(
mut kafka_config: rdkafka::ClientConfig,
) -> anyhow::Result<StreamConsumer> {
let topic = "gateway_blocklist";
let group_id = format!("gateway-{:x}", thread_rng().next_u64());
let consumer: StreamConsumer = kafka_config.set("group.id", group_id).create()?;
let metadata = consumer
.fetch_metadata(Some(topic), Duration::from_secs(30))
.with_context(|| anyhow!("fetch {topic} metadata"))?;
anyhow::ensure!(!metadata.topics().is_empty());
let topic_info = &metadata.topics()[0];
let mut assignment = TopicPartitionList::new();
for partition in topic_info.partitions() {
assignment.add_partition_offset(topic, partition.id(), rdkafka::Offset::Beginning)?;
}
tracing::debug!(?assignment);
consumer.assign(&assignment)?;
Ok(consumer)
}
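The consumer above treats each message as an upsert keyed by the record key, and a missing payload (a Kafka tombstone) as removal — see the `None` arm in `Actor::run`. A companion producer sketch (hypothetical, not in this commit; the key format is illustrative, and rdkafka with its tokio runtime, anyhow, and a local broker are assumed):

```rust
use std::time::Duration;

use rdkafka::{
    producer::{FutureProducer, FutureRecord},
    ClientConfig,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    // Upsert: a JSON-encoded BlocklistEntry under a stable key.
    let payload = r#"{"deployment":"QmExample","info":{},"indexer":"0xdef"}"#;
    producer
        .send(
            FutureRecord::to("gateway_blocklist")
                .key("QmExample:0xdef")
                .payload(payload),
            Duration::from_secs(5),
        )
        .await
        .map_err(|(err, _msg)| err)?;

    // Removal: same key, no payload (tombstone).
    producer
        .send(
            FutureRecord::<str, str>::to("gateway_blocklist").key("QmExample:0xdef"),
            Duration::from_secs(5),
        )
        .await
        .map_err(|(err, _msg)| err)?;
    Ok(())
}
```

Note that `create_consumer` assigns all partitions from `Offset::Beginning` under a random `group.id`, so each gateway instance replays the full topic at startup to rebuild its in-memory state.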