From d818aff8d1dea580429517306d8b1cef41d501d0 Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Wed, 16 Oct 2024 17:23:59 +0200 Subject: [PATCH] Extract shared crate (#158) --- Cargo.lock | 24 + Cargo.toml | 1 + crates/legacy/Cargo.toml | 1 + crates/legacy/src/builder_state.rs | 23 +- crates/legacy/src/lib.rs | 49 +- crates/legacy/src/service.rs | 923 +++--------------- crates/legacy/src/testing/basic_test.rs | 3 +- .../legacy/src/testing/finalization_test.rs | 23 +- crates/marketplace/Cargo.toml | 1 + crates/marketplace/src/builder_state.rs | 4 +- crates/marketplace/src/service.rs | 74 +- crates/marketplace/src/testing/mod.rs | 4 +- crates/marketplace/src/testing/order_test.rs | 10 +- crates/marketplace/src/utils.rs | 237 +---- crates/shared/Cargo.toml | 26 + crates/shared/src/block.rs | 66 ++ crates/shared/src/lib.rs | 4 + crates/shared/src/testing.rs | 1 + crates/shared/src/utils.rs | 183 ++++ 19 files changed, 513 insertions(+), 1144 deletions(-) create mode 100644 crates/shared/Cargo.toml create mode 100644 crates/shared/src/block.rs create mode 100644 crates/shared/src/lib.rs create mode 100644 crates/shared/src/testing.rs create mode 100644 crates/shared/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index a9430e92..34382683 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3078,6 +3078,7 @@ dependencies = [ "hotshot-example-types", "hotshot-types", "lru 0.12.5", + "marketplace-builder-shared", "serde", "sha2 0.10.8", "snafu", @@ -4633,6 +4634,7 @@ dependencies = [ "hotshot-task-impls", "hotshot-types", "lru 0.12.5", + "marketplace-builder-shared", "multimap", "rkyv", "serde", @@ -4649,6 +4651,28 @@ dependencies = [ "vbs", ] +[[package]] +name = "marketplace-builder-shared" +version = "0.1.51" +dependencies = [ + "anyhow", + "async-broadcast", + "async-trait", + "committable", + "either", + "futures", + "hex", + "hotshot", + "hotshot-builder-api", + "hotshot-events-service", + "hotshot-task-impls", + "hotshot-types", + "surf-disco", + "tracing", + "url", + "vbs", +] + [[package]] name = "match_cfg" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index ac7b0178..8de4acb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ edition = "2021" [workspace.lints.rust] unexpected_cfgs = { level = "warn", check-cfg = [ 'cfg(async_executor_impl, values("async-std", "tokio"))', + 'cfg(async_channel_impl, values("async-std", "tokio"))', ] } [workspace.lints.clippy] diff --git a/crates/legacy/Cargo.toml b/crates/legacy/Cargo.toml index 498c92ac..1e92e640 100644 --- a/crates/legacy/Cargo.toml +++ b/crates/legacy/Cargo.toml @@ -4,6 +4,7 @@ version = { workspace = true } edition = { workspace = true } [dependencies] +marketplace-builder-shared = { path = "../shared" } anyhow = { workspace = true } async-broadcast = { workspace = true } diff --git a/crates/legacy/src/builder_state.rs b/crates/legacy/src/builder_state.rs index b6f854fd..00c94b4c 100644 --- a/crates/legacy/src/builder_state.rs +++ b/crates/legacy/src/builder_state.rs @@ -10,12 +10,13 @@ use hotshot_types::{ utils::BuilderCommitment, vid::{VidCommitment, VidPrecomputeData}, }; +use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; use committable::Commitment; use crate::{ service::{GlobalState, ReceivedTransaction}, - BlockId, BuilderStateId, LegacyCommit, ParentBlockReferences, + LegacyCommit, }; use async_broadcast::broadcast; use async_broadcast::Receiver as BroadcastReceiver; @@ -267,7 +268,7 @@ async fn best_builder_states_to_extend( let current_commitment = quorum_proposal.data.block_header.payload_commitment(); let current_builder_state_id = BuilderStateId:: { parent_commitment: current_commitment, - view: current_view_number, + parent_view: current_view_number, }; let global_state_read_lock = global_state.read_arc().await; @@ -319,7 +320,7 @@ async fn best_builder_states_to_extend( let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock .spawned_builder_states .keys() - .map(|builder_state_id| *builder_state_id.view) + .map(|builder_state_id| *builder_state_id.parent_view) .filter(|view_number| view_number < ¤t_view_number) .max(); @@ -337,7 +338,7 @@ async fn best_builder_states_to_extend( .spawned_builder_states .keys() .filter(|builder_state_id| { - builder_state_id.view.u64() + builder_state_id.parent_view.u64() == maximum_stored_view_number_smaller_than_quorum_proposal }) { @@ -385,7 +386,7 @@ impl BuilderState { .map(|builder_state_id| format!( "{}@{}", builder_state_id.parent_commitment, - builder_state_id.view.u64() + builder_state_id.parent_view.u64() )) .collect::>(), quorum_proposal.data.block_header.payload_commitment(), @@ -396,7 +397,7 @@ impl BuilderState { // best [BuilderState]s to extend from. best_builder_states_to_extend.contains(&BuilderStateId { parent_commitment: self.parent_block_references.vid_commitment, - view: self.parent_block_references.view_number, + parent_view: self.parent_block_references.view_number, }) } @@ -643,7 +644,7 @@ impl BuilderState { let builder_state_id = BuilderStateId { parent_commitment: self.parent_block_references.vid_commitment, - view: self.parent_block_references.view_number, + parent_view: self.parent_block_references.view_number, }; { @@ -688,7 +689,7 @@ impl BuilderState { self.global_state.write_arc().await.register_builder_state( BuilderStateId { parent_commitment: self.parent_block_references.vid_commitment, - view: self.parent_block_references.view_number, + parent_view: self.parent_block_references.view_number, }, self.parent_block_references.clone(), req_sender, @@ -724,7 +725,7 @@ impl BuilderState { let should_prioritize_finalization = self .allow_empty_block_until - .map(|until| state_id.view < until) + .map(|until| state_id.parent_view < until) .unwrap_or(false); if self.tx_queue.is_empty() && !should_prioritize_finalization { @@ -840,7 +841,7 @@ impl BuilderState { async fn process_block_request(&mut self, req: RequestMessage) { // If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle if req.state_id.parent_commitment != self.parent_block_references.vid_commitment - || req.state_id.view != self.parent_block_references.view_number + || req.state_id.parent_view != self.parent_block_references.view_number { tracing::debug!( "Builder {:?} Requested Builder commitment does not match the built_from_view, so ignoring it", @@ -856,7 +857,7 @@ impl BuilderState { .highest_view_num_builder_id .clone(); - if highest_view_num_builder_id.view != self.parent_block_references.view_number { + if highest_view_num_builder_id.parent_view != self.parent_block_references.view_number { tracing::debug!( "Builder {:?} Requested Builder commitment does not match the highest_view_num_builder_id, so ignoring it", self.parent_block_references.view_number diff --git a/crates/legacy/src/lib.rs b/crates/legacy/src/lib.rs index 9a0d3fa8..aa6bbc46 100644 --- a/crates/legacy/src/lib.rs +++ b/crates/legacy/src/lib.rs @@ -22,11 +22,8 @@ pub mod service; pub mod testing; use async_compatibility_layer::channel::UnboundedReceiver; -use committable::Commitment; use hotshot_builder_api::v0_1::builder::BuildError; -use hotshot_types::{ - data::Leaf, traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment, -}; +use hotshot_types::traits::node_implementation::NodeType; /// `WaitAndKeep` is a helper enum that allows for the lazy polling of a single /// value from an unbound receiver. @@ -78,50 +75,6 @@ impl WaitAndKeep { } } -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct BlockId { - hash: BuilderCommitment, - view: Types::Time, -} - -impl std::fmt::Display for BlockId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Block({}@{})", - hex::encode(self.hash.as_ref()), - *self.view - ) - } -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct BuilderStateId { - parent_commitment: VidCommitment, - view: Types::Time, -} - -impl std::fmt::Display for BuilderStateId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BuilderState({}@{})", self.parent_commitment, *self.view) - } -} - -/// References to the parent block that is extended to spawn the new builder state. -#[derive(Debug, Clone)] -pub struct ParentBlockReferences { - pub view_number: TYPES::Time, - pub vid_commitment: VidCommitment, - pub leaf_commit: Commitment>, - pub builder_commitment: BuilderCommitment, -} -// implement display for the referenced info -impl std::fmt::Display for ParentBlockReferences { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "View Number: {:?}", self.view_number) - } -} - // TODO: Update commitment calculation with the new `commit`. // trait LegacyCommit { diff --git a/crates/legacy/src/service.rs b/crates/legacy/src/service.rs index 914aee98..7351b398 100644 --- a/crates/legacy/src/service.rs +++ b/crates/legacy/src/service.rs @@ -1,7 +1,4 @@ -use hotshot::{ - traits::{election::static_committee::StaticCommittee, NodeImplementation}, - types::{Event, SystemContextHandle}, -}; +use hotshot::types::Event; use hotshot_builder_api::v0_1::{ block_info::{AvailableBlockData, AvailableBlockHeaderInput, AvailableBlockInfo}, builder::BuildError, @@ -13,10 +10,7 @@ use hotshot_types::{ message::Proposal, traits::{ block_contents::BlockPayload, - consensus_api::ConsensusApi, - election::Membership, - network::Topic, - node_implementation::{ConsensusTime, NodeType, Versions}, + node_implementation::{ConsensusTime, NodeType}, signature_key::{BuilderSignatureKey, SignatureKey}, }, utils::BuilderCommitment, @@ -25,19 +19,17 @@ use hotshot_types::{ use lru::LruCache; use vbs::version::StaticVersionType; +use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; + +use crate::builder_state::{MessageType, RequestMessage, ResponseMessage}; use crate::{ builder_state::{ BuildBlockInfo, DaProposalMessage, DecideMessage, QuorumProposalMessage, TransactionSource, TriggerStatus, }, - BlockId, LegacyCommit as _, -}; -use crate::{ - builder_state::{MessageType, RequestMessage, ResponseMessage}, - BuilderStateId, ParentBlockReferences, + LegacyCommit as _, }; use crate::{WaitAndKeep, WaitAndKeepGetError}; -use anyhow::anyhow; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; use async_broadcast::{Sender as BroadcastSender, TrySendError}; use async_compatibility_layer::{ @@ -50,15 +42,14 @@ use async_trait::async_trait; use committable::{Commitment, Committable}; use futures::stream::StreamExt; use futures::{future::BoxFuture, Stream}; -use hotshot_events_service::{events::Error as EventStreamError, events_source::StartupInfo}; use sha2::{Digest, Sha256}; +use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; -use std::{collections::HashMap, marker::PhantomData}; use std::{fmt::Display, time::Instant}; use tagged_base64::TaggedBase64; -use tide_disco::{method::ReadState, Url}; +use tide_disco::method::ReadState; // Start assuming we're fine calculatig VID for 5 megabyte blocks const INITIAL_MAX_BLOCK_SIZE: u64 = 5_000_000; @@ -182,7 +173,7 @@ impl GlobalState { let mut spawned_builder_states = HashMap::new(); let bootstrap_id = BuilderStateId { parent_commitment: bootstrapped_builder_state_id, - view: bootstrapped_view_num, + parent_view: bootstrapped_view_num, }; spawned_builder_states.insert(bootstrap_id.clone(), (None, bootstrap_sender.clone())); GlobalState { @@ -228,7 +219,7 @@ impl GlobalState { } // keep track of the max view number - if parent_id.view > self.highest_view_num_builder_id.view { + if parent_id.parent_view > self.highest_view_num_builder_id.parent_view { tracing::info!("registering builder {parent_id} as highest",); self.highest_view_num_builder_id = parent_id; } else { @@ -308,9 +299,9 @@ impl GlobalState { pub fn remove_handles(&mut self, on_decide_view: Types::Time) -> Types::Time { // remove everything from the spawned builder states when view_num <= on_decide_view; // if we don't have a highest view > decide, use highest view as cutoff. - let cutoff = std::cmp::min(self.highest_view_num_builder_id.view, on_decide_view); + let cutoff = std::cmp::min(self.highest_view_num_builder_id.parent_view, on_decide_view); self.spawned_builder_states - .retain(|id, _| id.view >= cutoff); + .retain(|id, _| id.parent_view >= cutoff); let cutoff_u64 = cutoff.u64(); let gc_view = if cutoff_u64 > 0 { cutoff_u64 - 1 } else { 0 }; @@ -365,7 +356,7 @@ impl GlobalState { // iterate over the spawned builder states and check if the view number exists self.spawned_builder_states .iter() - .any(|(id, _)| id.view == *key) + .any(|(id, _)| id.parent_view == *key) } pub fn should_view_handle_other_proposals( @@ -373,7 +364,7 @@ impl GlobalState { builder_view: &Types::Time, proposal_view: &Types::Time, ) -> bool { - *builder_view == self.highest_view_num_builder_id.view + *builder_view == self.highest_view_num_builder_id.parent_view && !self.check_builder_state_existence_for_a_view(proposal_view) } } @@ -540,7 +531,7 @@ impl ProxyGlobalState { let state_id = BuilderStateId { parent_commitment: *for_parent, - view: Types::Time::new(view_number), + parent_view: Types::Time::new(view_number), }; // verify the signature @@ -551,7 +542,7 @@ impl ProxyGlobalState { tracing::info!("Requesting available blocks for {state_id}",); - let view_num = state_id.view; + let view_num = state_id.parent_view; // check in the local spawned builder states // if it doesn't exist; there are three cases // 1) it has already been garbage collected (view < decide) and we should return an error @@ -563,14 +554,14 @@ impl ProxyGlobalState { // If this `BlockBuilder` hasn't been reaped, it should have been. let global_state = self.global_state.read_arc().await; if view_num < global_state.last_garbage_collected_view_num - && global_state.highest_view_num_builder_id.view + && global_state.highest_view_num_builder_id.parent_view != global_state.last_garbage_collected_view_num { tracing::warn!( "Requesting for view {:?}, last decide-triggered cleanup on view {:?}, highest view num is {:?}", view_num, global_state.last_garbage_collected_view_num, - global_state.highest_view_num_builder_id.view + global_state.highest_view_num_builder_id.parent_view ); return Err(AvailableBlocksError::RequestForAvailableViewThatHasAlreadyBeenDecided); } @@ -1013,177 +1004,13 @@ impl ReadState for ProxyGlobalState { } } -/// [HotShotEventsService] is a trait that defines the interface for the -/// hotshot events service. The service is expected to provide a stream of -/// events and a startup info. -#[async_trait] -trait HotShotEventsService { - type EventsStream: Stream, EventStreamError>>; - type EventsError; - - type StartUpInfo; - type StartUpInfoError; - - /// Check that the service is connected, or can be connected to the hotshot events service. - /// It's use is optional, and it has been included strictly for current logic compatibility. - async fn check_connection(&self, timeout: Option) -> bool; - - /// Returns a stream of events from the hotshot events service. - /// - /// If [check_connection](HotShotEventsService::check_connection) - /// has not been called before this function, the connection should be established. - async fn events(&self) -> Result; - - /// Returns the startup info from the hotshot events service. - /// - /// If [check_connection](HotShotEventsService::check_connection) - /// has not been called before this function, the connection should be established. - async fn startup_info(&self) -> Result; -} - -struct HotShotEventsServiceTideDiscoClient { - client: surf_disco::Client, - _pd: PhantomData, -} - -impl HotShotEventsServiceTideDiscoClient { - fn new(client: surf_disco::Client) -> Self { - HotShotEventsServiceTideDiscoClient { - client, - _pd: Default::default(), - } - } - - fn from_url(url: Url) -> Self { - let client = surf_disco::Client::::new(url); - Self::new(client) - } -} - -#[async_trait] -impl HotShotEventsService - for HotShotEventsServiceTideDiscoClient -{ - type EventsStream = surf_disco::socket::Connection< - Event, - surf_disco::socket::Unsupported, - EventStreamError, - Ver, - >; - type EventsError = hotshot_events_service::events::Error; - - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, timeout: Option) -> bool { - self.client.connect(timeout).await - } - - async fn events(&self) -> Result { - self.client - .socket("hotshot-events/events") - .subscribe::>() - .await - } - - async fn startup_info(&self) -> Result { - self.client - .get::>("hotshot-events/startup_info") - .send() - .await - } -} - -/// [`ConnectToEventsServiceError`] is an error enum that represents the class -/// of possible errors that can be returned when calling -/// `connect_to_events_service`. -#[derive(Debug)] -enum ConnectToEventsServiceError { - /// Represents a failure to connect to the hotshot events service. - Connection, - - /// Represents a failure to subscribe to the events stream. - Subscription(hotshot_events_service::events::Error), - - /// Represents a failure to retrieve the startup info from the hotshot events service. - StartupInfo(hotshot_events_service::events::Error), -} - -impl Display for ConnectToEventsServiceError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ConnectToEventsServiceError::Connection => { - write!(f, "failed to connect to the hotshot events service") - } - ConnectToEventsServiceError::Subscription(e) => { - write!(f, "failed to subscribe to the events stream: {:?}", e) - } - ConnectToEventsServiceError::StartupInfo(e) => { - write!(f, "failed to retrieve the startup info: {:?}", e) - } - } - } -} - -/// [`connect_to_events_service`] is a function that will attempt to connect to -/// the hotshot events service, setup a subscription to the events stream, -/// and retrieve the startup info. If all is successful, it will return the -/// stream, and membership information derived from the startup info. -async fn connect_to_events_service( - client: &C, -) -> Result<(C::EventsStream, StaticCommittee), ConnectToEventsServiceError> -where - C: HotShotEventsService< - Types, - EventsStream = S, - EventsError = hotshot_events_service::events::Error, - StartUpInfo = StartupInfo, - StartUpInfoError = hotshot_events_service::events::Error, - >, - S: Stream, EventStreamError>>, -{ - if !(client.check_connection(None).await) { - return Err(ConnectToEventsServiceError::Connection); - } - - tracing::info!("Builder client connected to the hotshot events api"); - - // client subscribe to hotshot events - let subscribed_events = client - .events() - .await - .map_err(ConnectToEventsServiceError::Subscription)?; - - // handle the startup event at the start - let StartupInfo { - known_node_with_stake, - non_staked_node_count, - } = client - .startup_info() - .await - .map_err(ConnectToEventsServiceError::StartupInfo)?; - - let membership: StaticCommittee = StaticCommittee::::new( - known_node_with_stake.clone(), - known_node_with_stake.clone(), - Topic::Global, - ); - - tracing::info!( - "Startup info: Known nodes with stake: {:?}, Non-staked node count: {:?}", - known_node_with_stake, - non_staked_node_count - ); - - Ok((subscribed_events, membership)) -} - /* Running Non-Permissioned Builder Service */ pub async fn run_non_permissioned_standalone_builder_service< Types: NodeType, Ver: StaticVersionType, + S: Stream> + Unpin, >( // sending a DA proposal from the hotshot to the builder states da_sender: BroadcastSender>, @@ -1194,209 +1021,68 @@ pub async fn run_non_permissioned_standalone_builder_service< // sending a Decide event from the hotshot to the builder states decide_sender: BroadcastSender>, - // Url to (re)connect to for the events stream - hotshot_events_api_url: Url, + // HotShot event stream + hotshot_event_stream: S, + + total_nodes: NonZeroUsize, // Global state global_state: Arc>>, ) -> Result<(), anyhow::Error> { - // connection to the events stream - let events_service_client = - HotShotEventsServiceTideDiscoClient::::from_url(hotshot_events_api_url.clone()); - - let connect_to_events_service_result = connect_to_events_service(&events_service_client).await; - let (mut subscribed_events, mut membership) = match connect_to_events_service_result { - Ok((subscribed_events, membership)) => (subscribed_events, membership), - Err(err) => { - return Err(anyhow!( - "failed to connect to API at {hotshot_events_api_url}: {err}" - )); - } - }; - let tx_sender = { // This closure is likely unnecessary, but we want to play it safe // with our RWLocks. let global_state_read_lock_guard = global_state.read_arc().await; global_state_read_lock_guard.tx_sender.clone() }; + let mut hotshot_event_stream = std::pin::pin!(hotshot_event_stream); loop { - let event = subscribed_events.next().await; - //tracing::debug!("Builder Event received from HotShot: {:?}", event); - match event { - Some(Ok(event)) => { - match event.event { - EventType::Error { error } => { - tracing::error!("Error event in HotShot: {:?}", error); - } - // tx event - EventType::Transactions { transactions } => { - let max_block_size = { - // This closure is likely unnecessary, but we want - // to play it safe with our RWLocks. - let global_state_read_lock_guard = global_state.read_arc().await; - global_state_read_lock_guard.max_block_size - }; - - handle_received_txns( - &tx_sender, - transactions, - TransactionSource::HotShot, - max_block_size, - ) - .await; - } - // decide event - EventType::Decide { - block_size: _, - leaf_chain, - qc: _, - } => { - let latest_decide_view_num = leaf_chain[0].leaf.view_number(); - handle_decide_event(&decide_sender, latest_decide_view_num).await; - } - // DA proposal event - EventType::DaProposal { proposal, sender } => { - // get the leader for current view - let leader = membership.leader(proposal.data.view_number); - // get the committee mstatked node count - let total_nodes = membership.total_nodes(); - - handle_da_event( - &da_sender, - Arc::new(proposal), - sender, - leader, - NonZeroUsize::new(total_nodes).unwrap_or(NonZeroUsize::MIN), - ) - .await; - } - // Quorum proposal event - EventType::QuorumProposal { proposal, sender } => { - // get the leader for current view - let leader = membership.leader(proposal.data.view_number); - handle_quorum_event(&quorum_sender, Arc::new(proposal), sender, leader) - .await; - } - _ => { - tracing::debug!("Unhandled event from Builder"); - } - } - } - Some(Err(e)) => { - tracing::error!("Error in the event stream: {:?}", e); - } - None => { - tracing::error!("Event stream ended"); - let connected = connect_to_events_service(&events_service_client).await; - - (subscribed_events, membership) = match connected { - Ok((subscribed_events, membership)) => (subscribed_events, membership), - Err(err) => { - return Err(anyhow!( - "failed to reconnect to API at {hotshot_events_api_url}: {err}" - )); - } - }; - } - } - } -} - -/* -Running Permissioned Builder Service -*/ -pub async fn run_permissioned_standalone_builder_service< - Types: NodeType, - I: NodeImplementation, - V: Versions, ->( - // sending a DA proposal from the hotshot to the builder states - da_sender: BroadcastSender>, - - // sending a Quorum proposal from the hotshot to the builder states - quorum_sender: BroadcastSender>, - - // sending a Decide event from the hotshot to the builder states - decide_sender: BroadcastSender>, - - // hotshot context handle - hotshot_handle: Arc>, + let Some(event) = hotshot_event_stream.next().await else { + anyhow::bail!("Event stream ended"); + }; - // Global state - global_state: Arc>>, -) { - let mut event_stream = hotshot_handle.event_stream(); - let tx_sender = { - // This closure is likely unnecessary, but we want to play it safe - // with our RWLocks. - let global_state_read_lock_guard = global_state.read_arc().await; - global_state_read_lock_guard.tx_sender.clone() - }; + match event.event { + EventType::Error { error } => { + tracing::error!("Error event in HotShot: {:?}", error); + } + // tx event + EventType::Transactions { transactions } => { + let max_block_size = { + // This closure is likely unnecessary, but we want + // to play it safe with our RWLocks. + let global_state_read_lock_guard = global_state.read_arc().await; + global_state_read_lock_guard.max_block_size + }; - loop { - tracing::debug!("Waiting for events from HotShot"); - match event_stream.next().await { - None => { - tracing::error!("Didn't receive any event from the HotShot event stream"); + handle_received_txns( + &tx_sender, + transactions, + TransactionSource::HotShot, + max_block_size, + ) + .await; + } + // decide event + EventType::Decide { + block_size: _, + leaf_chain, + qc: _, + } => { + let latest_decide_view_num = leaf_chain[0].leaf.view_number(); + handle_decide_event(&decide_sender, latest_decide_view_num).await; + } + // DA proposal event + EventType::DaProposal { proposal, sender } => { + handle_da_event(&da_sender, Arc::new(proposal), sender, total_nodes).await; + } + // QC proposal event + EventType::QuorumProposal { proposal, sender } => { + // get the leader for current view + handle_quorum_event(&quorum_sender, Arc::new(proposal), sender).await; } - Some(event) => { - match event.event { - // error event - EventType::Error { error } => { - tracing::error!("Error event in HotShot: {:?}", error); - } - // tx event - EventType::Transactions { transactions } => { - let max_block_size = { - // This closure is likely unnecessary, but we want - // to play it safe with our RWLocks. - let global_state_read_lock_guard = global_state.read_arc().await; - global_state_read_lock_guard.max_block_size - }; - - handle_received_txns( - &tx_sender, - transactions, - TransactionSource::HotShot, - max_block_size, - ) - .await; - } - // decide event - EventType::Decide { leaf_chain, .. } => { - let latest_decide_view_number = leaf_chain[0].leaf.view_number(); - - handle_decide_event(&decide_sender, latest_decide_view_number).await; - } - // DA proposal event - EventType::DaProposal { proposal, sender } => { - // get the leader for current view - let leader = hotshot_handle.leader(proposal.data.view_number).await; - // get the committee staked node count - let total_nodes = hotshot_handle.total_nodes(); - - handle_da_event( - &da_sender, - Arc::new(proposal), - sender, - leader, - total_nodes, - ) - .await; - } - // Quorum proposal event - EventType::QuorumProposal { proposal, sender } => { - // get the leader for current view - let leader = hotshot_handle.leader(proposal.data.view_number).await; - handle_quorum_event(&quorum_sender, Arc::new(proposal), sender, leader) - .await; - } - _ => { - tracing::error!("Unhandled event from Builder: {:?}", event.event); - } - } + _ => { + tracing::debug!("Unhandled event from Builder"); } } } @@ -1408,7 +1094,6 @@ pub async fn run_permissioned_standalone_builder_service< /// [`handle_da_event_implementation`]. #[derive(Debug)] enum HandleDaEventError { - SenderIsNotLeader, SignatureValidationFailed, BroadcastFailed(async_broadcast::SendError>), } @@ -1421,14 +1106,12 @@ async fn handle_da_event( da_channel_sender: &BroadcastSender>, da_proposal: Arc>>, sender: ::SignatureKey, - leader: ::SignatureKey, total_nodes: NonZeroUsize, ) { // We're explicitly not inspecting this error, as this function is not // expected to return an error or any indication of an error. let _ = - handle_da_event_implementation(da_channel_sender, da_proposal, sender, leader, total_nodes) - .await; + handle_da_event_implementation(da_channel_sender, da_proposal, sender, total_nodes).await; } /// [`handle_da_event_implementation`] is a utility function that will attempt @@ -1438,9 +1121,7 @@ async fn handle_da_event( /// There are only three conditions under which this will fail to send the /// message via the given `da_channel_sender`, and they are all represented /// via [`HandleDaEventError`]. They are as follows: -/// - [`HandleDaEventError::SenderIsNotLeader`]: The sender is not the leader -/// - [`HandleDaEventError::SignatureValidationFailed`]: The signature validation -/// failed +/// - [`HandleDaEventError::SignatureValidationFailed`]: The signature validation failed /// - [`HandleDaEventError::BroadcastFailed`]: The broadcast failed as no receiver /// is in place to receive the message /// @@ -1449,12 +1130,11 @@ async fn handle_da_event_implementation( da_channel_sender: &BroadcastSender>, da_proposal: Arc>>, sender: ::SignatureKey, - leader: ::SignatureKey, total_nodes: NonZeroUsize, ) -> Result<(), HandleDaEventError> { tracing::debug!( "DaProposal: Leader: {:?} for the view: {:?}", - leader, + sender, da_proposal.data.view_number ); @@ -1462,21 +1142,18 @@ async fn handle_da_event_implementation( let encoded_txns_hash = Sha256::digest(&da_proposal.data.encoded_transactions); // check if the sender is the leader and the signature is valid; if yes, broadcast the DA proposal - // We check the failure condition first to prevent unnecessary conditional - // blocks - if leader != sender { - tracing::error!("Sender is not Leader on DaProposal for view {:?}: Leader for the current view: {:?} and sender: {:?}", da_proposal.data.view_number, leader, sender); - return Err(HandleDaEventError::SenderIsNotLeader); - } - if !sender.validate(&da_proposal.signature, &encoded_txns_hash) { - tracing::error!("Validation Failure on DaProposal for view {:?}: Leader for the current view: {:?} and sender: {:?}", da_proposal.data.view_number, leader, sender); + tracing::error!( + "Validation Failure on DaProposal for view {:?}: Leader: {:?}", + da_proposal.data.view_number, + sender + ); return Err(HandleDaEventError::SignatureValidationFailed); } let da_msg = DaProposalMessage:: { proposal: da_proposal, - sender: leader, + sender, total_nodes: total_nodes.into(), }; @@ -1507,7 +1184,6 @@ async fn handle_da_event_implementation( /// [`handle_quorum_event_implementation`]. #[derive(Debug)] enum HandleQuorumEventError { - SenderIsNotLeader, SignatureValidationFailed, BroadcastFailed(async_broadcast::SendError>), } @@ -1520,13 +1196,11 @@ async fn handle_quorum_event( quorum_channel_sender: &BroadcastSender>, quorum_proposal: Arc>>, sender: ::SignatureKey, - leader: ::SignatureKey, ) { // We're explicitly not inspecting this error, as this function is not // expected to return an error or any indication of an error. let _ = - handle_quorum_event_implementation(quorum_channel_sender, quorum_proposal, sender, leader) - .await; + handle_quorum_event_implementation(quorum_channel_sender, quorum_proposal, sender).await; } /// Utility function that will attempt to broadcast the given `quorum_proposal` @@ -1535,9 +1209,7 @@ async fn handle_quorum_event( /// There are only three conditions under which this will fail to send the /// message via the given `quorum_channel_sender`, and they are all represented /// via [`HandleQuorumEventError`]. They are as follows: -/// - [`HandleQuorumEventError::SenderIsNotLeader`]: The sender is not the leader -/// - [`HandleQuorumEventError::SignatureValidationFailed`]: The signature validation -/// failed +/// - [`HandleQuorumEventError::SignatureValidationFailed`]: The signature validation failed /// - [`HandleQuorumEventError::BroadcastFailed`]: The broadcast failed as no receiver /// is in place to receive the message /// @@ -1546,30 +1218,27 @@ async fn handle_quorum_event_implementation( quorum_channel_sender: &BroadcastSender>, quorum_proposal: Arc>>, sender: ::SignatureKey, - leader: ::SignatureKey, ) -> Result<(), HandleQuorumEventError> { tracing::debug!( "QuorumProposal: Leader: {:?} for the view: {:?}", - leader, + sender, quorum_proposal.data.view_number ); let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data); - // check if the sender is the leader and the signature is valid; if yes, broadcast the Quorum proposal - if sender != leader { - tracing::error!("Sender is not Leader on QuorumProposal for view {:?}: Leader for the current view: {:?} and sender: {:?}", quorum_proposal.data.view_number, leader, sender); - return Err(HandleQuorumEventError::SenderIsNotLeader); - } - if !sender.validate(&quorum_proposal.signature, leaf.legacy_commit().as_ref()) { - tracing::error!("Validation Failure on QuorumProposal for view {:?}: Leader for the current view: {:?} and sender: {:?}", quorum_proposal.data.view_number, leader, sender); + tracing::error!( + "Validation Failure on QuorumProposal for view {:?}: Leader for the current view: {:?}", + quorum_proposal.data.view_number, + sender + ); return Err(HandleQuorumEventError::SignatureValidationFailed); } let quorum_msg = QuorumProposalMessage:: { proposal: quorum_proposal, - sender: leader, + sender, }; let view_number = quorum_msg.proposal.data.view_number; tracing::debug!( @@ -1784,16 +1453,12 @@ mod test { use async_compatibility_layer::channel::unbounded; use async_lock::RwLock; use committable::Commitment; - use futures::{ - channel::mpsc::{channel, Receiver}, - StreamExt, - }; + use futures::StreamExt; use hotshot::{ traits::BlockPayload, - types::{BLSPubKey, Event, SignatureKey}, + types::{BLSPubKey, SignatureKey}, }; use hotshot_builder_api::v0_2::block_info::AvailableBlockInfo; - use hotshot_events_service::events_source::StartupInfo; use hotshot_example_types::{ block_types::{TestBlockPayload, TestMetadata, TestTransaction}, node_types::{TestTypes, TestVersions}, @@ -1810,25 +1475,22 @@ mod test { }, utils::BuilderCommitment, }; + use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; use sha2::{Digest, Sha256}; - use tide_disco::StatusCode; use crate::{ builder_state::{ BuildBlockInfo, MessageType, RequestMessage, ResponseMessage, TransactionSource, TriggerStatus, }, - service::{ - connect_to_events_service, ConnectToEventsServiceError, HandleReceivedTxnsError, - INITIAL_MAX_BLOCK_SIZE, - }, - BlockId, BuilderStateId, LegacyCommit, ParentBlockReferences, + service::{HandleReceivedTxnsError, INITIAL_MAX_BLOCK_SIZE}, + LegacyCommit, }; use super::{ handle_da_event_implementation, handle_quorum_event_implementation, AvailableBlocksError, BlockInfo, ClaimBlockError, ClaimBlockHeaderInputError, GlobalState, HandleDaEventError, - HandleQuorumEventError, HandleReceivedTxns, HotShotEventsService, ProxyGlobalState, + HandleQuorumEventError, HandleReceivedTxns, ProxyGlobalState, }; // GlobalState Tests @@ -1855,7 +1517,7 @@ mod test { let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(1), + parent_view: ViewNumber::new(1), }; // There should be a single entry within the spawned_builder_states, @@ -1868,7 +1530,7 @@ mod test { assert!(state.spawned_builder_states.contains_key(&builder_state_id), "The spawned builder states should contain an entry with the bootstrapped parameters passed into new"); - assert!(!state.spawned_builder_states.contains_key(&BuilderStateId { parent_commitment: parent_commit, view: ViewNumber::new(0) }), "The spawned builder states should not contain any other entry, as such it should not contain any entry with a higher view number, but the same parent commit"); + assert!(!state.spawned_builder_states.contains_key(&BuilderStateId { parent_commitment: parent_commit, parent_view: ViewNumber::new(0) }), "The spawned builder states should not contain any other entry, as such it should not contain any entry with a higher view number, but the same parent commit"); // We can't compare the Senders directly @@ -1920,7 +1582,7 @@ mod test { let (req_sender, _) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; state.register_builder_state( @@ -1953,7 +1615,7 @@ mod test { let (req_sender, _) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(6), + parent_view: ViewNumber::new(6), }; state.register_builder_state( @@ -2007,7 +1669,7 @@ mod test { let (req_sender, req_receiver) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; state.register_builder_state( @@ -2038,7 +1700,7 @@ mod test { let (req_sender, req_receiver) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; // This is the same BuilderStateId as the previous one, so it should @@ -2068,7 +1730,7 @@ mod test { { let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; let req_id_and_sender = state.spawned_builder_states.get(&builder_state_id).unwrap(); @@ -2121,7 +1783,7 @@ mod test { let (req_sender, _) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(6), + parent_view: ViewNumber::new(6), }; state.register_builder_state( @@ -2154,7 +1816,7 @@ mod test { let (req_sender, _) = async_broadcast::broadcast(10); let builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; state.register_builder_state( @@ -2177,7 +1839,7 @@ mod test { state.highest_view_num_builder_id, BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(6) + parent_view: ViewNumber::new(6) }, "The highest view number builder id should now be the one that was just registered" ); @@ -2214,7 +1876,7 @@ mod test { let new_view_num = ViewNumber::new(1); let builder_state_id = BuilderStateId { parent_commitment: new_parent_commit, - view: new_view_num, + parent_view: new_view_num, }; let builder_hash_1 = BuilderCommitment::from_bytes([1, 2, 3, 4]); @@ -2413,7 +2075,7 @@ mod test { let new_view_num = ViewNumber::new(1); let builder_state_id = BuilderStateId { parent_commitment: new_parent_commit, - view: new_view_num, + parent_view: new_view_num, }; let builder_hash = BuilderCommitment::from_bytes([1, 2, 3, 4]); @@ -2689,7 +2351,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2721,7 +2383,7 @@ mod test { let builder_state_id = BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }; assert_eq!( state.highest_view_num_builder_id, builder_state_id, @@ -2737,7 +2399,7 @@ mod test { assert!( state.spawned_builder_states.contains_key(&BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }), "The spawned builder states should contain the builder state id: {builder_state_id}" ); @@ -2781,7 +2443,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2797,7 +2459,7 @@ mod test { state.highest_view_num_builder_id, BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }, "The highest view number builder id should be the one that was just registered" ); @@ -2858,7 +2520,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2874,7 +2536,7 @@ mod test { state.highest_view_num_builder_id, BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }, "The highest view number builder id should be the one that was just registered" ); @@ -2899,7 +2561,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2955,7 +2617,7 @@ mod test { state.register_builder_state( BuilderStateId { parent_commitment: vid_commit, - view, + parent_view: view, }, ParentBlockReferences { view_number: view, @@ -2977,7 +2639,7 @@ mod test { state.highest_view_num_builder_id, BuilderStateId { parent_commitment: vid_commitment(&[10], 4), - view: ViewNumber::new(10), + parent_view: ViewNumber::new(10), }, "The highest view number builder id should be the one that was just registered" ); @@ -3008,7 +2670,7 @@ mod test { for i in 0..5 { let builder_state_id = BuilderStateId { parent_commitment: vid_commitment(&[i], 4), - view: ViewNumber::new(i as u64), + parent_view: ViewNumber::new(i as u64), }; assert!( !state.spawned_builder_states.contains_key(&builder_state_id), @@ -3019,7 +2681,7 @@ mod test { for i in 5..=10 { let builder_state_id = BuilderStateId { parent_commitment: vid_commitment(&[i], 4), - view: ViewNumber::new(i as u64), + parent_view: ViewNumber::new(i as u64), }; assert!( state.spawned_builder_states.contains_key(&builder_state_id), @@ -3240,7 +2902,7 @@ mod test { let mut write_locked_global_state = state.global_state.write_arc().await; write_locked_global_state.highest_view_num_builder_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(5), + parent_view: ViewNumber::new(5), }; } @@ -3326,7 +2988,7 @@ mod test { // Now we want to make the block data available to the state. let expected_builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(1), + parent_view: ViewNumber::new(1), }; let mut response_receiver = { @@ -3340,7 +3002,7 @@ mod test { write_locked_global_state.register_builder_state( expected_builder_state_id.clone(), ParentBlockReferences { - view_number: expected_builder_state_id.view, + view_number: expected_builder_state_id.parent_view, vid_commitment: expected_builder_state_id.parent_commitment, leaf_commit: Commitment::from_raw([0; 32]), builder_commitment: BuilderCommitment::from_bytes([]), @@ -3447,7 +3109,7 @@ mod test { // Now we want to make the block data available to the state. let expected_builder_state_id = BuilderStateId { parent_commitment: parent_commit, - view: ViewNumber::new(1), + parent_view: ViewNumber::new(1), }; let mut response_receiver = { @@ -3461,7 +3123,7 @@ mod test { write_locked_global_state.register_builder_state( expected_builder_state_id.clone(), ParentBlockReferences { - view_number: expected_builder_state_id.view, + view_number: expected_builder_state_id.parent_view, vid_commitment: expected_builder_state_id.parent_commitment, leaf_commit: Commitment::from_raw([0; 32]), builder_commitment: BuilderCommitment::from_bytes([]), @@ -4148,62 +3810,6 @@ mod test { // handle_da_event Tests - /// This test checks that the error [HandleDaEventError::SignatureValidationFailed] - /// is returned under the right conditions of invoking - /// [handle_da_event_implementation]. - /// - /// To trigger this error, we simply need to ensure that the public keys - /// provided for the leader and the sender do not match. - #[async_std::test] - async fn test_handle_da_event_implementation_error_sender_is_not_leader() { - let (sender_public_key, sender_private_key) = - ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 1); - let (da_channel_sender, _) = async_broadcast::broadcast(10); - let total_nodes = NonZeroUsize::new(10).unwrap(); - let view_number = ViewNumber::new(10); - - let da_proposal = DaProposal:: { - encoded_transactions: Arc::new([1, 2, 3, 4, 5, 6]), - metadata: TestMetadata { - num_transactions: 1, - }, - view_number, - }; - - let encoded_txns_hash = Sha256::digest(&da_proposal.encoded_transactions); - let signature = - ::sign(&sender_private_key, &encoded_txns_hash).unwrap(); - - let signed_da_proposal = Arc::new(Proposal { - data: da_proposal, - signature, - _pd: Default::default(), - }); - - let result = handle_da_event_implementation( - &da_channel_sender, - signed_da_proposal.clone(), - sender_public_key, - leader_public_key, - total_nodes, - ) - .await; - - match result { - Err(HandleDaEventError::SenderIsNotLeader) => { - // This is expected. - } - Ok(_) => { - panic!("expected an error, but received a successful attempt instead") - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - /// This test checks that the error [HandleDaEventError::SignatureValidationFailed] /// is returned under the right conditions of invoking /// [handle_da_event_implementation]. @@ -4244,7 +3850,6 @@ mod test { &da_channel_sender, signed_da_proposal.clone(), sender_public_key, - sender_public_key, total_nodes, ) .await; @@ -4273,8 +3878,6 @@ mod test { async fn test_handle_da_event_implementation_error_broadcast_failed() { let (sender_public_key, sender_private_key) = ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 0); let da_channel_sender = { let (da_channel_sender, _) = async_broadcast::broadcast(10); da_channel_sender @@ -4305,7 +3908,6 @@ mod test { &da_channel_sender, signed_da_proposal.clone(), sender_public_key, - leader_public_key, total_nodes, ) .await; @@ -4329,8 +3931,6 @@ mod test { async fn test_handle_da_event_implementation_success() { let (sender_public_key, sender_private_key) = ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 0); let (da_channel_sender, da_channel_receiver) = async_broadcast::broadcast(10); let total_nodes = NonZeroUsize::new(10).unwrap(); let view_number = ViewNumber::new(10); @@ -4357,7 +3957,6 @@ mod test { &da_channel_sender, signed_da_proposal.clone(), sender_public_key, - leader_public_key, total_nodes, ) .await; @@ -4384,74 +3983,6 @@ mod test { // handle_quorum_event Tests - /// This test checks that the error [HandleQuorumEventError::SenderIsNotLeader] - /// is returned under the right conditions of invoking - /// [handle_quorum_event_implementation]. - /// - /// To trigger this error, we simply need to ensure that the public keys - /// provided for the leader and the sender do not match. - #[async_std::test] - async fn test_handle_quorum_event_error_sender_is_not_leader() { - let (sender_public_key, sender_private_key) = - ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 1); - let (quorum_channel_sender, _) = async_broadcast::broadcast(10); - let view_number = ViewNumber::new(10); - - let quorum_proposal = { - let leaf = Leaf::::genesis( - &TestValidatedState::default(), - &TestInstanceState::default(), - ) - .await; - - QuorumProposal:: { - block_header: leaf.block_header().clone(), - view_number, - justify_qc: QuorumCertificate::genesis::( - &TestValidatedState::default(), - &TestInstanceState::default(), - ) - .await, - upgrade_certificate: None, - proposal_certificate: None, - } - }; - - let leaf = Leaf::from_quorum_proposal(&quorum_proposal); - - let signature = - ::sign(&sender_private_key, leaf.legacy_commit().as_ref()) - .unwrap(); - - let signed_quorum_proposal = Arc::new(Proposal { - data: quorum_proposal, - signature, - _pd: Default::default(), - }); - - let result = handle_quorum_event_implementation( - &quorum_channel_sender, - signed_quorum_proposal.clone(), - sender_public_key, - leader_public_key, - ) - .await; - - match result { - Err(HandleQuorumEventError::SenderIsNotLeader) => { - // This is expected. - } - Ok(_) => { - panic!("expected an error, but received a successful attempt instead"); - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - /// This test checks that the error [HandleQuorumEventError::SignatureValidationFailed] /// is returned under the right conditions of invoking /// [handle_quorum_event_implementation]. @@ -4506,7 +4037,6 @@ mod test { &quorum_channel_sender, signed_quorum_proposal.clone(), sender_public_key, - sender_public_key, ) .await; @@ -4534,8 +4064,6 @@ mod test { async fn test_handle_quorum_event_error_broadcast_failed() { let (sender_public_key, sender_private_key) = ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 0); let quorum_channel_sender = { let (quorum_channel_sender, _) = async_broadcast::broadcast(10); quorum_channel_sender @@ -4579,7 +4107,6 @@ mod test { &quorum_channel_sender, signed_quorum_proposal.clone(), sender_public_key, - leader_public_key, ) .await; @@ -4602,8 +4129,6 @@ mod test { async fn test_handle_quorum_event_success() { let (sender_public_key, sender_private_key) = ::generated_from_seed_indexed([0; 32], 0); - let (leader_public_key, _) = - ::generated_from_seed_indexed([0; 32], 0); let (quorum_channel_sender, quorum_channel_receiver) = async_broadcast::broadcast(10); let view_number = ViewNumber::new(10); @@ -4643,7 +4168,6 @@ mod test { &quorum_channel_sender, signed_quorum_proposal.clone(), sender_public_key, - leader_public_key, ) .await; @@ -4867,213 +4391,4 @@ mod test { } } } - - /// This test checks that the error [ConnectToEventsServiceError::Connection] - /// is returned when [connect_to_events_service] is invoked. - /// - /// This error may be difficult to control in all environments. In order - /// to control this specific case, the underlying client being passed to - /// [connect_to_events_service] is a mock that always returns false when - /// [check_connection] is invoked. - #[async_std::test] - async fn test_connect_to_events_service_mock_error_connection() { - struct ConnectionFailure; - - #[async_trait::async_trait] - impl HotShotEventsService for ConnectionFailure { - type EventsStream = - Receiver, hotshot_events_service::events::Error>>; - type EventsError = hotshot_events_service::events::Error; - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, _timeout: Option) -> bool { - false - } - - async fn events(&self) -> Result { - todo!() - } - - async fn startup_info(&self) -> Result { - todo!() - } - } - - let client = ConnectionFailure; - - match connect_to_events_service(&client).await { - Err(ConnectToEventsServiceError::Connection) => { - // This is expected. - } - Ok(_) => { - panic!("Expected an error, but got a result"); - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - - /// This test checks that the error [ConnectToEventsServiceError::Subscription] - /// is returned when [connect_to_events_service] is invoked. - /// - /// This error may be difficult to control in all environments. In order - /// to control this specific case, the underlying client being passed to - /// [connect_to_events_service] is a mock that always returns an error when - /// [events] is invoked. - #[async_std::test] - async fn test_connect_to_events_service_mock_error_events() { - struct EventsFailure; - - #[async_trait::async_trait] - impl HotShotEventsService for EventsFailure { - type EventsStream = - Receiver, hotshot_events_service::events::Error>>; - type EventsError = hotshot_events_service::events::Error; - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, _timeout: Option) -> bool { - true - } - - async fn events(&self) -> Result { - Err(hotshot_events_service::events::Error::Custom { - message: "This is a custom error".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }) - } - - async fn startup_info(&self) -> Result { - todo!() - } - } - - let client = EventsFailure; - - match connect_to_events_service(&client).await { - Err(ConnectToEventsServiceError::Subscription( - hotshot_events_service::events::Error::Custom { .. }, - )) => { - // This is expected. - } - Ok(_) => { - panic!("Expected an error, but got a result"); - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - - /// This test checks that the error [ConnectToEventsServiceError::StartupInfo] - /// is returned when [connect_to_events_service] is invoked. - /// - /// This error may be difficult to control in all environments. In order - /// to control this specific case, the underlying client being passed to - /// [connect_to_events_service] is a mock that always returns an error when - /// [startup_info] is invoked. - #[async_std::test] - async fn test_connect_to_service_mock_error_startup_info() { - type EventsStream = - Receiver, hotshot_events_service::events::Error>>; - struct StartupInfoFailure(Arc>>); - - #[async_trait::async_trait] - impl HotShotEventsService for StartupInfoFailure { - type EventsStream = EventsStream; - type EventsError = hotshot_events_service::events::Error; - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, _timeout: Option) -> bool { - true - } - - async fn events(&self) -> Result { - let mut write_lock_guard = self.0.write_arc().await; - - write_lock_guard - .take() - .ok_or(hotshot_events_service::events::Error::Custom { - message: "This is a custom error".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }) - } - - async fn startup_info(&self) -> Result { - Err(hotshot_events_service::events::Error::Custom { - message: "This is a custom error".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }) - } - } - - let client = StartupInfoFailure(Arc::new(RwLock::new(Some(channel(10).1)))); - - match connect_to_events_service(&client).await { - Err(ConnectToEventsServiceError::StartupInfo( - hotshot_events_service::events::Error::Custom { .. }, - )) => { - // This is expected. - } - Ok(_) => { - panic!("Expected an error, but got a result"); - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } - - /// This test checks that [connect_to_events_service] completes - /// successfully. - #[async_std::test] - async fn test_connect_to_service_mock_success() { - type EventsStream = - Receiver, hotshot_events_service::events::Error>>; - struct Success(Arc>>); - - #[async_trait::async_trait] - impl HotShotEventsService for Success { - type EventsStream = EventsStream; - type EventsError = hotshot_events_service::events::Error; - type StartUpInfo = StartupInfo; - type StartUpInfoError = hotshot_events_service::events::Error; - - async fn check_connection(&self, _timeout: Option) -> bool { - true - } - - async fn events(&self) -> Result { - let mut write_lock_guard = self.0.write_arc().await; - - write_lock_guard - .take() - .ok_or(hotshot_events_service::events::Error::Custom { - message: "This is a custom error".to_string(), - status: StatusCode::INTERNAL_SERVER_ERROR, - }) - } - - async fn startup_info(&self) -> Result { - Ok(StartupInfo { - known_node_with_stake: vec![], - non_staked_node_count: 0, - }) - } - } - - let client = Success(Arc::new(RwLock::new(Some(channel(10).1)))); - - match connect_to_events_service(&client).await { - Ok(_) => { - // This is expected. - } - Err(err) => { - panic!("Unexpected error: {:?}", err); - } - } - } } diff --git a/crates/legacy/src/testing/basic_test.rs b/crates/legacy/src/testing/basic_test.rs index f3efdb5b..04eeeffa 100644 --- a/crates/legacy/src/testing/basic_test.rs +++ b/crates/legacy/src/testing/basic_test.rs @@ -38,6 +38,7 @@ mod tests { block_types::{TestBlockHeader, TestBlockPayload, TestMetadata, TestTransaction}, state_types::{TestInstanceState, TestValidatedState}, }; + use marketplace_builder_shared::block::ParentBlockReferences; use crate::builder_state::{ DaProposalMessage, DecideMessage, QuorumProposalMessage, TransactionSource, @@ -45,7 +46,7 @@ mod tests { use crate::service::{ handle_received_txns, GlobalState, ProxyGlobalState, ReceivedTransaction, }; - use crate::{LegacyCommit, ParentBlockReferences}; + use crate::LegacyCommit; use async_lock::RwLock; use committable::{Commitment, CommitmentBoundsArkless, Committable}; use sha2::{Digest, Sha256}; diff --git a/crates/legacy/src/testing/finalization_test.rs b/crates/legacy/src/testing/finalization_test.rs index 6eec2474..40e7215a 100644 --- a/crates/legacy/src/testing/finalization_test.rs +++ b/crates/legacy/src/testing/finalization_test.rs @@ -3,7 +3,6 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use crate::{ builder_state::{DaProposalMessage, QuorumProposalMessage, ALLOW_EMPTY_BLOCK_PERIOD}, service::{GlobalState, ProxyGlobalState, ReceivedTransaction}, - BuilderStateId, ParentBlockReferences, }; use async_broadcast::{broadcast, Sender}; use async_lock::RwLock; @@ -31,6 +30,8 @@ use hotshot_types::{ }, utils::BuilderCommitment, }; +use marketplace_builder_shared::block::BuilderStateId; +use marketplace_builder_shared::block::ParentBlockReferences; use sha2::{Digest, Sha256}; use super::basic_test::{BuilderState, MessageType}; @@ -66,15 +67,15 @@ fn setup_builder_for_test() -> TestSetup { let bootstrap_builder_state_id = BuilderStateId:: { parent_commitment: vid_commitment(&[], TEST_NUM_NODES_IN_VID_COMPUTATION), - view: ViewNumber::genesis(), + parent_view: ViewNumber::genesis(), }; let global_state = Arc::new(RwLock::new(GlobalState::new( req_sender, tx_sender.clone(), bootstrap_builder_state_id.parent_commitment, - bootstrap_builder_state_id.view, - bootstrap_builder_state_id.view, + bootstrap_builder_state_id.parent_view, + bootstrap_builder_state_id.parent_view, 0, ))); @@ -153,7 +154,7 @@ async fn process_available_blocks_round( let available_blocks_result = proxy_global_state .available_blocks( &builder_state_id.parent_commitment, - builder_state_id.view.u64(), + builder_state_id.parent_view.u64(), leader_pub, ¤t_commit_signature, ) @@ -194,7 +195,7 @@ async fn progress_round_with_available_block_info( let claim_block_result = proxy_global_state .claim_block( &available_block_info.block_hash, - builder_state_id.view.u64(), + builder_state_id.parent_view.u64(), leader_pub, &signed_parent_commitment, ) @@ -204,7 +205,7 @@ async fn progress_round_with_available_block_info( let _claim_block_header_result = proxy_global_state .claim_block_header_input( &available_block_info.block_hash, - builder_state_id.view.u64(), + builder_state_id.parent_view.u64(), leader_pub, &signed_parent_commitment, ) @@ -258,7 +259,7 @@ async fn progress_round_with_transactions( ) -> BuilderStateId { let (leader_pub, leader_priv) = BLSPubKey::generated_from_seed_indexed([0; 32], round); let encoded_transactions = TestTransaction::encode(&transactions); - let next_view = builder_state_id.view + 1; + let next_view = builder_state_id.parent_view + 1; // Create and send the DA Proposals and Quorum Proposals { @@ -353,7 +354,7 @@ async fn progress_round_with_transactions( BuilderStateId { parent_commitment: payload_vid_commitment, - view: next_view, + parent_view: next_view, } } } @@ -375,7 +376,7 @@ async fn test_empty_block_rate() { let mut current_builder_state_id = BuilderStateId:: { parent_commitment: vid_commitment(&[], TEST_NUM_NODES_IN_VID_COMPUTATION), - view: ViewNumber::genesis(), + parent_view: ViewNumber::genesis(), }; for round in 0..10 { @@ -426,7 +427,7 @@ async fn test_eager_block_rate() { let mut current_builder_state_id = BuilderStateId:: { parent_commitment: vid_commitment(&[], TEST_NUM_NODES_IN_VID_COMPUTATION), - view: ViewNumber::genesis(), + parent_view: ViewNumber::genesis(), }; // Round 0 diff --git a/crates/marketplace/Cargo.toml b/crates/marketplace/Cargo.toml index 3b357baf..5e241ba4 100644 --- a/crates/marketplace/Cargo.toml +++ b/crates/marketplace/Cargo.toml @@ -4,6 +4,7 @@ version = { workspace = true } edition = { workspace = true } [dependencies] +marketplace-builder-shared = { path = "../shared" } anyhow = "1" async-broadcast = "0.7" diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs index fb6fca25..79e7b92a 100644 --- a/crates/marketplace/src/builder_state.rs +++ b/crates/marketplace/src/builder_state.rs @@ -8,12 +8,14 @@ use hotshot_types::{ }, utils::BuilderCommitment, }; +use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; +use marketplace_builder_shared::utils::RotatingSet; use committable::Commitment; use crate::{ service::{BroadcastReceivers, GlobalState, ReceivedTransaction}, - utils::{BlockId, BuilderStateId, LegacyCommit as _, ParentBlockReferences, RotatingSet}, + utils::LegacyCommit as _, }; use async_broadcast::broadcast; use async_broadcast::Receiver as BroadcastReceiver; diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs index c88e4da5..adfda17b 100644 --- a/crates/marketplace/src/service.rs +++ b/crates/marketplace/src/service.rs @@ -1,4 +1,23 @@ +use std::{fmt::Debug, marker::PhantomData, time::Duration}; + +use crate::{ + builder_state::{ + BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QuorumProposalMessage, + RequestMessage, ResponseMessage, TransactionSource, + }, + utils::LegacyCommit as _, +}; +use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; + use anyhow::bail; +pub use async_broadcast::{broadcast, RecvError, TryRecvError}; +use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError}; +use async_lock::RwLock; +use async_trait::async_trait; +use committable::{Commitment, Committable}; +use derivative::Derivative; +use futures::stream::StreamExt; +use futures::{future::BoxFuture, Stream}; use hotshot::types::Event; use hotshot_builder_api::v0_3::{ builder::{define_api, submit_api, BuildError, Error as BuilderApiError}, @@ -18,26 +37,6 @@ use hotshot_types::{ utils::BuilderCommitment, vid::VidCommitment, }; -use tracing::{error, instrument}; -use vbs::version::StaticVersion; - -use std::{fmt::Debug, marker::PhantomData, time::Duration}; - -use crate::{ - builder_state::{ - BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QuorumProposalMessage, - RequestMessage, ResponseMessage, TransactionSource, - }, - utils::{BlockId, BuilderStateId, LegacyCommit as _, ParentBlockReferences}, -}; -pub use async_broadcast::{broadcast, RecvError, TryRecvError}; -use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError}; -use async_lock::RwLock; -use async_trait::async_trait; -use committable::{Commitment, Committable}; -use derivative::Derivative; -use futures::stream::StreamExt; -use futures::{future::BoxFuture, Stream}; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::num::NonZeroUsize; @@ -45,6 +44,10 @@ use std::sync::Arc; use std::{fmt::Display, time::Instant}; use tagged_base64::TaggedBase64; use tide_disco::{app::AppError, method::ReadState, App}; +use tracing::{error, instrument}; +use vbs::version::StaticVersion; + +pub use marketplace_builder_shared::utils::EventServiceStream; // It holds all the necessary information for a block #[derive(Debug)] @@ -608,14 +611,34 @@ pub struct BroadcastSenders { pub trait BuilderHooks: Sync + Send + 'static { #[inline(always)] async fn process_transactions( - self: &Arc, + &self, transactions: Vec, ) -> Vec { transactions } #[inline(always)] - async fn handle_hotshot_event(self: &Arc, _event: &Event) {} + async fn handle_hotshot_event(&self, _event: &Event) {} +} + +#[async_trait] +impl BuilderHooks for Box +where + Types: NodeType, + T: BuilderHooks, +{ + #[inline(always)] + async fn process_transactions( + &self, + transactions: Vec, + ) -> Vec { + (**self).process_transactions(transactions).await + } + + #[inline(always)] + async fn handle_hotshot_event(&self, event: &Event) { + (**self).handle_hotshot_event(event).await + } } pub struct NoHooks(pub PhantomData); @@ -624,10 +647,13 @@ impl BuilderHooks for NoHooks {} /// Run builder service, /// Refer to documentation for [`ProxyGlobalState`] for more details -pub async fn run_builder_service>( +pub async fn run_builder_service< + TYPES: NodeType