Skip to content
This repository has been archived by the owner on Dec 3, 2024. It is now read-only.

Pass in Url so builder can reconnect #129

Merged
merged 4 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
[package]
name = "hotshot-builder-core"
version = "0.1.16"
version = "0.1.17"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
async-broadcast = "0.7"
async-compatibility-layer = { version = "1.1", default-features = false, features = [
"logging-utils",
Expand Down
95 changes: 72 additions & 23 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use hotshot_builder_api::{
data_source::{AcceptsTxnSubmits, BuilderDataSource},
};
use hotshot_types::{
constants::Version01,
data::{DAProposal, Leaf, QuorumProposal},
event::EventType,
message::Proposal,
Expand All @@ -28,6 +29,7 @@ use crate::builder_state::{
};
use crate::builder_state::{MessageType, RequestMessage, ResponseMessage};
use crate::WaitAndKeep;
use anyhow::anyhow;
use async_broadcast::Sender as BroadcastSender;
pub use async_broadcast::{broadcast, RecvError, TryRecvError};
use async_compatibility_layer::{art::async_timeout, channel::unbounded};
Expand All @@ -48,7 +50,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fmt::Display, time::Instant};
use tagged_base64::TaggedBase64;
use tide_disco::method::ReadState;
use tide_disco::{method::ReadState, Url};

// It holds all the necessary information for a block
#[derive(Debug)]
Expand Down Expand Up @@ -655,30 +657,34 @@ impl<Types: NodeType> ReadState for ProxyGlobalState<Types> {
}
}

/*
Running Non-Permissioned Builder Service
*/
pub async fn run_non_permissioned_standalone_builder_service<Types: NodeType>(
// sending a transaction from the hotshot mempool to the builder states
tx_sender: BroadcastSender<MessageType<Types>>,

// sending a DA proposal from the hotshot to the builder states
da_sender: BroadcastSender<MessageType<Types>>,

// sending a QC proposal from the hotshot to the builder states
qc_sender: BroadcastSender<MessageType<Types>>,

// sending a Decide event from the hotshot to the builder states
decide_sender: BroadcastSender<MessageType<Types>>,

// connection to the events stream
mut subscribed_events: surf_disco::socket::Connection<
async fn connect_to_events_service<Types: NodeType>(
hotshot_events_api_url: Url,
) -> Option<(
surf_disco::socket::Connection<
BuilderEvent<Types>,
surf_disco::socket::Unsupported,
EventStreamError,
vbs::version::StaticVersion<0, 1>,
>,
) {
GeneralStaticCommittee<Types, <Types as NodeType>::SignatureKey>,
)> {
let client = surf_disco::Client::<hotshot_events_service::events::Error, Version01>::new(
hotshot_events_api_url.clone(),
);

if !(client.connect(None).await) {
return None;
}

tracing::info!("Builder client connected to the hotshot events api");

// client subscrive to hotshot events
let mut subscribed_events = client
.socket("hotshot-events/events")
.subscribe::<BuilderEvent<Types>>()
.await
.ok()?;

// handle the startup event at the start
let membership = if let Some(Ok(event)) = subscribed_events.next().await {
match event.event {
Expand All @@ -700,16 +706,52 @@ pub async fn run_non_permissioned_standalone_builder_service<Types: NodeType>(
known_node_with_stake,
non_staked_node_count
);
membership
Some(membership)
}
_ => {
tracing::error!("Startup info event not received as first event");
return;
None
}
}
} else {
return;
None
};
membership.map(|membership| (subscribed_events, membership))
}
/*
Running Non-Permissioned Builder Service
*/
pub async fn run_non_permissioned_standalone_builder_service<Types: NodeType>(
// sending a transaction from the hotshot mempool to the builder states
tx_sender: BroadcastSender<MessageType<Types>>,

// sending a DA proposal from the hotshot to the builder states
da_sender: BroadcastSender<MessageType<Types>>,

// sending a QC proposal from the hotshot to the builder states
qc_sender: BroadcastSender<MessageType<Types>>,

// sending a Decide event from the hotshot to the builder states
decide_sender: BroadcastSender<MessageType<Types>>,

// Url to (re)connect to for the events stream
hotshot_events_api_url: Url,
) -> Result<(), anyhow::Error> {
// connection to the events stream
// mut subscribed_events: surf_disco::socket::Connection<
// BuilderEvent<Types>,
// surf_disco::socket::Unsupported,
// EventStreamError,
// vbs::version::StaticVersion<0, 1>,
// >,

let connected = connect_to_events_service(hotshot_events_api_url.clone()).await;
if connected.is_none() {
return Err(anyhow!(
"failed to connect to API at {hotshot_events_api_url}"
));
}
let (mut subscribed_events, mut membership) = connected.unwrap();

loop {
let event = subscribed_events.next().await;
Expand Down Expand Up @@ -768,6 +810,13 @@ pub async fn run_non_permissioned_standalone_builder_service<Types: NodeType>(
}
None => {
tracing::error!("Event stream ended");
let connected = connect_to_events_service(hotshot_events_api_url.clone()).await;
if connected.is_none() {
return Err(anyhow!(
"failed to reconnect to API at {hotshot_events_api_url}"
));
}
(subscribed_events, membership) = connected.unwrap();
}
}
}
Expand Down
Loading