Skip to content

Commit

Permalink
make receiver inactive
Browse files Browse the repository at this point in the history
  • Loading branch information
move47 committed Apr 10, 2024
1 parent ddb375d commit b9b7702
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
9 changes: 5 additions & 4 deletions src/events_source.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use async_broadcast::{broadcast, Receiver as BroadcastReceiver, Sender as BroadcastSender};
use async_broadcast::{broadcast, InactiveReceiver, Sender as BroadcastSender};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::stream::{self, BoxStream, Stream, StreamExt};
Expand Down Expand Up @@ -123,7 +123,7 @@ where
#[derive(Debug)]
pub struct EventsStreamer<Types: NodeType> {
// required for api subscription
to_subscribe_clone_recv: BroadcastReceiver<Arc<BuilderEvent<Types>>>,
inactive_to_subscribe_clone_recv: InactiveReceiver<Arc<BuilderEvent<Types>>>,
subscriber_send_channel: BroadcastSender<Arc<BuilderEvent<Types>>>,

// required for sending startup info
Expand Down Expand Up @@ -165,7 +165,7 @@ impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
type EventStream = BoxStream<'static, Arc<BuilderEvent<Types>>>;

async fn get_event_stream(&self) -> Self::EventStream {
let recv_channel = self.to_subscribe_clone_recv.clone();
let recv_channel = self.inactive_to_subscribe_clone_recv.activate_cloned();
let starup_event_initialized = false;
let startup_event = self.get_startup_event().clone();
stream::unfold(
Expand Down Expand Up @@ -196,9 +196,10 @@ impl<Types: NodeType> EventsStreamer<Types> {
let (mut subscriber_send_channel, to_subscribe_clone_recv) =
broadcast::<Arc<BuilderEvent<Types>>>(RETAINED_EVENTS_COUNT);
subscriber_send_channel.set_overflow(true);
let inactive_to_subscribe_clone_recv = to_subscribe_clone_recv.deactivate();
EventsStreamer {
subscriber_send_channel,
to_subscribe_clone_recv,
inactive_to_subscribe_clone_recv,
known_nodes_with_stake,
non_staked_node_count,
}
Expand Down
4 changes: 2 additions & 2 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ mod tests {

tracing::info!("Client 1 Connected to server");

// client subscrive to hotshot events
// client 1 subscribe to hotshot events
let mut events_1 = client_1
.socket("events")
.subscribe::<BuilderEvent<TestTypes>>()
Expand All @@ -89,7 +89,7 @@ mod tests {

tracing::info!("Client 2 Connected to server");

// client 1 subscrive to hotshot events
// client 2 subscrive to hotshot events
let mut events_2 = client_2
.socket("events")
.subscribe::<BuilderEvent<TestTypes>>()
Expand Down

0 comments on commit b9b7702

Please sign in to comment.