Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 12, 2024
1 parent 7543ecf commit 026c4b9
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub enum ExchangeRateProvider {
/// Kafka configuration.
///
/// See [`Config`]'s [`kafka`](struct.Config.html#structfield.kafka).
#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct KafkaConfig(BTreeMap<String, String>);

impl Default for KafkaConfig {
Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ async fn main() {
}
None => Default::default(),
};
let indexer_blocklist = indexer_blocklist::Blocklist::spawn(conf.blocklist);
let indexer_blocklist =
indexer_blocklist::Blocklist::spawn(conf.blocklist, conf.kafka.clone().into());
let mut network = network::service::spawn(
http_client.clone(),
network_subgraph_client,
Expand Down
17 changes: 12 additions & 5 deletions src/network/indexer_blocklist.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::{HashMap, HashSet};

use futures::StreamExt;
use thegraph_core::{alloy::primitives::Address, DeploymentId, ProofOfIndexing};
use tokio::sync::watch;

Expand All @@ -13,11 +14,13 @@ pub struct Blocklist {
}

impl Blocklist {
pub fn spawn(init: Vec<BlocklistEntry>) -> Self {
pub fn spawn(init: Vec<BlocklistEntry>, kafka_config: rdkafka::ClientConfig) -> Self {
let (blocklist_tx, blocklist_rx) = watch::channel(Default::default());
let (poi_tx, poi_rx) = watch::channel(Default::default());
let (indexer_tx, indexer_rx) = watch::channel(Default::default());
let consumer = kafka_config.create().unwrap();
let mut actor = Actor {
consumer,
blocklist: blocklist_tx,
poi: poi_tx,
indexer: indexer_tx,
Expand All @@ -37,14 +40,18 @@ impl Blocklist {
}

struct Actor {
pub blocklist: watch::Sender<Vec<BlocklistEntry>>,
pub poi: watch::Sender<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
pub indexer: watch::Sender<HashMap<Address, HashSet<DeploymentId>>>,
consumer: rdkafka::consumer::StreamConsumer,
blocklist: watch::Sender<Vec<BlocklistEntry>>,
poi: watch::Sender<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
indexer: watch::Sender<HashMap<Address, HashSet<DeploymentId>>>,
}

impl Actor {
async fn run(&mut self) {
todo!();
let mut stream = self.consumer.stream();
while let Some(msg) = stream.next().await {
todo!();
}
}

fn add_entry(&mut self, entry: BlocklistEntry) {
Expand Down

0 comments on commit 026c4b9

Please sign in to comment.