Skip to content

Commit

Permalink
Fix EventServiceStream (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuentinI committed Nov 1, 2024
1 parent 2922d96 commit 7bdbd64
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 53 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ vbs = "0.1"
vec1 = "1.12"

[workspace.package]
version = "0.1.51"
version = "0.1.52"
edition = "2021"

[workspace.lints.rust]
Expand Down
4 changes: 4 additions & 0 deletions crates/shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@ url = { workspace = true }
vbs = { workspace = true }
vec1 = { workspace = true }

[dev-dependencies]
portpicker = "0.1.1"
tide-disco = { workspace = true }

[lints]
workspace = true
261 changes: 212 additions & 49 deletions crates/shared/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, pin::Pin, task::Poll};
use std::{future::Future, pin::Pin};

use std::{
collections::HashSet,
Expand All @@ -7,13 +7,18 @@ use std::{
time::{Duration, Instant},
};

use anyhow::Context;
use async_compatibility_layer::art::async_sleep;
use async_std::prelude::FutureExt;
use either::Either::{self, Left, Right};
use futures::{FutureExt, Stream, StreamExt};
use futures::stream::unfold;
use futures::{Stream, StreamExt};
use hotshot::types::Event;
use hotshot_events_service::events::Error as EventStreamError;
use hotshot_types::traits::node_implementation::NodeType;
use surf_disco::client::HealthStatus;
use surf_disco::Client;
use tracing::error;
use tracing::{error, warn};
use url::Url;
use vbs::version::StaticVersionType;

Expand Down Expand Up @@ -103,7 +108,10 @@ pub struct EventServiceStream<Types: NodeType, V: StaticVersionType> {
connection: Either<EventServiceConnection<Types, V>, EventServiceReconnect<Types, V>>,
}

impl<Types: NodeType, ApiVer: StaticVersionType> EventServiceStream<Types, ApiVer> {
impl<Types: NodeType, ApiVer: StaticVersionType + 'static> EventServiceStream<Types, ApiVer> {
const RETRY_PERIOD: Duration = Duration::from_secs(1);
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);

async fn connect_inner(
url: Url,
) -> anyhow::Result<
Expand All @@ -116,11 +124,22 @@ impl<Types: NodeType, ApiVer: StaticVersionType> EventServiceStream<Types, ApiVe
> {
let client = Client::<hotshot_events_service::events::Error, ApiVer>::new(url.clone());

if !(client.connect(None).await) {
anyhow::bail!("Couldn't connect to API url");
async {
loop {
match client.healthcheck::<HealthStatus>().await {
Ok(_) => break,
Err(err) => {
tracing::debug!(?err, "Healthcheck failed, retrying");
}
}
async_sleep(Self::RETRY_PERIOD).await;
}
}
.timeout(Self::CONNECTION_TIMEOUT)
.await
.context("Couldn't connect to hotshot events API")?;

tracing::info!("Builder client connected to the hotshot events api");
tracing::info!("Builder client connected to the hotshot events API");

Ok(client
.socket("hotshot-events/events")
Expand All @@ -129,56 +148,200 @@ impl<Types: NodeType, ApiVer: StaticVersionType> EventServiceStream<Types, ApiVe
}

/// Establish initial connection to the events service at `api_url`
pub async fn connect(api_url: Url) -> anyhow::Result<Self> {
pub async fn connect(api_url: Url) -> anyhow::Result<impl Stream<Item = Event<Types>> + Unpin> {
let connection = Self::connect_inner(api_url.clone()).await?;

Ok(Self {
let this = Self {
api_url,
connection: Left(connection),
})
}
}
};

impl<Types: NodeType, V: StaticVersionType + 'static> Stream for EventServiceStream<Types, V> {
type Item = Event<Types>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match &mut self.connection {
Left(connection) => {
let Poll::Ready(next) = connection.poll_next_unpin(cx) else {
return Poll::Pending;
};
match next {
Some(Ok(event)) => Poll::Ready(Some(event)),
Some(Err(err)) => {
error!("Error in the event stream: {err:?}");
Poll::Pending
}
None => {
let fut = Self::connect_inner(self.api_url.clone());
let _ = std::mem::replace(&mut self.connection, Right(Box::pin(fut)));
Poll::Pending
}
let stream = unfold(this, |mut this| async move {
loop {
match &mut this.connection {
Left(connection) => match connection.next().await {
Some(Ok(event)) => {
return Some((event, this));
}
Some(Err(err)) => {
warn!(?err, "Error in event stream");
continue;
}
None => {
warn!("Event stream ended, attempting reconnection");
let fut = Self::connect_inner(this.api_url.clone());
let _ = std::mem::replace(&mut this.connection, Right(Box::pin(fut)));
continue;
}
},
Right(reconnection) => match reconnection.await {
Ok(connection) => {
let _ = std::mem::replace(&mut this.connection, Left(connection));
continue;
}
Err(err) => {
error!(?err, "Error while reconnecting, will retry in a while");
async_sleep(Self::RETRY_PERIOD).await;
let fut = Self::connect_inner(this.api_url.clone());
let _ = std::mem::replace(&mut this.connection, Right(Box::pin(fut)));
continue;
}
},
}
}
Right(reconnect_future) => {
let Poll::Ready(ready) = reconnect_future.poll_unpin(cx) else {
return Poll::Pending;
};
match ready {
Ok(connection) => {
let _ = std::mem::replace(&mut self.connection, Left(connection));
Poll::Pending
}
Err(err) => {
error!("Failed to reconnect to the event service: {err:?}");
Poll::Ready(None)
}
}
});

Ok(Box::pin(stream))
}
}

#[cfg(test)]
mod tests {
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use async_compatibility_layer::art::async_spawn;
use async_std::prelude::FutureExt;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use futures::{future::BoxFuture, stream, StreamExt};
use hotshot::types::{Event, EventType};
use hotshot_events_service::{
events::define_api,
events_source::{EventFilterSet, EventsSource, StartupInfo},
};
use hotshot_example_types::node_types::TestTypes;
use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime};
use tide_disco::{method::ReadState, App};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use url::Url;
use vbs::version::StaticVersion;

use crate::utils::EventServiceStream;

/// API version marker used throughout these tests: the mock events API is defined and
/// served with it, and the `EventServiceStream` under test is instantiated with it.
type MockVersion = StaticVersion<0, 1>;

/// Minimal events source backing the mock events API in these tests.
///
/// Every requested event stream contains exactly one `ViewFinished` event.
struct MockEventsSource {
/// Number of event streams handed out so far; its value at the time of a request is
/// used as the view number of the single event in that stream.
counter: AtomicU64,
}

/// Mock implementation of the events source: each subscription yields a single
/// `ViewFinished` event and then ends, which lets tests observe stream termination
/// and re-subscription.
#[async_trait]
impl EventsSource<TestTypes> for MockEventsSource {
    type EventStream = futures::stream::Iter<std::vec::IntoIter<Arc<Event<TestTypes>>>>;

    /// Returns a one-element stream whose event carries the current counter value as
    /// its view number. The filter is ignored.
    async fn get_event_stream(
        &self,
        _filter: Option<EventFilterSet<TestTypes>>,
    ) -> Self::EventStream {
        // `fetch_add` returns the previous value, so reading and bumping the counter is a
        // single atomic step. A separate `load` followed by `fetch_add` would let two
        // concurrent subscribers observe the same view number.
        let view = ViewNumber::new(self.counter.fetch_add(1, Ordering::SeqCst));
        let test_event = Arc::new(Event {
            view_number: view,
            event: EventType::ViewFinished { view_number: view },
        });
        stream::iter(vec![test_event])
    }

    /// Returns empty startup info: no known staked nodes and zero non-staked nodes.
    async fn get_startup_info(&self) -> StartupInfo<TestTypes> {
        StartupInfo {
            known_node_with_stake: Vec::new(),
            non_staked_node_count: 0,
        }
    }
}

/// Lets the mock source act as its own application state: read operations are run
/// directly against `self`.
#[async_trait]
impl ReadState for MockEventsSource {
    type State = Self;

    /// Invokes `op` with a reference to this source and awaits the future it returns.
    async fn read<T>(
        &self,
        op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
    ) -> T {
        let pending = op(self);
        pending.await
    }
}

fn run_app(path: &'static str, bind_url: Url) -> JoinHandle<()> {
let source = MockEventsSource {
counter: AtomicU64::new(0),
};
let api = define_api::<MockEventsSource, _, MockVersion>(&Default::default()).unwrap();

let mut app: App<MockEventsSource, hotshot_events_service::events::Error> =
App::with_state(source);

app.register_module(path, api).unwrap();

async_spawn(async move { app.serve(bind_url, MockVersion {}).await.unwrap() })
}

/// End-to-end check of the stream returned by `EventServiceStream::connect` against a
/// mock events server:
///
/// 1. with the server running, an event arrives within `TIMEOUT`;
/// 2. after the server is killed, `next()` does not resolve within `TIMEOUT`;
/// 3. after the server is restarted on the same URL, events arrive again;
/// 4. with a server that exposes the API under a different module path, `next()` again
///    does not resolve within `TIMEOUT`.
#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn event_stream_wrapper() {
    async_compatibility_layer::logging::setup_logging();
    async_compatibility_layer::logging::setup_backtrace();

    const TIMEOUT: Duration = Duration::from_secs(3);

    let url: Url = format!(
        "http://localhost:{}",
        portpicker::pick_unused_port().unwrap()
    )
    .parse()
    .unwrap();

    // Phase 1: server is up, the stream must deliver an event.
    let app_handle = run_app("hotshot-events", url.clone());

    let mut stream = EventServiceStream::<TestTypes, MockVersion>::connect(url.clone())
        .await
        .unwrap();

    stream
        .next()
        .timeout(TIMEOUT)
        .await
        .expect("When mock event server is spawned, stream should work")
        .unwrap();

    // Phase 2: kill the server; the stream must neither yield nor terminate.
    #[cfg(async_executor_impl = "tokio")]
    app_handle.abort();
    #[cfg(async_executor_impl = "async-std")]
    app_handle.cancel().await;

    stream
        .next()
        .timeout(TIMEOUT)
        .await
        .expect_err("When mock event server is killed, stream should be in reconnecting state and never return");

    // Phase 3: bring the server back on the same URL; the stream must recover.
    let app_handle = run_app("hotshot-events", url.clone());

    stream
        .next()
        .timeout(TIMEOUT)
        .await
        .expect("When mock event server is restarted, stream should work again")
        .unwrap();

    #[cfg(async_executor_impl = "tokio")]
    app_handle.abort();
    #[cfg(async_executor_impl = "async-std")]
    app_handle.cancel().await;

    // Phase 4: a server answers on the URL but exposes the API under another module path.
    // Keep the handle so this server is shut down like the others instead of being
    // detached and left running after the test.
    let app_handle = run_app("wrong-path", url.clone());

    stream
        .next()
        .timeout(TIMEOUT)
        .await
        .expect_err("API is reachable, but is on wrong path");

    #[cfg(async_executor_impl = "tokio")]
    app_handle.abort();
    #[cfg(async_executor_impl = "async-std")]
    app_handle.cancel().await;
}
}

0 comments on commit 7bdbd64

Please sign in to comment.