Skip to content

Commit

Permalink
refactor: record some indexer_errors
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 23, 2023
1 parent e21f9ef commit 56c6517
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 37 deletions.
2 changes: 1 addition & 1 deletion common/src/subgraph_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl SubgraphClient {
subgraph_url: &str,
) -> Result<Self, anyhow::Error> {
// TODO: Check indexing status of the local subgraph deployment
// if the deployment is healthy and synced, use local_subgraoh_endpoint
// if the deployment is healthy and synced, use local_subgraph_endpoint
let _local_subgraph_endpoint = match (graph_node_query_endpoint, deployment) {
(Some(endpoint), Some(id)) => Some(Self::local_deployment_endpoint(endpoint, id)?),
_ => None,
Expand Down
39 changes: 34 additions & 5 deletions service/src/query_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use tap_core::tap_manager::SignedReceipt;
use toolshed::thegraph::attestation::Attestation;
use toolshed::thegraph::DeploymentId;

use crate::metrics;
use indexer_common::indexer_errors::IndexerErrorCode;
use indexer_common::prelude::AttestationSigner;

use crate::graph_node::GraphNodeInstance;
Expand Down Expand Up @@ -91,7 +93,14 @@ impl QueryProcessor {
let response = self
.graph_node
.subgraph_query_raw(&query.subgraph_deployment_id, query.query)
.await?;
.await
.map_err(|e| {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE033.to_string()])
.inc();

e
})?;

Ok(Response {
result: response,
Expand All @@ -109,22 +118,42 @@ impl QueryProcessor {
receipt,
} = query;

// TODO: Emit IndexerErrorCode::IE031 on error
let parsed_receipt: SignedReceipt = serde_json::from_str(&receipt)
.map_err(|e| QueryError::Other(anyhow::Error::from(e)))?;
let parsed_receipt: SignedReceipt = match serde_json::from_str(&receipt)
.map_err(|e| QueryError::Other(anyhow::Error::from(e)))
{
Ok(r) => r,
Err(e) => {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE031.to_string()])
.inc();

return Err(e);
}
};

let allocation_id = parsed_receipt.message.allocation_id;

self.tap_manager
.verify_and_store_receipt(parsed_receipt)
.await
.map_err(QueryError::Other)?;
.map_err(|e| {
//TODO: fit indexer errors to TAP better, currently keeping the old messages
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE053.to_string()])
.inc();

QueryError::Other(e)
})?;

let signers = self
.attestation_signers
.value_immediate()
.ok_or_else(|| QueryError::Other(anyhow::anyhow!("System is not ready yet")))?;
let signer = signers.get(&allocation_id).ok_or_else(|| {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE022.to_string()])
.inc();

QueryError::Other(anyhow::anyhow!(
"No signer found for allocation id {}",
allocation_id
Expand Down
18 changes: 14 additions & 4 deletions service/src/server/routes/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use hyper::body::Bytes;

use reqwest::{header, Client};

use crate::server::ServerOptions;
use indexer_common::graphql::filter_supported_fields;
use crate::{metrics, server::ServerOptions};
use indexer_common::{graphql::filter_supported_fields, indexer_errors::IndexerErrorCode};

use super::bad_request_response;

Expand Down Expand Up @@ -67,8 +67,18 @@ pub async fn status_queries(
match request.send().await {
Ok(r) => match r.json::<Box<serde_json::value::RawValue>>().await {
Ok(r) => (StatusCode::OK, Json(r)).into_response(),
Err(e) => bad_request_response(&e.to_string()),
Err(e) => {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE018.to_string()])
.inc();
bad_request_response(&e.to_string())
}
},
Err(e) => bad_request_response(&e.to_string()),
Err(e) => {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE018.to_string()])
.inc();
bad_request_response(&e.to_string())
}
}
}
47 changes: 24 additions & 23 deletions service/src/server/routes/subgraphs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
ServerOptions,
},
};
use indexer_common::indexer_errors::IndexerErrorCode;

/// Parse an incoming query request and route queries with authenticated
/// free query token to graph node
Expand Down Expand Up @@ -52,6 +53,9 @@ pub async fn subgraph_queries(
metrics::QUERIES_WITH_INVALID_RECEIPT_HEADER
.with_label_values(&[&deployment_label])
.inc();
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE029.to_string()])
.inc();
return bad_request_response("Bad scalar receipt for subgraph query");
}
}
Expand Down Expand Up @@ -87,34 +91,28 @@ pub async fn subgraph_queries(
query: query_string,
};

// TODO: Emit IndexerErrorCode::IE033 on error
let res = server
.query_processor
.execute_free_query(free_query)
.await
.expect("Failed to execute free query");
query_duration_timer.observe_duration();
match res.status {
200 => (StatusCode::OK, Json(res.result)).into_response(),
_ => bad_request_response("Bad response from Graph node"),
match server.query_processor.execute_free_query(free_query).await {
Ok(res) if res.status == 200 => {
query_duration_timer.observe_duration();
(StatusCode::OK, Json(res.result)).into_response()
}
_ => {
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE033.to_string()])
.inc();
bad_request_response("Failed to execute free query")
}
}
} else if receipt.is_some() {
} else if let Some(receipt) = receipt {
let paid_query = crate::query_processor::PaidQuery {
subgraph_deployment_id,
query: query_string,
receipt: receipt.unwrap().to_string(),
receipt: receipt.to_string(),
};

// TODO: Emit IndexerErrorCode::IE032 on error
let res = server
.query_processor
.execute_paid_query(paid_query)
.await
.expect("Failed to execute paid query");

query_duration_timer.observe_duration();
match res.status {
200 => {
match server.query_processor.execute_paid_query(paid_query).await {
Ok(res) if res.status == 200 => {
query_duration_timer.observe_duration();
metrics::SUCCESSFUL_QUERIES
.with_label_values(&[&deployment_label])
.inc();
Expand All @@ -124,7 +122,10 @@ pub async fn subgraph_queries(
metrics::FAILED_QUERIES
.with_label_values(&[&deployment_label])
.inc();
bad_request_response("Bad response from Graph node")
metrics::INDEXER_ERROR
.with_label_values(&[&IndexerErrorCode::IE032.to_string()])
.inc();
return bad_request_response("Failed to execute paid query");
}
}
} else {
Expand Down
13 changes: 9 additions & 4 deletions service/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ pub fn package_version() -> Result<PackageVersion, IndexerError> {
.as_str()
.unwrap()
.to_string();
let dependencies = pkg.get("dependencies").and_then(|d| d.as_table()).unwrap();
let indexer_native = dependencies
.get("indexer-native")
.map(|d| d.as_str().unwrap().to_string());
let dependencies = pkg
.get("dependencies")
.and_then(|d| d.as_table())
.expect("Parse package dependencies");
let indexer_native = dependencies.get("indexer-native").map(|d| {
d.as_str()
.expect("Parse indexer-service dependency version")
.to_string()
});

let release = PackageVersion {
version,
Expand Down

0 comments on commit 56c6517

Please sign in to comment.