Skip to content

Commit

Permalink
Merge pull request #254 from graphprotocol/mde/add-blockmeta-api-support
Browse files Browse the repository at this point in the history
feat: add Blockmeta service client
  • Loading branch information
Maikol authored Feb 28, 2024
2 parents 7518503 + ba71ff4 commit 207ecd3
Show file tree
Hide file tree
Showing 16 changed files with 2,758 additions and 135 deletions.
1,517 changes: 1,391 additions & 126 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 12 additions & 1 deletion crates/oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ name = "block-oracle"
version = "0.1.0"
edition = "2021"

[features]
# Enable to run protobuf messages code generation at compile time (requires protoc, etc.).
proto-gen = []

[dependencies]
anyhow = "1.0.57"
async-trait = "0.1.53"
Expand All @@ -26,16 +30,23 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1"
serde_with = "1.1.12"
thiserror = "1.0.30"
tokio = { version = "1", features = ["rt", "macros", "sync"] }
tokio = { version = "1.36.0", features = ["rt", "macros", "sync"] }
toml = "0.5.8"
tracing = "0.1.34"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
url = { version = "2.2.2", features = ["serde"] }
web3 = { version = "0.18.0", features = ["signing"] }
warp = "0.3"
either = "1.8.0"
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
prost = "0.12.3"
prost-types = "0.12.3"
alloy-primitives = { version = "0.6.3", features = ["serde"] }

[dev-dependencies]
hyper = { version = "0.14", features = ["full"] }
rand = "0.8"
serde_json = "1"

[build-dependencies]
tonic-build = "0.11.0"
69 changes: 69 additions & 0 deletions crates/oracle/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::env;
use std::path::PathBuf;
use std::process::Command;

/// Return the path to root of the crate being built.
///
/// The `CARGO_MANIFEST_DIR` env variable contains the path to the directory containing the
/// manifest for the package being built (the package containing the build script). Also note that
/// this is the value of the current working directory of the build script when it starts.
///
/// https://doc.rust-lang.org/cargo/reference/environment-variables.html#environment-variables-cargo-sets-for-build-scripts
fn root_dir() -> PathBuf {
PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap())
}

/// Check if all the build requirements are met.
///
/// This function checks if the following tools are installed:
/// - protoc (required by `prost-build`, see: https://github.com/tokio-rs/prost#protoc)
fn check_build_requirements() -> Result<(), String> {
let mut errors = vec![];

// Check if protoc is installed.
let protoc = Command::new("protoc").arg("--version").status().unwrap();
if !protoc.success() {
errors.push(
"protoc not found. Please install protoc: https://grpc.io/docs/protoc-installation/",
);
}

if !errors.is_empty() {
return Err(format!(
"Build requirements not met:\n - {}",
errors.join("\n - ")
));
}

Ok(())
}

fn main() {
// Run code generation only if 'proto-gen' feature is enabled.
if env::var("CARGO_FEATURE_PROTO_GEN").is_ok() {
// Check if all the build requirements are met.
if let Err(err) = check_build_requirements() {
panic!("{}", err);
}

let src_dir = root_dir().join("src");
let proto_dir = root_dir().join("proto");

// Streamingfast Blockmeta service gRPC proto files
let sf_blockmeta_proto_dir = proto_dir.join("sf/blockmeta/v2");
let sf_blockmeta_src_dir = src_dir.join("blockmeta/sf_blockmeta_client");

let status = tonic_build::configure()
.build_client(true)
.out_dir(sf_blockmeta_src_dir)
.emit_rerun_if_changed(true)
.compile(
&[sf_blockmeta_proto_dir.join("blockmeta.proto")],
&[sf_blockmeta_proto_dir],
);

if let Err(err) = status {
panic!("Protobuf code generation failed: {}", err);
}
}
}
45 changes: 45 additions & 0 deletions crates/oracle/proto/sf/blockmeta/v2/blockmeta.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
syntax = "proto3";
package sf.blockmeta.v2;
import "google/protobuf/timestamp.proto";

option go_package = "github.com/streamingfast/blockmeta-service;pbbmsrv";

service Block {
rpc NumToID (NumToIDReq) returns (BlockResp);
rpc IDToNum(IDToNumReq) returns (BlockResp);
rpc Head(Empty) returns (BlockResp);
}

message Empty {}

service BlockByTime {
rpc At (TimeReq) returns (BlockResp);
rpc After (RelativeTimeReq) returns (BlockResp);
rpc Before (RelativeTimeReq) returns (BlockResp);
}

// Block Requests
message NumToIDReq {
uint64 blockNum = 1;
}

message IDToNumReq {
string blockID = 1;
}

// Block & BlockByTime Responses
message BlockResp {
string id = 1;
uint64 num = 2;
google.protobuf.Timestamp time = 3;
}

// BlockByTime Requests
message TimeReq {
google.protobuf.Timestamp time = 1;
}

message RelativeTimeReq {
google.protobuf.Timestamp time = 1;
bool inclusive = 2;
}
177 changes: 177 additions & 0 deletions crates/oracle/src/blockmeta/blockmeta_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
//! StreamingFast Blockmeta gRPC client.
use std::collections::BTreeMap;
use std::time::Duration;

