Skip to content

Commit

Permalink
feat: use epoch block oracle subgraph for supported networks
Browse files Browse the repository at this point in the history
  • Loading branch information
Maikol committed Apr 19, 2024
1 parent 569a743 commit 5cc4352
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 12 deletions.
116 changes: 116 additions & 0 deletions availability-oracle/src/epoch_block_oracle_subgraph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use common::prelude::*;
use futures::stream;
use futures::Stream;
use reqwest::Client;
use serde_derive::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;

pub trait EpochBlockOracleSubgraph {
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>>;
}

pub struct EpochBlockOracleSubgraphImpl {
logger: Logger,
endpoint: String,
client: Client,
}

impl EpochBlockOracleSubgraphImpl {
pub fn new(logger: Logger, endpoint: String) -> Arc<Self> {
Arc::new(EpochBlockOracleSubgraphImpl {
logger,
endpoint,
client: Client::new(),
})
}
}

#[derive(Serialize)]
struct GraphqlRequest {
query: String,
variables: BTreeMap<String, serde_json::Value>,
}

#[derive(Deserialize)]
struct GraphqlResponse {
data: Option<BTreeMap<String, serde_json::Value>>,
errors: Option<Vec<serde_json::Value>>,
}

const SUPPORTED_NETWORKS_QUERY: &str = r#"
query Networks($skip: Int!) {
networks(first: 1000, skip: $skip) {
id
alias
}
}
"#;

impl EpochBlockOracleSubgraph for EpochBlockOracleSubgraphImpl {
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>> {
stream::iter((0..).step_by(1000))
.then(move |skip| {
let this = self.clone();
async move {
let req = GraphqlRequest {
query: SUPPORTED_NETWORKS_QUERY.to_string(),
variables: vec![("skip".to_string(), skip.into())]
.into_iter()
.collect(),
};

let res: GraphqlResponse = this
.client
.post(&this.endpoint)
.json(&req)
.send()
.await?
.error_for_status()?
.json()
.await?;

if let Some(errs) = res.errors.filter(|errs| !errs.is_empty()) {
return Err(anyhow!(
"error querying supported networks from subgraph {}",
serde_json::to_string(&errs)?
));
}

let data = res
.data
.ok_or_else(|| anyhow!("Data field is missing in the response"))?
.remove("networks")
.ok_or_else(|| anyhow!("'networks' field is missing in the data"))?;

#[derive(Deserialize)]
#[allow(non_snake_case)]
struct RawNetwork {
id: String,
alias: String,
}

let page: Vec<RawNetwork> = serde_json::from_value(data)?;
let page: Vec<String> = page
.into_iter()
.flat_map(|raw_network| vec![raw_network.id, raw_network.alias])
.collect();

trace!(this.logger, "networks page"; "page_size" => page.len());

Ok(page)
}
})
.take_while(|networks| {
let keep_paginating = match networks {
Ok(networks) => !networks.is_empty(),
Err(_) => true,
};
async move { keep_paginating }
})
.map_ok(|networks| stream::iter(networks.into_iter().map(Ok)))
.try_flatten()
.boxed()
}
}
40 changes: 28 additions & 12 deletions availability-oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod contract;
mod epoch_block_oracle_subgraph;
mod ipfs;
mod manifest;
mod network_subgraph;
Expand All @@ -8,6 +9,7 @@ mod util;
use common::prelude::*;
use common::prometheus;
use contract::*;
use epoch_block_oracle_subgraph::{EpochBlockOracleSubgraph, EpochBlockOracleSubgraphImpl};
use ethers::abi::Address;
use ethers::signers::LocalWallet;
use ethers::signers::Signer;
Expand Down Expand Up @@ -105,14 +107,11 @@ struct Config {
metrics_port: u16,

#[structopt(
short,
long,
default_value = "mainnet",
value_delimiter = ",",
env = "SUPPORTED_NETWORKS",
help = "a comma separated list of the supported network ids"
env = "EPOCH_BLOCK_ORACLE_SUBGRAPH",
help = "Graphql endpoint to the epoch block oracle subgraph"
)]
supported_networks: Vec<String>,
epoch_block_oracle_subgraph: String,

// Note: `ethereum/contract` is a valid alias for `ethereum`
#[structopt(
Expand Down Expand Up @@ -157,6 +156,8 @@ async fn main() -> Result<()> {
async fn run(logger: Logger, config: Config) -> Result<()> {
let ipfs = IpfsImpl::new(config.ipfs, config.ipfs_concurrency, config.ipfs_timeout);
let subgraph = NetworkSubgraphImpl::new(logger.clone(), config.subgraph);
let epoch_subgraph =
EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph);
let contract: Box<dyn StateManager> = if config.dry_run {
Box::new(StateManagerDryRun::new(logger.clone()))
} else {
Expand Down Expand Up @@ -194,7 +195,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
subgraph.clone(),
config.min_signal,
grace_period,
&config.supported_networks,
epoch_subgraph.clone(),
&config.supported_data_source_kinds,
)
.await
Expand Down Expand Up @@ -227,7 +228,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
subgraph,
config.min_signal,
grace_period,
&config.supported_networks,
epoch_subgraph.clone(),
&config.supported_data_source_kinds,
)
.await
Expand Down Expand Up @@ -278,18 +279,33 @@ pub async fn reconcile_deny_list(
subgraph: Arc<impl NetworkSubgraph>,
min_signal: u64,
grace_period: Duration,
supported_network_ids: &[String],
epoch_subgraph: Arc<impl EpochBlockOracleSubgraph>,
supported_ds_kinds: &[String],
) -> Result<(), Error> {
let logger = logger.clone();

// Fetch supported networks
let mut supported_networks = Vec::new();
let networks_stream = epoch_subgraph.supported_networks();
futures::pin_mut!(networks_stream);
while let Some(network) = networks_stream.next().await {
match network {
Ok(network_id) => supported_networks.push(network_id),
Err(e) => Err(e)?,
}
}

info!(logger, "Supported networks";
"alias" => supported_networks.join(", ")
);

// Check the availability status of all subgraphs, and gather which should flip the deny flag.
let status_changes: Vec<([u8; 32], bool)> = subgraph
.deployments_over_threshold(min_signal, grace_period)
.map(|deployment| async {
let deployment = deployment?;
let id = bytes32_to_cid_v0(deployment.id);
let validity = match check(ipfs, id, supported_network_ids, supported_ds_kinds).await {
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await {
Ok(()) => Valid::Yes,
Err(CheckError::Invalid(e)) => Valid::No(e),
Err(CheckError::Other(e)) => return Err(e),
Expand Down Expand Up @@ -419,7 +435,7 @@ impl From<Invalid> for CheckError {
async fn check(
ipfs: &impl Ipfs,
deployment_id: Cid,
supported_network_ids: &[String],
supported_networks: &[String],
supported_ds_kinds: &[String],
) -> Result<(), CheckError> {
fn check_link(file: &manifest::Link) -> Result<Cid, Invalid> {
Expand Down Expand Up @@ -483,7 +499,7 @@ async fn check(
// - That network is listed in the `supported_networks` list
match (network, ds_network) {
(None, Some(ds_network)) => {
if !supported_network_ids.contains(ds_network) {
if !supported_networks.contains(ds_network) {
return Err(Invalid::UnsupportedNetwork(ds_network.clone()).into());
}
network = Some(ds_network)
Expand Down

0 comments on commit 5cc4352

Please sign in to comment.