diff --git a/Cargo.lock b/Cargo.lock index 9572817c..fc83794c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3071,7 +3071,7 @@ dependencies = [ [[package]] name = "hotshot-builder-core" -version = "0.1.51" +version = "0.1.52" dependencies = [ "anyhow", "async-broadcast", @@ -4720,7 +4720,7 @@ dependencies = [ [[package]] name = "marketplace-builder-core" -version = "0.1.51" +version = "0.1.52" dependencies = [ "anyhow", "async-broadcast", @@ -4762,7 +4762,7 @@ dependencies = [ [[package]] name = "marketplace-builder-shared" -version = "0.1.51" +version = "0.1.52" dependencies = [ "anyhow", "async-broadcast", @@ -4782,9 +4782,11 @@ dependencies = [ "hotshot-task-impls", "hotshot-testing", "hotshot-types", + "portpicker", "rand 0.8.5", "serde", "surf-disco", + "tide-disco", "tokio", "tracing", "url", diff --git a/Cargo.toml b/Cargo.toml index 4aa5c12d..892866a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ vbs = "0.1" vec1 = "1.12" [workspace.package] -version = "0.1.51" +version = "0.1.52" edition = "2021" [workspace.lints.rust] diff --git a/crates/shared/Cargo.toml b/crates/shared/Cargo.toml index 5a6afeaf..a2a9b3a0 100644 --- a/crates/shared/Cargo.toml +++ b/crates/shared/Cargo.toml @@ -32,5 +32,9 @@ url = { workspace = true } vbs = { workspace = true } vec1 = { workspace = true } +[dev-dependencies] +portpicker = "0.1.1" +tide-disco = { workspace = true } + [lints] workspace = true diff --git a/crates/shared/src/utils.rs b/crates/shared/src/utils.rs index 8d4f1f61..9dd2972b 100644 --- a/crates/shared/src/utils.rs +++ b/crates/shared/src/utils.rs @@ -1,4 +1,4 @@ -use std::{future::Future, pin::Pin, task::Poll}; +use std::{future::Future, pin::Pin}; use std::{ collections::HashSet, @@ -7,13 +7,18 @@ use std::{ time::{Duration, Instant}, }; +use anyhow::Context; +use async_compatibility_layer::art::async_sleep; +use async_std::prelude::FutureExt; use either::Either::{self, Left, Right}; -use futures::{FutureExt, Stream, StreamExt}; +use futures::stream::unfold; +use futures::{Stream, StreamExt}; use hotshot::types::Event; use hotshot_events_service::events::Error as EventStreamError; use hotshot_types::traits::node_implementation::NodeType; +use surf_disco::client::HealthStatus; use surf_disco::Client; -use tracing::error; +use tracing::{error, warn}; use url::Url; use vbs::version::StaticVersionType; @@ -103,7 +108,10 @@ pub struct EventServiceStream { connection: Either, EventServiceReconnect>, } -impl EventServiceStream { +impl EventServiceStream { + const RETRY_PERIOD: Duration = Duration::from_secs(1); + const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60); + async fn connect_inner( url: Url, ) -> anyhow::Result< @@ -116,11 +124,22 @@ impl EventServiceStream { let client = Client::::new(url.clone()); - if !(client.connect(None).await) { - anyhow::bail!("Couldn't connect to API url"); + async { + loop { + match client.healthcheck::().await { + Ok(_) => break, + Err(err) => { + tracing::debug!(?err, "Healthcheck failed, retrying"); + } + } + async_sleep(Self::RETRY_PERIOD).await; + } } + .timeout(Self::CONNECTION_TIMEOUT) + .await + .context("Couldn't connect to hotshot events API")?; - tracing::info!("Builder client connected to the hotshot events api"); + tracing::info!("Builder client connected to the hotshot events API"); Ok(client .socket("hotshot-events/events") @@ -129,56 +148,200 @@ impl EventServiceStream anyhow::Result { + pub async fn connect(api_url: Url) -> anyhow::Result> + Unpin> { let connection = Self::connect_inner(api_url.clone()).await?; - Ok(Self { + let this = Self { api_url, connection: Left(connection), - }) - } -} + }; -impl Stream for EventServiceStream { - type Item = Event; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - match &mut self.connection { - Left(connection) => { - let Poll::Ready(next) = connection.poll_next_unpin(cx) else { - return Poll::Pending; - }; - match next { - Some(Ok(event)) => Poll::Ready(Some(event)), - Some(Err(err)) => { - error!("Error in the event stream: {err:?}"); - Poll::Pending - } - None => { - let fut = Self::connect_inner(self.api_url.clone()); - let _ = std::mem::replace(&mut self.connection, Right(Box::pin(fut))); - Poll::Pending - } + let stream = unfold(this, |mut this| async move { + loop { + match &mut this.connection { + Left(connection) => match connection.next().await { + Some(Ok(event)) => { + return Some((event, this)); + } + Some(Err(err)) => { + warn!(?err, "Error in event stream"); + continue; + } + None => { + warn!("Event stream ended, attempting reconnection"); + let fut = Self::connect_inner(this.api_url.clone()); + let _ = std::mem::replace(&mut this.connection, Right(Box::pin(fut))); + continue; + } + }, + Right(reconnection) => match reconnection.await { + Ok(connection) => { + let _ = std::mem::replace(&mut this.connection, Left(connection)); + continue; + } + Err(err) => { + error!(?err, "Error while reconnecting, will retry in a while"); + async_sleep(Self::RETRY_PERIOD).await; + let fut = Self::connect_inner(this.api_url.clone()); + let _ = std::mem::replace(&mut this.connection, Right(Box::pin(fut))); + continue; + } + }, } } - Right(reconnect_future) => { - let Poll::Ready(ready) = reconnect_future.poll_unpin(cx) else { - return Poll::Pending; - }; - match ready { - Ok(connection) => { - let _ = std::mem::replace(&mut self.connection, Left(connection)); - Poll::Pending - } - Err(err) => { - error!("Failed to reconnect to the event service: {err:?}"); - Poll::Ready(None) - } - } + }); + + Ok(Box::pin(stream)) + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, + }; + + use async_compatibility_layer::art::async_spawn; + use async_std::prelude::FutureExt; + #[cfg(async_executor_impl = "async-std")] + use async_std::task::JoinHandle; + use async_trait::async_trait; + use futures::{future::BoxFuture, stream, StreamExt}; + use hotshot::types::{Event, EventType}; + use hotshot_events_service::{ + events::define_api, + events_source::{EventFilterSet, EventsSource, StartupInfo}, + }; + use hotshot_example_types::node_types::TestTypes; + use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime}; + use tide_disco::{method::ReadState, App}; + #[cfg(async_executor_impl = "tokio")] + use tokio::task::JoinHandle; + use url::Url; + use vbs::version::StaticVersion; + + use crate::utils::EventServiceStream; + + type MockVersion = StaticVersion<0, 1>; + + struct MockEventsSource { + counter: AtomicU64, + } + + #[async_trait] + impl EventsSource for MockEventsSource { + type EventStream = futures::stream::Iter>>>; + + async fn get_event_stream( + &self, + _filter: Option>, + ) -> Self::EventStream { + let view = ViewNumber::new(self.counter.load(Ordering::SeqCst)); + let test_event = Arc::new(Event { + view_number: view, + event: EventType::ViewFinished { view_number: view }, + }); + self.counter.fetch_add(1, Ordering::SeqCst); + stream::iter(vec![test_event]) + } + + async fn get_startup_info(&self) -> StartupInfo { + StartupInfo { + known_node_with_stake: Vec::new(), + non_staked_node_count: 0, } } } + + #[async_trait] + impl ReadState for MockEventsSource { + type State = Self; + + async fn read( + &self, + op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait, + ) -> T { + op(self).await + } + } + + fn run_app(path: &'static str, bind_url: Url) -> JoinHandle<()> { + let source = MockEventsSource { + counter: AtomicU64::new(0), + }; + let api = define_api::(&Default::default()).unwrap(); + + let mut app: App = + App::with_state(source); + + app.register_module(path, api).unwrap(); + + async_spawn(async move { app.serve(bind_url, MockVersion {}).await.unwrap() }) + } + + #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + async fn event_stream_wrapper() { + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + const TIMEOUT: Duration = Duration::from_secs(3); + + let url: Url = format!( + "http://localhost:{}", + portpicker::pick_unused_port().unwrap() + ) + .parse() + .unwrap(); + + let app_handle = run_app("hotshot-events", url.clone()); + + let mut stream = EventServiceStream::::connect(url.clone()) + .await + .unwrap(); + + stream + .next() + .timeout(TIMEOUT) + .await + .expect("When mock event server is spawned, stream should work") + .unwrap(); + + #[cfg(async_executor_impl = "tokio")] + app_handle.abort(); + #[cfg(async_executor_impl = "async-std")] + app_handle.cancel().await; + + stream + .next() + .timeout(TIMEOUT) + .await + .expect_err("When mock event server is killed, stream should be in reconnecting state and never return"); + + let app_handle = run_app("hotshot-events", url.clone()); + + stream + .next() + .timeout(TIMEOUT) + .await + .expect("When mock event server is restarted, stream should work again") + .unwrap(); + + #[cfg(async_executor_impl = "tokio")] + app_handle.abort(); + #[cfg(async_executor_impl = "async-std")] + app_handle.cancel().await; + + run_app("wrong-path", url.clone()); + + stream + .next() + .timeout(TIMEOUT) + .await + .expect_err("API is reachable, but is on wrong path"); + } }