use prost::bytes::Bytes;

use futures::stream::{FuturesUnordered, StreamExt};

use tonic::codegen::{Body, InterceptedService, StdError};
use tonic::transport::{Channel, Uri};

pub use self::auth::AuthInterceptor;
use self::gen::block_client::BlockClient;
pub use self::gen::BlockResp as Block;
use self::gen::Empty;
use crate::{BlockmetaProviderForChain, Caip2ChainId};

/// This file is **generated** by the `build.rs` when compiling the crate with the `proto-gen`
/// feature enabled. The `build.rs` script uses the `tonic-build` crate to generate the files.
///
/// ```shell
/// cargo build --features proto-gen --bin block-oracle
/// ```
mod gen {
include!("sf_blockmeta_client/sf.blockmeta.v2.rs");
}

mod auth {
use tonic::{Request, Status};

/// The `AuthInterceptor` is a gRPC interceptor that adds an `authorization` header to the request
/// metadata.
///
/// This middleware inserts the `authorization` header into the request metadata. The header is
/// expected to be in the format `Bearer <token>`.
///
/// It is used to authenticate requests to the StreamingFast Blockmeta service.
#[derive(Clone)]
pub struct AuthInterceptor {
header_value: String,
}

impl AuthInterceptor {
/// Create a new `AuthInterceptor` with the given authorization token.
pub(super) fn with_token(token: &str) -> Self {
Self {
header_value: format!("bearer {}", token),
}
}
}

impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
// The `authorization` header is expected to be in the format `Bearer <token>`
let auth = self.header_value.parse().map_err(|err| {
Status::new(
tonic::Code::Unauthenticated,
format!("invalid authorization token: {}", err),
)
})?;

// Insert the `authorization` header into the request metadata
request.metadata_mut().insert("authorization", auth);
Ok(request)
}
}
}

/// StreamingFast Blockmeta gRPC client.
///
/// The `BlockmetaClient` is a gRPC client for the StreamingFast Blockmeta service. It provides
/// method to fetch the latest block.
#[derive(Debug, Clone)]
pub struct BlockmetaClient<T> {
grpc_client: BlockClient<T>,
}

impl BlockmetaClient<Channel> {
/// Create a new `BlockmetaClient` with the given gRPC endpoint.
///
/// The service will connect once the first request is made. It will attempt to connect for
/// 5 seconds before timing out.
pub fn new(endpoint: Uri) -> Self {
let channel = Channel::builder(endpoint)
.tls_config(Default::default())
.expect("failed to configure TLS")
.connect_timeout(Duration::from_secs(5))
.connect_lazy();
Self {
grpc_client: BlockClient::new(channel),
}
}
}

impl BlockmetaClient<InterceptedService<Channel, AuthInterceptor>> {
/// Create a new `BlockmetaClient` with the given gRPC endpoint and authorization token.
///
/// The cliient will connect to the given endpoint and authenticate requests with the given
/// authorization token inserted into the `authorization` header by the [`AuthInterceptor`].
///
/// The service will connect once the first request is made. It will attempt to connect for
/// 5 seconds before timing out.
pub fn new_with_auth(endpoint: Uri, auth: impl AsRef<str>) -> Self {
let interceptor = AuthInterceptor::with_token(auth.as_ref());
let channel = Channel::builder(endpoint)
.tls_config(Default::default())
.expect("failed to configure TLS")
.connect_timeout(Duration::from_secs(5))
.connect_lazy();

Self {
grpc_client: BlockClient::with_interceptor(channel, interceptor),
}
}
}

impl<T> BlockmetaClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
/// Fetch the latest block from the StreamingFast Blockmeta service.
///
/// Returns `None` if the block does not exist.
pub async fn get_latest_block(&mut self) -> anyhow::Result<Option<Block>> {
let request = Empty {};

match self.grpc_client.head(request).await {
Ok(res) => Ok(Some(res.into_inner())),
Err(err) if err.code() == tonic::Code::NotFound => Ok(None),
Err(err) => Err(anyhow::anyhow!("request failed: {}", err.message())),
}
}
}

/// Fetches the latest available block number and hash from all `chains`.
pub async fn get_latest_blockmeta_blocks<T>(
chains: &[BlockmetaProviderForChain<T>],
) -> BTreeMap<Caip2ChainId, anyhow::Result<Block>>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
T: std::clone::Clone,
{
let mut tasks = chains
.iter()
.cloned()
.map(|mut chain| async move {
chain.client.get_latest_block().await.map(|block| {
(
chain.chain_id,
block.ok_or_else(|| anyhow::anyhow!("Block not found")),
)
})
})
.collect::<FuturesUnordered<_>>();

let mut block_ptr_per_chain = BTreeMap::new();
while let Some(result) = tasks.next().await {
match result {
Ok((chain_id, block)) => {
block_ptr_per_chain.insert(chain_id, block);
}
Err(e) => {
println!("Error: {:?}", e);
}
}
}

assert!(block_ptr_per_chain.len() == chains.len());
block_ptr_per_chain
}
Loading

0 comments on commit 207ecd3

Please sign in to comment.