From 56c651783dd44cd80592dfff6d8e2d216cef660a Mon Sep 17 00:00:00 2001 From: hopeyen Date: Mon, 23 Oct 2023 11:23:57 +0300 Subject: [PATCH] refactor: record some indexer_errors --- common/src/subgraph_client.rs | 2 +- service/src/query_processor.rs | 39 ++++++++++++++++++--- service/src/server/routes/status.rs | 18 +++++++--- service/src/server/routes/subgraphs.rs | 47 +++++++++++++------------- service/src/util.rs | 13 ++++--- 5 files changed, 82 insertions(+), 37 deletions(-) diff --git a/common/src/subgraph_client.rs b/common/src/subgraph_client.rs index 23fce374a..7bcb8b3f5 100644 --- a/common/src/subgraph_client.rs +++ b/common/src/subgraph_client.rs @@ -26,7 +26,7 @@ impl SubgraphClient { subgraph_url: &str, ) -> Result { // TODO: Check indexing status of the local subgraph deployment - // if the deployment is healthy and synced, use local_subgraoh_endpoint + // if the deployment is healthy and synced, use local_subgraph_endpoint let _local_subgraph_endpoint = match (graph_node_query_endpoint, deployment) { (Some(endpoint), Some(id)) => Some(Self::local_deployment_endpoint(endpoint, id)?), _ => None, diff --git a/service/src/query_processor.rs b/service/src/query_processor.rs index 9dd9084b4..00c432898 100644 --- a/service/src/query_processor.rs +++ b/service/src/query_processor.rs @@ -12,6 +12,8 @@ use tap_core::tap_manager::SignedReceipt; use toolshed::thegraph::attestation::Attestation; use toolshed::thegraph::DeploymentId; +use crate::metrics; +use indexer_common::indexer_errors::IndexerErrorCode; use indexer_common::prelude::AttestationSigner; use crate::graph_node::GraphNodeInstance; @@ -91,7 +93,14 @@ impl QueryProcessor { let response = self .graph_node .subgraph_query_raw(&query.subgraph_deployment_id, query.query) - .await?; + .await + .map_err(|e| { + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE033.to_string()]) + .inc(); + + e + })?; Ok(Response { result: response, @@ -109,22 +118,42 @@ impl QueryProcessor { receipt, } = query; - // TODO: Emit IndexerErrorCode::IE031 on error - let parsed_receipt: SignedReceipt = serde_json::from_str(&receipt) - .map_err(|e| QueryError::Other(anyhow::Error::from(e)))?; + let parsed_receipt: SignedReceipt = match serde_json::from_str(&receipt) + .map_err(|e| QueryError::Other(anyhow::Error::from(e))) + { + Ok(r) => r, + Err(e) => { + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE031.to_string()]) + .inc(); + + return Err(e); + } + }; let allocation_id = parsed_receipt.message.allocation_id; self.tap_manager .verify_and_store_receipt(parsed_receipt) .await - .map_err(QueryError::Other)?; + .map_err(|e| { + //TODO: fit indexer errors to TAP better, currently keeping the old messages + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE053.to_string()]) + .inc(); + + QueryError::Other(e) + })?; let signers = self .attestation_signers .value_immediate() .ok_or_else(|| QueryError::Other(anyhow::anyhow!("System is not ready yet")))?; let signer = signers.get(&allocation_id).ok_or_else(|| { + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE022.to_string()]) + .inc(); + QueryError::Other(anyhow::anyhow!( "No signer found for allocation id {}", allocation_id diff --git a/service/src/server/routes/status.rs b/service/src/server/routes/status.rs index 65a0eba78..2cbfb7ab2 100644 --- a/service/src/server/routes/status.rs +++ b/service/src/server/routes/status.rs @@ -13,8 +13,8 @@ use hyper::body::Bytes; use reqwest::{header, Client}; -use crate::server::ServerOptions; -use indexer_common::graphql::filter_supported_fields; +use crate::{metrics, server::ServerOptions}; +use indexer_common::{graphql::filter_supported_fields, indexer_errors::IndexerErrorCode}; use super::bad_request_response; @@ -67,8 +67,18 @@ pub async fn status_queries( match request.send().await { Ok(r) => match r.json::>().await { Ok(r) => (StatusCode::OK, Json(r)).into_response(), - Err(e) => bad_request_response(&e.to_string()), + Err(e) => { + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE018.to_string()]) + .inc(); + bad_request_response(&e.to_string()) + } }, - Err(e) => bad_request_response(&e.to_string()), + Err(e) => { + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE018.to_string()]) + .inc(); + bad_request_response(&e.to_string()) + } } } diff --git a/service/src/server/routes/subgraphs.rs b/service/src/server/routes/subgraphs.rs index 128ff9f9b..cd3952642 100644 --- a/service/src/server/routes/subgraphs.rs +++ b/service/src/server/routes/subgraphs.rs @@ -19,6 +19,7 @@ use crate::{ ServerOptions, }, }; +use indexer_common::indexer_errors::IndexerErrorCode; /// Parse an incoming query request and route queries with authenticated /// free query token to graph node @@ -52,6 +53,9 @@ pub async fn subgraph_queries( metrics::QUERIES_WITH_INVALID_RECEIPT_HEADER .with_label_values(&[&deployment_label]) .inc(); + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE029.to_string()]) + .inc(); return bad_request_response("Bad scalar receipt for subgraph query"); } } @@ -87,34 +91,28 @@ pub async fn subgraph_queries( query: query_string, }; - // TODO: Emit IndexerErrorCode::IE033 on error - let res = server - .query_processor - .execute_free_query(free_query) - .await - .expect("Failed to execute free query"); - query_duration_timer.observe_duration(); - match res.status { - 200 => (StatusCode::OK, Json(res.result)).into_response(), - _ => bad_request_response("Bad response from Graph node"), + match server.query_processor.execute_free_query(free_query).await { + Ok(res) if res.status == 200 => { + query_duration_timer.observe_duration(); + (StatusCode::OK, Json(res.result)).into_response() + } + _ => { + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE033.to_string()]) + .inc(); + bad_request_response("Failed to execute free query") + } } - } else if receipt.is_some() { + } else if let Some(receipt) = receipt { let paid_query = crate::query_processor::PaidQuery { subgraph_deployment_id, query: query_string, - receipt: receipt.unwrap().to_string(), + receipt: receipt.to_string(), }; - // TODO: Emit IndexerErrorCode::IE032 on error - let res = server - .query_processor - .execute_paid_query(paid_query) - .await - .expect("Failed to execute paid query"); - - query_duration_timer.observe_duration(); - match res.status { - 200 => { + match server.query_processor.execute_paid_query(paid_query).await { + Ok(res) if res.status == 200 => { + query_duration_timer.observe_duration(); metrics::SUCCESSFUL_QUERIES .with_label_values(&[&deployment_label]) .inc(); @@ -124,7 +122,10 @@ pub async fn subgraph_queries( metrics::FAILED_QUERIES .with_label_values(&[&deployment_label]) .inc(); - bad_request_response("Bad response from Graph node") + metrics::INDEXER_ERROR + .with_label_values(&[&IndexerErrorCode::IE032.to_string()]) + .inc(); + return bad_request_response("Failed to execute paid query"); } } } else { diff --git a/service/src/util.rs b/service/src/util.rs index 4a103d7ff..9592dbf07 100644 --- a/service/src/util.rs +++ b/service/src/util.rs @@ -43,10 +43,15 @@ pub fn package_version() -> Result { .as_str() .unwrap() .to_string(); - let dependencies = pkg.get("dependencies").and_then(|d| d.as_table()).unwrap(); - let indexer_native = dependencies - .get("indexer-native") - .map(|d| d.as_str().unwrap().to_string()); + let dependencies = pkg + .get("dependencies") + .and_then(|d| d.as_table()) + .expect("Parse package dependencies"); + let indexer_native = dependencies.get("indexer-native").map(|d| { + d.as_str() + .expect("Parse indexer-service dependency version") + .to_string() + }); let release = PackageVersion { version,