From 25012a1969e9bdfe89fe3f010f1dc2af100f1c2e Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Fri, 8 Nov 2024 20:57:19 +0100 Subject: [PATCH] Switch marketplace builder to coordinator --- .github/workflows/audit.yml | 13 +- Cargo.lock | 2 +- crates/marketplace/Cargo.toml | 2 +- crates/marketplace/src/builder_state.rs | 1430 ----------------- crates/marketplace/src/hooks.rs | 43 + crates/marketplace/src/lib.rs | 8 +- crates/marketplace/src/service.rs | 940 +++-------- crates/marketplace/src/testing/basic_test.rs | 103 +- crates/marketplace/src/testing/integration.rs | 123 +- crates/marketplace/src/testing/mod.rs | 181 +-- crates/marketplace/src/testing/order_test.rs | 206 ++- crates/marketplace/src/utils.rs | 13 - crates/shared/src/testing/constants.rs | 12 + 13 files changed, 532 insertions(+), 2544 deletions(-) delete mode 100644 crates/marketplace/src/builder_state.rs create mode 100644 crates/marketplace/src/hooks.rs delete mode 100644 crates/marketplace/src/utils.rs diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index 0b814331..15b1dac6 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -18,16 +18,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: cachix/install-nix-action@v30 - - - uses: cachix/cachix-action@v15 - # If PR is from a non-collaborator (e.g. dependabot) the secrets are missing and the login to cachix fails. - continue-on-error: true - with: - name: espresso-systems-private - authToken: "${{ secrets.CACHIX_AUTH_TOKEN }}" - extraPullNames: nix-community - skipPush: ${{ github.actor == 'dependabot[bot]' }} + - uses: dtolnay/rust-toolchain@stable - name: Audit - run: nix develop .# -c cargo audit + run: cargo audit diff --git a/Cargo.lock b/Cargo.lock index 8a57aae8..47f30c51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4725,7 +4725,7 @@ dependencies = [ "async-std", "async-trait", "committable", - "derivative", + "derive_more 1.0.0", "futures", "hotshot", "hotshot-builder-api", diff --git a/crates/marketplace/Cargo.toml b/crates/marketplace/Cargo.toml index 43f16941..629c54be 100644 --- a/crates/marketplace/Cargo.toml +++ b/crates/marketplace/Cargo.toml @@ -11,7 +11,7 @@ async-lock = { workspace = true } async-std = { workspace = true, features = ["unstable", "attributes"] } async-trait = { workspace = true } committable = { workspace = true } -derivative = { workspace = true } +derive_more = { workspace = true, features = ["deref", "deref_mut"] } futures = { workspace = true } marketplace-builder-shared = { path = "../shared" } diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs deleted file mode 100644 index f4424087..00000000 --- a/crates/marketplace/src/builder_state.rs +++ /dev/null @@ -1,1430 +0,0 @@ -use hotshot_types::{ - data::{Leaf, QuorumProposal}, - message::Proposal, - traits::{ - block_contents::{BlockHeader, BlockPayload}, - node_implementation::{ConsensusTime, NodeType}, - EncodeBytes, - }, - utils::BuilderCommitment, -}; -use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; -use marketplace_builder_shared::utils::RotatingSet; - -use committable::Commitment; - -use crate::{ - service::{BroadcastReceivers, GlobalState, ReceivedTransaction}, - utils::LegacyCommit as _, -}; -use async_broadcast::broadcast; -use async_broadcast::Receiver as BroadcastReceiver; -use async_broadcast::Sender as BroadcastSender; -use async_compatibility_layer::channel::UnboundedSender; -use async_compatibility_layer::{art::async_sleep, art::async_spawn}; -use async_lock::RwLock; -use core::panic; -use futures::StreamExt; - -use std::cmp::PartialEq; -use std::collections::{HashMap, HashSet}; -use std::fmt::Debug; -use std::sync::Arc; -use std::time::Instant; -use std::{collections::hash_map::Entry, time::Duration}; - -pub type TxTimeStamp = u128; - -/// Enum to hold the different sources of the transaction -#[derive(Clone, Debug, PartialEq)] -pub enum TransactionSource { - External, // txn from the external source i.e private mempool - HotShot, // txn from the HotShot network i.e public mempool -} - -/// Decide Message to be put on the decide channel -#[derive(Clone, Debug)] -pub struct DecideMessage { - pub latest_decide_view_number: Types::View, -} -/// DA Proposal Message to be put on the da proposal channel -#[derive(Debug, Clone, PartialEq)] -pub struct DaProposalMessage { - pub view_number: Types::View, - pub txn_commitments: Vec>, - pub sender: ::SignatureKey, - pub builder_commitment: BuilderCommitment, -} - -/// Quorum proposal message to be put on the quorum proposal channel -#[derive(Clone, Debug, PartialEq)] -pub struct QuorumProposalMessage { - pub proposal: Arc>>, - pub sender: Types::SignatureKey, -} -/// Request Message to be put on the request channel -#[derive(Clone, Debug)] -pub struct RequestMessage { - pub requested_view_number: Types::View, - pub response_channel: UnboundedSender>, -} -pub enum TriggerStatus { - Start, - Exit, -} - -/// Response Message to be put on the response channel -#[derive(Debug)] -pub struct BuildBlockInfo { - pub id: BlockId, - pub block_size: u64, - pub offered_fee: u64, - pub block_payload: Types::BlockPayload, - pub metadata: <::BlockPayload as BlockPayload>::Metadata, -} - -/// Response Message to be put on the response channel -#[derive(Debug, Clone)] -pub struct ResponseMessage { - pub builder_hash: BuilderCommitment, - pub transactions: Vec, - pub block_size: u64, - pub offered_fee: u64, -} -#[derive(Debug, Clone)] -/// Enum to hold the status out of the decide event -pub enum Status { - ShouldExit, - ShouldContinue, -} - -/// Builder State to hold the state of the builder -#[derive(Debug)] -pub struct BuilderState { - /// txns that have been included in recent blocks that have - /// been built. This is used to try and guarantee that a transaction - /// isn't duplicated. - /// Keeps a history of the last 3 proposals. - pub included_txns: RotatingSet>, - - /// txn commits currently in the `tx_queue`. This is used as a quick - /// check for whether a transaction is already in the `tx_queue` or - /// not. - /// - /// This should be kept up-to-date with the `tx_queue` as it acts as an - /// accessory to the `tx_queue`. - pub txn_commits_in_queue: HashSet>, - - /// filtered queue of available transactions, taken from `tx_receiver` - pub tx_queue: Vec>>, - - /// `da_proposal_payload_commit` to (`da_proposal`, `node_count`) - #[allow(clippy::type_complexity)] - pub da_proposal_payload_commit_to_da_proposal: - HashMap<(BuilderCommitment, Types::View), Arc>>, - - /// `quorum_proposal_payload_commit` to `quorum_proposal` - #[allow(clippy::type_complexity)] - pub quorum_proposal_payload_commit_to_quorum_proposal: - HashMap<(BuilderCommitment, Types::View), Arc>>>, - - /// Spawned-from references to the parent block. - pub parent_block_references: ParentBlockReferences, - - // Channel Receivers for the HotShot events, Tx_receiver could also receive the external transactions - /// decide receiver - pub decide_receiver: BroadcastReceiver>, - - /// da proposal receiver - pub da_proposal_receiver: BroadcastReceiver>, - - /// quorum proposal receiver - pub quorum_proposal_receiver: BroadcastReceiver>, - - /// channel receiver for the block requests - pub req_receiver: BroadcastReceiver>, - - /// incoming stream of transactions - pub tx_receiver: BroadcastReceiver>>, - - /// global state handle, defined in the service.rs - pub global_state: Arc>>, - - /// locally spawned builder Commitements - pub builder_commitments: HashSet<(BuilderStateId, BuilderCommitment)>, - - /// timeout for maximising the txns in the block - pub maximize_txn_capture_timeout: Duration, - - /// constant fee that the builder will offer per byte of data sequenced - pub base_fee: u64, - - /// validated state that is required for a proposal to be considered valid. Needed for the - /// purposes of building a valid block payload within the sequencer. - pub validated_state: Arc, - - /// instance state to enfoce `max_block_size` - pub instance_state: Arc, -} - -/// [`best_builder_states_to_extend`] is a utility function that is used to -/// in order to determine which [`BuilderState`]s are the best fit to extend -/// from. -/// -/// This function is designed to inspect the current state of the global state -/// in order to determine which [`BuilderState`]s are the best fit to extend -/// from. We only want to use information from [`GlobalState`] as otherwise -/// we would have some insider knowledge unique to our specific [`BuilderState`] -/// rather than knowledge that is available to all [`BuilderState`]s. In fact, -/// in order to ensure this, this function lives outside of the [`BuilderState`] -/// itself. -/// -/// In an ideal circumstance the best [`BuilderState`] to extend from is going to -/// be the one that is immediately preceding the [`QuorumProposal`] that we are -/// attempting to extend from. However, if all we know is the view number of -/// the [`QuorumProposal`] that we are attempting to extend from, then we may end -/// up in a scenario where we have multiple [`BuilderState`]s that are all equally -/// valid to extend from. When this happens, we have the potential for a data -/// race. -/// -/// The primary cause of this has to due with the interface of the -/// [`ProxyGlobalState`](crate::service::ProxyGlobalState)'s API. -/// In general, we want to be able to retrieve a -/// [`BuilderState`] via the [`BuilderStateId`]. The [`BuilderStateId`] only -/// references a [`ViewNumber`](hotshot_types::data::ViewNumber) and a -/// [`VidCommitment`](hotshot_types::vid::VidCommitment). While this information -/// is available in the [`QuorumProposal`], it only helps us to rule out -/// [`BuilderState`]s that already exist. It does **NOT** help us to pick a -/// [`BuilderState`] that is the best fit to extend from. -/// -/// This is where the `justify_qc` comes in to consideration. The `justify_qc` -/// contains the previous [`ViewNumber`](hotshot_types::data::ViewNumber) that is being -/// extended from, and in addition it also contains the previous [`Commitment>`] -/// that is being built on top of. Since our [`BuilderState`]s store identifying -/// information that contains this same `leaf_commit` we can compare these -/// directly to ensure that we are extending from the correct [`BuilderState`]. -/// -/// This function determines the best [`BuilderState`] in the following steps: -/// -/// 1. If we have a [`BuilderState`] that is already spawned for the current -/// [`QuorumProposal`], then we should should return no states, as one already -/// exists. This will prevent us from attempting to spawn duplicate -/// [`BuilderState`]s. -/// 2. Attempt to find all [`BuilderState`]s that are recorded within -/// [`GlobalState`] that have matching view number and leaf commitments. There -/// *should* only be one of these. But all would be valid extension points. -/// 3. If we can't find any [`BuilderState`]s that match the view number -/// and leaf commitment, then we should return for the maximum stored view -/// number that is smaller than the current [`QuorumProposal`]. -/// 4. If there is is only one [`BuilderState`] stored in the [`GlobalState`], then -/// we should return that [`BuilderState`] as the best fit. -/// 5. If none of the other criteria match, we return an empty result as it is -/// unclear what to do in this case. -/// -/// > Note: Any time this function returns more than a single entry in its -/// > [HashSet] result, there is a potential for a race condition. This is -/// > because there are multiple [BuilderState]s that are equally valid to -/// > extend from. This race could be avoided by just picking one of the -/// > entries in the resulting [HashSet], but this is not done here in order -/// > to allow us to highlight the possibility of the race. -async fn best_builder_states_to_extend( - quorum_proposal: Arc>>, - global_state: Arc>>, -) -> HashSet> { - let current_view_number = quorum_proposal.data.view_number; - let current_commitment = quorum_proposal.data.block_header.payload_commitment(); - let current_builder_state_id = BuilderStateId:: { - parent_commitment: current_commitment, - parent_view: current_view_number, - }; - - let global_state_read_lock = global_state.read_arc().await; - - // The first step is to check if we already have a spawned [BuilderState]. - // If we do, then we should indicate that there is no best fit, as we - // don't want to spawn another [BuilderState]. - if global_state_read_lock - .spawned_builder_states - .contains_key(¤t_builder_state_id) - { - // We already have a spawned [BuilderState] for this proposal. - // So we should just ignore it. - return HashSet::new(); - } - - // Next we want to see if there is an immediate match for a [BuilderState] - // that we can extend from. This is the most ideal situation, as it - // implies that we are extending from the correct [BuilderState]. - // We do this by checking the `justify_qc` stored within the - // [QuorumProposal], and checking it against the current spawned - // [BuilderState]s - let justify_qc = &quorum_proposal.data.justify_qc; - let existing_states: HashSet<_> = global_state_read_lock - .spawned_builder_states - .iter() - .filter(|(builder_state_id, (leaf_commit, _))| match leaf_commit { - None => false, - Some(leaf_commit) => { - *leaf_commit == justify_qc.data.leaf_commit - && builder_state_id.parent_view == justify_qc.view_number - } - }) - .map(|(builder_state_id, _)| builder_state_id.clone()) - .collect(); - - // If we found any matching [BuilderState]s, then we should return them - // as the best fit. - if !existing_states.is_empty() { - return existing_states; - } - - // At this point, we don't have any "ideal" matches or scenarios. So we - // need to look for a suitable fall-back. The best fallback condition to - // start with is any [BuilderState] that has the maximum spawned view - // number whose value is smaller than the current [QuorumProposal]. - let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock - .spawned_builder_states - .keys() - .map(|builder_state_id| *builder_state_id.parent_view) - .filter(|view_number| view_number < ¤t_view_number) - .max(); - - // If we have a maximum view number that meets our criteria, then we should - // return all [BuilderStateId]s that match this view number. - // This can lead to multiple [BuilderStateId]s being returned. - if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) = - maximum_stored_view_number_smaller_than_quorum_proposal - { - // If we are the maximum stored view number smaller than the quorum - // proposal's view number, then we are the best fit. - let mut result = HashSet::new(); - for builder_state_id in - global_state_read_lock - .spawned_builder_states - .keys() - .filter(|builder_state_id| { - builder_state_id.parent_view.u64() - == maximum_stored_view_number_smaller_than_quorum_proposal - }) - { - result.insert(builder_state_id.clone()); - } - return result; - } - - // This is our last ditch effort to continue making progress. If there is - // only one [BuilderState] active, then we should return that as the best - // fit, as it will be the only way we can continue making progress with - // the builder. - if global_state_read_lock.spawned_builder_states.len() == 1 { - let mut result = HashSet::new(); - for builder_state_id in global_state_read_lock.spawned_builder_states.keys() { - result.insert(builder_state_id.clone()); - } - return result; - } - - // This implies that there are only larger [BuilderState]s active than - // the one we are. This is weird, it implies that some sort of time - // travel has occurred view-wise. It is unclear what to do in this - // situation. - - HashSet::new() -} - -impl BuilderState { - /// Utility method that attempts to determine whether we are among - /// the best [`BuilderState`]s to extend from. - async fn am_i_the_best_builder_state_to_extend( - &self, - quorum_proposal: Arc>>, - ) -> bool { - let best_builder_states_to_extend = - best_builder_states_to_extend(quorum_proposal.clone(), self.global_state.clone()).await; - - tracing::debug!( - "{}@{} thinks these are the best builder states to extend from: {:?} for proposal {}@{}", - self.parent_block_references.vid_commitment, - self.parent_block_references.view_number.u64(), - best_builder_states_to_extend - .iter() - .map(|builder_state_id| format!( - "{}@{}", - builder_state_id.parent_commitment, - builder_state_id.parent_view.u64() - )) - .collect::>(), - quorum_proposal.data.block_header.payload_commitment(), - quorum_proposal.data.view_number.u64(), - ); - - // We are a best fit if we are contained within the returned set of - // best [BuilderState]s to extend from. - best_builder_states_to_extend.contains(&BuilderStateId { - parent_commitment: self.parent_block_references.vid_commitment, - parent_view: self.parent_block_references.view_number, - }) - } - - /// This method is used to handle incoming DA proposal messages - /// from an incoming HotShot [Event](hotshot_types::event::Event). A DA Proposal is - /// a proposal that is meant to be voted on by consensus nodes in order to - /// determine which transactions should be included for this view. - /// - /// A DA Proposal in conjunction with a Quorum Proposal is an indicator - /// that a new Block / Leaf is being proposed for the HotShot network. So - /// we need to be able to propose new Bundles on top of these proposals. - /// - /// In order to do so we must first wait until we have both a DA Proposal - /// and a Quorum Proposal. If we do not, then we can just record the - /// proposal we have and wait for the other to arrive. - /// - /// If we do have a matching Quorum Proposal, then we can proceed to make - /// a decision about extending the current [BuilderState] via - /// [BuilderState::spawn_clone_that_extends_self]. - /// - /// > Note: In the case of `process_da_proposal` if we don't have a corresponding - /// > Quorum Proposal, then we will have to wait for `process_quorum_proposal` - /// > to be called with the matching Quorum Proposal. Until that point we - /// > exit knowing we have fulfilled the DA proposal portion. - #[tracing::instrument(skip_all, name = "process da proposal", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn process_da_proposal(&mut self, da_msg: Arc>) { - tracing::debug!( - "Builder Received DA message for view {:?}", - da_msg.view_number - ); - - // we do not have the option to ignore DA proposals if we want to be able to handle failed view reorgs. - - // If the respective builder state exists to handle the request - tracing::debug!( - "Extracted builder commitment from the da proposal: {:?}", - da_msg.builder_commitment - ); - - let Entry::Vacant(e) = self - .da_proposal_payload_commit_to_da_proposal - .entry((da_msg.builder_commitment.clone(), da_msg.view_number)) - else { - tracing::debug!("Payload commitment already exists in the da_proposal_payload_commit_to_da_proposal hashmap, so ignoring it"); - return; - }; - - // if we have matching da and quorum proposals, we can skip storing the one, and remove - // the other from storage, and call build_block with both, to save a little space. - let Entry::Occupied(quorum_proposal) = self - .quorum_proposal_payload_commit_to_quorum_proposal - .entry((da_msg.builder_commitment.clone(), da_msg.view_number)) - else { - e.insert(da_msg); - return; - }; - - let quorum_proposal = quorum_proposal.remove(); - - if quorum_proposal.data.view_number != da_msg.view_number { - tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers"); - return; - } - - self.spawn_clone_that_extends_self(da_msg, quorum_proposal.clone()) - .await; - } - - /// This method is used to handle incoming Quorum Proposal messages - /// from an incoming HotShot [Event](hotshot_types::event::Event). A Quorum - /// Proposal is a proposal that indicates the next potential Block of the - /// chain is being proposed for the HotShot network. This proposal is - /// voted on by the consensus nodes in order to determine if whether this - /// will be the next Block of the chain or not. - /// - /// A Quorum Proposal in conjunction with a DA Proposal is an indicator - /// that a new Block / Leaf is being proposed for the HotShot network. So - /// we need to be able to propose new Bundles on top of these proposals. - /// - /// In order to do so we must first wait until we have both a DA Proposal - /// and a Quorum Proposal. If we do not, then we can just record the - /// proposal we have and wait for the other to arrive. - /// - /// If we do have a matching DA Proposal, then we can proceed to make - /// a decision about extending the current [BuilderState] via - /// [BuilderState::spawn_clone_that_extends_self]. - /// - /// > Note: In the case of `process_quorum_proposal` if we don't have a - /// > corresponding DA Proposal, then we will have to wait for - /// > `process_da_proposal` to be called with the matching DA Proposal. - /// > Until that point we exit knowing we have fulfilled the Quorum proposal - /// > portion. - //#[tracing::instrument(skip_all, name = "Process Quorum Proposal")] - #[tracing::instrument(skip_all, name = "process quorum proposal", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn process_quorum_proposal(&mut self, quorum_msg: QuorumProposalMessage) { - tracing::debug!( - "Builder Received Quorum proposal message for view {:?}", - quorum_msg.proposal.data.view_number - ); - - // Two cases to handle: - // Case 1: Bootstrapping phase - // Case 2: No intended builder state exist - let quorum_proposal = &quorum_msg.proposal; - let view_number = quorum_proposal.data.view_number; - let payload_builder_commitment = quorum_proposal.data.block_header.builder_commitment(); - - tracing::debug!( - "Extracted payload builder commitment from the quorum proposal: {:?}", - payload_builder_commitment - ); - - // first check whether vid_commitment exists in the - // quorum_proposal_payload_commit_to_quorum_proposal hashmap, if yes, ignore it, otherwise - // validate it and later insert in - - let Entry::Vacant(e) = self - .quorum_proposal_payload_commit_to_quorum_proposal - .entry((payload_builder_commitment.clone(), view_number)) - else { - tracing::debug!("Payload commitment already exists in the quorum_proposal_payload_commit_to_quorum_proposal hashmap, so ignoring it"); - return; - }; - - // if we have matching da and quorum proposals, we can skip storing - // the one, and remove the other from storage, and call build_block - // with both, to save a little space. - let Entry::Occupied(da_proposal) = self - .da_proposal_payload_commit_to_da_proposal - .entry((payload_builder_commitment.clone(), view_number)) - else { - e.insert(quorum_proposal.clone()); - return; - }; - - let da_proposal_info = da_proposal.remove(); - // remove the entry from the da_proposal_payload_commit_to_da_proposal hashmap - self.da_proposal_payload_commit_to_da_proposal - .remove(&(payload_builder_commitment.clone(), view_number)); - - // also make sure we clone for the same view number - // (check incase payload commitments are same) - if da_proposal_info.view_number != view_number { - tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers"); - } - - self.spawn_clone_that_extends_self(da_proposal_info, quorum_proposal.clone()) - .await; - } - - /// A helper function that is used by both [`process_da_proposal`](Self::process_da_proposal) - /// and [`process_quorum_proposal`](Self::process_quorum_proposal) to spawn a new [`BuilderState`] - /// that extends from the current [`BuilderState`]. - /// - /// This helper function also adds additional checks in order to ensure - /// that the [`BuilderState`] that is being spawned is the best fit for the - /// [`QuorumProposal`] that is being extended from. - async fn spawn_clone_that_extends_self( - &mut self, - da_proposal_info: Arc>, - quorum_proposal: Arc>>, - ) { - if !self - .am_i_the_best_builder_state_to_extend(quorum_proposal.clone()) - .await - { - tracing::debug!( - "{} is not the best fit for forking, {}@{}, so ignoring the Quorum proposal, and leaving it to another BuilderState", - self.parent_block_references, - quorum_proposal.data.block_header.payload_commitment(), - quorum_proposal.data.view_number.u64(), - ); - return; - } - - let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity()); - - tracing::debug!( - "extending BuilderState with a clone from {} with new proposal {}@{}", - self.parent_block_references, - quorum_proposal.data.block_header.payload_commitment(), - quorum_proposal.data.view_number.u64() - ); - - // We literally fork ourselves - self.clone_with_receiver(req_receiver) - .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender) - .await; - } - - /// processing the decide event - #[tracing::instrument(skip_all, name = "process decide event", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn process_decide_event(&mut self, decide_msg: DecideMessage) -> Option { - // Exit out all the builder states if their parent_block_references.view_number is less than the latest_decide_view_number - // The only exception is that we want to keep the highest view number builder state active to ensure that - // we have a builder state to handle the incoming DA and Quorum proposals - let decide_view_number = decide_msg.latest_decide_view_number; - - let retained_view_cutoff = self - .global_state - .write_arc() - .await - .remove_handles(decide_view_number); - if self.parent_block_references.view_number < retained_view_cutoff { - tracing::info!( - "Decide@{:?}; Task@{:?} exiting; views < {:?} being reclaimed", - decide_view_number.u64(), - self.parent_block_references.view_number.u64(), - retained_view_cutoff.u64(), - ); - return Some(Status::ShouldExit); - } - - tracing::info!( - "Decide@{:?}; Task@{:?} not exiting; views >= {:?} being retained", - decide_view_number.u64(), - self.parent_block_references.view_number.u64(), - retained_view_cutoff.u64(), - ); - - Some(Status::ShouldContinue) - } - - /// spawn a clone of the builder state - #[tracing::instrument(skip_all, name = "spawn_clone", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn spawn_clone( - mut self, - da_proposal_info: Arc>, - quorum_proposal: Arc>>, - req_sender: BroadcastSender>, - ) { - let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data); - - // We replace our parent_block_references with information from the - // quorum proposal. This is identifying the block that this specific - // instance of [BuilderState] is attempting to build for. - self.parent_block_references = ParentBlockReferences { - view_number: quorum_proposal.data.view_number, - vid_commitment: quorum_proposal.data.block_header.payload_commitment(), - leaf_commit: leaf.legacy_commit(), - builder_commitment: quorum_proposal.data.block_header.builder_commitment(), - }; - - let builder_state_id = BuilderStateId { - parent_commitment: self.parent_block_references.vid_commitment, - parent_view: self.parent_block_references.view_number, - }; - - { - // Let's ensure that we don't already have one of these BuilderStates - // running already. - - let global_state_read_lock = self.global_state.read_arc().await; - if global_state_read_lock - .spawned_builder_states - .contains_key(&builder_state_id) - { - tracing::warn!( - "Aborting spawn_clone, builder state already exists in spawned_builder_states: {:?}", - builder_state_id - ); - return; - } - } - - for tx in da_proposal_info.txn_commitments.iter() { - self.txn_commits_in_queue.remove(tx); - } - - // We add the included transactions to the included_txns set, so we can - // also filter them should they be included in a future transaction - // submission. - self.included_txns - .extend(da_proposal_info.txn_commitments.iter().cloned()); - - // We wish to keep only the transactions in the tx_queue to those that - // also exist in the txns_in_queue set. - self.tx_queue - .retain(|tx| self.txn_commits_in_queue.contains(&tx.commit)); - - // register the spawned builder state to spawned_builder_states in the - // global state We register this new child within the global_state, so - // that it can be looked up via the [BuilderStateId] in the future. - self.global_state.write_arc().await.register_builder_state( - builder_state_id, - self.parent_block_references.clone(), - req_sender, - ); - - self.event_loop(); - } - - /// A method that will return a [BuildBlockInfo] if it is - /// able to build a block. If it encounters an error building a block, then - /// it will return None. - /// - /// This first starts by collecting transactions to include in the block. It - /// will wait until it has at least one transaction to include in the block, - /// or up to the configured `maximize_txn_capture_timeout` duration elapses. - /// At which point it will attempt to build a block with the transactions it - /// has collected. - /// - /// Upon successfully building a block, a commitment for the [BuilderStateId] - /// and Block payload pair are stored, and a [BuildBlockInfo] is created - /// and returned. - #[tracing::instrument(skip_all, name = "build block", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn build_block( - &mut self, - state_id: BuilderStateId, - ) -> Option> { - // collect all the transactions from the near future - let timeout_after = Instant::now() + self.maximize_txn_capture_timeout; - let sleep_interval = self.maximize_txn_capture_timeout / 10; - while Instant::now() <= timeout_after { - self.collect_txns(timeout_after).await; - - if !self.tx_queue.is_empty() // we have transactions - || Instant::now() + sleep_interval > timeout_after - // we don't have time for another iteration - { - break; - } - - async_sleep(sleep_interval).await - } - - let Ok((payload, metadata)) = - >::from_transactions( - self.tx_queue.iter().map(|tx| tx.tx.clone()), - &self.validated_state, - &self.instance_state, - ) - .await - else { - tracing::warn!("Failed to build block payload"); - return None; - }; - - let builder_hash = payload.builder_commitment(&metadata); - // count the number of txns - let txn_count = payload.num_transactions(&metadata); - - // insert the recently built block into the builder commitments - self.builder_commitments - .insert((state_id, builder_hash.clone())); - - let encoded_txns: Vec = payload.encode().to_vec(); - let block_size: u64 = encoded_txns.len() as u64; - let offered_fee: u64 = self.base_fee * block_size; - - tracing::info!( - "Builder view num {:?}, building block with {:?} txns, with builder hash {:?}", - self.parent_block_references.view_number, - txn_count, - builder_hash - ); - - Some(BuildBlockInfo { - id: BlockId { - view: self.parent_block_references.view_number, - hash: builder_hash, - }, - block_size, - offered_fee, - block_payload: payload, - metadata, - }) - } - - /// A method that is used to handle incoming - /// [`RequestMessage`]s. These [`RequestMessage`]s are looking for a bundle - /// of transactions to be included in the next block. Instead of returning - /// a value, this method's response will be provided to the [`UnboundedSender`] that - /// is included in the [`RequestMessage`]. - /// - /// At this point this particular [`BuilderState`] has already been deemed - /// as the [`BuilderState`] that should handle this request, and it is up - /// to this [`BuilderState`] to provide the response, if it is able to do - /// so. - /// - /// The response will be a [`ResponseMessage`] that contains the transactions - /// the `Builder` wants to include in the next block in addition to the - /// expected block size, offered fee, and the - /// Builder's commit block of the data being returned. - async fn process_block_request(&mut self, req: RequestMessage) { - let requested_view_number = req.requested_view_number; - // If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle - if requested_view_number != self.parent_block_references.view_number { - tracing::debug!( - "Builder {:?} Requested view number does not match the built_from_view, so ignoring it", - self.parent_block_references.view_number - ); - return; - } - - tracing::info!( - "Request handled by builder with view {}@{:?} for (view_num: {:?})", - self.parent_block_references.vid_commitment, - self.parent_block_references.view_number, - requested_view_number - ); - - let response = self - .build_block(BuilderStateId { - parent_commitment: self.parent_block_references.vid_commitment, - parent_view: requested_view_number, - }) - .await; - - let Some(response) = response else { - tracing::debug!("No response to send"); - return; - }; - - // form the response message - let response_msg = ResponseMessage { - builder_hash: response.id.hash.clone(), - block_size: response.block_size, - offered_fee: response.offered_fee, - transactions: response - .block_payload - .transactions(&response.metadata) - .collect(), - }; - - let builder_hash = response.id.hash.clone(); - self.global_state.write_arc().await.update_global_state( - BuilderStateId { - parent_commitment: self.parent_block_references.vid_commitment, - parent_view: requested_view_number, - }, - response, - response_msg.clone(), - ); - - // ... and finally, send the response - if let Err(e) = req.response_channel.send(response_msg).await { - tracing::warn!( - "Builder {:?} failed to send response to {:?} with builder hash {:?}, Err: {:?}", - self.parent_block_references.view_number, - req, - builder_hash, - e - ); - return; - } - - tracing::info!( - "Builder {:?} Sent response to the request{:?} with builder hash {:?}", - self.parent_block_references.view_number, - req, - builder_hash - ); - } - - // MARK: event loop processing for [BuilderState] - - /// Helper function used to handle incoming [`MessageType`]s, - /// specifically [`RequestMessage`]s, that are received by the - /// [`BuilderState::req_receiver`] channel. - /// - /// This method is used to process block requests. - async fn event_loop_helper_handle_request(&mut self, req: Option>) { - tracing::debug!( - "Received request msg in builder {:?}: {:?}", - self.parent_block_references.view_number, - req - ); - - let Some(req) = req else { - tracing::warn!("No more request messages to consume"); - return; - }; - - let MessageType::RequestMessage(req) = req else { - tracing::warn!("Unexpected message on requests channel: {:?}", req); - return; - }; - - tracing::debug!( - "Received request msg in builder {:?}: {:?}", - self.parent_block_references.view_number, - req - ); - - self.process_block_request(req).await; - } - - /// Helper function that is used to handle incoming [`MessageType`]s, - /// specifically [`DaProposalMessage`]s,that are received by the [`BuilderState::da_proposal_receiver`] channel. - async fn event_loop_helper_handle_da_proposal(&mut self, da: Option>) { - let Some(da) = da else { - tracing::warn!("No more da proposal messages to consume"); - return; - }; - - let MessageType::DaProposalMessage(rda_msg) = da else { - tracing::warn!("Unexpected message on da proposals channel: {:?}", da); - return; - }; - - tracing::debug!( - "Received da proposal msg in builder {:?}:\n {:?}", - self.parent_block_references, - rda_msg.view_number - ); - - self.process_da_proposal(rda_msg).await; - } - - /// Helper function that is used to handle incoming [`MessageType`]s, - /// specifically [`QuorumProposalMessage`]s, that are received by the [`BuilderState::quorum_proposal_receiver`] channel. - async fn event_loop_helper_handle_quorum_proposal( - &mut self, - quorum: Option>, - ) { - let Some(quorum) = quorum else { - tracing::warn!("No more quorum proposal messages to consume"); - return; - }; - - let MessageType::QuorumProposalMessage(quorum_proposal_message) = quorum else { - tracing::warn!( - "Unexpected message on quorum proposals channel: {:?}", - quorum - ); - return; - }; - - tracing::debug!( - "Received quorum proposal msg in builder {:?}:\n {:?} for view ", - self.parent_block_references, - quorum_proposal_message.proposal.data.view_number - ); - - self.process_quorum_proposal(quorum_proposal_message).await; - } - - /// Helper function that is used to handle incoming [`MessageType`]s, - /// specifically [`DecideMessage`]s, that are received by the [`BuilderState::decide_receiver`] channel. - /// - /// This method can trigger the exit of the [`BuilderState::event_loop`] async - /// task via the returned [`std::ops::ControlFlow`] type. If the returned - /// value is a [`std::ops::ControlFlow::Break`], then the - /// [`BuilderState::event_loop`] - /// async task should exit. - async fn event_loop_helper_handle_decide( - &mut self, - decide: Option>, - ) -> std::ops::ControlFlow<()> { - let Some(decide) = decide else { - tracing::warn!("No more decide messages to consume"); - return std::ops::ControlFlow::Continue(()); - }; - - let MessageType::DecideMessage(rdecide_msg) = decide else { - tracing::warn!("Unexpected message on decide channel: {:?}", decide); - return std::ops::ControlFlow::Continue(()); - }; - - let latest_decide_view_num = rdecide_msg.latest_decide_view_number; - tracing::debug!( - "Received decide msg view {:?} in builder {:?}", - &latest_decide_view_num, - self.parent_block_references - ); - let decide_status = self.process_decide_event(rdecide_msg).await; - - let Some(decide_status) = decide_status else { - tracing::warn!( - "decide_status was None; Continuing builder {:?}", - self.parent_block_references - ); - return std::ops::ControlFlow::Continue(()); - }; - - match decide_status { - Status::ShouldContinue => { - tracing::debug!("Continuing builder {:?}", self.parent_block_references); - std::ops::ControlFlow::Continue(()) - } - _ => { - tracing::info!( - "Exiting builder {:?} with decide view {:?}", - self.parent_block_references, - &latest_decide_view_num - ); - std::ops::ControlFlow::Break(()) - } - } - } - - /// spawns an async task that attempts to handle messages being received - /// across the [BuilderState]s various channels. - /// - /// This async task will continue to run until it receives a message that - /// indicates that it should exit. This exit message is sent via the - /// [DecideMessage] channel. - /// - /// The main body of the loop listens to four channels at once, and when - /// a message is received it will process the message with the appropriate - /// handler accordingly. - /// - /// > Note: There is potential for improvement in typing here, as each of - /// > these receivers returns the exact same type despite being separate - /// > Channels. These channels may want to convey separate types so that - /// > the contained message can pertain to its specific channel - /// > accordingly. - #[tracing::instrument(skip_all, name = "event loop", - fields(builder_parent_block_references = %self.parent_block_references))] - pub fn event_loop(mut self) { - let _builder_handle = async_spawn(async move { - loop { - tracing::debug!( - "Builder {:?} event loop", - self.parent_block_references.view_number - ); - - futures::select! { - req = self.req_receiver.next() => self.event_loop_helper_handle_request(req).await, - da = self.da_proposal_receiver.next() => self.event_loop_helper_handle_da_proposal(da).await, - quorum = self.quorum_proposal_receiver.next() => self.event_loop_helper_handle_quorum_proposal(quorum).await, - decide = self.decide_receiver.next() => if let std::ops::ControlFlow::Break(_) = self.event_loop_helper_handle_decide(decide).await { return; }, - }; - } - }); - } -} -/// Unifies the possible messages that can be received by the builder -#[derive(Debug, Clone)] -pub enum MessageType { - DecideMessage(DecideMessage), - DaProposalMessage(Arc>), - QuorumProposalMessage(QuorumProposalMessage), - RequestMessage(RequestMessage), -} - -#[allow(clippy::too_many_arguments)] -impl BuilderState { - pub fn new( - parent_block_references: ParentBlockReferences, - receivers: &BroadcastReceivers, - req_receiver: BroadcastReceiver>, - tx_queue: Vec>>, - global_state: Arc>>, - maximize_txn_capture_timeout: Duration, - base_fee: u64, - instance_state: Arc, - txn_garbage_collect_duration: Duration, - validated_state: Arc, - ) -> Self { - let txns_in_queue: HashSet<_> = tx_queue.iter().map(|tx| tx.commit).collect(); - BuilderState { - txn_commits_in_queue: txns_in_queue, - parent_block_references, - req_receiver, - tx_queue, - global_state, - maximize_txn_capture_timeout, - base_fee, - instance_state, - validated_state, - included_txns: RotatingSet::new(txn_garbage_collect_duration), - da_proposal_payload_commit_to_da_proposal: HashMap::new(), - quorum_proposal_payload_commit_to_quorum_proposal: HashMap::new(), - builder_commitments: HashSet::new(), - decide_receiver: receivers.decide.activate_cloned(), - da_proposal_receiver: receivers.da_proposal.activate_cloned(), - quorum_proposal_receiver: receivers.quorum_proposal.activate_cloned(), - tx_receiver: receivers.transactions.activate_cloned(), - } - } - pub fn clone_with_receiver(&self, req_receiver: BroadcastReceiver>) -> Self { - let mut included_txns = self.included_txns.clone(); - included_txns.rotate(); - - BuilderState { - included_txns, - txn_commits_in_queue: self.txn_commits_in_queue.clone(), - parent_block_references: self.parent_block_references.clone(), - decide_receiver: self.decide_receiver.clone(), - da_proposal_receiver: self.da_proposal_receiver.clone(), - quorum_proposal_receiver: self.quorum_proposal_receiver.clone(), - req_receiver, - da_proposal_payload_commit_to_da_proposal: HashMap::new(), - quorum_proposal_payload_commit_to_quorum_proposal: HashMap::new(), - tx_receiver: self.tx_receiver.clone(), - tx_queue: self.tx_queue.clone(), - global_state: self.global_state.clone(), - builder_commitments: self.builder_commitments.clone(), - maximize_txn_capture_timeout: self.maximize_txn_capture_timeout, - base_fee: self.base_fee, - instance_state: self.instance_state.clone(), - validated_state: self.validated_state.clone(), - } - } - - // collect outstanding transactions - async fn collect_txns(&mut self, timeout_after: Instant) { - while Instant::now() <= timeout_after { - match self.tx_receiver.try_recv() { - Ok(tx) => { - if self.included_txns.contains(&tx.commit) { - // We've included this transaction in one of our - // recent blocks, and we do not wish to include it - // again. - continue; - } - - if self.txn_commits_in_queue.contains(&tx.commit) { - // We already have this transaction in our current - // queue, so we do not want to include it again - continue; - } - - self.txn_commits_in_queue.insert(tx.commit); - self.tx_queue.push(tx); - } - - Err(async_broadcast::TryRecvError::Empty) - | Err(async_broadcast::TryRecvError::Closed) => { - // The transaction receiver is empty, or it's been closed. - // If it's closed that's a big problem and we should - // probably indicate it as such. - break; - } - - Err(async_broadcast::TryRecvError::Overflowed(lost)) => { - tracing::warn!("Missed {lost} transactions due to backlog"); - continue; - } - } - } - } -} - -#[cfg(test)] -mod test { - use std::collections::HashMap; - use std::sync::Arc; - - use async_broadcast::broadcast; - use committable::RawCommitmentBuilder; - use hotshot_example_types::block_types::TestTransaction; - use hotshot_example_types::node_types::TestTypes; - use hotshot_types::data::ViewNumber; - use hotshot_types::data::{Leaf, QuorumProposal}; - use hotshot_types::traits::node_implementation::{ConsensusTime, NodeType}; - use hotshot_types::utils::BuilderCommitment; - - use super::DaProposalMessage; - use super::MessageType; - use super::ParentBlockReferences; - use crate::testing::{calc_proposal_msg, create_builder_state}; - - /// This test the function `process_da_propsal`. - /// It checkes da_proposal_payload_commit_to_da_proposal change appropriately - /// when receiving a da proposal message. - /// This test also checks whether corresponding BuilderStateId is in global_state. - #[async_std::test] - async fn test_process_da_proposal() { - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - tracing::info!("Testing the function `process_da_proposal` in `builder_state.rs`"); - - // Number of views to simulate - const NUM_ROUNDS: usize = 5; - // Capacity of broadcast channels - const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5; - // Number of nodes on DA committee - const NUM_STORAGE_NODES: usize = 4; - - // create builder_state without entering event loop - let (_senders, global_state, mut builder_state) = - create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - - // randomly generate a transaction - let transactions = vec![TestTransaction::new(vec![1, 2, 3]); 3]; - let (_quorum_proposal, _quorum_proposal_msg, da_proposal_msg, builder_state_id) = - calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions.clone()).await; - - // sub-test one - // call process_da_proposal without matching quorum proposal message - // da_proposal_payload_commit_to_da_proposal should insert the message - let mut correct_da_proposal_payload_commit_to_da_proposal: HashMap< - (BuilderCommitment, ::View), - Arc>, - > = HashMap::new(); - - builder_state - .process_da_proposal(da_proposal_msg.clone()) - .await; - correct_da_proposal_payload_commit_to_da_proposal.insert( - ( - da_proposal_msg.builder_commitment.clone(), - da_proposal_msg.view_number, - ), - da_proposal_msg, - ); - assert_eq!( - builder_state - .da_proposal_payload_commit_to_da_proposal - .clone(), - correct_da_proposal_payload_commit_to_da_proposal.clone(), - ); - // check global_state didn't change - if global_state - .read_arc() - .await - .spawned_builder_states - .contains_key(&builder_state_id) - { - panic!("global_state shouldn't have cooresponding builder_state_id without matching quorum proposal."); - } - - // sub-test two - // call process_da_proposal with the same msg again - // we should skip the process and everything should be the same - let transactions_1 = transactions.clone(); - let (_quorum_proposal_1, _quorum_proposal_msg_1, da_proposal_msg_1, builder_state_id_1) = - calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_1).await; - builder_state - .process_da_proposal(da_proposal_msg_1.clone()) - .await; - assert_eq!( - builder_state - .da_proposal_payload_commit_to_da_proposal - .clone(), - correct_da_proposal_payload_commit_to_da_proposal.clone(), - ); - // check global_state didn't change - if global_state - .read_arc() - .await - .spawned_builder_states - .contains_key(&builder_state_id_1) - { - panic!("global_state shouldn't have cooresponding builder_state_id without matching quorum proposal."); - } - - // sub-test three - // add the matching quorum proposal message with different tx - // and call process_da_proposal with this matching da proposal message and quorum proposal message - // we should spawn_clone here - // and check whether global_state has correct BuilderStateId - let transactions_2 = vec![TestTransaction::new(vec![1, 2, 3, 4]); 2]; - let (_quorum_proposal_2, quorum_proposal_msg_2, da_proposal_msg_2, builder_state_id_2) = - calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_2).await; - - // process quorum proposal first, so that later when process_da_proposal we can directly call `build_block` and skip storage - builder_state - .process_quorum_proposal(quorum_proposal_msg_2.clone()) - .await; - - // process da proposal message and do the check - builder_state - .process_da_proposal(da_proposal_msg_2.clone()) - .await; - assert_eq!( - builder_state - .da_proposal_payload_commit_to_da_proposal - .clone(), - correct_da_proposal_payload_commit_to_da_proposal.clone(), - ); - // check global_state has this new builder_state_id - if global_state - .read_arc() - .await - .spawned_builder_states - .contains_key(&builder_state_id_2) - { - tracing::debug!("global_state updated successfully"); - } else { - panic!("global_state should have cooresponding builder_state_id as now we have matching quorum proposal."); - } - } - - /// This test the function `process_quorum_propsal`. - /// It checkes quorum_proposal_payload_commit_to_quorum_proposal change appropriately - /// when receiving a quorum proposal message. - /// This test also checks whether corresponding BuilderStateId is in global_state. - #[async_std::test] - async fn test_process_quorum_proposal() { - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - tracing::info!("Testing the function `process_quorum_proposal` in `builder_state.rs`"); - - // Number of views to simulate - const NUM_ROUNDS: usize = 5; - // Capacity of broadcast channels - const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5; - // Number of nodes on DA committee - const NUM_STORAGE_NODES: usize = 4; - - // create builder_state without entering event loop - let (_senders, global_state, mut builder_state) = - create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - - // randomly generate a transaction - let transactions = vec![TestTransaction::new(vec![1, 2, 3]); 3]; - let (_quorum_proposal, quorum_proposal_msg, _da_proposal_msg, builder_state_id) = - calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions.clone()).await; - - // sub-test one - // call process_quorum_proposal without matching da proposal message - // quorum_proposal_payload_commit_to_quorum_proposal should insert the message - let mut correct_quorum_proposal_payload_commit_to_quorum_proposal = HashMap::new(); - builder_state - .process_quorum_proposal(quorum_proposal_msg.clone()) - .await; - correct_quorum_proposal_payload_commit_to_quorum_proposal.insert( - ( - quorum_proposal_msg - .proposal - .data - .block_header - .builder_commitment - .clone(), - quorum_proposal_msg.proposal.data.view_number, - ), - quorum_proposal_msg.proposal, - ); - assert_eq!( - builder_state - .quorum_proposal_payload_commit_to_quorum_proposal - .clone(), - correct_quorum_proposal_payload_commit_to_quorum_proposal.clone() - ); - // check global_state didn't change - if global_state - .read_arc() - .await - .spawned_builder_states - .contains_key(&builder_state_id) - { - panic!("global_state shouldn't have cooresponding builder_state_id without matching quorum proposal."); - } - - // sub-test two - // add the matching da proposal message with different tx - // and call process_da_proposal with this matching quorum proposal message and quorum da message - // we should spawn_clone here - // and check whether global_state has correct BuilderStateId - let transactions_2 = vec![TestTransaction::new(vec![2, 3, 4]); 2]; - let (_quorum_proposal_2, quorum_proposal_msg_2, da_proposal_msg_2, builder_state_id_2) = - calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_2).await; - - // process da proposal message first, so that later when process_da_proposal we can directly call `build_block` and skip storage - builder_state - .process_da_proposal(da_proposal_msg_2.clone()) - .await; - - // process quorum proposal, and do the check - builder_state - .process_quorum_proposal(quorum_proposal_msg_2.clone()) - .await; - assert_eq!( - builder_state - .quorum_proposal_payload_commit_to_quorum_proposal - .clone(), - correct_quorum_proposal_payload_commit_to_quorum_proposal.clone() - ); - // check global_state has this new builder_state_id - if global_state - .read_arc() - .await - .spawned_builder_states - .contains_key(&builder_state_id_2) - { - tracing::debug!("global_state updated successfully"); - } else { - panic!("global_state should have cooresponding builder_state_id as now we have matching da proposal."); - } - } - - /// This test the function `process_decide_event`. - /// It checkes whether we exit out correct builder states when there's a decide event coming in. - /// This test also checks whether corresponding BuilderStateId is removed in global_state. - #[async_std::test] - async fn test_process_decide_event() { - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - tracing::info!("Testing the builder core with multiple messages from the channels"); - - // Number of views to simulate - const NUM_ROUNDS: usize = 5; - // Number of transactions to submit per round - const NUM_TXNS_PER_ROUND: usize = 4; - // Capacity of broadcast channels - const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5; - // Number of nodes on DA committee - const NUM_STORAGE_NODES: usize = 4; - - // create builder_state without entering event loop - let (_senders, global_state, mut builder_state) = - create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - - // Transactions to send - let all_transactions = (0..NUM_ROUNDS) - .map(|round| { - (0..NUM_TXNS_PER_ROUND) - .map(|tx_num| TestTransaction::new(vec![round as u8, tx_num as u8])) - .collect::>() - }) - .collect::>(); - let mut prev_quorum_proposal: Option> = None; - // register some builder states for later decide event - #[allow(clippy::needless_range_loop)] - for round in 0..NUM_ROUNDS { - let transactions = all_transactions[round].clone(); - let (quorum_proposal, _quorum_proposal_msg, _da_proposal_msg, builder_state_id) = - calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions) - .await; - prev_quorum_proposal = Some(quorum_proposal.clone()); - let (req_sender, _req_receiver) = broadcast(CHANNEL_CAPACITY); - let leaf: Leaf = Leaf::from_quorum_proposal(&quorum_proposal); - let leaf_commit = RawCommitmentBuilder::new("leaf commitment") - .u64_field("view number", leaf.view_number().u64()) - .u64_field("block number", leaf.height()) - .field("parent Leaf commitment", leaf.parent_commitment()) - .var_size_field( - "block payload commitment", - leaf.payload_commitment().as_ref(), - ) - .finalize(); - global_state.write_arc().await.register_builder_state( - builder_state_id, - ParentBlockReferences { - view_number: quorum_proposal.view_number, - vid_commitment: quorum_proposal.block_header.payload_commitment, - leaf_commit, - builder_commitment: quorum_proposal.block_header.builder_commitment, - }, - req_sender, - ); - } - - // send out a decide event in a middle round - let latest_decide_view_number = ViewNumber::new(3); - - let decide_message = MessageType::DecideMessage(crate::builder_state::DecideMessage { - latest_decide_view_number, - }); - if let MessageType::DecideMessage(practice_decide_msg) = decide_message.clone() { - builder_state - .process_decide_event(practice_decide_msg.clone()) - .await; - } else { - panic!("Not a decide_message in correct format"); - } - // check whether spawned_builder_states have correct builder_state_id and already exit-ed builder_states older than decides - let current_spawned_builder_states = - global_state.read_arc().await.spawned_builder_states.clone(); - current_spawned_builder_states - .iter() - .for_each(|(builder_state_id, _)| { - assert!(builder_state_id.parent_view >= latest_decide_view_number) - }); - } -} diff --git a/crates/marketplace/src/hooks.rs b/crates/marketplace/src/hooks.rs new file mode 100644 index 00000000..761218ff --- /dev/null +++ b/crates/marketplace/src/hooks.rs @@ -0,0 +1,43 @@ +use std::marker::PhantomData; + +use async_trait::async_trait; +use hotshot::types::Event; +use hotshot_types::traits::node_implementation::NodeType; + +#[async_trait] +pub trait BuilderHooks: Sync + Send + 'static { + #[inline(always)] + async fn process_transactions( + &self, + transactions: Vec, + ) -> Vec { + transactions + } + + #[inline(always)] + async fn handle_hotshot_event(&self, _event: &Event) {} +} + +#[async_trait] +impl BuilderHooks for Box +where + Types: NodeType, + T: BuilderHooks, +{ + #[inline(always)] + async fn process_transactions( + &self, + transactions: Vec, + ) -> Vec { + (**self).process_transactions(transactions).await + } + + #[inline(always)] + async fn handle_hotshot_event(&self, event: &Event) { + (**self).handle_hotshot_event(event).await + } +} + +pub struct NoHooks(pub PhantomData); + +impl BuilderHooks for NoHooks {} diff --git a/crates/marketplace/src/lib.rs b/crates/marketplace/src/lib.rs index ddd3365e..9bacf40d 100644 --- a/crates/marketplace/src/lib.rs +++ b/crates/marketplace/src/lib.rs @@ -11,15 +11,9 @@ // It also provides one API services external users: // 1. Serves a user's request to submit a private transaction -// providing the core services to support above API services -pub mod builder_state; - -// Core interaction with the HotShot network +pub mod hooks; pub mod service; -// utilities -pub mod utils; - // tracking the testing #[cfg(test)] pub mod testing; diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs index 368b8d3c..eed0a324 100644 --- a/crates/marketplace/src/service.rs +++ b/crates/marketplace/src/service.rs @@ -1,284 +1,60 @@ -use std::{fmt::Debug, marker::PhantomData, time::Duration}; +use std::time::Duration; -use crate::{ - builder_state::{ - BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QuorumProposalMessage, - RequestMessage, ResponseMessage, TransactionSource, - }, - utils::LegacyCommit as _, +use async_compatibility_layer::art::async_sleep; +use marketplace_builder_shared::{ + block::{BuilderStateId, ReceivedTransaction, TransactionSource}, + coordinator::{BuilderStateCoordinator, BuilderStateLookup}, + state::BuilderState, }; -use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; -use anyhow::bail; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; -use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError}; use async_lock::RwLock; +#[cfg(async_executor_impl = "async-std")] +use async_std::task::JoinHandle; use async_trait::async_trait; use committable::{Commitment, Committable}; -use derivative::Derivative; -use futures::stream::StreamExt; -use futures::{future::BoxFuture, Stream}; +use futures::{future::BoxFuture, stream::FuturesUnordered, Stream}; +use futures::{ + stream::{FuturesOrdered, StreamExt}, + TryStreamExt, +}; use hotshot::types::Event; use hotshot_builder_api::v0_3::{ builder::{define_api, submit_api, BuildError, Error as BuilderApiError}, data_source::{AcceptsTxnSubmits, BuilderDataSource}, }; use hotshot_types::bundle::Bundle; -use hotshot_types::traits::block_contents::BuilderFee; +use hotshot_types::traits::block_contents::{BuilderFee, Transaction}; use hotshot_types::{ - data::{DaProposal, Leaf, QuorumProposal, ViewNumber}, event::EventType, - message::Proposal, traits::{ - block_contents::BlockPayload, node_implementation::{ConsensusTime, NodeType}, signature_key::{BuilderSignatureKey, SignatureKey}, }, - utils::BuilderCommitment, vid::VidCommitment, }; -use sha2::{Digest, Sha256}; use std::collections::HashMap; -use std::num::NonZeroUsize; use std::sync::Arc; use std::{fmt::Display, time::Instant}; use tagged_base64::TaggedBase64; use tide_disco::{app::AppError, method::ReadState, App}; -use tracing::{error, instrument}; +#[cfg(async_executor_impl = "tokio")] +use tokio::task::JoinHandle; +use tracing::Level; use vbs::version::StaticVersion; pub use marketplace_builder_shared::utils::EventServiceStream; -// It holds all the necessary information for a block -#[derive(Debug)] -pub struct BlockInfo { - pub block_payload: Types::BlockPayload, - pub metadata: <::BlockPayload as BlockPayload>::Metadata, - pub offered_fee: u64, -} - -// It holds the information for the proposed block -#[derive(Debug)] -pub struct ProposedBlockId { - pub parent_commitment: VidCommitment, - pub payload_commitment: BuilderCommitment, - pub parent_view: Types::View, -} - -impl ProposedBlockId { - pub fn new( - parent_commitment: VidCommitment, - payload_commitment: BuilderCommitment, - parent_view: Types::View, - ) -> Self { - ProposedBlockId { - parent_commitment, - payload_commitment, - parent_view, - } - } -} - -#[derive(Debug, Derivative)] -#[derivative(Default(bound = ""))] -pub struct BuilderStatesInfo { - // list of all the builder states spawned for a view - pub vid_commitments: Vec, - // list of all the proposed blocks for a view - pub block_ids: Vec>, -} - -#[derive(Debug)] -pub struct ReceivedTransaction { - // the transaction - pub tx: Types::Transaction, - // its hash - pub commit: Commitment, - // its source - pub source: TransactionSource, - // received time - pub time_in: Instant, -} - -#[allow(clippy::type_complexity)] -#[derive(Debug)] -pub struct GlobalState { - // data store for the blocks - pub blocks: lru::LruCache, BlockInfo>, - - // registered builder states - pub spawned_builder_states: HashMap< - BuilderStateId, - ( - // This is provided as an Option for convenience with initialization. - // When we build the initial state, we don't necessarily want to - // have to generate a valid `ParentBlockReferences` object and register its leaf - // commitment, as doing such would require a bit of setup. Additionally it would - // result in the call signature to `GlobalState::new` changing. - // However for every subsequent BuilderState, we expect this value - // to be populated. - Option>>, - BroadcastSender>, - ), - >, - - // builder state -> last built block , it is used to respond the client - // if the req channel times out during get_available_blocks - pub builder_state_to_last_built_block: HashMap, ResponseMessage>, - - // sending a transaction from the hotshot/private mempool to the builder states - // NOTE: Currently, we don't differentiate between the transactions from the hotshot and the private mempool - pub tx_sender: BroadcastSender>>, - - // last garbage collected view number - pub last_garbage_collected_view_num: Types::View, - - // highest view running builder task - pub highest_view_num_builder_id: BuilderStateId, -} - -impl GlobalState { - #[allow(clippy::too_many_arguments)] - pub fn new( - bootstrap_sender: BroadcastSender>, - tx_sender: BroadcastSender>>, - bootstrapped_builder_state_id: VidCommitment, - bootstrapped_view_num: Types::View, - ) -> Self { - let mut spawned_builder_states = HashMap::new(); - let bootstrap_id = BuilderStateId { - parent_commitment: bootstrapped_builder_state_id, - parent_view: bootstrapped_view_num, - }; - spawned_builder_states.insert(bootstrap_id.clone(), (None, bootstrap_sender.clone())); - GlobalState { - blocks: lru::LruCache::new(NonZeroUsize::new(256).unwrap()), - spawned_builder_states, - tx_sender, - last_garbage_collected_view_num: bootstrapped_view_num, - builder_state_to_last_built_block: Default::default(), - highest_view_num_builder_id: bootstrap_id, - } - } - - pub fn register_builder_state( - &mut self, - parent_id: BuilderStateId, - parent_block_references: ParentBlockReferences, - request_sender: BroadcastSender>, - ) { - // register the builder state - self.spawned_builder_states.insert( - parent_id.clone(), - (Some(parent_block_references.leaf_commit), request_sender), - ); - - // keep track of the max view number - if parent_id.parent_view > self.highest_view_num_builder_id.parent_view { - tracing::info!("registering builder {parent_id} as highest",); - self.highest_view_num_builder_id = parent_id; - } else { - tracing::warn!( - "builder {parent_id} created; highest registered is {}", - self.highest_view_num_builder_id, - ); - } - } - - pub fn update_global_state( - &mut self, - state_id: BuilderStateId, - build_block_info: BuildBlockInfo, - response_msg: ResponseMessage, - ) { - if self.blocks.contains(&build_block_info.id) { - self.blocks.promote(&build_block_info.id) - } else { - self.blocks.push( - build_block_info.id, - BlockInfo { - block_payload: build_block_info.block_payload, - metadata: build_block_info.metadata, - offered_fee: build_block_info.offered_fee, - }, - ); - } - - // update the builder state to last built block - self.builder_state_to_last_built_block - .insert(state_id, response_msg); - } - - // remove the builder state handles based on the decide event - pub fn remove_handles(&mut self, on_decide_view: Types::View) -> Types::View { - // remove everything from the spawned builder states when view_num <= on_decide_view; - // if we don't have a highest view > decide, use highest view as cutoff. - let cutoff = std::cmp::min(self.highest_view_num_builder_id.parent_view, on_decide_view); - self.spawned_builder_states - .retain(|id, _| id.parent_view >= cutoff); - - let cutoff_u64 = cutoff.u64(); - let gc_view = if cutoff_u64 > 0 { cutoff_u64 - 1 } else { 0 }; - - self.last_garbage_collected_view_num = Types::View::new(gc_view); - - cutoff - } - - // private mempool submit txn - // Currently, we don't differentiate between the transactions from the hotshot and the private mempool - pub async fn submit_client_txns( - &self, - txns: Vec<::Transaction>, - ) -> Vec::Transaction>, BuildError>> { - handle_received_txns(&self.tx_sender, txns, TransactionSource::External).await - } - - pub fn get_channel_for_matching_builder_or_highest_view_buider( - &self, - key: &BuilderStateId, - ) -> Result<&BroadcastSender>, BuildError> { - if let Some(id_and_sender) = self.spawned_builder_states.get(key) { - tracing::info!("Got matching builder for parent {}", key); - Ok(&id_and_sender.1) - } else { - tracing::warn!( - "failed to recover builder for parent {}, using higest view num builder with {}", - key, - self.highest_view_num_builder_id, - ); - // get the sender for the highest view number builder - self.spawned_builder_states - .get(&self.highest_view_num_builder_id) - .map(|(_, sender)| sender) - .ok_or_else(|| BuildError::Error("No builder state found".to_string())) - } - } - - // check for the existence of the builder state for a view - pub fn check_builder_state_existence_for_a_view(&self, key: &Types::View) -> bool { - // iterate over the spawned builder states and check if the view number exists - self.spawned_builder_states - .iter() - .any(|(id, _)| id.parent_view == *key) - } - - pub fn should_view_handle_other_proposals( - &self, - builder_view: &Types::View, - proposal_view: &Types::View, - ) -> bool { - *builder_view == self.highest_view_num_builder_id.parent_view - && !self.check_builder_state_existence_for_a_view(proposal_view) - } -} - -pub struct ProxyGlobalState> { - // global state - global_state: Arc>>, +use crate::hooks::BuilderHooks; - // hooks - hooks: Arc, +/// The main type implementing the marketplace builder. +pub struct GlobalState +where + Types: NodeType, + Hooks: BuilderHooks, +{ + // coordinator + coordinator: Arc>, // identity keys for the builder // May be ideal place as GlobalState interacts with hotshot apis @@ -290,43 +66,64 @@ pub struct ProxyGlobalState> { // Maximum time allotted to wait for bundle before returning an error api_timeout: Duration, + + maximize_txn_capture_timeout: Duration, + + bundle_cache: RwLock, Bundle>>, + + base_fee: u64, + + hooks: Arc, } -impl ProxyGlobalState +impl GlobalState where Types: NodeType, - H: BuilderHooks, + Hooks: BuilderHooks, for<'a> <::PureAssembledSignatureType as TryFrom< &'a TaggedBase64, >>::Error: Display, for<'a> >::Error: Display, { pub fn new( - global_state: Arc>>, - hooks: Arc, builder_keys: ( Types::BuilderSignatureKey, <::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey, ), api_timeout: Duration, - ) -> Self { - ProxyGlobalState { - hooks, - global_state, + maximize_txn_capture_timeout: Duration, + txn_garbage_collect_duration: Duration, + txn_channel_capacity: usize, + base_fee: u64, + hooks: Hooks, + ) -> Arc { + let coordinator = + BuilderStateCoordinator::new(txn_channel_capacity, txn_garbage_collect_duration); + Arc::new(Self { + hooks: Arc::new(hooks), + coordinator: Arc::new(coordinator), builder_keys, api_timeout, - } + maximize_txn_capture_timeout, + bundle_cache: RwLock::new(HashMap::new()), + base_fee, + }) } /// Consumes `self` and returns a `tide_disco` [`App`] with builder and private mempool APIs registered - pub fn into_app(self) -> Result, AppError> { - let builder_api = define_api::(&Default::default())?; + pub fn into_app( + self: Arc, + ) -> Result, BuilderApiError>, AppError> { + let proxy = ProxyGlobalState(self); + let builder_api = define_api::, Types>(&Default::default())?; // TODO: Replace StaticVersion with proper constant when added in HotShot let private_mempool_api = - submit_api::>(&Default::default())?; + submit_api::, Types, StaticVersion<0, 1>>( + &Default::default(), + )?; - let mut app: App, BuilderApiError> = App::with_state(self); + let mut app: App, BuilderApiError> = App::with_state(proxy); app.register_module( hotshot_types::constants::MARKETPLACE_BUILDER_MODULE, @@ -337,22 +134,171 @@ where Ok(app) } + + /// Spawns an event loop handling HotShot events from the provided stream. + /// Returns a handle for the spawned task. + pub fn start_event_loop( + &self, + event_stream: impl Stream> + Unpin + Send + 'static, + ) -> JoinHandle> { + async_compatibility_layer::art::async_spawn(Self::event_loop( + self.coordinator.clone(), + self.hooks.clone(), + event_stream, + )) + } + + /// Internal implementation of the event loop, drives the underlying coordinator + /// and runs hooks + async fn event_loop( + coordinator: Arc>, + hooks: Arc, + mut event_stream: impl Stream> + Unpin + Send + 'static, + ) -> anyhow::Result<()> { + loop { + let Some(event) = event_stream.next().await else { + anyhow::bail!("Event stream ended"); + }; + + hooks.handle_hotshot_event(&event).await; + + match event.event { + EventType::Error { error } => { + tracing::error!("Error event in HotShot: {:?}", error); + } + EventType::Transactions { transactions } => { + let transactions = hooks.process_transactions(transactions).await; + + // TODO: record results + let _ = transactions + .into_iter() + .map(|txn| { + coordinator.handle_transaction(ReceivedTransaction::new( + txn, + TransactionSource::Public, + )) + }) + .collect::>() + .collect::>() + .await; + } + EventType::Decide { leaf_chain, .. } => { + coordinator.handle_decide(leaf_chain).await; + } + EventType::DaProposal { proposal, .. } => { + coordinator.handle_da_proposal(proposal.data).await; + } + EventType::QuorumProposal { proposal, .. } => { + coordinator.handle_quorum_proposal(proposal.data).await; + } + _ => {} + } + } + } + + /// Collect transactions to include in the bundle. Will wait until we have + /// at least one transaction or up to the configured `maximize_txn_capture_timeout` duration elapses. + #[tracing::instrument(skip_all, fields(builder_parent_block_references = %state.parent_block_references))] + async fn collect_transactions( + &self, + state: &Arc>, + ) -> Option> { + // collect all the transactions from the near future + let timeout_after = Instant::now() + self.maximize_txn_capture_timeout; + let sleep_interval = self.maximize_txn_capture_timeout / 10; + while Instant::now() <= timeout_after { + let queue_populated = state.collect_txns(timeout_after).await; + + if queue_populated || Instant::now() + sleep_interval > timeout_after { + // we don't have time for another iteration + break; + } + + async_sleep(sleep_interval).await + } + + let transactions = state + .txn_queue + .read() + .await + .iter() + .map(|txn| txn.transaction.clone()) + .collect(); + + Some(transactions) + } + + /// Assembles a [`Bundle`] for a certain view from a list of transactions by adding fee and signature + async fn assemble_bundle( + &self, + transactions: Vec, + view_number: u64, + ) -> Result, BuildError> { + let bundle_size: u64 = transactions + .iter() + .map(|txn| txn.minimum_block_size()) + .sum(); + let offered_fee = self.base_fee * bundle_size; + + let fee_signature = + ::sign_sequencing_fee_marketplace( + &self.builder_keys.1, + offered_fee, + view_number, + ) + .map_err(|e| BuildError::Error(e.to_string()))?; + + let sequencing_fee: BuilderFee = BuilderFee { + fee_amount: offered_fee, + fee_account: self.builder_keys.0.clone(), + fee_signature, + }; + + let commitments = transactions + .iter() + .flat_map(|txn| <[u8; 32]>::from(txn.commit())) + .collect::>(); + + let signature = ::sign_builder_message( + &self.builder_keys.1, + &commitments, + ) + .map_err(|e| BuildError::Error(e.to_string()))?; + + Ok(Bundle { + sequencing_fee, + transactions, + signature, + }) + } } +#[derive(derive_more::Deref, derive_more::DerefMut)] +#[deref(forward)] +#[deref_mut(forward)] +pub struct ProxyGlobalState(pub Arc>) +where + Types: NodeType, + Hooks: BuilderHooks; + /* Handling Builder API responses */ #[async_trait] -impl BuilderDataSource for ProxyGlobalState +impl BuilderDataSource for ProxyGlobalState where Types: NodeType, - H: BuilderHooks, + Hooks: BuilderHooks, for<'a> <::PureAssembledSignatureType as TryFrom< &'a TaggedBase64, >>::Error: Display, for<'a> >::Error: Display, { - #[tracing::instrument(skip(self))] + #[tracing::instrument( + skip(self), + err(level = Level::INFO) + ret(level = Level::TRACE) + )] async fn bundle( &self, parent_view: u64, @@ -374,112 +320,42 @@ where return Err(BuildError::NotFound); }; - let Some(id_and_sender) = self - .global_state - .read_arc() - .await - .spawned_builder_states - .get(&state_id) - .cloned() - else { - let global_state = self.global_state.read_arc().await; - - let past_gc = parent_view <= global_state.last_garbage_collected_view_num; - // Used as an indicator that we're just bootstrapping, as they should be equal at bootstrap - // and never otherwise. - let last_gc_view = global_state.last_garbage_collected_view_num; - let highest_observed_view = global_state.highest_view_num_builder_id.parent_view; - let is_bootstrapping = last_gc_view == highest_observed_view; - - // Explicitly drop `global_state` to avoid the lock while sleeping in `else`. - drop(global_state); - - if past_gc && !is_bootstrapping { + let builder_state = match self.coordinator.lookup_builder_state(&state_id).await { + BuilderStateLookup::Found(builder_state_entry) => builder_state_entry, + BuilderStateLookup::NotFound => { + // If we couldn't find the state because it hasn't yet been created, try again + async_sleep(self.api_timeout / 10).await; + continue; + } + BuilderStateLookup::Decided => { // If we couldn't find the state because the view has already been decided, we can just return an error - tracing::warn!( - last_gc_view = ?last_gc_view, - highest_observed_view = ?highest_observed_view, - "Requested a bundle for view we already GCd as decided", - ); + tracing::warn!("Requested a bundle for view we already GCd as decided",); return Err(BuildError::Error( "Request for a bundle for a view that has already been decided.".to_owned(), )); - } else { - // If we couldn't find the state because it hasn't yet been created, try again - async_compatibility_layer::art::async_sleep(self.api_timeout / 10).await; - continue; } }; - let (response_sender, response_receiver) = - async_compatibility_layer::channel::unbounded(); + tracing::info!( + "Request handled by builder with view {}@{:?} for (view_num: {:?})", + builder_state.parent_block_references.vid_commitment, + builder_state.parent_block_references.view_number, + parent_view + ); - let request = RequestMessage { - requested_view_number: parent_view, - response_channel: response_sender, + let Some(transactions) = self.collect_transactions(&builder_state).await else { + tracing::debug!("No response to send"); + return Err(BuildError::NotFound); }; - id_and_sender - .1 - .broadcast(MessageType::RequestMessage(request)) - .await - .map_err(|err| { - tracing::warn!(%err, "Error requesting bundle"); - - BuildError::Error("Error requesting bundle".to_owned()) - })?; + let bundle = self.assemble_bundle(transactions, view_number).await?; - let response = async_compatibility_layer::art::async_timeout( - self.api_timeout.saturating_sub(start.elapsed()), - response_receiver.recv(), - ) - .await - .map_err(|err| { - tracing::warn!(%err, "Couldn't get a bundle in time"); - - BuildError::NotFound - })? - .map_err(|err| { - tracing::warn!(%err, "Channel closed while waiting for bundle"); - - BuildError::Error("Channel closed while waiting for bundle".to_owned()) - })?; - - let fee_signature = - ::sign_sequencing_fee_marketplace( - &self.builder_keys.1, - response.offered_fee, - view_number - ) - .map_err(|e| BuildError::Error(e.to_string()))?; - - let sequencing_fee: BuilderFee = BuilderFee { - fee_amount: response.offered_fee, - fee_account: self.builder_keys.0.clone(), - fee_signature, - }; - - let commitments = response - .transactions - .iter() - .flat_map(|txn| <[u8; 32]>::from(txn.commit())) - .collect::>(); - - let signature = - ::sign_builder_message( - &self.builder_keys.1, - &commitments, - ) - .map_err(|e| BuildError::Error(e.to_string()))?; - - let bundle = Bundle { - sequencing_fee, - transactions: response.transactions, - signature, - }; + self.bundle_cache + .write() + .await + .insert(state_id, bundle.clone()); tracing::info!("Serving bundle"); - tracing::trace!(?bundle); return Ok(bundle); } @@ -493,46 +369,39 @@ where } #[async_trait] -impl AcceptsTxnSubmits for ProxyGlobalState +impl AcceptsTxnSubmits for ProxyGlobalState where + Hooks: BuilderHooks, Types: NodeType, - H: BuilderHooks, { async fn submit_txns( &self, txns: Vec<::Transaction>, ) -> Result::Transaction>>, BuildError> { - tracing::debug!( - "Submitting {:?} transactions to the builder states{:?}", - txns.len(), - txns.iter().map(|txn| txn.commit()).collect::>() - ); let txns = self.hooks.process_transactions(txns).await; - let response = self - .global_state - .read_arc() + + txns.into_iter() + .map(|txn| ReceivedTransaction::new(txn, TransactionSource::Private)) + .map(|txn| async { + let commit = txn.commit; + self.coordinator + .handle_transaction(txn) + .await + .map(|_| commit) + }) + .collect::>() + .try_collect() .await - .submit_client_txns(txns) - .await; - - tracing::debug!( - "Transaction submitted to the builder states, sending response: {:?}", - response - ); - - // NOTE: ideally we want to respond with original Vec - // instead of Result not to loose any information, - // but this requires changes to builder API - response.into_iter().collect() } } + #[async_trait] -impl ReadState for ProxyGlobalState +impl ReadState for ProxyGlobalState where Types: NodeType, - H: BuilderHooks + 'static, + Hooks: BuilderHooks, { - type State = ProxyGlobalState; + type State = Self; async fn read( &self, @@ -541,306 +410,3 @@ where op(self).await } } - -pub fn broadcast_channels( - capacity: usize, -) -> (BroadcastSenders, BroadcastReceivers) { - macro_rules! pair { - ($s:ident, $r:ident) => { - let ($s, $r) = broadcast(capacity); - let $r = $r.deactivate(); - }; - } - - pair!(tx_sender, tx_receiver); - pair!(da_sender, da_receiver); - pair!(quorum_sender, quorum_proposal_receiver); - pair!(decide_sender, decide_receiver); - - ( - BroadcastSenders { - transactions: tx_sender, - da_proposal: da_sender, - quorum_proposal: quorum_sender, - decide: decide_sender, - }, - BroadcastReceivers { - transactions: tx_receiver, - da_proposal: da_receiver, - quorum_proposal: quorum_proposal_receiver, - decide: decide_receiver, - }, - ) -} - -// Receivers for HotShot events for the builder states -pub struct BroadcastReceivers { - /// For transactions, shared. - pub transactions: InactiveReceiver>>, - /// For the DA proposal. - pub da_proposal: InactiveReceiver>, - /// For the quorum proposal. - pub quorum_proposal: InactiveReceiver>, - /// For the decide. - pub decide: InactiveReceiver>, -} - -// Senders to broadcast data from HotShot to the builder states. -pub struct BroadcastSenders { - /// For transactions, shared. - pub transactions: BroadcastSender>>, - /// For the DA proposal. - pub da_proposal: BroadcastSender>, - /// For the quorum proposal. - pub quorum_proposal: BroadcastSender>, - /// For the decide. - pub decide: BroadcastSender>, -} - -#[async_trait] -pub trait BuilderHooks: Sync + Send + 'static { - #[inline(always)] - async fn process_transactions( - &self, - transactions: Vec, - ) -> Vec { - transactions - } - - #[inline(always)] - async fn handle_hotshot_event(&self, _event: &Event) {} -} - -#[async_trait] -impl BuilderHooks for Box -where - Types: NodeType, - T: BuilderHooks, -{ - #[inline(always)] - async fn process_transactions( - &self, - transactions: Vec, - ) -> Vec { - (**self).process_transactions(transactions).await - } - - #[inline(always)] - async fn handle_hotshot_event(&self, event: &Event) { - (**self).handle_hotshot_event(event).await - } -} - -pub struct NoHooks(pub PhantomData); - -impl BuilderHooks for NoHooks {} - -/// Run builder service, -/// Refer to documentation for [`ProxyGlobalState`] for more details -pub async fn run_builder_service< - Types: NodeType, - S: Stream> + Unpin, ->( - hooks: Arc>, - senders: BroadcastSenders, - hotshot_event_stream: S, -) -> Result<(), anyhow::Error> { - let mut hotshot_event_stream = std::pin::pin!(hotshot_event_stream); - loop { - let Some(event) = hotshot_event_stream.next().await else { - bail!("Event stream ended"); - }; - - hooks.handle_hotshot_event(&event).await; - - match event.event { - EventType::Error { error } => { - error!("Error event in HotShot: {:?}", error); - } - // tx event - EventType::Transactions { transactions } => { - let transactions = hooks.process_transactions(transactions).await; - - for res in handle_received_txns( - &senders.transactions, - transactions, - TransactionSource::HotShot, - ) - .await - { - if let Err(e) = res { - tracing::warn!("Failed to handle transactions; {:?}", e); - } - } - } - // decide event - EventType::Decide { - block_size: _, - leaf_chain, - qc: _, - } => { - let latest_decide_view_num = leaf_chain[0].leaf.view_number(); - handle_decide_event(&senders.decide, latest_decide_view_num).await; - } - // DA proposal event - EventType::DaProposal { proposal, sender } => { - handle_da_event(&senders.da_proposal, proposal, sender).await; - } - // Quorum proposal event - EventType::QuorumProposal { proposal, sender } => { - handle_quorum_event(&senders.quorum_proposal, Arc::new(proposal), sender).await; - } - _ => { - tracing::trace!("Unhandled event from Builder: {:?}", event.event); - } - } - } -} - -/* -Utility functions to handle the hotshot events -*/ -#[instrument(skip_all, fields(sender, da_proposal.data.view_number))] -async fn handle_da_event( - da_channel_sender: &BroadcastSender>, - da_proposal: Proposal>, - sender: ::SignatureKey, -) { - // get the encoded transactions hash - let encoded_txns_hash = Sha256::digest(&da_proposal.data.encoded_transactions); - // check if the sender is the leader and the signature is valid; if yes, broadcast the DA proposal - if !sender.validate(&da_proposal.signature, &encoded_txns_hash) { - error!("Validation Failure on DaProposal"); - return; - } - - let view_number = da_proposal.data.view_number; - tracing::debug!("Sending DA proposal to the builder states",); - - // form a block payload from the encoded transactions - let block_payload = >::from_bytes( - &da_proposal.data.encoded_transactions, - &da_proposal.data.metadata, - ); - // get the builder commitment from the block payload - let builder_commitment = block_payload.builder_commitment(&da_proposal.data.metadata); - - let txn_commitments = block_payload - .transactions(&da_proposal.data.metadata) - // TODO: - //.filter(|txn| txn.namespace_id() != namespace_id) - .map(|txn| txn.commit()) - .collect(); - - let da_msg = DaProposalMessage { - view_number, - txn_commitments, - sender, - builder_commitment, - }; - - if let Err(e) = da_channel_sender - .broadcast(MessageType::DaProposalMessage(Arc::new(da_msg))) - .await - { - tracing::warn!( - "Error {e}, failed to send DA proposal to builder states for view {:?}", - view_number - ); - } -} - -#[instrument(skip_all, fields(sender, quorum_proposal.data.view_number))] -async fn handle_quorum_event( - quorum_channel_sender: &BroadcastSender>, - quorum_proposal: Arc>>, - sender: ::SignatureKey, -) { - let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data); - - // check if the sender is the leader and the signature is valid; if yes, broadcast the Quorum proposal - if !sender.validate(&quorum_proposal.signature, leaf.legacy_commit().as_ref()) { - error!("Validation Failure on QuorumProposal"); - return; - }; - - let quorum_msg = QuorumProposalMessage:: { - proposal: quorum_proposal, - sender, - }; - let view_number = quorum_msg.proposal.data.view_number; - tracing::debug!( - "Sending Quorum proposal to the builder states for view {:?}", - view_number - ); - if let Err(e) = quorum_channel_sender - .broadcast(MessageType::QuorumProposalMessage(quorum_msg)) - .await - { - tracing::warn!( - "Error {e}, failed to send Quorum proposal to builder states for view {:?}", - view_number - ); - } -} - -async fn handle_decide_event( - decide_channel_sender: &BroadcastSender>, - latest_decide_view_number: Types::View, -) { - let decide_msg: DecideMessage = DecideMessage:: { - latest_decide_view_number, - }; - tracing::debug!( - "Sending Decide event to builder states for view {:?}", - latest_decide_view_number - ); - if let Err(e) = decide_channel_sender - .broadcast(MessageType::DecideMessage(decide_msg)) - .await - { - tracing::warn!( - "Error {e}, failed to send Decide event to builder states for view {:?}", - latest_decide_view_number - ); - } -} - -pub(crate) async fn handle_received_txns( - tx_sender: &BroadcastSender>>, - txns: Vec, - source: TransactionSource, -) -> Vec::Transaction>, BuildError>> { - let mut results = Vec::with_capacity(txns.len()); - let time_in = Instant::now(); - for tx in txns.into_iter() { - let commit = tx.commit(); - let res = tx_sender - .try_broadcast(Arc::new(ReceivedTransaction { - tx, - source: source.clone(), - commit, - time_in, - })) - .inspect(|val| { - if let Some(evicted_txn) = val { - tracing::warn!( - "Overflow mode enabled, transaction {} evicted", - evicted_txn.commit - ); - } - }) - .map(|_| commit) - .inspect_err(|err| { - tracing::warn!("Failed to broadcast txn with commit {:?}: {}", commit, err); - }) - .map_err(|err| match err { - TrySendError::Full(_) => BuildError::Error("Too many transactions".to_owned()), - e => { - BuildError::Error(format!("Internal error when submitting transaction: {}", e)) - } - }); - results.push(res); - } - results -} diff --git a/crates/marketplace/src/testing/basic_test.rs b/crates/marketplace/src/testing/basic_test.rs index 63abfc5c..6ebf5df7 100644 --- a/crates/marketplace/src/testing/basic_test.rs +++ b/crates/marketplace/src/testing/basic_test.rs @@ -1,16 +1,21 @@ -use hotshot_types::data::QuorumProposal; +use async_broadcast::broadcast; +use hotshot::types::{BLSPubKey, Event, SignatureKey}; +use hotshot_builder_api::v0_3::data_source::{AcceptsTxnSubmits, BuilderDataSource}; +use hotshot_types::data::{QuorumProposal, ViewNumber}; use async_compatibility_layer::art::async_sleep; -use async_std::prelude::FutureExt; use hotshot_example_types::block_types::TestTransaction; - -use crate::builder_state::MessageType; -use crate::{builder_state::TransactionSource, testing::TestTypes}; -use crate::{ - service::handle_received_txns, - testing::{calc_proposal_msg, get_req_msg, start_builder_state}, +use hotshot_types::traits::node_implementation::ConsensusTime; +use marketplace_builder_shared::testing::constants::{ + TEST_API_TIMEOUT, TEST_BASE_FEE, TEST_INCLUDED_TX_GC_PERIOD, TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT, }; + +use crate::hooks::NoHooks; +use crate::service::{GlobalState, ProxyGlobalState}; +use crate::testing::{calc_proposal_events, TestTypes}; +use std::marker::PhantomData; +use std::sync::Arc; use std::time::Duration; /// This test simulates multiple builder states receiving messages from the channels and processing them @@ -26,10 +31,20 @@ async fn test_builder() { const NUM_TXNS_PER_ROUND: usize = 4; // Capacity of broadcast channels const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5; - // Number of nodes on DA committee - const NUM_STORAGE_NODES: usize = 4; - let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; + let global_state = Arc::new(GlobalState::new( + BLSPubKey::generated_from_seed_indexed([0; 32], 0), + TEST_API_TIMEOUT, + TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT, + TEST_INCLUDED_TX_GC_PERIOD, + CHANNEL_CAPACITY, + TEST_BASE_FEE, + NoHooks(PhantomData), + )); + let proxy_global_state = ProxyGlobalState(Arc::clone(&global_state)); + + let (event_stream_sender, event_stream) = broadcast(1024); + global_state.start_event_loop(event_stream); // Transactions to send let all_transactions = (0..NUM_ROUNDS) @@ -51,66 +66,48 @@ async fn test_builder() { #[allow(clippy::needless_range_loop)] // intent is clearer this way for round in 0..NUM_ROUNDS { // simulate transaction being submitted to the builder - for res in handle_received_txns( - &senders.transactions, - all_transactions[round].clone(), - TransactionSource::HotShot, - ) - .await - { - res.unwrap(); - } + proxy_global_state + .submit_txns(all_transactions[round].clone()) + .await + .unwrap(); // get transactions submitted in previous rounds, [] for genesis // and simulate the block built from those let transactions = prev_proposed_transactions.take().unwrap_or_default(); - let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) = - calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions).await; + let (quorum_proposal, events, builder_state_id) = + calc_proposal_events(round, prev_quorum_proposal, transactions).await; prev_quorum_proposal = Some(quorum_proposal.clone()); // send quorum and DA proposals for this round - senders - .da_proposal - .broadcast(MessageType::DaProposalMessage(da_proposal_msg)) - .await - .unwrap(); - senders - .quorum_proposal - .broadcast(MessageType::QuorumProposalMessage(quorum_proposal_msg)) - .await - .unwrap(); - - let req_msg = get_req_msg(round as u64, builder_state_id).await; + for evt in events { + event_stream_sender + .broadcast(Event { + view_number: ViewNumber::new(round as u64), + event: evt, + }) + .await + .unwrap(); + } // give builder state time to fork async_sleep(Duration::from_millis(100)).await; - // get the builder state for parent view we've just simulated - global_state - .read_arc() - .await - .spawned_builder_states - .get(&req_msg.1) - .expect("Failed to get channel for matching builder") - .1 - .broadcast(req_msg.2.clone()) - .await - .unwrap(); - // get response - let res_msg = req_msg - .0 - .recv() - .timeout(Duration::from_secs(10)) + let bundle = proxy_global_state + .bundle( + *builder_state_id.parent_view, + &builder_state_id.parent_commitment, + round as u64 + 1, + ) .await - .unwrap() .unwrap(); + // in the next round we will use received transactions to simulate // the block being proposed - prev_proposed_transactions = Some(res_msg.transactions.clone()); + prev_proposed_transactions = Some(bundle.transactions.clone()); // save transactions to history - transaction_history.extend(res_msg.transactions); + transaction_history.extend(bundle.transactions); } // we should've served all transactions submitted, and in correct order diff --git a/crates/marketplace/src/testing/integration.rs b/crates/marketplace/src/testing/integration.rs index 14cf82fa..a90515ee 100644 --- a/crates/marketplace/src/testing/integration.rs +++ b/crates/marketplace/src/testing/integration.rs @@ -4,34 +4,23 @@ use std::{collections::HashMap, fmt::Display, marker::PhantomData, sync::Arc, time::Duration}; use async_compatibility_layer::art::async_spawn; -use async_lock::RwLock; use async_trait::async_trait; use hotshot::types::SignatureKey; -use hotshot_example_types::node_types::TestVersions; use hotshot_testing::{ block_builder::{BuilderTask, TestBuilderImplementation}, test_builder::BuilderChange, }; use hotshot_types::{ - data::{Leaf, ViewNumber}, - message::UpgradeLock, - traits::{ - block_contents::{vid_commitment, BlockPayload, EncodeBytes}, - node_implementation::{ConsensusTime, NodeType}, - signature_key::BuilderSignatureKey, - states::ValidatedState, - }, + data::ViewNumber, + traits::{node_implementation::NodeType, signature_key::BuilderSignatureKey}, }; -use marketplace_builder_shared::block::ParentBlockReferences; use tagged_base64::TaggedBase64; use url::Url; use vbs::version::StaticVersion; use crate::{ - builder_state::BuilderState, - service::{ - run_builder_service, BroadcastSenders, BuilderHooks, GlobalState, NoHooks, ProxyGlobalState, - }, + hooks::{BuilderHooks, NoHooks}, + service::GlobalState, }; const BUILDER_CHANNEL_CAPACITY: usize = 1024; @@ -43,7 +32,7 @@ struct TestMarketplaceBuilderConfig where Types: NodeType, { - hooks: Arc>>, + hooks: Box>, } impl Default for TestMarketplaceBuilderConfig @@ -52,7 +41,7 @@ where { fn default() -> Self { Self { - hooks: Arc::new(Box::new(NoHooks(PhantomData))), + hooks: Box::new(NoHooks(PhantomData)), } } } @@ -78,93 +67,34 @@ where /// [`BuilderTask`] it returns will be injected into consensus runtime by HotShot testing harness and /// will forward transactions from hotshot event stream to the builder. async fn start( - n_nodes: usize, + _n_nodes: usize, url: Url, config: Self::Config, _changes: HashMap, ) -> Box> { - let instance_state = Types::InstanceState::default(); - let (validated_state, _) = Types::ValidatedState::genesis(&instance_state); - let builder_key_pair = Types::BuilderSignatureKey::generated_from_seed_indexed([0; 32], 0); - let (senders, receivers) = crate::service::broadcast_channels(BUILDER_CHANNEL_CAPACITY); - - // builder api request channel - let (req_sender, req_receiver) = async_broadcast::broadcast::<_>(BUILDER_CHANNEL_CAPACITY); - - let (genesis_payload, genesis_ns_table) = - Types::BlockPayload::from_transactions([], &validated_state, &instance_state) - .await - .expect("genesis payload construction failed"); - - let builder_commitment = genesis_payload.builder_commitment(&genesis_ns_table); - - let vid_commitment = { - let payload_bytes = genesis_payload.encode(); - vid_commitment(&payload_bytes, n_nodes) - }; - - // create the global state - let global_state: GlobalState = GlobalState::::new( - req_sender, - senders.transactions.clone(), - vid_commitment, - Types::View::genesis(), - ); - - let global_state = Arc::new(RwLock::new(global_state)); - - let leaf = Leaf::genesis(&validated_state, &instance_state).await; - - let builder_state = BuilderState::::new( - ParentBlockReferences { - view_number: Types::View::genesis(), - vid_commitment, - leaf_commit: leaf - .commit(&UpgradeLock::::new()) - .await, - builder_commitment, - }, - &receivers, - req_receiver, - Vec::new(), /* tx_queue */ - Arc::clone(&global_state), - Duration::from_millis(1), - 10, - Arc::new(instance_state), + let service = GlobalState::new( + builder_key_pair, + Duration::from_millis(500), + Duration::from_millis(10), Duration::from_secs(60), - Arc::new(validated_state), + BUILDER_CHANNEL_CAPACITY, + 1, // Arbitrary base fee + config.hooks, ); - builder_state.event_loop(); - - let hooks = Arc::new(NoHooks(PhantomData)); - // create the proxy global state it will server the builder apis - let app = ProxyGlobalState::new( - global_state.clone(), - Arc::clone(&hooks), - builder_key_pair, - Duration::from_millis(500), - ) - .into_app() - .expect("Failed to create builder tide-disco app"); + let app = service + .clone() + .into_app() + .expect("Failed to create builder tide-disco app"); let url_clone = url.clone(); - async_spawn(async move { - tracing::error!("Starting builder app on {url_clone}"); - if let Err(e) = app.serve(url_clone, StaticVersion::<0, 1> {}).await { - tracing::error!(?e, "Builder API App exited with error"); - } else { - tracing::error!("Builder API App exited"); - } - }); - Box::new(MarketplaceBuilderTask { - hooks: config.hooks, - senders, - }) + async_spawn(app.serve(url_clone, StaticVersion::<0, 1> {})); + + Box::new(MarketplaceBuilderTask { service }) } } @@ -173,13 +103,16 @@ struct MarketplaceBuilderTask where Types: NodeType, { - hooks: Arc>>, - senders: BroadcastSenders, + service: Arc>>>, } impl BuilderTask for MarketplaceBuilderTask where Types: NodeType, + for<'a> <::PureAssembledSignatureType as TryFrom< + &'a TaggedBase64, + >>::Error: Display, + for<'a> >::Error: Display, { fn start( self: Box, @@ -190,7 +123,7 @@ where + 'static, >, ) { - async_spawn(run_builder_service(self.hooks, self.senders, stream)); + self.service.start_event_loop(stream); } } @@ -297,7 +230,7 @@ mod tests { Metadata: { TestDescription { validate_transactions : hotshot_testing::test_builder::nonempty_block_threshold((90,100)), - txn_description : hotshot_testing::txn_task::TxnTaskDescription::RoundRobinTimeBased(Duration::from_millis(50)), + txn_description : hotshot_testing::txn_task::TxnTaskDescription::RoundRobinTimeBased(Duration::from_millis(10)), completion_task_description : CompletionTaskDescription::TimeBasedCompletionTaskBuilder( TimeBasedCompletionTaskDescription { duration: Duration::from_secs(120), diff --git a/crates/marketplace/src/testing/mod.rs b/crates/marketplace/src/testing/mod.rs index a7dd820b..df53fe17 100644 --- a/crates/marketplace/src/testing/mod.rs +++ b/crates/marketplace/src/testing/mod.rs @@ -1,128 +1,47 @@ use std::marker::PhantomData; -use crate::{ - builder_state::{ - BuilderState, DaProposalMessage, MessageType, QuorumProposalMessage, RequestMessage, - ResponseMessage, - }, - service::BroadcastSenders, - utils::LegacyCommit, -}; -use async_broadcast::broadcast; -use async_compatibility_layer::channel::{unbounded, UnboundedReceiver}; +use committable::Committable; use hotshot::{ traits::BlockPayload, - types::{BLSPubKey, SignatureKey}, + types::{BLSPubKey, EventType, SignatureKey}, +}; +use hotshot_example_types::{ + block_types::{TestBlockHeader, TestBlockPayload, TestMetadata, TestTransaction}, + node_types::{TestTypes, TestVersions}, + state_types::{TestInstanceState, TestValidatedState}, }; use hotshot_types::{ - data::{Leaf, QuorumProposal, ViewNumber}, + data::{DaProposal, Leaf, QuorumProposal, ViewNumber}, message::Proposal, simple_certificate::{QuorumCertificate, SimpleCertificate, SuccessThreshold}, simple_vote::QuorumData, traits::{block_contents::vid_commitment, node_implementation::ConsensusTime}, - utils::BuilderCommitment, }; - -use hotshot_example_types::{ - block_types::{TestBlockHeader, TestBlockPayload, TestMetadata, TestTransaction}, - node_types::{TestTypes, TestVersions}, - state_types::{TestInstanceState, TestValidatedState}, +use marketplace_builder_shared::{ + block::BuilderStateId, testing::constants::TEST_NUM_NODES_IN_VID_COMPUTATION, }; -use marketplace_builder_shared::block::{BuilderStateId, ParentBlockReferences}; +use sha2::{Digest, Sha256}; -use crate::service::{broadcast_channels, GlobalState}; -use async_lock::RwLock; -use committable::{Commitment, CommitmentBoundsArkless, Committable}; -use std::sync::Arc; -use std::time::Duration; pub mod basic_test; pub mod integration; pub mod order_test; -pub async fn create_builder_state( - channel_capacity: usize, - num_storage_nodes: usize, -) -> ( - BroadcastSenders, - Arc>>, - BuilderState, -) { - // set up the broadcast channels - let (bootstrap_sender, bootstrap_receiver) = - broadcast::>(channel_capacity); - let (senders, receivers) = broadcast_channels(channel_capacity); - - let genesis_vid_commitment = vid_commitment(&[], num_storage_nodes); - let genesis_builder_commitment = BuilderCommitment::from_bytes([]); - let parent_block_references = ParentBlockReferences { - view_number: ViewNumber::genesis(), - vid_commitment: genesis_vid_commitment, - leaf_commit: Commitment::>::default_commitment_no_preimage(), - builder_commitment: genesis_builder_commitment, - }; - - // instantiate the global state - let global_state = Arc::new(RwLock::new(GlobalState::::new( - bootstrap_sender, - senders.transactions.clone(), - genesis_vid_commitment, - ViewNumber::genesis(), - ))); - - // instantiate the bootstrap builder state - let builder_state = BuilderState::::new( - parent_block_references, - &receivers, - bootstrap_receiver, - Vec::new(), - Arc::clone(&global_state), - Duration::from_millis(10), // max time to wait for non-zero txn block - 0, // base fee - Arc::new(TestInstanceState::default()), - Duration::from_secs(3600), // duration for txn garbage collection - Arc::new(TestValidatedState::default()), - ); - - (senders, global_state, builder_state) -} - -/// set up the broadcast channels and instatiate the global state with fixed channel capacity and num nodes -pub async fn start_builder_state( - channel_capacity: usize, - num_storage_nodes: usize, -) -> ( - BroadcastSenders, - Arc>>, -) { - let (senders, global_state, builder_state) = - create_builder_state(channel_capacity, num_storage_nodes).await; - - // start the event loop - builder_state.event_loop(); - - (senders, global_state) -} - -/// get transactions submitted in previous rounds, [] for genesis -/// and simulate the block built from those -pub async fn calc_proposal_msg( - num_storage_nodes: usize, +pub async fn calc_proposal_events( round: usize, prev_quorum_proposal: Option>, transactions: Vec, ) -> ( QuorumProposal, - QuorumProposalMessage, - Arc>, + Vec>, BuilderStateId, ) { // get transactions submitted in previous rounds, [] for genesis // and simulate the block built from those let num_transactions = transactions.len() as u64; - let txn_commitments = transactions.iter().map(Committable::commit).collect(); let encoded_transactions = TestTransaction::encode(&transactions); let block_payload = TestBlockPayload { transactions }; - let block_vid_commitment = vid_commitment(&encoded_transactions, num_storage_nodes); + let block_vid_commitment = + vid_commitment(&encoded_transactions, TEST_NUM_NODES_IN_VID_COMPUTATION); let metadata = TestMetadata { num_transactions }; let block_builder_commitment = >::builder_commitment( @@ -134,12 +53,24 @@ pub async fn calc_proposal_msg( let seed = [round as u8; 32]; let (pub_key, private_key) = BLSPubKey::generated_from_seed_indexed(seed, round as u64); - let da_proposal = Arc::new(DaProposalMessage { + let quorum_signature = + ::SignatureKey::sign( + &private_key, + block_vid_commitment.as_ref(), + ) + .expect("Failed to sign payload commitment while preparing Quorum proposal"); + let da_signature = + ::SignatureKey::sign( + &private_key, + Sha256::digest(&encoded_transactions).as_ref(), + ) + .expect("Failed to sign payload commitment while preparing DA proposal"); + + let da_proposal = DaProposal { + encoded_transactions: encoded_transactions.into(), + metadata, view_number: ViewNumber::new(round as u64), - txn_commitments, - sender: pub_key, - builder_commitment: block_builder_commitment.clone(), - }); + }; let block_header = TestBlockHeader { block_number: round as u64, @@ -161,7 +92,7 @@ pub async fn calc_proposal_msg( Some(prev_proposal) => { let prev_justify_qc = &prev_proposal.justify_qc; let quorum_data = QuorumData:: { - leaf_commit: Leaf::from_quorum_proposal(prev_proposal).legacy_commit(), + leaf_commit: Committable::commit(&Leaf::from_quorum_proposal(prev_proposal)), }; // form a justify qc @@ -185,48 +116,32 @@ pub async fn calc_proposal_msg( proposal_certificate: None, }; - let quorum_signature = - ::SignatureKey::sign( - &private_key, - block_vid_commitment.as_ref(), - ) - .expect("Failed to sign payload commitment while preparing Quorum proposal"); - - let quorum_proposal_msg = QuorumProposalMessage:: { - proposal: Arc::new(Proposal { + let quorum_proposal_event = EventType::QuorumProposal { + proposal: Proposal { data: quorum_proposal.clone(), signature: quorum_signature, _pd: PhantomData, - }), + }, sender: pub_key, }; + + let da_proposal_event = EventType::DaProposal { + proposal: Proposal { + data: da_proposal, + signature: da_signature, + _pd: PhantomData, + }, + sender: pub_key, + }; + let builder_state_id = BuilderStateId { parent_commitment: block_vid_commitment, parent_view: ViewNumber::new(round as u64), }; + ( quorum_proposal, - quorum_proposal_msg, - da_proposal, + vec![quorum_proposal_event, da_proposal_event], builder_state_id, ) } - -/// get request message -/// it contains receiver, builder state id ( which helps looking up builder state in global state) and request message in view number and response channel -async fn get_req_msg( - round: u64, - builder_state_id: BuilderStateId, -) -> ( - UnboundedReceiver>, - BuilderStateId, - MessageType, -) { - let (response_sender, response_receiver) = unbounded(); - let request_message = MessageType::::RequestMessage(RequestMessage { - requested_view_number: ViewNumber::new(round), - response_channel: response_sender, - }); - - (response_receiver, builder_state_id, request_message) -} diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs index 6e5fd520..8f9b6f16 100644 --- a/crates/marketplace/src/testing/order_test.rs +++ b/crates/marketplace/src/testing/order_test.rs @@ -1,46 +1,33 @@ +use async_broadcast::broadcast; use hotshot_builder_api::v0_3::data_source::{AcceptsTxnSubmits, BuilderDataSource}; use hotshot_types::{ bundle::Bundle, - data::QuorumProposal, - traits::node_implementation::{ConsensusTime, NodeType}, + data::{QuorumProposal, ViewNumber}, + traits::node_implementation::ConsensusTime, +}; +use marketplace_builder_shared::{ + block::BuilderStateId, + testing::constants::{ + TEST_API_TIMEOUT, TEST_BASE_FEE, TEST_INCLUDED_TX_GC_PERIOD, + TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT, + }, }; -use marketplace_builder_shared::block::BuilderStateId; use crate::{ - builder_state::MessageType, - service::{BuilderHooks, ProxyGlobalState}, + hooks::NoHooks, + service::{GlobalState, ProxyGlobalState}, + testing::calc_proposal_events, }; -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, marker::PhantomData, sync::Arc}; use hotshot_example_types::block_types::TestTransaction; +use hotshot_example_types::node_types::TestTypes; -use crate::testing::TestTypes; -use crate::testing::{calc_proposal_msg, start_builder_state}; use hotshot::{ rand::{self, seq::SliceRandom, thread_rng}, types::{BLSPubKey, Event, SignatureKey}, }; -use std::time::Duration; - -/// [`NoOpHooks`] is a struct placeholder that is used to implement the -/// [`BuilderHooks`] trait for the [`TestTypes`] `NodeType` in a way that doesn't -/// do anything. This is a convenience for creating [`ProxyGlobalState`] objects -struct NoOpHooks; - -#[async_trait::async_trait] -impl BuilderHooks for NoOpHooks { - #[inline(always)] - async fn process_transactions( - &self, - transactions: Vec<::Transaction>, - ) -> Vec<::Transaction> { - transactions - } - - #[inline(always)] - async fn handle_hotshot_event(&self, _event: &Event) {} -} /// [`RoundTransactionBehavior`] is an enum that is used to represent different /// behaviors that we may want to simulate during a round. This applies to @@ -155,17 +142,20 @@ async fn test_builder_order() { const NUM_TXNS_PER_ROUND: usize = 5; /// Capacity of broadcast channels const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5; - /// Number of nodes on DA committee - const NUM_STORAGE_NODES: usize = 4; - let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - - let proxy_global_state = ProxyGlobalState::new( - global_state.clone(), - Arc::new(NoOpHooks), + let global_state = GlobalState::new( BLSPubKey::generated_from_seed_indexed([0; 32], 0), - Duration::from_secs(1), + TEST_API_TIMEOUT, + TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT, + TEST_INCLUDED_TX_GC_PERIOD, + CHANNEL_CAPACITY, + TEST_BASE_FEE, + NoHooks(PhantomData), ); + let proxy_global_state = ProxyGlobalState(Arc::clone(&global_state)); + + let (event_stream_sender, event_stream) = broadcast(1024); + global_state.start_event_loop(event_stream); // Transactions to send let all_transactions = (0..NUM_ROUNDS) @@ -234,23 +224,20 @@ async fn test_builder_order() { // get transactions submitted in previous rounds, [] for genesis // and simulate the block built from those let transactions = prev_proposed_transactions.take().unwrap_or_default(); - let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) = - calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions) - .await; + let (quorum_proposal, events, builder_state_id) = + calc_proposal_events(round, prev_quorum_proposal, transactions).await; prev_quorum_proposal = Some(quorum_proposal.clone()); - // send quorum and DA proposals for this round - senders - .da_proposal - .broadcast(MessageType::DaProposalMessage(da_proposal_msg)) - .await - .unwrap(); - senders - .quorum_proposal - .broadcast(MessageType::QuorumProposalMessage(quorum_proposal_msg)) - .await - .unwrap(); + for evt in events { + event_stream_sender + .broadcast(Event { + view_number: ViewNumber::new(round as u64), + event: evt, + }) + .await + .unwrap(); + } builder_state_id }; @@ -297,8 +284,6 @@ async fn test_builder_order_chain_fork() { const NUM_TXNS_PER_ROUND: usize = 5; // Capacity of broadcast channels const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5; - // Number of nodes on DA committee - const NUM_STORAGE_NODES: usize = 4; // the round we want to skip all the transactions for the fork chain // round 0 is pre-fork @@ -317,13 +302,19 @@ async fn test_builder_order_chain_fork() { RoundTransactionBehavior::NoAdjust }; - let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - let proxy_global_state = ProxyGlobalState::new( - global_state.clone(), - Arc::new(NoOpHooks), + let global_state = GlobalState::new( BLSPubKey::generated_from_seed_indexed([0; 32], 0), - Duration::from_secs(1), + TEST_API_TIMEOUT, + TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT, + TEST_INCLUDED_TX_GC_PERIOD, + CHANNEL_CAPACITY, + TEST_BASE_FEE, + NoHooks(PhantomData), ); + let proxy_global_state = ProxyGlobalState(Arc::clone(&global_state)); + + let (event_stream_sender, event_stream) = broadcast(1024); + global_state.start_event_loop(event_stream); // Transactions to send let all_transactions = (0..NUM_ROUNDS) @@ -363,28 +354,21 @@ async fn test_builder_order_chain_fork() { let transactions = prev_proposed_transactions_branch_1 .clone() .unwrap_or_default(); - let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) = - calc_proposal_msg( - NUM_STORAGE_NODES, - round, - prev_quorum_proposal_branch_1, - transactions, - ) - .await; + let (quorum_proposal, events, builder_state_id) = + calc_proposal_events(round, prev_quorum_proposal_branch_1, transactions).await; prev_quorum_proposal_branch_1 = Some(quorum_proposal.clone()); // send quorum and DA proposals for this round - senders - .da_proposal - .broadcast(MessageType::DaProposalMessage(da_proposal_msg)) - .await - .unwrap(); - senders - .quorum_proposal - .broadcast(MessageType::QuorumProposalMessage(quorum_proposal_msg)) - .await - .unwrap(); + for evt in events { + event_stream_sender + .broadcast(Event { + view_number: ViewNumber::new(round as u64), + event: evt, + }) + .await + .unwrap(); + } builder_state_id }; @@ -400,30 +384,24 @@ async fn test_builder_order_chain_fork() { let transactions = prev_proposed_transactions_branch_2 .clone() .unwrap_or_default(); - let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) = - calc_proposal_msg( - NUM_STORAGE_NODES, - round, - prev_quorum_proposal_branch_2, - transactions, - ) - .await; + + let (quorum_proposal, events, builder_state_id) = + calc_proposal_events(round, prev_quorum_proposal_branch_2, transactions).await; prev_quorum_proposal_branch_2 = Some(quorum_proposal.clone()); // send quorum and DA proposals for this round // we also need to send out the message for the fork-ed chain although it's not forked yet // to prevent builders resend the transactions we've already committed - senders - .da_proposal - .broadcast(MessageType::DaProposalMessage(da_proposal_msg)) - .await - .unwrap(); - senders - .quorum_proposal - .broadcast(MessageType::QuorumProposalMessage(quorum_proposal_msg)) - .await - .unwrap(); + for evt in events { + event_stream_sender + .broadcast(Event { + view_number: ViewNumber::new(round as u64), + event: evt, + }) + .await + .unwrap(); + } builder_state_id }; @@ -503,16 +481,20 @@ async fn test_builder_order_should_fail() { const NUM_TXNS_PER_ROUND: usize = 5; // Capacity of broadcast channels const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5; - // Number of nodes on DA committee - const NUM_STORAGE_NODES: usize = 4; - let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - let proxy_global_state = ProxyGlobalState::new( - global_state, - Arc::new(NoOpHooks), + let global_state = GlobalState::new( BLSPubKey::generated_from_seed_indexed([0; 32], 0), - Duration::from_secs(1), + TEST_API_TIMEOUT, + TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT, + TEST_INCLUDED_TX_GC_PERIOD, + CHANNEL_CAPACITY, + TEST_BASE_FEE, + NoHooks(PhantomData), ); + let proxy_global_state = ProxyGlobalState(Arc::clone(&global_state)); + + let (event_stream_sender, event_stream) = broadcast(1024); + global_state.start_event_loop(event_stream); // Transactions to send let all_transactions = (0..NUM_ROUNDS) @@ -557,23 +539,21 @@ async fn test_builder_order_should_fail() { // get transactions submitted in previous rounds, [] for genesis // and simulate the block built from those let transactions = prev_proposed_transactions.take().unwrap_or_default(); - let (quorum_proposal, quorum_proposal_msg, da_proposal_msg, builder_state_id) = - calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions) - .await; + let (quorum_proposal, events, builder_state_id) = + calc_proposal_events(round, prev_quorum_proposal, transactions).await; prev_quorum_proposal = Some(quorum_proposal.clone()); // send quorum and DA proposals for this round - senders - .da_proposal - .broadcast(MessageType::DaProposalMessage(da_proposal_msg)) - .await - .unwrap(); - senders - .quorum_proposal - .broadcast(MessageType::QuorumProposalMessage(quorum_proposal_msg)) - .await - .unwrap(); + for evt in events { + event_stream_sender + .broadcast(Event { + view_number: ViewNumber::new(round as u64), + event: evt, + }) + .await + .unwrap(); + } builder_state_id }; diff --git a/crates/marketplace/src/utils.rs b/crates/marketplace/src/utils.rs deleted file mode 100644 index 030dc911..00000000 --- a/crates/marketplace/src/utils.rs +++ /dev/null @@ -1,13 +0,0 @@ -use hotshot_types::traits::node_implementation::NodeType; - -// TODO: Update commitment calculation with the new `commit`. -// -pub trait LegacyCommit { - fn legacy_commit(&self) -> committable::Commitment>; -} - -impl LegacyCommit for hotshot_types::data::Leaf { - fn legacy_commit(&self) -> committable::Commitment> { - as committable::Committable>::commit(self) - } -} diff --git a/crates/shared/src/testing/constants.rs b/crates/shared/src/testing/constants.rs index 0597e8e9..34a3de77 100644 --- a/crates/shared/src/testing/constants.rs +++ b/crates/shared/src/testing/constants.rs @@ -26,3 +26,15 @@ pub const TEST_CHANNEL_BUFFER_SIZE: usize = 32; /// Governs the included transaction GC period used in tests. /// This is an arbitrary default value for testing. pub const TEST_INCLUDED_TX_GC_PERIOD: Duration = Duration::from_secs(1); + +/// Governs API timeout for builder API. +/// This is an arbitrary default value for testing. +pub const TEST_API_TIMEOUT: Duration = Duration::from_secs(1); + +/// Governs timeout when waiting to fill transaction queue on incoming bundle/block request. +/// This is an arbitrary default value for testing. +pub const TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT: Duration = Duration::from_millis(100); + +/// Governs fee per byte used by builders. +/// This is an arbitrary default value for testing. +pub const TEST_BASE_FEE: u64 = 1;