diff --git a/Cargo.lock b/Cargo.lock index 1d3ed638..b01c4d1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3147,6 +3147,7 @@ dependencies = [ name = "hotshot-builder-core" version = "0.1.18" dependencies = [ + "anyhow", "async-broadcast", "async-compatibility-layer", "async-lock 2.8.0", diff --git a/Cargo.toml b/Cargo.toml index 52dba5ea..cfe98331 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1" async-broadcast = "0.7" async-compatibility-layer = { version = "1.1", default-features = false, features = [ "logging-utils", diff --git a/src/service.rs b/src/service.rs index 70b48c1f..a5457f0d 100644 --- a/src/service.rs +++ b/src/service.rs @@ -8,6 +8,7 @@ use hotshot_builder_api::{ data_source::{AcceptsTxnSubmits, BuilderDataSource}, }; use hotshot_types::{ + constants::Version01, data::{DAProposal, Leaf, QuorumProposal}, event::EventType, message::Proposal, @@ -28,6 +29,7 @@ use crate::builder_state::{ }; use crate::builder_state::{MessageType, RequestMessage, ResponseMessage}; use crate::WaitAndKeep; +use anyhow::anyhow; use async_broadcast::Sender as BroadcastSender; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; use async_compatibility_layer::{art::async_timeout, channel::unbounded}; @@ -48,7 +50,7 @@ use std::sync::Arc; use std::time::Duration; use std::{fmt::Display, time::Instant}; use tagged_base64::TaggedBase64; -use tide_disco::method::ReadState; +use tide_disco::{method::ReadState, Url}; // It holds all the necessary information for a block #[derive(Debug)] @@ -655,30 +657,34 @@ impl ReadState for ProxyGlobalState { } } -/* -Running Non-Permissioned Builder Service -*/ -pub async fn run_non_permissioned_standalone_builder_service( - // sending a transaction from the hotshot mempool to the builder states - tx_sender: BroadcastSender>, - - // sending a DA proposal from the hotshot to the builder states - da_sender: BroadcastSender>, - - // sending a QC proposal from the hotshot to the builder states - qc_sender: BroadcastSender>, - - // sending a Decide event from the hotshot to the builder states - decide_sender: BroadcastSender>, - - // connection to the events stream - mut subscribed_events: surf_disco::socket::Connection< +async fn connect_to_events_service( + hotshot_events_api_url: Url, +) -> Option<( + surf_disco::socket::Connection< BuilderEvent, surf_disco::socket::Unsupported, EventStreamError, vbs::version::StaticVersion<0, 1>, >, -) { + GeneralStaticCommittee::SignatureKey>, +)> { + let client = surf_disco::Client::::new( + hotshot_events_api_url.clone(), + ); + + if !(client.connect(None).await) { + return None; + } + + tracing::info!("Builder client connected to the hotshot events api"); + + // client subscrive to hotshot events + let mut subscribed_events = client + .socket("hotshot-events/events") + .subscribe::>() + .await + .ok()?; + // handle the startup event at the start let membership = if let Some(Ok(event)) = subscribed_events.next().await { match event.event { @@ -700,16 +706,52 @@ pub async fn run_non_permissioned_standalone_builder_service( known_node_with_stake, non_staked_node_count ); - membership + Some(membership) } _ => { tracing::error!("Startup info event not received as first event"); - return; + None } } } else { - return; + None }; + membership.map(|membership| (subscribed_events, membership)) +} +/* +Running Non-Permissioned Builder Service +*/ +pub async fn run_non_permissioned_standalone_builder_service( + // sending a transaction from the hotshot mempool to the builder states + tx_sender: BroadcastSender>, + + // sending a DA proposal from the hotshot to the builder states + da_sender: BroadcastSender>, + + // sending a QC proposal from the hotshot to the builder states + qc_sender: BroadcastSender>, + + // sending a Decide event from the hotshot to the builder states + decide_sender: BroadcastSender>, + + // Url to (re)connect to for the events stream + hotshot_events_api_url: Url, +) -> Result<(), anyhow::Error> { + // connection to the events stream + // mut subscribed_events: surf_disco::socket::Connection< + // BuilderEvent, + // surf_disco::socket::Unsupported, + // EventStreamError, + // vbs::version::StaticVersion<0, 1>, + // >, + + let connected = connect_to_events_service(hotshot_events_api_url.clone()).await; + if connected.is_none() { + return Err(anyhow!( + "failed to connect to API at {hotshot_events_api_url}" + )); + } + let (mut subscribed_events, mut membership) = connected.unwrap(); loop { let event = subscribed_events.next().await; @@ -768,6 +810,13 @@ pub async fn run_non_permissioned_standalone_builder_service( } None => { tracing::error!("Event stream ended"); + let connected = connect_to_events_service(hotshot_events_api_url.clone()).await; + if connected.is_none() { + return Err(anyhow!( + "failed to reconnect to API at {hotshot_events_api_url}" + )); + } + (subscribed_events, membership) = connected.unwrap(); } } }