Skip to content

Commit

Permalink
refactor: reduce duplicated code and clean
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Sep 12, 2023
1 parent 46a072a commit fb5e144
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 158 deletions.
9 changes: 8 additions & 1 deletion service/src/common/address.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Wallet, WalletError};
use ethers::signers::{
coins_bip39::English, LocalWallet, MnemonicBuilder, Signer, Wallet, WalletError,
};
use ethers_core::{k256::ecdsa::SigningKey, utils::hex};
use sha3::{Digest, Keccak256};

Expand All @@ -21,3 +23,8 @@ pub fn build_wallet(value: &str) -> Result<Wallet<SigningKey>, WalletError> {
.parse::<LocalWallet>()
.or(MnemonicBuilder::<English>::default().phrase(value).build())
}

/// Get wallet public address to String
pub fn wallet_address(wallet: &Wallet<SigningKey>) -> String {
format!("{:?}", wallet.address())
}
8 changes: 1 addition & 7 deletions service/src/common/network_subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use reqwest::{header, Client, Url};
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::common::types::GraphQLQuery;
use crate::query_processor::{QueryError, UnattestedQueryResult};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
Expand All @@ -24,13 +25,6 @@ pub struct NetworkSubgraph {
network_subgraph_url: Arc<Url>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct GraphQLQuery {
pub query: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub variables: Option<Value>,
}

impl NetworkSubgraph {
pub fn new(
graph_node_query_endpoint: Option<&str>,
Expand Down
2 changes: 1 addition & 1 deletion service/src/common/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde_json::Value;

/// GraphQLQuery request to a reqwest client
#[derive(Debug, Clone, Serialize, Deserialize)]
struct GraphQLQuery {
pub struct GraphQLQuery {
pub query: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub variables: Option<Value>,
Expand Down
2 changes: 1 addition & 1 deletion service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{query_processor::QueryError, util::init_tracing};
about = "Indexer service on top of graph node",
author = "hopeyen"
)]
#[command(author, version, about, long_about = None)]
#[command(author, version, about, long_about = None, arg_required_else_help = true)]
pub struct Cli {
#[command(flatten)]
pub ethereum: Ethereum,
Expand Down
9 changes: 0 additions & 9 deletions service/src/graph_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use std::sync::Arc;

use anyhow::anyhow;
use reqwest::{header, Client, Url};
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::query_processor::{QueryError, UnattestedQueryResult};

Expand All @@ -19,13 +17,6 @@ pub struct GraphNodeInstance {
subgraphs_base_url: Arc<Url>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct GraphQLQuery {
pub query: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub variables: Option<Value>,
}

impl GraphNodeInstance {
pub fn new(endpoint: &str) -> GraphNodeInstance {
let subgraphs_base_url = Url::parse(endpoint)
Expand Down
85 changes: 9 additions & 76 deletions service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,14 @@
// SPDX-License-Identifier: Apache-2.0

use async_graphql::{EmptyMutation, EmptySubscription, Schema};
use axum::{
error_handling::HandleErrorLayer,
handler::Handler,
http::{Method, StatusCode},
routing::get,
};
use axum::{routing::post, Extension, Router, Server};
use axum::handler::Handler;
use axum::Server;
use dotenvy::dotenv;
use ethereum_types::{Address, U256};

use std::{net::SocketAddr, str::FromStr, time::Duration};
use tower::{BoxError, ServiceBuilder};
use tower_http::{
add_extension::AddExtensionLayer,
cors::CorsLayer,
trace::{self, TraceLayer},
};
use tracing::{info, Level};
use std::{net::SocketAddr, str::FromStr};

use tracing::info;

use util::{package_version, shutdown_signal};

Expand All @@ -32,11 +22,11 @@ use crate::{
config::Cli,
metrics::handle_serve_metrics,
query_processor::QueryProcessor,
server::routes::{network_ratelimiter, slow_ratelimiter},
server::create_server,
util::public_key,
};

use server::{routes, ServerOptions};
use server::ServerOptions;

mod allocation_monitor;
mod attestation_signers;
Expand Down Expand Up @@ -138,68 +128,11 @@ async fn main() -> Result<(), std::io::Error> {
config.network_subgraph.serve_network_subgraph,
);

//defineCostModelModels
// defineCostModelModels
let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).finish();

info!("Initialized server options");
let app = Router::new()
.route("/", get(routes::basic::index))
.route("/health", get(routes::basic::health))
.route("/version", get(routes::basic::version))
.route(
"/status",
post(routes::status::status_queries)
.layer(AddExtensionLayer::new(network_ratelimiter())),
)
.route(
"/subgraphs/health/:deployment",
get(routes::deployment::deployment_health
.layer(AddExtensionLayer::new(slow_ratelimiter()))),
)
.route(
"/cost",
post(routes::cost::graphql_handler)
.get(routes::cost::graphql_handler)
.layer(AddExtensionLayer::new(slow_ratelimiter())),
)
.nest(
"/operator",
routes::basic::create_operator_server(service_options.clone())
.layer(AddExtensionLayer::new(slow_ratelimiter())),
)
.route(
"/network",
post(routes::network::network_queries)
.layer(AddExtensionLayer::new(network_ratelimiter())),
)
.route(
"/subgraphs/id/:id",
post(routes::subgraphs::subgraph_queries),
)
.layer(Extension(schema))
.layer(Extension(service_options.clone()))
.layer(CorsLayer::new().allow_methods([Method::GET, Method::POST]))
.layer(
// Handle error for timeout, ratelimit, or a general internal server error
ServiceBuilder::new()
.layer(HandleErrorLayer::new(|error: BoxError| async move {
if error.is::<tower::timeout::error::Elapsed>() {
Ok(StatusCode::REQUEST_TIMEOUT)
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Unhandled internal error: {}", error),
))
}
}))
.layer(
TraceLayer::new_for_http()
.make_span_with(trace::DefaultMakeSpan::new().level(Level::DEBUG))
.on_response(trace::DefaultOnResponse::new().level(Level::DEBUG)),
)
.timeout(Duration::from_secs(10))
.into_inner(),
);
let app = create_server(service_options, schema).await;

let addr = SocketAddr::from_str(&format!("0.0.0.0:{}", config.indexer_infrastructure.port))
.expect("Start server port");
Expand Down
135 changes: 86 additions & 49 deletions service/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use async_graphql::{EmptyMutation, EmptySubscription, Schema};
use axum::{
error_handling::HandleErrorLayer,
handler::Handler,
http::{Method, StatusCode},
routing::get,
};
use axum::{routing::post, Extension, Router};

use std::time::Duration;
use tower::{BoxError, ServiceBuilder};
use tower_http::{
add_extension::AddExtensionLayer,
cors::CorsLayer,
trace::{self, TraceLayer},
};
use tracing::Level;

use crate::{
common::indexer_management_client::IndexerManagementClient, query_processor::QueryProcessor,
util::PackageVersion, NetworkSubgraph,
common::indexer_management_client::{IndexerManagementClient, QueryRoot},
common::network_subgraph::NetworkSubgraph,
query_processor::QueryProcessor,
server::routes::{network_ratelimiter, slow_ratelimiter},
util::PackageVersion,
};

pub mod routes;
Expand Down Expand Up @@ -53,50 +74,66 @@ impl ServerOptions {
}
}

// ADD: Endpoint for public status API, public cost API, information
// subgraph health checks

// // Endpoint for the public status API
// #[get("/status")]
// async fn status(
// server: web::Data<ServerOptions>,
// // graph_node_status_endpoint: Data<GraphStatusEndpoint>,
// query: web::Bytes,
// ) -> impl Responder {
// // Implementation for creating status server
// // Replace `createStatusServer` with your logic
// match response {
// Ok(result) => HttpResponse::Ok().json(result),
// Err(error) => HttpResponse::InternalServerError().json(error),
// }
// }

// // Endpoint for subgraph health checks
// #[post("/subgraphs/health")]
// async fn subgraph_health(
// graph_node_status_endpoint: Data<GraphStatusEndpoint>,
// ) -> impl Responder {
// // Implementation for creating deployment health server
// // Replace `createDeploymentHealthServer` with your logic
// let response = createDeploymentHealthServer(graph_node_status_endpoint.get_ref()).await;
// match response {
// Ok(result) => HttpResponse::Ok().json(result),
// Err(error) => HttpResponse::InternalServerError().json(error),
// }
// }

// // Endpoint for the public cost API
// #[post("/cost")]
// async fn cost(
// indexer_management_client: Data<IndexerManagementClient>,
// metrics: Data<Metrics>,
// payload: Json<CostPayload>,
// ) -> impl Responder {
// // Implementation for creating cost server
// // Replace `createCostServer` with your logic
// let response = createCostServer(indexer_management_client.get_ref(), metrics.get_ref(), payload.into_inner()).await;
// match response {
// Ok(result) => HttpResponse::Ok().json(result),
// Err(error) => HttpResponse::InternalServerError().json(error),
// }
// }
pub async fn create_server(
options: ServerOptions,
schema: Schema<QueryRoot, EmptyMutation, EmptySubscription>,
) -> Router {
Router::new()
.route("/", get(routes::basic::index))
.route("/health", get(routes::basic::health))
.route("/version", get(routes::basic::version))
.route(
"/status",
post(routes::status::status_queries)
.layer(AddExtensionLayer::new(network_ratelimiter())),
)
.route(
"/subgraphs/health/:deployment",
get(routes::deployment::deployment_health
.layer(AddExtensionLayer::new(slow_ratelimiter()))),
)
.route(
"/cost",
post(routes::cost::graphql_handler)
.get(routes::cost::graphql_handler)
.layer(AddExtensionLayer::new(slow_ratelimiter())),
)
.nest(
"/operator",
routes::basic::create_operator_server(options.clone())
.layer(AddExtensionLayer::new(slow_ratelimiter())),
)
.route(
"/network",
post(routes::network::network_queries)
.layer(AddExtensionLayer::new(network_ratelimiter())),
)
.route(
"/subgraphs/id/:id",
post(routes::subgraphs::subgraph_queries),
)
.layer(Extension(schema))
.layer(Extension(options.clone()))
.layer(CorsLayer::new().allow_methods([Method::GET, Method::POST]))
.layer(
// Handle error for timeout, ratelimit, or a general internal server error
ServiceBuilder::new()
.layer(HandleErrorLayer::new(|error: BoxError| async move {
if error.is::<tower::timeout::error::Elapsed>() {
Ok(StatusCode::REQUEST_TIMEOUT)
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Unhandled internal error: {}", error),
))
}
}))
.layer(
TraceLayer::new_for_http()
.make_span_with(trace::DefaultMakeSpan::new().level(Level::DEBUG))
.on_response(trace::DefaultOnResponse::new().level(Level::DEBUG)),
)
.timeout(Duration::from_secs(10))
.into_inner(),
)
}
16 changes: 2 additions & 14 deletions service/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

use ethereum_types::Address;
use ethereum_types::U256;
use ethers::signers::{
coins_bip39::English, LocalWallet, MnemonicBuilder, Signer, Wallet, WalletError,
};
use ethers::signers::WalletError;
use ethers_core::k256::ecdsa::SigningKey;

use native::attestation::AttestationSigner;
Expand All @@ -22,6 +20,7 @@ use tracing::{
};
use tracing_subscriber::{EnvFilter, FmtSubscriber};

use crate::common::address::{build_wallet, wallet_address};
use crate::common::indexer_error::{indexer_error, IndexerError};

/// Struct for version control
Expand Down Expand Up @@ -73,17 +72,6 @@ pub fn package_version() -> Result<PackageVersion, IndexerError> {
})
}

pub fn build_wallet(value: &str) -> Result<Wallet<SigningKey>, WalletError> {
value
.parse::<LocalWallet>()
.or(MnemonicBuilder::<English>::default().phrase(value).build())
}

/// Get wallet public address to String
pub fn wallet_address(wallet: &Wallet<SigningKey>) -> String {
format!("{:?}", wallet.address())
}

/// Validate that private key as an Eth wallet
pub fn public_key(value: &str) -> Result<String, WalletError> {
// The wallet can be stored instead of the original private key
Expand Down

0 comments on commit fb5e144

Please sign in to comment.