Skip to content

Commit

Permalink
feat: direct indexer query API (#867)
Browse files Browse the repository at this point in the history
implements #859
  • Loading branch information
Theodus authored Jul 15, 2024
1 parent b8f29d2 commit 9d1992b
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 12 deletions.
2 changes: 1 addition & 1 deletion gateway-framework/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl IntoResponse for Error {
}

#[derive(Debug, Clone, Default)]
pub struct IndexerErrors(BTreeMap<Address, IndexerError>);
pub struct IndexerErrors(pub BTreeMap<Address, IndexerError>);

impl std::ops::Deref for IndexerErrors {
type Target = BTreeMap<Address, IndexerError>;
Expand Down
172 changes: 163 additions & 9 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use alloy_primitives::{Address, BlockNumber};
use anyhow::anyhow;
use axum::{
body::Bytes,
extract::{OriginalUri, State},
extract::{OriginalUri, Path, State},
http::{HeaderMap, Response, StatusCode},
Extension,
};
Expand Down Expand Up @@ -142,7 +142,7 @@ pub async fn handle_query(
METRICS
.client_query
.duration
.observe(Instant::now().duration_since(start_time).as_secs_f64());
.observe(start_time.elapsed().as_secs_f64());

result.map(
|IndexerResponse {
Expand Down Expand Up @@ -318,9 +318,7 @@ async fn run_indexer_queries(

// If a client query cannot be handled by the available indexers, we should give a reason for
// all the available indexers in the `bad indexers` response.
while !candidates.is_empty()
&& (Instant::now().duration_since(start_time) < Duration::from_secs(60))
{
while !candidates.is_empty() && (start_time.elapsed() < Duration::from_secs(60)) {
let selections: ArrayVec<_, SELECTION_LIMIT> = indexer_selection::select(&candidates);
if selections.is_empty() {
// Candidates that would never be selected should be filtered out for improved errors.
Expand Down Expand Up @@ -390,8 +388,7 @@ async fn run_indexer_queries(
)
.in_current_span()
.await;
let response_time_ms =
Instant::now().duration_since(start_time).as_millis() as u16;
let response_time_ms = start_time.elapsed().as_millis() as u16;
let report = reports::IndexerRequest {
indexer,
deployment,
Expand All @@ -416,7 +413,7 @@ async fn run_indexer_queries(
match report.result.as_ref() {
Ok(response) if client_response_time.is_none() => {
let _ = client_response.try_send(Ok(response.clone()));
client_response_time = Some(Instant::now().duration_since(start_time));
client_response_time = Some(start_time.elapsed());
}
Ok(_) => (),
Err(err) => {
Expand Down Expand Up @@ -453,7 +450,7 @@ async fn run_indexer_queries(
// Send fallback error to use when no indexers are successful.
None => {
let _ = client_response.try_send(Err(Error::BadIndexers(indexer_errors.clone())));
Instant::now().duration_since(start_time)
start_time.elapsed()
}
};

Expand Down Expand Up @@ -784,6 +781,163 @@ impl From<network::ResolutionError> for IndexerError {
}
}

pub async fn handle_indexer_query(
State(ctx): State<Context>,
Extension(auth): Extension<AuthSettings>,
Extension(RequestId(request_id)): Extension<RequestId>,
_query_settings: Option<Extension<QuerySettings>>,
Path((deployment, indexer)): Path<(DeploymentId, Address)>,
payload: String,
) -> Result<Response<String>, Error> {
let start_time = Instant::now();

let bad_indexers = |err: IndexerError| -> Error {
Error::BadIndexers(IndexerErrors(BTreeMap::from_iter([(indexer, err)])))
};

let indexing_id = IndexingId {
deployment,
indexer,
};
let subgraph = resolve_subgraph_info(&ctx, &auth, QuerySelector::Deployment(deployment))
.await?
.map_err(|err| match err {
ResolutionError::TransferredToL2 { .. } => {
Error::SubgraphNotFound(anyhow!("deployment transferred to L2"))
}
})?;
let indexing = subgraph
.indexings
.get(&indexing_id)
.ok_or_else(|| Error::NoIndexers)?
.as_ref()
.map_err(|err| bad_indexers(err.clone().into()))?;

let (latest_block, blocks_per_minute) = {
let chain = ctx.chains.chain(&subgraph.chain);
let chain = chain.read();
let latest_block = chain.latest().map(|block| block.number);
(latest_block, chain.blocks_per_minute())
};
let blocks_behind = latest_block
.map(|head| head.saturating_sub(indexing.progress.latest_block))
.unwrap_or(0);
let seconds_behind = ((blocks_behind as f64 * 60.0) / blocks_per_minute as f64) as u32;

// Use budget as fee.
let grt_per_usd = *ctx.grt_per_usd.borrow();
let one_grt = NotNan::new(1e18).unwrap();
let fee = *(ctx.budgeter.query_fees_target.0 * grt_per_usd * one_grt) as u128;

let allocation = indexing.largest_allocation;
let receipt = match if indexing.indexer.tap_support {
ctx.receipt_signer.create_receipt(allocation, fee)
} else {
ctx.receipt_signer.create_legacy_receipt(allocation, fee)
} {
Ok(receipt) => receipt,
Err(err) => {
return Err(Error::Internal(anyhow!("failed to create receipt: {err}")));
}
};

let indexer_start_time = Instant::now();
let result = ctx
.indexer_client
.query_indexer(
&deployment,
&indexing.indexer.url,
&receipt,
ctx.attestation_domain,
&payload,
)
.in_current_span()
.await;
let response_time_ms = start_time.elapsed().as_millis() as u16;
let indexer_request = reports::IndexerRequest {
indexer: indexing_id.indexer,
deployment: indexing_id.deployment,
largest_allocation: allocation,
url: indexing.indexer.url.to_string(),
receipt,
subgraph_chain: subgraph.chain,
result: result.clone(),
response_time_ms: indexer_start_time.elapsed().as_millis() as u16,
seconds_behind,
blocks_behind,
request: payload,
};

let report_result = match &result {
Ok(_) => Ok(()),
Err(err) => Err(bad_indexers(err.clone())),
};
let result = result.map_err(bad_indexers);

let deployment = indexing_id.deployment.to_string();
let indexer = indexing_id.indexer.to_string();
let labels = [deployment.as_str(), indexer.as_str()];
METRICS
.indexer_query
.check(&labels, &indexer_request.result);
with_metric(&METRICS.indexer_query.duration, &labels, |hist| {
hist.observe(indexer_request.response_time_ms as f64)
});
match &result {
Ok(_) => METRICS.client_query.ok.inc(),
Err(_) => METRICS.client_query.err.inc(),
};
METRICS
.client_query
.duration
.observe(response_time_ms as f64);

tracing::info!(
indexer = ?indexer_request.indexer,
deployment = %indexer_request.deployment,
allocation = ?indexer_request.receipt.allocation(),
url = indexer_request.url,
result = ?indexer_request.result.as_ref().map(|_| ()),
response_time_ms = indexer_request.response_time_ms,
seconds_behind = indexer_request.seconds_behind,
fee = indexer_request.receipt.grt_value() as f64 * 1e-18,
"indexer_request"
);

ctx.indexing_perf.feedback(
indexer_request.indexer,
indexer_request.deployment,
indexer_request.result.is_ok(),
indexer_request.response_time_ms,
latest_block,
);

let _ = ctx.reporter.send(reports::ClientRequest {
id: request_id,
response_time_ms,
result: report_result,
api_key: auth.key,
user_address: auth.user,
grt_per_usd,
indexer_requests: vec![indexer_request],
});

result.map(
|IndexerResponse {
client_response,
attestation,
..
}| {
Response::builder()
.status(StatusCode::OK)
.header_typed(ContentType::json())
.header_typed(GraphAttestation(attestation))
.body(client_response)
.unwrap()
},
)
}

#[cfg(test)]
mod tests {
mod require_req_auth {
Expand Down
8 changes: 6 additions & 2 deletions graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,17 @@ async fn main() {
routing::post(client_query::handle_query),
)
.route(
"/:api_key/deployments/id/:deployment_id",
routing::post(client_query::handle_query),
"/deployments/id/:deployment_id/indexers/id/:indexer",
routing::post(client_query::handle_indexer_query),
)
.route(
"/subgraphs/id/:subgraph_id",
routing::post(client_query::handle_query),
)
.route(
"/:api_key/deployments/id/:deployment_id",
routing::post(client_query::handle_query),
)
.route(
"/:api_key/subgraphs/id/:subgraph_id",
routing::post(client_query::handle_query),
Expand Down

0 comments on commit 9d1992b

Please sign in to comment.