From aa7d0813beeda30d3f99cddca61660b7c423d679 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Mon, 11 Sep 2023 14:45:08 -0500 Subject: [PATCH] refactor: reduce duplicated code and clean --- service/src/common/address.rs | 9 +- service/src/common/network_subgraph.rs | 8 +- service/src/common/types.rs | 2 +- service/src/config.rs | 2 +- service/src/graph_node.rs | 9 -- service/src/main.rs | 84 ++------------- service/src/server/mod.rs | 135 ++++++++++++++++--------- service/src/util.rs | 16 +-- 8 files changed, 107 insertions(+), 158 deletions(-) diff --git a/service/src/common/address.rs b/service/src/common/address.rs index 68012a5c..4e2232c7 100644 --- a/service/src/common/address.rs +++ b/service/src/common/address.rs @@ -1,7 +1,9 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Wallet, WalletError}; +use ethers::signers::{ + coins_bip39::English, LocalWallet, MnemonicBuilder, Signer, Wallet, WalletError, +}; use ethers_core::{k256::ecdsa::SigningKey, utils::hex}; use sha3::{Digest, Keccak256}; @@ -21,3 +23,8 @@ pub fn build_wallet(value: &str) -> Result, WalletError> { .parse::() .or(MnemonicBuilder::::default().phrase(value).build()) } + +/// Get wallet public address to String +pub fn wallet_address(wallet: &Wallet) -> String { + format!("{:?}", wallet.address()) +} diff --git a/service/src/common/network_subgraph.rs b/service/src/common/network_subgraph.rs index 8715d4e8..7e7c3121 100644 --- a/service/src/common/network_subgraph.rs +++ b/service/src/common/network_subgraph.rs @@ -7,6 +7,7 @@ use reqwest::{header, Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::Value; +use crate::common::types::GraphQLQuery; use crate::query_processor::{QueryError, UnattestedQueryResult}; #[derive(Debug, Serialize, Deserialize, PartialEq)] @@ -24,13 +25,6 @@ pub struct NetworkSubgraph { network_subgraph_url: Arc, } -#[derive(Debug, Clone, Serialize, Deserialize)] -struct GraphQLQuery { - pub query: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub variables: Option, -} - impl NetworkSubgraph { pub fn new( graph_node_query_endpoint: Option<&str>, diff --git a/service/src/common/types.rs b/service/src/common/types.rs index fe80ed83..0e06c767 100644 --- a/service/src/common/types.rs +++ b/service/src/common/types.rs @@ -6,7 +6,7 @@ use serde_json::Value; /// GraphQLQuery request to a reqwest client #[derive(Debug, Clone, Serialize, Deserialize)] -struct GraphQLQuery { +pub struct GraphQLQuery { pub query: String, #[serde(skip_serializing_if = "Option::is_none")] pub variables: Option, diff --git a/service/src/config.rs b/service/src/config.rs index 0c1dde03..bdcaca7d 100644 --- a/service/src/config.rs +++ b/service/src/config.rs @@ -14,7 +14,7 @@ use crate::{query_processor::QueryError, util::init_tracing}; about = "Indexer service on top of graph node", author = "hopeyen" )] -#[command(author, version, about, long_about = None)] +#[command(author, version, about, long_about = None, arg_required_else_help = true)] pub struct Cli { #[command(flatten)] pub ethereum: Ethereum, diff --git a/service/src/graph_node.rs b/service/src/graph_node.rs index 57be0d36..fe2b3de4 100644 --- a/service/src/graph_node.rs +++ b/service/src/graph_node.rs @@ -5,8 +5,6 @@ use std::sync::Arc; use anyhow::anyhow; use reqwest::{header, Client, Url}; -use serde::{Deserialize, Serialize}; -use serde_json::Value; use crate::query_processor::{QueryError, UnattestedQueryResult}; @@ -19,13 +17,6 @@ pub struct GraphNodeInstance { subgraphs_base_url: Arc, } -#[derive(Debug, Clone, Serialize, Deserialize)] -struct GraphQLQuery { - pub query: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub variables: Option, -} - impl GraphNodeInstance { pub fn new(endpoint: &str) -> GraphNodeInstance { let subgraphs_base_url = Url::parse(endpoint) diff --git a/service/src/main.rs b/service/src/main.rs index 8c1e154c..a71c3a32 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -2,24 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use async_graphql::{EmptyMutation, EmptySubscription, Schema}; -use axum::{ - error_handling::HandleErrorLayer, - handler::Handler, - http::{Method, StatusCode}, - routing::get, -}; -use axum::{routing::post, Extension, Router, Server}; +use axum::Server; use dotenvy::dotenv; use ethereum_types::{Address, U256}; -use std::{net::SocketAddr, str::FromStr, time::Duration}; -use tower::{BoxError, ServiceBuilder}; -use tower_http::{ - add_extension::AddExtensionLayer, - cors::CorsLayer, - trace::{self, TraceLayer}, -}; -use tracing::{info, Level}; +use std::{net::SocketAddr, str::FromStr}; + +use tracing::info; use util::{package_version, shutdown_signal}; @@ -32,11 +21,11 @@ use crate::{ config::Cli, metrics::handle_serve_metrics, query_processor::QueryProcessor, - server::routes::{network_ratelimiter, slow_ratelimiter}, + server::create_server, util::public_key, }; -use server::{routes, ServerOptions}; +use server::ServerOptions; mod allocation_monitor; mod attestation_signers; @@ -138,68 +127,11 @@ async fn main() -> Result<(), std::io::Error> { config.network_subgraph.serve_network_subgraph, ); - //defineCostModelModels + // defineCostModelModels let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).finish(); info!("Initialized server options"); - let app = Router::new() - .route("/", get(routes::basic::index)) - .route("/health", get(routes::basic::health)) - .route("/version", get(routes::basic::version)) - .route( - "/status", - post(routes::status::status_queries) - .layer(AddExtensionLayer::new(network_ratelimiter())), - ) - .route( - "/subgraphs/health/:deployment", - get(routes::deployment::deployment_health - .layer(AddExtensionLayer::new(slow_ratelimiter()))), - ) - .route( - "/cost", - post(routes::cost::graphql_handler) - .get(routes::cost::graphql_handler) - .layer(AddExtensionLayer::new(slow_ratelimiter())), - ) - .nest( - "/operator", - routes::basic::create_operator_server(service_options.clone()) - .layer(AddExtensionLayer::new(slow_ratelimiter())), - ) - .route( - "/network", - post(routes::network::network_queries) - .layer(AddExtensionLayer::new(network_ratelimiter())), - ) - .route( - "/subgraphs/id/:id", - post(routes::subgraphs::subgraph_queries), - ) - .layer(Extension(schema)) - .layer(Extension(service_options.clone())) - .layer(CorsLayer::new().allow_methods([Method::GET, Method::POST])) - .layer( - // Handle error for timeout, ratelimit, or a general internal server error - ServiceBuilder::new() - .layer(HandleErrorLayer::new(|error: BoxError| async move { - if error.is::() { - Ok(StatusCode::REQUEST_TIMEOUT) - } else { - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Unhandled internal error: {}", error), - )) - } - })) - .layer( - TraceLayer::new_for_http() - .make_span_with(trace::DefaultMakeSpan::new().level(Level::DEBUG)) - .on_response(trace::DefaultOnResponse::new().level(Level::DEBUG)), - ) - .timeout(Duration::from_secs(10)) - .into_inner(), - ); + let app = create_server(service_options, schema).await; let addr = SocketAddr::from_str(&format!("0.0.0.0:{}", config.indexer_infrastructure.port)) .expect("Start server port"); diff --git a/service/src/server/mod.rs b/service/src/server/mod.rs index ce1960f1..762f4be3 100644 --- a/service/src/server/mod.rs +++ b/service/src/server/mod.rs @@ -1,9 +1,30 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use async_graphql::{EmptyMutation, EmptySubscription, Schema}; +use axum::{ + error_handling::HandleErrorLayer, + handler::Handler, + http::{Method, StatusCode}, + routing::get, +}; +use axum::{routing::post, Extension, Router}; + +use std::time::Duration; +use tower::{BoxError, ServiceBuilder}; +use tower_http::{ + add_extension::AddExtensionLayer, + cors::CorsLayer, + trace::{self, TraceLayer}, +}; +use tracing::Level; + use crate::{ - common::indexer_management_client::IndexerManagementClient, query_processor::QueryProcessor, - util::PackageVersion, NetworkSubgraph, + common::indexer_management_client::{IndexerManagementClient, QueryRoot}, + common::network_subgraph::NetworkSubgraph, + query_processor::QueryProcessor, + server::routes::{network_ratelimiter, slow_ratelimiter}, + util::PackageVersion, }; pub mod routes; @@ -53,50 +74,66 @@ impl ServerOptions { } } -// ADD: Endpoint for public status API, public cost API, information -// subgraph health checks - -// // Endpoint for the public status API -// #[get("/status")] -// async fn status( -// server: web::Data, -// // graph_node_status_endpoint: Data, -// query: web::Bytes, -// ) -> impl Responder { -// // Implementation for creating status server -// // Replace `createStatusServer` with your logic -// match response { -// Ok(result) => HttpResponse::Ok().json(result), -// Err(error) => HttpResponse::InternalServerError().json(error), -// } -// } - -// // Endpoint for subgraph health checks -// #[post("/subgraphs/health")] -// async fn subgraph_health( -// graph_node_status_endpoint: Data, -// ) -> impl Responder { -// // Implementation for creating deployment health server -// // Replace `createDeploymentHealthServer` with your logic -// let response = createDeploymentHealthServer(graph_node_status_endpoint.get_ref()).await; -// match response { -// Ok(result) => HttpResponse::Ok().json(result), -// Err(error) => HttpResponse::InternalServerError().json(error), -// } -// } - -// // Endpoint for the public cost API -// #[post("/cost")] -// async fn cost( -// indexer_management_client: Data, -// metrics: Data, -// payload: Json, -// ) -> impl Responder { -// // Implementation for creating cost server -// // Replace `createCostServer` with your logic -// let response = createCostServer(indexer_management_client.get_ref(), metrics.get_ref(), payload.into_inner()).await; -// match response { -// Ok(result) => HttpResponse::Ok().json(result), -// Err(error) => HttpResponse::InternalServerError().json(error), -// } -// } +pub async fn create_server( + options: ServerOptions, + schema: Schema, +) -> Router { + Router::new() + .route("/", get(routes::basic::index)) + .route("/health", get(routes::basic::health)) + .route("/version", get(routes::basic::version)) + .route( + "/status", + post(routes::status::status_queries) + .layer(AddExtensionLayer::new(network_ratelimiter())), + ) + .route( + "/subgraphs/health/:deployment", + get(routes::deployment::deployment_health + .layer(AddExtensionLayer::new(slow_ratelimiter()))), + ) + .route( + "/cost", + post(routes::cost::graphql_handler) + .get(routes::cost::graphql_handler) + .layer(AddExtensionLayer::new(slow_ratelimiter())), + ) + .nest( + "/operator", + routes::basic::create_operator_server(options.clone()) + .layer(AddExtensionLayer::new(slow_ratelimiter())), + ) + .route( + "/network", + post(routes::network::network_queries) + .layer(AddExtensionLayer::new(network_ratelimiter())), + ) + .route( + "/subgraphs/id/:id", + post(routes::subgraphs::subgraph_queries), + ) + .layer(Extension(schema)) + .layer(Extension(options.clone())) + .layer(CorsLayer::new().allow_methods([Method::GET, Method::POST])) + .layer( + // Handle error for timeout, ratelimit, or a general internal server error + ServiceBuilder::new() + .layer(HandleErrorLayer::new(|error: BoxError| async move { + if error.is::() { + Ok(StatusCode::REQUEST_TIMEOUT) + } else { + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Unhandled internal error: {}", error), + )) + } + })) + .layer( + TraceLayer::new_for_http() + .make_span_with(trace::DefaultMakeSpan::new().level(Level::DEBUG)) + .on_response(trace::DefaultOnResponse::new().level(Level::DEBUG)), + ) + .timeout(Duration::from_secs(10)) + .into_inner(), + ) +} diff --git a/service/src/util.rs b/service/src/util.rs index 07ff4006..b5459464 100644 --- a/service/src/util.rs +++ b/service/src/util.rs @@ -3,9 +3,7 @@ use ethereum_types::Address; use ethereum_types::U256; -use ethers::signers::{ - coins_bip39::English, LocalWallet, MnemonicBuilder, Signer, Wallet, WalletError, -}; +use ethers::signers::WalletError; use ethers_core::k256::ecdsa::SigningKey; use native::attestation::AttestationSigner; @@ -22,6 +20,7 @@ use tracing::{ }; use tracing_subscriber::{EnvFilter, FmtSubscriber}; +use crate::common::address::{build_wallet, wallet_address}; use crate::common::indexer_error::{indexer_error, IndexerError}; /// Struct for version control @@ -73,17 +72,6 @@ pub fn package_version() -> Result { }) } -pub fn build_wallet(value: &str) -> Result, WalletError> { - value - .parse::() - .or(MnemonicBuilder::::default().phrase(value).build()) -} - -/// Get wallet public address to String -pub fn wallet_address(wallet: &Wallet) -> String { - format!("{:?}", wallet.address()) -} - /// Validate that private key as an Eth wallet pub fn public_key(value: &str) -> Result { // The wallet can be stored instead of the original private key