Skip to content

Commit

Permalink
Stop removing indexers on retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Feb 17, 2022
1 parent 6e52b6a commit 668b706
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 38 deletions.
5 changes: 2 additions & 3 deletions src/indexer_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use crate::{
},
};
use cost_model;
use im;
use lazy_static::lazy_static;
use num_traits::identities::Zero as _;
pub use ordered_float::NotNan;
Expand Down Expand Up @@ -313,7 +312,7 @@ impl Indexers {
config: &UtilityConfig,
network: &str,
subgraph: &SubgraphDeploymentID,
indexers: &im::Vector<Address>,
indexers: &[Address],
context: &mut Context<'_>,
block_resolver: &BlockResolver,
freshness_requirements: &BlockRequirements,
Expand Down Expand Up @@ -384,7 +383,7 @@ impl Indexers {
deployment: &SubgraphDeploymentID,
context: &mut Context<'_>,
block_resolver: &BlockResolver,
indexers: &im::Vector<Address>,
indexers: &[Address],
budget: USD,
freshness_requirements: &BlockRequirements,
) -> Result<Option<Selection>, SelectionError> {
Expand Down
9 changes: 6 additions & 3 deletions src/indexer_selection/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async fn run_simulation(
let deployment: SubgraphDeploymentID = bytes_from_id(99).into();

let mut results = Vec::<IndexerResults>::new();
let mut indexer_ids = im::Vector::new();
let mut indexer_ids = Vec::new();
let test_key = SecretKey::from_str(TEST_KEY).unwrap();
let mut special_indexers = HashMap::<Address, NotNan<f64>>::new();
for data in tests.iter() {
Expand All @@ -254,7 +254,7 @@ async fn run_simulation(
indexer: bytes_from_id(indexer_ids.len()).into(),
deployment,
};
indexer_ids.push_back(indexing.indexer);
indexer_ids.push(indexing.indexer);
let indexing_writer = input_writers.indexings.write(&indexing).await;
indexing_writer
.cost_model
Expand Down Expand Up @@ -310,7 +310,10 @@ async fn run_simulation(
Some((query, _)) => query,
None => continue,
};
let index = indexer_ids.index_of(&query.indexing.indexer).unwrap();
let index = indexer_ids
.iter()
.position(|id| id == &query.indexing.indexer)
.unwrap();
let entry = results.get_mut(index).unwrap();
entry.queries_received += 1;
let data = tests.get(index).unwrap();
Expand Down
43 changes: 15 additions & 28 deletions src/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub use crate::{
prelude::*,
};
pub use graphql_client::Response;
use im;
use lazy_static::lazy_static;
use prometheus;
use serde_json::value::RawValue;
Expand Down Expand Up @@ -137,11 +136,6 @@ impl From<SelectionError> for QueryEngineError {
}
}

enum RemoveIndexer {
Yes,
No,
}

#[derive(Clone)]
pub struct Config {
pub indexer_selection_retry_limit: usize,
Expand All @@ -153,15 +147,14 @@ pub struct Config {
pub struct Inputs {
pub indexers: Arc<Indexers>,
pub current_deployments: Eventual<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>>,
pub deployment_indexers: Eventual<Ptr<HashMap<SubgraphDeploymentID, im::Vector<Address>>>>,
pub deployment_indexers: Eventual<Ptr<HashMap<SubgraphDeploymentID, Vec<Address>>>>,
}

pub struct InputWriters {
pub indexer_inputs: indexer_selection::InputWriters,
pub indexers: Arc<Indexers>,
pub current_deployments: EventualWriter<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>>,
pub deployment_indexers:
EventualWriter<Ptr<HashMap<SubgraphDeploymentID, im::Vector<Address>>>>,
pub deployment_indexers: EventualWriter<Ptr<HashMap<SubgraphDeploymentID, Vec<Address>>>>,
}

impl Inputs {
Expand Down Expand Up @@ -192,7 +185,7 @@ where
F: FishermanInterface + Clone + Send,
{
indexers: Arc<Indexers>,
deployment_indexers: Eventual<Ptr<HashMap<SubgraphDeploymentID, im::Vector<Address>>>>,
deployment_indexers: Eventual<Ptr<HashMap<SubgraphDeploymentID, Vec<Address>>>>,
block_resolvers: Arc<HashMap<String, BlockResolver>>,
indexer_client: I,
fisherman_client: Option<Arc<F>>,
Expand Down Expand Up @@ -247,7 +240,7 @@ where
) -> Result<(), QueryEngineError> {
use QueryEngineError::*;
let subgraph = query.subgraph.as_ref().unwrap().clone();
let mut indexers = self
let indexers = self
.deployment_indexers
.value_immediate()
.and_then(|map| map.get(&subgraph.deployment).cloned())
Expand Down Expand Up @@ -347,7 +340,6 @@ where
}
_ => (),
};
let indexer = indexer_query.indexing.indexer;
let result = self
.execute_indexer_query(
query,
Expand All @@ -358,14 +350,9 @@ where
)
.await;
assert_eq!(retry_count + 1, query.indexer_attempts.len());
match result {
Ok(response) => return Ok(response),
Err(RemoveIndexer::No) => (),
Err(RemoveIndexer::Yes) => {
// TODO: There should be a penalty here, but the indexer should not be removed.
indexers.remove(indexers.iter().position(|i| i == &indexer).unwrap());
}
};
if let Ok(()) = result {
return Ok(());
}
}
tracing::info!("retry limit reached");
Err(NoIndexerSelected)
Expand Down Expand Up @@ -419,7 +406,7 @@ where
deployment_id: &str,
context: &mut Context<'_>,
block_resolver: &BlockResolver,
) -> Result<(), RemoveIndexer> {
) -> Result<(), ()> {
let indexer_id = indexer_query.indexing.indexer.to_string();
tracing::info!(indexer = %indexer_id);
self.observe_indexer_selection_metrics(deployment_id, &indexer_query);
Expand Down Expand Up @@ -453,7 +440,7 @@ where
&[&deployment_id, &indexer_id],
|counter| counter.inc(),
);
return Err(RemoveIndexer::Yes);
return Err(());
}
};
with_metric(
Expand All @@ -472,7 +459,7 @@ where
IndexerError::NoAttestation,
)
.await;
return Err(RemoveIndexer::Yes);
return Err(());
}

