From 0b86c7a283e3d25c9aede628015956232af4ffec Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Mon, 30 Sep 2024 13:52:49 +0200 Subject: [PATCH 01/12] Extract shared crate --- Cargo.lock | 24 + Cargo.toml | 13 +- crates/legacy/Cargo.toml | 1 + crates/legacy/src/builder_state.rs | 23 +- crates/legacy/src/lib.rs | 49 +- crates/legacy/src/service.rs | 923 +++--------------- crates/legacy/src/testing/basic_test.rs | 3 +- .../legacy/src/testing/finalization_test.rs | 23 +- crates/marketplace/Cargo.toml | 1 + crates/marketplace/src/builder_state.rs | 4 +- crates/marketplace/src/service.rs | 67 +- crates/marketplace/src/testing/mod.rs | 4 +- crates/marketplace/src/testing/order_test.rs | 10 +- crates/marketplace/src/utils.rs | 237 +---- crates/shared/Cargo.toml | 26 + crates/shared/src/block.rs | 66 ++ crates/shared/src/lib.rs | 4 + crates/shared/src/testing.rs | 1 + crates/shared/src/utils.rs | 183 ++++ 19 files changed, 512 insertions(+), 1150 deletions(-) create mode 100644 crates/shared/Cargo.toml create mode 100644 crates/shared/src/block.rs create mode 100644 crates/shared/src/lib.rs create mode 100644 crates/shared/src/testing.rs create mode 100644 crates/shared/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index c37201bd..bd4e0b3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3061,6 +3061,7 @@ dependencies = [ "hotshot-example-types", "hotshot-types", "lru 0.12.4", + "marketplace-builder-shared", "serde", "sha2 0.10.8", "snafu", @@ -4616,6 +4617,7 @@ dependencies = [ "hotshot-task-impls", "hotshot-types", "lru 0.12.4", + "marketplace-builder-shared", "multimap", "rkyv", "serde", @@ -4632,6 +4634,28 @@ dependencies = [ "vbs", ] +[[package]] +name = "marketplace-builder-shared" +version = "0.1.51" +dependencies = [ + "anyhow", + "async-broadcast", + "async-trait", + "committable", + "either", + "futures", + "hex", + "hotshot", + "hotshot-builder-api", + "hotshot-events-service", + "hotshot-task-impls", + "hotshot-types", + "surf-disco", + "tracing", + "url", + "vbs", +] + [[package]] name = "match_cfg" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cd9142fc..bd7f6846 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,14 +1,16 @@ [workspace] resolver = "2" -members = [ - "crates/*" -] +members = ["crates/*"] [workspace.dependencies] -hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.77", features = ["dependency-tasks"] } +hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.77", features = [ + "dependency-tasks", +] } hotshot-builder-api = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.77" } hotshot-events-service = { git = "https://github.com/EspressoSystems/hotshot-events-service.git", tag = "0.1.49" } -hotshot-task-impls = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.77", features = ["dependency-tasks"] } +hotshot-task-impls = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.77", features = [ + "dependency-tasks", +] } hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.77" } hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.77" } @@ -48,4 +50,5 @@ edition = "2021" [workspace.lints.rust] unexpected_cfgs = { level = "warn", check-cfg = [ 'cfg(async_executor_impl, values("async-std", "tokio"))', + 'cfg(async_channel_impl, values("async-std", "tokio"))', ] } diff --git a/crates/legacy/Cargo.toml b/crates/legacy/Cargo.toml index 498c92ac..1e92e640 100644 --- a/crates/legacy/Cargo.toml +++ b/crates/legacy/Cargo.toml @@ -4,6 +4,7 @@ version = { workspace = true } edition = { workspace = true } [dependencies] +marketplace-builder-shared = { path = "../shared" } anyhow = { workspace = true } async-broadcast = { workspace = true } diff --git a/crates/legacy/src/builder_state.rs b/crates/legacy/src/builder_state.rs index b040c6f6..774e0786 100644 --- a/crates/legacy/src/builder_state.rs +++ b/crates/legacy/src/builder_state.rs @@ -10,12 +10,13 @@ use hotshot_types::{ utils::BuilderCommitment, vid::{VidCommitment, VidPrecomputeData}, }; +use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; use committable::Commitment; use crate::{ service::{GlobalState, ReceivedTransaction}, - BlockId, BuilderStateId, LegacyCommit, ParentBlockReferences, + LegacyCommit, }; use async_broadcast::broadcast; use async_broadcast::Receiver as BroadcastReceiver; @@ -267,7 +268,7 @@ async fn best_builder_states_to_extend( let current_commitment = quorum_proposal.data.block_header.payload_commitment(); let current_builder_state_id = BuilderStateId:: { parent_commitment: current_commitment, - view: current_view_number, + parent_view: current_view_number, }; let global_state_read_lock = global_state.read_arc().await; @@ -319,7 +320,7 @@ async fn best_builder_states_to_extend( let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock .spawned_builder_states .keys() - .map(|builder_state_id| *builder_state_id.view) + .map(|builder_state_id| *builder_state_id.parent_view) .filter(|view_number| view_number < ¤t_view_number) .max(); @@ -337,7 +338,7 @@ async fn best_builder_states_to_extend( .spawned_builder_states .keys() .filter(|builder_state_id| { - builder_state_id.view.u64() + builder_state_id.parent_view.u64() == maximum_stored_view_number_smaller_than_quorum_proposal }) { @@ -386,7 +387,7 @@ impl BuilderState { .map(|builder_state_id| format!( "{}@{}", builder_state_id.parent_commitment, - builder_state_id.view.u64() + builder_state_id.parent_view.u64() )) .collect::>(), quorum_proposal.data.block_header.payload_commitment(), @@ -397,7 +398,7 @@ impl BuilderState { // best [BuilderState]s to extend from. best_builder_states_to_extend.contains(&BuilderStateId { parent_commitment: self.parent_block_references.vid_commitment, - view: self.parent_block_references.view_number, + parent_view: self.parent_block_references.view_number, }) } @@ -644,7 +645,7 @@ impl BuilderState { let builder_state_id = BuilderStateId { parent_commitment: self.parent_block_references.vid_commitment, - view: self.parent_block_references.view_number, + parent_view: self.parent_block_references.view_number, }; { @@ -689,7 +690,7 @@ impl BuilderState { self.global_state.write_arc().await.register_builder_state( BuilderStateId { parent_commitment: self.parent_block_references.vid_commitment, - view: self.parent_block_references.view_number, + parent_view: self.parent_block_references.view_number, }, self.parent_block_references.clone(), req_sender, @@ -725,7 +726,7 @@ impl BuilderState { let should_prioritize_finalization = self .allow_empty_block_until - .map(|until| state_id.view < until) + .map(|until| state_id.parent_view < until) .unwrap_or(false); if self.tx_queue.is_empty() && !should_prioritize_finalization { @@ -841,7 +842,7 @@ impl BuilderState { async fn process_block_request(&mut self, req: RequestMessage) { // If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle if req.state_id.parent_commitment != self.parent_block_references.vid_commitment - || req.state_id.view != self.parent_block_references.view_number + || req.state_id.parent_view != self.parent_block_references.view_number { tracing::debug!( "Builder {:?} Requested Builder commitment does not match the built_from_view, so ignoring it", @@ -857,7 +858,7 @@ impl BuilderState { .highest_view_num_builder_id .clone(); - if highest_view_num_builder_id.view != self.parent_block_references.view_number { + if highest_view_num_builder_id.parent_view != self.parent_block_references.view_number { tracing::debug!( "Builder {:?} Requested Builder commitment does not match the highest_view_num_builder_id, so ignoring it", self.parent_block_references.view_number diff --git a/crates/legacy/src/lib.rs b/crates/legacy/src/lib.rs index 8c5185a0..8c2dadf2 100644 --- a/crates/legacy/src/lib.rs +++ b/crates/legacy/src/lib.rs @@ -22,11 +22,8 @@ pub mod service; pub mod testing; use async_compatibility_layer::channel::UnboundedReceiver; -use committable::Commitment; use hotshot_builder_api::v0_1::builder::BuildError; -use hotshot_types::{ - data::Leaf, traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment, -}; +use hotshot_types::traits::node_implementation::NodeType; /// WaitAndKeep is a helper enum that allows for the lazy polling of a single /// value from an unbound receiver. @@ -78,50 +75,6 @@ impl WaitAndKeep { } } -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct BlockId { - hash: BuilderCommitment, - view: Types::Time, -} - -impl std::fmt::Display for BlockId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Block({}@{})", - hex::encode(self.hash.as_ref()), - *self.view - ) - } -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct BuilderStateId { - parent_commitment: VidCommitment, - view: Types::Time, -} - -impl std::fmt::Display for BuilderStateId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BuilderState({}@{})", self.parent_commitment, *self.view) - } -} - -/// References to the parent block that is extended to spawn the new builder state. -#[derive(Debug, Clone)] -pub struct ParentBlockReferences { - pub view_number: TYPES::Time, - pub vid_commitment: VidCommitment, - pub leaf_commit: Commitment>, - pub builder_commitment: BuilderCommitment, -} -// implement display for the referenced info -impl std::fmt::Display for ParentBlockReferences { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "View Number: {:?}", self.view_number) - } -} - // TODO: Update commitment calculation with the new `commit`. // trait LegacyCommit { diff --git a/crates/legacy/src/service.rs b/crates/legacy/src/service.rs index 42369b2c..416504d5 100644 --- a/crates/legacy/src/service.rs +++ b/crates/legacy/src/service.rs @@ -1,7 +1,4 @@ -use hotshot::{ - traits::{election::static_committee::StaticCommittee, NodeImplementation}, - types::{Event, SystemContextHandle}, -}; +use hotshot::types::Event; use hotshot_builder_api::v0_1::{ block_info::{AvailableBlockData, AvailableBlockHeaderInput, AvailableBlockInfo}, builder::BuildError, @@ -13,10 +10,7 @@ use hotshot_types::{ message::Proposal, traits::{ block_contents::BlockPayload, - consensus_api::ConsensusApi, - election::Membership, - network::Topic, - node_implementation::{ConsensusTime, NodeType, Versions}, + node_implementation::{ConsensusTime, NodeType}, signature_key::{BuilderSignatureKey, SignatureKey}, }, utils::BuilderCommitment, @@ -25,19 +19,17 @@ use hotshot_types::{ use lru::LruCache; use vbs::version::StaticVersionType; +use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; + +use crate::builder_state::{MessageType, RequestMessage, ResponseMessage}; use crate::{ builder_state::{ BuildBlockInfo, DaProposalMessage, DecideMessage, QuorumProposalMessage, TransactionSource, TriggerStatus, }, - BlockId, LegacyCommit as _, -}; -use crate::{ - builder_state::{MessageType, RequestMessage, ResponseMessage}, - BuilderStateId, ParentBlockReferences, + LegacyCommit as _, }; use crate::{WaitAndKeep, WaitAndKeepGetError}; -use anyhow::anyhow; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; use async_broadcast::{Sender as BroadcastSender, TrySendError}; use async_compatibility_layer::{ @@ -50,15 +42,14 @@ use async_trait::async_trait; use committable::{Commitment, Committable}; use futures::stream::StreamExt; use futures::{future::BoxFuture, Stream}; -use hotshot_events_service::{events::Error as EventStreamError, events_source::StartupInfo}; use sha2::{Digest, Sha256}; +use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; -use std::{collections::HashMap, marker::PhantomData}; use std::{fmt::Display, time::Instant}; use tagged_base64::TaggedBase64; -use tide_disco::{method::ReadState, Url}; +use tide_disco::method::ReadState; // Start assuming we're fine calculatig VID for 5 megabyte blocks const INITIAL_MAX_BLOCK_SIZE: u64 = 5_000_000; @@ -182,7 +173,7 @@ impl GlobalState { let mut spawned_builder_states = HashMap::new(); let bootstrap_id = BuilderStateId { parent_commitment: bootstrapped_builder_state_id, - view: bootstrapped_view_num, + parent_view: bootstrapped_view_num, }; spawned_builder_states.insert(bootstrap_id.clone(), (None, bootstrap_sender.clone())); GlobalState { @@ -228,7 +219,7 @@ impl GlobalState { } // keep track of the max view number - if parent_id.view > self.highest_view_num_builder_id.view { + if parent_id.parent_view > self.highest_view_num_builder_id.parent_view { tracing::info!("registering builder {parent_id} as highest",); self.highest_view_num_builder_id = parent_id; } else { @@ -308,9 +299,9 @@ impl GlobalState { pub fn remove_handles(&mut self, on_decide_view: Types::Time) -> Types::Time { // remove everything from the spawned builder states when view_num <= on_decide_view; // if we don't have a highest view > decide, use highest view as cutoff. - let cutoff = std::cmp::min(self.highest_view_num_builder_id.view, on_decide_view); + let cutoff = std::cmp::min(self.highest_view_num_builder_id.parent_view, on_decide_view); self.spawned_builder_states - .retain(|id, _| id.view >= cutoff); + .retain(|id, _| id.parent_view >= cutoff); let cutoff_u64 = cutoff.u64(); let gc_view = if cutoff_u64 > 0 { cutoff_u64 - 1 } else { 0 }; @@ -366,7 +357,7 @@ impl GlobalState { // iterate over the spawned builder states and check if the view number exists self.spawned_builder_states .iter() - .any(|(id, _)| id.view == *key) + .any(|(id, _)| id.parent_view == *key) } pub fn should_view_handle_other_proposals( @@ -374,7 +365,7 @@ impl GlobalState { builder_view: &Types::Time, proposal_view: &Types::Time, ) -> bool { - *builder_view == self.highest_view_num_builder_id.view + *builder_view == self.highest_view_num_builder_id.parent_view && !self.check_builder_state_existence_for_a_view(proposal_view) } } @@ -541,7 +532,7 @@ impl ProxyGlobalState { let state_id = BuilderStateId { parent_commitment: *for_parent, - view: Types::Time::new(view_number), + parent_view: Types::Time::new(view_number), }; // verify the signature @@ -552,7 +543,7 @@ impl ProxyGlobalState { tracing::info!("Requesting available blocks for {state_id}",); - let view_num = state_id.view; + let view_num = state_id.parent_view; // check in the local spawned builder states // if it doesn't exist; there are three cases // 1) it has already been garbage collected (view < decide) and we should return an error @@ -564,14 +555,14 @@ impl ProxyGlobalState { // If this `BlockBuilder` hasn't been reaped, it should have been. let global_state = self.global_state.read_arc().await; if view_num < global_state.last_garbage_collected_view_num - && global_state.highest_view_num_builder_id.view + && global_state.highest_view_num_builder_id.parent_view != global_state.last_garbage_collected_view_num { tracing::warn!( "Requesting for view {:?}, last decide-triggered cleanup on view {:?}, highest view num is {:?}", view_num, global_state.last_garbage_collected_view_num, - global_state.highest_view_num_builder_id.view + global_state.highest_view_num_builder_id.parent_view ); return Err(AvailableBlocksError::RequestForAvailableViewThatHasAlreadyBeenDecided); } @@ -1014,178 +1005,6 @@ impl ReadState for ProxyGlobalState { } } -/// [HotShotEventsService] is a trait that defines the interface for the -/// hotshot events service. The service is expected to provide a stream of -/// events and a startup info. -#[async_trait] -trait HotShotEventsService { - type EventsStream: Stream, EventStreamError>>; - type EventsError; - - type StartUpInfo; - type StartUpInfoError; - - /// [check_connection] is a function that will check that the service is - /// connected, or can be connected to the hotshot events service. - /// It's use is optional, and it has been included strictly for current - /// logic compatibility. - async fn check_connection(&self, timeout: Option) -> bool; - - /// [events] is a function that will return a stream of events from the - /// hotshot events service. The stream itself is expected to contain - /// items that are a [Result] of [Event] or an [EventsError]. - /// If [check_connection] has not been called before this function, the - /// connection should be established. - async fn events(&self) -> Result; - - /// [startup_info] is a function that will return the startup info from - /// the hotshot events service. The response is expected to be a - /// [Result] of [StartupInfo] or a [StartupInfoError]. - /// If [check_connection] has not been called before this function, the - /// connection should be established. - async fn startup_info(&self) -> Result; -} - -struct HotShotEventsServiceTideDiscoClient { - client: surf_disco::Client, - _pd: PhantomData, -} - -impl HotShotEventsServiceTideDiscoClient { - fn new(client: surf_disco::Client) -> Self { - HotShotEventsServiceTideDiscoClient { - client, - _pd: Default::default(), - } - } - - fn from_url(url: Url) -> Self { - let client = surf_disco::Client::::new(url); - Self::new(client) - } -} - -#[async_trait] -impl HotShotEventsService - for HotShotEventsServiceTideDiscoClient -{ - type EventsStream = surf_disco::socket::Connection< - Event, - surf_disco::socket::Unsupported, - EventStreamError, - Ver, - >; - type EventsError = hotshot_events_service::events::Error; - - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, timeout: Option) -> bool { - self.client.connect(timeout).await - } - - async fn events(&self) -> Result { - self.client - .socket("hotshot-events/events") - .subscribe::>() - .await - } - - async fn startup_info(&self) -> Result { - self.client - .get::>("hotshot-events/startup_info") - .send() - .await - } -} - -/// [ConnectToEventsServiceError] is an error enum that represents the class -/// of possible errors that can be returned when calling -/// `connect_to_events_service`. -#[derive(Debug)] -enum ConnectToEventsServiceError { - /// [Connection] is an error variant that represents a failure to - /// connect to the hotshot events service. - Connection, - - /// [Subscription] is an error variant that represents a failure to - /// subscribe to the events stream. - Subscription(hotshot_events_service::events::Error), - - /// [StartupInfo] is an error variant that represents a failure to - /// retrieve the startup info from the hotshot events service. - StartupInfo(hotshot_events_service::events::Error), -} - -impl Display for ConnectToEventsServiceError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ConnectToEventsServiceError::Connection => { - write!(f, "failed to connect to the hotshot events service") - } - ConnectToEventsServiceError::Subscription(e) => { - write!(f, "failed to subscribe to the events stream: {:?}", e) - } - ConnectToEventsServiceError::StartupInfo(e) => { - write!(f, "failed to retrieve the startup info: {:?}", e) - } - } - } -} - -/// [connect_to_events_service] is a function that will attempt to connect to -/// the hotshot events service, setup a subscription to the events stream, -/// and retrieve the startup info. If all is successful, it will return the -/// stream, and membership information derived from the startup info. -async fn connect_to_events_service( - client: &C, -) -> Result<(C::EventsStream, StaticCommittee), ConnectToEventsServiceError> -where - C: HotShotEventsService< - Types, - EventsStream = S, - EventsError = hotshot_events_service::events::Error, - StartUpInfo = StartupInfo, - StartUpInfoError = hotshot_events_service::events::Error, - >, - S: Stream, EventStreamError>>, -{ - if !(client.check_connection(None).await) { - return Err(ConnectToEventsServiceError::Connection); - } - - tracing::info!("Builder client connected to the hotshot events api"); - - // client subscribe to hotshot events - let subscribed_events = client - .events() - .await - .map_err(ConnectToEventsServiceError::Subscription)?; - - // handle the startup event at the start - let StartupInfo { - known_node_with_stake, - non_staked_node_count, - } = client - .startup_info() - .await - .map_err(ConnectToEventsServiceError::StartupInfo)?; - - let membership: StaticCommittee = StaticCommittee::::new( - known_node_with_stake.clone(), - known_node_with_stake.clone(), - Topic::Global, - ); - - tracing::info!( - "Startup info: Known nodes with stake: {:?}, Non-staked node count: {:?}", - known_node_with_stake, - non_staked_node_count - ); - - Ok((subscribed_events, membership)) -} - /* Running Non-Permissioned Builder Service */ @@ -1202,209 +1021,68 @@ pub async fn run_non_permissioned_standalone_builder_service< // sending a Decide event from the hotshot to the builder states decide_sender: BroadcastSender>, - // Url to (re)connect to for the events stream - hotshot_events_api_url: Url, + // HotShot event stream + hotshot_event_stream: impl Stream>, + + total_nodes: NonZeroUsize, // Global state global_state: Arc>>, ) -> Result<(), anyhow::Error> { - // connection to the events stream - let events_service_client = - HotShotEventsServiceTideDiscoClient::::from_url(hotshot_events_api_url.clone()); - - let connect_to_events_service_result = connect_to_events_service(&events_service_client).await; - let (mut subscribed_events, mut membership) = match connect_to_events_service_result { - Ok((subscribed_events, membership)) => (subscribed_events, membership), - Err(err) => { - return Err(anyhow!( - "failed to connect to API at {hotshot_events_api_url}: {err}" - )); - } - }; - let tx_sender = { // This closure is likely unnecessary, but we want to play it safe // with our RWLocks. let global_state_read_lock_guard = global_state.read_arc().await; global_state_read_lock_guard.tx_sender.clone() }; + let mut hotshot_event_stream = std::pin::pin!(hotshot_event_stream); loop { - let event = subscribed_events.next().await; - //tracing::debug!("Builder Event received from HotShot: {:?}", event); - match event { - Some(Ok(event)) => { - match event.event { - EventType::Error { error } => { - tracing::error!("Error event in HotShot: {:?}", error); - } - // tx event - EventType::Transactions { transactions } => { - let max_block_size = { - // This closure is likely unnecessary, but we want - // to play it safe with our RWLocks. - let global_state_read_lock_guard = global_state.read_arc().await; - global_state_read_lock_guard.max_block_size - }; - - handle_received_txns( - &tx_sender, - transactions, - TransactionSource::HotShot, - max_block_size, - ) - .await; - } - // decide event - EventType::Decide { - block_size: _, - leaf_chain, - qc: _, - } => { - let latest_decide_view_num = leaf_chain[0].leaf.view_number(); - handle_decide_event(&decide_sender, latest_decide_view_num).await; - } - // DA proposal event - EventType::DaProposal { proposal, sender } => { - // get the leader for current view - let leader = membership.leader(proposal.data.view_number); - // get the committee mstatked node count - let total_nodes = membership.total_nodes(); - - handle_da_event( - &da_sender, - Arc::new(proposal), - sender, - leader, - NonZeroUsize::new(total_nodes).unwrap_or(NonZeroUsize::MIN), - ) - .await; - } - // Quorum proposal event - EventType::QuorumProposal { proposal, sender } => { - // get the leader for current view - let leader = membership.leader(proposal.data.view_number); - handle_quorum_event(&quorum_sender, Arc::new(proposal), sender, leader) - .await; - } - _ => { - tracing::debug!("Unhandled event from Builder"); - } - } - } - Some(Err(e)) => { - tracing::error!("Error in the event stream: {:?}", e); - } - None => { - tracing::error!("Event stream ended"); - let connected = connect_to_events_service(&events_service_client).await; - - (subscribed_events, membership) = match connected { - Ok((subscribed_events, membership)) => (subscribed_events, membership), - Err(err) => { - return Err(anyhow!( - "failed to reconnect to API at {hotshot_events_api_url}: {err}" - )); - } - }; - } - } - } -} - -/* -Running Permissioned Builder Service -*/ -pub async fn run_permissioned_standalone_builder_service< - Types: NodeType, - I: NodeImplementation, - V: Versions, ->( - // sending a DA proposal from the hotshot to the builder states - da_sender: BroadcastSender>, - - // sending a Quorum proposal from the hotshot to the builder states - quorum_sender: BroadcastSender>, - - // sending a Decide event from the hotshot to the builder states - decide_sender: BroadcastSender>, - - // hotshot context handle - hotshot_handle: Arc>, + let Some(event) = hotshot_event_stream.next().await else { + anyhow::bail!("Event stream ended"); + }; - // Global state - global_state: Arc>>, -) { - let mut event_stream = hotshot_handle.event_stream(); - let tx_sender = { - // This closure is likely unnecessary, but we want to play it safe - // with our RWLocks. - let global_state_read_lock_guard = global_state.read_arc().await; - global_state_read_lock_guard.tx_sender.clone() - }; + match event.event { + EventType::Error { error } => { + tracing::error!("Error event in HotShot: {:?}", error); + } + // tx event + EventType::Transactions { transactions } => { + let max_block_size = { + // This closure is likely unnecessary, but we want + // to play it safe with our RWLocks. + let global_state_read_lock_guard = global_state.read_arc().await; + global_state_read_lock_guard.max_block_size + }; - loop { - tracing::debug!("Waiting for events from HotShot"); - match event_stream.next().await { - None => { - tracing::error!("Didn't receive any event from the HotShot event stream"); + handle_received_txns( + &tx_sender, + transactions, + TransactionSource::HotShot, + max_block_size, + ) + .await; + } + // decide event + EventType::Decide { + block_size: _, + leaf_chain, + qc: _, + } => { + let latest_decide_view_num = leaf_chain[0].leaf.view_number(); + handle_decide_event(&decide_sender, latest_decide_view_num).await; + } + // DA proposal event + EventType::DaProposal { proposal, sender } => { + handle_da_event(&da_sender, Arc::new(proposal), sender, total_nodes).await; + } + // QC proposal event + EventType::QuorumProposal { proposal, sender } => { + // get the leader for current view + handle_quorum_event(&quorum_sender, Arc::new(proposal), sender).await; } - Some(event) => { - match event.event { - // error event - EventType::Error { error } => { - tracing::error!("Error event in HotShot: {:?}", error); - } - // tx event - EventType::Transactions { transactions } => { - let max_block_size = { - // This closure is likely unnecessary, but we want - // to play it safe with our RWLocks. - let global_state_read_lock_guard = global_state.read_arc().await; - global_state_read_lock_guard.max_block_size - }; - - handle_received_txns( - &tx_sender, - transactions, - TransactionSource::HotShot, - max_block_size, - ) - .await; - } - // decide event - EventType::Decide { leaf_chain, .. } => { - let latest_decide_view_number = leaf_chain[0].leaf.view_number(); - - handle_decide_event(&decide_sender, latest_decide_view_number).await; - } - // DA proposal event - EventType::DaProposal { proposal, sender } => { - // get the leader for current view - let leader = hotshot_handle.leader(proposal.data.view_number).await; - // get the committee staked node count - let total_nodes = hotshot_handle.total_nodes(); - - handle_da_event( - &da_sender, - Arc::new(proposal), - sender, - leader, - total_nodes, - ) - .await; - } - // Quorum proposal event - EventType::QuorumProposal { proposal, sender } => { - // get the leader for current view - let leader = hotshot_handle.leader(proposal.data.view_number).await; - handle_quorum_event(&quorum_sender, Arc::new(proposal), sender, leader) - .await; - } - _ => { - tracing::error!("Unhandled event from Builder: {:?}", event.event); - } - } + _ => { + tracing::debug!("Unhandled event from Builder"); } } } @@ -1416,7 +1094,6 @@ pub async fn run_permissioned_standalone_builder_service< /// [handle_da_event_implementation]. #[derive(Debug)] enum HandleDaEventError { - SenderIsNotLeader, SignatureValidationFailed, BroadcastFailed(async_broadcast::SendError>), } @@ -1429,14 +1106,12 @@ async fn handle_da_event( da_channel_sender: &BroadcastSender>, da_proposal: Arc>>, sender: ::SignatureKey, - leader: ::SignatureKey, total_nodes: NonZeroUsize, ) { // We're explicitly not inspecting this error, as this function is not // expected to return an error or any indication of an error. let _ = - handle_da_event_implementation(da_channel_sender, da_proposal, sender, leader, total_nodes) - .await; + handle_da_event_implementation(da_channel_sender, da_proposal, sender, total_nodes).await; } /// [handle_da_event_implementation] is a utility function that will attempt @@ -1446,7 +1121,6 @@ async fn handle_da_event( /// There are only three conditions under which this will fail to send the /// message via the given `da_channel_sender`, and they are all represented /// via [HandleDaEventError]. They are as follows: -/// - [HandleDaEventError::SenderIsNotLeader]: The sender is not the leader /// - [HandleDaEventError::SignatureValidationFailed]: The signature validation /// failed /// - [HandleDaEventError::BroadcastFailed]: The broadcast failed as no receiver @@ -1457,12 +1131,11 @@ async fn handle_da_event_implementation( da_channel_sender: &BroadcastSender>, da_proposal: Arc>>, sender: ::SignatureKey, - leader: ::SignatureKey, total_nodes: NonZeroUsize, ) -> Result<(), HandleDaEventError> { tracing::debug!( "DaProposal: Leader: {:?} for the view: {:?}", - leader, + sender, da_proposal.data.view_number ); @@ -1470,21 +1143,18 @@ async fn handle_da_event_implementation( let encoded_txns_hash = Sha256::digest(&da_proposal.data.encoded_transactions); // check if the sender is the leader and the signature is valid; if yes, broadcast the DA proposal - // We check the failure condition first to prevent unnecessary conditional - // blocks - if leader != sender { - tracing::error!("Sender is not Leader on DaProposal for view {:?}: Leader for the current view: {:?} and sender: {:?}", da_proposal.data.view_number, leader, sender); - return Err(HandleDaEventError::SenderIsNotLeader); - } - if !sender.validate(&da_proposal.signature, &encoded_txns_hash) { - tracing::error!("Validation Failure on DaProposal for view {:?}: Leader for the current view: {:?} and sender: {:?}", da_proposal.data.view_number, leader, sender); + tracing::error!( + "Validation Failure on DaProposal for view {:?}: Leader: {:?}", + da_proposal.data.view_number, + sender + ); return Err(HandleDaEventError::SignatureValidationFailed); } let da_msg = DaProposalMessage:: { proposal: da_proposal, - sender: leader, + sender, total_nodes: total_nodes.into(), }; @@ -1515,7 +1185,6 @@ async fn handle_da_event_implementation( /// [handle_quorum_event_implementation]. #[derive(Debug)] enum HandleQuorumEventError { - SenderIsNotLeader, SignatureValidationFailed, BroadcastFailed(async_broadcast::SendError>), } @@ -1528,13 +1197,11 @@ async fn handle_quorum_event( quorum_channel_sender: &BroadcastSender>, quorum_proposal: Arc>>, sender: ::SignatureKey, - leader: ::SignatureKey, ) { // We're explicitly not inspecting this error, as this function is not // expected to return an error or any indication of an error. let _ = - handle_quorum_event_implementation(quorum_channel_sender, quorum_proposal, sender, leader) - .await; + handle_quorum_event_implementation(quorum_channel_sender, quorum_proposal, sender).await; } /// [handle_quorum_event_implementation] is a utility function that will attempt @@ -1544,7 +1211,6 @@ async fn handle_quorum_event( /// There are only three conditions under which this will fail to send the /// message via the given `quorum_channel_sender`, and they are all represented /// via [HandleQuorumEventError]. They are as follows: -/// - [HandleQuorumEventError::SenderIsNotLeader]: The sender is not the leader /// - [HandleQuorumEventError::SignatureValidationFailed]: The signature validation /// failed /// - [HandleQuorumEventError::BroadcastFailed]: The broadcast failed as no receiver @@ -1555,30 +1221,27 @@ async fn handle_quorum_event_implementation( quorum_channel_sender: &BroadcastSender>, quorum_proposal: Arc>>, sender: ::SignatureKey, - leader: ::SignatureKey, ) -> Result<(), HandleQuorumEventError> { tracing::debug!( "QuorumProposal: Leader: {:?} for the view: {:?}", - leader, + sender, quorum_proposal.data.view_number ); let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data); - // check if the sender is the leader and the signature is valid; if yes, broadcast the Quorum proposal - if sender != leader { - tracing::error!("Sender is not Leader on QuorumProposal for view {:?}: Leader for the current view: {:?} and sender: {:?}", quorum_proposal.data.view_number, leader, sender); - return Err(HandleQuorumEventError::SenderIsNotLeader); - } - if !sender.validate(&quorum_proposal.signature, leaf.legacy_commit().as_ref()) { - tracing::error!("Validation Failure on QuorumProposal for view {:?}: Leader for the current view: {:?} and sender: {:?}", quorum_proposal.data.view_number, leader, sender); + tracing::error!( + "Validation Failure on QuorumProposal for view {:?}: Leader for the current view: {:?}", + quorum_proposal.data.view_number, + sender + ); return Err(HandleQuorumEventError::SignatureValidationFailed); } let quorum_msg = QuorumProposalMessage:: { proposal: quorum_proposal, - sender: leader, + sender, }; let view_number = quorum_msg.proposal.data.view_number; tracing::debug!( @@ -1793,16 +1456,12 @@ mod test { use async_compatibility_layer::channel::unbounded; use async_lock::RwLock; use committable::Commitment; - use futures::{ - channel::mpsc::{channel, Receiver}, - StreamExt, - }; + use futures::StreamExt; use hotshot::{ traits::BlockPayload, - types::{BLSPubKey, Event, SignatureKey}, + types::{BLSPubKey, SignatureKey}, }; use hotshot_builder_api::v0_2::block_info::AvailableBlockInfo; - use hotshot_events_service::events_source::StartupInfo; use hotshot_example_types::{ block_types::{TestBlockPayload, TestMetadata, TestTransaction}, node_types::{TestTypes, TestVersions}, @@ -1819,25 +1478,22 @@ mod test { }, utils::BuilderCommitment, }; + use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; use sha2::{Digest, Sha256}; - use tide_disco::StatusCode; use crate::{ builder_state::{ BuildBlockInfo, MessageType, RequestMessage, ResponseMessage, TransactionSource, TriggerStatus, }, - service::{ - connect_to_events_service, ConnectToEventsServiceError, HandleReceivedTxnsError, - INITIAL_MAX_BLOCK_SIZE, - }, - BlockId, BuilderStateId, LegacyCommit, ParentBlockReferences, + service::{HandleReceivedTxnsError, INITIAL_MAX_BLOCK_SIZE}, + LegacyCommit, }; use super::{ handle_da_event_implementation, handle_quorum_event_implementation, AvailableBlocksError, BlockInfo, ClaimBlockError, ClaimBlockHeaderInputError, GlobalState, HandleDaEventError, - HandleQuorumEventError, HandleReceivedTxns, HotShotEventsService, ProxyGlobalState, + HandleQuorumEventError, HandleReceivedTxns, ProxyGlobalState, }; // GlobalState Tests @@ -1864,7 +1520,7 @@ mod test { let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(1), + parent_view: ViewNumber::new(1), }; // There should be a single entry within the spawned_builder_states, @@ -1877,7 +1533,7 @@ mod test { assert!(state.spawned_builder_states.contains_key(&builder_state_id), "The spawned builder states should contain an entry with the bootstrapped parameters passed into new"); - assert!(!state.spawned_builder_states.contains_key(&BuilderStateId { parent_commitment: parent_commit, view: ViewNumber::new(0) }), "The spawned builder states should not contain any other entry, as such it should not contain any entry with a higher view number, but the same parent commit"); + assert!(!state.spawned_builder_states.contains_key(&BuilderStateId { parent_commitment: parent_commit, parent_view: ViewNumber::new(0) }), "The spawned builder states should not contain any other entry, as such it should not contain any entry with a higher view number, but the same parent commit"); // We can't compare the Senders directly @@ -1929,7 +1585,7 @@ mod test { let (req_sender, _) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; state.register_builder_state( @@ -1962,7 +1618,7 @@ mod test { let (req_sender, _) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(6), + parent_view: ViewNumber::new(6), }; state.register_builder_state( @@ -2016,7 +1672,7 @@ mod test { let (req_sender, req_receiver) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; state.register_builder_state( @@ -2047,7 +1703,7 @@ mod test { let (req_sender, req_receiver) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; // This is the same BuilderStateId as the previous one, so it should @@ -2077,7 +1733,7 @@ mod test { { let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; let req_id_and_sender = state.spawned_builder_states.get(&builder_state_id).unwrap(); @@ -2130,7 +1786,7 @@ mod test { let (req_sender, _) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(6), + parent_view: ViewNumber::new(6), }; state.register_builder_state( @@ -2163,7 +1819,7 @@ mod test { let (req_sender, _) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; state.register_builder_state( @@ -2186,7 +1842,7 @@ mod test { state.highest_view_num_builder_id, BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(6) + parent_view: ViewNumber::new(6) }, "The highest view number builder id should now be the one that was just registered" ); @@ -2223,7 +1879,7 @@ mod test { let new_view_num = ViewNumber::new(1); let builder_state_id = BuilderStateId { parent_commitment: new_parent_commit, - view: new_view_num, + parent_view: new_view_num, }; let builder_hash_1 = BuilderCommitment::from_bytes([1, 2, 3, 4]); @@ -2422,7 +2078,7 @@ mod test { let new_view_num = ViewNumber::new(1); let builder_state_id = BuilderStateId { parent_commitment: new_parent_commit, - view: new_view_num, + parent_view: new_view_num, }; let builder_hash = BuilderCommitment::from_bytes([1, 2, 3, 4]); @@ -2698,7 +2354,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2730,7 +2386,7 @@ mod test { let builder_state_id = BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }; assert_eq!( state.highest_view_num_builder_id, builder_state_id, @@ -2746,7 +2402,7 @@ mod test { assert!( state.spawned_builder_states.contains_key(&BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }), "The spawned builder states should contain the builder state id: {builder_state_id}" ); @@ -2790,7 +2446,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2806,7 +2462,7 @@ mod test { state.highest_view_num_builder_id, BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }, "The highest view number builder id should be the one that was just registered" ); @@ -2867,7 +2523,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2883,7 +2539,7 @@ mod test { state.highest_view_num_builder_id, BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }, "The highest view number builder id should be the one that was just registered" ); @@ -2908,7 +2564,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2964,7 +2620,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2986,7 +2642,7 @@ mod test { state.highest_view_num_builder_id, BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }, "The highest view number builder id should be the one that was just registered" ); @@ -3017,7 +2673,7 @@ mod test { for i in 0..5 { let builder_state_id = BuilderStateId { parent_commitment: vid_commitment(&[i], 4), - view: ViewNumber::new(i as u64), + parent_view: ViewNumber::new(i as u64), }; assert!( !state.spawned_builder_states.contains_key(&builder_state_id), @@ -3028,7 +2684,7 @@ mod test { for i in 5..=10 { let builder_state_id = BuilderStateId { parent_commitment: vid_commitment(&[i], 4), - view: ViewNumber::new(i as u64), + parent_view: ViewNumber::new(i as u64), }; assert!( state.spawned_builder_states.contains_key(&builder_state_id), @@ -3249,7 +2905,7 @@ mod test { let mut write_locked_global_state = state.global_state.write_arc().await; write_locked_global_state.highest_view_num_builder_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; } @@ -3335,7 +2991,7 @@ mod test { // Now we want to make the block data available to the state. let expected_builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(1), + parent_view: ViewNumber::new(1), }; let mut response_receiver = { @@ -3349,7 +3005,7 @@ mod test { write_locked_global_state.register_builder_state( expected_builder_state_id.clone(), ParentBlockReferences { - view_number: expected_builder_state_id.view, + view_number: expected_builder_state_id.parent_view, vid_commitment: expected_builder_state_id.parent_commitment, leaf_commit: Commitment::from_raw([0; 32]), builder_commitment: BuilderCommitment::from_bytes([]), @@ -3456,7 +3112,7 @@ mod test { // Now we want to make the block data available to the state. let expected_builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(1), + parent_view: ViewNumber::new(1), }; let mut response_receiver = { @@ -3470,7 +3126,7 @@ mod test { write_locked_global_state.register_builder_state( expected_builder_state_id.clone(), ParentBlockReferences { - view_number: expected_builder_state_id.view, + view_number: expected_builder_state_id.parent_view, vid_commitment: expected_builder_state_id.parent_commitment, leaf_commit: Commitment::from_raw([0; 32]), builder_commitment: BuilderCommitment::from_bytes([]), @@ -4157,62 +3813,6 @@ mod test { // handle_da_event Tests - /// This test checks that the error [HandleDaEventError::SignatureValidationFailed] - /// is returned under the right conditions of invoking - /// [handle_da_event_implementation]. - /// - /// To trigger this error, we simply need to ensure that the public keys - /// provided for the leader and the sender do not match. - #[async_std::test] - async fn test_handle_da_event_implementation_error_sender_is_not_leader() { - let (sender_public_key, sender_private_key) = - ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 1); - let (da_channel_sender, _) = async_broadcast::broadcast(10); - let total_nodes = NonZeroUsize::new(10).unwrap(); - let view_number = ViewNumber::new(10); - - let da_proposal = DaProposal:: { - encoded_transactions: Arc::new([1, 2, 3, 4, 5, 6]), - metadata: TestMetadata { - num_transactions: 1, - }, - view_number, - }; - - let encoded_txns_hash = Sha256::digest(&da_proposal.encoded_transactions); - let signature = - ::sign(&sender_private_key, &encoded_txns_hash).unwrap(); - - let signed_da_proposal = Arc::new(Proposal { - data: da_proposal, - signature, - _pd: Default::default(), - }); - - let result = handle_da_event_implementation( - &da_channel_sender, - signed_da_proposal.clone(), - sender_public_key, - leader_public_key, - total_nodes, - ) - .await; - - match result { - Err(HandleDaEventError::SenderIsNotLeader) => { - // This is expected. - } - Ok(_) => { - panic!("expected an error, but received a successful attempt instead") - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - /// This test checks that the error [HandleDaEventError::SignatureValidationFailed] /// is returned under the right conditions of invoking /// [handle_da_event_implementation]. @@ -4253,7 +3853,6 @@ mod test { &da_channel_sender, signed_da_proposal.clone(), sender_public_key, - sender_public_key, total_nodes, ) .await; @@ -4282,8 +3881,6 @@ mod test { async fn test_handle_da_event_implementation_error_broadcast_failed() { let (sender_public_key, sender_private_key) = ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 0); let da_channel_sender = { let (da_channel_sender, _) = async_broadcast::broadcast(10); da_channel_sender @@ -4314,7 +3911,6 @@ mod test { &da_channel_sender, signed_da_proposal.clone(), sender_public_key, - leader_public_key, total_nodes, ) .await; @@ -4338,8 +3934,6 @@ mod test { async fn test_handle_da_event_implementation_success() { let (sender_public_key, sender_private_key) = ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 0); let (da_channel_sender, da_channel_receiver) = async_broadcast::broadcast(10); let total_nodes = NonZeroUsize::new(10).unwrap(); let view_number = ViewNumber::new(10); @@ -4366,7 +3960,6 @@ mod test { &da_channel_sender, signed_da_proposal.clone(), sender_public_key, - leader_public_key, total_nodes, ) .await; @@ -4393,74 +3986,6 @@ mod test { // handle_quorum_event Tests - /// This test checks that the error [HandleQuorumEventError::SenderIsNotLeader] - /// is returned under the right conditions of invoking - /// [handle_quorum_event_implementation]. - /// - /// To trigger this error, we simply need to ensure that the public keys - /// provided for the leader and the sender do not match. - #[async_std::test] - async fn test_handle_quorum_event_error_sender_is_not_leader() { - let (sender_public_key, sender_private_key) = - ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 1); - let (quorum_channel_sender, _) = async_broadcast::broadcast(10); - let view_number = ViewNumber::new(10); - - let quorum_proposal = { - let leaf = Leaf::::genesis( - &TestValidatedState::default(), - &TestInstanceState::default(), - ) - .await; - - QuorumProposal:: { - block_header: leaf.block_header().clone(), - view_number, - justify_qc: QuorumCertificate::genesis::( - &TestValidatedState::default(), - &TestInstanceState::default(), - ) - .await, - upgrade_certificate: None, - proposal_certificate: None, - } - }; - - let leaf = Leaf::from_quorum_proposal(&quorum_proposal); - - let signature = - ::sign(&sender_private_key, leaf.legacy_commit().as_ref()) - .unwrap(); - - let signed_quorum_proposal = Arc::new(Proposal { - data: quorum_proposal, - signature, - _pd: Default::default(), - }); - - let result = handle_quorum_event_implementation( - &quorum_channel_sender, - signed_quorum_proposal.clone(), - sender_public_key, - leader_public_key, - ) - .await; - - match result { - Err(HandleQuorumEventError::SenderIsNotLeader) => { - // This is expected. - } - Ok(_) => { - panic!("expected an error, but received a successful attempt instead"); - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - /// This test checks that the error [HandleQuorumEventError::SignatureValidationFailed] /// is returned under the right conditions of invoking /// [handle_quorum_event_implementation]. @@ -4515,7 +4040,6 @@ mod test { &quorum_channel_sender, signed_quorum_proposal.clone(), sender_public_key, - sender_public_key, ) .await; @@ -4543,8 +4067,6 @@ mod test { async fn test_handle_quorum_event_error_broadcast_failed() { let (sender_public_key, sender_private_key) = ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 0); let quorum_channel_sender = { let (quorum_channel_sender, _) = async_broadcast::broadcast(10); quorum_channel_sender @@ -4588,7 +4110,6 @@ mod test { &quorum_channel_sender, signed_quorum_proposal.clone(), sender_public_key, - leader_public_key, ) .await; @@ -4611,8 +4132,6 @@ mod test { async fn test_handle_quorum_event_success() { let (sender_public_key, sender_private_key) = ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 0); let (quorum_channel_sender, quorum_channel_receiver) = async_broadcast::broadcast(10); let view_number = ViewNumber::new(10); @@ -4652,7 +4171,6 @@ mod test { &quorum_channel_sender, signed_quorum_proposal.clone(), sender_public_key, - leader_public_key, ) .await; @@ -4876,213 +4394,4 @@ mod test { } } } - - /// This test checks that the error [ConnectToEventsServiceError::Connection] - /// is returned when [connect_to_events_service] is invoked. - /// - /// This error may be difficult to control in all environments. In order - /// to control this specific case, the underlying client being passed to - /// [connect_to_events_service] is a mock that always returns false when - /// [check_connection] is invoked. - #[async_std::test] - async fn test_connect_to_events_service_mock_error_connection() { - struct ConnectionFailure; - - #[async_trait::async_trait] - impl HotShotEventsService for ConnectionFailure { - type EventsStream = - Receiver, hotshot_events_service::events::Error>>; - type EventsError = hotshot_events_service::events::Error; - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, _timeout: Option) -> bool { - false - } - - async fn events(&self) -> Result { - todo!() - } - - async fn startup_info(&self) -> Result { - todo!() - } - } - - let client = ConnectionFailure; - - match connect_to_events_service(&client).await { - Err(ConnectToEventsServiceError::Connection) => { - // This is expected. - } - Ok(_) => { - panic!("Expected an error, but got a result"); - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - - /// This test checks that the error [ConnectToEventsServiceError::Subscription] - /// is returned when [connect_to_events_service] is invoked. - /// - /// This error may be difficult to control in all environments. In order - /// to control this specific case, the underlying client being passed to - /// [connect_to_events_service] is a mock that always returns an error when - /// [events] is invoked. - #[async_std::test] - async fn test_connect_to_events_service_mock_error_events() { - struct EventsFailure; - - #[async_trait::async_trait] - impl HotShotEventsService for EventsFailure { - type EventsStream = - Receiver, hotshot_events_service::events::Error>>; - type EventsError = hotshot_events_service::events::Error; - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, _timeout: Option) -> bool { - true - } - - async fn events(&self) -> Result { - Err(hotshot_events_service::events::Error::Custom { - message: "This is a custom error".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }) - } - - async fn startup_info(&self) -> Result { - todo!() - } - } - - let client = EventsFailure; - - match connect_to_events_service(&client).await { - Err(ConnectToEventsServiceError::Subscription( - hotshot_events_service::events::Error::Custom { .. }, - )) => { - // This is expected. - } - Ok(_) => { - panic!("Expected an error, but got a result"); - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - - /// This test checks that the error [ConnectToEventsServiceError::StartupInfo] - /// is returned when [connect_to_events_service] is invoked. - /// - /// This error may be difficult to control in all environments. In order - /// to control this specific case, the underlying client being passed to - /// [connect_to_events_service] is a mock that always returns an error when - /// [startup_info] is invoked. - #[async_std::test] - async fn test_connect_to_service_mock_error_startup_info() { - type EventsStream = - Receiver, hotshot_events_service::events::Error>>; - struct StartupInfoFailure(Arc>>); - - #[async_trait::async_trait] - impl HotShotEventsService for StartupInfoFailure { - type EventsStream = EventsStream; - type EventsError = hotshot_events_service::events::Error; - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, _timeout: Option) -> bool { - true - } - - async fn events(&self) -> Result { - let mut write_lock_guard = self.0.write_arc().await; - - write_lock_guard - .take() - .ok_or(hotshot_events_service::events::Error::Custom { - message: "This is a custom error".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }) - } - - async fn startup_info(&self) -> Result { - Err(hotshot_events_service::events::Error::Custom { - message: "This is a custom error".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }) - } - } - - let client = StartupInfoFailure(Arc::new(RwLock::new(Some(channel(10).1)))); - - match connect_to_events_service(&client).await { - Err(ConnectToEventsServiceError::StartupInfo( - hotshot_events_service::events::Error::Custom { .. }, - )) => { - // This is expected. - } - Ok(_) => { - panic!("Expected an error, but got a result"); - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - - /// This test checks that [connect_to_events_service] completes - /// successfully. - #[async_std::test] - async fn test_connect_to_service_mock_success() { - type EventsStream = - Receiver, hotshot_events_service::events::Error>>; - struct Success(Arc>>); - - #[async_trait::async_trait] - impl HotShotEventsService for Success { - type EventsStream = EventsStream; - type EventsError = hotshot_events_service::events::Error; - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, _timeout: Option) -> bool { - true - } - - async fn events(&self) -> Result { - let mut write_lock_guard = self.0.write_arc().await; - - write_lock_guard - .take() - .ok_or(hotshot_events_service::events::Error::Custom { - message: "This is a custom error".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }) - } - - async fn startup_info(&self) -> Result { - Ok(StartupInfo { - known_node_with_stake: vec![], - non_staked_node_count: 0, - }) - } - } - - let client = Success(Arc::new(RwLock::new(Some(channel(10).1)))); - - match connect_to_events_service(&client).await { - Ok(_) => { - // This is expected. - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } } diff --git a/crates/legacy/src/testing/basic_test.rs b/crates/legacy/src/testing/basic_test.rs index f3efdb5b..04eeeffa 100644 --- a/crates/legacy/src/testing/basic_test.rs +++ b/crates/legacy/src/testing/basic_test.rs @@ -38,6 +38,7 @@ mod tests { block_types::{TestBlockHeader, TestBlockPayload, TestMetadata, TestTransaction}, state_types::{TestInstanceState, TestValidatedState}, }; + use marketplace_builder_shared::block::ParentBlockReferences; use crate::builder_state::{ DaProposalMessage, DecideMessage, QuorumProposalMessage, TransactionSource, @@ -45,7 +46,7 @@ mod tests { use crate::service::{ handle_received_txns, GlobalState, ProxyGlobalState, ReceivedTransaction, }; - use crate::{LegacyCommit, ParentBlockReferences}; + use crate::LegacyCommit; use async_lock::RwLock; use committable::{Commitment, CommitmentBoundsArkless, Committable}; use sha2::{Digest, Sha256}; diff --git a/crates/legacy/src/testing/finalization_test.rs b/crates/legacy/src/testing/finalization_test.rs index 18a9fda7..eea289a7 100644 --- a/crates/legacy/src/testing/finalization_test.rs +++ b/crates/legacy/src/testing/finalization_test.rs @@ -3,7 +3,6 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use crate::{ builder_state::{DaProposalMessage, QuorumProposalMessage, ALLOW_EMPTY_BLOCK_PERIOD}, service::{GlobalState, ProxyGlobalState, ReceivedTransaction}, - BuilderStateId, ParentBlockReferences, }; use async_broadcast::{broadcast, Sender}; use async_lock::RwLock; @@ -31,6 +30,8 @@ use hotshot_types::{ }, utils::BuilderCommitment, }; +use marketplace_builder_shared::block::BuilderStateId; +use marketplace_builder_shared::block::ParentBlockReferences; use sha2::{Digest, Sha256}; use super::basic_test::{BuilderState, MessageType}; @@ -66,15 +67,15 @@ fn setup_builder_for_test() -> TestSetup { let bootstrap_builder_state_id = BuilderStateId:: { parent_commitment: vid_commitment(&[], TEST_NUM_NODES_IN_VID_COMPUTATION), - view: ViewNumber::genesis(), + parent_view: ViewNumber::genesis(), }; let global_state = Arc::new(RwLock::new(GlobalState::new( req_sender, tx_sender.clone(), bootstrap_builder_state_id.parent_commitment, - bootstrap_builder_state_id.view, - bootstrap_builder_state_id.view, + bootstrap_builder_state_id.parent_view, + bootstrap_builder_state_id.parent_view, 0, ))); @@ -153,7 +154,7 @@ async fn process_available_blocks_round( let available_blocks_result = proxy_global_state .available_blocks( &builder_state_id.parent_commitment, - builder_state_id.view.u64(), + builder_state_id.parent_view.u64(), leader_pub, ¤t_commit_signature, ) @@ -194,7 +195,7 @@ async fn progress_round_with_available_block_info( let claim_block_result = proxy_global_state .claim_block( &available_block_info.block_hash, - builder_state_id.view.u64(), + builder_state_id.parent_view.u64(), leader_pub, &signed_parent_commitment, ) @@ -204,7 +205,7 @@ async fn progress_round_with_available_block_info( let _claim_block_header_result = proxy_global_state .claim_block_header_input( &available_block_info.block_hash, - builder_state_id.view.u64(), + builder_state_id.parent_view.u64(), leader_pub, &signed_parent_commitment, ) @@ -258,7 +259,7 @@ async fn progress_round_with_transactions( ) -> BuilderStateId { let (leader_pub, leader_priv) = BLSPubKey::generated_from_seed_indexed([0; 32], round); let encoded_transactions = TestTransaction::encode(&transactions); - let next_view = builder_state_id.view + 1; + let next_view = builder_state_id.parent_view + 1; // Create and send the DA Proposals and Quorum Proposals { @@ -353,7 +354,7 @@ async fn progress_round_with_transactions( BuilderStateId { parent_commitment: payload_vid_commitment, - view: next_view, + parent_view: next_view, } } } @@ -375,7 +376,7 @@ async fn test_empty_block_rate() { let mut current_builder_state_id = BuilderStateId:: { parent_commitment: vid_commitment(&[], TEST_NUM_NODES_IN_VID_COMPUTATION), - view: ViewNumber::genesis(), + parent_view: ViewNumber::genesis(), }; for round in 0..10 { @@ -426,7 +427,7 @@ async fn test_eager_block_rate() { let mut current_builder_state_id = BuilderStateId:: { parent_commitment: vid_commitment(&[], TEST_NUM_NODES_IN_VID_COMPUTATION), - view: ViewNumber::genesis(), + parent_view: ViewNumber::genesis(), }; // Round 0 diff --git a/crates/marketplace/Cargo.toml b/crates/marketplace/Cargo.toml index 3b357baf..5e241ba4 100644 --- a/crates/marketplace/Cargo.toml +++ b/crates/marketplace/Cargo.toml @@ -4,6 +4,7 @@ version = { workspace = true } edition = { workspace = true } [dependencies] +marketplace-builder-shared = { path = "../shared" } anyhow = "1" async-broadcast = "0.7" diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs index bc5dcfd6..7532cf32 100644 --- a/crates/marketplace/src/builder_state.rs +++ b/crates/marketplace/src/builder_state.rs @@ -8,12 +8,14 @@ use hotshot_types::{ }, utils::BuilderCommitment, }; +use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; +use marketplace_builder_shared::utils::RotatingSet; use committable::Commitment; use crate::{ service::{BroadcastReceivers, GlobalState, ReceivedTransaction}, - utils::{BlockId, BuilderStateId, LegacyCommit as _, ParentBlockReferences, RotatingSet}, + utils::LegacyCommit as _, }; use async_broadcast::broadcast; use async_broadcast::Receiver as BroadcastReceiver; diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs index 4088a140..5e72ec5c 100644 --- a/crates/marketplace/src/service.rs +++ b/crates/marketplace/src/service.rs @@ -1,4 +1,23 @@ +use std::{fmt::Debug, marker::PhantomData, time::Duration}; + +use crate::{ + builder_state::{ + BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QuorumProposalMessage, + RequestMessage, ResponseMessage, TransactionSource, + }, + utils::LegacyCommit as _, +}; +use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; + use anyhow::bail; +pub use async_broadcast::{broadcast, RecvError, TryRecvError}; +use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError}; +use async_lock::RwLock; +use async_trait::async_trait; +use committable::{Commitment, Committable}; +use derivative::Derivative; +use futures::stream::StreamExt; +use futures::{future::BoxFuture, Stream}; use hotshot::types::Event; use hotshot_builder_api::v0_3::{ builder::{define_api, submit_api, BuildError, Error as BuilderApiError}, @@ -18,26 +37,6 @@ use hotshot_types::{ utils::BuilderCommitment, vid::VidCommitment, }; -use tracing::{error, instrument}; -use vbs::version::StaticVersion; - -use std::{fmt::Debug, marker::PhantomData, time::Duration}; - -use crate::{ - builder_state::{ - BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QuorumProposalMessage, - RequestMessage, ResponseMessage, TransactionSource, - }, - utils::{BlockId, BuilderStateId, LegacyCommit as _, ParentBlockReferences}, -}; -pub use async_broadcast::{broadcast, RecvError, TryRecvError}; -use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError}; -use async_lock::RwLock; -use async_trait::async_trait; -use committable::{Commitment, Committable}; -use derivative::Derivative; -use futures::stream::StreamExt; -use futures::{future::BoxFuture, Stream}; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::num::NonZeroUsize; @@ -45,6 +44,10 @@ use std::sync::Arc; use std::{fmt::Display, time::Instant}; use tagged_base64::TaggedBase64; use tide_disco::{app::AppError, method::ReadState, App}; +use tracing::{error, instrument}; +use vbs::version::StaticVersion; + +pub use marketplace_builder_shared::utils::EventServiceStream; // It holds all the necessary information for a block #[derive(Debug)] @@ -608,14 +611,34 @@ pub struct BroadcastSenders { pub trait BuilderHooks: Sync + Send + 'static { #[inline(always)] async fn process_transactions( - self: &Arc, + &self, transactions: Vec, ) -> Vec { transactions } #[inline(always)] - async fn handle_hotshot_event(self: &Arc, _event: &Event) {} + async fn handle_hotshot_event(&self, _event: &Event) {} +} + +#[async_trait] +impl BuilderHooks for Box +where + Types: NodeType, + T: BuilderHooks, +{ + #[inline(always)] + async fn process_transactions( + &self, + transactions: Vec, + ) -> Vec { + (**self).process_transactions(transactions).await + } + + #[inline(always)] + async fn handle_hotshot_event(&self, event: &Event) { + (**self).handle_hotshot_event(event).await + } } pub struct NoHooks(pub PhantomData); diff --git a/crates/marketplace/src/testing/mod.rs b/crates/marketplace/src/testing/mod.rs index 616e2113..c046ae2a 100644 --- a/crates/marketplace/src/testing/mod.rs +++ b/crates/marketplace/src/testing/mod.rs @@ -6,7 +6,7 @@ use crate::{ ResponseMessage, }, service::BroadcastSenders, - utils::{BuilderStateId, LegacyCommit}, + utils::LegacyCommit, }; use async_broadcast::broadcast; use async_compatibility_layer::channel::{unbounded, UnboundedReceiver}; @@ -33,10 +33,10 @@ use hotshot_example_types::{ node_types::TestVersions, state_types::{TestInstanceState, TestValidatedState}, }; +use marketplace_builder_shared::block::{BuilderStateId, ParentBlockReferences}; use serde::{Deserialize, Serialize}; use crate::service::{broadcast_channels, GlobalState}; -use crate::utils::ParentBlockReferences; use async_lock::RwLock; use committable::{Commitment, CommitmentBoundsArkless, Committable}; use std::sync::Arc; diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs index 25a59080..09f47cf5 100644 --- a/crates/marketplace/src/testing/order_test.rs +++ b/crates/marketplace/src/testing/order_test.rs @@ -4,11 +4,9 @@ use hotshot_types::{ data::QuorumProposal, traits::node_implementation::{ConsensusTime, NodeType}, }; +use marketplace_builder_shared::block::BuilderStateId; -use crate::{ - service::{BuilderHooks, ProxyGlobalState}, - utils::BuilderStateId, -}; +use crate::service::{BuilderHooks, ProxyGlobalState}; use std::{fmt::Debug, sync::Arc}; @@ -31,14 +29,14 @@ struct NoOpHooks; impl BuilderHooks for NoOpHooks { #[inline(always)] async fn process_transactions( - self: &Arc, + &self, transactions: Vec<::Transaction>, ) -> Vec<::Transaction> { transactions } #[inline(always)] - async fn handle_hotshot_event(self: &Arc, _event: &Event) {} + async fn handle_hotshot_event(&self, _event: &Event) {} } /// [RoundTransactionBehavior] is an enum that is used to represent different diff --git a/crates/marketplace/src/utils.rs b/crates/marketplace/src/utils.rs index 25a5f214..030dc911 100644 --- a/crates/marketplace/src/utils.rs +++ b/crates/marketplace/src/utils.rs @@ -1,239 +1,4 @@ -use std::{ - collections::HashSet, - future::Future, - hash::Hash, - mem, - pin::Pin, - task::Poll, - time::{Duration, Instant}, -}; - -use committable::Commitment; -use either::Either::{self, Left, Right}; -use futures::{FutureExt, Stream, StreamExt}; -use hotshot::types::Event; -use hotshot_events_service::events::Error as EventStreamError; -use hotshot_types::{ - data::Leaf, traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment, -}; -use surf_disco::Client; -use tracing::error; -use url::Url; -use vbs::version::StaticVersionType; - -/// A set that allows for time-based garbage collection, -/// implemented as three sets that are periodically shifted right. -/// Garage collection is triggered by calling [`Self::rotate`]. -#[derive(Clone, Debug)] -pub struct RotatingSet -where - T: PartialEq + Eq + Hash + Clone, -{ - fresh: HashSet, - stale: HashSet, - expiring: HashSet, - last_rotation: Instant, - period: Duration, -} - -impl RotatingSet -where - T: PartialEq + Eq + Hash + Clone, -{ - /// Construct a new `RotatingSet` - pub fn new(period: Duration) -> Self { - Self { - fresh: HashSet::new(), - stale: HashSet::new(), - expiring: HashSet::new(), - last_rotation: Instant::now(), - period, - } - } - - /// Returns `true` if the key is contained in the set - pub fn contains(&self, key: &T) -> bool { - self.fresh.contains(key) || self.stale.contains(key) || self.expiring.contains(key) - } - - /// Insert a `key` into the set. Doesn't trigger garbage collection - pub fn insert(&mut self, value: T) { - self.fresh.insert(value); - } - - /// Force garbage collection, even if the time elapsed since - /// the last garbage collection is less than `self.period` - pub fn force_rotate(&mut self) { - let now_stale = mem::take(&mut self.fresh); - let now_expiring = mem::replace(&mut self.stale, now_stale); - self.expiring = now_expiring; - self.last_rotation = Instant::now(); - } - - /// Trigger garbage collection. - pub fn rotate(&mut self) -> bool { - if self.last_rotation.elapsed() > self.period { - self.force_rotate(); - true - } else { - false - } - } -} - -impl Extend for RotatingSet -where - T: PartialEq + Eq + Hash + Clone, -{ - fn extend>(&mut self, iter: I) { - self.fresh.extend(iter) - } -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct BlockId { - pub hash: BuilderCommitment, - pub view: TYPES::Time, -} - -impl std::fmt::Display for BlockId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Block({}@{})", - hex::encode(self.hash.as_ref()), - *self.view - ) - } -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct BuilderStateId { - pub parent_view: TYPES::Time, - pub parent_commitment: VidCommitment, -} - -impl std::fmt::Display for BuilderStateId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "BuilderState({}@{})", - self.parent_commitment, *self.parent_view - ) - } -} - -/// References to the parent block that is extended to spawn the new builder state. -#[derive(Debug, Clone)] -pub struct ParentBlockReferences { - pub view_number: TYPES::Time, - pub vid_commitment: VidCommitment, - pub leaf_commit: Commitment>, - pub builder_commitment: BuilderCommitment, -} - -// implement display for the derived info -impl std::fmt::Display for ParentBlockReferences { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "View Number: {:?}", self.view_number) - } -} - -type EventServiceConnection = surf_disco::socket::Connection< - Event, - surf_disco::socket::Unsupported, - EventStreamError, - V, ->; - -type EventServiceReconnect = - Pin>> + Send + Sync>>; - -/// A wrapper around event streaming API that provides auto-reconnection capability -pub struct EventServiceStream { - api_url: Url, - connection: Either, EventServiceReconnect>, -} - -impl EventServiceStream { - async fn connect_inner( - url: Url, - ) -> anyhow::Result< - surf_disco::socket::Connection< - Event, - surf_disco::socket::Unsupported, - EventStreamError, - V, - >, - > { - let client = Client::::new(url.clone()); - - if !(client.connect(None).await) { - anyhow::bail!("Couldn't connect to API url"); - } - - tracing::info!("Builder client connected to the hotshot events api"); - - Ok(client - .socket("hotshot-events/events") - .subscribe::>() - .await?) - } - - /// Establish initial connection to the events service at `api_url` - pub async fn connect(api_url: Url) -> anyhow::Result { - let connection = Self::connect_inner(api_url.clone()).await?; - - Ok(Self { - api_url, - connection: Left(connection), - }) - } -} - -impl Stream for EventServiceStream { - type Item = Event; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - match &mut self.connection { - Left(connection) => { - let Poll::Ready(next) = connection.poll_next_unpin(cx) else { - return Poll::Pending; - }; - match next { - Some(Ok(event)) => Poll::Ready(Some(event)), - Some(Err(err)) => { - error!("Error in the event stream: {err:?}"); - Poll::Pending - } - None => { - let fut = Self::connect_inner(self.api_url.clone()); - let _ = std::mem::replace(&mut self.connection, Right(Box::pin(fut))); - Poll::Pending - } - } - } - Right(reconnect_future) => { - let Poll::Ready(ready) = reconnect_future.poll_unpin(cx) else { - return Poll::Pending; - }; - match ready { - Ok(connection) => { - let _ = std::mem::replace(&mut self.connection, Left(connection)); - Poll::Pending - } - Err(err) => { - error!("Failed to reconnect to the event service: {err:?}"); - Poll::Ready(None) - } - } - } - } - } -} +use hotshot_types::traits::node_implementation::NodeType; // TODO: Update commitment calculation with the new `commit`. // diff --git a/crates/shared/Cargo.toml b/crates/shared/Cargo.toml new file mode 100644 index 00000000..63658689 --- /dev/null +++ b/crates/shared/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "marketplace-builder-shared" +version = { workspace = true } +edition = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +async-broadcast = { workspace = true } +hotshot = { workspace = true } +hotshot-builder-api = { workspace = true } +hotshot-events-service = { workspace = true } +hotshot-task-impls = { workspace = true } +hotshot-types = { workspace = true } + +async-trait = { workspace = true } +committable = { workspace = true } +either = { workspace = true } +futures = { workspace = true } +hex = { workspace = true } +surf-disco = { workspace = true } +tracing = { workspace = true } +url = { workspace = true } +vbs = { workspace = true } + +[lints] +workspace = true diff --git a/crates/shared/src/block.rs b/crates/shared/src/block.rs new file mode 100644 index 00000000..87d8e285 --- /dev/null +++ b/crates/shared/src/block.rs @@ -0,0 +1,66 @@ +//! Shared types dealing with block information + +use committable::Commitment; +use hotshot_types::{ + data::Leaf, traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment, +}; + +/// Unique identifier for a block +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct BlockId { + /// Block hash + pub hash: BuilderCommitment, + /// Block view + pub view: TYPES::Time, +} + +impl std::fmt::Display for BlockId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Block({}@{})", + hex::encode(self.hash.as_ref()), + *self.view + ) + } +} + +/// Unique identifier for a builder state +/// +/// Builder state is identified by the VID commitment +/// and view of the block it targets to extend, i.e. +/// builder with given state ID assumes blocks/bundles it's building +/// are going to be included immediately after the parent block. +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct BuilderStateId { + /// View number of the parent block + pub parent_view: TYPES::Time, + /// VID commitment of the parent block + pub parent_commitment: VidCommitment, +} + +impl std::fmt::Display for BuilderStateId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "BuilderState({}@{})", + self.parent_commitment, *self.parent_view + ) + } +} + +/// References to the parent block that is extended to spawn the new builder state. +#[derive(Debug, Clone)] +pub struct ParentBlockReferences { + pub view_number: TYPES::Time, + pub vid_commitment: VidCommitment, + pub leaf_commit: Commitment>, + pub builder_commitment: BuilderCommitment, +} + +// implement display for the derived info +impl std::fmt::Display for ParentBlockReferences { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "View Number: {:?}", self.view_number) + } +} diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs new file mode 100644 index 00000000..30eb349d --- /dev/null +++ b/crates/shared/src/lib.rs @@ -0,0 +1,4 @@ +pub mod block; +#[cfg(test)] +pub mod testing; +pub mod utils; diff --git a/crates/shared/src/testing.rs b/crates/shared/src/testing.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/crates/shared/src/testing.rs @@ -0,0 +1 @@ + diff --git a/crates/shared/src/utils.rs b/crates/shared/src/utils.rs new file mode 100644 index 00000000..0565cf5a --- /dev/null +++ b/crates/shared/src/utils.rs @@ -0,0 +1,183 @@ +use std::{future::Future, pin::Pin, task::Poll}; + +use std::{ + collections::HashSet, + hash::Hash, + mem, + time::{Duration, Instant}, +}; + +use either::Either::{self, Left, Right}; +use futures::{FutureExt, Stream, StreamExt}; +use hotshot::types::Event; +use hotshot_events_service::events::Error as EventStreamError; +use hotshot_types::traits::node_implementation::NodeType; +use surf_disco::Client; +use tracing::error; +use url::Url; +use vbs::version::StaticVersionType; + +/// A set that allows for time-based garbage collection, +/// implemented as three sets that are periodically shifted right. +/// Garbage collection is triggered by calling [`Self::rotate`]. +#[derive(Clone, Debug)] +pub struct RotatingSet +where + T: PartialEq + Eq + Hash + Clone, +{ + fresh: HashSet, + stale: HashSet, + expiring: HashSet, + last_rotation: Instant, + period: Duration, +} + +impl RotatingSet +where + T: PartialEq + Eq + Hash + Clone, +{ + /// Construct a new `RotatingSet` + pub fn new(period: Duration) -> Self { + Self { + fresh: HashSet::new(), + stale: HashSet::new(), + expiring: HashSet::new(), + last_rotation: Instant::now(), + period, + } + } + + /// Returns `true` if the key is contained in the set + pub fn contains(&self, key: &T) -> bool { + self.fresh.contains(key) || self.stale.contains(key) || self.expiring.contains(key) + } + + /// Insert a `key` into the set. Doesn't trigger garbage collection + pub fn insert(&mut self, value: T) { + self.fresh.insert(value); + } + + /// Force garbage collection, even if the time elapsed since + /// the last garbage collection is less than `self.period` + pub fn force_rotate(&mut self) { + let now_stale = mem::take(&mut self.fresh); + let now_expiring = mem::replace(&mut self.stale, now_stale); + self.expiring = now_expiring; + self.last_rotation = Instant::now(); + } + + /// Trigger garbage collection. + pub fn rotate(&mut self) -> bool { + if self.last_rotation.elapsed() > self.period { + self.force_rotate(); + true + } else { + false + } + } +} + +impl Extend for RotatingSet +where + T: PartialEq + Eq + Hash + Clone, +{ + fn extend>(&mut self, iter: I) { + self.fresh.extend(iter) + } +} + +type EventServiceConnection = surf_disco::socket::Connection< + Event, + surf_disco::socket::Unsupported, + EventStreamError, + V, +>; + +type EventServiceReconnect = + Pin>> + Send + Sync>>; + +/// A wrapper around event streaming API that provides auto-reconnection capability +pub struct EventServiceStream { + api_url: Url, + connection: Either, EventServiceReconnect>, +} + +impl EventServiceStream { + async fn connect_inner( + url: Url, + ) -> anyhow::Result< + surf_disco::socket::Connection< + Event, + surf_disco::socket::Unsupported, + EventStreamError, + V, + >, + > { + let client = Client::::new(url.clone()); + + if !(client.connect(None).await) { + anyhow::bail!("Couldn't connect to API url"); + } + + tracing::info!("Builder client connected to the hotshot events api"); + + Ok(client + .socket("hotshot-events/events") + .subscribe::>() + .await?) + } + + /// Establish initial connection to the events service at `api_url` + pub async fn connect(api_url: Url) -> anyhow::Result { + let connection = Self::connect_inner(api_url.clone()).await?; + + Ok(Self { + api_url, + connection: Left(connection), + }) + } +} + +impl Stream for EventServiceStream { + type Item = Event; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match &mut self.connection { + Left(connection) => { + let Poll::Ready(next) = connection.poll_next_unpin(cx) else { + return Poll::Pending; + }; + match next { + Some(Ok(event)) => Poll::Ready(Some(event)), + Some(Err(err)) => { + error!("Error in the event stream: {err:?}"); + Poll::Pending + } + None => { + let fut = Self::connect_inner(self.api_url.clone()); + let _ = std::mem::replace(&mut self.connection, Right(Box::pin(fut))); + Poll::Pending + } + } + } + Right(reconnect_future) => { + let Poll::Ready(ready) = reconnect_future.poll_unpin(cx) else { + return Poll::Pending; + }; + match ready { + Ok(connection) => { + let _ = std::mem::replace(&mut self.connection, Left(connection)); + Poll::Pending + } + Err(err) => { + error!("Failed to reconnect to the event service: {err:?}"); + Poll::Ready(None) + } + } + } + } + } +} From 69579c9226e32727da2b829abc8ab9db08bf7b51 Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Fri, 11 Oct 2024 19:48:11 +0200 Subject: [PATCH 02/12] impl Stream -> generic param --- crates/legacy/src/service.rs | 3 ++- crates/marketplace/src/service.rs | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/legacy/src/service.rs b/crates/legacy/src/service.rs index 416504d5..fa073b61 100644 --- a/crates/legacy/src/service.rs +++ b/crates/legacy/src/service.rs @@ -1011,6 +1011,7 @@ Running Non-Permissioned Builder Service pub async fn run_non_permissioned_standalone_builder_service< Types: NodeType, Ver: StaticVersionType, + S: Stream> + Unpin, >( // sending a DA proposal from the hotshot to the builder states da_sender: BroadcastSender>, @@ -1022,7 +1023,7 @@ pub async fn run_non_permissioned_standalone_builder_service< decide_sender: BroadcastSender>, // HotShot event stream - hotshot_event_stream: impl Stream>, + hotshot_event_stream: S, total_nodes: NonZeroUsize, diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs index 5e72ec5c..77cbe4ca 100644 --- a/crates/marketplace/src/service.rs +++ b/crates/marketplace/src/service.rs @@ -647,10 +647,13 @@ impl BuilderHooks for NoHooks {} /// Run builder service, /// Refer to documentation for [`ProxyGlobalState`] for more details -pub async fn run_builder_service>( +pub async fn run_builder_service< + TYPES: NodeType