Skip to content

Commit

Permalink
Make indexer attempts their own messages and topic (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
goaaron authored Mar 17, 2022
1 parent 9124d90 commit 20d79dd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 25 deletions.
30 changes: 8 additions & 22 deletions src/kafka_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct ClientQueryResult {
pub budget: String,
pub status: String,
pub status_code: u32,
pub indexer_attempts: Vec<IndexerAttempt>,
}

#[derive(Serialize)]
Expand All @@ -78,10 +77,11 @@ pub struct IndexerAttempt {
pub response_time_ms: u32,
pub status: String,
pub status_code: u32,
pub timestamp: u64,
}

impl ClientQueryResult {
pub fn new(query: &Query, result: Result<String, String>) -> Self {
pub fn new(query: &Query, result: Result<String, String>, timestamp: u64) -> Self {
let api_key = query.api_key.as_ref().map(|k| k.key.as_ref()).unwrap_or("");
let subgraph = query.subgraph.as_ref().unwrap();
let deployment = subgraph.deployment.to_string();
Expand All @@ -96,40 +96,26 @@ impl ClientQueryResult {
Ok(status) => (status, 0),
Err(status) => (status, sip24_hash(status) as u32 | 0x1),
};
let indexer_attempts = query
.indexer_attempts
.iter()
.map(|attempt| IndexerAttempt {
indexer: attempt.indexer.to_string(),
url: attempt.score.url.to_string(),
allocation: attempt.allocation.to_string(),
fee: attempt.score.fee.as_f64(),
utility: *attempt.score.utility,
blocks_behind: attempt.score.blocks_behind,
response_time_ms: attempt.duration.as_millis() as u32,
status: match &attempt.result {
Ok(response) => response.status.to_string(),
Err(err) => format!("{:?}", err),
},
status_code: attempt.status_code(),
})
.collect::<Vec<IndexerAttempt>>();

Self {
ray_id: query.ray_id.clone(),
query_id: query.id.to_string(),
timestamp: timestamp(),
timestamp,
api_key: api_key.to_string(),
deployment: deployment,
network: network.clone(),
response_time_ms,
budget,
status: status.clone(),
status_code,
indexer_attempts,
}
}
}

impl Msg for IndexerAttempt {
const TOPIC: &'static str = "gateway_indexer_attempts";
}

impl Msg for ClientQueryResult {
const TOPIC: &'static str = "gateway_client_query_results";
}
Expand Down
38 changes: 35 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
fisherman_client::*,
indexer_client::IndexerClient,
ipfs_client::*,
kafka_client::{ClientQueryResult, KafkaClient, KafkaInterface as _},
kafka_client::{ClientQueryResult, IndexerAttempt, KafkaClient, KafkaInterface as _},
manifest_client::*,
opt::*,
prelude::*,
Expand All @@ -38,7 +38,7 @@ use prometheus::{self, Encoder as _};
use reqwest;
use serde::Deserialize;
use serde_json::{json, value::RawValue};
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use structopt::StructOpt as _;
use url::Url;

Expand Down Expand Up @@ -537,8 +537,40 @@ pub fn graphql_error_response<S: ToString>(message: S) -> HttpResponse {
.body(json!({"errors": {"message": message.to_string()}}).to_string())
}

fn timestamp() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}

fn notify_query_result(kafka_client: &KafkaClient, query: &Query, result: Result<String, String>) {
kafka_client.send(&ClientQueryResult::new(&query, result.clone()));
let ts = timestamp();
kafka_client.send(&ClientQueryResult::new(&query, result.clone(), ts));

let indexer_attempts = query
.indexer_attempts
.iter()
.map(|attempt| IndexerAttempt {
indexer: attempt.indexer.to_string(),
url: attempt.score.url.to_string(),
allocation: attempt.allocation.to_string(),
fee: attempt.score.fee.as_f64(),
utility: *attempt.score.utility,
blocks_behind: attempt.score.blocks_behind,
response_time_ms: attempt.duration.as_millis() as u32,
status: match &attempt.result {
Ok(response) => response.status.to_string(),
Err(err) => format!("{:?}", err),
},
status_code: attempt.status_code(),
timestamp: ts,
})
.collect::<Vec<IndexerAttempt>>();

for attempt in indexer_attempts {
kafka_client.send(&attempt);
}

let (status, status_code) = match &result {
Ok(status) => (status, 0),
Expand Down

0 comments on commit 20d79dd

Please sign in to comment.