diff --git a/src/reports.rs b/src/reports.rs index 0a18c23b..a789db05 100644 --- a/src/reports.rs +++ b/src/reports.rs @@ -1,8 +1,10 @@ +use std::{collections::HashSet, time::Duration}; + use anyhow::{anyhow, Context}; use ordered_float::NotNan; use prost::Message; use thegraph_core::{alloy::primitives::Address, DeploymentId, IndexerId}; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time::Instant}; use crate::{concat_bytes, errors, indexer_client::IndexerResponse, receipts::Receipt}; @@ -31,20 +33,43 @@ pub struct IndexerRequest { pub request: String, } +pub struct Topics { + pub queries: &'static str, + pub attestations: &'static str, +} + pub struct Reporter { - pub tap_signer: Address, - pub graph_env: String, - pub topics: Topics, - pub write_buf: Vec, - pub kafka_producer: rdkafka::producer::ThreadedProducer< + tap_signer: Address, + graph_env: String, + topics: Topics, + write_buf: Vec, + kafka_producer: rdkafka::producer::ThreadedProducer< rdkafka::producer::DefaultProducerContext, rdkafka::producer::NoCustomPartitioner, >, + attestation_sampler: AttestationSampler, } -pub struct Topics { - pub queries: &'static str, - pub attestations: &'static str, +struct AttestationSampler { + records: HashSet<(DeploymentId, Address)>, + last_eviction: Instant, +} + +impl AttestationSampler { + fn new() -> Self { + Self { + records: Default::default(), + last_eviction: Instant::now(), + } + } + + fn should_sample(&mut self, now: Instant, deployment: DeploymentId, indexer: Address) -> bool { + if now.duration_since(self.last_eviction) > Duration::from_secs(10) { + self.records.clear(); + self.last_eviction = now; + } + self.records.insert((deployment, indexer)) + } } impl Reporter { @@ -64,6 +89,7 @@ impl Reporter { topics, write_buf: Default::default(), kafka_producer, + attestation_sampler: AttestationSampler::new(), }; let (tx, mut rx) = mpsc::unbounded_channel(); @@ -143,13 +169,21 @@ impl Reporter { .context(anyhow!("failed to send to topic {}", self.topics.queries))?; self.write_buf.clear(); + let now = Instant::now(); for indexer_request in client_request.indexer_requests { + if !self.attestation_sampler.should_sample( + now, + indexer_request.deployment, + *indexer_request.indexer, + ) { + continue; + } if let Some((original_response, attestation)) = indexer_request .result .ok() .and_then(|r| Some((r.original_response, r.attestation?))) { - const MAX_PAYLOAD_BYTES: usize = 10_000; + const MAX_PAYLOAD_BYTES: usize = 100_000; AttestationProtobuf { request: Some(indexer_request.request).filter(|r| r.len() <= MAX_PAYLOAD_BYTES), response: Some(original_response).filter(|r| r.len() <= MAX_PAYLOAD_BYTES),