diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs
index 9b1b5cb9..5544f26d 100644
--- a/crates/marketplace/src/builder_state.rs
+++ b/crates/marketplace/src/builder_state.rs
@@ -7,14 +7,13 @@ use hotshot_types::{
         EncodeBytes,
     },
     utils::BuilderCommitment,
-    vid::VidCommitment,
 };
 
 use committable::{Commitment, Committable};
 
 use crate::{
     service::{BroadcastReceivers, GlobalState, ReceivedTransaction},
-    utils::{BlockId, BuilderStateId, RotatingSet},
+    utils::{BlockId, BuilderStateId, BuiltFromProposedBlock, RotatingSet},
 };
 use async_broadcast::broadcast;
 use async_broadcast::Receiver as BroadcastReceiver;
@@ -97,22 +96,6 @@ pub enum Status {
     ShouldContinue,
 }
 
-/// Builder State to hold the state of the builder
-#[derive(Debug, Clone)]
-pub struct BuiltFromProposedBlock<TYPES: NodeType> {
-    pub view_number: TYPES::Time,
-    pub vid_commitment: VidCommitment,
-    pub leaf_commit: Commitment<Leaf<TYPES>>,
-    pub builder_commitment: BuilderCommitment,
-}
-
-// implement display for the derived info
-impl<TYPES: NodeType> std::fmt::Display for BuiltFromProposedBlock<TYPES> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "View Number: {:?}", self.view_number)
-    }
-}
-
 #[derive(Debug)]
 pub struct BuilderState<TYPES: NodeType> {
     pub included_txns: RotatingSet<Commitment<TYPES::Transaction>>,
@@ -172,7 +155,206 @@ pub struct BuilderState<TYPES: NodeType> {
     pub instance_state: Arc<TYPES::InstanceState>,
 }
 
+/// [best_builder_states_to_extend] is a utility function used to determine
+/// which [BuilderState]s are the best fit to extend from.
+///
+/// It inspects the current state of [GlobalState] to make this determination.
+/// We only want to use information from [GlobalState] as otherwise we would
+/// have some insider knowledge unique to our specific [BuilderState] rather
+/// than knowledge that is available to all [BuilderState]s. In fact, to
+/// ensure this, this function lives outside of the [BuilderState] itself.
+///
+/// In an ideal circumstance the best [BuilderState] to extend from is going to
+/// be the one that immediately precedes the [QuorumProposal] that we are
+/// attempting to extend from. However, if all we know is the [ViewNumber] of
+/// the [QuorumProposal] that we are attempting to extend from, then we may end
+/// up in a scenario where we have multiple [BuilderState]s that are all equally
+/// valid to extend from. When this happens, we have the potential for a data
+/// race.
+///
+/// The primary cause of this has to do with the interface of the
+/// [ProxyGlobalState]'s API. In general, we want to be able to retrieve a
+/// [BuilderState] via the [BuilderStateId]. The [BuilderStateId] only
+/// references a [ViewNumber] and a [VidCommitment]. While this information is
+/// available in the [QuorumProposal], it only helps us to rule out
+/// [BuilderState]s that already exist. It does **NOT** help us to pick a
+/// [BuilderState] that is the best fit to extend from.
+///
+/// This is where the `justify_qc` comes into consideration. The `justify_qc`
+/// contains the previous [ViewNumber] that is being extended from, and in
+/// addition it also contains the previous [Commitment<Leaf>] that is
+/// being built on top of. Since our [BuilderState]s store identifying
+/// information that contains this same `leaf_commit`, we can compare these
+/// directly to ensure that we are extending from the correct [BuilderState].
+///
+/// This function determines the best [BuilderState] in the following steps:
+///
+/// 1. If we have a [BuilderState] that is already spawned for the current
+///    [QuorumProposal], then we should return no states, as one already
+///    exists. This will prevent us from attempting to spawn duplicate
+///    [BuilderState]s.
+/// 2. Attempt to find all [BuilderState]s recorded within [GlobalState]
+///    that have matching view number and leaf commitments. There *should*
+///    only be one of these. But all would be valid extension points.
+/// 3. If we can't find any [BuilderState]s that match the view number
+///    and leaf commitment, then we should return the [BuilderState]s with
+///    the maximum stored view number that is smaller than the current
+///    [QuorumProposal]'s.
+/// 4. If there is only one [BuilderState] stored in the [GlobalState], then
+///    we should return that [BuilderState] as the best fit.
+/// 5. If none of the other criteria match, we return an empty result as it is
+///    unclear what to do in this case.
+///
+/// > Note: Any time this function returns more than a single entry in its
+/// > [HashSet] result, there is a potential for a race condition. This is
+/// > because there are multiple [BuilderState]s that are equally valid to
+/// > extend from. This race could be avoided by just picking one of the
+/// > entries in the resulting [HashSet], but this is not done here in order
+/// > to allow us to highlight the possibility of the race.
+async fn best_builder_states_to_extend<TYPES: NodeType>(
+    quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
+    global_state: Arc<RwLock<GlobalState<TYPES>>>,
+) -> HashSet<BuilderStateId<TYPES>> {
+    let current_view_number = quorum_proposal.data.view_number;
+    let current_commitment = quorum_proposal.data.block_header.payload_commitment();
+    let current_builder_state_id = BuilderStateId::<TYPES> {
+        parent_commitment: current_commitment,
+        parent_view: current_view_number,
+    };
+
+    let global_state_read_lock = global_state.read_arc().await;
+
+    // The first step is to check if we already have a spawned [BuilderState].
+    // If we do, then we should indicate that there is no best fit, as we
+    // don't want to spawn another [BuilderState].
+    if global_state_read_lock
+        .spawned_builder_states
+        .contains_key(&current_builder_state_id)
+    {
+        // We already have a spawned [BuilderState] for this proposal.
+        // So we should just ignore it.
+        return HashSet::new();
+    }
+
+    // Next we want to see if there is an immediate match for a [BuilderState]
+    // that we can extend from. This is the most ideal situation, as it
+    // implies that we are extending from the correct [BuilderState].
+    // We do this by checking the `justify_qc` stored within the
+    // [QuorumProposal], and checking it against the current spawned
+    // [BuilderState]s.
+    let justify_qc = &quorum_proposal.data.justify_qc;
+    let existing_states: HashSet<_> = global_state_read_lock
+        .spawned_builder_states
+        .iter()
+        .filter(
+            |(_, (built_from_proposed_block, _))| match built_from_proposed_block {
+                None => false,
+                Some(built_from_proposed_block) => {
+                    built_from_proposed_block.leaf_commit == justify_qc.data.leaf_commit
+                        && built_from_proposed_block.view_number == justify_qc.view_number
+                }
+            },
+        )
+        .map(|(builder_state_id, _)| builder_state_id.clone())
+        .collect();
+
+    // If we found any matching [BuilderState]s, then we should return them
+    // as the best fit.
+    if !existing_states.is_empty() {
+        return existing_states;
+    }
+
+    // At this point, we don't have any "ideal" matches or scenarios. So we
+    // need to look for a suitable fall-back.
+    // The best fallback condition to start with is any [BuilderState] that
+    // has the maximum spawned view number whose value is smaller than the
+    // current [QuorumProposal].
+    let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock
+        .spawned_builder_states
+        .keys()
+        .map(|builder_state_id| *builder_state_id.parent_view)
+        .filter(|view_number| view_number < &current_view_number)
+        .max();
+
+    // If we have a maximum view number that meets our criteria, then we should
+    // return all [BuilderStateId]s that match this view number.
+    // This can lead to multiple [BuilderStateId]s being returned.
+    if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) =
+        maximum_stored_view_number_smaller_than_quorum_proposal
+    {
+        // If we are the maximum stored view number smaller than the quorum
+        // proposal's view number, then we are the best fit.
+        let mut result = HashSet::new();
+        for builder_state_id in
+            global_state_read_lock
+                .spawned_builder_states
+                .keys()
+                .filter(|builder_state_id| {
+                    builder_state_id.parent_view.u64()
+                        == maximum_stored_view_number_smaller_than_quorum_proposal
+                })
+        {
+            result.insert(builder_state_id.clone());
+        }
+        return result;
+    }
+
+    // This is our last-ditch effort to continue making progress. If there is
+    // only one [BuilderState] active, then we should return that as the best
+    // fit, as it will be the only way we can continue making progress with
+    // the builder.
+    if global_state_read_lock.spawned_builder_states.len() == 1 {
+        let mut result = HashSet::new();
+        for builder_state_id in global_state_read_lock.spawned_builder_states.keys() {
+            result.insert(builder_state_id.clone());
+        }
+        return result;
+    }
+
+    // This implies that there are only larger [BuilderState]s active than
+    // the one we are. This is weird: it implies that some sort of time
+    // travel has occurred view-wise. It is unclear what to do in this
+    // situation.
+
+    HashSet::new()
+}
+
 impl<TYPES: NodeType> BuilderState<TYPES> {
+    /// [am_i_the_best_builder_state_to_extend] is a utility method that
+    /// attempts to determine whether we are among the best [BuilderState]s to
+    /// extend from.
+    async fn am_i_the_best_builder_state_to_extend(
+        &self,
+        quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
+    ) -> bool {
+        let best_builder_states_to_extend =
+            best_builder_states_to_extend(quorum_proposal.clone(), self.global_state.clone()).await;
+
+        tracing::debug!(
+            "{}@{} thinks these are the best builder states to extend from: {:?} for proposal {}@{}",
+            self.built_from_proposed_block.vid_commitment,
+            self.built_from_proposed_block.view_number.u64(),
+            best_builder_states_to_extend
+                .iter()
+                .map(|builder_state_id| format!(
+                    "{}@{}",
+                    builder_state_id.parent_commitment,
+                    builder_state_id.parent_view.u64()
+                ))
+                .collect::<Vec<_>>(),
+            quorum_proposal.data.block_header.payload_commitment(),
+            quorum_proposal.data.view_number.u64(),
+        );
+
+        // We are a best fit if we are contained within the returned set of
+        // best [BuilderState]s to extend from.
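+        // Note: `built_from_proposed_block` records the proposal this state
+        // was spawned from, so its (vid_commitment, view_number) pair is the
+        // same [BuilderStateId] key under which this state is registered in
+        // [GlobalState].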
+        best_builder_states_to_extend.contains(&BuilderStateId {
+            parent_commitment: self.built_from_proposed_block.vid_commitment,
+            parent_view: self.built_from_proposed_block.view_number,
+        })
+    }
+
     /// processing the DA proposal
     #[tracing::instrument(skip_all, name = "process da proposal", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
@@ -215,9 +397,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
             self.quorum_proposal_payload_commit_to_quorum_proposal
                 .remove(&(da_msg.builder_commitment.clone(), da_msg.view_number));
 
-            let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
-            self.clone_with_receiver(req_receiver)
-                .spawn_clone(da_msg, quorum_proposal, req_sender)
+            self.spawn_clone_that_extends_self(da_msg, quorum_proposal)
                 .await;
         } else {
             tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers");
         }
@@ -246,43 +426,6 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         // To handle both cases, we can have the highest view number builder state running
         // and only doing the insertion if and only if intended builder state for a particulat view is not present
         // check the presence of quorum_proposal.data.view_number-1 in the spawned_builder_states list
-        if qc_msg.proposal.data.justify_qc.view_number != self.built_from_proposed_block.view_number
-        {
-            tracing::debug!(
-                "View number {:?} from justify qc does not match for builder {:?}",
-                qc_msg.proposal.data.justify_qc.view_number,
-                self.built_from_proposed_block
-            );
-            if !self
-                .global_state
-                .read_arc()
-                .await
-                .should_view_handle_other_proposals(
-                    &self.built_from_proposed_block.view_number,
-                    &qc_msg.proposal.data.justify_qc.view_number,
-                )
-            {
-                tracing::debug!(
-                    "Builder {:?} is not currently bootstrapping.",
-                    self.built_from_proposed_block
-                );
-                // if we have the matching da proposal, we now know we don't need to keep it.
-                self.da_proposal_payload_commit_to_da_proposal.remove(&(
-                    qc_msg
-                        .proposal
-                        .data
-                        .block_header
-                        .builder_commitment()
-                        .clone(),
-                    qc_msg.proposal.data.view_number,
-                ));
-                return;
-            }
-            tracing::debug!(
-                "Builder {:?} handling proposal as bootstrap.",
-                self.built_from_proposed_block
-            );
-        }
         let quorum_proposal = &qc_msg.proposal;
         let view_number = quorum_proposal.data.view_number;
         let payload_builder_commitment = quorum_proposal.data.block_header.builder_commitment();
@@ -314,9 +457,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
                 view_number
             );
 
-            let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
-            self.clone_with_receiver(req_receiver)
-                .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender)
+            self.spawn_clone_that_extends_self(da_proposal_info, quorum_proposal.clone())
                 .await;
         } else {
             tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers");
         }
@@ -329,6 +470,46 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         }
     }
 
+    /// [spawn_clone_that_extends_self] is a helper function that is used by
+    /// both [process_da_proposal] and [process_quorum_proposal] to spawn a
+    /// new [BuilderState] that extends from the current [BuilderState].
+    ///
+    /// This helper function also adds additional checks in order to ensure
+    /// that the [BuilderState] that is being spawned is the best fit for the
+    /// [QuorumProposal] that is being extended from.
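+    ///
+    /// A minimal sketch of the intended call flow (`da_msg` and
+    /// `quorum_proposal` stand in for the values already in scope in the two
+    /// callers above):
+    ///
+    /// ```ignore
+    /// // Once the DA and quorum payload commitments match for the same view:
+    /// self.spawn_clone_that_extends_self(da_msg, quorum_proposal).await;
+    /// // Internally, am_i_the_best_builder_state_to_extend gates the fork;
+    /// // only a best-fit state clones itself via spawn_clone.
+    /// ```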
+    async fn spawn_clone_that_extends_self(
+        &mut self,
+        da_proposal_info: Arc<DaProposalMessage<TYPES>>,
+        quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
+    ) {
+        if !self
+            .am_i_the_best_builder_state_to_extend(quorum_proposal.clone())
+            .await
+        {
+            tracing::debug!(
+                "{} is not the best fit for forking, {}@{}, so ignoring the QC proposal, and leaving it to another BuilderState",
+                self.built_from_proposed_block,
+                quorum_proposal.data.block_header.payload_commitment(),
+                quorum_proposal.data.view_number.u64(),
+            );
+            return;
+        }
+
+        let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
+
+        tracing::debug!(
+            "extending BuilderState with a clone from {} with new proposal {}@{}",
+            self.built_from_proposed_block,
+            quorum_proposal.data.block_header.payload_commitment(),
+            quorum_proposal.data.view_number.u64()
+        );
+
+        // We literally fork ourselves
+        self.clone_with_receiver(req_receiver)
+            .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender)
+            .await;
+    }
+
     /// processing the decide event
     #[tracing::instrument(skip_all, name = "process decide event", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
@@ -371,14 +552,39 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
         req_sender: BroadcastSender<MessageType<TYPES>>,
     ) {
-        self.built_from_proposed_block.view_number = quorum_proposal.data.view_number;
-        self.built_from_proposed_block.vid_commitment =
-            quorum_proposal.data.block_header.payload_commitment();
-        self.built_from_proposed_block.builder_commitment =
-            quorum_proposal.data.block_header.builder_commitment();
         let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data);
 
-        self.built_from_proposed_block.leaf_commit = leaf.commit();
+        // We replace our built_from_proposed_block with information from the
+        // quorum proposal. This identifies the block that this specific
+        // instance of [BuilderState] is attempting to build for.
+        self.built_from_proposed_block = BuiltFromProposedBlock {
+            view_number: quorum_proposal.data.view_number,
+            vid_commitment: quorum_proposal.data.block_header.payload_commitment(),
+            leaf_commit: leaf.commit(),
+            builder_commitment: quorum_proposal.data.block_header.builder_commitment(),
+        };
+
+        let builder_state_id = BuilderStateId {
+            parent_commitment: self.built_from_proposed_block.vid_commitment,
+            parent_view: self.built_from_proposed_block.view_number,
+        };
+
+        {
+            // Let's ensure that we don't already have one of these
+            // BuilderStates running.
+
+            let global_state_read_lock = self.global_state.read_arc().await;
+            if global_state_read_lock
+                .spawned_builder_states
+                .contains_key(&builder_state_id)
+            {
+                tracing::warn!(
+                    "Aborting spawn_clone, builder state already exists in spawned_builder_states: {:?}",
+                    builder_state_id
+                );
+                return;
+            }
+        }
 
         for tx in da_proposal_info.txn_commitments.iter() {
             self.txns_in_queue.remove(tx);
         }
@@ -389,12 +595,12 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         self.tx_queue
             .retain(|tx| self.txns_in_queue.contains(&tx.commit));
 
-        // register the spawned builder state to spawned_builder_states in the global state
+        // register the spawned builder state to spawned_builder_states in the
+        // global state. We register this new child within the global_state, so
+        // that it can be looked up via the [BuilderStateId] in the future.
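+        // A later lookup of this child then mirrors the read path in
+        // service.rs (sketch):
+        //     global_state.read_arc().await
+        //         .spawned_builder_states
+        //         .get(&builder_state_id) // -> the (Option<BuiltFromProposedBlock>, sender) pair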
         self.global_state.write_arc().await.register_builder_state(
-            BuilderStateId {
-                parent_commitment: self.built_from_proposed_block.vid_commitment,
-                parent_view: self.built_from_proposed_block.view_number,
-            },
+            builder_state_id,
+            self.built_from_proposed_block.clone(),
             req_sender,
         );
 
diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs
index f22303fb..b45d34f7 100644
--- a/crates/marketplace/src/service.rs
+++ b/crates/marketplace/src/service.rs
@@ -28,7 +28,7 @@ use crate::{
         BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType,
         QuorumProposalMessage, RequestMessage, ResponseMessage, TransactionSource,
     },
-    utils::{BlockId, BuilderStateId},
+    utils::{BlockId, BuilderStateId, BuiltFromProposedBlock},
 };
 pub use async_broadcast::{broadcast, RecvError, TryRecvError};
 use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError};
@@ -104,7 +104,20 @@ pub struct GlobalState<Types: NodeType> {
     pub blocks: lru::LruCache<BlockId<Types>, BlockInfo<Types>>,
 
     // registered builder states
-    pub spawned_builder_states: HashMap<BuilderStateId<Types>, BroadcastSender<MessageType<Types>>>,
+    pub spawned_builder_states: HashMap<
+        BuilderStateId<Types>,
+        (
+            // This is provided as an Option for convenience with initialization.
+            // When we build the initial state, we don't necessarily want to
+            // have to generate a valid BuiltFromProposedBlock object. Doing
+            // so would require a bit of setup. Additionally, it would change
+            // the call signature of `GlobalState::new`.
+            // However, for every subsequent BuilderState, we expect this value
+            // to be populated.
+            Option<BuiltFromProposedBlock<Types>>,
+            BroadcastSender<MessageType<Types>>,
+        ),
+    >,
 
     // builder state -> last built block , it is used to respond the client
     // if the req channel times out during get_available_blocks
@@ -134,7 +147,7 @@ impl<Types: NodeType> GlobalState<Types> {
             parent_commitment: bootstrapped_builder_state_id,
             parent_view: bootstrapped_view_num,
         };
-        spawned_builder_states.insert(bootstrap_id.clone(), bootstrap_sender.clone());
+        spawned_builder_states.insert(bootstrap_id.clone(), (None, bootstrap_sender.clone()));
         GlobalState {
             blocks: lru::LruCache::new(NonZeroUsize::new(256).unwrap()),
             spawned_builder_states,
@@ -148,11 +161,14 @@ impl<Types: NodeType> GlobalState<Types> {
     pub fn register_builder_state(
        &mut self,
         parent_id: BuilderStateId<Types>,
+        built_from_proposed_block: BuiltFromProposedBlock<Types>,
         request_sender: BroadcastSender<MessageType<Types>>,
     ) {
         // register the builder state
-        self.spawned_builder_states
-            .insert(parent_id.clone(), request_sender);
+        self.spawned_builder_states.insert(
+            parent_id.clone(),
+            (Some(built_from_proposed_block), request_sender),
+        );
 
         // keep track of the max view number
         if parent_id.parent_view > self.highest_view_num_builder_id.parent_view {
@@ -219,9 +235,9 @@ impl<Types: NodeType> GlobalState<Types> {
         &self,
         key: &BuilderStateId<Types>,
     ) -> Result<&BroadcastSender<MessageType<Types>>, BuildError> {
-        if let Some(channel) = self.spawned_builder_states.get(key) {
+        if let Some(id_and_sender) = self.spawned_builder_states.get(key) {
             tracing::info!("Got matching builder for parent {}", key);
-            Ok(channel)
+            Ok(&id_and_sender.1)
         } else {
             tracing::warn!(
                 "failed to recover builder for parent {}, using higest view num builder with {}",
@@ -231,6 +247,7 @@ impl<Types: NodeType> GlobalState<Types> {
             // get the sender for the highest view number builder
             self.spawned_builder_states
                 .get(&self.highest_view_num_builder_id)
+                .map(|(_, sender)| sender)
                 .ok_or_else(|| BuildError::Error {
                     message: "No builder state found".to_string(),
                 })
@@ -353,7 +370,7 @@ where
             return Err(BuildError::NotFound);
         };
 
-        let Some(sender) = self
+        let Some(id_and_sender) = self
             .global_state
             .read_arc()
             .await
@@ -399,7 +416,8 @@ where
             response_channel: response_sender,
         };
 
-        sender
+        id_and_sender
+            .1
             .broadcast(MessageType::RequestMessage(request))
             .await
             .map_err(|err| {
diff --git a/crates/marketplace/src/testing/basic_test.rs b/crates/marketplace/src/testing/basic_test.rs
index e9504bb8..e55c8be2 100644
--- a/crates/marketplace/src/testing/basic_test.rs
+++ b/crates/marketplace/src/testing/basic_test.rs
@@ -92,6 +92,7 @@ async fn test_builder() {
             .spawned_builder_states
             .get(&req_msg.1)
             .expect("Failed to get channel for matching builder")
+            .1
             .broadcast(req_msg.2.clone())
             .await
             .unwrap();
diff --git a/crates/marketplace/src/testing/mod.rs b/crates/marketplace/src/testing/mod.rs
index 9dc63c52..598de98f 100644
--- a/crates/marketplace/src/testing/mod.rs
+++ b/crates/marketplace/src/testing/mod.rs
@@ -34,8 +34,8 @@ use hotshot_example_types::{
 };
 use serde::{Deserialize, Serialize};
 
-use crate::builder_state::BuiltFromProposedBlock;
 use crate::service::{broadcast_channels, GlobalState};
+use crate::utils::BuiltFromProposedBlock;
 use async_lock::RwLock;
 use committable::{Commitment, CommitmentBoundsArkless, Committable};
 use std::sync::Arc;
@@ -171,7 +171,7 @@
     SimpleCertificate::<TestTypes, QuorumData<TestTypes>, SuccessThreshold>::new(
         quorum_data.clone(),
         quorum_data.commit(),
-        ViewNumber::new(round as u64),
+        prev_proposal.view_number,
         prev_justify_qc.signatures.clone(),
         PhantomData,
     )
diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs
index f5b61ec8..25a59080 100644
--- a/crates/marketplace/src/testing/order_test.rs
+++ b/crates/marketplace/src/testing/order_test.rs
@@ -1,25 +1,118 @@
+use hotshot_builder_api::v0_3::data_source::{AcceptsTxnSubmits, BuilderDataSource};
 use hotshot_types::{
-    data::{QuorumProposal, ViewNumber},
-    traits::node_implementation::ConsensusTime,
+    bundle::Bundle,
+    data::QuorumProposal,
+    traits::node_implementation::{ConsensusTime, NodeType},
 };
 
-use crate::builder_state::MessageType;
-
-use std::fmt::Debug;
+use crate::{
+    service::{BuilderHooks, ProxyGlobalState},
+    utils::BuilderStateId,
+};
 
-use async_compatibility_layer::art::async_sleep;
-use async_std::prelude::FutureExt;
+use std::{fmt::Debug, sync::Arc};
 
 use hotshot_example_types::block_types::TestTransaction;
 
-use crate::{builder_state::TransactionSource, testing::TestTypes};
-use crate::{
-    service::handle_received_txns,
-    testing::{calc_proposal_msg, get_req_msg, start_builder_state},
+use crate::testing::TestTypes;
+use crate::testing::{calc_proposal_msg, start_builder_state};
+use hotshot::{
+    rand::{self, seq::SliceRandom, thread_rng},
+    types::{BLSPubKey, Event, SignatureKey},
 };
-use hotshot::rand::{self, seq::SliceRandom, thread_rng};
 
 use std::time::Duration;
 
+/// [NoOpHooks] is a placeholder struct that is used to implement the
+/// [BuilderHooks] trait for the [TestTypes] NodeType in a way that doesn't
+/// do anything. This is a convenience for creating [ProxyGlobalState] objects.
+struct NoOpHooks;
+
+#[async_trait::async_trait]
+impl BuilderHooks<TestTypes> for NoOpHooks {
+    #[inline(always)]
+    async fn process_transactions(
+        self: &Arc<Self>,
+        transactions: Vec<<TestTypes as NodeType>::Transaction>,
+    ) -> Vec<<TestTypes as NodeType>::Transaction> {
+        transactions
+    }
+
+    #[inline(always)]
+    async fn handle_hotshot_event(self: &Arc<Self>, _event: &Event<TestTypes>) {}
+}
+
+/// [RoundTransactionBehavior] is an enum used to represent different
+/// behaviors that we may want to simulate during a round.
+/// This applies to determining which transactions are included in the block,
+/// and how their order is adjusted before being included for consensus.
+#[derive(Clone, Debug)]
+enum RoundTransactionBehavior {
+    /// [NoAdjust] indicates that the transactions should be passed through
+    /// without any adjustment
+    NoAdjust,
+
+    /// [Skip] indicates that the transactions should be omitted entirely
+    Skip,
+
+    /// [AdjustAdd] indicates that a new transaction should be added to the
+    /// transactions submitted
+    AdjustAdd(usize),
+
+    /// [AdjustRemoveTail] indicates that the last transaction should be removed
+    /// from the transactions submitted
+    AdjustRemoveTail,
+
+    /// [ProposeInAdvance] indicates that a transaction should be added to the
+    /// transactions submitted that indicates that it is for the next round
+    /// (i.e. the round after the one being processed)
+    ProposeInAdvance(usize),
+
+    /// [AdjustRemove] indicates that a random transaction (not the last one)
+    /// should be removed from the transactions submitted
+    AdjustRemove,
+}
+
+impl RoundTransactionBehavior {
+    /// [process_transactions] is a helper method that takes a vector of transactions
+    /// and applies the behavior specified by the [RoundTransactionBehavior] enum
+    /// to the transactions before returning them.
+    fn process_transactions(&self, transactions: Vec<TestTransaction>) -> Vec<TestTransaction> {
+        match self {
+            RoundTransactionBehavior::NoAdjust => transactions,
+            RoundTransactionBehavior::Skip => vec![],
+            RoundTransactionBehavior::AdjustAdd(adjust_add_round) => {
+                let mut transactions = transactions.clone();
+                transactions.insert(
+                    rand::random::<usize>() % transactions.len(),
+                    TestTransaction::new(vec![
+                        *adjust_add_round as u8,
+                        (transactions.len() + 1) as u8,
+                    ]),
+                );
+                transactions
+            }
+            RoundTransactionBehavior::AdjustRemoveTail => {
+                let mut transactions = transactions.clone();
+                transactions.pop();
+                transactions
+            }
+            RoundTransactionBehavior::ProposeInAdvance(propose_in_advance_round) => {
+                let mut transactions = transactions.clone();
+                transactions.push(TestTransaction::new(vec![
+                    (propose_in_advance_round + 1) as u8,
+                    0_u8,
+                ]));
+                transactions
+            }
+            RoundTransactionBehavior::AdjustRemove => {
+                let mut transactions = transactions.clone();
+                transactions.remove(rand::random::<usize>() % (transactions.len() - 1));
+                transactions
+            }
+        }
+    }
+}
+
 /// The function checks whether the common part of two transaction vectors have the same order
 fn order_check(
     transaction_history: Vec<TestTransaction>,
@@ -27,7 +120,7 @@ fn order_check(
     all_transactions: Vec<Vec<TestTransaction>>,
 ) -> bool {
     let all_transactions_vec = all_transactions.into_iter().flatten().collect::<Vec<_>>();
     tracing::debug!(
-        "Doing order check, transaction_history = {:?}, all_transactions = {:?}",
+        "Doing order check:\n\ttransaction_history = {:?}\n\tall_transactions = {:?}",
         transaction_history,
         all_transactions_vec
     );
@@ -66,6 +159,13 @@ async fn test_builder_order() {
     let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
 
+    let proxy_global_state = ProxyGlobalState::new(
+        global_state.clone(),
+        Arc::new(NoOpHooks),
+        BLSPubKey::generated_from_seed_indexed([0; 32], 0),
+        Duration::from_secs(1),
+    );
+
     // Transactions to send
     let all_transactions = (0..NUM_ROUNDS)
         .map(|round| {
@@ -89,6 +189,28 @@ async fn test_builder_order() {
     // the round we want to include tx in later round (NUM_ROUNDS -1 which is also the final round) to propose in advance
     let propose_in_advance_round = NUM_ROUNDS - 2;
 
+    // determine_round_behavior is a helper function that takes a round number
+    // and returns the desired [RoundTransactionBehavior] for that round.
+    let determine_round_behavior = |round: usize| -> RoundTransactionBehavior {
+        if round == skip_round {
+            return RoundTransactionBehavior::Skip;
+        }
+
+        if round == adjust_add_round {
+            return RoundTransactionBehavior::AdjustAdd(adjust_add_round);
+        }
+
+        if round == adjust_remove_tail_round {
+            return RoundTransactionBehavior::AdjustRemoveTail;
+        }
+
+        if propose_in_advance_round == round {
+            return RoundTransactionBehavior::ProposeInAdvance(propose_in_advance_round + 1);
+        }
+
+        RoundTransactionBehavior::NoAdjust
+    };
+
     // set up state to track between simulated consensus rounds
     let mut prev_proposed_transactions: Option<Vec<TestTransaction>> = None;
     let mut prev_quorum_proposal: Option<QuorumProposal<TestTypes>> = None;
@@ -97,98 +219,60 @@ async fn test_builder_order() {
     // Simulate NUM_ROUNDS of consensus. First we submit the transactions for this round to the builder,
     // then construct DA and Quorum Proposals based on what we received from builder in the previous round
     // and request a new bundle.
-    #[allow(clippy::needless_range_loop)] // intent is clearer this way
-    for round in 0..NUM_ROUNDS {
-        // simulate transaction being submitted to the builder
-        for res in handle_received_txns(
-            &senders.transactions,
-            all_transactions[round].clone(),
-            TransactionSource::HotShot,
-        )
-        .await
-        {
-            res.unwrap();
-        }
-
-        // get transactions submitted in previous rounds, [] for genesis
-        // and simulate the block built from those
-        let transactions = prev_proposed_transactions.take().unwrap_or_default();
-        let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
-            calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions).await;
+    for (round, round_transactions, round_behavior) in all_transactions
+        .iter()
+        .enumerate()
+        .map(|(round, txns)| (round, txns, determine_round_behavior(round)))
+    {
+        // Simulate consensus deciding on the transactions that are included
+        // in the block.
+        let BuilderStateId {
+            parent_view,
+            parent_commitment,
+        } = {
+            // get transactions submitted in previous rounds, [] for genesis
+            // and simulate the block built from those
+            let transactions = prev_proposed_transactions.take().unwrap_or_default();
+            let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions)
+                    .await;
+
+            prev_quorum_proposal = Some(quorum_proposal.clone());
+
+            // send quorum and DA proposals for this round
+            senders
+                .da_proposal
+                .broadcast(da_proposal_msg)
+                .await
+                .unwrap();
+            senders
+                .quorum_proposal
+                .broadcast(quorum_proposal_msg)
+                .await
+                .unwrap();
 
-        prev_quorum_proposal = Some(quorum_proposal.clone());
+            builder_state_id
+        };
 
-        // send quorum and DA proposals for this round
-        senders
-            .da_proposal
-            .broadcast(da_proposal_msg)
-            .await
-            .unwrap();
-        senders
-            .quorum_proposal
-            .broadcast(quorum_proposal_msg)
+        // simulate transaction being submitted to the builder
+        proxy_global_state
+            .submit_txns(round_transactions.clone())
             .await
             .unwrap();
 
-        let req_msg = get_req_msg(round as u64, builder_state_id).await;
-
-        // give builder state time to fork
-        async_sleep(Duration::from_millis(100)).await;
-
-        // get the builder state for parent view we've just simulated
-        global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .get(&req_msg.1)
-            .expect("Failed to get channel for matching builder")
-            .broadcast(req_msg.2.clone())
+        let Bundle { transactions, .. } = proxy_global_state
+            .bundle(parent_view.u64(), &parent_commitment, parent_view.u64())
             .await
             .unwrap();
 
-        // get response
-        // in the next round we will use received transactions to simulate
-        // the block being proposed
-        let res_msg = req_msg
-            .0
-            .recv()
-            .timeout(Duration::from_secs(10))
-            .await
-            .unwrap()
-            .unwrap();
+        // process the specific round behavior to modify the transactions we
+        // received
+        let transactions = round_behavior.process_transactions(transactions);
+
+        prev_proposed_transactions = Some(transactions.clone());
 
-        // play with transactions propsed by proposers: skip the whole round OR interspersed some txs randomly OR remove some txs randomly
-        if let MessageType::<TestTypes>::RequestMessage(ref request) = req_msg.2 {
-            let view_number = request.requested_view_number;
-            if view_number == ViewNumber::new(skip_round as u64) {
-                prev_proposed_transactions = None;
-            } else {
-                let mut proposed_transactions = res_msg.transactions.clone();
-                if view_number == ViewNumber::new(adjust_add_round as u64) {
-                    proposed_transactions.insert(
-                        rand::random::<usize>() % NUM_TXNS_PER_ROUND,
-                        TestTransaction::new(vec![
-                            adjust_add_round as u8,
-                            (NUM_TXNS_PER_ROUND + 1) as u8,
-                        ]),
-                    );
-                } else if view_number == ViewNumber::new(adjust_remove_tail_round as u64) {
-                    proposed_transactions.pop();
-                } else if view_number == ViewNumber::new(propose_in_advance_round as u64) {
-                    proposed_transactions.push(TestTransaction::new(vec![
-                        (propose_in_advance_round + 1) as u8,
-                        0_u8,
-                    ]));
-                }
-                prev_proposed_transactions = Some(proposed_transactions);
-            }
-        } else {
-            tracing::error!("Unable to get request from RequestMessage");
-        }
         // save transactions to history
-        if prev_proposed_transactions.is_some() {
-            transaction_history.extend(prev_proposed_transactions.clone().unwrap());
-        }
+        transaction_history.extend(transactions);
     }
 
     // we should've served all transactions submitted, and in correct order
@@ -222,7 +306,23 @@ async fn test_builder_order_chain_fork() {
     // round 3 should be back to normal, there's no fork anymore
     let fork_round = 1;
 
+    // determine_round_behavior is a helper function that takes a round number
+    // and returns the desired [RoundTransactionBehavior] for that round.
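+    // Only `fork_round` is special-cased here: the second branch skips its
+    // block entirely (RoundTransactionBehavior::Skip) while the first branch
+    // proposes normally, which is what creates the fork between the two
+    // branches.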
+    let determine_round_behavior = |round: usize| -> RoundTransactionBehavior {
+        if round == fork_round {
+            return RoundTransactionBehavior::Skip;
+        }
+
+        RoundTransactionBehavior::NoAdjust
+    };
+
     let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
+    let proxy_global_state = ProxyGlobalState::new(
+        global_state.clone(),
+        Arc::new(NoOpHooks),
+        BLSPubKey::generated_from_seed_indexed([0; 32], 0),
+        Duration::from_secs(1),
+    );
 
     // Transactions to send
     let all_transactions = (0..NUM_ROUNDS)
         .map(|round| {
             (0..NUM_TXNS_PER_ROUND)
                 .map(|tx_num| TestTransaction::new(vec![round as u8, tx_num as u8]))
                 .collect::<Vec<_>>()
         })
         .collect::<Vec<_>>();
 
@@ -234,180 +334,156 @@ async fn test_builder_order_chain_fork() {
     // set up state to track between simulated consensus rounds
-    let mut prev_proposed_transactions: Option<Vec<TestTransaction>> = None;
-    let mut prev_quorum_proposal: Option<QuorumProposal<TestTypes>> = None;
-    let mut transaction_history = Vec::new();
+    let mut prev_proposed_transactions_branch_1: Option<Vec<TestTransaction>> = None;
+    let mut prev_quorum_proposal_branch_1: Option<QuorumProposal<TestTypes>> = None;
+    let mut transaction_history_branch_1 = Vec::new();
 
     // set up state to track the fork-ed chain
-    let mut prev_proposed_transactions_2: Option<Vec<TestTransaction>> = None;
-    let mut prev_quorum_proposal_2: Option<QuorumProposal<TestTypes>> = None;
-    let mut transaction_history_2 = Vec::new();
-    // the parameter to track whether there's a fork by pending whether the transactions submitted in
-    // the previous round are the same
-    let mut fork: bool;
+    let mut prev_proposed_transactions_branch_2: Option<Vec<TestTransaction>> = None;
+    let mut prev_quorum_proposal_branch_2: Option<QuorumProposal<TestTypes>> = None;
+    let mut transaction_history_branch_2 = Vec::new();
 
     // Simulate NUM_ROUNDS of consensus. First we submit the transactions for this round to the builder,
     // then construct DA and Quorum Proposals based on what we received from builder in the previous round
     // and request a new bundle.
-    #[allow(clippy::needless_range_loop)] // intent is clearer this way
-    for round in 0..NUM_ROUNDS {
-        // simulate transaction being submitted to the builder
-        for res in handle_received_txns(
-            &senders.transactions,
-            all_transactions[round].clone(),
-            TransactionSource::HotShot,
-        )
-        .await
-        {
-            res.unwrap();
-        }
+    for (round, transactions, fork_round_behavior) in all_transactions
+        .iter()
+        .enumerate()
+        .map(|(round, txns)| (round, txns, determine_round_behavior(round)))
+    {
+        // Simulate consensus deciding on the transactions that are included
+        // in the block, branch 1
+        let BuilderStateId {
+            parent_view: parent_view_branch_1,
+            parent_commitment: parent_commitment_branch_1,
+        } = {
+            // get transactions submitted in previous rounds, [] for genesis
+            // and simulate the block built from those
+            let transactions = prev_proposed_transactions_branch_1
+                .clone()
+                .unwrap_or_default();
+            let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(
+                    NUM_STORAGE_NODES,
+                    round,
+                    prev_quorum_proposal_branch_1,
+                    transactions,
+                )
+                .await;
+
+            prev_quorum_proposal_branch_1 = Some(quorum_proposal.clone());
+
+            // send quorum and DA proposals for this round
+            senders
+                .da_proposal
+                .broadcast(da_proposal_msg)
+                .await
+                .unwrap();
+            senders
+                .quorum_proposal
+                .broadcast(quorum_proposal_msg)
+                .await
+                .unwrap();
 
-        // get transactions submitted in previous rounds, [] for genesis
-        // and simulate the block built from those
-        let transactions = prev_proposed_transactions.take().unwrap_or_default();
-        let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
-            calc_proposal_msg(
-                NUM_STORAGE_NODES,
-                round,
-                prev_quorum_proposal,
-                transactions.clone(),
-            )
-            .await;
-
-        let transactions_2 = prev_proposed_transactions_2.take().unwrap_or_default();
-        let (quorum_proposal_2, quorum_proposal_msg_2, da_proposal_msg_2, builder_state_id_2) =
-            calc_proposal_msg(
-                NUM_STORAGE_NODES,
-                round,
-                prev_quorum_proposal_2.clone(),
-                transactions_2.clone(),
-            )
-            .await;
-        if transactions_2 != transactions {
-            fork = true;
-            tracing::debug!("Fork Exist.")
-        } else {
-            fork = false;
-            tracing::debug!("No fork.");
-        }
+            builder_state_id
+        };
+
+        // Simulate consensus deciding on the transactions that are included
+        // in the block, branch 2
+        let BuilderStateId {
+            parent_view: parent_view_branch_2,
+            parent_commitment: parent_commitment_branch_2,
+        } = {
+            // get transactions submitted in previous rounds, [] for genesis
+            // and simulate the block built from those
+            let transactions = prev_proposed_transactions_branch_2
+                .clone()
+                .unwrap_or_default();
+            let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(
+                    NUM_STORAGE_NODES,
+                    round,
+                    prev_quorum_proposal_branch_2,
+                    transactions,
+                )
+                .await;
+
+            prev_quorum_proposal_branch_2 = Some(quorum_proposal.clone());
+
+            // send quorum and DA proposals for this round
+            // we also need to send out the message for the fork-ed chain although it's not forked yet
+            // to prevent builders from resending the transactions we've already committed
+            senders
+                .da_proposal
+                .broadcast(da_proposal_msg)
+                .await
+                .unwrap();
+            senders
+                .quorum_proposal
+                .broadcast(quorum_proposal_msg)
+                .await
+                .unwrap();
 
-        prev_quorum_proposal = Some(quorum_proposal.clone());
-        // send quorum and DA proposals for this round
-        senders
-            .da_proposal
-            .broadcast(da_proposal_msg)
-            .await
-            .unwrap();
-        senders
-            .quorum_proposal
-            .broadcast(quorum_proposal_msg)
-            .await
-            .unwrap();
+            builder_state_id
+        };
 
-        prev_quorum_proposal_2 = Some(quorum_proposal_2.clone());
-        // send quorum and DA proposals for this round
-        // we also need to send out the message for the fork-ed chain although it's not forked yet
-        // to prevent builders resend the transactions we've already committeed
-        senders
-            .da_proposal
-            .broadcast(da_proposal_msg_2)
-            .await
-            .unwrap();
-        senders
-            .quorum_proposal
-            .broadcast(quorum_proposal_msg_2)
+        // simulate transaction being submitted to the builder
+        proxy_global_state
+            .submit_txns(transactions.clone())
             .await
             .unwrap();
 
-        let req_msg = get_req_msg(round as u64, builder_state_id).await;
-        // give builder state time to fork
-        async_sleep(Duration::from_millis(100)).await;
-
-        // get the builder state for parent view we've just simulated
-        global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .get(&req_msg.1)
-            .expect("Failed to get channel for matching builder")
-            .broadcast(req_msg.2.clone())
+        let Bundle {
+            transactions: transactions_branch_1,
+            ..
+        } = proxy_global_state
+            .bundle(
+                parent_view_branch_1.u64(),
+                &parent_commitment_branch_1,
+                parent_view_branch_1.u64(),
+            )
             .await
             .unwrap();
 
-        // get response
-        // in the next round we will use received transactions to simulate
-        // the block being proposed
-        let res_msg = req_msg
-            .0
-            .recv()
-            .timeout(Duration::from_secs(10))
+        let Bundle {
+            transactions: transactions_branch_2,
+            ..
+        } = proxy_global_state
+            .bundle(
+                parent_view_branch_2.u64(),
+                &parent_commitment_branch_2,
+                parent_view_branch_2.u64(),
+            )
             .await
-            .unwrap()
             .unwrap();
 
-        // we have to get separate request message and response message when there's a fork
-        if fork {
-            let req_msg_2 = get_req_msg(round as u64, builder_state_id_2).await;
-            // give builder state time to fork
-            async_sleep(Duration::from_millis(100)).await;
-
-            // get the builder state for parent view we've just simulated
-            global_state
-                .read_arc()
-                .await
-                .spawned_builder_states
-                .get(&req_msg_2.1)
-                .expect("Failed to get channel for matching builder")
-                .broadcast(req_msg_2.2.clone())
-                .await
-                .unwrap();
-
-            // get response
-            let res_msg_2 = req_msg_2
-                .0
-                .recv()
-                .timeout(Duration::from_secs(10))
-                .await
-                .unwrap()
-                .unwrap();
-
-            // play with transactions propsed by proposers: at the fork_round, one chain propose while the other chain does not propose any
-            let proposed_transactions_2 = res_msg_2.transactions.clone();
-            prev_proposed_transactions_2 = Some(proposed_transactions_2);
-        }
-
-        // play with transactions propsed by proposers: at the fork_round, one chain propose while the other chain does not propose any
-        let proposed_transactions = res_msg.transactions.clone();
-        prev_proposed_transactions = Some(proposed_transactions);
-        // if it's the `fork_round` we'll change what we want to propse to `None` for the fork-ed chain
-        if let MessageType::<TestTypes>::RequestMessage(ref request) = req_msg.2 {
-            let view_number = request.requested_view_number;
-            if view_number == ViewNumber::new(fork_round as u64) {
-                prev_proposed_transactions_2 = None;
-            } else {
-                prev_proposed_transactions_2 = prev_proposed_transactions.clone();
-            }
+        let transactions_branch_2 = fork_round_behavior.process_transactions(transactions_branch_2);
+        if transactions_branch_2 != transactions_branch_1 {
+            tracing::debug!("Fork exists.")
         } else {
-            tracing::error!("Unable to get request from RequestMessage");
+            tracing::debug!("No fork.");
         }
 
+        prev_proposed_transactions_branch_1 = Some(transactions_branch_1.clone());
+        prev_proposed_transactions_branch_2 = Some(transactions_branch_2.clone());
+
         // save transactions to history
-        if prev_proposed_transactions.is_some() {
-            transaction_history.extend(prev_proposed_transactions.clone().unwrap());
-        }
-        if prev_proposed_transactions_2.is_some() {
-            transaction_history_2.extend(prev_proposed_transactions_2.clone().unwrap());
-        }
+        transaction_history_branch_1.extend(transactions_branch_1);
+        transaction_history_branch_2.extend(transactions_branch_2);
     }
 
+    // Even with a fork, the transaction history should match once all
+    // transactions have been processed.
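+    // The skipped round only delays inclusion: the builder still holds the
+    // skipped transactions and serves them in a later bundle on that branch,
+    // so by the end of the run both branches are expected to have served the
+    // same transactions in the same order.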
+    assert_eq!(
+        transaction_history_branch_1, transaction_history_branch_2,
+        "even with a fork, the transaction history branches should match"
+    );
     // the test will fail if any transaction is re-ordered
-    assert!(order_check(transaction_history, all_transactions.clone()));
-    assert!(order_check(transaction_history_2, all_transactions));
-    // the test will fail if any transaction is skipped or re-ordered
-    // assert_eq!(
-    //     transaction_history_2,
-    //     all_transactions.into_iter().flatten().collect::<Vec<TestTransaction>>()
-    // );
+    assert!(order_check(
+        transaction_history_branch_1,
+        all_transactions.clone()
+    ));
+    assert!(order_check(transaction_history_branch_2, all_transactions));
 }
 
 /// This test simulates multiple builder states receiving messages from the channels and processing them
@@ -430,6 +506,12 @@ async fn test_builder_order_should_fail() {
     const NUM_STORAGE_NODES: usize = 4;
 
     let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
+    let proxy_global_state = ProxyGlobalState::new(
+        global_state,
+        Arc::new(NoOpHooks),
+        BLSPubKey::generated_from_seed_indexed([0; 32], 0),
+        Duration::from_secs(1),
+    );
 
     // Transactions to send
     let all_transactions = (0..NUM_ROUNDS)
         .map(|round| {
@@ -443,6 +525,15 @@ async fn test_builder_order_should_fail() {
     // generate a random number between (0..NUM_ROUNDS) to do some changes for output transactions
     // the round we want to skip some transactions (cannot be the final round), after it is enabled the test is expected to fail
     let adjust_remove_round = rand::random::<usize>() % (NUM_ROUNDS - 1);
+    // determine_round_behavior is a helper function that takes a round number
+    // and returns the desired [RoundTransactionBehavior] for that round.
+    let determine_round_behavior = |round: usize| -> RoundTransactionBehavior {
+        if round == adjust_remove_round {
+            return RoundTransactionBehavior::AdjustRemove;
+        }
+
+        RoundTransactionBehavior::NoAdjust
+    };
     // set up state to track between simulated consensus rounds
     let mut prev_proposed_transactions: Option<Vec<TestTransaction>> = None;
     let mut prev_quorum_proposal: Option<QuorumProposal<TestTypes>> = None;
@@ -451,82 +542,58 @@ async fn test_builder_order_should_fail() {
     // Simulate NUM_ROUNDS of consensus. First we submit the transactions for this round to the builder,
     // then construct DA and Quorum Proposals based on what we received from builder in the previous round
     // and request a new bundle.
-    #[allow(clippy::needless_range_loop)] // intent is clearer this way
-    for round in 0..NUM_ROUNDS {
-        // simulate transaction being submitted to the builder
-        for res in handle_received_txns(
-            &senders.transactions,
-            all_transactions[round].clone(),
-            TransactionSource::HotShot,
-        )
-        .await
-        {
-            res.unwrap();
-        }
-
-        // get transactions submitted in previous rounds, [] for genesis
-        // and simulate the block built from those
-        let transactions = prev_proposed_transactions.take().unwrap_or_default();
-        let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
-            calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions).await;
+    for (round, round_transactions, round_behavior) in all_transactions
+        .iter()
+        .enumerate()
+        .map(|(round, txns)| (round, txns, determine_round_behavior(round)))
+    {
+        // Simulate consensus deciding on the transactions that are included
+        // in the block.
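+        // Same simulated-consensus pattern as in test_builder_order: build
+        // this round's proposals from the previous bundle, broadcast them,
+        // and keep the BuilderStateId so the next bundle can be requested.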
+        let BuilderStateId {
+            parent_view,
+            parent_commitment,
+        } = {
+            // get transactions submitted in previous rounds, [] for genesis
+            // and simulate the block built from those
+            let transactions = prev_proposed_transactions.take().unwrap_or_default();
+            let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions)
+                    .await;
+
+            prev_quorum_proposal = Some(quorum_proposal.clone());
+
+            // send quorum and DA proposals for this round
+            senders
+                .da_proposal
+                .broadcast(da_proposal_msg)
+                .await
+                .unwrap();
+            senders
+                .quorum_proposal
+                .broadcast(quorum_proposal_msg)
+                .await
+                .unwrap();
 
-        prev_quorum_proposal = Some(quorum_proposal.clone());
+            builder_state_id
+        };
 
-        // send quorum and DA proposals for this round
-        senders
-            .da_proposal
-            .broadcast(da_proposal_msg)
-            .await
-            .unwrap();
-        senders
-            .quorum_proposal
-            .broadcast(quorum_proposal_msg)
+        // simulate transaction being submitted to the builder
+        proxy_global_state
+            .submit_txns(round_transactions.clone())
             .await
             .unwrap();
 
-        let req_msg = get_req_msg(round as u64, builder_state_id).await;
-
-        // give builder state time to fork
-        async_sleep(Duration::from_millis(100)).await;
-
-        // get the builder state for parent view we've just simulated
-        global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .get(&req_msg.1)
-            .expect("Failed to get channel for matching builder")
-            .broadcast(req_msg.2.clone())
+        let Bundle { transactions, .. } = proxy_global_state
+            .bundle(parent_view.u64(), &parent_commitment, parent_view.u64())
             .await
             .unwrap();
 
-        // get response
-        // in the next round we will use received transactions to simulate
-        // the block being proposed
-        let res_msg = req_msg
-            .0
-            .recv()
-            .timeout(Duration::from_secs(10))
-            .await
-            .unwrap()
-            .unwrap();
+        let transactions = round_behavior.process_transactions(transactions);
+
+        prev_proposed_transactions = Some(transactions.clone());
 
-        // play with transactions propsed by proposers: skip the whole round OR interspersed some txs randomly OR remove some txs randomly
-        if let MessageType::<TestTypes>::RequestMessage(ref request) = req_msg.2 {
-            let view_number = request.requested_view_number;
-            let mut proposed_transactions = res_msg.transactions.clone();
-            if view_number == ViewNumber::new(adjust_remove_round as u64) {
-                proposed_transactions.remove(rand::random::<usize>() % (NUM_TXNS_PER_ROUND - 1));
-                // cannot be the last transaction
-            }
-            prev_proposed_transactions = Some(proposed_transactions);
-        } else {
-            tracing::error!("Unable to get request from RequestMessage");
-        }
         // save transactions to history
-        if prev_proposed_transactions.is_some() {
-            transaction_history.extend(prev_proposed_transactions.clone().unwrap());
-        }
+        transaction_history.extend(transactions);
     }
     // we should've served all transactions submitted, and in correct order
     // the test will fail if the common part of two vectors of transactions don't have the same order
diff --git a/crates/marketplace/src/utils.rs b/crates/marketplace/src/utils.rs
index 29224abe..f46684ef 100644
--- a/crates/marketplace/src/utils.rs
+++ b/crates/marketplace/src/utils.rs
@@ -8,12 +8,13 @@ use std::{
     time::{Duration, Instant},
 };
 
+use committable::Commitment;
 use either::Either::{self, Left, Right};
 use futures::{FutureExt, Stream, StreamExt};
 use hotshot::types::Event;
 use hotshot_events_service::events::Error as EventStreamError;
 use hotshot_types::{
-    traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment,
+    data::Leaf, traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment,
 };
 use surf_disco::Client;
 use tracing::error;
@@ -122,6 +123,22 @@ impl<TYPES: NodeType> std::fmt::Display for BuilderStateId<TYPES> {
     }
 }
 
+/// Builder State to hold the state of the builder
+#[derive(Debug, Clone)]
+pub struct BuiltFromProposedBlock<TYPES: NodeType> {
+    pub view_number: TYPES::Time,
+    pub vid_commitment: VidCommitment,
+    pub leaf_commit: Commitment<Leaf<TYPES>>,
+    pub builder_commitment: BuilderCommitment,
+}
+
+// implement display for the derived info
+impl<TYPES: NodeType> std::fmt::Display for BuiltFromProposedBlock<TYPES> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "View Number: {:?}", self.view_number)
+    }
+}
+
 type EventServiceConnection<Types> = surf_disco::socket::Connection<
     Event<Types>,
     surf_disco::socket::Unsupported,