Skip to content

Commit

Permalink
feat: sample attestation reports (#1009)
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus authored Dec 11, 2024
1 parent a977e66 commit d7caa65
Showing 1 changed file with 44 additions and 10 deletions.
54 changes: 44 additions & 10 deletions src/reports.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::{collections::HashSet, time::Duration};

use anyhow::{anyhow, Context};
use ordered_float::NotNan;
use prost::Message;
use thegraph_core::{alloy::primitives::Address, DeploymentId, IndexerId};
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time::Instant};

use crate::{concat_bytes, errors, indexer_client::IndexerResponse, receipts::Receipt};

Expand Down Expand Up @@ -31,20 +33,43 @@ pub struct IndexerRequest {
pub request: String,
}

pub struct Topics {
pub queries: &'static str,
pub attestations: &'static str,
}

pub struct Reporter {
pub tap_signer: Address,
pub graph_env: String,
pub topics: Topics,
pub write_buf: Vec<u8>,
pub kafka_producer: rdkafka::producer::ThreadedProducer<
tap_signer: Address,
graph_env: String,
topics: Topics,
write_buf: Vec<u8>,
kafka_producer: rdkafka::producer::ThreadedProducer<
rdkafka::producer::DefaultProducerContext,
rdkafka::producer::NoCustomPartitioner,
>,
attestation_sampler: AttestationSampler,
}

pub struct Topics {
pub queries: &'static str,
pub attestations: &'static str,
struct AttestationSampler {
records: HashSet<(DeploymentId, Address)>,
last_eviction: Instant,
}

impl AttestationSampler {
fn new() -> Self {
Self {
records: Default::default(),
last_eviction: Instant::now(),
}
}

fn should_sample(&mut self, now: Instant, deployment: DeploymentId, indexer: Address) -> bool {
if now.duration_since(self.last_eviction) > Duration::from_secs(10) {
self.records.clear();
self.last_eviction = now;
}
self.records.insert((deployment, indexer))
}
}

impl Reporter {
Expand All @@ -64,6 +89,7 @@ impl Reporter {
topics,
write_buf: Default::default(),
kafka_producer,
attestation_sampler: AttestationSampler::new(),
};

let (tx, mut rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -143,13 +169,21 @@ impl Reporter {
.context(anyhow!("failed to send to topic {}", self.topics.queries))?;
self.write_buf.clear();

let now = Instant::now();
for indexer_request in client_request.indexer_requests {
if !self.attestation_sampler.should_sample(
now,
indexer_request.deployment,
*indexer_request.indexer,
) {
continue;
}
if let Some((original_response, attestation)) = indexer_request
.result
.ok()
.and_then(|r| Some((r.original_response, r.attestation?)))
{
const MAX_PAYLOAD_BYTES: usize = 10_000;
const MAX_PAYLOAD_BYTES: usize = 100_000;
AttestationProtobuf {
request: Some(indexer_request.request).filter(|r| r.len() <= MAX_PAYLOAD_BYTES),
response: Some(original_response).filter(|r| r.len() <= MAX_PAYLOAD_BYTES),
Expand Down

0 comments on commit d7caa65

Please sign in to comment.