Skip to content

Commit

Permalink
More wax should fix the problem, right?
Browse files Browse the repository at this point in the history
  • Loading branch information
QuentinI committed Oct 31, 2024
1 parent efdf2cc commit 81840cf
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 20 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ vbs = "0.1"
vec1 = "1.12"

[workspace.package]
version = "0.1.51"
version = "0.1.52"
edition = "2021"

[workspace.lints.rust]
Expand Down
4 changes: 4 additions & 0 deletions crates/shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@ url = { workspace = true }
vbs = { workspace = true }
vec1 = { workspace = true }

[dev-dependencies]
portpicker = "0.1.1"
tide-disco = { workspace = true }

[lints]
workspace = true
189 changes: 173 additions & 16 deletions crates/shared/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use std::{
time::{Duration, Instant},
};

use async_compatibility_layer::art::async_sleep;
use either::Either::{self, Left, Right};
use futures::{FutureExt, Stream, StreamExt};
use futures::{ready, FutureExt, Stream, StreamExt};
use hotshot::types::Event;
use hotshot_events_service::events::Error as EventStreamError;
use hotshot_types::traits::node_implementation::NodeType;
use surf_disco::client::HealthStatus;
use surf_disco::Client;
use tracing::error;
use tracing::{error, warn};
use url::Url;
use vbs::version::StaticVersionType;

Expand Down Expand Up @@ -116,8 +118,11 @@ impl<Types: NodeType, ApiVer: StaticVersionType> EventServiceStream<Types, ApiVe
> {
let client = Client::<hotshot_events_service::events::Error, ApiVer>::new(url.clone());

if !(client.connect(None).await) {
anyhow::bail!("Couldn't connect to API url");
loop {
if client.healthcheck::<HealthStatus>().await.is_ok() {
break;
}
async_sleep(Duration::from_secs(1)).await;
}

tracing::info!("Builder client connected to the hotshot events api");
Expand Down Expand Up @@ -148,37 +153,189 @@ impl<Types: NodeType, V: StaticVersionType + 'static> Stream for EventServiceStr
) -> Poll<Option<Self::Item>> {
match &mut self.connection {
Left(connection) => {
let Poll::Ready(next) = connection.poll_next_unpin(cx) else {
return Poll::Pending;
};
let next = ready!(connection.poll_next_unpin(cx));
match next {
Some(Ok(event)) => Poll::Ready(Some(event)),
Some(Err(err)) => {
error!("Error in the event stream: {err:?}");
warn!(?err, "Error in event stream");
Poll::Pending
}
None => {
warn!("Event stream ended, attempting reconnection");
let fut = Self::connect_inner(self.api_url.clone());
let _ = std::mem::replace(&mut self.connection, Right(Box::pin(fut)));
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
Right(reconnect_future) => {
let Poll::Ready(ready) = reconnect_future.poll_unpin(cx) else {
return Poll::Pending;
};
match ready {
let next = ready!(reconnect_future.poll_unpin(cx));
match next {
Err(err) => {
error!(?err, "Error while reconnecting, giving up");
Poll::Ready(None)
}
Ok(connection) => {
let _ = std::mem::replace(&mut self.connection, Left(connection));
cx.waker().wake_by_ref();
Poll::Pending
}
Err(err) => {
error!("Failed to reconnect to the event service: {err:?}");
Poll::Ready(None)
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use async_compatibility_layer::art::async_spawn;
use async_std::prelude::FutureExt;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use futures::{future::BoxFuture, stream, StreamExt};
use hotshot::types::{Event, EventType};
use hotshot_events_service::{
events::define_api,
events_source::{EventFilterSet, EventsSource, StartupInfo},
};
use hotshot_example_types::node_types::TestTypes;
use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime};
use tide_disco::{method::ReadState, App};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use url::Url;
use vbs::version::StaticVersion;

use crate::utils::EventServiceStream;

type MockVersion = StaticVersion<0, 1>;

struct MockEventsSource {
counter: AtomicU64,
}

#[async_trait]
impl EventsSource<TestTypes> for MockEventsSource {
type EventStream = futures::stream::Iter<std::vec::IntoIter<Arc<Event<TestTypes>>>>;

async fn get_event_stream(
&self,
_filter: Option<EventFilterSet<TestTypes>>,
) -> Self::EventStream {
let view = ViewNumber::new(self.counter.load(Ordering::SeqCst));
let test_event = Arc::new(Event {
view_number: view,
event: EventType::ViewFinished { view_number: view },
});
self.counter.fetch_add(1, Ordering::SeqCst);
stream::iter(vec![test_event])
}

async fn get_startup_info(&self) -> StartupInfo<TestTypes> {
StartupInfo {
known_node_with_stake: Vec::new(),
non_staked_node_count: 0,
}
}
}

#[async_trait]
impl ReadState for MockEventsSource {
type State = Self;

async fn read<T>(
&self,
op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
) -> T {
op(self).await
}
}

fn run_app(path: &'static str, bind_url: Url) -> JoinHandle<()> {
let source = MockEventsSource {
counter: AtomicU64::new(0),
};
let api = define_api::<MockEventsSource, _, MockVersion>(&Default::default()).unwrap();

let mut app: App<MockEventsSource, hotshot_events_service::events::Error> =
App::with_state(source);

app.register_module(path, api).unwrap();

async_spawn(async move { app.serve(bind_url, MockVersion {}).await.unwrap() })
}

#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn event_stream_wrapper() {
async_compatibility_layer::logging::setup_logging();
async_compatibility_layer::logging::setup_backtrace();

const TIMEOUT: Duration = Duration::from_secs(3);

let url: Url = format!(
"http://localhost:{}",
portpicker::pick_unused_port().unwrap()
)
.parse()
.unwrap();

let app_handle = run_app("hotshot-events", url.clone());

let mut stream = EventServiceStream::<TestTypes, MockVersion>::connect(url.clone())
.await
.unwrap();

stream
.next()
.timeout(TIMEOUT)
.await
.expect("When mock event server is spawned, stream should work");

#[cfg(async_executor_impl = "tokio")]
app_handle.abort();
#[cfg(async_executor_impl = "async-std")]
app_handle.cancel().await;

stream
.next()
.timeout(TIMEOUT)
.await
.expect_err("When mock event server is killed, stream should be in reconnecting state and never return");

let app_handle = run_app("hotshot-events", url.clone());

stream
.next()
.timeout(TIMEOUT)
.await
.expect("When mock event server is restarted, stream should work again");

#[cfg(async_executor_impl = "tokio")]
app_handle.abort();
#[cfg(async_executor_impl = "async-std")]
app_handle.cancel().await;

run_app("wrong-path", url.clone());

assert!(
stream
.next()
.timeout(TIMEOUT)
.await
.expect("API is reachable")
.is_none(),
"Stream should've ended, because while url is reachable, it doesn't conform to expeted API"
)
}
}

0 comments on commit 81840cf

Please sign in to comment.