diff --git a/src/indexer_selection/indexers.rs b/src/indexer_selection/indexers.rs index ebcee217..451ef440 100644 --- a/src/indexer_selection/indexers.rs +++ b/src/indexer_selection/indexers.rs @@ -1,12 +1,13 @@ use crate::prelude::*; +use std::sync::Arc; pub struct IndexerDataReader { - pub url: Eventual, + pub url: Eventual>, pub stake: Eventual, } pub struct IndexerDataWriter { - pub url: EventualWriter, + pub url: EventualWriter>, pub stake: EventualWriter, } diff --git a/src/indexer_selection/mod.rs b/src/indexer_selection/mod.rs index 37133613..c02d79ad 100644 --- a/src/indexer_selection/mod.rs +++ b/src/indexer_selection/mod.rs @@ -38,7 +38,7 @@ pub use ordered_float::NotNan; use prometheus; use rand::{thread_rng, Rng as _}; pub use secp256k1::SecretKey; -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use utility::*; pub type Context<'c> = cost_model::Context<'c, &'c str>; @@ -149,7 +149,7 @@ pub struct UtilityConfig { #[derive(Clone, Debug)] pub struct IndexerScore { - pub url: String, + pub url: Arc, pub fee: GRT, pub slashable: USD, pub utility: NotNan, diff --git a/src/indexer_selection/tests.rs b/src/indexer_selection/tests.rs index eada274f..e7130fe1 100644 --- a/src/indexer_selection/tests.rs +++ b/src/indexer_selection/tests.rs @@ -12,7 +12,7 @@ use plotters::{ }; use rand::{thread_rng, Rng as _}; use secp256k1::SecretKey; -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; #[derive(Clone)] struct IndexerCharacteristics { @@ -267,7 +267,7 @@ async fn run_simulation( latest: latest.number, }); let indexer_writer = input_writers.indexers.write(&indexing.indexer).await; - indexer_writer.url.write("".to_string()); + indexer_writer.url.write(Arc::default()); indexer_writer.stake.write(data.stake); if let Some(special_weight) = data.special_weight { special_indexers.insert(indexing.indexer, special_weight.try_into().unwrap()); diff --git a/src/main.rs b/src/main.rs index d96eca89..3028cac6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -400,7 +400,7 @@ async fn handle_subgraph_query( "handle_subgraph_query", ray_id = %query.ray_id, query_id = %query.id, - deployment = %query.subgraph.as_ref().unwrap().deployment, + %deployment, network = %query.subgraph.as_ref().unwrap().network, ); let api_key = request.match_info().get("api_key").unwrap_or(""); @@ -420,7 +420,7 @@ async fn handle_subgraph_query( tracing::info!( ray_id = %query.ray_id, query_id = %query.id, - deployment = %query.subgraph.as_ref().unwrap().deployment, + %deployment, network = %query.subgraph.as_ref().unwrap().network, %api_key, query = %query.query, @@ -437,8 +437,11 @@ async fn handle_subgraph_query( tracing::info!( ray_id = %query.ray_id, query_id = %query.id, + api_key = %api_key, + %deployment, attempt_index, indexer = %attempt.indexer, + url = %attempt.score.url, allocation = %attempt.allocation, fee = %attempt.score.fee, utility = *attempt.score.utility, diff --git a/src/query_engine/mod.rs b/src/query_engine/mod.rs index 94374170..6f4a7da5 100644 --- a/src/query_engine/mod.rs +++ b/src/query_engine/mod.rs @@ -315,11 +315,6 @@ where .await; selection_timer.map(|t| t.observe_duration()); - match &selection_result { - Ok(None) => tracing::info!(err = ?NoIndexerSelected), - Err(err) => tracing::info!(?err), - _ => (), - }; let (indexer_query, scoring_sample) = match selection_result { Ok(Some(indexer_query)) => indexer_query, Ok(None) => return Err(NoIndexerSelected), @@ -350,9 +345,12 @@ where ) .await; assert_eq!(retry_count + 1, query.indexer_attempts.len()); - if let Ok(()) = result { - return Ok(()); - } + match result { + Ok(()) => return Ok(()), + Err(rejection) => { + query.indexer_attempts.last_mut().unwrap().rejection = Some(rejection); + } + }; } tracing::info!("retry limit reached"); Err(NoIndexerSelected) @@ -369,6 +367,7 @@ where query_id = %query.id, deployment = %query.subgraph.as_ref().unwrap().deployment, %indexer, + url = %score.url, fee = %score.fee, slashable = %score.slashable, utility = *score.utility, @@ -406,9 +405,8 @@ where deployment_id: &str, context: &mut Context<'_>, block_resolver: &BlockResolver, - ) -> Result<(), ()> { + ) -> Result<(), String> { let indexer_id = indexer_query.indexing.indexer.to_string(); - tracing::info!(indexer = %indexer_id); self.observe_indexer_selection_metrics(deployment_id, &indexer_query); let t0 = Instant::now(); let result = self.indexer_client.query_indexer(&indexer_query).await; @@ -440,7 +438,7 @@ where &[&deployment_id, &indexer_id], |counter| counter.inc(), ); - return Err(()); + return Err(err.to_string()); } }; with_metric( @@ -451,7 +449,6 @@ where let subgraph = query.subgraph.as_ref().unwrap(); if !subgraph.features.is_empty() && response.attestation.is_none() { - tracing::info!(indexer_response_err = "Attestable response has no attestation"); self.indexers .observe_failed_query( &indexer_query.indexing, @@ -459,10 +456,10 @@ where IndexerError::NoAttestation, ) .await; - return Err(()); + return Err("Attestable response has no attestation".into()); } - if let Err(remove_indexer) = self + if let Err(rejection) = self .check_unattestable_responses( context, &block_resolver, @@ -477,7 +474,7 @@ where &[&deployment_id, &indexer_id], |counter| counter.inc(), ); - return Err(remove_indexer); + return Err(rejection); } if let Some(attestation) = &response.attestation { @@ -530,30 +527,28 @@ where indexing: &Indexing, receipt: &Receipt, response: &IndexerResponse, - ) -> Result<(), ()> { + ) -> Result<(), String> { // Special-casing for a few known indexer errors; the block scope here // is just to separate the code neatly from the rest - let parsed_response = - serde_json::from_str::>>(&response.payload).map_err(|_| ())?; + let parsed_response = serde_json::from_str::>>(&response.payload) + .map_err(|_| "invalid indexer response")?; if indexer_response_has_error( &parsed_response, "Failed to decode `block.hash` value: `no block with that hash found`", ) { - tracing::info!(indexer_response_err = "indexing behind"); self.indexers .observe_indexing_behind(context, indexing, block_resolver) .await; - return Err(()); + return Err("indexer failed to resolve block".into()); } if indexer_response_has_error(&parsed_response, "panic processing query") { - tracing::info!(indexer_response_err = "panic processing query"); self.indexers .observe_failed_query(indexing, receipt, IndexerError::NondeterministicResponse) .await; - return Err(()); + return Err("indexer panicked processing query".into()); } Ok(()) diff --git a/src/query_engine/tests.rs b/src/query_engine/tests.rs index 1aa56a89..b23499b0 100644 --- a/src/query_engine/tests.rs +++ b/src/query_engine/tests.rs @@ -342,7 +342,7 @@ impl Topology { let stake_table = [0.0, 50e3, 100e3, 150e3]; for indexer in self.indexers.values() { let indexer_writer = indexer_inputs.indexers.write(&indexer.id).await; - indexer_writer.url.write("".into()); + indexer_writer.url.write(Arc::default()); indexer_writer .stake .write(indexer.staked_grt.as_udecimal(&stake_table)); diff --git a/src/sync_client.rs b/src/sync_client.rs index 8f503ce3..82f3807e 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -620,7 +620,7 @@ fn handle_indexers( let mut indexers = indexers.lock().await; for (indexer, status) in statuses { let indexer = indexers.write(&indexer).await; - indexer.url.write(status.url); + indexer.url.write(Arc::new(status.url)); indexer.stake.write(status.staked); } }