From 5dacf4ab3bada0fecb9d9aff462ec15ac161279b Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 23 Oct 2023 21:08:37 +0200 Subject: [PATCH 01/18] feat: add an indexer service framework --- Cargo.lock | 17 ++ common/Cargo.toml | 5 + common/src/indexer_service/http/config.rs | 49 ++++ .../indexer_service/http/indexer_service.rs | 231 ++++++++++++++++++ common/src/indexer_service/http/mod.rs | 12 + .../indexer_service/http/request_handler.rs | 100 ++++++++ .../http/scalar_receipt_header.rs | 54 ++++ common/src/indexer_service/mod.rs | 1 + common/src/lib.rs | 1 + 9 files changed, 470 insertions(+) create mode 100644 common/src/indexer_service/http/config.rs create mode 100644 common/src/indexer_service/http/indexer_service.rs create mode 100644 common/src/indexer_service/http/mod.rs create mode 100644 common/src/indexer_service/http/request_handler.rs create mode 100644 common/src/indexer_service/http/scalar_receipt_header.rs create mode 100644 common/src/indexer_service/mod.rs diff --git a/Cargo.lock b/Cargo.lock index aae5fea9..3dadf0ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -697,6 +697,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", + "headers", "http", "http-body", "hyper", @@ -2632,6 +2633,17 @@ dependencies = [ "http", ] +[[package]] +name = "headers-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f33cf300c485e3cbcba0235013fcc768723451c9b84d1b31aa7fec0491ac9a11" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "heck" version = "0.4.1" @@ -2909,12 +2921,16 @@ dependencies = [ "alloy-sol-types", "anyhow", "arc-swap", + "async-trait", + "axum 0.6.20", "env_logger", "ethers", "ethers-core", "eventuals", "faux", "graphql-http", + "headers", + "headers-derive", "keccak-hash", "lazy_static", "lru", @@ -2929,6 +2945,7 @@ dependencies = [ "tap_core", "test-log", "thegraph", + "thiserror", "tokio", "tracing", "wiremock", diff --git a/common/Cargo.toml b/common/Cargo.toml index fcc27f0e..1f238e51 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -34,6 +34,11 @@ thegraph = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } tracing = "0.1.34" graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } tap_core = "0.7.0" +axum = { version = "0.6.20", default_features = true, features = ["headers"] } +thiserror = "1.0.49" +async-trait = "0.1.74" +headers-derive = "0.1.1" +headers = "0.3.9" [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs new file mode 100644 index 00000000..a408c4bb --- /dev/null +++ b/common/src/indexer_service/http/config.rs @@ -0,0 +1,49 @@ +use std::net::SocketAddr; + +use alloy_primitives::Address; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct DatabaseConfig { + pub postgres_url: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct NetworkSubgraphConfig { + pub query_url: String, + pub syncing_interval: u64, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct EscrowSubgraphConfig { + pub query_url: String, + pub syncing_interval: u64, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ServerConfig { + pub host_and_port: SocketAddr, + pub url_prefix: String, + pub free_query_auth_token: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct IndexerServiceConfig { + pub indexer: IndexerConfig, + pub server: ServerConfig, + pub database: DatabaseConfig, + pub network_subgraph: NetworkSubgraphConfig, + pub escrow_subgraph: EscrowSubgraphConfig, + pub graph_network: GraphNetworkConfig, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct GraphNetworkConfig { + pub id: u64, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct IndexerConfig { + pub indexer_address: Address, + pub operator_mnemonic: String, +} diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs new file mode 100644 index 00000000..fb988ea2 --- /dev/null +++ b/common/src/indexer_service/http/indexer_service.rs @@ -0,0 +1,231 @@ +use std::{collections::HashMap, fmt::Debug, path::PathBuf, sync::Arc, time::Duration}; + +use alloy_primitives::Address; +use alloy_sol_types::eip712_domain; +use anyhow; +use axum::{ + async_trait, + body::Body, + response::{IntoResponse, Response}, + routing::{get, post}, + Router, Server, +}; +use eventuals::Eventual; +use reqwest::StatusCode; +use serde::{de::DeserializeOwned, Serialize}; +use sqlx::postgres::PgPoolOptions; +use thegraph::types::DeploymentId; +use thiserror::Error; + +use crate::{ + prelude::{ + attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, + AttestationSigner, SubgraphClient, + }, + tap_manager::TapManager, +}; + +use super::{request_handler::request_handler, IndexerServiceConfig}; + +pub trait IsAttestable { + fn is_attestable(&self) -> bool; +} + +#[async_trait] +pub trait IndexerServiceImpl { + type Error: std::error::Error; + type Request: DeserializeOwned + Send + Debug + Serialize; + type Response: IntoResponse + Serialize + IsAttestable; + type State: Send + Sync; + + async fn process_request( + &self, + manifest_id: DeploymentId, + request: Self::Request, + ) -> Result<(Self::Request, Self::Response), Self::Error>; +} + +#[derive(Debug, Error)] +pub enum IndexerServiceError +where + E: std::error::Error, +{ + #[error("No receipt provided with the request")] + NoReceipt, + #[error("Issues with provided receipt: {0}")] + ReceiptError(anyhow::Error), + #[error("Service is not ready yet, try again in a moment")] + ServiceNotReady, + #[error("No attestation signer found for allocation `{0}`")] + NoSignerForAllocation(Address), + #[error("No attestation signer found for manifest `{0}`")] + NoSignerForManifest(DeploymentId), + #[error("Invalid request body: {0}")] + InvalidRequest(anyhow::Error), + #[error("Error while processing the request: {0}")] + ProcessingError(E), + #[error("No receipt or free query auth token provided")] + Unauthorized, + #[error("Invalid free query auth token: {0}")] + InvalidFreeQueryAuthToken(String), + #[error("Failed to sign attestation")] + FailedToSignAttestation, + #[error("Failed to provide attestation")] + FailedToProvideAttestation, + #[error("Failed to provide response")] + FailedToProvideResponse, +} + +impl From<&IndexerServiceError> for StatusCode +where + E: std::error::Error, +{ + fn from(err: &IndexerServiceError) -> Self { + use IndexerServiceError::*; + + match err { + ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE, + + NoReceipt => StatusCode::PAYMENT_REQUIRED, + + Unauthorized => StatusCode::UNAUTHORIZED, + + NoSignerForAllocation(_) => StatusCode::INTERNAL_SERVER_ERROR, + NoSignerForManifest(_) => StatusCode::INTERNAL_SERVER_ERROR, + FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, + FailedToProvideAttestation => StatusCode::INTERNAL_SERVER_ERROR, + FailedToProvideResponse => StatusCode::INTERNAL_SERVER_ERROR, + + ReceiptError(_) => StatusCode::BAD_REQUEST, + InvalidRequest(_) => StatusCode::BAD_REQUEST, + InvalidFreeQueryAuthToken(_) => StatusCode::BAD_REQUEST, + ProcessingError(_) => StatusCode::BAD_REQUEST, + } + } +} + +// Tell axum how to convert `RpcError` into a response. +impl IntoResponse for IndexerServiceError +where + E: std::error::Error, +{ + fn into_response(self) -> Response { + (StatusCode::from(&self), self.to_string()).into_response() + } +} + +pub struct IndexerServiceOptions +where + I: IndexerServiceImpl + Sync + Send + 'static, +{ + pub service_impl: I, + pub config: IndexerServiceConfig, + pub extra_routes: Router>, Body>, +} + +pub struct IndexerServiceState +where + I: IndexerServiceImpl + Sync + Send + 'static, +{ + pub config: IndexerServiceConfig, + pub attestation_signers: Eventual>, + pub tap_manager: TapManager, + pub service_impl: Arc, +} + +pub struct IndexerService {} + +impl IndexerService { + pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> + where + I: IndexerServiceImpl + Sync + Send + 'static, + { + let network_subgraph = Box::leak(Box::new(SubgraphClient::new( + "network-subgraph", + &options.config.network_subgraph.query_url, + )?)); + + // Identify the dispute manager for the configured network + let dispute_manager = dispute_manager( + network_subgraph, + options.config.graph_network.id, + Duration::from_secs(3600), + ); + + // Monitor the indexer's own allocations + let allocations = indexer_allocations( + network_subgraph, + options.config.indexer.indexer_address, + options.config.graph_network.id, + Duration::from_secs(options.config.network_subgraph.syncing_interval), + ); + + // Maintain an up-to-date set of attestation signers, one for each + // allocation + let attestation_signers = attestation_signers( + allocations.clone(), + options.config.indexer.operator_mnemonic.clone(), + options.config.graph_network.id.into(), + dispute_manager, + ); + + let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + "escrow-subgraph", + &options.config.escrow_subgraph.query_url, + )?)); + + let escrow_accounts = escrow_accounts( + escrow_subgraph, + options.config.indexer.indexer_address, + Duration::from_secs(options.config.escrow_subgraph.syncing_interval), + ); + + // Establish Database connection necessary for serving indexer management + // requests with defined schema + // Note: Typically, you'd call `sqlx::migrate!();` here to sync the models + // which defaults to files in "./migrations" to sync the database; + // however, this can cause conflicts with the migrations run by indexer + // agent. Hence we leave syncing and migrating entirely to the agent and + // assume the models are up to date in the service. + let database = PgPoolOptions::new() + .max_connections(50) + .acquire_timeout(Duration::from_secs(30)) + .connect(&options.config.database.postgres_url) + .await?; + + let tap_manager = TapManager::new( + database, + allocations, + escrow_accounts, + // TODO: arguments for eip712_domain should be a config + eip712_domain! { + name: "TapManager", + version: "1", + verifying_contract: options.config.indexer.indexer_address, + }, + ); + + let state = Arc::new(IndexerServiceState { + config: options.config.clone(), + attestation_signers, + tap_manager, + service_impl: Arc::new(options.service_impl), + }); + + let router = Router::new() + .route("/", get("Service is up and running")) + .route( + PathBuf::from(options.config.server.url_prefix) + .join("manifests/:id") + .to_str() + .expect("Failed to set up `/manifest/:id` route"), + post(request_handler::), + ) + .merge(options.extra_routes) + .with_state(state); + + Ok(Server::bind(&options.config.server.host_and_port) + .serve(router.into_make_service()) + .await?) + } +} diff --git a/common/src/indexer_service/http/mod.rs b/common/src/indexer_service/http/mod.rs new file mode 100644 index 00000000..26ab0f83 --- /dev/null +++ b/common/src/indexer_service/http/mod.rs @@ -0,0 +1,12 @@ +mod config; +mod indexer_service; +mod request_handler; +mod scalar_receipt_header; + +pub use config::{ + DatabaseConfig, EscrowSubgraphConfig, GraphNetworkConfig, IndexerConfig, IndexerServiceConfig, + NetworkSubgraphConfig, ServerConfig, +}; +pub use indexer_service::{ + IndexerService, IndexerServiceImpl, IndexerServiceOptions, IsAttestable, +}; diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs new file mode 100644 index 00000000..df0fcdf9 --- /dev/null +++ b/common/src/indexer_service/http/request_handler.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use axum::{ + body::Bytes, + extract::{Path, State}, + http::{HeaderMap, HeaderValue}, + response::IntoResponse, + TypedHeader, +}; +use log::info; +use reqwest::StatusCode; +use thegraph::types::DeploymentId; +use tracing::info; + +use crate::{indexer_service::http::IsAttestable, prelude::AttestationSigner}; + +use super::{ + indexer_service::{IndexerServiceError, IndexerServiceState}, + scalar_receipt_header::ScalarReceipt, + IndexerServiceImpl, +}; + +pub async fn request_handler( + Path(manifest_id): Path, + TypedHeader(receipt): TypedHeader, + State(state): State>>, + headers: HeaderMap, + body: Bytes, +) -> Result> +where + I: IndexerServiceImpl + Sync + Send + 'static, +{ + info!("Handling request for deployment `{manifest_id}`"); + + let request = + serde_json::from_slice(&body).map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?; + + let mut attestation_signer: Option = None; + + if let Some(receipt) = receipt.into_signed_receipt() { + let allocation_id = receipt.message.allocation_id; + + // Verify the receipt and store it in the database + state + .tap_manager + .verify_and_store_receipt(receipt) + .await + .map_err(IndexerServiceError::ReceiptError)?; + + // Check if we have an attestation signer for the allocation the receipt was created for + let signers = state + .attestation_signers + .value_immediate() + .ok_or_else(|| IndexerServiceError::ServiceNotReady)?; + + attestation_signer = Some( + signers + .get(&allocation_id) + .cloned() + .ok_or_else(|| (IndexerServiceError::NoSignerForAllocation(allocation_id)))?, + ); + } else if state.config.server.free_query_auth_token.is_some() + && state.config.server.free_query_auth_token + != headers + .get("free-query-auth-token") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()) + { + return Err(IndexerServiceError::Unauthorized); + } + + let (request, response) = state + .service_impl + .process_request(manifest_id, request) + .await + .map_err(IndexerServiceError::ProcessingError)?; + + let attestation = match (response.is_attestable(), attestation_signer) { + (false, _) => None, + (true, None) => return Err(IndexerServiceError::NoSignerForManifest(manifest_id)), + (true, Some(signer)) => { + let req = serde_json::to_string(&request) + .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; + let res = serde_json::to_string(&response) + .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; + Some(signer.create_attestation(&req, &res)) + } + }; + + let mut headers = HeaderMap::new(); + if let Some(attestation) = attestation { + let raw_attestation = serde_json::to_string(&attestation) + .map_err(|_| IndexerServiceError::FailedToProvideAttestation)?; + let header_value = HeaderValue::from_str(&raw_attestation) + .map_err(|_| IndexerServiceError::FailedToProvideAttestation)?; + headers.insert("graph-attestation", header_value); + } + + Ok((StatusCode::OK, headers, response)) +} diff --git a/common/src/indexer_service/http/scalar_receipt_header.rs b/common/src/indexer_service/http/scalar_receipt_header.rs new file mode 100644 index 00000000..c8398eef --- /dev/null +++ b/common/src/indexer_service/http/scalar_receipt_header.rs @@ -0,0 +1,54 @@ +use std::ops::Deref; + +use headers::{Header, HeaderName, HeaderValue}; +use lazy_static::lazy_static; +use tap_core::tap_manager::SignedReceipt; + +pub struct ScalarReceipt(Option); + +impl ScalarReceipt { + pub fn into_signed_receipt(self) -> Option { + self.0 + } +} + +impl Deref for ScalarReceipt { + type Target = Option; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +lazy_static! { + static ref SCALAR_RECEIPT: HeaderName = HeaderName::from_static("scalar-receipt"); +} + +impl Header for ScalarReceipt { + fn name() -> &'static HeaderName { + &SCALAR_RECEIPT + } + + fn decode<'i, I>(values: &mut I) -> Result + where + I: Iterator, + { + let value = values.next(); + let raw_receipt = value + .map(|value| value.to_str()) + .transpose() + .map_err(|_| headers::Error::invalid())?; + let parsed_receipt = raw_receipt + .map(serde_json::from_str) + .transpose() + .map_err(|_| headers::Error::invalid())?; + Ok(ScalarReceipt(parsed_receipt)) + } + + fn encode(&self, _values: &mut E) + where + E: Extend, + { + unimplemented!() + } +} diff --git a/common/src/indexer_service/mod.rs b/common/src/indexer_service/mod.rs new file mode 100644 index 00000000..3883215f --- /dev/null +++ b/common/src/indexer_service/mod.rs @@ -0,0 +1 @@ +pub mod http; diff --git a/common/src/lib.rs b/common/src/lib.rs index a4796277..4fac5d54 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -6,6 +6,7 @@ pub mod attestations; pub mod escrow_accounts; pub mod graphql; pub mod indexer_errors; +pub mod indexer_service; pub mod metrics; pub mod signature_verification; pub mod subgraph_client; From 3fa8c8c962a1435ef4c75040e7365ceaf12becf5 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Wed, 25 Oct 2023 10:41:03 +0200 Subject: [PATCH 02/18] fix: update to latest subgraph client API changes --- common/src/indexer_service/http/config.rs | 9 +++++++ .../indexer_service/http/indexer_service.rs | 26 +++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs index a408c4bb..aa85a072 100644 --- a/common/src/indexer_service/http/config.rs +++ b/common/src/indexer_service/http/config.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; use alloy_primitives::Address; use serde::{Deserialize, Serialize}; +use thegraph::types::DeploymentId; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct DatabaseConfig { @@ -10,12 +11,14 @@ pub struct DatabaseConfig { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct NetworkSubgraphConfig { + pub deployment: Option, pub query_url: String, pub syncing_interval: u64, } #[derive(Clone, Debug, Deserialize, Serialize)] pub struct EscrowSubgraphConfig { + pub deployment: Option, pub query_url: String, pub syncing_interval: u64, } @@ -32,11 +35,17 @@ pub struct IndexerServiceConfig { pub indexer: IndexerConfig, pub server: ServerConfig, pub database: DatabaseConfig, + pub graph_node: Option, pub network_subgraph: NetworkSubgraphConfig, pub escrow_subgraph: EscrowSubgraphConfig, pub graph_network: GraphNetworkConfig, } +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct GraphNodeConfig { + pub query_base_url: String, +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct GraphNetworkConfig { pub id: u64, diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index fb988ea2..56dbbffc 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -20,7 +20,7 @@ use thiserror::Error; use crate::{ prelude::{ attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, - AttestationSigner, SubgraphClient, + AttestationSigner, DeploymentDetails, SubgraphClient, }, tap_manager::TapManager, }; @@ -141,8 +141,16 @@ impl IndexerService { I: IndexerServiceImpl + Sync + Send + 'static, { let network_subgraph = Box::leak(Box::new(SubgraphClient::new( - "network-subgraph", - &options.config.network_subgraph.query_url, + options + .config + .graph_node + .as_ref() + .zip(options.config.network_subgraph.deployment) + .map(|(graph_node, deployment)| { + DeploymentDetails::for_graph_node(&graph_node.query_base_url, deployment) + }) + .transpose()?, + DeploymentDetails::for_query_url(&options.config.network_subgraph.query_url)?, )?)); // Identify the dispute manager for the configured network @@ -170,8 +178,16 @@ impl IndexerService { ); let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( - "escrow-subgraph", - &options.config.escrow_subgraph.query_url, + options + .config + .graph_node + .as_ref() + .zip(options.config.escrow_subgraph.deployment) + .map(|(graph_node, deployment)| { + DeploymentDetails::for_graph_node(&graph_node.query_base_url, deployment) + }) + .transpose()?, + DeploymentDetails::for_query_url(&options.config.escrow_subgraph.query_url)?, )?)); let escrow_accounts = escrow_accounts( From 47d40c8928f396637e17b4d160543266bb95e49d Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Wed, 25 Oct 2023 15:02:12 +0200 Subject: [PATCH 03/18] fix: expect free query auth token to start with "Bearer " --- common/src/indexer_service/http/request_handler.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index df0fcdf9..4fad5576 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -62,8 +62,9 @@ where } else if state.config.server.free_query_auth_token.is_some() && state.config.server.free_query_auth_token != headers - .get("free-query-auth-token") + .get("authorization") .and_then(|v| v.to_str().ok()) + .and_then(|s| s.strip_prefix("Bearer ")) .map(|s| s.to_string()) { return Err(IndexerServiceError::Unauthorized); From 51f085e3a8efd4268b4e3d346480801ff62fff78 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Wed, 25 Oct 2023 20:49:52 +0200 Subject: [PATCH 04/18] feat: implement /version endpoint in http indexer service --- Cargo.lock | 77 ++++++++++++++++++- Cargo.toml | 6 +- common/Cargo.toml | 1 + .../indexer_service/http/indexer_service.rs | 26 ++++++- common/src/indexer_service/http/mod.rs | 2 +- service/Cargo.toml | 1 + 6 files changed, 104 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3dadf0ef..c52a8fcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -831,6 +831,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -956,6 +965,49 @@ dependencies = [ "serde", ] +[[package]] +name = "build-info" +version = "0.0.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "155eb070980e96aeb4ef3b8620b0febb2ae5e17451dc1b329681bdd4eb0a94e1" +dependencies = [ + "build-info-common", + "build-info-proc", +] + +[[package]] +name = "build-info-common" +version = "0.0.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8209c0c2b13da7e5f7202e591b6d41b46c8f0e78d031dedf5cff71cc8c6ec773" +dependencies = [ + "chrono", + "derive_more", + "semver 1.0.20", + "serde", +] + +[[package]] +name = "build-info-proc" +version = "0.0.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc1874cb1995691fb01f9bb56e75f9660d2614e74607fa71c08a8b3bd7e30e4" +dependencies = [ + "anyhow", + "base64 0.21.4", + "bincode", + "build-info-common", + "chrono", + "num-bigint", + "num-traits", + "proc-macro-error", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.38", + "zstd 0.12.4", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -2923,6 +2975,7 @@ dependencies = [ "arc-swap", "async-trait", "axum 0.6.20", + "build-info", "env_logger", "ethers", "ethers-core", @@ -5218,6 +5271,7 @@ dependencies = [ "ethers-core", "eventuals", "faux", + "graphql-http", "hex", "hex-literal", "hyper", @@ -6911,7 +6965,7 @@ dependencies = [ "pbkdf2 0.11.0", "sha1", "time", - "zstd", + "zstd 0.11.2+zstd.1.5.2", ] [[package]] @@ -6920,7 +6974,16 @@ version = "0.11.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" dependencies = [ - "zstd-safe", + "zstd-safe 5.0.2+zstd.1.5.2", +] + +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe 6.0.6", ] [[package]] @@ -6933,6 +6996,16 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", +] + [[package]] name = "zstd-sys" version = "2.0.9+zstd.1.5.5" diff --git a/Cargo.toml b/Cargo.toml index 28f64562..008c8669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,5 @@ [workspace] -members = [ - "common", - "service", - "tap-agent", -] +members = ["common", "service", "tap-agent"] resolver = "2" [profile.dev.package."*"] diff --git a/common/Cargo.toml b/common/Cargo.toml index 1f238e51..18c47eb1 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -39,6 +39,7 @@ thiserror = "1.0.49" async-trait = "0.1.74" headers-derive = "0.1.1" headers = "0.3.9" +build-info = "0.0.34" [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 56dbbffc..55341158 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -8,8 +8,9 @@ use axum::{ body::Body, response::{IntoResponse, Response}, routing::{get, post}, - Router, Server, + Json, Router, Server, }; +use build_info::BuildInfo; use eventuals::Eventual; use reqwest::StatusCode; use serde::{de::DeserializeOwned, Serialize}; @@ -114,12 +115,34 @@ where } } +#[derive(Clone, Serialize)] +pub struct IndexerServiceRelease { + version: String, + dependencies: HashMap, +} + +impl From<&BuildInfo> for IndexerServiceRelease { + fn from(value: &BuildInfo) -> Self { + Self { + version: value.crate_info.version.to_string(), + dependencies: HashMap::from_iter( + value + .crate_info + .dependencies + .iter() + .map(|d| (d.name.clone(), d.version.to_string())), + ), + } + } +} + pub struct IndexerServiceOptions where I: IndexerServiceImpl + Sync + Send + 'static, { pub service_impl: I, pub config: IndexerServiceConfig, + pub release: IndexerServiceRelease, pub extra_routes: Router>, Body>, } @@ -230,6 +253,7 @@ impl IndexerService { let router = Router::new() .route("/", get("Service is up and running")) + .route("/version", get(Json(options.release))) .route( PathBuf::from(options.config.server.url_prefix) .join("manifests/:id") diff --git a/common/src/indexer_service/http/mod.rs b/common/src/indexer_service/http/mod.rs index 26ab0f83..bf7b580f 100644 --- a/common/src/indexer_service/http/mod.rs +++ b/common/src/indexer_service/http/mod.rs @@ -8,5 +8,5 @@ pub use config::{ NetworkSubgraphConfig, ServerConfig, }; pub use indexer_service::{ - IndexerService, IndexerServiceImpl, IndexerServiceOptions, IsAttestable, + IndexerService, IndexerServiceImpl, IndexerServiceOptions, IndexerServiceRelease, IsAttestable, }; diff --git a/service/Cargo.toml b/service/Cargo.toml index ce264f17..689df0ea 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -59,6 +59,7 @@ alloy-primitives = { version = "0.5.2", features = ["serde"] } alloy-sol-types = "0.5.2" lazy_static = "1.4.0" thegraph = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } +graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } [dev-dependencies] faux = "0.1.10" From 90004819bf5513def8145a64a0cfc3038d298e95 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Thu, 26 Oct 2023 14:18:25 +0200 Subject: [PATCH 05/18] refactor: use IndexerServiceRelease for /version endpoint --- Cargo.lock | 69 +++++++++++++++++++++++++++++++++++++++ service/Cargo.toml | 4 +++ service/build.rs | 5 +++ service/src/main.rs | 11 ++++--- service/src/server/mod.rs | 7 ++-- service/src/util.rs | 56 ------------------------------- 6 files changed, 87 insertions(+), 65 deletions(-) create mode 100644 service/build.rs diff --git a/Cargo.lock b/Cargo.lock index c52a8fcc..d9862dd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,6 +975,26 @@ dependencies = [ "build-info-proc", ] +[[package]] +name = "build-info-build" +version = "0.0.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b69d6331ec579144d39e1c128f343d23e9b837617df1bed4ed032e141f83f06a" +dependencies = [ + "anyhow", + "base64 0.21.4", + "bincode", + "build-info-common", + "cargo_metadata", + "chrono", + "git2", + "glob", + "pretty_assertions", + "rustc_version 0.4.0", + "serde_json", + "zstd 0.12.4", +] + [[package]] name = "build-info-common" version = "0.0.34" @@ -2529,6 +2549,19 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +[[package]] +name = "git2" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf97ba92db08df386e10c8ede66a2a0369bd277090afd8710e19e38de9ec0cd" +dependencies = [ + "bitflags 2.4.1", + "libc", + "libgit2-sys", + "log", + "url", +] + [[package]] name = "glob" version = "0.3.1" @@ -3396,6 +3429,18 @@ version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +[[package]] +name = "libgit2-sys" +version = "0.16.1+1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2a2bb3680b094add03bb3732ec520ece34da31a8cd2d633d1389d0f0fb60d0c" +dependencies = [ + "cc", + "libc", + "libz-sys", + "pkg-config", +] + [[package]] name = "libm" version = "0.2.8" @@ -3413,6 +3458,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libz-sys" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d97137b25e321a73eef1418d1d5d2eda4d77e12813f8e6dead84bc52c5870a7b" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.10" @@ -4234,6 +4291,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" +[[package]] +name = "pretty_assertions" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "prettyplease" version = "0.2.15" @@ -5262,6 +5329,8 @@ dependencies = [ "async-graphql-axum", "autometrics", "axum 0.5.17", + "build-info", + "build-info-build", "cargo-husky", "clap", "confy", diff --git a/service/Cargo.toml b/service/Cargo.toml index 689df0ea..9c7e6b6d 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -60,6 +60,7 @@ alloy-sol-types = "0.5.2" lazy_static = "1.4.0" thegraph = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } +build-info = "0.0.34" [dev-dependencies] faux = "0.1.10" @@ -68,6 +69,9 @@ indexer-common = { path = "../common", features = ["mock"] } test-log = "0.2.12" wiremock = "0.5.19" +[build-dependencies] +build-info-build = "0.0.34" + # [[bin]] # name = "indexer-native" # path = "native" diff --git a/service/build.rs b/service/build.rs new file mode 100644 index 00000000..0b2af0d0 --- /dev/null +++ b/service/build.rs @@ -0,0 +1,5 @@ +use build_info_build::DependencyDepth; + +fn main() { + build_info_build::build_script().collect_dependencies(DependencyDepth::Depth(1)); +} diff --git a/service/src/main.rs b/service/src/main.rs index 6d676b83..064ea507 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -9,14 +9,14 @@ use std::{net::SocketAddr, str::FromStr, time::Duration}; use tracing::info; use indexer_common::{ + indexer_service::http::IndexerServiceRelease, prelude::{ - attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, SubgraphClient, + attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, + DeploymentDetails, SubgraphClient, TapManager, }, - subgraph_client::DeploymentDetails, - tap_manager::TapManager, }; -use util::{package_version, shutdown_signal}; +use util::shutdown_signal; use crate::{ common::database, config::Cli, metrics::handle_serve_metrics, query_processor::QueryProcessor, @@ -54,7 +54,8 @@ async fn main() -> Result<(), std::io::Error> { // Parse basic configurations let config = Cli::args(); - let release = package_version().expect("Failed to resolve for release version"); + build_info::build_info!(fn build_info); + let release = IndexerServiceRelease::from(build_info()); // Initialize graph-node client let graph_node = graph_node::GraphNodeInstance::new( diff --git a/service/src/server/mod.rs b/service/src/server/mod.rs index daebfb14..329f2cc3 100644 --- a/service/src/server/mod.rs +++ b/service/src/server/mod.rs @@ -19,12 +19,11 @@ use tower_http::{ }; use tracing::Level; -use indexer_common::prelude::SubgraphClient; +use indexer_common::{indexer_service::http::IndexerServiceRelease, prelude::SubgraphClient}; use crate::{ query_processor::QueryProcessor, server::routes::{network_ratelimiter, slow_ratelimiter}, - util::PackageVersion, }; pub mod routes; @@ -32,7 +31,7 @@ pub mod routes; #[derive(Clone)] pub struct ServerOptions { pub port: Option, - pub release: PackageVersion, + pub release: IndexerServiceRelease, pub query_processor: QueryProcessor, pub free_query_auth_token: Option, pub graph_node_status_endpoint: String, @@ -47,7 +46,7 @@ impl ServerOptions { #[allow(clippy::too_many_arguments)] pub fn new( port: Option, - release: PackageVersion, + release: IndexerServiceRelease, query_processor: QueryProcessor, free_query_auth_token: Option, graph_node_status_endpoint: String, diff --git a/service/src/util.rs b/service/src/util.rs index 776a750f..0c64abc6 100644 --- a/service/src/util.rs +++ b/service/src/util.rs @@ -2,11 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use ethers::signers::WalletError; -use serde::Serialize; -use std::collections::HashMap; -use std::fs; use tokio::signal; -use toml::Value; use tracing::{ info, subscriber::{set_global_default, SetGlobalDefaultError}, @@ -15,58 +11,6 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber}; use crate::common::address::{build_wallet, wallet_address}; -/// Struct for version control -#[derive(Serialize, Debug, Clone)] -pub struct PackageVersion { - version: String, - dependencies: HashMap, -} - -/// Read the manfiest -fn read_manifest() -> Result { - let toml_string = fs::read_to_string("service/Cargo.toml")?; - let toml_value: Value = toml::from_str(&toml_string)?; - Ok(toml_value) -} - -/// Parse package versioning from the manifest -pub fn package_version() -> Result { - read_manifest().map(|toml_file| { - let pkg = toml_file.as_table().unwrap(); - let version = pkg - .get("package") - .and_then(|p| p.get("version")) - .unwrap() - .as_str() - .unwrap() - .to_string(); - let dependencies = pkg - .get("dependencies") - .and_then(|d| d.as_table()) - .expect("Parse package dependencies"); - let indexer_native = dependencies.get("indexer-native").map(|d| { - d.as_str() - .expect("Parse indexer-service dependency version") - .to_string() - }); - - let release = PackageVersion { - version, - dependencies: match indexer_native { - Some(indexer_native_version) => { - let mut map = HashMap::new(); - map.insert("indexer-native".to_string(), indexer_native_version); - map - } - None => HashMap::new(), - }, - }; - info!("Running package version {:#?}", release); - - release - }) -} - /// Validate that private key as an Eth wallet pub fn public_key(value: &str) -> Result { // The wallet can be stored instead of the original private key From e0d36ef9964344e150d9070fd1a9df5f127a1d80 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Thu, 26 Oct 2023 14:35:39 +0200 Subject: [PATCH 06/18] feat: add prometheus metrics support to http indexer service --- Cargo.lock | 272 ++++++++++++++++-- common/Cargo.toml | 1 + common/src/indexer_service/http/config.rs | 1 + .../indexer_service/http/indexer_service.rs | 29 +- 4 files changed, 284 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d9862dd9..129727f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -629,16 +629,36 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52c47a2283b04388a392a7bff8a0f68db615d967e4d177068f573773fcefa91b" dependencies = [ - "autometrics-macros", + "autometrics-macros 0.3.3", "const_format", - "metrics-exporter-prometheus", + "metrics-exporter-prometheus 0.11.0", "once_cell", - "opentelemetry-prometheus", - "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry-prometheus 0.11.0", + "opentelemetry_api 0.18.0", + "opentelemetry_sdk 0.18.0", "prometheus", ] +[[package]] +name = "autometrics" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95cef5eb1e18adfb843202bf71587174e480ed67c0ca3e976bf40e82d9adce86" +dependencies = [ + "autometrics-macros 0.6.0", + "cfg_aliases", + "http", + "linkme", + "metrics-exporter-prometheus 0.12.1", + "once_cell", + "opentelemetry-prometheus 0.13.0", + "opentelemetry_sdk 0.20.0", + "prometheus", + "prometheus-client", + "spez", + "thiserror", +] + [[package]] name = "autometrics-macros" version = "0.3.3" @@ -651,6 +671,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "autometrics-macros" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "543250f01aa62c3b2666e327b335be532845a35e440eb984f5e6bad69106833d" +dependencies = [ + "percent-encoding", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "axum" version = "0.5.17" @@ -907,7 +939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" dependencies = [ "borsh-derive", - "hashbrown 0.13.2", + "hashbrown 0.12.3", ] [[package]] @@ -1158,6 +1190,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "chrono" version = "0.4.31" @@ -1737,6 +1775,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "dtoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" + [[package]] name = "dunce" version = "1.0.4" @@ -2659,9 +2703,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.13.2" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" dependencies = [ "ahash 0.8.3", ] @@ -3007,6 +3051,7 @@ dependencies = [ "anyhow", "arc-swap", "async-trait", + "autometrics 0.6.0", "axum 0.6.20", "build-info", "env_logger", @@ -3470,6 +3515,26 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linkme" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ed2ee9464ff9707af8e9ad834cffa4802f072caad90639c583dd3c62e6e608" +dependencies = [ + "linkme-impl", +] + +[[package]] +name = "linkme-impl" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba125974b109d512fccbc6c0244e7580143e460895dfd6ea7f8bbb692fd94396" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "linux-raw-sys" version = "0.4.10" @@ -3510,6 +3575,15 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -3563,10 +3637,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849" dependencies = [ "ahash 0.7.6", - "metrics-macros", + "metrics-macros 0.6.0", "portable-atomic 0.3.20", ] +[[package]] +name = "metrics" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" +dependencies = [ + "ahash 0.8.3", + "metrics-macros 0.7.0", + "portable-atomic 1.4.3", +] + [[package]] name = "metrics-exporter-prometheus" version = "0.11.0" @@ -3574,11 +3659,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70" dependencies = [ "indexmap 1.9.3", - "metrics", - "metrics-util", + "metrics 0.20.1", + "metrics-util 0.14.0", "parking_lot", "portable-atomic 0.3.20", - "quanta", + "quanta 0.10.1", + "thiserror", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" +dependencies = [ + "base64 0.21.4", + "indexmap 1.9.3", + "metrics 0.21.1", + "metrics-util 0.15.1", + "quanta 0.11.1", "thiserror", ] @@ -3593,6 +3692,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "metrics-macros" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "metrics-util" version = "0.14.0" @@ -3602,11 +3712,26 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.12.3", - "metrics", + "metrics 0.20.1", "num_cpus", "parking_lot", "portable-atomic 0.3.20", - "quanta", + "quanta 0.10.1", + "sketches-ddsketch", +] + +[[package]] +name = "metrics-util" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.13.1", + "metrics 0.21.1", + "num_cpus", + "quanta 0.11.1", "sketches-ddsketch", ] @@ -3896,8 +4021,8 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" dependencies = [ - "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry_api 0.18.0", + "opentelemetry_sdk 0.18.0", ] [[package]] @@ -3911,6 +4036,19 @@ dependencies = [ "protobuf", ] +[[package]] +name = "opentelemetry-prometheus" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7d81bc254e2d572120363a2b16cdb0d715d301b5789be0cfc26ad87e4e10e53" +dependencies = [ + "once_cell", + "opentelemetry_api 0.20.0", + "opentelemetry_sdk 0.20.0", + "prometheus", + "protobuf", +] + [[package]] name = "opentelemetry_api" version = "0.18.0" @@ -3927,6 +4065,22 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry_api" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" +dependencies = [ + "futures-channel", + "futures-util", + "indexmap 1.9.3", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + [[package]] name = "opentelemetry_sdk" version = "0.18.0" @@ -3941,18 +4095,44 @@ dependencies = [ "futures-executor", "futures-util", "once_cell", - "opentelemetry_api", + "opentelemetry_api 0.18.0", "percent-encoding", "rand 0.8.5", "thiserror", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api 0.20.0", + "ordered-float", + "regex", + "thiserror", +] + [[package]] name = "option-ext" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "3.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -4392,6 +4572,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prometheus-client" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2" +dependencies = [ + "dtoa", + "itoa", + "parking_lot", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "proptest" version = "1.3.1" @@ -4454,6 +4657,22 @@ dependencies = [ "winapi", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -5327,7 +5546,7 @@ dependencies = [ "anyhow", "async-graphql 4.0.16", "async-graphql-axum", - "autometrics", + "autometrics 0.3.3", "axum 0.5.17", "build-info", "build-info-build", @@ -5555,6 +5774,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "spez" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c87e960f4dca2788eeb86bbdde8dd246be8948790b7618d656e68f9b720a86e8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "spin" version = "0.5.2" @@ -6627,6 +6857,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" diff --git a/common/Cargo.toml b/common/Cargo.toml index 18c47eb1..eb95aadc 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -40,6 +40,7 @@ async-trait = "0.1.74" headers-derive = "0.1.1" headers = "0.3.9" build-info = "0.0.34" +autometrics = { version = "0.6.0", features = ["prometheus-exporter"] } [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs index aa85a072..32f94137 100644 --- a/common/src/indexer_service/http/config.rs +++ b/common/src/indexer_service/http/config.rs @@ -26,6 +26,7 @@ pub struct EscrowSubgraphConfig { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct ServerConfig { pub host_and_port: SocketAddr, + pub metrics_host_and_port: SocketAddr, pub url_prefix: String, pub free_query_auth_token: Option, } diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 55341158..e60aa792 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -1,8 +1,11 @@ -use std::{collections::HashMap, fmt::Debug, path::PathBuf, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, fmt::Debug, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, +}; use alloy_primitives::Address; use alloy_sol_types::eip712_domain; use anyhow; +use autometrics::prometheus_exporter; use axum::{ async_trait, body::Body, @@ -12,6 +15,7 @@ use axum::{ }; use build_info::BuildInfo; use eventuals::Eventual; +use log::info; use reqwest::StatusCode; use serde::{de::DeserializeOwned, Serialize}; use sqlx::postgres::PgPoolOptions; @@ -264,8 +268,31 @@ impl IndexerService { .merge(options.extra_routes) .with_state(state); + Self::serve_metrics(options.config.server.metrics_host_and_port); + + info!( + "Serving requests at {}", + options.config.server.host_and_port + ); + Ok(Server::bind(&options.config.server.host_and_port) .serve(router.into_make_service()) .await?) } + + fn serve_metrics(host_and_port: SocketAddr) { + info!("Serving prometheus metrics at {host_and_port}"); + + tokio::spawn(async move { + let router = Router::new().route( + "/metrics", + get(|| async { prometheus_exporter::encode_http_response() }), + ); + + Server::bind(&host_and_port) + .serve(router.into_make_service()) + .await + .expect("Failed to serve metrics") + }); + } } From cc49ee63ebb9778d2c55e7ec142a3b2e7e06e2ac Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Thu, 26 Oct 2023 14:50:15 +0200 Subject: [PATCH 07/18] feat: use tracing crate for logs in http indexer service --- Cargo.lock | 2 +- common/Cargo.toml | 1 + common/src/indexer_service/http/indexer_service.rs | 9 +++++---- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 129727f7..f60a12e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -939,7 +939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" dependencies = [ "borsh-derive", - "hashbrown 0.12.3", + "hashbrown 0.13.1", ] [[package]] diff --git a/common/Cargo.toml b/common/Cargo.toml index eb95aadc..021b7ef6 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -41,6 +41,7 @@ headers-derive = "0.1.1" headers = "0.3.9" build-info = "0.0.34" autometrics = { version = "0.6.0", features = ["prometheus-exporter"] } +tracing = "0.1.40" [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index e60aa792..3729159f 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -15,12 +15,12 @@ use axum::{ }; use build_info::BuildInfo; use eventuals::Eventual; -use log::info; use reqwest::StatusCode; use serde::{de::DeserializeOwned, Serialize}; use sqlx::postgres::PgPoolOptions; use thegraph::types::DeploymentId; use thiserror::Error; +use tracing::info; use crate::{ prelude::{ @@ -271,17 +271,18 @@ impl IndexerService { Self::serve_metrics(options.config.server.metrics_host_and_port); info!( - "Serving requests at {}", - options.config.server.host_and_port + address = %options.config.server.host_and_port, + "Serving requests", ); Ok(Server::bind(&options.config.server.host_and_port) .serve(router.into_make_service()) + .with_graceful_shutdown(shutdown_signal()) .await?) } fn serve_metrics(host_and_port: SocketAddr) { - info!("Serving prometheus metrics at {host_and_port}"); + info!(address = %host_and_port, "Serving prometheus metrics"); tokio::spawn(async move { let router = Router::new().route( From 5e6438ad0321df7f504555483d94c6db24bede9b Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Thu, 26 Oct 2023 14:50:44 +0200 Subject: [PATCH 08/18] feat: add graceful shutdown to http indexer service --- .../indexer_service/http/indexer_service.rs | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 3729159f..c2400b03 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -20,6 +20,7 @@ use serde::{de::DeserializeOwned, Serialize}; use sqlx::postgres::PgPoolOptions; use thegraph::types::DeploymentId; use thiserror::Error; +use tokio::signal; use tracing::info; use crate::{ @@ -297,3 +298,25 @@ impl IndexerService { }); } } + +pub async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + }; + + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Failed to install signal handler") + .recv() + .await; + }; + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + + info!("Signal received, starting graceful shutdown"); +} From ea9343cbf4dc0aff4adeb1eebd36d2956b9bded8 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Thu, 26 Oct 2023 16:06:47 +0200 Subject: [PATCH 09/18] feat: add rate limiting to http indexer service This uses tower-governor instead of tower::limit::RateLimitLayer, as it is a little more powerful. --- Cargo.lock | 68 +++++++++++++++++++ common/Cargo.toml | 3 +- .../indexer_service/http/indexer_service.rs | 36 +++++++++- 3 files changed, 103 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f60a12e8..3b717723 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2394,6 +2394,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "forwarded-header-value" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9" +dependencies = [ + "nonempty", + "thiserror", +] + [[package]] name = "fs2" version = "0.4.3" @@ -2637,6 +2647,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "governor" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "quanta 0.11.1", + "rand 0.8.5", + "smallvec", +] + [[package]] name = "graphql-http" version = "0.1.1" @@ -3078,6 +3106,8 @@ dependencies = [ "thegraph", "thiserror", "tokio", + "tower", + "tower_governor", "tracing", "wiremock", ] @@ -3815,6 +3845,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -3825,6 +3861,18 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonempty" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" + +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -6634,6 +6682,26 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +[[package]] +name = "tower_governor" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db81d9313372d714152194f3f2b66badda23a783fb6a97462e35f632814f4cff" +dependencies = [ + "axum 0.6.20", + "forwarded-header-value", + "futures", + "futures-core", + "governor", + "http", + "pin-project", + "thiserror", + "tokio", + "tower", + "tower-layer", + "tracing", +] + [[package]] name = "tracing" version = "0.1.40" diff --git a/common/Cargo.toml b/common/Cargo.toml index 021b7ef6..d9d6daee 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -31,7 +31,6 @@ sqlx = { version = "0.7.1", features = [ ] } tokio = { version = "1.32.0", features = ["full", "macros", "rt"] } thegraph = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } -tracing = "0.1.34" graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } tap_core = "0.7.0" axum = { version = "0.6.20", default_features = true, features = ["headers"] } @@ -42,6 +41,8 @@ headers = "0.3.9" build-info = "0.0.34" autometrics = { version = "0.6.0", features = ["prometheus-exporter"] } tracing = "0.1.40" +tower = "0.4.13" +tower_governor = "0.1.0" [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index c2400b03..7c9b4433 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -9,9 +9,10 @@ use autometrics::prometheus_exporter; use axum::{ async_trait, body::Body, + error_handling::HandleErrorLayer, response::{IntoResponse, Response}, routing::{get, post}, - Json, Router, Server, + BoxError, Json, Router, Server, }; use build_info::BuildInfo; use eventuals::Eventual; @@ -21,6 +22,8 @@ use sqlx::postgres::PgPoolOptions; use thegraph::types::DeploymentId; use thiserror::Error; use tokio::signal; +use tower::ServiceBuilder; +use tower_governor::{errors::display_error, governor::GovernorConfigBuilder, GovernorLayer}; use tracing::info; use crate::{ @@ -256,9 +259,32 @@ impl IndexerService { service_impl: Arc::new(options.service_impl), }); - let router = Router::new() + // Rate limits by allowing bursts of 10 requests and requiring 100ms of + // time between consecutive requests after that, effectively rate + // limiting to 10 req/s. + let rate_limiter = GovernorLayer { + config: Box::leak(Box::new( + GovernorConfigBuilder::default() + .per_millisecond(100) + .burst_size(10) + .finish() + .expect("Failed to set up rate limiting"), + )), + }; + + let misc_routes = Router::new() .route("/", get("Service is up and running")) .route("/version", get(Json(options.release))) + .layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(|e: BoxError| async move { + display_error(e) + })) + .layer(rate_limiter), + ) + .with_state(state.clone()); + + let data_routes = Router::new() .route( PathBuf::from(options.config.server.url_prefix) .join("manifests/:id") @@ -266,6 +292,10 @@ impl IndexerService { .expect("Failed to set up `/manifest/:id` route"), post(request_handler::), ) + .with_state(state.clone()); + + let router = misc_routes + .merge(data_routes) .merge(options.extra_routes) .with_state(state); @@ -277,7 +307,7 @@ impl IndexerService { ); Ok(Server::bind(&options.config.server.host_and_port) - .serve(router.into_make_service()) + .serve(router.into_make_service_with_connect_info::()) .with_graceful_shutdown(shutdown_signal()) .await?) } From 4203e6aca980aa02c0969c21a81b448cef595127 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Thu, 26 Oct 2023 18:47:39 +0200 Subject: [PATCH 10/18] feat: make http indexer service URL namespace configurable --- common/src/indexer_service/http/indexer_service.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 7c9b4433..07967c38 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -151,6 +151,7 @@ where pub service_impl: I, pub config: IndexerServiceConfig, pub release: IndexerServiceRelease, + pub url_namespace: &'static str, pub extra_routes: Router>, Body>, } @@ -287,7 +288,7 @@ impl IndexerService { let data_routes = Router::new() .route( PathBuf::from(options.config.server.url_prefix) - .join("manifests/:id") + .join(format!("{}/:id", options.url_namespace)) .to_str() .expect("Failed to set up `/manifest/:id` route"), post(request_handler::), From b698a2e7afe3735b638045aa3177076adf3e908b Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 30 Oct 2023 20:16:24 +0100 Subject: [PATCH 11/18] feat: add request handling metrics --- .../indexer_service/http/indexer_service.rs | 6 ++++ common/src/indexer_service/http/metrics.rs | 34 +++++++++++++++++++ common/src/indexer_service/http/mod.rs | 1 + .../indexer_service/http/request_handler.rs | 7 ++++ 4 files changed, 48 insertions(+) create mode 100644 common/src/indexer_service/http/metrics.rs diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 07967c38..48ac930e 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -27,6 +27,7 @@ use tower_governor::{errors::display_error, governor::GovernorConfigBuilder, Gov use tracing::info; use crate::{ + indexer_service::http::metrics::IndexerServiceMetrics, prelude::{ attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, AttestationSigner, DeploymentDetails, SubgraphClient, @@ -152,6 +153,7 @@ where pub config: IndexerServiceConfig, pub release: IndexerServiceRelease, pub url_namespace: &'static str, + pub metrics_prefix: &'static str, pub extra_routes: Router>, Body>, } @@ -163,6 +165,7 @@ where pub attestation_signers: Eventual>, pub tap_manager: TapManager, pub service_impl: Arc, + pub metrics: IndexerServiceMetrics, } pub struct IndexerService {} @@ -172,6 +175,8 @@ impl IndexerService { where I: IndexerServiceImpl + Sync + Send + 'static, { + let metrics = IndexerServiceMetrics::new(options.metrics_prefix); + let network_subgraph = Box::leak(Box::new(SubgraphClient::new( options .config @@ -258,6 +263,7 @@ impl IndexerService { attestation_signers, tap_manager, service_impl: Arc::new(options.service_impl), + metrics, }); // Rate limits by allowing bursts of 10 requests and requiring 100ms of diff --git a/common/src/indexer_service/http/metrics.rs b/common/src/indexer_service/http/metrics.rs new file mode 100644 index 00000000..6fe2057c --- /dev/null +++ b/common/src/indexer_service/http/metrics.rs @@ -0,0 +1,34 @@ +use prometheus::{register_int_counter_vec, IntCounterVec}; + +pub struct IndexerServiceMetrics { + pub requests: IntCounterVec, + pub successful_requests: IntCounterVec, + pub failed_requests: IntCounterVec, +} + +impl IndexerServiceMetrics { + pub fn new(prefix: &str) -> Self { + IndexerServiceMetrics { + requests: register_int_counter_vec!( + format!("{prefix}_service_requests_total"), + "Incoming requests", + &["manifest"] + ) + .unwrap(), + + successful_requests: register_int_counter_vec!( + format!("{prefix}_service_requests_ok"), + "Successfully executed requests", + &["manifest"] + ) + .unwrap(), + + failed_requests: register_int_counter_vec!( + format!("{prefix}_service_requests_failed"), + "requests that failed to execute", + &["manifest"] + ) + .unwrap(), + } + } +} diff --git a/common/src/indexer_service/http/mod.rs b/common/src/indexer_service/http/mod.rs index bf7b580f..6e7916ef 100644 --- a/common/src/indexer_service/http/mod.rs +++ b/common/src/indexer_service/http/mod.rs @@ -1,5 +1,6 @@ mod config; mod indexer_service; +mod metrics; mod request_handler; mod scalar_receipt_header; diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index 4fad5576..3989f184 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -20,6 +20,7 @@ use super::{ IndexerServiceImpl, }; +#[autometrics::autometrics] pub async fn request_handler( Path(manifest_id): Path, TypedHeader(receipt): TypedHeader, @@ -32,6 +33,12 @@ where { info!("Handling request for deployment `{manifest_id}`"); + state + .metrics + .requests + .with_label_values(&[&manifest_id.to_string()]) + .inc(); + let request = serde_json::from_slice(&body).map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?; From ef7367ca4aa0574fb3eadb01ee94c5f08e538caa Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 13 Nov 2023 16:53:08 +0100 Subject: [PATCH 12/18] fix: update to latest SubgraphClient API --- common/src/indexer_service/http/config.rs | 1 + .../indexer_service/http/indexer_service.rs | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs index 32f94137..3f201796 100644 --- a/common/src/indexer_service/http/config.rs +++ b/common/src/indexer_service/http/config.rs @@ -44,6 +44,7 @@ pub struct IndexerServiceConfig { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct GraphNodeConfig { + pub status_url: String, pub query_base_url: String, } diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 48ac930e..41b32fe9 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -178,17 +178,22 @@ impl IndexerService { let metrics = IndexerServiceMetrics::new(options.metrics_prefix); let network_subgraph = Box::leak(Box::new(SubgraphClient::new( + reqwest::Client::new(), options .config .graph_node .as_ref() .zip(options.config.network_subgraph.deployment) .map(|(graph_node, deployment)| { - DeploymentDetails::for_graph_node(&graph_node.query_base_url, deployment) + DeploymentDetails::for_graph_node( + &graph_node.status_url, + &graph_node.query_base_url, + deployment, + ) }) .transpose()?, DeploymentDetails::for_query_url(&options.config.network_subgraph.query_url)?, - )?)); + ))); // Identify the dispute manager for the configured network let dispute_manager = dispute_manager( @@ -215,17 +220,22 @@ impl IndexerService { ); let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + reqwest::Client::new(), options .config .graph_node .as_ref() .zip(options.config.escrow_subgraph.deployment) .map(|(graph_node, deployment)| { - DeploymentDetails::for_graph_node(&graph_node.query_base_url, deployment) + DeploymentDetails::for_graph_node( + &graph_node.status_url, + &graph_node.query_base_url, + deployment, + ) }) .transpose()?, DeploymentDetails::for_query_url(&options.config.escrow_subgraph.query_url)?, - )?)); + ))); let escrow_accounts = escrow_accounts( escrow_subgraph, From 458fdf38a3cf160e382d8a3a128a565f36c5a7d4 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 13 Nov 2023 16:53:27 +0100 Subject: [PATCH 13/18] fix: use tracing crate for request handler logs --- common/src/indexer_service/http/request_handler.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index 3989f184..3a2bd193 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -7,7 +7,6 @@ use axum::{ response::IntoResponse, TypedHeader, }; -use log::info; use reqwest::StatusCode; use thegraph::types::DeploymentId; use tracing::info; From a543ef63af0ac56aafa981926770fee6da45d67c Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 13 Nov 2023 17:11:32 +0100 Subject: [PATCH 14/18] fix: add missing license headers --- common/src/indexer_service/http/config.rs | 3 +++ common/src/indexer_service/http/indexer_service.rs | 3 +++ common/src/indexer_service/http/metrics.rs | 3 +++ common/src/indexer_service/http/mod.rs | 3 +++ common/src/indexer_service/http/request_handler.rs | 3 +++ common/src/indexer_service/http/scalar_receipt_header.rs | 3 +++ common/src/indexer_service/mod.rs | 3 +++ service/build.rs | 3 +++ 8 files changed, 24 insertions(+) diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs index 3f201796..6142e75b 100644 --- a/common/src/indexer_service/http/config.rs +++ b/common/src/indexer_service/http/config.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + use std::net::SocketAddr; use alloy_primitives::Address; diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 41b32fe9..6776332e 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + use std::{ collections::HashMap, fmt::Debug, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, }; diff --git a/common/src/indexer_service/http/metrics.rs b/common/src/indexer_service/http/metrics.rs index 6fe2057c..c5533a82 100644 --- a/common/src/indexer_service/http/metrics.rs +++ b/common/src/indexer_service/http/metrics.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + use prometheus::{register_int_counter_vec, IntCounterVec}; pub struct IndexerServiceMetrics { diff --git a/common/src/indexer_service/http/mod.rs b/common/src/indexer_service/http/mod.rs index 6e7916ef..c200c10f 100644 --- a/common/src/indexer_service/http/mod.rs +++ b/common/src/indexer_service/http/mod.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + mod config; mod indexer_service; mod metrics; diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index 3a2bd193..14eb7a3b 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + use std::sync::Arc; use axum::{ diff --git a/common/src/indexer_service/http/scalar_receipt_header.rs b/common/src/indexer_service/http/scalar_receipt_header.rs index c8398eef..e8372d61 100644 --- a/common/src/indexer_service/http/scalar_receipt_header.rs +++ b/common/src/indexer_service/http/scalar_receipt_header.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + use std::ops::Deref; use headers::{Header, HeaderName, HeaderValue}; diff --git a/common/src/indexer_service/mod.rs b/common/src/indexer_service/mod.rs index 3883215f..a47a5388 100644 --- a/common/src/indexer_service/mod.rs +++ b/common/src/indexer_service/mod.rs @@ -1 +1,4 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + pub mod http; diff --git a/service/build.rs b/service/build.rs index 0b2af0d0..4334201c 100644 --- a/service/build.rs +++ b/service/build.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + use build_info_build::DependencyDepth; fn main() { From 17d86c9602428cf21e175ea744cb0df99042ca1d Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 13 Nov 2023 17:49:27 +0100 Subject: [PATCH 15/18] refactor: move receipt related fixtures into test_vectors --- common/src/tap_manager.rs | 62 +++++--------------------------------- common/src/test_vectors.rs | 46 ++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/common/src/tap_manager.rs b/common/src/tap_manager.rs index 50bc8849..24fb6426 100644 --- a/common/src/tap_manager.rs +++ b/common/src/tap_manager.rs @@ -113,62 +113,13 @@ mod test { use crate::prelude::{AllocationStatus, SubgraphDeployment}; use alloy_primitives::Address; - use alloy_sol_types::{eip712_domain, Eip712Domain}; - use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer}; use keccak_hash::H256; use sqlx::postgres::PgListener; - use tap_core::tap_manager::SignedReceipt; - use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt}; - - use crate::test_vectors; + use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER}; use super::*; - /// Fixture to generate a wallet and address - pub fn keys() -> (LocalWallet, Address) { - let wallet: LocalWallet = MnemonicBuilder::::default() - .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") - .build() - .unwrap(); - let address = wallet.address(); - - (wallet, Address::from_slice(address.as_bytes())) - } - - pub fn domain() -> Eip712Domain { - eip712_domain! { - name: "TAP", - version: "1", - chain_id: 1, - verifying_contract: Address::from([0x11u8; 20]), - } - } - - /// Fixture to generate a signed receipt using the wallet from `keys()` - /// and the given `query_id` and `value` - pub async fn create_signed_receipt( - allocation_id: Address, - nonce: u64, - timestamp_ns: u64, - value: u128, - ) -> SignedReceipt { - let (wallet, _) = keys(); - - EIP712SignedMessage::new( - &domain(), - Receipt { - allocation_id, - nonce, - timestamp_ns, - value, - }, - &wallet, - ) - .await - .unwrap() - } - #[sqlx::test(migrations = "../migrations")] async fn test_verify_and_store_receipt(pgpool: PgPool) { // Listen to pg_notify events @@ -180,7 +131,6 @@ mod test { let allocation_id = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(); - let domain = domain(); let signed_receipt = create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await; @@ -209,10 +159,14 @@ mod test { // Mock escrow accounts let escrow_accounts = - Eventual::from_value(HashMap::from_iter(vec![(keys().1, U256::from(123))])); + Eventual::from_value(HashMap::from_iter(vec![(TAP_SENDER.1, U256::from(123))])); - let tap_manager = - TapManager::new(pgpool.clone(), indexer_allocations, escrow_accounts, domain); + let tap_manager = TapManager::new( + pgpool.clone(), + indexer_allocations, + escrow_accounts, + test_vectors::TAP_EIP712_DOMAIN.to_owned(), + ); tap_manager .verify_and_store_receipt(signed_receipt.clone()) diff --git a/common/src/test_vectors.rs b/common/src/test_vectors.rs index 019b16b7..c6b7eaaf 100644 --- a/common/src/test_vectors.rs +++ b/common/src/test_vectors.rs @@ -4,8 +4,13 @@ use std::{collections::HashMap, str::FromStr}; use alloy_primitives::Address; +use alloy_sol_types::{eip712_domain, Eip712Domain}; +use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer}; use ethers_core::types::U256; use lazy_static::lazy_static; +use tap_core::{ + eip_712_signed_message::EIP712SignedMessage, tap_manager::SignedReceipt, tap_receipt::Receipt, +}; use thegraph::types::DeploymentId; use crate::prelude::{Allocation, AllocationStatus, SubgraphDeployment}; @@ -239,4 +244,45 @@ lazy_static! { (Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1").unwrap(), U256::from(24)), (Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), U256::from(42)), ]); + + /// Fixture to generate a wallet and address + pub static ref TAP_SENDER: (LocalWallet, Address) = { + let wallet: LocalWallet = MnemonicBuilder::::default() + .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") + .build() + .unwrap(); + let address = wallet.address(); + + (wallet, Address::from_slice(address.as_bytes())) + }; + + pub static ref TAP_EIP712_DOMAIN: Eip712Domain = eip712_domain! { + name: "TAP", + version: "1", + chain_id: 1, + verifying_contract: Address::from([0x11u8; 20]), + }; +} + +/// Function to generate a signed receipt using the TAP_SENDER wallet. +pub async fn create_signed_receipt( + allocation_id: Address, + nonce: u64, + timestamp_ns: u64, + value: u128, +) -> SignedReceipt { + let (wallet, _) = &*self::TAP_SENDER; + + EIP712SignedMessage::new( + &self::TAP_EIP712_DOMAIN, + Receipt { + allocation_id, + nonce, + timestamp_ns, + value, + }, + wallet, + ) + .await + .unwrap() } From 9e8364bf7729f62864bf0504a757ebc8f6dedd36 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 13 Nov 2023 17:49:45 +0100 Subject: [PATCH 16/18] feat: add tests for parsing scalar receipt headers --- .../http/scalar_receipt_header.rs | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/common/src/indexer_service/http/scalar_receipt_header.rs b/common/src/indexer_service/http/scalar_receipt_header.rs index e8372d61..188389b6 100644 --- a/common/src/indexer_service/http/scalar_receipt_header.rs +++ b/common/src/indexer_service/http/scalar_receipt_header.rs @@ -7,6 +7,7 @@ use headers::{Header, HeaderName, HeaderValue}; use lazy_static::lazy_static; use tap_core::tap_manager::SignedReceipt; +#[derive(Debug, PartialEq)] pub struct ScalarReceipt(Option); impl ScalarReceipt { @@ -55,3 +56,50 @@ impl Header for ScalarReceipt { unimplemented!() } } + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use alloy_primitives::Address; + use axum::{headers::Header, http::HeaderValue}; + + use crate::test_vectors::create_signed_receipt; + + use super::ScalarReceipt; + + #[tokio::test] + async fn test_decode_valid_scalar_receipt_header() { + let allocation = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(); + let original_receipt = + create_signed_receipt(allocation, u64::MAX, u64::MAX, u128::MAX).await; + let serialized_receipt = serde_json::to_string(&original_receipt).unwrap(); + let header_value = HeaderValue::from_str(&serialized_receipt).unwrap(); + let header_values = vec![&header_value]; + let decoded_receipt = ScalarReceipt::decode(&mut header_values.into_iter()) + .expect("scalar receipt header value should be valid"); + + assert_eq!( + decoded_receipt, + ScalarReceipt(Some(original_receipt.clone())) + ); + } + + #[test] + fn test_decode_non_string_scalar_receipt_header() { + let header_value = HeaderValue::from_static("123"); + let header_values = vec![&header_value]; + let result = ScalarReceipt::decode(&mut header_values.into_iter()); + + assert!(result.is_err()); + } + + #[test] + fn test_decode_invalid_scalar_receipt_header() { + let header_value = HeaderValue::from_bytes(b"invalid").unwrap(); + let header_values = vec![&header_value]; + let result = ScalarReceipt::decode(&mut header_values.into_iter()); + + assert!(result.is_err()); + } +} From 7316827ed595bc056295ff47b242f2a4c0e54d7a Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Fri, 1 Dec 2023 11:50:04 +0100 Subject: [PATCH 17/18] feat: consolidate subgraph config structs --- common/src/indexer_service/http/config.rs | 13 +++---------- common/src/indexer_service/http/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs index 6142e75b..b68198de 100644 --- a/common/src/indexer_service/http/config.rs +++ b/common/src/indexer_service/http/config.rs @@ -13,14 +13,7 @@ pub struct DatabaseConfig { } #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct NetworkSubgraphConfig { - pub deployment: Option, - pub query_url: String, - pub syncing_interval: u64, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct EscrowSubgraphConfig { +pub struct SubgraphConfig { pub deployment: Option, pub query_url: String, pub syncing_interval: u64, @@ -40,8 +33,8 @@ pub struct IndexerServiceConfig { pub server: ServerConfig, pub database: DatabaseConfig, pub graph_node: Option, - pub network_subgraph: NetworkSubgraphConfig, - pub escrow_subgraph: EscrowSubgraphConfig, + pub network_subgraph: SubgraphConfig, + pub escrow_subgraph: SubgraphConfig, pub graph_network: GraphNetworkConfig, } diff --git a/common/src/indexer_service/http/mod.rs b/common/src/indexer_service/http/mod.rs index c200c10f..65fc1567 100644 --- a/common/src/indexer_service/http/mod.rs +++ b/common/src/indexer_service/http/mod.rs @@ -8,8 +8,8 @@ mod request_handler; mod scalar_receipt_header; pub use config::{ - DatabaseConfig, EscrowSubgraphConfig, GraphNetworkConfig, IndexerConfig, IndexerServiceConfig, - NetworkSubgraphConfig, ServerConfig, + DatabaseConfig, GraphNetworkConfig, IndexerConfig, IndexerServiceConfig, ServerConfig, + SubgraphConfig, }; pub use indexer_service::{ IndexerService, IndexerServiceImpl, IndexerServiceOptions, IndexerServiceRelease, IsAttestable, From 760df3300b00542fd8e78fe386b6e99948d58f47 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Fri, 1 Dec 2023 14:57:49 +0100 Subject: [PATCH 18/18] feat: add scalar options to indexer service base config --- common/src/indexer_service/http/config.rs | 7 +++++++ common/src/indexer_service/http/indexer_service.rs | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs index b68198de..e4a7d3ac 100644 --- a/common/src/indexer_service/http/config.rs +++ b/common/src/indexer_service/http/config.rs @@ -36,6 +36,7 @@ pub struct IndexerServiceConfig { pub network_subgraph: SubgraphConfig, pub escrow_subgraph: SubgraphConfig, pub graph_network: GraphNetworkConfig, + pub scalar: ScalarConfig, } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -54,3 +55,9 @@ pub struct IndexerConfig { pub indexer_address: Address, pub operator_mnemonic: String, } + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ScalarConfig { + pub chain_id: u64, + pub receipts_verifier_address: Address, +} diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 6776332e..63befe19 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -263,11 +263,11 @@ impl IndexerService { database, allocations, escrow_accounts, - // TODO: arguments for eip712_domain should be a config eip712_domain! { name: "TapManager", version: "1", - verifying_contract: options.config.indexer.indexer_address, + chain_id: options.config.scalar.chain_id, + verifying_contract: options.config.scalar.receipts_verifier_address, }, );