diff --git a/Cargo.toml b/Cargo.toml index 86efc16..80c3714 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,20 +2,19 @@ name = "hotshot-events-service" version = "0.1.3" edition = "2021" - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-broadcast = "0.7" async-compatibility-layer = { version = "1.1", default-features = false, features = [ "logging-utils", ] } -async-broadcast = "0.7" -async-std = { version = "1", features = ["attributes"] } async-lock = "2.8" +async-std = { version = "1", features = ["attributes"] } async-trait = "0.1" clap = { version = "4.4", features = ["derive", "env"] } -derive_more = "0.99" derivative = "2.2" +derive_more = "0.99" either = { version = "1.10", features = ["serde"] } futures = "0.3" hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.36" } diff --git a/src/api.rs b/src/api.rs index 0527abc..5f447fb 100644 --- a/src/api.rs +++ b/src/api.rs @@ -16,7 +16,7 @@ use tide_disco::api::{Api, ApiError}; use toml::{map::Entry, Value}; use vbs::version::StaticVersionType; -pub(crate) fn load_api( +pub(crate) fn load_api( path: Option>, default: &str, extensions: impl IntoIterator, diff --git a/src/events_source.rs b/src/events_source.rs index 7d60658..1ebf6e8 100644 --- a/src/events_source.rs +++ b/src/events_source.rs @@ -166,21 +166,21 @@ impl EventsSource for EventsStreamer { async fn get_event_stream(&self) -> Self::EventStream { let recv_channel = self.inactive_to_subscribe_clone_recv.activate_cloned(); - let starup_event_initialized = false; + let startup_event_initialized = false; let startup_event = self.get_startup_event().clone(); stream::unfold( - (recv_channel, startup_event, starup_event_initialized), - |(mut recv_channel, startup_event, mut starup_event_initialized)| async move { - let event_res = if starup_event_initialized { + (recv_channel, startup_event, startup_event_initialized), + |(mut recv_channel, startup_event, mut startup_event_initialized)| async move { + let event_res = if startup_event_initialized { recv_channel.recv().await.ok() } else { - starup_event_initialized = true; + startup_event_initialized = true; Some(Arc::new(startup_event.clone())) }; event_res.map(|event| { ( event, - (recv_channel, startup_event, starup_event_initialized), + (recv_channel, startup_event, startup_event_initialized), ) }) }, @@ -195,7 +195,10 @@ impl EventsStreamer { ) -> Self { let (mut subscriber_send_channel, to_subscribe_clone_recv) = broadcast::>>(RETAINED_EVENTS_COUNT); + // set the overflow to true to drop older messages from the channel subscriber_send_channel.set_overflow(true); + // set the await active to false to not block the sender + subscriber_send_channel.set_await_active(false); let inactive_to_subscribe_clone_recv = to_subscribe_clone_recv.deactivate(); EventsStreamer { subscriber_send_channel, diff --git a/src/test.rs b/src/test.rs index 1840d9d..8f6b457 100644 --- a/src/test.rs +++ b/src/test.rs @@ -30,9 +30,59 @@ mod tests { } } + #[async_std::test] + async fn test_no_active_receiver() { + tracing::info!("Starting test_no_active_receiver"); + setup_logging(); + setup_backtrace(); + let port = portpicker::pick_unused_port().expect("Could not find an open port"); + let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); + + let known_nodes_with_stake = vec![]; + let non_staked_node_count = 0; + let events_streamer = Arc::new(RwLock::new(EventsStreamer::new( + known_nodes_with_stake, + non_staked_node_count, + ))); + + // Start the web server. + let mut app = App::<_, Error>::with_state(events_streamer.clone()); + + let hotshot_events_api = + define_api::>>, TestTypes, Version01>( + &Options::default(), + ) + .expect("Failed to define hotshot eventsAPI"); + + app.register_module("hotshot_events", hotshot_events_api) + .expect("Failed to register hotshot events API"); + + async_spawn(app.serve(api_url, STATIC_VER_0_1)); + let total_count = 5; + let send_handle = async_spawn(async move { + let mut send_count = 0; + loop { + let tx_event = generate_event(send_count); + tracing::debug!("Before writing to events_source"); + events_streamer + .write() + .await + .handle_event(tx_event.clone()) + .await; + send_count += 1; + tracing::debug!("After writing to events_source"); + if send_count >= total_count { + break; + } + } + }); + + send_handle.await; + } + #[async_std::test] async fn test_event_stream() { - tracing::info!("Starting hotshot test_event_stream"); + tracing::info!("Starting test_event_stream"); setup_logging(); setup_backtrace(); @@ -107,11 +157,11 @@ mod tests { tracing::info!("Received event in Client 1: {:?}", event); receive_count += 1; if receive_count > total_count { - tracing::info!("Clien1 Received all sent events, exiting loop"); + tracing::info!("Client1 Received all sent events, exiting loop"); break; } } - // Offest 1 is due to the startup event info + // Offset 1 is due to the startup event info assert_eq!(receive_count, total_count + 1); }); @@ -127,7 +177,7 @@ mod tests { break; } } - // Offest 1 is due to the startup event info + // Offset 1 is due to the startup event info assert_eq!(receive_count, total_count + 1); });