if let Err(remove_indexer) = self
Expand Down Expand Up @@ -543,12 +530,12 @@ where
indexing: &Indexing,
receipt: &Receipt,
response: &IndexerResponse,
) -> Result<(), RemoveIndexer> {
) -> Result<(), ()> {
// Special-casing for a few known indexer errors; the block scope here
// is just to separate the code neatly from the rest

let parsed_response = serde_json::from_str::<Response<Box<RawValue>>>(&response.payload)
.map_err(|_| RemoveIndexer::Yes)?;
let parsed_response =
serde_json::from_str::<Response<Box<RawValue>>>(&response.payload).map_err(|_| ())?;

if indexer_response_has_error(
&parsed_response,
Expand All @@ -558,15 +545,15 @@ where
self.indexers
.observe_indexing_behind(context, indexing, block_resolver)
.await;
return Err(RemoveIndexer::No);
return Err(());
}

if indexer_response_has_error(&parsed_response, "panic processing query") {
tracing::info!(indexer_response_err = "panic processing query");
self.indexers
.observe_failed_query(indexing, receipt, IndexerError::NondeterministicResponse)
.await;
return Err(RemoveIndexer::Yes);
return Err(());
}

Ok(())
Expand Down
5 changes: 1 addition & 4 deletions src/sync_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
};
use eventuals::EventualExt as _;
use graphql_client::{GraphQLQuery, Response};
use im;
use lazy_static::lazy_static;
use prometheus;
use reqwest;
Expand Down Expand Up @@ -591,9 +590,7 @@ fn handle_cost_models(

fn handle_indexers(
indexers: SharedLookupWriter<Address, IndexerDataReader, IndexerDataWriter>,
mut deployment_indexers: EventualWriter<
Ptr<HashMap<SubgraphDeploymentID, im::Vector<Address>>>,
>,
mut deployment_indexers: EventualWriter<Ptr<HashMap<SubgraphDeploymentID, Vec<Address>>>>,
indexer_statuses: Eventual<Ptr<Vec<(SubgraphDeploymentID, Vec<ParsedIndexerInfo>)>>>,
) {
let indexers = Arc::new(Mutex::new(indexers));
Expand Down

0 comments on commit 668b706

Please sign in to comment.