From b9b77021b669d11fc032813281811960b5946ce4 Mon Sep 17 00:00:00 2001 From: Himanshu Goyal Date: Wed, 10 Apr 2024 13:21:20 +0300 Subject: [PATCH] make receiver inactive --- src/events_source.rs | 9 +++++---- src/test.rs | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/events_source.rs b/src/events_source.rs index d2b4cca..7d60658 100644 --- a/src/events_source.rs +++ b/src/events_source.rs @@ -1,4 +1,4 @@ -use async_broadcast::{broadcast, Receiver as BroadcastReceiver, Sender as BroadcastSender}; +use async_broadcast::{broadcast, InactiveReceiver, Sender as BroadcastSender}; use async_trait::async_trait; use futures::future::BoxFuture; use futures::stream::{self, BoxStream, Stream, StreamExt}; @@ -123,7 +123,7 @@ where #[derive(Debug)] pub struct EventsStreamer { // required for api subscription - to_subscribe_clone_recv: BroadcastReceiver>>, + inactive_to_subscribe_clone_recv: InactiveReceiver>>, subscriber_send_channel: BroadcastSender>>, // required for sending startup info @@ -165,7 +165,7 @@ impl EventsSource for EventsStreamer { type EventStream = BoxStream<'static, Arc>>; async fn get_event_stream(&self) -> Self::EventStream { - let recv_channel = self.to_subscribe_clone_recv.clone(); + let recv_channel = self.inactive_to_subscribe_clone_recv.activate_cloned(); let starup_event_initialized = false; let startup_event = self.get_startup_event().clone(); stream::unfold( @@ -196,9 +196,10 @@ impl EventsStreamer { let (mut subscriber_send_channel, to_subscribe_clone_recv) = broadcast::>>(RETAINED_EVENTS_COUNT); subscriber_send_channel.set_overflow(true); + let inactive_to_subscribe_clone_recv = to_subscribe_clone_recv.deactivate(); EventsStreamer { subscriber_send_channel, - to_subscribe_clone_recv, + inactive_to_subscribe_clone_recv, known_nodes_with_stake, non_staked_node_count, } diff --git a/src/test.rs b/src/test.rs index 8556c22..51f5f28 100644 --- a/src/test.rs +++ b/src/test.rs @@ -70,7 +70,7 @@ mod tests { tracing::info!("Client 1 Connected to server"); - // client subscrive to hotshot events + // client 1 subscribe to hotshot events let mut events_1 = client_1 .socket("events") .subscribe::>() @@ -89,7 +89,7 @@ mod tests { tracing::info!("Client 2 Connected to server"); - // client 1 subscrive to hotshot events + // client 2 subscrive to hotshot events let mut events_2 = client_2 .socket("events") .subscribe::>()