From 888df1da6759eea631cf92a4661b43a506b82266 Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Thu, 5 Sep 2024 06:43:03 -0600
Subject: [PATCH 1/9] Refactor order tests for deterministic behavior

The current order tests can be flaky, as they rely on waiting for the
builder's logic to process via `async_sleep` without being able to
determine whether it is in a state that is ready. To address this, the
code has been refactored to work with `ProxyGlobalState` instead of
`GlobalState` directly, as `ProxyGlobalState` already has mechanisms
for waiting for things to complete. This also simplifies a bit of the
internal logic, as we were reimplementing what `ProxyGlobalState` was
already doing.

In addition to these changes, this also refactors the code so as to
remove the conditionals that occur within the bodies of the for-each
loops. This is done by adding an enum for round behavior that can
adjust the transactions returned by the bundle call as needed.

The order of progression within the for-each loop has also been
adjusted to perform the consensus portion at the top of the loop,
before transactions are submitted. Since this order isn't that
important, it seems better for the consensus portion to occur at the
top of the loop rather than after the transactions, as it makes it
clearer which portion of the logic we are currently in, i.e. whether
we are currently considering consensus's role or the builder's role.
---
 crates/marketplace/src/testing/order_test.rs | 684 ++++++++++---------
 1 file changed, 374 insertions(+), 310 deletions(-)

diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs
index f5b61ec8..20b350e4 100644
--- a/crates/marketplace/src/testing/order_test.rs
+++ b/crates/marketplace/src/testing/order_test.rs
@@ -1,25 +1,118 @@
+use hotshot_builder_api::v0_3::data_source::{AcceptsTxnSubmits, BuilderDataSource};
 use hotshot_types::{
-    data::{QuorumProposal, ViewNumber},
-    traits::node_implementation::ConsensusTime,
+    bundle::Bundle,
+    data::QuorumProposal,
+    traits::node_implementation::{ConsensusTime, NodeType},
 };
 
-use crate::builder_state::MessageType;
-
-use std::fmt::Debug;
+use crate::{
+    service::{BuilderHooks, ProxyGlobalState},
+    utils::BuilderStateId,
+};
 
-use async_compatibility_layer::art::async_sleep;
-use async_std::prelude::FutureExt;
+use std::{fmt::Debug, sync::Arc};
 
 use hotshot_example_types::block_types::TestTransaction;
 
-use crate::{builder_state::TransactionSource, testing::TestTypes};
-use crate::{
-    service::handle_received_txns,
-    testing::{calc_proposal_msg, get_req_msg, start_builder_state},
+use crate::testing::TestTypes;
+use crate::testing::{calc_proposal_msg, start_builder_state};
+use hotshot::{
+    rand::{self, seq::SliceRandom, thread_rng},
+    types::{BLSPubKey, Event, SignatureKey},
 };
-use hotshot::rand::{self, seq::SliceRandom, thread_rng};
 
 use std::time::Duration;
 
+/// [NoOpHooks] is a struct placeholder that is used to implement the
+/// [BuilderHooks] trait for the [TestTypes] NodeType in a way that doesn't
+/// do anything.
+/// This is a convenience for creating [ProxyGlobalState] objects.
+struct NoOpHooks;
+
+#[async_trait::async_trait]
+impl BuilderHooks<TestTypes> for NoOpHooks {
+    #[inline(always)]
+    async fn process_transactions(
+        self: &Arc<Self>,
+        transactions: Vec<<TestTypes as NodeType>::Transaction>,
+    ) -> Vec<<TestTypes as NodeType>::Transaction> {
+        transactions
+    }
+
+    #[inline(always)]
+    async fn handle_hotshot_event(self: &Arc<Self>, _event: &Event<TestTypes>) {}
+}
+
+/// [RoundTransactionBehavior] is an enum that is used to represent different
+/// behaviors that we may want to simulate during a round. This applies to
+/// determining which transactions are included in the block, and how their
+/// order is adjusted before being included for consensus.
+#[derive(Clone, Debug)]
+enum RoundTransactionBehavior {
+    /// [NoAdjust] indicates that the transactions should be passed through
+    /// without any adjustment
+    NoAdjust,
+
+    /// [Skip] indicates that the transactions should be omitted entirely
+    Skip,
+
+    /// [AdjustAdd] indicates that a new transaction should be added to the
+    /// transactions submitted
+    AdjustAdd(usize),
+
+    /// [AdjustRemoveTail] indicates that the last transaction should be removed
+    /// from the transactions submitted
+    AdjustRemoveTail,
+
+    /// [ProposeInAdvance] indicates that a transaction should be added to the
+    /// transactions submitted that indicates that it is for the next round
+    /// (i.e. the round after the one being processed)
+    ProposeInAdvance(usize),
+
+    /// [AdjustRemove] indicates that a random transaction (not the last one)
+    /// should be removed from the transactions submitted
+    AdjustRemove,
+}
+
+impl RoundTransactionBehavior {
+    /// [process_transactions] is a helper method that takes a vector of transactions
+    /// and applies the behavior specified by the [RoundTransactionBehavior] enum
+    /// to the transactions before returning them.
+    fn process_transactions(&self, transactions: Vec<TestTransaction>) -> Vec<TestTransaction> {
+        match self {
+            RoundTransactionBehavior::NoAdjust => transactions,
+            RoundTransactionBehavior::Skip => vec![],
+            RoundTransactionBehavior::AdjustAdd(adjust_add_round) => {
+                let mut transactions = transactions.clone();
+                transactions.insert(
+                    rand::random::<usize>() % transactions.len(),
+                    TestTransaction::new(vec![
+                        *adjust_add_round as u8,
+                        (transactions.len() + 1) as u8,
+                    ]),
+                );
+                transactions
+            }
+            RoundTransactionBehavior::AdjustRemoveTail => {
+                let mut transactions = transactions.clone();
+                transactions.pop();
+                transactions
+            }
+            RoundTransactionBehavior::ProposeInAdvance(propose_in_advance_round) => {
+                let mut transactions = transactions.clone();
+                transactions.push(TestTransaction::new(vec![
+                    (propose_in_advance_round + 1) as u8,
+                    0_u8,
+                ]));
+                transactions
+            }
+            RoundTransactionBehavior::AdjustRemove => {
+                let mut transactions = transactions.clone();
+                transactions.remove(rand::random::<usize>() % (transactions.len() - 1));
+                transactions
+            }
+        }
+    }
+}
+
 /// The function checks whether the common part of two transaction vectors have the same order
 fn order_check(
     transaction_history: Vec<TestTransaction>,
     all_transactions: Vec<Vec<TestTransaction>>,
 ) -> bool {
     let all_transactions_vec = all_transactions.into_iter().flatten().collect::<Vec<_>>();
     tracing::debug!(
-        "Doing order check, transaction_history = {:?}, all_transactions = {:?}",
+        "Doing order check:\n\ttransaction_history = {:?}\n\tall_transactions = {:?}",
         transaction_history,
         all_transactions_vec
     );
@@ -66,6 +159,13 @@ async fn test_builder_order() {
 
     let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
 
+    let proxy_global_state = ProxyGlobalState::new(
+        global_state.clone(),
+        Arc::new(NoOpHooks),
+        BLSPubKey::generated_from_seed_indexed([0; 32], 0),
+        Duration::from_secs(1),
+    );
+
     // Transactions to send
     let all_transactions = (0..NUM_ROUNDS)
         .map(|round| {
@@ -89,6 +189,28 @@ async fn test_builder_order() {
     // the round we want to include tx in later round (NUM_ROUNDS -1 which is also the final round) to propose in advance
     let propose_in_advance_round = NUM_ROUNDS - 2;
 
+    // determine_round_behavior is a helper function that takes a round number
+    // and returns the desired [RoundTransactionBehavior] for that round.
+    let determine_round_behavior = |round: usize| -> RoundTransactionBehavior {
+        if round == skip_round {
+            return RoundTransactionBehavior::Skip;
+        }
+
+        if round == adjust_add_round {
+            return RoundTransactionBehavior::AdjustAdd(adjust_add_round);
+        }
+
+        if round == adjust_remove_tail_round {
+            return RoundTransactionBehavior::AdjustRemoveTail;
+        }
+
+        if propose_in_advance_round == round {
+            return RoundTransactionBehavior::ProposeInAdvance(propose_in_advance_round + 1);
+        }
+
+        RoundTransactionBehavior::NoAdjust
+    };
+
     // set up state to track between simulated consensus rounds
     let mut prev_proposed_transactions: Option<Vec<TestTransaction>> = None;
     let mut prev_quorum_proposal: Option<QuorumProposal<TestTypes>> = None;
     let mut transaction_history = Vec::new();
 
     // Simulate NUM_ROUNDS of consensus. First we submit the transactions for this round to the builder,
     // then construct DA and Quorum Proposals based on what we received from builder in the previous round
     // and request a new bundle.
-    #[allow(clippy::needless_range_loop)] // intent is clearer this way
-    for round in 0..NUM_ROUNDS {
-        // simulate transaction being submitted to the builder
-        for res in handle_received_txns(
-            &senders.transactions,
-            all_transactions[round].clone(),
-            TransactionSource::HotShot,
-        )
-        .await
-        {
-            res.unwrap();
-        }
-
-        // get transactions submitted in previous rounds, [] for genesis
-        // and simulate the block built from those
-        let transactions = prev_proposed_transactions.take().unwrap_or_default();
-        let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
-            calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions).await;
+    for (round, round_transactions, round_behavior) in all_transactions
+        .iter()
+        .enumerate()
+        .map(|(round, txns)| (round, txns, determine_round_behavior(round)))
+    {
+        // Simulate consensus deciding on the transactions that are included
+        // in the block.
+        let BuilderStateId {
+            parent_view,
+            parent_commitment,
+        } = {
+            // get transactions submitted in previous rounds, [] for genesis
+            // and simulate the block built from those
+            let transactions = prev_proposed_transactions.take().unwrap_or_default();
+            let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions)
+                    .await;
+
+            prev_quorum_proposal = Some(quorum_proposal.clone());
+
+            // send quorum and DA proposals for this round
+            senders
+                .da_proposal
+                .broadcast(da_proposal_msg)
+                .await
+                .unwrap();
+            senders
+                .quorum_proposal
+                .broadcast(quorum_proposal_msg)
+                .await
+                .unwrap();
 
-        prev_quorum_proposal = Some(quorum_proposal.clone());
+            builder_state_id
+        };
 
-        // send quorum and DA proposals for this round
-        senders
-            .da_proposal
-            .broadcast(da_proposal_msg)
-            .await
-            .unwrap();
-        senders
-            .quorum_proposal
-            .broadcast(quorum_proposal_msg)
+        // simulate transaction being submitted to the builder
+        proxy_global_state
+            .submit_txns(round_transactions.clone())
             .await
             .unwrap();
 
-        let req_msg = get_req_msg(round as u64, builder_state_id).await;
-
-        // give builder state time to fork
-        async_sleep(Duration::from_millis(100)).await;
-
-        // get the builder state for parent view we've just simulated
-        global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .get(&req_msg.1)
-            .expect("Failed to get channel for matching builder")
-            .broadcast(req_msg.2.clone())
+        let Bundle { transactions, .. } = proxy_global_state
+            .bundle(parent_view.u64(), &parent_commitment, parent_view.u64())
             .await
             .unwrap();
 
-        // get response
-        // in the next round we will use received transactions to simulate
-        // the block being proposed
-        let res_msg = req_msg
-            .0
-            .recv()
-            .timeout(Duration::from_secs(10))
-            .await
-            .unwrap()
-            .unwrap();
+        // process the specific round behavior to modify the transactions we
+        // received
+        let transactions = round_behavior.process_transactions(transactions);
+
+        prev_proposed_transactions = Some(transactions.clone());
 
-        // play with transactions propsed by proposers: skip the whole round OR interspersed some txs randomly OR remove some txs randomly
-        if let MessageType::<TestTypes>::RequestMessage(ref request) = req_msg.2 {
-            let view_number = request.requested_view_number;
-            if view_number == ViewNumber::new(skip_round as u64) {
-                prev_proposed_transactions = None;
-            } else {
-                let mut proposed_transactions = res_msg.transactions.clone();
-                if view_number == ViewNumber::new(adjust_add_round as u64) {
-                    proposed_transactions.insert(
-                        rand::random::<usize>() % NUM_TXNS_PER_ROUND,
-                        TestTransaction::new(vec![
-                            adjust_add_round as u8,
-                            (NUM_TXNS_PER_ROUND + 1) as u8,
-                        ]),
-                    );
-                } else if view_number == ViewNumber::new(adjust_remove_tail_round as u64) {
-                    proposed_transactions.pop();
-                } else if view_number == ViewNumber::new(propose_in_advance_round as u64) {
-                    proposed_transactions.push(TestTransaction::new(vec![
-                        (propose_in_advance_round + 1) as u8,
-                        0_u8,
-                    ]));
-                }
-                prev_proposed_transactions = Some(proposed_transactions);
-            }
-        } else {
-            tracing::error!("Unable to get request from RequestMessage");
-        }
         // save transactions to history
-        if prev_proposed_transactions.is_some() {
-            transaction_history.extend(prev_proposed_transactions.clone().unwrap());
-        }
+        transaction_history.extend(transactions);
     }
 
     // we should've served all transactions submitted, and in correct order
@@ -222,7 +306,23 @@ async fn test_builder_order_chain_fork() {
     // round 3 should be back to normal, there's no fork anymore
     let fork_round = 1;
 
+    // determine_round_behavior is a helper function that takes a round number
+    // and returns the desired [RoundTransactionBehavior] for that round.
+    let determine_round_behavior = |round: usize| -> RoundTransactionBehavior {
+        if round == fork_round {
+            return RoundTransactionBehavior::Skip;
+        }
+
+        RoundTransactionBehavior::NoAdjust
+    };
+
     let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
+    let proxy_global_state = ProxyGlobalState::new(
+        global_state.clone(),
+        Arc::new(NoOpHooks),
+        BLSPubKey::generated_from_seed_indexed([0; 32], 0),
+        Duration::from_secs(1),
+    );
 
     // Transactions to send
     let all_transactions = (0..NUM_ROUNDS)
@@ -234,180 +334,153 @@ async fn test_builder_order_chain_fork() {
         .collect::<Vec<Vec<TestTransaction>>>();
 
     // set up state to track between simulated consensus rounds
-    let mut prev_proposed_transactions: Option<Vec<TestTransaction>> = None;
-    let mut prev_quorum_proposal: Option<QuorumProposal<TestTypes>> = None;
-    let mut transaction_history = Vec::new();
+    let mut prev_proposed_transactions_branch_1: Option<Vec<TestTransaction>> = None;
+    let mut prev_quorum_proposal_branch_1: Option<QuorumProposal<TestTypes>> = None;
+    let mut transaction_history_branch_1 = Vec::new();
 
     // set up state to track the fork-ed chain
-    let mut prev_proposed_transactions_2: Option<Vec<TestTransaction>> = None;
-    let mut prev_quorum_proposal_2: Option<QuorumProposal<TestTypes>> = None;
-    let mut transaction_history_2 = Vec::new();
-    // the parameter to track whether there's a fork by pending whether the transactions submitted in
-    // the previous round are the same
-    let mut fork: bool;
+    let mut prev_proposed_transactions_branch_2: Option<Vec<TestTransaction>> = None;
+    let mut prev_quorum_proposal_branch_2: Option<QuorumProposal<TestTypes>> = None;
+    let mut transaction_history_branch_2 = Vec::new();
 
     // Simulate NUM_ROUNDS of consensus. First we submit the transactions for this round to the builder,
     // then construct DA and Quorum Proposals based on what we received from builder in the previous round
     // and request a new bundle.
-    #[allow(clippy::needless_range_loop)] // intent is clearer this way
-    for round in 0..NUM_ROUNDS {
-        // simulate transaction being submitted to the builder
-        for res in handle_received_txns(
-            &senders.transactions,
-            all_transactions[round].clone(),
-            TransactionSource::HotShot,
-        )
-        .await
-        {
-            res.unwrap();
-        }
+    for (round, transactions, fork_round_behavior) in all_transactions
+        .iter()
+        .enumerate()
+        .map(|(round, txns)| (round, txns, determine_round_behavior(round)))
+    {
+        // Simulate consensus deciding on the transactions that are included
+        // in the block, branch 1
+        let BuilderStateId {
+            parent_view: parent_view_branch_1,
+            parent_commitment: parent_commitment_branch_1,
+        } = {
+            // get transactions submitted in previous rounds, [] for genesis
+            // and simulate the block built from those
+            let transactions = prev_proposed_transactions_branch_1
+                .take()
+                .unwrap_or_default();
+            let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(
+                    NUM_STORAGE_NODES,
+                    round,
+                    prev_quorum_proposal_branch_1,
+                    transactions,
+                )
+                .await;
+
+            prev_quorum_proposal_branch_1 = Some(quorum_proposal.clone());
+
+            // send quorum and DA proposals for this round
+            senders
+                .da_proposal
+                .broadcast(da_proposal_msg)
+                .await
+                .unwrap();
+            senders
+                .quorum_proposal
+                .broadcast(quorum_proposal_msg)
+                .await
+                .unwrap();
 
-        // get transactions submitted in previous rounds, [] for genesis
-        // and simulate the block built from those
-        let transactions = prev_proposed_transactions.take().unwrap_or_default();
-        let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
-            calc_proposal_msg(
-                NUM_STORAGE_NODES,
-                round,
-                prev_quorum_proposal,
-                transactions.clone(),
-            )
-            .await;
-
-        let transactions_2 = prev_proposed_transactions_2.take().unwrap_or_default();
-        let (quorum_proposal_2, quorum_proposal_msg_2, da_proposal_msg_2, builder_state_id_2) =
-            calc_proposal_msg(
-                NUM_STORAGE_NODES,
-                round,
-                prev_quorum_proposal_2.clone(),
-                transactions_2.clone(),
-            )
-            .await;
-        if transactions_2 != transactions {
-            fork = true;
-            tracing::debug!("Fork Exist.")
-        } else {
-            fork = false;
-            tracing::debug!("No fork.");
-        }
+            builder_state_id
+        };
+
+        // Simulate consensus deciding on the transactions that are included
+        // in the block, branch 2
+        let BuilderStateId {
+            parent_view: parent_view_branch_2,
+            parent_commitment: parent_commitment_branch_2,
+        } = {
+            // get transactions submitted in previous rounds, [] for genesis
+            // and simulate the block built from those
+            let transactions = prev_proposed_transactions_branch_2
+                .take()
+                .unwrap_or_default();
+            let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(
+                    NUM_STORAGE_NODES,
+                    round,
+                    prev_quorum_proposal_branch_2,
+                    transactions,
+                )
+                .await;
+
+            prev_quorum_proposal_branch_2 = Some(quorum_proposal.clone());
+
+            // send quorum and DA proposals for this round
+            // we also need to send out the message for the fork-ed chain although it's not forked yet
+            // to prevent builders resend the transactions we've already committeed
+            senders
+                .da_proposal
+                .broadcast(da_proposal_msg)
+                .await
+                .unwrap();
+            senders
+                .quorum_proposal
+                .broadcast(quorum_proposal_msg)
+                .await
+                .unwrap();
 
-        prev_quorum_proposal = Some(quorum_proposal.clone());
-        // send quorum and DA proposals for this round
-        senders
-            .da_proposal
-            .broadcast(da_proposal_msg)
-            .await
-            .unwrap();
-        senders
-            .quorum_proposal
-            .broadcast(quorum_proposal_msg)
-            .await
-            .unwrap();
+            builder_state_id
+        };
 
-        prev_quorum_proposal_2 = Some(quorum_proposal_2.clone());
-        // send quorum and DA proposals for this round
-        // we also need to send out the message for the fork-ed chain although it's not forked yet
-        // to prevent builders resend the transactions we've already committeed
-        senders
-            .da_proposal
-            .broadcast(da_proposal_msg_2)
-            .await
-            .unwrap();
-        senders
-            .quorum_proposal
-            .broadcast(quorum_proposal_msg_2)
+        // simulate transaction being submitted to the builder
+        proxy_global_state
+            .submit_txns(transactions.clone())
             .await
             .unwrap();
 
-        let req_msg = get_req_msg(round as u64, builder_state_id).await;
-        // give builder state time to fork
-        async_sleep(Duration::from_millis(100)).await;
-
-        // get the builder state for parent view we've just simulated
-        global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .get(&req_msg.1)
-            .expect("Failed to get channel for matching builder")
-            .broadcast(req_msg.2.clone())
+        let Bundle {
+            transactions: transactions_branch_1,
+            ..
+        } = proxy_global_state
+            .bundle(
+                parent_view_branch_1.u64(),
+                &parent_commitment_branch_1,
+                parent_view_branch_1.u64(),
+            )
             .await
             .unwrap();
 
-        // get response
-        // in the next round we will use received transactions to simulate
-        // the block being proposed
-        let res_msg = req_msg
-            .0
-            .recv()
-            .timeout(Duration::from_secs(10))
+        let Bundle {
+            transactions: transactions_branch_2,
+            ..
+        } = proxy_global_state
+            .bundle(
+                parent_view_branch_2.u64(),
+                &parent_commitment_branch_2,
+                parent_view_branch_2.u64(),
+            )
             .await
-            .unwrap()
             .unwrap();
 
-        // we have to get separate request message and response message when there's a fork
-        if fork {
-            let req_msg_2 = get_req_msg(round as u64, builder_state_id_2).await;
-            // give builder state time to fork
-            async_sleep(Duration::from_millis(100)).await;
-
-            // get the builder state for parent view we've just simulated
-            global_state
-                .read_arc()
-                .await
-                .spawned_builder_states
-                .get(&req_msg_2.1)
-                .expect("Failed to get channel for matching builder")
-                .broadcast(req_msg_2.2.clone())
-                .await
-                .unwrap();
-
-            // get response
-            let res_msg_2 = req_msg_2
-                .0
-                .recv()
-                .timeout(Duration::from_secs(10))
-                .await
-                .unwrap()
-                .unwrap();
-
-            // play with transactions propsed by proposers: at the fork_round, one chain propose while the other chain does not propose any
-            let proposed_transactions_2 = res_msg_2.transactions.clone();
-            prev_proposed_transactions_2 = Some(proposed_transactions_2);
-        }
-
-        // play with transactions propsed by proposers: at the fork_round, one chain propose while the other chain does not propose any
-        let proposed_transactions = res_msg.transactions.clone();
-        prev_proposed_transactions = Some(proposed_transactions);
-        // if it's the `fork_round` we'll change what we want to propse to `None` for the fork-ed chain
-        if let MessageType::<TestTypes>::RequestMessage(ref request) = req_msg.2 {
-            let view_number = request.requested_view_number;
-            if view_number == ViewNumber::new(fork_round as u64) {
-                prev_proposed_transactions_2 = None;
-            } else {
-                prev_proposed_transactions_2 = prev_proposed_transactions.clone();
-            }
+        if transactions_branch_2 != transactions_branch_1 {
+            tracing::debug!("Fork Exist.")
         } else {
-            tracing::error!("Unable to get request from RequestMessage");
+            tracing::debug!("No fork.");
         }
+        let transactions_branch_2 = fork_round_behavior.process_transactions(transactions_branch_2);
+
+        prev_proposed_transactions_branch_1 = Some(transactions_branch_1.clone());
+        prev_proposed_transactions_branch_2 = Some(transaction_history_branch_2.clone());
+
         // save transactions to history
-        if prev_proposed_transactions.is_some() {
-            transaction_history.extend(prev_proposed_transactions.clone().unwrap());
-        }
-        if prev_proposed_transactions_2.is_some() {
-            transaction_history_2.extend(prev_proposed_transactions_2.clone().unwrap());
-        }
+        transaction_history_branch_1.extend(transactions_branch_1);
+        transaction_history_branch_2.extend(transactions_branch_2);
     }
 
+    // This will fail if no fork has occurred
+    assert!(transaction_history_branch_1 != transaction_history_branch_2);
     // the test will fail if any transaction is re-ordered
-    assert!(order_check(transaction_history, all_transactions.clone()));
-    assert!(order_check(transaction_history_2, all_transactions));
-    // the test will fail if any transaction is skipped or re-ordered
-    // assert_eq!(
-    //     transaction_history_2,
-    //     all_transactions.into_iter().flatten().collect::<Vec<TestTransaction>>()
-    // );
+    assert!(order_check(
+        transaction_history_branch_1,
+        all_transactions.clone()
+    ));
+    assert!(order_check(transaction_history_branch_2, all_transactions));
 }
 
 /// This test simulates multiple builder states receiving messages from the channels and processing them
@@ -430,6 +503,12 @@ async fn test_builder_order_should_fail() {
     const NUM_STORAGE_NODES: usize = 4;
 
     let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
+    let proxy_global_state = ProxyGlobalState::new(
+        global_state,
+        Arc::new(NoOpHooks),
+        BLSPubKey::generated_from_seed_indexed([0; 32], 0),
+        Duration::from_secs(1),
+    );
 
     // Transactions to send
     let all_transactions = (0..NUM_ROUNDS)
@@ -443,6 +522,15 @@ async fn test_builder_order_should_fail() {
     // generate a random number between (0..NUM_ROUNDS) to do some changes for output transactions
     // the round we want to skip some transactions (cannot be the final round), after it is enabled the test is expected to fail
     let adjust_remove_round = rand::random::<usize>() % (NUM_ROUNDS - 1);
+    // determine_round_behavior is a helper function that takes a round number
+    // and returns the desired [RoundTransactionBehavior] for that round.
+    let determine_round_behavior = |round: usize| -> RoundTransactionBehavior {
+        if round == adjust_remove_round {
+            return RoundTransactionBehavior::AdjustRemove;
+        }
+
+        RoundTransactionBehavior::NoAdjust
+    };
     // set up state to track between simulated consensus rounds
     let mut prev_proposed_transactions: Option<Vec<TestTransaction>> = None;
     let mut prev_quorum_proposal: Option<QuorumProposal<TestTypes>> = None;
@@ -451,82 +539,58 @@ async fn test_builder_order_should_fail() {
     // Simulate NUM_ROUNDS of consensus. First we submit the transactions for this round to the builder,
     // then construct DA and Quorum Proposals based on what we received from builder in the previous round
     // and request a new bundle.
-    #[allow(clippy::needless_range_loop)] // intent is clearer this way
-    for round in 0..NUM_ROUNDS {
-        // simulate transaction being submitted to the builder
-        for res in handle_received_txns(
-            &senders.transactions,
-            all_transactions[round].clone(),
-            TransactionSource::HotShot,
-        )
-        .await
-        {
-            res.unwrap();
-        }
-
-        // get transactions submitted in previous rounds, [] for genesis
-        // and simulate the block built from those
-        let transactions = prev_proposed_transactions.take().unwrap_or_default();
-        let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
-            calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions).await;
+    for (round, round_transactions, round_behavior) in all_transactions
+        .iter()
+        .enumerate()
+        .map(|(round, txns)| (round, txns, determine_round_behavior(round)))
+    {
+        // Simulate consensus deciding on the transactions that are included
+        // in the block.
+        let BuilderStateId {
+            parent_view,
+            parent_commitment,
+        } = {
+            // get transactions submitted in previous rounds, [] for genesis
+            // and simulate the block built from those
+            let transactions = prev_proposed_transactions.take().unwrap_or_default();
+            let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions)
+                    .await;
+
+            prev_quorum_proposal = Some(quorum_proposal.clone());
+
+            // send quorum and DA proposals for this round
+            senders
+                .da_proposal
+                .broadcast(da_proposal_msg)
+                .await
+                .unwrap();
+            senders
+                .quorum_proposal
+                .broadcast(quorum_proposal_msg)
+                .await
+                .unwrap();
 
-        prev_quorum_proposal = Some(quorum_proposal.clone());
+            builder_state_id
+        };
 
-        // send quorum and DA proposals for this round
-        senders
-            .da_proposal
-            .broadcast(da_proposal_msg)
-            .await
-            .unwrap();
-        senders
-            .quorum_proposal
-            .broadcast(quorum_proposal_msg)
+        // simulate transaction being submitted to the builder
+        proxy_global_state
+            .submit_txns(round_transactions.clone())
            .await
            .unwrap();
 
-        let req_msg = get_req_msg(round as u64, builder_state_id).await;
-
-        // give builder state time to fork
-        async_sleep(Duration::from_millis(100)).await;
-
-        // get the builder state for parent view we've just simulated
-        global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .get(&req_msg.1)
-            .expect("Failed to get channel for matching builder")
-            .broadcast(req_msg.2.clone())
+        let Bundle { transactions, .. } = proxy_global_state
+            .bundle(parent_view.u64(), &parent_commitment, parent_view.u64())
            .await
            .unwrap();
 
-        // get response
-        // in the next round we will use received transactions to simulate
-        // the block being proposed
-        let res_msg = req_msg
-            .0
-            .recv()
-            .timeout(Duration::from_secs(10))
-            .await
-            .unwrap()
-            .unwrap();
+        let transactions = round_behavior.process_transactions(transactions);
+
+        prev_proposed_transactions = Some(transactions.clone());
 
-        // play with transactions propsed by proposers: skip the whole round OR interspersed some txs randomly OR remove some txs randomly
-        if let MessageType::<TestTypes>::RequestMessage(ref request) = req_msg.2 {
-            let view_number = request.requested_view_number;
-            let mut proposed_transactions = res_msg.transactions.clone();
-            if view_number == ViewNumber::new(adjust_remove_round as u64) {
-                proposed_transactions.remove(rand::random::<usize>() % (NUM_TXNS_PER_ROUND - 1));
-                // cannot be the last transaction
-            }
-            prev_proposed_transactions = Some(proposed_transactions);
-        } else {
-            tracing::error!("Unable to get request from RequestMessage");
-        }
         // save transactions to history
-        if prev_proposed_transactions.is_some() {
-            transaction_history.extend(prev_proposed_transactions.clone().unwrap());
-        }
+        transaction_history.extend(transactions);
     }
 
     // we should've served all transactions submitted, and in correct order
     // the test will fail if the common part of two vectors of transactions don't have the same order

From fb1a797d6e060d0a6c05e0732d0b9e3aa6029fef Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Thu, 5 Sep 2024 12:57:13 -0600
Subject: [PATCH 2/9] Add asserts to verify specific fork behavior

When a fork happens such that we have separate requests submitted to
the builder, we expect the builder to be aware of them and to issue any
missing transactions for a block in a subsequent block.
At the moment, this is not occurring, and the assert at the bottom of
`test_builder_order_chain_fork` that ensures that the two transaction
history forks don't match is backwards. Additional asserts have been
added when producing the next proposal to verify that the transaction
lists are the same when their previous lists were the same; that they
differ when their previous lists differ and the forked list is empty;
and that they are the same again once the previous proposal contained
all of the missing transactions.
---
 crates/marketplace/src/testing/order_test.rs | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs
index 20b350e4..bf2e6866 100644
--- a/crates/marketplace/src/testing/order_test.rs
+++ b/crates/marketplace/src/testing/order_test.rs
@@ -360,7 +360,7 @@ async fn test_builder_order_chain_fork() {
             // get transactions submitted in previous rounds, [] for genesis
             // and simulate the block built from those
             let transactions = prev_proposed_transactions_branch_1
-                .take()
+                .clone()
                 .unwrap_or_default();
             let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
                 calc_proposal_msg(
@@ -397,7 +397,7 @@ async fn test_builder_order_chain_fork() {
             // get transactions submitted in previous rounds, [] for genesis
             // and simulate the block built from those
             let transactions = prev_proposed_transactions_branch_2
-                .take()
+                .clone()
                 .unwrap_or_default();
             let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) =
                 calc_proposal_msg(
@@ -412,7 +412,7 @@ async fn test_builder_order_chain_fork() {
 
             // send quorum and DA proposals for this round
             // we also need to send out the message for the fork-ed chain although it's not forked yet
-            // to prevent builders resend the transactions we've already committeed
+            // to prevent builders resend the transactions we've already committed
             senders
                 .da_proposal
                 .broadcast(da_proposal_msg)
@@ -457,16 +457,23 @@ async fn test_builder_order_chain_fork() {
             .await
             .unwrap();
 
+        if prev_proposed_transactions_branch_1 == prev_proposed_transactions_branch_2 {
+            assert_eq!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions are the same, then the new transactions should also be the same");
+        } else if prev_proposed_transactions_branch_2.map(|txs| txs.len()) == Some(0) {
+            assert_ne!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions differ and the previous proposed transactions is empty, then the new transactions should also differ");
+        } else {
+            assert_eq!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions differ, then the new transactions should be the same, as they should now have been repaired");
+        }
+
+        let transactions_branch_2 = fork_round_behavior.process_transactions(transactions_branch_2);
         if transactions_branch_2 != transactions_branch_1 {
             tracing::debug!("Fork Exist.")
         } else {
             tracing::debug!("No fork.");
         }
-        let transactions_branch_2 = fork_round_behavior.process_transactions(transactions_branch_2);
 
         prev_proposed_transactions_branch_1 = Some(transactions_branch_1.clone());
-        prev_proposed_transactions_branch_2 = Some(transaction_history_branch_2.clone());
+        prev_proposed_transactions_branch_2 = Some(transactions_branch_2.clone());
 
         // save transactions to history
         transaction_history_branch_1.extend(transactions_branch_1);
@@ -474,7 +481,10 @@ async fn test_builder_order_chain_fork() {
         transaction_history_branch_2.extend(transactions_branch_2);
     }
 
     // This will fail if no fork has occurred
-    assert!(transaction_history_branch_1 != transaction_history_branch_2);
+    assert_eq!(
+        transaction_history_branch_1, transaction_history_branch_2,
+        "even with a fork, the transaction history branches should match"
+    );
     // the test will fail if any transaction is re-ordered
     assert!(order_check(
         transaction_history_branch_1,

From a96d8d3404896e7d7201ec280452f57629d9f1ed Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Mon, 9 Sep 2024 08:33:40 -0600
Subject: [PATCH 3/9] Fix comment on fork test to match assert
---
 crates/marketplace/src/testing/order_test.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs
index bf2e6866..99625ed8 100644
--- a/crates/marketplace/src/testing/order_test.rs
+++ b/crates/marketplace/src/testing/order_test.rs
@@ -480,7 +480,8 @@ async fn test_builder_order_chain_fork() {
         transaction_history_branch_2.extend(transactions_branch_2);
     }
 
-    // This will fail if no fork has occurred
+    // With a fork, the transaction history should match once all transactions
+    // have been processed.
     assert_eq!(
         transaction_history_branch_1, transaction_history_branch_2,
         "even with a fork, the transaction history branches should match"

From 2d5ed8461f7352b7524664144dcde1bdf231b5c7 Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Mon, 9 Sep 2024 14:56:22 -0600
Subject: [PATCH 4/9] Fix BuilderState unnecessary forking

The current `BuilderState` extending behavior fails to anticipate
enough of the various edge cases that determine whether the current
`BuilderState` is the best, or one of the only, `BuilderState`s that
should be extended. Due to this behavior, many more `BuilderState`s are
spawned than really should be. This causes a lot of unnecessary
computation and memory usage that will never get cleaned up, as the
`Sender`s that are registered for them get overwritten by subsequent
clones that have the same `BuilderStateId`. This effectively causes an
async memory leak, an async processing leak, and a potential fork-bomb
that will effectively prevent the builder from being responsive.

Additionally, the race condition that stems from multiple
`BuilderState`s spawning a clone that extends them for the next
`QuorumProposal` means that we often receive an unexpected list of
transactions as a result. This behavior can best be seen in the test
`marketplace::testing::order_test::test_builder_order_chain_fork`.

To fix these issues a few changes have been made. First, a new method
has been added called `spawn_clone_that_extends_self` that ensures a
standard way of spawning a new `BuilderState` that extends the current
`BuilderState`. Another method has been added called
`am_i_the_best_builder_state_to_extend` that attempts to ensure that we
are the best fit, or one of the only fits, that can extend the current
`BuilderState` based on the given `QuorumProposal`. Finally, as a
safeguard, a check has been added when calling `spawn_clone` that
ensures that the same `BuilderStateId` will not be registered more than
once, i.e. if there is already an async process spawned, there is no
need to spawn another.
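As a rough, self-contained sketch of that de-duplication guard (the
names here are illustrative stand-ins, not the exact code from the diff
below):

    use std::collections::HashMap;

    #[derive(PartialEq, Eq, Hash)]
    struct BuilderStateId(u64);

    /// Returns true only the first time a given id is registered,
    /// mirroring the check added to `spawn_clone`: if an async process
    /// is already spawned for this id, we do not spawn another.
    fn try_register(spawned: &mut HashMap<BuilderStateId, ()>, id: BuilderStateId) -> bool {
        if spawned.contains_key(&id) {
            return false;
        }
        spawned.insert(id, ());
        true
    }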
Also remove the newly added asserts, as they fail to represent the
correct anticipated intermediate state.
---
 crates/marketplace/src/builder_state.rs      | 242 ++++++++++++++-----
 crates/marketplace/src/testing/order_test.rs |   8 -
 2 files changed, 185 insertions(+), 65 deletions(-)

diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs
index 9b1b5cb9..0e39af89 100644
--- a/crates/marketplace/src/builder_state.rs
+++ b/crates/marketplace/src/builder_state.rs
@@ -173,6 +173,111 @@ pub struct BuilderState<TYPES: NodeType> {
 }
 
 impl<TYPES: NodeType> BuilderState<TYPES> {
+    /// [am_i_the_best_builder_state_to_extend] is a utility method that
+    /// attempts to determine the best [BuilderState] to extend from, given a
+    /// [QuorumProposal].
+    ///
+    /// In an ideal case the [BuilderState] whose recorded view and
+    /// vid_commitment that match the [QuorumProposal]'s justify_qc should be
+    /// the best fit. However, if there is no exact match, then the best fit
+    /// is the [BuilderState] with the largest view number smaller than the
+    /// [QuorumProposal]'s view number.
+    ///
+    /// If there are no [BuilderState]s with a view number smaller than the
+    /// [QuorumProposal]'s view number, then the best fit is the only
+    /// [BuilderState] active.
+    async fn am_i_the_best_builder_state_to_extend(
+        &self,
+        quorum_proposal: Arc<Proposal<QuorumProposal<TYPES>>>,
+    ) -> bool {
+        let justify_qc = &quorum_proposal.data.justify_qc;
+        let parent_view_number = justify_qc.view_number;
+        let parent_commitment = quorum_proposal.data.block_header.payload_commitment();
+
+        if quorum_proposal.data.view_number == self.built_from_proposed_block.view_number {
+            // We are being asked to extend ourselves. This is likely a
+            // spawning initial condition or a test condition.
+            return false;
+        }
+
+        if parent_view_number == self.built_from_proposed_block.view_number
+            && parent_commitment == self.built_from_proposed_block.vid_commitment
+        {
+            // This is us exactly, so we should be the best one to extend.
+            return true;
+        }
+
+        let desired_builder_state_id = BuilderStateId::<TYPES> {
+            parent_commitment,
+            parent_view: parent_view_number,
+        };
+
+        // Alright, we weren't the immediate best fit, let's see if there's a better fit out there.
+
+        let global_state_read_lock = self.global_state.read_arc().await;
+
+        if global_state_read_lock
+            .spawned_builder_states
+            .contains_key(&desired_builder_state_id)
+        {
+            // There is an exact match that isn't us, so we should not extend,
+            // and we should wait for that match to extend instead.
+            return false;
+        }
+
+        // There is no exact match that we are aware of. This ultimately means
+        // that we do not have visibility on the previously extended
+        // [BuilderState].
+        //
+        // It would be best if we could determine which was the best one to
+        // extend from, but there's a very real possibility that we just do
+        // not have any previous [BuilderState]s to extend from. This would
+        // mean that we should extend from the oldest [BuilderState] that we
+        // have locally in order to ensure that we don't drop any transactions
+        // that we believe may be pending with other calls that have been made.
+
+        let leaf = Leaf::<TYPES>::from_quorum_proposal(&quorum_proposal.data);
+        if self.built_from_proposed_block.leaf_commit == leaf.commit() {
+            // We are the oldest [BuilderState] that we have locally, so we
+            // should extend from ourselves.
+            return true;
+        }
+
+        // At this point we don't have any way to inspecting the other
+        // [BuilderState]'s `build_from_proposed_block` values to determine
+        // if another [BuilderState] will be extending from the same parent.
+        // So we'll do some other checks to see if we can make some safe
+        // assumptions.
+
+        let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock
+            .spawned_builder_states
+            .keys()
+            .map(|builder_state_id| *builder_state_id.parent_view)
+            .filter(|view_number| view_number < &quorum_proposal.data.view_number)
+            .max();
+
+        if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) =
+            maximum_stored_view_number_smaller_than_quorum_proposal
+        {
+            // If we are the maximum stored view number smaller than the quorum
+            // proposal's view number, then we are the best fit.
+            return maximum_stored_view_number_smaller_than_quorum_proposal
+                == self.built_from_proposed_block.view_number.u64();
+        }
+
+        // If there are no stored view numbers smaller than the quorum proposal
+        // Are we the only builder state?
+        if global_state_read_lock.spawned_builder_states.len() == 1 {
+            return true;
+        }
+
+        // This implies that there are only larger [BuilderState]s active than
+        // the one we are. This is weird, it implies that some sort of time
+        // travel has occurred view-wise. It is unclear what to do in this
+        // situation.
+
+        true
+    }
 
     /// processing the DA proposal
     #[tracing::instrument(skip_all, name = "process da proposal", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
@@ -215,9 +320,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
             self.quorum_proposal_payload_commit_to_quorum_proposal
                 .remove(&(da_msg.builder_commitment.clone(), da_msg.view_number));
 
-            let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
-            self.clone_with_receiver(req_receiver)
-                .spawn_clone(da_msg, quorum_proposal, req_sender)
+            self.spawn_clone_that_extends_self(da_msg, quorum_proposal)
                 .await;
         } else {
             tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers");
@@ -246,43 +349,6 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         // To handle both cases, we can have the highest view number builder state running
         // and only doing the insertion if and only if intended builder state for a particulat view is not present
         // check the presence of quorum_proposal.data.view_number-1 in the spawned_builder_states list
-        if qc_msg.proposal.data.justify_qc.view_number != self.built_from_proposed_block.view_number
-        {
-            tracing::debug!(
-                "View number {:?} from justify qc does not match for builder {:?}",
-                qc_msg.proposal.data.justify_qc.view_number,
-                self.built_from_proposed_block
-            );
-            if !self
-                .global_state
-                .read_arc()
-                .await
-                .should_view_handle_other_proposals(
-                    &self.built_from_proposed_block.view_number,
-                    &qc_msg.proposal.data.justify_qc.view_number,
-                )
-            {
-                tracing::debug!(
-                    "Builder {:?} is not currently bootstrapping.",
-                    self.built_from_proposed_block
-                );
-                // if we have the matching da proposal, we now know we don't need to keep it.
-                self.da_proposal_payload_commit_to_da_proposal.remove(&(
-                    qc_msg
-                        .proposal
-                        .data
-                        .block_header
-                        .builder_commitment()
-                        .clone(),
-                    qc_msg.proposal.data.view_number,
-                ));
-                return;
-            }
-            tracing::debug!(
-                "Builder {:?} handling proposal as bootstrap.",
-                self.built_from_proposed_block
-            );
-        }
         let quorum_proposal = &qc_msg.proposal;
         let view_number = quorum_proposal.data.view_number;
         let payload_builder_commitment = quorum_proposal.data.block_header.builder_commitment();
@@ -314,9 +380,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
                 view_number
             );
 
-            let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
-            self.clone_with_receiver(req_receiver)
-                .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender)
+            self.spawn_clone_that_extends_self(da_proposal_info, quorum_proposal.clone())
                 .await;
         } else {
             tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers");
@@ -329,6 +393,46 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         }
     }
 
+    /// [spawn_clone_that_extends_self] is a helper function that is used by
+    /// both [process_da_proposal] and [process_quorum_proposal] to spawn a
+    /// new [BuilderState] that extends from the current [BuilderState].
+    ///
+    /// This helper function also adds additional checks in order to ensure
+    /// that the [BuilderState] that is being spawned is the best fit for the
+    /// [QuorumProposal] that is being extended from.
+    async fn spawn_clone_that_extends_self(
+        &mut self,
+        da_proposal_info: Arc<DaProposalMessage<TYPES>>,
+        quorum_proposal: Arc<Proposal<QuorumProposal<TYPES>>>,
+    ) {
+        if !self
+            .am_i_the_best_builder_state_to_extend(quorum_proposal.clone())
+            .await
+        {
+            tracing::debug!(
+                "{} is not the best fit for forking, {}@{}, so ignoring the QC proposal, and leaving it to another BuilderState",
+                self.built_from_proposed_block,
+                quorum_proposal.data.block_header.payload_commitment(),
+                quorum_proposal.data.view_number.u64(),
+            );
+            return;
+        }
+
+        let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
+
+        tracing::debug!(
+            "extending BuilderState with a clone from {} with new proposal {}@{}",
+            self.built_from_proposed_block,
+            quorum_proposal.data.block_header.payload_commitment(),
+            quorum_proposal.data.view_number.u64()
+        );
+
+        // We literally fork ourselves
+        self.clone_with_receiver(req_receiver)
+            .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender)
+            .await;
+    }
+
     /// processing the decide event
     #[tracing::instrument(skip_all, name = "process decide event", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
@@ -371,14 +475,39 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         quorum_proposal: Arc<Proposal<QuorumProposal<TYPES>>>,
         req_sender: BroadcastSender<MessageType<TYPES>>,
     ) {
-        self.built_from_proposed_block.view_number = quorum_proposal.data.view_number;
-        self.built_from_proposed_block.vid_commitment =
-            quorum_proposal.data.block_header.payload_commitment();
-        self.built_from_proposed_block.builder_commitment =
-            quorum_proposal.data.block_header.builder_commitment();
         let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data);
-        self.built_from_proposed_block.leaf_commit = leaf.commit();
+
+        // We replace our built_from_proposed_block with information from the
+        // quorum proposal. This is identifying the block that this specific
+        // instance of [BuilderState] is attempting to build for.
+        self.built_from_proposed_block = BuiltFromProposedBlock {
+            view_number: quorum_proposal.data.view_number,
+            vid_commitment: quorum_proposal.data.block_header.payload_commitment(),
+            leaf_commit: leaf.commit(),
+            builder_commitment: quorum_proposal.data.block_header.builder_commitment(),
+        };
+
+        let builder_state_id = BuilderStateId {
+            parent_commitment: self.built_from_proposed_block.vid_commitment,
+            parent_view: self.built_from_proposed_block.view_number,
+        };
+
+        {
+            // Let's ensure that we don't already have one of these BuilderStates
+            // running already.
+
+            let global_state_read_lock = self.global_state.read_arc().await;
+            if global_state_read_lock
+                .spawned_builder_states
+                .contains_key(&builder_state_id)
+            {
+                tracing::warn!(
+                    "Aborting spawn_clone, builder state already exists in spawned_builder_states: {:?}",
+                    builder_state_id
+                );
+                return;
+            }
+        }
 
         for tx in da_proposal_info.txn_commitments.iter() {
             self.txns_in_queue.remove(tx);
@@ -389,14 +518,13 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         self.tx_queue
             .retain(|tx| self.txns_in_queue.contains(&tx.commit));
 
-        // register the spawned builder state to spawned_builder_states in the global state
-        self.global_state.write_arc().await.register_builder_state(
-            BuilderStateId {
-                parent_commitment: self.built_from_proposed_block.vid_commitment,
-                parent_view: self.built_from_proposed_block.view_number,
-            },
-            req_sender,
-        );
+        // register the spawned builder state to spawned_builder_states in the
+        // global state. We register this new child within the global_state, so
+        // that it can be looked up via the [BuilderStateId] in the future.
+        self.global_state
+            .write_arc()
+            .await
+            .register_builder_state(builder_state_id, req_sender);
 
         self.event_loop();
     }
diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs
index 99625ed8..25a59080 100644
--- a/crates/marketplace/src/testing/order_test.rs
+++ b/crates/marketplace/src/testing/order_test.rs
@@ -457,14 +457,6 @@ async fn test_builder_order_chain_fork() {
             .await
             .unwrap();
 
-        if prev_proposed_transactions_branch_1 == prev_proposed_transactions_branch_2 {
-            assert_eq!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions are the same, then the new transactions should also be the same");
-        } else if prev_proposed_transactions_branch_2.map(|txs| txs.len()) == Some(0) {
-            assert_ne!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions differ and the previous proposed transactions is empty, then the new transactions should also differ");
-        } else {
-            assert_eq!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions differ, then the new transactions should be the same, as they should now have been repaired");
-        }
-
         let transactions_branch_2 = fork_round_behavior.process_transactions(transactions_branch_2);
         if transactions_branch_2 != transactions_branch_1 {
             tracing::debug!("Fork Exist.")

From 43bac2e255532c14a7f554a9a04d3174ac678af6 Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Tue, 10 Sep 2024 12:41:12 -0600
Subject: [PATCH 5/9] Fix `calc_proposal_msg` `QuorumProposal`'s `justify_qc`'s
 `view_number`

`calc_proposal_msg` is currently returning the same `view_number` for
its `justify_qc` as it does for the entire `QuorumProposal`. Instead,
the `justify_qc` view should come from the passed previous proposal's
`view_number`.
---
 crates/marketplace/src/testing/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/marketplace/src/testing/mod.rs b/crates/marketplace/src/testing/mod.rs
index 9dc63c52..acf9ff1e 100644
--- a/crates/marketplace/src/testing/mod.rs
+++ b/crates/marketplace/src/testing/mod.rs
@@ -171,7 +171,7 @@ async fn calc_proposal_msg(
         SimpleCertificate::<TestTypes, QuorumData<TestTypes>, SuccessThreshold>::new(
             quorum_data.clone(),
             quorum_data.commit(),
-            ViewNumber::new(round as u64),
+            prev_proposal.view_number,
             prev_justify_qc.signatures.clone(),
             PhantomData,
         )

From 2191e7ac254e6b874cae998d32e1601acd410d1d Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Tue, 10 Sep 2024 12:43:10 -0600
Subject: [PATCH 6/9] Relocate `BuiltFromProposedBlock` to `utils.rs`

The `BuiltFromProposedBlock` definition is going to be referenced in
`GlobalState`. In order to prepare for this change, the struct needs to
be moved to a different module in order to avoid circular dependencies.
---
 crates/marketplace/src/builder_state.rs | 19 +------------------
 crates/marketplace/src/testing/mod.rs   |  2 +-
 crates/marketplace/src/utils.rs         | 19 ++++++++++++++++++-
 3 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs
index 0e39af89..9e5f97af 100644
--- a/crates/marketplace/src/builder_state.rs
+++ b/crates/marketplace/src/builder_state.rs
@@ -7,14 +7,13 @@ use hotshot_types::{
         EncodeBytes,
     },
     utils::BuilderCommitment,
-    vid::VidCommitment,
 };
 
 use committable::{Commitment, Committable};
 
 use crate::{
     service::{BroadcastReceivers, GlobalState, ReceivedTransaction},
-    utils::{BlockId, BuilderStateId, RotatingSet},
+    utils::{BlockId, BuilderStateId, BuiltFromProposedBlock, RotatingSet},
 };
 use async_broadcast::broadcast;
 use async_broadcast::Receiver as BroadcastReceiver;
@@ -97,22 +96,6 @@ pub enum Status {
     ShouldContinue,
 }
 
-/// Builder State to hold the state of the builder
-#[derive(Debug, Clone)]
-pub struct BuiltFromProposedBlock<TYPES: NodeType> {
-    pub view_number: TYPES::Time,
-    pub vid_commitment: VidCommitment,
-    pub leaf_commit: Commitment<Leaf<TYPES>>,
-    pub builder_commitment: BuilderCommitment,
-}
-
-// implement display for the derived info
-impl<TYPES: NodeType> std::fmt::Display for BuiltFromProposedBlock<TYPES> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "View Number: {:?}", self.view_number)
-    }
-}
-
 #[derive(Debug)]
 pub struct BuilderState<TYPES: NodeType> {
     pub included_txns: RotatingSet<Commitment<TYPES::Transaction>>,
diff --git a/crates/marketplace/src/testing/mod.rs b/crates/marketplace/src/testing/mod.rs
index acf9ff1e..598de98f 100644
--- a/crates/marketplace/src/testing/mod.rs
+++ b/crates/marketplace/src/testing/mod.rs
@@ -34,8 +34,8 @@ use hotshot_example_types::{
 };
 use serde::{Deserialize, Serialize};
 
-use crate::builder_state::BuiltFromProposedBlock;
 use crate::service::{broadcast_channels, GlobalState};
+use crate::utils::BuiltFromProposedBlock;
 use async_lock::RwLock;
 use committable::{Commitment, CommitmentBoundsArkless, Committable};
 use std::sync::Arc;
diff --git a/crates/marketplace/src/utils.rs b/crates/marketplace/src/utils.rs
index 29224abe..f46684ef 100644
--- a/crates/marketplace/src/utils.rs
+++ b/crates/marketplace/src/utils.rs
@@ -8,12 +8,13 @@ use std::{
     time::{Duration, Instant},
 };
 
+use committable::Commitment;
 use either::Either::{self, Left, Right};
 use futures::{FutureExt, Stream, StreamExt};
 use hotshot::types::Event;
 use hotshot_events_service::events::Error as EventStreamError;
 use hotshot_types::{
-    traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment,
+    data::Leaf, traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment,
 };
 use surf_disco::Client;
 use tracing::error;
@@ -122,6 +123,22 @@ impl<TYPES: NodeType> std::fmt::Display for BuilderStateId<TYPES> {
     }
 }
 
+/// Builder State to hold the state of the builder
+#[derive(Debug, Clone)]
+pub struct BuiltFromProposedBlock<TYPES: NodeType> {
+    pub view_number: TYPES::Time,
+    pub vid_commitment: VidCommitment,
+    pub leaf_commit: Commitment<Leaf<TYPES>>,
+    pub builder_commitment: BuilderCommitment,
+}
+
+// implement display for the derived info
+impl<TYPES: NodeType> std::fmt::Display for BuiltFromProposedBlock<TYPES> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "View Number: {:?}", self.view_number)
+    }
+}
+
 type EventServiceConnection<TYPES: NodeType> = surf_disco::socket::Connection<
     Event<TYPES>,
     surf_disco::socket::Unsupported,

From 6d39955a62e771836114c16b4983bd7d0336f095 Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Tue, 10 Sep 2024 13:10:08 -0600
Subject: [PATCH 7/9] Add `BuiltFromProposedBlock` to `spawned_builder_states`

In order to be able to consistently pick the correct `BuilderState` to
extend when we receive a `QuorumProposal`, additional information is
needed to understand the spawned `BuilderState`s.

In order to add this additional information, the
`BuiltFromProposedBlock` struct has been added as a pair to the value
stored in `spawned_builder_states`. This addition will allow inspectors
to better understand the `QuorumProposal` information that the
`BuilderState` was spawned from. This field is also made an `Option` to
make it easier to initialize.
---
 crates/marketplace/src/builder_state.rs      |  9 +++---
 crates/marketplace/src/service.rs            | 29 ++++++++++++++------
 crates/marketplace/src/testing/basic_test.rs |  1 +
 3 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs
index 9e5f97af..3ea2d2f5 100644
--- a/crates/marketplace/src/builder_state.rs
+++ b/crates/marketplace/src/builder_state.rs
@@ -504,10 +504,11 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         // register the spawned builder state to spawned_builder_states in the
         // global state. We register this new child within the global_state, so
         // that it can be looked up via the [BuilderStateId] in the future.
-        self.global_state
-            .write_arc()
-            .await
-            .register_builder_state(builder_state_id, req_sender);
+        self.global_state.write_arc().await.register_builder_state(
+            builder_state_id,
+            self.built_from_proposed_block.clone(),
+            req_sender,
+        );
 
         self.event_loop();
     }
diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs
index f22303fb..b3716e29 100644
--- a/crates/marketplace/src/service.rs
+++ b/crates/marketplace/src/service.rs
@@ -28,7 +28,7 @@ use crate::{
         BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QuorumProposalMessage,
         RequestMessage, ResponseMessage, TransactionSource,
     },
-    utils::{BlockId, BuilderStateId},
+    utils::{BlockId, BuilderStateId, BuiltFromProposedBlock},
 };
 pub use async_broadcast::{broadcast, RecvError, TryRecvError};
 use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError};
@@ -104,7 +104,13 @@ pub struct GlobalState<TYPES: NodeType> {
     pub blocks: lru::LruCache<BlockId<TYPES>, BlockInfo<TYPES>>,
 
     // registered builder states
-    pub spawned_builder_states: HashMap<BuilderStateId<TYPES>, BroadcastSender<MessageType<TYPES>>>,
+    pub spawned_builder_states: HashMap<
+        BuilderStateId<TYPES>,
+        (
+            Option<BuiltFromProposedBlock<TYPES>>,
+            BroadcastSender<MessageType<TYPES>>,
+        ),
+    >,
 
     // builder state -> last built block , it is used to respond the client
     // if the req channel times out during get_available_blocks
@@ -134,7 +140,7 @@ impl<TYPES: NodeType> GlobalState<TYPES> {
             parent_commitment: bootstrapped_builder_state_id,
             parent_view: bootstrapped_view_num,
         };
-        spawned_builder_states.insert(bootstrap_id.clone(), bootstrap_sender.clone());
+        spawned_builder_states.insert(bootstrap_id.clone(), (None, bootstrap_sender.clone()));
         GlobalState {
             blocks: lru::LruCache::new(NonZeroUsize::new(256).unwrap()),
             spawned_builder_states,
@@ -148,11 +154,14 @@ impl<TYPES: NodeType> GlobalState<TYPES> {
     pub fn register_builder_state(
         &mut self,
         parent_id: BuilderStateId<TYPES>,
+        built_from_proposed_block: BuiltFromProposedBlock<TYPES>,
         request_sender: BroadcastSender<MessageType<TYPES>>,
     ) {
         // register the builder state
-        self.spawned_builder_states
-            .insert(parent_id.clone(), request_sender);
+        self.spawned_builder_states.insert(
+            parent_id.clone(),
+            (Some(built_from_proposed_block), request_sender),
+        );
 
         // keep track of the max view number
         if parent_id.parent_view > self.highest_view_num_builder_id.parent_view {
@@ -219,9 +228,9 @@ impl<TYPES: NodeType> GlobalState<TYPES> {
         &self,
         key: &BuilderStateId<TYPES>,
     ) -> Result<&BroadcastSender<MessageType<TYPES>>, BuildError> {
-        if let Some(channel) = self.spawned_builder_states.get(key) {
+        if let Some(id_and_sender) = self.spawned_builder_states.get(key) {
             tracing::info!("Got matching builder for parent {}", key);
-            Ok(channel)
+            Ok(&id_and_sender.1)
         } else {
             tracing::warn!(
                 "failed to recover builder for parent {}, using higest view num builder with {}",
@@ -231,6 +240,7 @@ impl<TYPES: NodeType> GlobalState<TYPES> {
             // get the sender for the highest view number builder
             self.spawned_builder_states
                 .get(&self.highest_view_num_builder_id)
+                .map(|(_, sender)| sender)
                 .ok_or_else(|| BuildError::Error {
                     message: "No builder state found".to_string(),
                 })
@@ -353,7 +363,7 @@ where
             return Err(BuildError::NotFound);
         };
 
-        let Some(sender) = self
+        let Some(id_and_sender) = self
             .global_state
             .read_arc()
             .await
@@ -399,7 +409,8 @@ where
             response_channel: response_sender,
         };
 
-        sender
+        id_and_sender
+            .1
             .broadcast(MessageType::RequestMessage(request))
             .await
             .map_err(|err| {
diff --git a/crates/marketplace/src/testing/basic_test.rs b/crates/marketplace/src/testing/basic_test.rs
index e9504bb8..e55c8be2 100644
--- a/crates/marketplace/src/testing/basic_test.rs
+++ b/crates/marketplace/src/testing/basic_test.rs
@@ -92,6 +92,7 @@ async fn test_builder() {
         .spawned_builder_states
         .get(&req_msg.1)
         .expect("Failed to get channel for matching builder")
+        .1
         .broadcast(req_msg.2.clone())
         .await
         .unwrap();

From 3285db0f46c939d1382139fd6e9299c139548954 Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Tue, 10 Sep 2024 13:15:58 -0600
Subject: [PATCH 8/9] Fix best `BuilderState` algorithm

With the addition of the `BuiltFromProposedBlock` to the
`spawned_builder_states` we can now make better decisions about which
`BuilderState` a given `QuorumProposal` can extend. As such, this splits
the determination of the best `BuilderState`s to extend out of the
`am_i_the_best_builder_state_to_extend` call.

Additionally, the new `best_builder_states_to_extend` is deliberately
made a free function rather than a method in order to ensure that
information contained within the current `BuilderState` that is only
visible to that `BuilderState` is not used. This should allow for more
consistent determinations of the proper `BuilderState` to extend.
---
 crates/marketplace/src/builder_state.rs | 284 ++++++++++++++++--------
 1 file changed, 189 insertions(+), 95 deletions(-)

diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs
index 3ea2d2f5..5544f26d 100644
--- a/crates/marketplace/src/builder_state.rs
+++ b/crates/marketplace/src/builder_state.rs
@@ -155,112 +155,206 @@ pub struct BuilderState<TYPES: NodeType> {
     pub instance_state: Arc<TYPES::InstanceState>,
 }
 
-impl<TYPES: NodeType> BuilderState<TYPES> {
-    /// [am_i_the_best_builder_state_to_extend] is a utility method that
-    /// attempts to determine the best [BuilderState] to extend from, given a
-    /// [QuorumProposal].
-    ///
-    /// In an ideal case the [BuilderState] whose recorded view and
-    /// vid_commitment match the [QuorumProposal]'s justify_qc should be
-    /// the best fit. However, if there is no exact match, then the best fit
-    /// is the [BuilderState] with the largest view number smaller than the
-    /// [QuorumProposal]'s view number.
-    ///
-    /// If there are no [BuilderState]s with a view number smaller than the
-    /// [QuorumProposal]'s view number, then the best fit is the only
-    /// [BuilderState] active.
-    async fn am_i_the_best_builder_state_to_extend(
-        &self,
-        quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
-    ) -> bool {
-        let justify_qc = &quorum_proposal.data.justify_qc;
-        let parent_view_number = justify_qc.view_number;
-        let parent_commitment = quorum_proposal.data.block_header.payload_commitment();
-
-        if quorum_proposal.data.view_number == self.built_from_proposed_block.view_number {
-            // We are being asked to extend ourselves. This is likely a
-            // spawning initial condition or a test condition.
-            return false;
-        }
-
-        if parent_view_number == self.built_from_proposed_block.view_number
-            && parent_commitment == self.built_from_proposed_block.vid_commitment
-        {
-            // This is us exactly, so we should be the best one to extend.
-            return true;
-        }
-
-        let desired_builder_state_id = BuilderStateId::<TYPES> {
-            parent_commitment,
-            parent_view: parent_view_number,
-        };
-
-        // Alright, we weren't the immediate best fit, let's see if there's a better fit out there.
+/// [best_builder_states_to_extend] is a utility function that is used
+/// in order to determine which [BuilderState]s are the best fit to extend
+/// from.
+///
+/// This function is designed to inspect the current state of the global state
+/// in order to determine which [BuilderState]s are the best fit to extend
+/// from.
+/// We only want to use information from [GlobalState] as otherwise
+/// we would have some insider knowledge unique to our specific [BuilderState]
+/// rather than knowledge that is available to all [BuilderState]s. In fact,
+/// in order to ensure this, this function lives outside of the [BuilderState]
+/// itself.
+///
+/// In an ideal circumstance the best [BuilderState] to extend from is going to
+/// be the one that is immediately preceding the [QuorumProposal] that we are
+/// attempting to extend from. However, if all we know is the [ViewNumber] of
+/// the [QuorumProposal] that we are attempting to extend from, then we may end
+/// up in a scenario where we have multiple [BuilderState]s that are all equally
+/// valid to extend from. When this happens, we have the potential for a data
+/// race.
+///
+/// The primary cause of this has to do with the interface of the
+/// [ProxyGlobalState]'s API. In general, we want to be able to retrieve a
+/// [BuilderState] via the [BuilderStateId]. The [BuilderStateId] only
+/// references a [ViewNumber] and a [VidCommitment]. While this information is
+/// available in the [QuorumProposal], it only helps us to rule out
+/// [BuilderState]s that already exist. It does **NOT** help us to pick a
+/// [BuilderState] that is the best fit to extend from.
+///
+/// This is where the `justify_qc` comes into consideration. The `justify_qc`
+/// contains the previous [ViewNumber] that is being extended from, and in
+/// addition it also contains the previous [Commitment<Leaf<TYPES>>] that is
+/// being built on top of. Since our [BuilderState]s store identifying
+/// information that contains this same `leaf_commit` we can compare these
+/// directly to ensure that we are extending from the correct [BuilderState].
+///
+/// This function determines the best [BuilderState] in the following steps:
+///
+/// 1. If we have a [BuilderState] that is already spawned for the current
+///    [QuorumProposal], then we should return no states, as one already
+///    exists. This will prevent us from attempting to spawn duplicate
+///    [BuilderState]s.
+/// 2. Attempt to find all [BuilderState]s that are recorded within
+///    [GlobalState] that have matching view number and leaf commitments. There
+///    *should* only be one of these. But all would be valid extension points.
+/// 3. If we can't find any [BuilderState]s that match the view number
+///    and leaf commitment, then we should return the [BuilderState]s for the
+///    maximum stored view number that is smaller than the current
+///    [QuorumProposal]'s view number.
+/// 4. If there is only one [BuilderState] stored in the [GlobalState], then
+///    we should return that [BuilderState] as the best fit.
+/// 5. If none of the other criteria match, we return an empty result as it is
+///    unclear what to do in this case.
+///
+/// > Note: Any time this function returns more than a single entry in its
+/// > [HashSet] result, there is a potential for a race condition. This is
+/// > because there are multiple [BuilderState]s that are equally valid to
+/// > extend from. This race could be avoided by just picking one of the
+/// > entries in the resulting [HashSet], but this is not done here in order
+/// > to allow us to highlight the possibility of the race.
+async fn best_builder_states_to_extend<TYPES: NodeType>(
+    quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
+    global_state: Arc<RwLock<GlobalState<TYPES>>>,
+) -> HashSet<BuilderStateId<TYPES>> {
+    let current_view_number = quorum_proposal.data.view_number;
+    let current_commitment = quorum_proposal.data.block_header.payload_commitment();
+    let current_builder_state_id = BuilderStateId::<TYPES> {
+        parent_commitment: current_commitment,
+        parent_view: current_view_number,
+    };
+
+    let global_state_read_lock = global_state.read_arc().await;
+
+    // The first step is to check if we already have a spawned [BuilderState].
+    // If we do, then we should indicate that there is no best fit, as we
+    // don't want to spawn another [BuilderState].
+    if global_state_read_lock
+        .spawned_builder_states
+        .contains_key(&current_builder_state_id)
+    {
+        // We already have a spawned [BuilderState] for this proposal.
+        // So we should just ignore it.
+        return HashSet::new();
+    }
 
-        let global_state_read_lock = self.global_state.read_arc().await;
+    // Next we want to see if there is an immediate match for a [BuilderState]
+    // that we can extend from. This is the most ideal situation, as it
+    // implies that we are extending from the correct [BuilderState].
+    // We do this by checking the `justify_qc` stored within the
+    // [QuorumProposal], and checking it against the current spawned
+    // [BuilderState]s.
+    let justify_qc = &quorum_proposal.data.justify_qc;
+    let existing_states: HashSet<_> = global_state_read_lock
+        .spawned_builder_states
+        .iter()
+        .filter(
+            |(_, (built_from_proposed_block, _))| match built_from_proposed_block {
+                None => false,
+                Some(built_from_proposed_block) => {
+                    built_from_proposed_block.leaf_commit == justify_qc.data.leaf_commit
+                        && built_from_proposed_block.view_number == justify_qc.view_number
+                }
+            },
+        )
+        .map(|(builder_state_id, _)| builder_state_id.clone())
+        .collect();
+
+    // If we found any matching [BuilderState]s, then we should return them
+    // as the best fit.
+    if !existing_states.is_empty() {
+        return existing_states;
+    }
 
-        if global_state_read_lock
-            .spawned_builder_states
-            .contains_key(&desired_builder_state_id)
+    // At this point, we don't have any "ideal" matches or scenarios. So we
+    // need to look for a suitable fall-back. The best fallback condition to
+    // start with is any [BuilderState] that has the maximum spawned view
+    // number whose value is smaller than the current [QuorumProposal].
+    let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock
+        .spawned_builder_states
+        .keys()
+        .map(|builder_state_id| *builder_state_id.parent_view)
+        .filter(|view_number| view_number < &current_view_number)
+        .max();
+
+    // If we have a maximum view number that meets our criteria, then we should
+    // return all [BuilderStateId]s that match this view number.
+    // This can lead to multiple [BuilderStateId]s being returned.
+    if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) =
+        maximum_stored_view_number_smaller_than_quorum_proposal
+    {
+        // If we are the maximum stored view number smaller than the quorum
+        // proposal's view number, then we are the best fit.
+        let mut result = HashSet::new();
+        for builder_state_id in
+            global_state_read_lock
+                .spawned_builder_states
+                .keys()
+                .filter(|builder_state_id| {
+                    builder_state_id.parent_view.u64()
+                        == maximum_stored_view_number_smaller_than_quorum_proposal
+                })
         {
-            // There is an exact match that isn't us, so we should not extend,
-            // and we should wait for that match to extend instead.
-            return false;
+            result.insert(builder_state_id.clone());
         }
+        return result;
+    }
 
-        // There is no exact match that we are aware of. This ultimately means
-        // that we do not have visibility on the previously extended
-        // [BuilderState].
-        //
-        // It would be best if we could determine which was the best one to
-        // extend from, but there's a very real possibility that we just do
-        // not have any previous [BuilderState]s to extend from. This would
-        // mean that we should extend from the oldest [BuilderState] that we
-        // have locally in order to ensure that we don't drop any transactions
-        // that we believe may be pending with other calls that have been made.
-
-        let leaf = Leaf::<TYPES>::from_quorum_proposal(&quorum_proposal.data);
-        if self.built_from_proposed_block.leaf_commit == leaf.commit() {
-            // We are the oldest [BuilderState] that we have locally, so we
-            // should extend from ourselves.
-            return true;
+    // This is our last ditch effort to continue making progress. If there is
+    // only one [BuilderState] active, then we should return that as the best
+    // fit, as it will be the only way we can continue making progress with
+    // the builder.
+    if global_state_read_lock.spawned_builder_states.len() == 1 {
+        let mut result = HashSet::new();
+        for builder_state_id in global_state_read_lock.spawned_builder_states.keys() {
+            result.insert(builder_state_id.clone());
         }
+        return result;
+    }
 
-        // At this point we don't have any way of inspecting the other
-        // [BuilderState]'s `build_from_proposed_block` values to determine
-        // if another [BuilderState] will be extending from the same parent.
-        // So we'll do some other checks to see if we can make some safe
-        // assumptions.
-
-        let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock
-            .spawned_builder_states
-            .keys()
-            .map(|builder_state_id| *builder_state_id.parent_view)
-            .filter(|view_number| view_number < &quorum_proposal.data.view_number)
-            .max();
-
-        if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) =
-            maximum_stored_view_number_smaller_than_quorum_proposal
-        {
-            // If we are the maximum stored view number smaller than the quorum
-            // proposal's view number, then we are the best fit.
-            return maximum_stored_view_number_smaller_than_quorum_proposal
-                == self.built_from_proposed_block.view_number.u64();
-        }
+    // This implies that there are only larger [BuilderState]s active than
+    // the one we are. This is weird, it implies that some sort of time
+    // travel has occurred view-wise. It is unclear what to do in this
+    // situation.
 
-        // If there are no stored view numbers smaller than the quorum proposal
-        // Are we the only builder state?
-        if global_state_read_lock.spawned_builder_states.len() == 1 {
-            return true;
-        }
+    HashSet::new()
+}
+
+impl<TYPES: NodeType> BuilderState<TYPES> {
+    /// [am_i_the_best_builder_state_to_extend] is a utility method that
+    /// attempts to determine whether we are among the best [BuilderState]s to
+    /// extend from.
+    async fn am_i_the_best_builder_state_to_extend(
+        &self,
+        quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
+    ) -> bool {
+        let best_builder_states_to_extend =
+            best_builder_states_to_extend(quorum_proposal.clone(), self.global_state.clone()).await;
 
-        // This implies that there are only larger [BuilderState]s active than
-        // the one we are. This is weird, it implies that some sort of time
-        // travel has occurred view-wise. It is unclear what to do in this
-        // situation.
+        tracing::debug!(
+            "{}@{} thinks these are the best builder states to extend from: {:?} for proposal {}@{}",
+            self.built_from_proposed_block.vid_commitment,
+            self.built_from_proposed_block.view_number.u64(),
+            best_builder_states_to_extend
+                .iter()
+                .map(|builder_state_id| format!(
+                    "{}@{}",
+                    builder_state_id.parent_commitment,
+                    builder_state_id.parent_view.u64()
+                ))
+                .collect::<Vec<_>>(),
+            quorum_proposal.data.block_header.payload_commitment(),
+            quorum_proposal.data.view_number.u64(),
+        );
 
-        true
+        // We are a best fit if we are contained within the returned set of
+        // best [BuilderState]s to extend from.
+        best_builder_states_to_extend.contains(&BuilderStateId {
+            parent_commitment: self.built_from_proposed_block.vid_commitment,
+            parent_view: self.built_from_proposed_block.view_number,
+        })
     }
 
+    /// processing the DA proposal
     #[tracing::instrument(skip_all, name = "process da proposal", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]

From 15e7c02eb16bac0821fad33f6839852be1f79fe3 Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Wed, 11 Sep 2024 14:45:46 -0600
Subject: [PATCH 9/9] Add comment describing the Option value of `spawned_builder_states`

Addresses feedback given by @shenkeyao:
https://github.com/EspressoSystems/marketplace-builder-core/pull/114#discussion_r1755339290
---
 crates/marketplace/src/service.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs
index b3716e29..b45d34f7 100644
--- a/crates/marketplace/src/service.rs
+++ b/crates/marketplace/src/service.rs
@@ -107,6 +107,13 @@ pub struct GlobalState<TYPES: NodeType> {
     pub spawned_builder_states: HashMap<
         BuilderStateId<TYPES>,
         (
+            // This is provided as an Option for convenience with initialization.
+            // When we build the initial state, we don't necessarily want to
+            // have to generate a valid BuiltFromProposedBlock object, as doing
+            // so would require a bit of setup. Additionally, it would
+            // result in the call signature of `GlobalState::new` changing.
+            // However, for every subsequent BuilderState, we expect this value
+            // to be populated.
             Option<BuiltFromProposedBlock<TYPES>>,
             BroadcastSender<MessageType<TYPES>>,
         ),
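
Taken together, patches 7 through 9 leave `spawned_builder_states` holding a pair per builder state, where the `Option` is `None` only for the bootstrap entry. The sketch below is a minimal, self-contained model of that shape and its two insertion paths; it is not the crate's real code, and `ViewNumber`, `VidCommitment`, and `RequestSender` are simplified stand-ins for the `hotshot` types used in the diffs above.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the hotshot types used in the patches above.
type ViewNumber = u64;
type VidCommitment = [u8; 32];

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct BuilderStateId {
    parent_commitment: VidCommitment,
    parent_view: ViewNumber,
}

#[derive(Clone, Debug)]
struct BuiltFromProposedBlock {
    view_number: ViewNumber,
    leaf_commit: [u8; 32],
}

// Stand-in for the broadcast sender stored alongside the metadata.
#[derive(Clone, Debug)]
struct RequestSender;

struct GlobalState {
    // The value is a pair; the Option is None only for the bootstrap entry,
    // which is created before any QuorumProposal exists.
    spawned_builder_states:
        HashMap<BuilderStateId, (Option<BuiltFromProposedBlock>, RequestSender)>,
}

impl GlobalState {
    fn new(bootstrap_id: BuilderStateId, sender: RequestSender) -> Self {
        let mut spawned_builder_states = HashMap::new();
        // Bootstrap path: no proposal metadata yet, so store None.
        spawned_builder_states.insert(bootstrap_id, (None, sender));
        Self { spawned_builder_states }
    }

    fn register_builder_state(
        &mut self,
        id: BuilderStateId,
        built_from: BuiltFromProposedBlock,
        sender: RequestSender,
    ) {
        // Every subsequent registration carries the metadata that the
        // extension-selection logic matches against the justify_qc.
        self.spawned_builder_states.insert(id, (Some(built_from), sender));
    }
}

fn main() {
    let bootstrap = BuilderStateId { parent_commitment: [0; 32], parent_view: 0 };
    let mut state = GlobalState::new(bootstrap, RequestSender);

    let child = BuilderStateId { parent_commitment: [1; 32], parent_view: 1 };
    let meta = BuiltFromProposedBlock { view_number: 1, leaf_commit: [1; 32] };
    state.register_builder_state(child, meta, RequestSender);

    assert_eq!(state.spawned_builder_states.len(), 2);
}
```

In this model, as in the patches, storing the metadata next to the sender is what lets a reader of the global map compare a proposal's `justify_qc` against each builder state's `leaf_commit` and view number without asking the builder states themselves.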