From 9705123d16e8e212ae460501e3771e73a244ce85 Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Wed, 30 Oct 2024 21:43:43 +0100 Subject: [PATCH] WIP --- .clippy.toml | 4 +- .github/workflows/test.yml | 1 + Cargo.lock | 51 +- Cargo.toml | 4 +- crates/legacy/Cargo.toml | 1 - crates/legacy/src/builder_state.rs | 42 +- crates/legacy/src/service.rs | 63 +- crates/legacy/src/testing/basic_test.rs | 18 +- .../legacy/src/testing/finalization_test.rs | 3 +- crates/legacy/src/testing/mod.rs | 3 +- crates/marketplace/Cargo.toml | 2 +- crates/marketplace/src/builder_state.rs | 1430 ----------------- crates/marketplace/src/hooks.rs | 43 + crates/marketplace/src/lib.rs | 8 +- crates/marketplace/src/service.rs | 926 +++-------- crates/marketplace/src/testing/integration.rs | 125 +- crates/marketplace/src/testing/mod.rs | 231 --- crates/marketplace/src/testing/order_test.rs | 8 +- crates/marketplace/src/utils.rs | 13 - crates/shared/Cargo.toml | 24 +- crates/shared/src/block.rs | 90 +- .../src/coordinator/builder_state_map.rs | 257 +++ crates/shared/src/coordinator/mod.rs | 464 ++++++ crates/shared/src/lib.rs | 2 + crates/shared/src/state.rs | 198 +++ crates/shared/src/testing/generation.rs | 2 +- crates/shared/src/testing/validation.rs | 2 +- crates/shared/src/utils.rs | 22 + 28 files changed, 1473 insertions(+), 2564 deletions(-) delete mode 100644 crates/marketplace/src/builder_state.rs create mode 100644 crates/marketplace/src/hooks.rs delete mode 100644 crates/marketplace/src/utils.rs create mode 100644 crates/shared/src/coordinator/builder_state_map.rs create mode 100644 crates/shared/src/coordinator/mod.rs create mode 100644 crates/shared/src/state.rs diff --git a/.clippy.toml b/.clippy.toml index 70b4d54e..45433876 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -1,2 +1,2 @@ -disallowed-names = [ "TYPES" ] -doc-valid-idents = [ "HotShot", ".." 
] +disallowed-names = ["TYPES"] +doc-valid-idents = ["HotShot", ".."] diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e1577cfc..2bdb0519 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,6 +17,7 @@ jobs: runs-on: ubuntu-latest env: RUSTFLAGS: '--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std"' + RUSTDOCFLAGS: '--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std"' RUST_LOG: info steps: - uses: actions/checkout@v4 diff --git a/Cargo.lock b/Cargo.lock index 9f7ab3e4..b16b5863 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1484,6 +1484,16 @@ dependencies = [ "tagged-base64", ] +[[package]] +name = "concurrent-map" +version = "5.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6542c565fbcba786db59307d7840f0bf5cd9e0aba6502755337e15f0e06fd65" +dependencies = [ + "ebr", + "stack-map", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2090,6 +2100,7 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.82", + "unicode-xid", ] [[package]] @@ -2198,6 +2209,15 @@ name = "dyn-clone" version = "1.0.17" source = "git+https://github.com/dtolnay/dyn-clone?tag=1.0.17#51bf8816be5a73e38b59fd4d9dda2bc18e9c2429" +[[package]] +name = "ebr" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b1ea3b18359d566f360eaf811a2d69bc6c8eb6faaeecc8839975633860a076e" +dependencies = [ + "shared-local-state", +] + [[package]] name = "ed25519" version = "2.2.3" @@ -3082,7 +3102,6 @@ dependencies = [ "bincode", "clap", "committable", - "derivative", "futures", "hex", "hotshot", @@ -4730,7 +4749,7 @@ dependencies = [ "async-trait", "clap", "committable", - "derivative", + "derive_more 1.0.0", "either", "futures", "hex", @@ -4767,11 +4786,14 @@ dependencies = [ "anyhow", "async-broadcast", "async-compatibility-layer", + "async-lock 2.8.0", "async-std", "async-trait", "bincode", "chrono", "committable", + "concurrent-map", + "derive_more 1.0.0", "either", "futures", "hex", @@ -4782,13 +4804,17 @@ dependencies = [ "hotshot-task-impls", "hotshot-testing", "hotshot-types", + "jf-vid", + "nonempty-collections", "portpicker", "rand 0.8.5", "serde", + "sha2 0.10.8", "surf-disco", "tide-disco", "tokio", "tracing", + "typenum", "url", "vbs", "vec1", @@ -5150,6 +5176,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonempty-collections" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c07626f57b1cb0ee81e5193d331209751d2e18ffa3ceaa0fd6fab63db31fafd9" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -6721,6 +6753,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared-local-state" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a50ccb2f45251772ed15abfd1e5f10a305288187b1582ab2e4295b29bbb4929" +dependencies = [ + "parking_lot", +] + [[package]] name = "shellexpand" version = "3.1.0" @@ -7112,6 +7153,12 @@ dependencies = [ "url", ] +[[package]] +name = "stack-map" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49d6d36fee60faad91e23603db2356677b58ec2429237b39d5c60c26868f37c" + [[package]] name = "standback" version = "0.2.17" diff --git a/Cargo.toml b/Cargo.toml index a073cdc9..3a320135 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,8 @@ bincode = "1.3" clap = "4.5" chrono = { version = "0.4", features = ["serde"] } committable = "0.2" -derivative = 
"2.2" +concurrent-map = "5.0" +derive_more = "1.0" either = "1.13" futures = "0.3" hex = "0.4.3" @@ -40,6 +41,7 @@ tide-disco = "0.9" tokio = "1" toml = "0.8" tracing = "0.1" +typenum = "1.17" url = "2.3" vbs = "0.1" vec1 = "1.12" diff --git a/crates/legacy/Cargo.toml b/crates/legacy/Cargo.toml index 1e92e640..5e6a3b4e 100644 --- a/crates/legacy/Cargo.toml +++ b/crates/legacy/Cargo.toml @@ -15,7 +15,6 @@ async-trait = { workspace = true } bincode = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } committable = { workspace = true } -derivative = { workspace = true } futures = { workspace = true } hex = { workspace = true } hotshot = { workspace = true } diff --git a/crates/legacy/src/builder_state.rs b/crates/legacy/src/builder_state.rs index 5c0df1e1..e42cd249 100644 --- a/crates/legacy/src/builder_state.rs +++ b/crates/legacy/src/builder_state.rs @@ -10,14 +10,13 @@ use hotshot_types::{ utils::BuilderCommitment, vid::{VidCommitment, VidPrecomputeData}, }; -use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; +use marketplace_builder_shared::block::{ + BlockId, BuilderStateId, ParentBlockReferences, ReceivedTransaction, +}; use committable::Commitment; -use crate::{ - service::{GlobalState, ReceivedTransaction}, - LegacyCommit, -}; +use crate::{service::GlobalState, LegacyCommit}; use async_broadcast::broadcast; use async_broadcast::Receiver as BroadcastReceiver; use async_broadcast::Sender as BroadcastSender; @@ -42,13 +41,6 @@ use std::{collections::hash_map::Entry, time::Duration}; pub type TxTimeStamp = u128; -/// Enum to hold the different sources of the transaction -#[derive(Clone, Debug, PartialEq)] -pub enum TransactionSource { - External, // txn from the external source i.e private mempool - HotShot, // txn from the HotShot network i.e public mempool -} - /// Decide Message to be put on the decide channel #[derive(Clone, Debug)] pub struct DecideMessage { @@ -669,13 +661,13 @@ impl BuilderState { >::from_bytes(encoded_txns, metadata); let txn_commitments = block_payload.transaction_commitments(metadata); - for tx in txn_commitments.iter() { - self.txns_in_queue.remove(tx); + for txn in txn_commitments.iter() { + self.txns_in_queue.remove(txn); } self.included_txns.extend(txn_commitments.iter()); self.tx_queue - .retain(|tx| self.txns_in_queue.contains(&tx.commit)); + .retain(|txn| self.txns_in_queue.contains(&txn.commit)); if !txn_commitments.is_empty() { self.allow_empty_block_until = Some(Types::View::new( @@ -739,7 +731,7 @@ impl BuilderState { .max_block_size; let transactions_to_include = self.tx_queue.iter().scan(0, |total_size, tx| { let prev_size = *total_size; - *total_size += tx.len; + *total_size += tx.min_block_size; // We will include one transaction over our target block length // if it's the first transaction in queue, otherwise we'd have a possible failure // state where a single transaction larger than target block state is stuck in @@ -747,7 +739,7 @@ impl BuilderState { if *total_size >= max_block_size && prev_size != 0 { None } else { - Some(tx.tx.clone()) + Some(tx.transaction.clone()) } }); @@ -1042,7 +1034,7 @@ impl BuilderState { txn_garbage_collect_duration: Duration, validated_state: Arc, ) -> Self { - let txns_in_queue: HashSet<_> = tx_queue.iter().map(|tx| tx.commit).collect(); + let txns_in_queue: HashSet<_> = tx_queue.iter().map(|txn| txn.commit).collect(); BuilderState { included_txns: HashSet::new(), included_txns_old: HashSet::new(), @@ -1123,16 +1115,16 @@ impl BuilderState { async 
fn collect_txns(&mut self, timeout_after: Instant) { while Instant::now() <= timeout_after { match self.tx_receiver.try_recv() { - Ok(tx) => { - if self.included_txns.contains(&tx.commit) - || self.included_txns_old.contains(&tx.commit) - || self.included_txns_expiring.contains(&tx.commit) - || self.txns_in_queue.contains(&tx.commit) + Ok(txn) => { + if self.included_txns.contains(&txn.commit) + || self.included_txns_old.contains(&txn.commit) + || self.included_txns_expiring.contains(&txn.commit) + || self.txns_in_queue.contains(&txn.commit) { continue; } - self.txns_in_queue.insert(tx.commit); - self.tx_queue.push_back(tx); + self.txns_in_queue.insert(txn.commit); + self.tx_queue.push_back(txn); } Err(async_broadcast::TryRecvError::Empty) | Err(async_broadcast::TryRecvError::Closed) => { diff --git a/crates/legacy/src/service.rs b/crates/legacy/src/service.rs index 9c7ad22a..fc0750a9 100644 --- a/crates/legacy/src/service.rs +++ b/crates/legacy/src/service.rs @@ -19,13 +19,14 @@ use hotshot_types::{ use lru::LruCache; use vbs::version::StaticVersionType; -use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; +use marketplace_builder_shared::block::{ + BlockId, BuilderStateId, ParentBlockReferences, ReceivedTransaction, TransactionSource, +}; use crate::builder_state::{MessageType, RequestMessage, ResponseMessage}; use crate::{ builder_state::{ - BuildBlockInfo, DaProposalMessage, DecideMessage, QuorumProposalMessage, TransactionSource, - TriggerStatus, + BuildBlockInfo, DaProposalMessage, DecideMessage, QuorumProposalMessage, TriggerStatus, }, LegacyCommit as _, }; @@ -68,22 +69,6 @@ pub struct BlockInfo { pub truncated: bool, } -/// [`ReceivedTransaction`] represents receipt information concerning a received -/// [`NodeType::Transaction`]. -#[derive(Debug)] -pub struct ReceivedTransaction { - // the transaction - pub tx: Types::Transaction, - // transaction's hash - pub commit: Commitment, - // transaction's esitmated length - pub len: u64, - // transaction's source - pub source: TransactionSource, - // received time - pub time_in: Instant, -} - /// Adjustable limits for block size ceiled by /// maximum block size allowed by the protocol #[derive(Debug, Clone)] @@ -375,7 +360,7 @@ impl GlobalState { handle_received_txns( &self.tx_sender, txns, - TransactionSource::External, + TransactionSource::Private, self.block_size_limits.max_block_size, ) .await @@ -1108,7 +1093,7 @@ pub async fn run_non_permissioned_standalone_builder_service< handle_received_txns( &tx_sender, transactions, - TransactionSource::HotShot, + TransactionSource::Public, max_block_size, ) .await; @@ -1443,18 +1428,15 @@ where // increment the offset so we can ensure we're making progress; self.offset += 1; - let tx = self.txns[offset].clone(); - let commit = tx.commit(); - // This is a rough estimate, but we don't have any other way to get real - // encoded transaction length. Luckily, this being roughly proportional - // to encoded length is enough, because we only use this value to estimate - // our limitations on computing the VID in time. 
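Editor's sketch of the shape the relocated `ReceivedTransaction` appears to take, inferred only from the renames in this hunk (`tx` -> `transaction`, `len` -> `min_block_size`) and the new `Private`/`Public` source variants; the authoritative definition lives in `crates/shared/src/block.rs`, which this patch adds but whose contents are not shown here. The `Transaction` and `Commitment` aliases are hypothetical stand-ins for the real generic parameters:

use std::time::Instant;

/// Replaces the old External (private mempool) / HotShot (public mempool) variants.
#[derive(Clone, Debug, PartialEq)]
pub enum TransactionSource {
    Public,
    Private,
}

/// Hypothetical stand-ins for the generic transaction type and its commitment.
type Transaction = Vec<u8>;
type Commitment = [u8; 32];

#[derive(Debug)]
pub struct ReceivedTransaction {
    /// was `tx`
    pub transaction: Transaction,
    /// transaction's hash
    pub commit: Commitment,
    /// transaction's source
    pub source: TransactionSource,
    /// received time
    pub time_in: Instant,
    /// was `len`: a lower bound on the block space this transaction consumes,
    /// summed during block packing and checked against `max_txn_len` below
    pub min_block_size: u64,
}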
- let len = tx.minimum_block_size(); + let txn = self.txns[offset].clone(); + let commit = txn.commit(); + + let min_block_size = txn.minimum_block_size(); let max_txn_len = self.max_txn_len; - if len > max_txn_len { - tracing::warn!(%commit, %len, %max_txn_len, "Transaction too big"); + if min_block_size > max_txn_len { + tracing::warn!(%commit, %min_block_size, %max_txn_len, "Transaction too big"); return Some(Err(HandleReceivedTxnsError::TransactionTooBig { - estimated_length: len, + estimated_length: min_block_size, max_txn_len: self.max_txn_len, })); } @@ -1462,11 +1444,11 @@ where let res = self .tx_sender .try_broadcast(Arc::new(ReceivedTransaction { - tx, + transaction: txn, source: self.source.clone(), commit, time_in: self.time_in, - len, + min_block_size, })) .inspect(|val| { if let Some(evicted_txn) = val { @@ -1523,15 +1505,14 @@ mod test { utils::BuilderCommitment, }; use marketplace_builder_shared::{ - block::{BlockId, BuilderStateId, ParentBlockReferences}, + block::{BlockId, BuilderStateId, ParentBlockReferences, TransactionSource}, testing::constants::{TEST_MAX_BLOCK_SIZE_INCREMENT_PERIOD, TEST_PROTOCOL_MAX_BLOCK_SIZE}, }; use sha2::{Digest, Sha256}; use crate::{ builder_state::{ - BuildBlockInfo, MessageType, RequestMessage, ResponseMessage, TransactionSource, - TriggerStatus, + BuildBlockInfo, MessageType, RequestMessage, ResponseMessage, TriggerStatus, }, service::{BlockSizeLimits, HandleReceivedTxnsError}, LegacyCommit, @@ -4296,7 +4277,7 @@ mod test { let mut handle_received_txns_iter = HandleReceivedTxns::::new( tx_sender, txns.clone(), - TransactionSource::HotShot, + TransactionSource::Public, TEST_MAX_TX_LEN, ); @@ -4344,7 +4325,7 @@ mod test { let mut handle_received_txns_iter = HandleReceivedTxns::::new( tx_sender, txns.clone(), - TransactionSource::HotShot, + TransactionSource::Public, TEST_MAX_TX_LEN, ); @@ -4402,7 +4383,7 @@ mod test { let mut handle_received_txns_iter = HandleReceivedTxns::::new( tx_sender, txns.clone(), - TransactionSource::HotShot, + TransactionSource::Public, TEST_MAX_TX_LEN, ); @@ -4447,7 +4428,7 @@ mod test { let handle_received_txns_iter = HandleReceivedTxns::::new( tx_sender, txns.clone(), - TransactionSource::HotShot, + TransactionSource::Public, TEST_MAX_TX_LEN, ); @@ -4466,7 +4447,7 @@ mod test { for tx in txns { match tx_receiver.next().await { Some(received_txn) => { - assert_eq!(received_txn.tx, tx); + assert_eq!(received_txn.transaction, tx); } _ => { panic!("Expected a TransactionMessage, but got something else"); diff --git a/crates/legacy/src/testing/basic_test.rs b/crates/legacy/src/testing/basic_test.rs index 8fdbde69..f3c33077 100644 --- a/crates/legacy/src/testing/basic_test.rs +++ b/crates/legacy/src/testing/basic_test.rs @@ -36,17 +36,15 @@ mod tests { block_types::{TestBlockHeader, TestBlockPayload, TestMetadata, TestTransaction}, state_types::{TestInstanceState, TestValidatedState}, }; - use marketplace_builder_shared::block::ParentBlockReferences; + use marketplace_builder_shared::block::{ + ParentBlockReferences, ReceivedTransaction, TransactionSource, + }; use marketplace_builder_shared::testing::constants::{ TEST_MAX_BLOCK_SIZE_INCREMENT_PERIOD, TEST_PROTOCOL_MAX_BLOCK_SIZE, }; - use crate::builder_state::{ - DaProposalMessage, DecideMessage, QuorumProposalMessage, TransactionSource, - }; - use crate::service::{ - handle_received_txns, GlobalState, ProxyGlobalState, ReceivedTransaction, - }; + use crate::builder_state::{DaProposalMessage, DecideMessage, QuorumProposalMessage}; + use 
crate::service::{handle_received_txns, GlobalState, ProxyGlobalState}; use crate::LegacyCommit; use async_lock::RwLock; use committable::{Commitment, CommitmentBoundsArkless, Committable}; @@ -188,14 +186,14 @@ mod tests { // Submit Transactions to the Builder { // Prepare the transaction message - let tx = TestTransaction::new(vec![round as u8]); + let txn = TestTransaction::new(vec![round as u8]); - let tx_vec = vec![tx]; + let tx_vec = vec![txn]; assert_eq!( handle_received_txns( &tx_sender, tx_vec.clone(), - TransactionSource::HotShot, + TransactionSource::Public, u64::MAX, ) .await diff --git a/crates/legacy/src/testing/finalization_test.rs b/crates/legacy/src/testing/finalization_test.rs index 71f788d6..e721bce6 100644 --- a/crates/legacy/src/testing/finalization_test.rs +++ b/crates/legacy/src/testing/finalization_test.rs @@ -2,7 +2,7 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use crate::{ builder_state::{DaProposalMessage, QuorumProposalMessage, ALLOW_EMPTY_BLOCK_PERIOD}, - service::{GlobalState, ProxyGlobalState, ReceivedTransaction}, + service::{GlobalState, ProxyGlobalState}, }; use async_broadcast::{broadcast, Sender}; use async_lock::RwLock; @@ -30,6 +30,7 @@ use hotshot_types::{ }, utils::BuilderCommitment, }; +use marketplace_builder_shared::block::ReceivedTransaction; use marketplace_builder_shared::testing::constants::{ TEST_CHANNEL_BUFFER_SIZE, TEST_NUM_CONSENSUS_RETRIES, TEST_NUM_NODES_IN_VID_COMPUTATION, }; diff --git a/crates/legacy/src/testing/mod.rs b/crates/legacy/src/testing/mod.rs index 7db43cd7..2d2e6715 100644 --- a/crates/legacy/src/testing/mod.rs +++ b/crates/legacy/src/testing/mod.rs @@ -4,7 +4,6 @@ use crate::{ builder_state::{ BuilderState, DAProposalInfo, DaProposalMessage, MessageType, QuorumProposalMessage, }, - service::ReceivedTransaction, LegacyCommit, }; use async_broadcast::broadcast; @@ -33,7 +32,7 @@ use crate::service::GlobalState; use async_lock::RwLock; use committable::{Commitment, CommitmentBoundsArkless, Committable}; use marketplace_builder_shared::{ - block::{BuilderStateId, ParentBlockReferences}, + block::{BuilderStateId, ParentBlockReferences, ReceivedTransaction}, testing::constants::{TEST_MAX_BLOCK_SIZE_INCREMENT_PERIOD, TEST_PROTOCOL_MAX_BLOCK_SIZE}, }; use std::sync::Arc; diff --git a/crates/marketplace/Cargo.toml b/crates/marketplace/Cargo.toml index 36811810..4ad563fa 100644 --- a/crates/marketplace/Cargo.toml +++ b/crates/marketplace/Cargo.toml @@ -14,7 +14,7 @@ async-std = { workspace = true, features = ["unstable", "attributes"] } async-trait = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } committable = { workspace = true } -derivative = { workspace = true } +derive_more = { workspace = true, features = ["deref", "deref_mut"] } either = { workspace = true } futures = { workspace = true } hex = { workspace = true } diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs deleted file mode 100644 index f4424087..00000000 --- a/crates/marketplace/src/builder_state.rs +++ /dev/null @@ -1,1430 +0,0 @@ -use hotshot_types::{ - data::{Leaf, QuorumProposal}, - message::Proposal, - traits::{ - block_contents::{BlockHeader, BlockPayload}, - node_implementation::{ConsensusTime, NodeType}, - EncodeBytes, - }, - utils::BuilderCommitment, -}; -use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; -use marketplace_builder_shared::utils::RotatingSet; - -use committable::Commitment; - -use crate::{ - 
service::{BroadcastReceivers, GlobalState, ReceivedTransaction}, - utils::LegacyCommit as _, -}; -use async_broadcast::broadcast; -use async_broadcast::Receiver as BroadcastReceiver; -use async_broadcast::Sender as BroadcastSender; -use async_compatibility_layer::channel::UnboundedSender; -use async_compatibility_layer::{art::async_sleep, art::async_spawn}; -use async_lock::RwLock; -use core::panic; -use futures::StreamExt; - -use std::cmp::PartialEq; -use std::collections::{HashMap, HashSet}; -use std::fmt::Debug; -use std::sync::Arc; -use std::time::Instant; -use std::{collections::hash_map::Entry, time::Duration}; - -pub type TxTimeStamp = u128; - -/// Enum to hold the different sources of the transaction -#[derive(Clone, Debug, PartialEq)] -pub enum TransactionSource { - External, // txn from the external source i.e private mempool - HotShot, // txn from the HotShot network i.e public mempool -} - -/// Decide Message to be put on the decide channel -#[derive(Clone, Debug)] -pub struct DecideMessage { - pub latest_decide_view_number: Types::View, -} -/// DA Proposal Message to be put on the da proposal channel -#[derive(Debug, Clone, PartialEq)] -pub struct DaProposalMessage { - pub view_number: Types::View, - pub txn_commitments: Vec>, - pub sender: ::SignatureKey, - pub builder_commitment: BuilderCommitment, -} - -/// Quorum proposal message to be put on the quorum proposal channel -#[derive(Clone, Debug, PartialEq)] -pub struct QuorumProposalMessage { - pub proposal: Arc>>, - pub sender: Types::SignatureKey, -} -/// Request Message to be put on the request channel -#[derive(Clone, Debug)] -pub struct RequestMessage { - pub requested_view_number: Types::View, - pub response_channel: UnboundedSender>, -} -pub enum TriggerStatus { - Start, - Exit, -} - -/// Response Message to be put on the response channel -#[derive(Debug)] -pub struct BuildBlockInfo { - pub id: BlockId, - pub block_size: u64, - pub offered_fee: u64, - pub block_payload: Types::BlockPayload, - pub metadata: <::BlockPayload as BlockPayload>::Metadata, -} - -/// Response Message to be put on the response channel -#[derive(Debug, Clone)] -pub struct ResponseMessage { - pub builder_hash: BuilderCommitment, - pub transactions: Vec, - pub block_size: u64, - pub offered_fee: u64, -} -#[derive(Debug, Clone)] -/// Enum to hold the status out of the decide event -pub enum Status { - ShouldExit, - ShouldContinue, -} - -/// Builder State to hold the state of the builder -#[derive(Debug)] -pub struct BuilderState { - /// txns that have been included in recent blocks that have - /// been built. This is used to try and guarantee that a transaction - /// isn't duplicated. - /// Keeps a history of the last 3 proposals. - pub included_txns: RotatingSet>, - - /// txn commits currently in the `tx_queue`. This is used as a quick - /// check for whether a transaction is already in the `tx_queue` or - /// not. - /// - /// This should be kept up-to-date with the `tx_queue` as it acts as an - /// accessory to the `tx_queue`. 
- pub txn_commits_in_queue: HashSet>, - - /// filtered queue of available transactions, taken from `tx_receiver` - pub tx_queue: Vec>>, - - /// `da_proposal_payload_commit` to (`da_proposal`, `node_count`) - #[allow(clippy::type_complexity)] - pub da_proposal_payload_commit_to_da_proposal: - HashMap<(BuilderCommitment, Types::View), Arc>>, - - /// `quorum_proposal_payload_commit` to `quorum_proposal` - #[allow(clippy::type_complexity)] - pub quorum_proposal_payload_commit_to_quorum_proposal: - HashMap<(BuilderCommitment, Types::View), Arc>>>, - - /// Spawned-from references to the parent block. - pub parent_block_references: ParentBlockReferences, - - // Channel Receivers for the HotShot events, Tx_receiver could also receive the external transactions - /// decide receiver - pub decide_receiver: BroadcastReceiver>, - - /// da proposal receiver - pub da_proposal_receiver: BroadcastReceiver>, - - /// quorum proposal receiver - pub quorum_proposal_receiver: BroadcastReceiver>, - - /// channel receiver for the block requests - pub req_receiver: BroadcastReceiver>, - - /// incoming stream of transactions - pub tx_receiver: BroadcastReceiver>>, - - /// global state handle, defined in the service.rs - pub global_state: Arc>>, - - /// locally spawned builder Commitements - pub builder_commitments: HashSet<(BuilderStateId, BuilderCommitment)>, - - /// timeout for maximising the txns in the block - pub maximize_txn_capture_timeout: Duration, - - /// constant fee that the builder will offer per byte of data sequenced - pub base_fee: u64, - - /// validated state that is required for a proposal to be considered valid. Needed for the - /// purposes of building a valid block payload within the sequencer. - pub validated_state: Arc, - - /// instance state to enfoce `max_block_size` - pub instance_state: Arc, -} - -/// [`best_builder_states_to_extend`] is a utility function that is used to -/// in order to determine which [`BuilderState`]s are the best fit to extend -/// from. -/// -/// This function is designed to inspect the current state of the global state -/// in order to determine which [`BuilderState`]s are the best fit to extend -/// from. We only want to use information from [`GlobalState`] as otherwise -/// we would have some insider knowledge unique to our specific [`BuilderState`] -/// rather than knowledge that is available to all [`BuilderState`]s. In fact, -/// in order to ensure this, this function lives outside of the [`BuilderState`] -/// itself. -/// -/// In an ideal circumstance the best [`BuilderState`] to extend from is going to -/// be the one that is immediately preceding the [`QuorumProposal`] that we are -/// attempting to extend from. However, if all we know is the view number of -/// the [`QuorumProposal`] that we are attempting to extend from, then we may end -/// up in a scenario where we have multiple [`BuilderState`]s that are all equally -/// valid to extend from. When this happens, we have the potential for a data -/// race. -/// -/// The primary cause of this has to due with the interface of the -/// [`ProxyGlobalState`](crate::service::ProxyGlobalState)'s API. -/// In general, we want to be able to retrieve a -/// [`BuilderState`] via the [`BuilderStateId`]. The [`BuilderStateId`] only -/// references a [`ViewNumber`](hotshot_types::data::ViewNumber) and a -/// [`VidCommitment`](hotshot_types::vid::VidCommitment). While this information -/// is available in the [`QuorumProposal`], it only helps us to rule out -/// [`BuilderState`]s that already exist. 
It does **NOT** help us to pick a -/// [`BuilderState`] that is the best fit to extend from. -/// -/// This is where the `justify_qc` comes in to consideration. The `justify_qc` -/// contains the previous [`ViewNumber`](hotshot_types::data::ViewNumber) that is being -/// extended from, and in addition it also contains the previous [`Commitment>`] -/// that is being built on top of. Since our [`BuilderState`]s store identifying -/// information that contains this same `leaf_commit` we can compare these -/// directly to ensure that we are extending from the correct [`BuilderState`]. -/// -/// This function determines the best [`BuilderState`] in the following steps: -/// -/// 1. If we have a [`BuilderState`] that is already spawned for the current -/// [`QuorumProposal`], then we should should return no states, as one already -/// exists. This will prevent us from attempting to spawn duplicate -/// [`BuilderState`]s. -/// 2. Attempt to find all [`BuilderState`]s that are recorded within -/// [`GlobalState`] that have matching view number and leaf commitments. There -/// *should* only be one of these. But all would be valid extension points. -/// 3. If we can't find any [`BuilderState`]s that match the view number -/// and leaf commitment, then we should return for the maximum stored view -/// number that is smaller than the current [`QuorumProposal`]. -/// 4. If there is is only one [`BuilderState`] stored in the [`GlobalState`], then -/// we should return that [`BuilderState`] as the best fit. -/// 5. If none of the other criteria match, we return an empty result as it is -/// unclear what to do in this case. -/// -/// > Note: Any time this function returns more than a single entry in its -/// > [HashSet] result, there is a potential for a race condition. This is -/// > because there are multiple [BuilderState]s that are equally valid to -/// > extend from. This race could be avoided by just picking one of the -/// > entries in the resulting [HashSet], but this is not done here in order -/// > to allow us to highlight the possibility of the race. -async fn best_builder_states_to_extend( - quorum_proposal: Arc>>, - global_state: Arc>>, -) -> HashSet> { - let current_view_number = quorum_proposal.data.view_number; - let current_commitment = quorum_proposal.data.block_header.payload_commitment(); - let current_builder_state_id = BuilderStateId:: { - parent_commitment: current_commitment, - parent_view: current_view_number, - }; - - let global_state_read_lock = global_state.read_arc().await; - - // The first step is to check if we already have a spawned [BuilderState]. - // If we do, then we should indicate that there is no best fit, as we - // don't want to spawn another [BuilderState]. - if global_state_read_lock - .spawned_builder_states - .contains_key(¤t_builder_state_id) - { - // We already have a spawned [BuilderState] for this proposal. - // So we should just ignore it. - return HashSet::new(); - } - - // Next we want to see if there is an immediate match for a [BuilderState] - // that we can extend from. This is the most ideal situation, as it - // implies that we are extending from the correct [BuilderState]. 
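The five-step precedence laid out above condenses to a small pure function. A hedged sketch, using plain `u64`s for view numbers and leaf commitments and a `(view, leaf_commit)` tuple in place of `BuilderStateId` (all names illustrative, not from this patch):

use std::collections::HashSet;

type View = u64;
type LeafCommit = u64;
/// Stand-in for `BuilderStateId`: (parent view, parent leaf commitment).
type StateId = (View, LeafCommit);

fn best_states_to_extend(
    spawned: &HashSet<StateId>,
    proposal_view: View,
    proposal_id: StateId,
    justify_qc: StateId,
) -> HashSet<StateId> {
    // 1. A state already exists for this exact proposal: spawn nothing.
    if spawned.contains(&proposal_id) {
        return HashSet::new();
    }
    // 2. Ideal case: states matching the justify_qc's (view, leaf_commit).
    let exact: HashSet<StateId> = spawned
        .iter()
        .filter(|id| **id == justify_qc)
        .copied()
        .collect();
    if !exact.is_empty() {
        return exact;
    }
    // 3. Fallback: every state at the highest view below the proposal's view
    //    (more than one result here is the race the note above warns about).
    if let Some(max_view) = spawned
        .iter()
        .map(|(view, _)| *view)
        .filter(|view| *view < proposal_view)
        .max()
    {
        return spawned
            .iter()
            .filter(|(view, _)| *view == max_view)
            .copied()
            .collect();
    }
    // 4. Last resort: a single surviving state is trivially the best fit.
    if spawned.len() == 1 {
        return spawned.iter().copied().collect();
    }
    // 5. Only higher views remain ("time travel"); return nothing.
    HashSet::new()
}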
- // We do this by checking the `justify_qc` stored within the - // [QuorumProposal], and checking it against the current spawned - // [BuilderState]s - let justify_qc = &quorum_proposal.data.justify_qc; - let existing_states: HashSet<_> = global_state_read_lock - .spawned_builder_states - .iter() - .filter(|(builder_state_id, (leaf_commit, _))| match leaf_commit { - None => false, - Some(leaf_commit) => { - *leaf_commit == justify_qc.data.leaf_commit - && builder_state_id.parent_view == justify_qc.view_number - } - }) - .map(|(builder_state_id, _)| builder_state_id.clone()) - .collect(); - - // If we found any matching [BuilderState]s, then we should return them - // as the best fit. - if !existing_states.is_empty() { - return existing_states; - } - - // At this point, we don't have any "ideal" matches or scenarios. So we - // need to look for a suitable fall-back. The best fallback condition to - // start with is any [BuilderState] that has the maximum spawned view - // number whose value is smaller than the current [QuorumProposal]. - let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock - .spawned_builder_states - .keys() - .map(|builder_state_id| *builder_state_id.parent_view) - .filter(|view_number| view_number < ¤t_view_number) - .max(); - - // If we have a maximum view number that meets our criteria, then we should - // return all [BuilderStateId]s that match this view number. - // This can lead to multiple [BuilderStateId]s being returned. - if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) = - maximum_stored_view_number_smaller_than_quorum_proposal - { - // If we are the maximum stored view number smaller than the quorum - // proposal's view number, then we are the best fit. - let mut result = HashSet::new(); - for builder_state_id in - global_state_read_lock - .spawned_builder_states - .keys() - .filter(|builder_state_id| { - builder_state_id.parent_view.u64() - == maximum_stored_view_number_smaller_than_quorum_proposal - }) - { - result.insert(builder_state_id.clone()); - } - return result; - } - - // This is our last ditch effort to continue making progress. If there is - // only one [BuilderState] active, then we should return that as the best - // fit, as it will be the only way we can continue making progress with - // the builder. - if global_state_read_lock.spawned_builder_states.len() == 1 { - let mut result = HashSet::new(); - for builder_state_id in global_state_read_lock.spawned_builder_states.keys() { - result.insert(builder_state_id.clone()); - } - return result; - } - - // This implies that there are only larger [BuilderState]s active than - // the one we are. This is weird, it implies that some sort of time - // travel has occurred view-wise. It is unclear what to do in this - // situation. - - HashSet::new() -} - -impl BuilderState { - /// Utility method that attempts to determine whether we are among - /// the best [`BuilderState`]s to extend from. 
- async fn am_i_the_best_builder_state_to_extend( - &self, - quorum_proposal: Arc>>, - ) -> bool { - let best_builder_states_to_extend = - best_builder_states_to_extend(quorum_proposal.clone(), self.global_state.clone()).await; - - tracing::debug!( - "{}@{} thinks these are the best builder states to extend from: {:?} for proposal {}@{}", - self.parent_block_references.vid_commitment, - self.parent_block_references.view_number.u64(), - best_builder_states_to_extend - .iter() - .map(|builder_state_id| format!( - "{}@{}", - builder_state_id.parent_commitment, - builder_state_id.parent_view.u64() - )) - .collect::>(), - quorum_proposal.data.block_header.payload_commitment(), - quorum_proposal.data.view_number.u64(), - ); - - // We are a best fit if we are contained within the returned set of - // best [BuilderState]s to extend from. - best_builder_states_to_extend.contains(&BuilderStateId { - parent_commitment: self.parent_block_references.vid_commitment, - parent_view: self.parent_block_references.view_number, - }) - } - - /// This method is used to handle incoming DA proposal messages - /// from an incoming HotShot [Event](hotshot_types::event::Event). A DA Proposal is - /// a proposal that is meant to be voted on by consensus nodes in order to - /// determine which transactions should be included for this view. - /// - /// A DA Proposal in conjunction with a Quorum Proposal is an indicator - /// that a new Block / Leaf is being proposed for the HotShot network. So - /// we need to be able to propose new Bundles on top of these proposals. - /// - /// In order to do so we must first wait until we have both a DA Proposal - /// and a Quorum Proposal. If we do not, then we can just record the - /// proposal we have and wait for the other to arrive. - /// - /// If we do have a matching Quorum Proposal, then we can proceed to make - /// a decision about extending the current [BuilderState] via - /// [BuilderState::spawn_clone_that_extends_self]. - /// - /// > Note: In the case of `process_da_proposal` if we don't have a corresponding - /// > Quorum Proposal, then we will have to wait for `process_quorum_proposal` - /// > to be called with the matching Quorum Proposal. Until that point we - /// > exit knowing we have fulfilled the DA proposal portion. - #[tracing::instrument(skip_all, name = "process da proposal", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn process_da_proposal(&mut self, da_msg: Arc>) { - tracing::debug!( - "Builder Received DA message for view {:?}", - da_msg.view_number - ); - - // we do not have the option to ignore DA proposals if we want to be able to handle failed view reorgs. - - // If the respective builder state exists to handle the request - tracing::debug!( - "Extracted builder commitment from the da proposal: {:?}", - da_msg.builder_commitment - ); - - let Entry::Vacant(e) = self - .da_proposal_payload_commit_to_da_proposal - .entry((da_msg.builder_commitment.clone(), da_msg.view_number)) - else { - tracing::debug!("Payload commitment already exists in the da_proposal_payload_commit_to_da_proposal hashmap, so ignoring it"); - return; - }; - - // if we have matching da and quorum proposals, we can skip storing the one, and remove - // the other from storage, and call build_block with both, to save a little space. 
- let Entry::Occupied(quorum_proposal) = self - .quorum_proposal_payload_commit_to_quorum_proposal - .entry((da_msg.builder_commitment.clone(), da_msg.view_number)) - else { - e.insert(da_msg); - return; - }; - - let quorum_proposal = quorum_proposal.remove(); - - if quorum_proposal.data.view_number != da_msg.view_number { - tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers"); - return; - } - - self.spawn_clone_that_extends_self(da_msg, quorum_proposal.clone()) - .await; - } - - /// This method is used to handle incoming Quorum Proposal messages - /// from an incoming HotShot [Event](hotshot_types::event::Event). A Quorum - /// Proposal is a proposal that indicates the next potential Block of the - /// chain is being proposed for the HotShot network. This proposal is - /// voted on by the consensus nodes in order to determine if whether this - /// will be the next Block of the chain or not. - /// - /// A Quorum Proposal in conjunction with a DA Proposal is an indicator - /// that a new Block / Leaf is being proposed for the HotShot network. So - /// we need to be able to propose new Bundles on top of these proposals. - /// - /// In order to do so we must first wait until we have both a DA Proposal - /// and a Quorum Proposal. If we do not, then we can just record the - /// proposal we have and wait for the other to arrive. - /// - /// If we do have a matching DA Proposal, then we can proceed to make - /// a decision about extending the current [BuilderState] via - /// [BuilderState::spawn_clone_that_extends_self]. - /// - /// > Note: In the case of `process_quorum_proposal` if we don't have a - /// > corresponding DA Proposal, then we will have to wait for - /// > `process_da_proposal` to be called with the matching DA Proposal. - /// > Until that point we exit knowing we have fulfilled the Quorum proposal - /// > portion. - //#[tracing::instrument(skip_all, name = "Process Quorum Proposal")] - #[tracing::instrument(skip_all, name = "process quorum proposal", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn process_quorum_proposal(&mut self, quorum_msg: QuorumProposalMessage) { - tracing::debug!( - "Builder Received Quorum proposal message for view {:?}", - quorum_msg.proposal.data.view_number - ); - - // Two cases to handle: - // Case 1: Bootstrapping phase - // Case 2: No intended builder state exist - let quorum_proposal = &quorum_msg.proposal; - let view_number = quorum_proposal.data.view_number; - let payload_builder_commitment = quorum_proposal.data.block_header.builder_commitment(); - - tracing::debug!( - "Extracted payload builder commitment from the quorum proposal: {:?}", - payload_builder_commitment - ); - - // first check whether vid_commitment exists in the - // quorum_proposal_payload_commit_to_quorum_proposal hashmap, if yes, ignore it, otherwise - // validate it and later insert in - - let Entry::Vacant(e) = self - .quorum_proposal_payload_commit_to_quorum_proposal - .entry((payload_builder_commitment.clone(), view_number)) - else { - tracing::debug!("Payload commitment already exists in the quorum_proposal_payload_commit_to_quorum_proposal hashmap, so ignoring it"); - return; - }; - - // if we have matching da and quorum proposals, we can skip storing - // the one, and remove the other from storage, and call build_block - // with both, to save a little space. 
- let Entry::Occupied(da_proposal) = self - .da_proposal_payload_commit_to_da_proposal - .entry((payload_builder_commitment.clone(), view_number)) - else { - e.insert(quorum_proposal.clone()); - return; - }; - - let da_proposal_info = da_proposal.remove(); - // remove the entry from the da_proposal_payload_commit_to_da_proposal hashmap - self.da_proposal_payload_commit_to_da_proposal - .remove(&(payload_builder_commitment.clone(), view_number)); - - // also make sure we clone for the same view number - // (check incase payload commitments are same) - if da_proposal_info.view_number != view_number { - tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers"); - } - - self.spawn_clone_that_extends_self(da_proposal_info, quorum_proposal.clone()) - .await; - } - - /// A helper function that is used by both [`process_da_proposal`](Self::process_da_proposal) - /// and [`process_quorum_proposal`](Self::process_quorum_proposal) to spawn a new [`BuilderState`] - /// that extends from the current [`BuilderState`]. - /// - /// This helper function also adds additional checks in order to ensure - /// that the [`BuilderState`] that is being spawned is the best fit for the - /// [`QuorumProposal`] that is being extended from. - async fn spawn_clone_that_extends_self( - &mut self, - da_proposal_info: Arc>, - quorum_proposal: Arc>>, - ) { - if !self - .am_i_the_best_builder_state_to_extend(quorum_proposal.clone()) - .await - { - tracing::debug!( - "{} is not the best fit for forking, {}@{}, so ignoring the Quorum proposal, and leaving it to another BuilderState", - self.parent_block_references, - quorum_proposal.data.block_header.payload_commitment(), - quorum_proposal.data.view_number.u64(), - ); - return; - } - - let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity()); - - tracing::debug!( - "extending BuilderState with a clone from {} with new proposal {}@{}", - self.parent_block_references, - quorum_proposal.data.block_header.payload_commitment(), - quorum_proposal.data.view_number.u64() - ); - - // We literally fork ourselves - self.clone_with_receiver(req_receiver) - .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender) - .await; - } - - /// processing the decide event - #[tracing::instrument(skip_all, name = "process decide event", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn process_decide_event(&mut self, decide_msg: DecideMessage) -> Option { - // Exit out all the builder states if their parent_block_references.view_number is less than the latest_decide_view_number - // The only exception is that we want to keep the highest view number builder state active to ensure that - // we have a builder state to handle the incoming DA and Quorum proposals - let decide_view_number = decide_msg.latest_decide_view_number; - - let retained_view_cutoff = self - .global_state - .write_arc() - .await - .remove_handles(decide_view_number); - if self.parent_block_references.view_number < retained_view_cutoff { - tracing::info!( - "Decide@{:?}; Task@{:?} exiting; views < {:?} being reclaimed", - decide_view_number.u64(), - self.parent_block_references.view_number.u64(), - retained_view_cutoff.u64(), - ); - return Some(Status::ShouldExit); - } - - tracing::info!( - "Decide@{:?}; Task@{:?} not exiting; views >= {:?} being retained", - decide_view_number.u64(), - self.parent_block_references.view_number.u64(), - retained_view_cutoff.u64(), - ); - - 
Some(Status::ShouldContinue) - } - - /// spawn a clone of the builder state - #[tracing::instrument(skip_all, name = "spawn_clone", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn spawn_clone( - mut self, - da_proposal_info: Arc>, - quorum_proposal: Arc>>, - req_sender: BroadcastSender>, - ) { - let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data); - - // We replace our parent_block_references with information from the - // quorum proposal. This is identifying the block that this specific - // instance of [BuilderState] is attempting to build for. - self.parent_block_references = ParentBlockReferences { - view_number: quorum_proposal.data.view_number, - vid_commitment: quorum_proposal.data.block_header.payload_commitment(), - leaf_commit: leaf.legacy_commit(), - builder_commitment: quorum_proposal.data.block_header.builder_commitment(), - }; - - let builder_state_id = BuilderStateId { - parent_commitment: self.parent_block_references.vid_commitment, - parent_view: self.parent_block_references.view_number, - }; - - { - // Let's ensure that we don't already have one of these BuilderStates - // running already. - - let global_state_read_lock = self.global_state.read_arc().await; - if global_state_read_lock - .spawned_builder_states - .contains_key(&builder_state_id) - { - tracing::warn!( - "Aborting spawn_clone, builder state already exists in spawned_builder_states: {:?}", - builder_state_id - ); - return; - } - } - - for tx in da_proposal_info.txn_commitments.iter() { - self.txn_commits_in_queue.remove(tx); - } - - // We add the included transactions to the included_txns set, so we can - // also filter them should they be included in a future transaction - // submission. - self.included_txns - .extend(da_proposal_info.txn_commitments.iter().cloned()); - - // We wish to keep only the transactions in the tx_queue to those that - // also exist in the txns_in_queue set. - self.tx_queue - .retain(|tx| self.txn_commits_in_queue.contains(&tx.commit)); - - // register the spawned builder state to spawned_builder_states in the - // global state We register this new child within the global_state, so - // that it can be looked up via the [BuilderStateId] in the future. - self.global_state.write_arc().await.register_builder_state( - builder_state_id, - self.parent_block_references.clone(), - req_sender, - ); - - self.event_loop(); - } - - /// A method that will return a [BuildBlockInfo] if it is - /// able to build a block. If it encounters an error building a block, then - /// it will return None. - /// - /// This first starts by collecting transactions to include in the block. It - /// will wait until it has at least one transaction to include in the block, - /// or up to the configured `maximize_txn_capture_timeout` duration elapses. - /// At which point it will attempt to build a block with the transactions it - /// has collected. - /// - /// Upon successfully building a block, a commitment for the [BuilderStateId] - /// and Block payload pair are stored, and a [BuildBlockInfo] is created - /// and returned. 
- #[tracing::instrument(skip_all, name = "build block", - fields(builder_parent_block_references = %self.parent_block_references))] - async fn build_block( - &mut self, - state_id: BuilderStateId, - ) -> Option> { - // collect all the transactions from the near future - let timeout_after = Instant::now() + self.maximize_txn_capture_timeout; - let sleep_interval = self.maximize_txn_capture_timeout / 10; - while Instant::now() <= timeout_after { - self.collect_txns(timeout_after).await; - - if !self.tx_queue.is_empty() // we have transactions - || Instant::now() + sleep_interval > timeout_after - // we don't have time for another iteration - { - break; - } - - async_sleep(sleep_interval).await - } - - let Ok((payload, metadata)) = - >::from_transactions( - self.tx_queue.iter().map(|tx| tx.tx.clone()), - &self.validated_state, - &self.instance_state, - ) - .await - else { - tracing::warn!("Failed to build block payload"); - return None; - }; - - let builder_hash = payload.builder_commitment(&metadata); - // count the number of txns - let txn_count = payload.num_transactions(&metadata); - - // insert the recently built block into the builder commitments - self.builder_commitments - .insert((state_id, builder_hash.clone())); - - let encoded_txns: Vec = payload.encode().to_vec(); - let block_size: u64 = encoded_txns.len() as u64; - let offered_fee: u64 = self.base_fee * block_size; - - tracing::info!( - "Builder view num {:?}, building block with {:?} txns, with builder hash {:?}", - self.parent_block_references.view_number, - txn_count, - builder_hash - ); - - Some(BuildBlockInfo { - id: BlockId { - view: self.parent_block_references.view_number, - hash: builder_hash, - }, - block_size, - offered_fee, - block_payload: payload, - metadata, - }) - } - - /// A method that is used to handle incoming - /// [`RequestMessage`]s. These [`RequestMessage`]s are looking for a bundle - /// of transactions to be included in the next block. Instead of returning - /// a value, this method's response will be provided to the [`UnboundedSender`] that - /// is included in the [`RequestMessage`]. - /// - /// At this point this particular [`BuilderState`] has already been deemed - /// as the [`BuilderState`] that should handle this request, and it is up - /// to this [`BuilderState`] to provide the response, if it is able to do - /// so. - /// - /// The response will be a [`ResponseMessage`] that contains the transactions - /// the `Builder` wants to include in the next block in addition to the - /// expected block size, offered fee, and the - /// Builder's commit block of the data being returned. 
- async fn process_block_request(&mut self, req: RequestMessage) { - let requested_view_number = req.requested_view_number; - // If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle - if requested_view_number != self.parent_block_references.view_number { - tracing::debug!( - "Builder {:?} Requested view number does not match the built_from_view, so ignoring it", - self.parent_block_references.view_number - ); - return; - } - - tracing::info!( - "Request handled by builder with view {}@{:?} for (view_num: {:?})", - self.parent_block_references.vid_commitment, - self.parent_block_references.view_number, - requested_view_number - ); - - let response = self - .build_block(BuilderStateId { - parent_commitment: self.parent_block_references.vid_commitment, - parent_view: requested_view_number, - }) - .await; - - let Some(response) = response else { - tracing::debug!("No response to send"); - return; - }; - - // form the response message - let response_msg = ResponseMessage { - builder_hash: response.id.hash.clone(), - block_size: response.block_size, - offered_fee: response.offered_fee, - transactions: response - .block_payload - .transactions(&response.metadata) - .collect(), - }; - - let builder_hash = response.id.hash.clone(); - self.global_state.write_arc().await.update_global_state( - BuilderStateId { - parent_commitment: self.parent_block_references.vid_commitment, - parent_view: requested_view_number, - }, - response, - response_msg.clone(), - ); - - // ... and finally, send the response - if let Err(e) = req.response_channel.send(response_msg).await { - tracing::warn!( - "Builder {:?} failed to send response to {:?} with builder hash {:?}, Err: {:?}", - self.parent_block_references.view_number, - req, - builder_hash, - e - ); - return; - } - - tracing::info!( - "Builder {:?} Sent response to the request{:?} with builder hash {:?}", - self.parent_block_references.view_number, - req, - builder_hash - ); - } - - // MARK: event loop processing for [BuilderState] - - /// Helper function used to handle incoming [`MessageType`]s, - /// specifically [`RequestMessage`]s, that are received by the - /// [`BuilderState::req_receiver`] channel. - /// - /// This method is used to process block requests. - async fn event_loop_helper_handle_request(&mut self, req: Option>) { - tracing::debug!( - "Received request msg in builder {:?}: {:?}", - self.parent_block_references.view_number, - req - ); - - let Some(req) = req else { - tracing::warn!("No more request messages to consume"); - return; - }; - - let MessageType::RequestMessage(req) = req else { - tracing::warn!("Unexpected message on requests channel: {:?}", req); - return; - }; - - tracing::debug!( - "Received request msg in builder {:?}: {:?}", - self.parent_block_references.view_number, - req - ); - - self.process_block_request(req).await; - } - - /// Helper function that is used to handle incoming [`MessageType`]s, - /// specifically [`DaProposalMessage`]s,that are received by the [`BuilderState::da_proposal_receiver`] channel. 
- async fn event_loop_helper_handle_da_proposal(&mut self, da: Option>) { - let Some(da) = da else { - tracing::warn!("No more da proposal messages to consume"); - return; - }; - - let MessageType::DaProposalMessage(rda_msg) = da else { - tracing::warn!("Unexpected message on da proposals channel: {:?}", da); - return; - }; - - tracing::debug!( - "Received da proposal msg in builder {:?}:\n {:?}", - self.parent_block_references, - rda_msg.view_number - ); - - self.process_da_proposal(rda_msg).await; - } - - /// Helper function that is used to handle incoming [`MessageType`]s, - /// specifically [`QuorumProposalMessage`]s, that are received by the [`BuilderState::quorum_proposal_receiver`] channel. - async fn event_loop_helper_handle_quorum_proposal( - &mut self, - quorum: Option>, - ) { - let Some(quorum) = quorum else { - tracing::warn!("No more quorum proposal messages to consume"); - return; - }; - - let MessageType::QuorumProposalMessage(quorum_proposal_message) = quorum else { - tracing::warn!( - "Unexpected message on quorum proposals channel: {:?}", - quorum - ); - return; - }; - - tracing::debug!( - "Received quorum proposal msg in builder {:?}:\n {:?} for view ", - self.parent_block_references, - quorum_proposal_message.proposal.data.view_number - ); - - self.process_quorum_proposal(quorum_proposal_message).await; - } - - /// Helper function that is used to handle incoming [`MessageType`]s, - /// specifically [`DecideMessage`]s, that are received by the [`BuilderState::decide_receiver`] channel. - /// - /// This method can trigger the exit of the [`BuilderState::event_loop`] async - /// task via the returned [`std::ops::ControlFlow`] type. If the returned - /// value is a [`std::ops::ControlFlow::Break`], then the - /// [`BuilderState::event_loop`] - /// async task should exit. - async fn event_loop_helper_handle_decide( - &mut self, - decide: Option>, - ) -> std::ops::ControlFlow<()> { - let Some(decide) = decide else { - tracing::warn!("No more decide messages to consume"); - return std::ops::ControlFlow::Continue(()); - }; - - let MessageType::DecideMessage(rdecide_msg) = decide else { - tracing::warn!("Unexpected message on decide channel: {:?}", decide); - return std::ops::ControlFlow::Continue(()); - }; - - let latest_decide_view_num = rdecide_msg.latest_decide_view_number; - tracing::debug!( - "Received decide msg view {:?} in builder {:?}", - &latest_decide_view_num, - self.parent_block_references - ); - let decide_status = self.process_decide_event(rdecide_msg).await; - - let Some(decide_status) = decide_status else { - tracing::warn!( - "decide_status was None; Continuing builder {:?}", - self.parent_block_references - ); - return std::ops::ControlFlow::Continue(()); - }; - - match decide_status { - Status::ShouldContinue => { - tracing::debug!("Continuing builder {:?}", self.parent_block_references); - std::ops::ControlFlow::Continue(()) - } - _ => { - tracing::info!( - "Exiting builder {:?} with decide view {:?}", - self.parent_block_references, - &latest_decide_view_num - ); - std::ops::ControlFlow::Break(()) - } - } - } - - /// spawns an async task that attempts to handle messages being received - /// across the [BuilderState]s various channels. - /// - /// This async task will continue to run until it receives a message that - /// indicates that it should exit. This exit message is sent via the - /// [DecideMessage] channel. 
- /// - /// The main body of the loop listens to four channels at once, and when - /// a message is received it will process the message with the appropriate - /// handler accordingly. - /// - /// > Note: There is potential for improvement in typing here, as each of - /// > these receivers returns the exact same type despite being separate - /// > Channels. These channels may want to convey separate types so that - /// > the contained message can pertain to its specific channel - /// > accordingly. - #[tracing::instrument(skip_all, name = "event loop", - fields(builder_parent_block_references = %self.parent_block_references))] - pub fn event_loop(mut self) { - let _builder_handle = async_spawn(async move { - loop { - tracing::debug!( - "Builder {:?} event loop", - self.parent_block_references.view_number - ); - - futures::select! { - req = self.req_receiver.next() => self.event_loop_helper_handle_request(req).await, - da = self.da_proposal_receiver.next() => self.event_loop_helper_handle_da_proposal(da).await, - quorum = self.quorum_proposal_receiver.next() => self.event_loop_helper_handle_quorum_proposal(quorum).await, - decide = self.decide_receiver.next() => if let std::ops::ControlFlow::Break(_) = self.event_loop_helper_handle_decide(decide).await { return; }, - }; - } - }); - } -} -/// Unifies the possible messages that can be received by the builder -#[derive(Debug, Clone)] -pub enum MessageType { - DecideMessage(DecideMessage), - DaProposalMessage(Arc>), - QuorumProposalMessage(QuorumProposalMessage), - RequestMessage(RequestMessage), -} - -#[allow(clippy::too_many_arguments)] -impl BuilderState { - pub fn new( - parent_block_references: ParentBlockReferences, - receivers: &BroadcastReceivers, - req_receiver: BroadcastReceiver>, - tx_queue: Vec>>, - global_state: Arc>>, - maximize_txn_capture_timeout: Duration, - base_fee: u64, - instance_state: Arc, - txn_garbage_collect_duration: Duration, - validated_state: Arc, - ) -> Self { - let txns_in_queue: HashSet<_> = tx_queue.iter().map(|tx| tx.commit).collect(); - BuilderState { - txn_commits_in_queue: txns_in_queue, - parent_block_references, - req_receiver, - tx_queue, - global_state, - maximize_txn_capture_timeout, - base_fee, - instance_state, - validated_state, - included_txns: RotatingSet::new(txn_garbage_collect_duration), - da_proposal_payload_commit_to_da_proposal: HashMap::new(), - quorum_proposal_payload_commit_to_quorum_proposal: HashMap::new(), - builder_commitments: HashSet::new(), - decide_receiver: receivers.decide.activate_cloned(), - da_proposal_receiver: receivers.da_proposal.activate_cloned(), - quorum_proposal_receiver: receivers.quorum_proposal.activate_cloned(), - tx_receiver: receivers.transactions.activate_cloned(), - } - } - pub fn clone_with_receiver(&self, req_receiver: BroadcastReceiver>) -> Self { - let mut included_txns = self.included_txns.clone(); - included_txns.rotate(); - - BuilderState { - included_txns, - txn_commits_in_queue: self.txn_commits_in_queue.clone(), - parent_block_references: self.parent_block_references.clone(), - decide_receiver: self.decide_receiver.clone(), - da_proposal_receiver: self.da_proposal_receiver.clone(), - quorum_proposal_receiver: self.quorum_proposal_receiver.clone(), - req_receiver, - da_proposal_payload_commit_to_da_proposal: HashMap::new(), - quorum_proposal_payload_commit_to_quorum_proposal: HashMap::new(), - tx_receiver: self.tx_receiver.clone(), - tx_queue: self.tx_queue.clone(), - global_state: self.global_state.clone(), - builder_commitments: 
self.builder_commitments.clone(),
-            maximize_txn_capture_timeout: self.maximize_txn_capture_timeout,
-            base_fee: self.base_fee,
-            instance_state: self.instance_state.clone(),
-            validated_state: self.validated_state.clone(),
-        }
-    }
-
-    // collect outstanding transactions
-    async fn collect_txns(&mut self, timeout_after: Instant) {
-        while Instant::now() <= timeout_after {
-            match self.tx_receiver.try_recv() {
-                Ok(tx) => {
-                    if self.included_txns.contains(&tx.commit) {
-                        // We've included this transaction in one of our
-                        // recent blocks, and we do not wish to include it
-                        // again.
-                        continue;
-                    }
-
-                    if self.txn_commits_in_queue.contains(&tx.commit) {
-                        // We already have this transaction in our current
-                        // queue, so we do not want to include it again
-                        continue;
-                    }
-
-                    self.txn_commits_in_queue.insert(tx.commit);
-                    self.tx_queue.push(tx);
-                }
-
-                Err(async_broadcast::TryRecvError::Empty)
-                | Err(async_broadcast::TryRecvError::Closed) => {
-                    // The transaction receiver is empty, or it's been closed.
-                    // If it's closed that's a big problem and we should
-                    // probably indicate it as such.
-                    break;
-                }
-
-                Err(async_broadcast::TryRecvError::Overflowed(lost)) => {
-                    tracing::warn!("Missed {lost} transactions due to backlog");
-                    continue;
-                }
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::collections::HashMap;
-    use std::sync::Arc;
-
-    use async_broadcast::broadcast;
-    use committable::RawCommitmentBuilder;
-    use hotshot_example_types::block_types::TestTransaction;
-    use hotshot_example_types::node_types::TestTypes;
-    use hotshot_types::data::ViewNumber;
-    use hotshot_types::data::{Leaf, QuorumProposal};
-    use hotshot_types::traits::node_implementation::{ConsensusTime, NodeType};
-    use hotshot_types::utils::BuilderCommitment;
-
-    use super::DaProposalMessage;
-    use super::MessageType;
-    use super::ParentBlockReferences;
-    use crate::testing::{calc_proposal_msg, create_builder_state};
-
-    /// This tests the function `process_da_proposal`.
-    /// It checks that da_proposal_payload_commit_to_da_proposal changes appropriately
-    /// when receiving a DA proposal message.
-    /// This test also checks whether the corresponding BuilderStateId is in global_state.
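The pairing rule that this test and the next one exercise can be distilled as: DA and quorum proposals are keyed by the pair of the block's builder commitment and its view number, and a cloned builder state is spawned only once both halves of a key have arrived; re-delivery of an already-seen half is a no-op. A minimal, self-contained sketch of that pattern (`DaMsg`, `QuorumMsg`, and `Matcher` are illustrative stand-ins, not types from this crate); the test itself follows:

```rust
use std::collections::HashMap;

// Illustrative stand-ins for the builder's real proposal messages.
struct DaMsg { commitment: [u8; 32], view: u64 }
struct QuorumMsg { commitment: [u8; 32], view: u64 }

#[derive(Default)]
struct Matcher {
    da: HashMap<([u8; 32], u64), DaMsg>,
    quorum: HashMap<([u8; 32], u64), QuorumMsg>,
}

impl Matcher {
    // Store the DA half; if the quorum half is already present for the same
    // (commitment, view) key, consume both -- this is the point where the
    // real code spawns a cloned builder state.
    fn on_da(&mut self, msg: DaMsg) -> Option<(DaMsg, QuorumMsg)> {
        let key = (msg.commitment, msg.view);
        if let Some(quorum) = self.quorum.remove(&key) {
            return Some((msg, quorum));
        }
        // Re-delivery of an already-seen DA proposal changes nothing.
        self.da.entry(key).or_insert(msg);
        None
    }
}
```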
-    #[async_std::test]
-    async fn test_process_da_proposal() {
-        async_compatibility_layer::logging::setup_logging();
-        async_compatibility_layer::logging::setup_backtrace();
-        tracing::info!("Testing the function `process_da_proposal` in `builder_state.rs`");
-
-        // Number of views to simulate
-        const NUM_ROUNDS: usize = 5;
-        // Capacity of broadcast channels
-        const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5;
-        // Number of nodes on DA committee
-        const NUM_STORAGE_NODES: usize = 4;
-
-        // create builder_state without entering event loop
-        let (_senders, global_state, mut builder_state) =
-            create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
-
-        // randomly generate a transaction
-        let transactions = vec![TestTransaction::new(vec![1, 2, 3]); 3];
-        let (_quorum_proposal, _quorum_proposal_msg, da_proposal_msg, builder_state_id) =
-            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions.clone()).await;
-
-        // sub-test one
-        // call process_da_proposal without matching quorum proposal message
-        // da_proposal_payload_commit_to_da_proposal should insert the message
-        let mut correct_da_proposal_payload_commit_to_da_proposal: HashMap<
-            (BuilderCommitment, ::View),
-            Arc>,
-        > = HashMap::new();
-
-        builder_state
-            .process_da_proposal(da_proposal_msg.clone())
-            .await;
-        correct_da_proposal_payload_commit_to_da_proposal.insert(
-            (
-                da_proposal_msg.builder_commitment.clone(),
-                da_proposal_msg.view_number,
-            ),
-            da_proposal_msg,
-        );
-        assert_eq!(
-            builder_state
-                .da_proposal_payload_commit_to_da_proposal
-                .clone(),
-            correct_da_proposal_payload_commit_to_da_proposal.clone(),
-        );
-        // check global_state didn't change
-        if global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .contains_key(&builder_state_id)
-        {
-            panic!("global_state shouldn't have a corresponding builder_state_id without a matching quorum proposal.");
-        }
-
-        // sub-test two
-        // call process_da_proposal with the same msg again
-        // we should skip the process and everything should be the same
-        let transactions_1 = transactions.clone();
-        let (_quorum_proposal_1, _quorum_proposal_msg_1, da_proposal_msg_1, builder_state_id_1) =
-            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_1).await;
-        builder_state
-            .process_da_proposal(da_proposal_msg_1.clone())
-            .await;
-        assert_eq!(
-            builder_state
-                .da_proposal_payload_commit_to_da_proposal
-                .clone(),
-            correct_da_proposal_payload_commit_to_da_proposal.clone(),
-        );
-        // check global_state didn't change
-        if global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .contains_key(&builder_state_id_1)
-        {
-            panic!("global_state shouldn't have a corresponding builder_state_id without a matching quorum proposal.");
-        }
-
-        // sub-test three
-        // add the matching quorum proposal message with different tx
-        // and call process_da_proposal with this matching pair of da proposal and quorum proposal messages
-        // we should spawn_clone here
-        // and check whether global_state has the correct BuilderStateId
-        let transactions_2 = vec![TestTransaction::new(vec![1, 2, 3, 4]); 2];
-        let (_quorum_proposal_2, quorum_proposal_msg_2, da_proposal_msg_2, builder_state_id_2) =
-            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_2).await;
-
-        // process the quorum proposal first, so that when we later process the da proposal we can directly call `build_block` and skip storage
-        builder_state
-            .process_quorum_proposal(quorum_proposal_msg_2.clone())
-            .await;
-
-        // process da proposal message and do the check
-        builder_state
-            .process_da_proposal(da_proposal_msg_2.clone())
-            .await;
-        assert_eq!(
-            builder_state
-                .da_proposal_payload_commit_to_da_proposal
-                .clone(),
-            correct_da_proposal_payload_commit_to_da_proposal.clone(),
-        );
-        // check global_state has this new builder_state_id
-        if global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .contains_key(&builder_state_id_2)
-        {
-            tracing::debug!("global_state updated successfully");
-        } else {
-            panic!("global_state should have the corresponding builder_state_id, as we now have a matching quorum proposal.");
-        }
-    }
-
-    /// This tests the function `process_quorum_proposal`.
-    /// It checks that quorum_proposal_payload_commit_to_quorum_proposal changes appropriately
-    /// when receiving a quorum proposal message.
-    /// This test also checks whether the corresponding BuilderStateId is in global_state.
-    #[async_std::test]
-    async fn test_process_quorum_proposal() {
-        async_compatibility_layer::logging::setup_logging();
-        async_compatibility_layer::logging::setup_backtrace();
-        tracing::info!("Testing the function `process_quorum_proposal` in `builder_state.rs`");
-
-        // Number of views to simulate
-        const NUM_ROUNDS: usize = 5;
-        // Capacity of broadcast channels
-        const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5;
-        // Number of nodes on DA committee
-        const NUM_STORAGE_NODES: usize = 4;
-
-        // create builder_state without entering event loop
-        let (_senders, global_state, mut builder_state) =
-            create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
-
-        // randomly generate a transaction
-        let transactions = vec![TestTransaction::new(vec![1, 2, 3]); 3];
-        let (_quorum_proposal, quorum_proposal_msg, _da_proposal_msg, builder_state_id) =
-            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions.clone()).await;
-
-        // sub-test one
-        // call process_quorum_proposal without matching da proposal message
-        // quorum_proposal_payload_commit_to_quorum_proposal should insert the message
-        let mut correct_quorum_proposal_payload_commit_to_quorum_proposal = HashMap::new();
-        builder_state
-            .process_quorum_proposal(quorum_proposal_msg.clone())
-            .await;
-        correct_quorum_proposal_payload_commit_to_quorum_proposal.insert(
-            (
-                quorum_proposal_msg
-                    .proposal
-                    .data
-                    .block_header
-                    .builder_commitment
-                    .clone(),
-                quorum_proposal_msg.proposal.data.view_number,
-            ),
-            quorum_proposal_msg.proposal,
-        );
-        assert_eq!(
-            builder_state
-                .quorum_proposal_payload_commit_to_quorum_proposal
-                .clone(),
-            correct_quorum_proposal_payload_commit_to_quorum_proposal.clone()
-        );
-        // check global_state didn't change
-        if global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .contains_key(&builder_state_id)
-        {
-            panic!("global_state shouldn't have a corresponding builder_state_id without a matching quorum proposal.");
-        }
-
-        // sub-test two
-        // add the matching da proposal message with different tx
-        // and call process_quorum_proposal with this matching pair of da proposal and quorum proposal messages
-        // we should spawn_clone here
-        // and check whether global_state has the correct BuilderStateId
-        let transactions_2 = vec![TestTransaction::new(vec![2, 3, 4]); 2];
-        let (_quorum_proposal_2, quorum_proposal_msg_2, da_proposal_msg_2, builder_state_id_2) =
-            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_2).await;
-
-        // process the da proposal message first, so that when we later process the quorum proposal we can directly call `build_block` and skip storage
-        builder_state
-            .process_da_proposal(da_proposal_msg_2.clone())
-            .await;
-
-        // process quorum proposal, and do the check
-        builder_state
-            .process_quorum_proposal(quorum_proposal_msg_2.clone())
-            .await;
-        assert_eq!(
-            builder_state
-                .quorum_proposal_payload_commit_to_quorum_proposal
-                .clone(),
-            correct_quorum_proposal_payload_commit_to_quorum_proposal.clone()
-        );
-        // check global_state has this new builder_state_id
-        if global_state
-            .read_arc()
-            .await
-            .spawned_builder_states
-            .contains_key(&builder_state_id_2)
-        {
-            tracing::debug!("global_state updated successfully");
-        } else {
-            panic!("global_state should have the corresponding builder_state_id, as we now have a matching da proposal.");
-        }
-    }
-
-    /// This tests the function `process_decide_event`.
-    /// It checks that we exit the correct builder states when a decide event comes in.
-    /// This test also checks whether the corresponding BuilderStateId is removed from global_state.
-    #[async_std::test]
-    async fn test_process_decide_event() {
-        async_compatibility_layer::logging::setup_logging();
-        async_compatibility_layer::logging::setup_backtrace();
-        tracing::info!("Testing the function `process_decide_event` in `builder_state.rs`");
-
-        // Number of views to simulate
-        const NUM_ROUNDS: usize = 5;
-        // Number of transactions to submit per round
-        const NUM_TXNS_PER_ROUND: usize = 4;
-        // Capacity of broadcast channels
-        const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5;
-        // Number of nodes on DA committee
-        const NUM_STORAGE_NODES: usize = 4;
-
-        // create builder_state without entering event loop
-        let (_senders, global_state, mut builder_state) =
-            create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
-
-        // Transactions to send
-        let all_transactions = (0..NUM_ROUNDS)
-            .map(|round| {
-                (0..NUM_TXNS_PER_ROUND)
-                    .map(|tx_num| TestTransaction::new(vec![round as u8, tx_num as u8]))
-                    .collect::>()
-            })
-            .collect::>();
-        let mut prev_quorum_proposal: Option> = None;
-        // register some builder states for later decide event
-        #[allow(clippy::needless_range_loop)]
-        for round in 0..NUM_ROUNDS {
-            let transactions = all_transactions[round].clone();
-            let (quorum_proposal, _quorum_proposal_msg, _da_proposal_msg, builder_state_id) =
-                calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions)
-                    .await;
-            prev_quorum_proposal = Some(quorum_proposal.clone());
-            let (req_sender, _req_receiver) = broadcast(CHANNEL_CAPACITY);
-            let leaf: Leaf = Leaf::from_quorum_proposal(&quorum_proposal);
-            let leaf_commit = RawCommitmentBuilder::new("leaf commitment")
-                .u64_field("view number", leaf.view_number().u64())
-                .u64_field("block number", leaf.height())
-                .field("parent Leaf commitment", leaf.parent_commitment())
-                .var_size_field(
-                    "block payload commitment",
-                    leaf.payload_commitment().as_ref(),
-                )
-                .finalize();
-            global_state.write_arc().await.register_builder_state(
-                builder_state_id,
-                ParentBlockReferences {
-                    view_number: quorum_proposal.view_number,
-                    vid_commitment: quorum_proposal.block_header.payload_commitment,
-                    leaf_commit,
-                    builder_commitment: quorum_proposal.block_header.builder_commitment,
-                },
-                req_sender,
-            );
-        }
-
-        // send out a decide event in a middle round
-        let latest_decide_view_number = ViewNumber::new(3);
-
-        let decide_message = MessageType::DecideMessage(crate::builder_state::DecideMessage {
-            latest_decide_view_number,
-        });
-        if let MessageType::DecideMessage(practice_decide_msg) = decide_message.clone() {
-            builder_state
-                .process_decide_event(practice_decide_msg.clone())
-                .await;
-        } else {
-            panic!("Not a decide_message in correct format");
-        }
-        // check whether
spawned_builder_states have correct builder_state_id and already exit-ed builder_states older than decides - let current_spawned_builder_states = - global_state.read_arc().await.spawned_builder_states.clone(); - current_spawned_builder_states - .iter() - .for_each(|(builder_state_id, _)| { - assert!(builder_state_id.parent_view >= latest_decide_view_number) - }); - } -} diff --git a/crates/marketplace/src/hooks.rs b/crates/marketplace/src/hooks.rs new file mode 100644 index 00000000..761218ff --- /dev/null +++ b/crates/marketplace/src/hooks.rs @@ -0,0 +1,43 @@ +use std::marker::PhantomData; + +use async_trait::async_trait; +use hotshot::types::Event; +use hotshot_types::traits::node_implementation::NodeType; + +#[async_trait] +pub trait BuilderHooks: Sync + Send + 'static { + #[inline(always)] + async fn process_transactions( + &self, + transactions: Vec, + ) -> Vec { + transactions + } + + #[inline(always)] + async fn handle_hotshot_event(&self, _event: &Event) {} +} + +#[async_trait] +impl BuilderHooks for Box +where + Types: NodeType, + T: BuilderHooks, +{ + #[inline(always)] + async fn process_transactions( + &self, + transactions: Vec, + ) -> Vec { + (**self).process_transactions(transactions).await + } + + #[inline(always)] + async fn handle_hotshot_event(&self, event: &Event) { + (**self).handle_hotshot_event(event).await + } +} + +pub struct NoHooks(pub PhantomData); + +impl BuilderHooks for NoHooks {} diff --git a/crates/marketplace/src/lib.rs b/crates/marketplace/src/lib.rs index ddd3365e..9bacf40d 100644 --- a/crates/marketplace/src/lib.rs +++ b/crates/marketplace/src/lib.rs @@ -11,15 +11,9 @@ // It also provides one API services external users: // 1. Serves a user's request to submit a private transaction -// providing the core services to support above API services -pub mod builder_state; - -// Core interaction with the HotShot network +pub mod hooks; pub mod service; -// utilities -pub mod utils; - // tracking the testing #[cfg(test)] pub mod testing; diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs index 6c45cf1b..875ce4d9 100644 --- a/crates/marketplace/src/service.rs +++ b/crates/marketplace/src/service.rs @@ -1,284 +1,59 @@ -use std::{fmt::Debug, marker::PhantomData, time::Duration}; +use std::time::Duration; -use crate::{ - builder_state::{ - BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QuorumProposalMessage, - RequestMessage, ResponseMessage, TransactionSource, - }, - utils::LegacyCommit as _, +use async_compatibility_layer::art::async_sleep; +use marketplace_builder_shared::{ + block::{BuilderStateId, ReceivedTransaction, TransactionSource}, + coordinator::{BuilderStateCoordinator, BuilderStateLookup}, + state::BuilderState, }; -use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences}; -use anyhow::bail; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; -use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError}; use async_lock::RwLock; +#[cfg(async_executor_impl = "async-std")] +use async_std::task::JoinHandle; use async_trait::async_trait; use committable::{Commitment, Committable}; -use derivative::Derivative; -use futures::stream::StreamExt; -use futures::{future::BoxFuture, Stream}; +use futures::{future::BoxFuture, stream::FuturesUnordered, Stream}; +use futures::{ + stream::{FuturesOrdered, StreamExt}, + TryStreamExt, +}; use hotshot::types::Event; use hotshot_builder_api::v0_3::{ builder::{define_api, submit_api, BuildError, Error as 
BuilderApiError}, data_source::{AcceptsTxnSubmits, BuilderDataSource}, }; use hotshot_types::bundle::Bundle; -use hotshot_types::traits::block_contents::BuilderFee; +use hotshot_types::traits::block_contents::{BuilderFee, Transaction}; use hotshot_types::{ - data::{DaProposal, Leaf, QuorumProposal, ViewNumber}, event::EventType, - message::Proposal, traits::{ - block_contents::BlockPayload, node_implementation::{ConsensusTime, NodeType}, signature_key::{BuilderSignatureKey, SignatureKey}, }, - utils::BuilderCommitment, vid::VidCommitment, }; -use sha2::{Digest, Sha256}; use std::collections::HashMap; -use std::num::NonZeroUsize; use std::sync::Arc; use std::{fmt::Display, time::Instant}; use tagged_base64::TaggedBase64; use tide_disco::{app::AppError, method::ReadState, App}; -use tracing::{error, instrument}; +#[cfg(async_executor_impl = "tokio")] +use tokio::task::JoinHandle; +use tracing::Level; use vbs::version::StaticVersion; pub use marketplace_builder_shared::utils::EventServiceStream; -// It holds all the necessary information for a block -#[derive(Debug)] -pub struct BlockInfo { - pub block_payload: Types::BlockPayload, - pub metadata: <::BlockPayload as BlockPayload>::Metadata, - pub offered_fee: u64, -} - -// It holds the information for the proposed block -#[derive(Debug)] -pub struct ProposedBlockId { - pub parent_commitment: VidCommitment, - pub payload_commitment: BuilderCommitment, - pub parent_view: Types::View, -} - -impl ProposedBlockId { - pub fn new( - parent_commitment: VidCommitment, - payload_commitment: BuilderCommitment, - parent_view: Types::View, - ) -> Self { - ProposedBlockId { - parent_commitment, - payload_commitment, - parent_view, - } - } -} - -#[derive(Debug, Derivative)] -#[derivative(Default(bound = ""))] -pub struct BuilderStatesInfo { - // list of all the builder states spawned for a view - pub vid_commitments: Vec, - // list of all the proposed blocks for a view - pub block_ids: Vec>, -} - -#[derive(Debug)] -pub struct ReceivedTransaction { - // the transaction - pub tx: Types::Transaction, - // its hash - pub commit: Commitment, - // its source - pub source: TransactionSource, - // received time - pub time_in: Instant, -} - -#[allow(clippy::type_complexity)] -#[derive(Debug)] -pub struct GlobalState { - // data store for the blocks - pub blocks: lru::LruCache, BlockInfo>, - - // registered builder states - pub spawned_builder_states: HashMap< - BuilderStateId, - ( - // This is provided as an Option for convenience with initialization. - // When we build the initial state, we don't necessarily want to - // have to generate a valid `ParentBlockReferences` object and register its leaf - // commitment, as doing such would require a bit of setup. Additionally it would - // result in the call signature to `GlobalState::new` changing. - // However for every subsequent BuilderState, we expect this value - // to be populated. 
- Option>>, - BroadcastSender>, - ), - >, - - // builder state -> last built block , it is used to respond the client - // if the req channel times out during get_available_blocks - pub builder_state_to_last_built_block: HashMap, ResponseMessage>, - - // sending a transaction from the hotshot/private mempool to the builder states - // NOTE: Currently, we don't differentiate between the transactions from the hotshot and the private mempool - pub tx_sender: BroadcastSender>>, - - // last garbage collected view number - pub last_garbage_collected_view_num: Types::View, - - // highest view running builder task - pub highest_view_num_builder_id: BuilderStateId, -} - -impl GlobalState { - #[allow(clippy::too_many_arguments)] - pub fn new( - bootstrap_sender: BroadcastSender>, - tx_sender: BroadcastSender>>, - bootstrapped_builder_state_id: VidCommitment, - bootstrapped_view_num: Types::View, - ) -> Self { - let mut spawned_builder_states = HashMap::new(); - let bootstrap_id = BuilderStateId { - parent_commitment: bootstrapped_builder_state_id, - parent_view: bootstrapped_view_num, - }; - spawned_builder_states.insert(bootstrap_id.clone(), (None, bootstrap_sender.clone())); - GlobalState { - blocks: lru::LruCache::new(NonZeroUsize::new(256).unwrap()), - spawned_builder_states, - tx_sender, - last_garbage_collected_view_num: bootstrapped_view_num, - builder_state_to_last_built_block: Default::default(), - highest_view_num_builder_id: bootstrap_id, - } - } - - pub fn register_builder_state( - &mut self, - parent_id: BuilderStateId, - parent_block_references: ParentBlockReferences, - request_sender: BroadcastSender>, - ) { - // register the builder state - self.spawned_builder_states.insert( - parent_id.clone(), - (Some(parent_block_references.leaf_commit), request_sender), - ); - - // keep track of the max view number - if parent_id.parent_view > self.highest_view_num_builder_id.parent_view { - tracing::info!("registering builder {parent_id} as highest",); - self.highest_view_num_builder_id = parent_id; - } else { - tracing::warn!( - "builder {parent_id} created; highest registered is {}", - self.highest_view_num_builder_id, - ); - } - } - - pub fn update_global_state( - &mut self, - state_id: BuilderStateId, - build_block_info: BuildBlockInfo, - response_msg: ResponseMessage, - ) { - if self.blocks.contains(&build_block_info.id) { - self.blocks.promote(&build_block_info.id) - } else { - self.blocks.push( - build_block_info.id, - BlockInfo { - block_payload: build_block_info.block_payload, - metadata: build_block_info.metadata, - offered_fee: build_block_info.offered_fee, - }, - ); - } - - // update the builder state to last built block - self.builder_state_to_last_built_block - .insert(state_id, response_msg); - } - - // remove the builder state handles based on the decide event - pub fn remove_handles(&mut self, on_decide_view: Types::View) -> Types::View { - // remove everything from the spawned builder states when view_num <= on_decide_view; - // if we don't have a highest view > decide, use highest view as cutoff. 
- let cutoff = std::cmp::min(self.highest_view_num_builder_id.parent_view, on_decide_view); - self.spawned_builder_states - .retain(|id, _| id.parent_view >= cutoff); - - let cutoff_u64 = cutoff.u64(); - let gc_view = if cutoff_u64 > 0 { cutoff_u64 - 1 } else { 0 }; - - self.last_garbage_collected_view_num = Types::View::new(gc_view); - - cutoff - } - - // private mempool submit txn - // Currently, we don't differentiate between the transactions from the hotshot and the private mempool - pub async fn submit_client_txns( - &self, - txns: Vec<::Transaction>, - ) -> Vec::Transaction>, BuildError>> { - handle_received_txns(&self.tx_sender, txns, TransactionSource::External).await - } - - pub fn get_channel_for_matching_builder_or_highest_view_buider( - &self, - key: &BuilderStateId, - ) -> Result<&BroadcastSender>, BuildError> { - if let Some(id_and_sender) = self.spawned_builder_states.get(key) { - tracing::info!("Got matching builder for parent {}", key); - Ok(&id_and_sender.1) - } else { - tracing::warn!( - "failed to recover builder for parent {}, using higest view num builder with {}", - key, - self.highest_view_num_builder_id, - ); - // get the sender for the highest view number builder - self.spawned_builder_states - .get(&self.highest_view_num_builder_id) - .map(|(_, sender)| sender) - .ok_or_else(|| BuildError::Error("No builder state found".to_string())) - } - } - - // check for the existence of the builder state for a view - pub fn check_builder_state_existence_for_a_view(&self, key: &Types::View) -> bool { - // iterate over the spawned builder states and check if the view number exists - self.spawned_builder_states - .iter() - .any(|(id, _)| id.parent_view == *key) - } - - pub fn should_view_handle_other_proposals( - &self, - builder_view: &Types::View, - proposal_view: &Types::View, - ) -> bool { - *builder_view == self.highest_view_num_builder_id.parent_view - && !self.check_builder_state_existence_for_a_view(proposal_view) - } -} +use crate::hooks::BuilderHooks; -pub struct ProxyGlobalState> { +pub struct GlobalState +where + Types: NodeType, + Hooks: BuilderHooks, +{ // global state - global_state: Arc>>, - - // hooks - hooks: Arc, + coordinator: Arc>, // identity keys for the builder // May be ideal place as GlobalState interacts with hotshot apis @@ -290,43 +65,64 @@ pub struct ProxyGlobalState> { // Maximum time allotted to wait for bundle before returning an error api_timeout: Duration, + + maximize_txn_capture_timeout: Duration, + + bundle_cache: RwLock, Bundle>>, + + base_fee: u64, + + hooks: Arc, } -impl ProxyGlobalState +impl GlobalState where Types: NodeType, - H: BuilderHooks, + Hooks: BuilderHooks, for<'a> <::PureAssembledSignatureType as TryFrom< &'a TaggedBase64, >>::Error: Display, for<'a> >::Error: Display, { pub fn new( - global_state: Arc>>, - hooks: Arc, builder_keys: ( Types::BuilderSignatureKey, <::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey, ), api_timeout: Duration, + maximize_txn_capture_timeout: Duration, + txn_garbage_collect_duration: Duration, + txn_channel_capacity: usize, + base_fee: u64, + hooks: Hooks, ) -> Self { - ProxyGlobalState { - hooks, - global_state, + let coordinator = + BuilderStateCoordinator::new(txn_channel_capacity, txn_garbage_collect_duration); + Self { + hooks: Arc::new(hooks), + coordinator: Arc::new(coordinator), builder_keys, api_timeout, + maximize_txn_capture_timeout, + bundle_cache: RwLock::new(HashMap::new()), + base_fee, } } /// Consumes `self` and returns a `tide_disco` [`App`] with builder and private 
mempool APIs registered - pub fn into_app(self) -> Result, AppError> { - let builder_api = define_api::(&Default::default())?; + pub fn into_app( + self: Arc, + ) -> Result, BuilderApiError>, AppError> { + let proxy = ProxyGlobalState(self); + let builder_api = define_api::, Types>(&Default::default())?; // TODO: Replace StaticVersion with proper constant when added in HotShot let private_mempool_api = - submit_api::>(&Default::default())?; + submit_api::, Types, StaticVersion<0, 1>>( + &Default::default(), + )?; - let mut app: App, BuilderApiError> = App::with_state(self); + let mut app: App, BuilderApiError> = App::with_state(proxy); app.register_module( hotshot_types::constants::MARKETPLACE_BUILDER_MODULE, @@ -337,22 +133,165 @@ where Ok(app) } + + pub fn start_event_loop( + &self, + event_stream: impl Stream> + Unpin + Send + 'static, + ) -> JoinHandle> { + async_compatibility_layer::art::async_spawn(Self::drive_coordinator( + self.coordinator.clone(), + self.hooks.clone(), + event_stream, + )) + } + + async fn drive_coordinator( + coordinator: Arc>, + hooks: Arc, + mut event_stream: impl Stream> + Unpin + Send + 'static, + ) -> anyhow::Result<()> { + loop { + let Some(event) = event_stream.next().await else { + anyhow::bail!("Event stream ended"); + }; + + hooks.handle_hotshot_event(&event).await; + + match event.event { + EventType::Error { error } => { + tracing::error!("Error event in HotShot: {:?}", error); + } + EventType::Transactions { transactions } => { + let transactions = hooks.process_transactions(transactions).await; + + // TODO: record results + let _ = transactions + .into_iter() + .map(|txn| { + coordinator.handle_transaction(ReceivedTransaction::new( + txn, + TransactionSource::Public, + )) + }) + .collect::>() + .collect::>() + .await; + } + EventType::Decide { leaf_chain, .. } => { + coordinator.handle_decide(leaf_chain).await; + } + EventType::DaProposal { proposal, .. } => { + coordinator.handle_da_proposal(proposal.data).await; + } + EventType::QuorumProposal { proposal, .. } => { + coordinator.handle_quorum_proposal(proposal.data).await; + } + _ => {} + } + } + } + + /// Collect transactions to include in the bundle. Will wait until we have + /// at least one transaction or up to the configured `maximize_txn_capture_timeout` duration elapses. 
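Taken together, the service surface above replaces the old channel plumbing with three calls: construct the state, start the event loop, and serve the APIs. A rough wiring sketch, not code from this patch (it assumes some async context; `MyTypes`, `builder_keys`, `events`, and `url` are placeholders, and the arguments follow the `new` signature introduced above):

```rust
use std::{marker::PhantomData, sync::Arc, time::Duration};

// `events` is any `Stream<Item = Event<MyTypes>> + Unpin + Send + 'static`,
// e.g. a stream obtained from the HotShot events service.
let service = Arc::new(GlobalState::<MyTypes, _>::new(
    builder_keys,               // (builder public key, private key)
    Duration::from_secs(1),     // api_timeout
    Duration::from_millis(100), // maximize_txn_capture_timeout
    Duration::from_secs(60),    // txn_garbage_collect_duration
    1024,                       // txn_channel_capacity
    1,                          // base_fee
    NoHooks(PhantomData),
));

// Drive the coordinator from the HotShot event stream in the background...
let _event_loop = service.start_event_loop(events);

// ...then expose the builder and private-mempool APIs over HTTP.
let app = service.into_app().expect("failed to build tide-disco app");
app.serve(url, StaticVersion::<0, 1> {}).await.unwrap();
```

The transaction-collection helper described in the doc comment above follows next.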
+ #[tracing::instrument(skip_all, fields(builder_parent_block_references = %state.parent_block_references))] + async fn collect_transactions( + &self, + state: &Arc>, + ) -> Option> { + // collect all the transactions from the near future + let timeout_after = Instant::now() + self.maximize_txn_capture_timeout; + let sleep_interval = self.maximize_txn_capture_timeout / 10; + while Instant::now() <= timeout_after { + let queue_populated = state.collect_txns(timeout_after).await; + + if queue_populated || Instant::now() + sleep_interval > timeout_after { + // we don't have time for another iteration + break; + } + + async_sleep(sleep_interval).await + } + + let transactions = state + .txn_queue + .read() + .await + .transactions + .iter() + .map(|txn| txn.transaction.clone()) + .collect(); + + Some(transactions) + } + + async fn assemble_bundle( + &self, + transactions: Vec, + ) -> Result, BuildError> { + let bundle_size: u64 = transactions + .iter() + .map(|txn| txn.minimum_block_size()) + .sum(); + let offered_fee = self.base_fee * bundle_size; + + let fee_signature = + ::sign_sequencing_fee_marketplace( + &self.builder_keys.1, + offered_fee, + ) + .map_err(|e| BuildError::Error(e.to_string()))?; + + let sequencing_fee: BuilderFee = BuilderFee { + fee_amount: offered_fee, + fee_account: self.builder_keys.0.clone(), + fee_signature, + }; + + let commitments = transactions + .iter() + .flat_map(|txn| <[u8; 32]>::from(txn.commit())) + .collect::>(); + + let signature = ::sign_builder_message( + &self.builder_keys.1, + &commitments, + ) + .map_err(|e| BuildError::Error(e.to_string()))?; + + Ok(Bundle { + sequencing_fee, + transactions, + signature, + }) + } } +#[derive(derive_more::Deref, derive_more::DerefMut)] +#[deref(forward)] +#[deref_mut(forward)] +pub struct ProxyGlobalState(pub Arc>) +where + Types: NodeType, + Hooks: BuilderHooks; + /* Handling Builder API responses */ #[async_trait] -impl BuilderDataSource for ProxyGlobalState +impl BuilderDataSource for ProxyGlobalState where Types: NodeType, - H: BuilderHooks, + Hooks: BuilderHooks, for<'a> <::PureAssembledSignatureType as TryFrom< &'a TaggedBase64, >>::Error: Display, for<'a> >::Error: Display, { - #[tracing::instrument(skip(self))] + #[tracing::instrument( + skip(self), + err(level = Level::INFO) + ret(level = Level::TRACE) + )] async fn bundle( &self, parent_view: u64, @@ -374,111 +313,42 @@ where return Err(BuildError::NotFound); }; - let Some(id_and_sender) = self - .global_state - .read_arc() - .await - .spawned_builder_states - .get(&state_id) - .cloned() - else { - let global_state = self.global_state.read_arc().await; - - let past_gc = parent_view <= global_state.last_garbage_collected_view_num; - // Used as an indicator that we're just bootstrapping, as they should be equal at bootstrap - // and never otherwise. - let last_gc_view = global_state.last_garbage_collected_view_num; - let highest_observed_view = global_state.highest_view_num_builder_id.parent_view; - let is_bootstrapping = last_gc_view == highest_observed_view; - - // Explicitly drop `global_state` to avoid the lock while sleeping in `else`. 
- drop(global_state); - - if past_gc && !is_bootstrapping { + let builder_state = match self.coordinator.lookup_builder_state(&state_id).await { + BuilderStateLookup::Found(builder_state_entry) => builder_state_entry, + BuilderStateLookup::NotFound => { + // If we couldn't find the state because it hasn't yet been created, try again + async_sleep(self.api_timeout / 10).await; + continue; + } + BuilderStateLookup::Decided => { // If we couldn't find the state because the view has already been decided, we can just return an error - tracing::warn!( - last_gc_view = ?last_gc_view, - highest_observed_view = ?highest_observed_view, - "Requested a bundle for view we already GCd as decided", - ); + tracing::warn!("Requested a bundle for view we already GCd as decided",); return Err(BuildError::Error( "Request for a bundle for a view that has already been decided.".to_owned(), )); - } else { - // If we couldn't find the state because it hasn't yet been created, try again - async_compatibility_layer::art::async_sleep(self.api_timeout / 10).await; - continue; } }; - let (response_sender, response_receiver) = - async_compatibility_layer::channel::unbounded(); + tracing::info!( + "Request handled by builder with view {}@{:?} for (view_num: {:?})", + builder_state.parent_block_references.vid_commitment, + builder_state.parent_block_references.view_number, + parent_view + ); - let request = RequestMessage { - requested_view_number: parent_view, - response_channel: response_sender, + let Some(transactions) = self.collect_transactions(&builder_state).await else { + tracing::debug!("No response to send"); + return Err(BuildError::NotFound); }; - id_and_sender - .1 - .broadcast(MessageType::RequestMessage(request)) - .await - .map_err(|err| { - tracing::warn!(%err, "Error requesting bundle"); - - BuildError::Error("Error requesting bundle".to_owned()) - })?; + let bundle = self.assemble_bundle(transactions).await?; - let response = async_compatibility_layer::art::async_timeout( - self.api_timeout.saturating_sub(start.elapsed()), - response_receiver.recv(), - ) - .await - .map_err(|err| { - tracing::warn!(%err, "Couldn't get a bundle in time"); - - BuildError::NotFound - })? 
- .map_err(|err| { - tracing::warn!(%err, "Channel closed while waiting for bundle"); - - BuildError::Error("Channel closed while waiting for bundle".to_owned()) - })?; - - let fee_signature = - ::sign_sequencing_fee_marketplace( - &self.builder_keys.1, - response.offered_fee, - ) - .map_err(|e| BuildError::Error(e.to_string()))?; - - let sequencing_fee: BuilderFee = BuilderFee { - fee_amount: response.offered_fee, - fee_account: self.builder_keys.0.clone(), - fee_signature, - }; - - let commitments = response - .transactions - .iter() - .flat_map(|txn| <[u8; 32]>::from(txn.commit())) - .collect::>(); - - let signature = - ::sign_builder_message( - &self.builder_keys.1, - &commitments, - ) - .map_err(|e| BuildError::Error(e.to_string()))?; - - let bundle = Bundle { - sequencing_fee, - transactions: response.transactions, - signature, - }; + self.bundle_cache + .write() + .await + .insert(state_id, bundle.clone()); tracing::info!("Serving bundle"); - tracing::trace!(?bundle); return Ok(bundle); } @@ -492,46 +362,39 @@ where } #[async_trait] -impl AcceptsTxnSubmits for ProxyGlobalState +impl AcceptsTxnSubmits for ProxyGlobalState where + Hooks: BuilderHooks, Types: NodeType, - H: BuilderHooks, { async fn submit_txns( &self, txns: Vec<::Transaction>, ) -> Result::Transaction>>, BuildError> { - tracing::debug!( - "Submitting {:?} transactions to the builder states{:?}", - txns.len(), - txns.iter().map(|txn| txn.commit()).collect::>() - ); let txns = self.hooks.process_transactions(txns).await; - let response = self - .global_state - .read_arc() + + txns.into_iter() + .map(|txn| ReceivedTransaction::new(txn, TransactionSource::Private)) + .map(|txn| async { + let commit = txn.commit; + self.coordinator + .handle_transaction(txn) + .await + .map(|_| commit) + }) + .collect::>() + .try_collect() .await - .submit_client_txns(txns) - .await; - - tracing::debug!( - "Transaction submitted to the builder states, sending response: {:?}", - response - ); - - // NOTE: ideally we want to respond with original Vec - // instead of Result not to loose any information, - // but this requires changes to builder API - response.into_iter().collect() } } + #[async_trait] -impl ReadState for ProxyGlobalState +impl ReadState for ProxyGlobalState where Types: NodeType, - H: BuilderHooks + 'static, + Hooks: BuilderHooks, { - type State = ProxyGlobalState; + type State = Self; async fn read( &self, @@ -540,306 +403,3 @@ where op(self).await } } - -pub fn broadcast_channels( - capacity: usize, -) -> (BroadcastSenders, BroadcastReceivers) { - macro_rules! pair { - ($s:ident, $r:ident) => { - let ($s, $r) = broadcast(capacity); - let $r = $r.deactivate(); - }; - } - - pair!(tx_sender, tx_receiver); - pair!(da_sender, da_receiver); - pair!(quorum_sender, quorum_proposal_receiver); - pair!(decide_sender, decide_receiver); - - ( - BroadcastSenders { - transactions: tx_sender, - da_proposal: da_sender, - quorum_proposal: quorum_sender, - decide: decide_sender, - }, - BroadcastReceivers { - transactions: tx_receiver, - da_proposal: da_receiver, - quorum_proposal: quorum_proposal_receiver, - decide: decide_receiver, - }, - ) -} - -// Receivers for HotShot events for the builder states -pub struct BroadcastReceivers { - /// For transactions, shared. - pub transactions: InactiveReceiver>>, - /// For the DA proposal. - pub da_proposal: InactiveReceiver>, - /// For the quorum proposal. - pub quorum_proposal: InactiveReceiver>, - /// For the decide. 
- pub decide: InactiveReceiver>, -} - -// Senders to broadcast data from HotShot to the builder states. -pub struct BroadcastSenders { - /// For transactions, shared. - pub transactions: BroadcastSender>>, - /// For the DA proposal. - pub da_proposal: BroadcastSender>, - /// For the quorum proposal. - pub quorum_proposal: BroadcastSender>, - /// For the decide. - pub decide: BroadcastSender>, -} - -#[async_trait] -pub trait BuilderHooks: Sync + Send + 'static { - #[inline(always)] - async fn process_transactions( - &self, - transactions: Vec, - ) -> Vec { - transactions - } - - #[inline(always)] - async fn handle_hotshot_event(&self, _event: &Event) {} -} - -#[async_trait] -impl BuilderHooks for Box -where - Types: NodeType, - T: BuilderHooks, -{ - #[inline(always)] - async fn process_transactions( - &self, - transactions: Vec, - ) -> Vec { - (**self).process_transactions(transactions).await - } - - #[inline(always)] - async fn handle_hotshot_event(&self, event: &Event) { - (**self).handle_hotshot_event(event).await - } -} - -pub struct NoHooks(pub PhantomData); - -impl BuilderHooks for NoHooks {} - -/// Run builder service, -/// Refer to documentation for [`ProxyGlobalState`] for more details -pub async fn run_builder_service< - Types: NodeType, - S: Stream> + Unpin, ->( - hooks: Arc>, - senders: BroadcastSenders, - hotshot_event_stream: S, -) -> Result<(), anyhow::Error> { - let mut hotshot_event_stream = std::pin::pin!(hotshot_event_stream); - loop { - let Some(event) = hotshot_event_stream.next().await else { - bail!("Event stream ended"); - }; - - hooks.handle_hotshot_event(&event).await; - - match event.event { - EventType::Error { error } => { - error!("Error event in HotShot: {:?}", error); - } - // tx event - EventType::Transactions { transactions } => { - let transactions = hooks.process_transactions(transactions).await; - - for res in handle_received_txns( - &senders.transactions, - transactions, - TransactionSource::HotShot, - ) - .await - { - if let Err(e) = res { - tracing::warn!("Failed to handle transactions; {:?}", e); - } - } - } - // decide event - EventType::Decide { - block_size: _, - leaf_chain, - qc: _, - } => { - let latest_decide_view_num = leaf_chain[0].leaf.view_number(); - handle_decide_event(&senders.decide, latest_decide_view_num).await; - } - // DA proposal event - EventType::DaProposal { proposal, sender } => { - handle_da_event(&senders.da_proposal, proposal, sender).await; - } - // Quorum proposal event - EventType::QuorumProposal { proposal, sender } => { - handle_quorum_event(&senders.quorum_proposal, Arc::new(proposal), sender).await; - } - _ => { - tracing::trace!("Unhandled event from Builder: {:?}", event.event); - } - } - } -} - -/* -Utility functions to handle the hotshot events -*/ -#[instrument(skip_all, fields(sender, da_proposal.data.view_number))] -async fn handle_da_event( - da_channel_sender: &BroadcastSender>, - da_proposal: Proposal>, - sender: ::SignatureKey, -) { - // get the encoded transactions hash - let encoded_txns_hash = Sha256::digest(&da_proposal.data.encoded_transactions); - // check if the sender is the leader and the signature is valid; if yes, broadcast the DA proposal - if !sender.validate(&da_proposal.signature, &encoded_txns_hash) { - error!("Validation Failure on DaProposal"); - return; - } - - let view_number = da_proposal.data.view_number; - tracing::debug!("Sending DA proposal to the builder states",); - - // form a block payload from the encoded transactions - let block_payload = >::from_bytes( - 
&da_proposal.data.encoded_transactions, - &da_proposal.data.metadata, - ); - // get the builder commitment from the block payload - let builder_commitment = block_payload.builder_commitment(&da_proposal.data.metadata); - - let txn_commitments = block_payload - .transactions(&da_proposal.data.metadata) - // TODO: - //.filter(|txn| txn.namespace_id() != namespace_id) - .map(|txn| txn.commit()) - .collect(); - - let da_msg = DaProposalMessage { - view_number, - txn_commitments, - sender, - builder_commitment, - }; - - if let Err(e) = da_channel_sender - .broadcast(MessageType::DaProposalMessage(Arc::new(da_msg))) - .await - { - tracing::warn!( - "Error {e}, failed to send DA proposal to builder states for view {:?}", - view_number - ); - } -} - -#[instrument(skip_all, fields(sender, quorum_proposal.data.view_number))] -async fn handle_quorum_event( - quorum_channel_sender: &BroadcastSender>, - quorum_proposal: Arc>>, - sender: ::SignatureKey, -) { - let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data); - - // check if the sender is the leader and the signature is valid; if yes, broadcast the Quorum proposal - if !sender.validate(&quorum_proposal.signature, leaf.legacy_commit().as_ref()) { - error!("Validation Failure on QuorumProposal"); - return; - }; - - let quorum_msg = QuorumProposalMessage:: { - proposal: quorum_proposal, - sender, - }; - let view_number = quorum_msg.proposal.data.view_number; - tracing::debug!( - "Sending Quorum proposal to the builder states for view {:?}", - view_number - ); - if let Err(e) = quorum_channel_sender - .broadcast(MessageType::QuorumProposalMessage(quorum_msg)) - .await - { - tracing::warn!( - "Error {e}, failed to send Quorum proposal to builder states for view {:?}", - view_number - ); - } -} - -async fn handle_decide_event( - decide_channel_sender: &BroadcastSender>, - latest_decide_view_number: Types::View, -) { - let decide_msg: DecideMessage = DecideMessage:: { - latest_decide_view_number, - }; - tracing::debug!( - "Sending Decide event to builder states for view {:?}", - latest_decide_view_number - ); - if let Err(e) = decide_channel_sender - .broadcast(MessageType::DecideMessage(decide_msg)) - .await - { - tracing::warn!( - "Error {e}, failed to send Decide event to builder states for view {:?}", - latest_decide_view_number - ); - } -} - -pub(crate) async fn handle_received_txns( - tx_sender: &BroadcastSender>>, - txns: Vec, - source: TransactionSource, -) -> Vec::Transaction>, BuildError>> { - let mut results = Vec::with_capacity(txns.len()); - let time_in = Instant::now(); - for tx in txns.into_iter() { - let commit = tx.commit(); - let res = tx_sender - .try_broadcast(Arc::new(ReceivedTransaction { - tx, - source: source.clone(), - commit, - time_in, - })) - .inspect(|val| { - if let Some(evicted_txn) = val { - tracing::warn!( - "Overflow mode enabled, transaction {} evicted", - evicted_txn.commit - ); - } - }) - .map(|_| commit) - .inspect_err(|err| { - tracing::warn!("Failed to broadcast txn with commit {:?}: {}", commit, err); - }) - .map_err(|err| match err { - TrySendError::Full(_) => BuildError::Error("Too many transactions".to_owned()), - e => { - BuildError::Error(format!("Internal error when submitting transaction: {}", e)) - } - }); - results.push(res); - } - results -} diff --git a/crates/marketplace/src/testing/integration.rs b/crates/marketplace/src/testing/integration.rs index 14cf82fa..677d4125 100644 --- a/crates/marketplace/src/testing/integration.rs +++ b/crates/marketplace/src/testing/integration.rs @@ -4,34 +4,23 @@ 
use std::{collections::HashMap, fmt::Display, marker::PhantomData, sync::Arc, time::Duration}; use async_compatibility_layer::art::async_spawn; -use async_lock::RwLock; use async_trait::async_trait; use hotshot::types::SignatureKey; -use hotshot_example_types::node_types::TestVersions; use hotshot_testing::{ block_builder::{BuilderTask, TestBuilderImplementation}, test_builder::BuilderChange, }; use hotshot_types::{ - data::{Leaf, ViewNumber}, - message::UpgradeLock, - traits::{ - block_contents::{vid_commitment, BlockPayload, EncodeBytes}, - node_implementation::{ConsensusTime, NodeType}, - signature_key::BuilderSignatureKey, - states::ValidatedState, - }, + data::ViewNumber, + traits::{node_implementation::NodeType, signature_key::BuilderSignatureKey}, }; -use marketplace_builder_shared::block::ParentBlockReferences; use tagged_base64::TaggedBase64; use url::Url; use vbs::version::StaticVersion; use crate::{ - builder_state::BuilderState, - service::{ - run_builder_service, BroadcastSenders, BuilderHooks, GlobalState, NoHooks, ProxyGlobalState, - }, + hooks::{BuilderHooks, NoHooks}, + service::GlobalState, }; const BUILDER_CHANNEL_CAPACITY: usize = 1024; @@ -43,7 +32,7 @@ struct TestMarketplaceBuilderConfig where Types: NodeType, { - hooks: Arc>>, + hooks: Box>, } impl Default for TestMarketplaceBuilderConfig @@ -52,7 +41,7 @@ where { fn default() -> Self { Self { - hooks: Arc::new(Box::new(NoHooks(PhantomData))), + hooks: Box::new(NoHooks(PhantomData)), } } } @@ -78,93 +67,34 @@ where /// [`BuilderTask`] it returns will be injected into consensus runtime by HotShot testing harness and /// will forward transactions from hotshot event stream to the builder. async fn start( - n_nodes: usize, + _n_nodes: usize, url: Url, config: Self::Config, _changes: HashMap, ) -> Box> { - let instance_state = Types::InstanceState::default(); - let (validated_state, _) = Types::ValidatedState::genesis(&instance_state); - let builder_key_pair = Types::BuilderSignatureKey::generated_from_seed_indexed([0; 32], 0); - let (senders, receivers) = crate::service::broadcast_channels(BUILDER_CHANNEL_CAPACITY); - - // builder api request channel - let (req_sender, req_receiver) = async_broadcast::broadcast::<_>(BUILDER_CHANNEL_CAPACITY); - - let (genesis_payload, genesis_ns_table) = - Types::BlockPayload::from_transactions([], &validated_state, &instance_state) - .await - .expect("genesis payload construction failed"); - - let builder_commitment = genesis_payload.builder_commitment(&genesis_ns_table); - - let vid_commitment = { - let payload_bytes = genesis_payload.encode(); - vid_commitment(&payload_bytes, n_nodes) - }; - - // create the global state - let global_state: GlobalState = GlobalState::::new( - req_sender, - senders.transactions.clone(), - vid_commitment, - Types::View::genesis(), - ); - - let global_state = Arc::new(RwLock::new(global_state)); - - let leaf = Leaf::genesis(&validated_state, &instance_state).await; - - let builder_state = BuilderState::::new( - ParentBlockReferences { - view_number: Types::View::genesis(), - vid_commitment, - leaf_commit: leaf - .commit(&UpgradeLock::::new()) - .await, - builder_commitment, - }, - &receivers, - req_receiver, - Vec::new(), /* tx_queue */ - Arc::clone(&global_state), - Duration::from_millis(1), - 10, - Arc::new(instance_state), + let service = Arc::new(GlobalState::new( + builder_key_pair, + Duration::from_millis(500), + Duration::from_millis(10), Duration::from_secs(60), - Arc::new(validated_state), - ); - - builder_state.event_loop(); - - let hooks = 
Arc::new(NoHooks(PhantomData));
+            BUILDER_CHANNEL_CAPACITY,
+            1, // Arbitrary base fee
+            config.hooks,
+        ));
 
         // create the proxy global state; it will serve the builder APIs
-        let app = ProxyGlobalState::new(
-            global_state.clone(),
-            Arc::clone(&hooks),
-            builder_key_pair,
-            Duration::from_millis(500),
-        )
-        .into_app()
-        .expect("Failed to create builder tide-disco app");
+        let app = service
+            .clone()
+            .into_app()
+            .expect("Failed to create builder tide-disco app");
 
         let url_clone = url.clone();
-        async_spawn(async move {
-            tracing::error!("Starting builder app on {url_clone}");
-            if let Err(e) = app.serve(url_clone, StaticVersion::<0, 1> {}).await {
-                tracing::error!(?e, "Builder API App exited with error");
-            } else {
-                tracing::error!("Builder API App exited");
-            }
-        });
-        Box::new(MarketplaceBuilderTask {
-            hooks: config.hooks,
-            senders,
-        })
+        async_spawn(app.serve(url_clone, StaticVersion::<0, 1> {}));
+
+        Box::new(MarketplaceBuilderTask { service })
     }
 }
 
@@ -173,13 +103,16 @@
 struct MarketplaceBuilderTask
 where
     Types: NodeType,
 {
-    hooks: Arc>>,
-    senders: BroadcastSenders,
+    service: Arc>>>,
 }
 
 impl BuilderTask for MarketplaceBuilderTask
 where
     Types: NodeType,
+    for<'a> <::PureAssembledSignatureType as TryFrom<
+        &'a TaggedBase64,
+    >>::Error: Display,
+    for<'a> >::Error: Display,
 {
     fn start(
         self: Box,
         stream: 
             + 'static,
         >,
     ) {
-        async_spawn(run_builder_service(self.hooks, self.senders, stream));
+        self.service.start_event_loop(stream);
     }
 }
 
@@ -297,7 +230,7 @@
     Metadata: {
         TestDescription {
             validate_transactions : hotshot_testing::test_builder::nonempty_block_threshold((90,100)),
-            txn_description : hotshot_testing::txn_task::TxnTaskDescription::RoundRobinTimeBased(Duration::from_millis(50)),
+            txn_description : hotshot_testing::txn_task::TxnTaskDescription::RoundRobinTimeBased(Duration::from_millis(10)),
             completion_task_description : CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
                 TimeBasedCompletionTaskDescription {
                     duration: Duration::from_secs(120),
diff --git a/crates/marketplace/src/testing/mod.rs b/crates/marketplace/src/testing/mod.rs
index a7dd820b..5155b774 100644
--- a/crates/marketplace/src/testing/mod.rs
+++ b/crates/marketplace/src/testing/mod.rs
@@ -1,232 +1 @@
-use std::marker::PhantomData;
-
-use crate::{
-    builder_state::{
-        BuilderState, DaProposalMessage, MessageType, QuorumProposalMessage, RequestMessage,
-        ResponseMessage,
-    },
-    service::BroadcastSenders,
-    utils::LegacyCommit,
-};
-use async_broadcast::broadcast;
-use async_compatibility_layer::channel::{unbounded, UnboundedReceiver};
-use hotshot::{
-    traits::BlockPayload,
-    types::{BLSPubKey, SignatureKey},
-};
-use hotshot_types::{
-    data::{Leaf, QuorumProposal, ViewNumber},
-    message::Proposal,
-    simple_certificate::{QuorumCertificate, SimpleCertificate, SuccessThreshold},
-    simple_vote::QuorumData,
-    traits::{block_contents::vid_commitment, node_implementation::ConsensusTime},
-    utils::BuilderCommitment,
-};
-
-use hotshot_example_types::{
-    block_types::{TestBlockHeader, TestBlockPayload, TestMetadata, TestTransaction},
-    node_types::{TestTypes, TestVersions},
-    state_types::{TestInstanceState, TestValidatedState},
-};
-use marketplace_builder_shared::block::{BuilderStateId, ParentBlockReferences};
-
-use crate::service::{broadcast_channels, GlobalState};
-use async_lock::RwLock;
-use committable::{Commitment, CommitmentBoundsArkless, Committable};
-use std::sync::Arc;
-use std::time::Duration;
-pub mod basic_test;
 pub mod integration;
-pub mod order_test;
-
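Because the test config above now carries its hooks as a plain `Box<dyn BuilderHooks<_>>`, custom behavior is supplied simply by implementing the two-method trait from `hooks.rs`; both methods default to no-ops. A hedged sketch of one possible hook (the `SizeCappedHooks` name and the size-cap policy are invented for illustration):

```rust
use async_trait::async_trait;
use hotshot::types::Event;
use hotshot_types::traits::{block_contents::Transaction, node_implementation::NodeType};

use crate::hooks::BuilderHooks;

struct SizeCappedHooks;

#[async_trait]
impl<Types: NodeType> BuilderHooks<Types> for SizeCappedHooks {
    async fn process_transactions(
        &self,
        transactions: Vec<Types::Transaction>,
    ) -> Vec<Types::Transaction> {
        // Illustrative policy: drop transactions above an arbitrary size cap
        // before they ever reach the builder's queue.
        transactions
            .into_iter()
            .filter(|txn| txn.minimum_block_size() <= 1024)
            .collect()
    }

    async fn handle_hotshot_event(&self, event: &Event<Types>) {
        // Overriding the event hook is optional; here we only log.
        tracing::trace!(view = ?event.view_number, "observed HotShot event");
    }
}
```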
-pub async fn create_builder_state( - channel_capacity: usize, - num_storage_nodes: usize, -) -> ( - BroadcastSenders, - Arc>>, - BuilderState, -) { - // set up the broadcast channels - let (bootstrap_sender, bootstrap_receiver) = - broadcast::>(channel_capacity); - let (senders, receivers) = broadcast_channels(channel_capacity); - - let genesis_vid_commitment = vid_commitment(&[], num_storage_nodes); - let genesis_builder_commitment = BuilderCommitment::from_bytes([]); - let parent_block_references = ParentBlockReferences { - view_number: ViewNumber::genesis(), - vid_commitment: genesis_vid_commitment, - leaf_commit: Commitment::>::default_commitment_no_preimage(), - builder_commitment: genesis_builder_commitment, - }; - - // instantiate the global state - let global_state = Arc::new(RwLock::new(GlobalState::::new( - bootstrap_sender, - senders.transactions.clone(), - genesis_vid_commitment, - ViewNumber::genesis(), - ))); - - // instantiate the bootstrap builder state - let builder_state = BuilderState::::new( - parent_block_references, - &receivers, - bootstrap_receiver, - Vec::new(), - Arc::clone(&global_state), - Duration::from_millis(10), // max time to wait for non-zero txn block - 0, // base fee - Arc::new(TestInstanceState::default()), - Duration::from_secs(3600), // duration for txn garbage collection - Arc::new(TestValidatedState::default()), - ); - - (senders, global_state, builder_state) -} - -/// set up the broadcast channels and instatiate the global state with fixed channel capacity and num nodes -pub async fn start_builder_state( - channel_capacity: usize, - num_storage_nodes: usize, -) -> ( - BroadcastSenders, - Arc>>, -) { - let (senders, global_state, builder_state) = - create_builder_state(channel_capacity, num_storage_nodes).await; - - // start the event loop - builder_state.event_loop(); - - (senders, global_state) -} - -/// get transactions submitted in previous rounds, [] for genesis -/// and simulate the block built from those -pub async fn calc_proposal_msg( - num_storage_nodes: usize, - round: usize, - prev_quorum_proposal: Option>, - transactions: Vec, -) -> ( - QuorumProposal, - QuorumProposalMessage, - Arc>, - BuilderStateId, -) { - // get transactions submitted in previous rounds, [] for genesis - // and simulate the block built from those - let num_transactions = transactions.len() as u64; - let txn_commitments = transactions.iter().map(Committable::commit).collect(); - let encoded_transactions = TestTransaction::encode(&transactions); - let block_payload = TestBlockPayload { transactions }; - let block_vid_commitment = vid_commitment(&encoded_transactions, num_storage_nodes); - let metadata = TestMetadata { num_transactions }; - let block_builder_commitment = - >::builder_commitment( - &block_payload, - &metadata, - ); - - // generate key for leader of this round - let seed = [round as u8; 32]; - let (pub_key, private_key) = BLSPubKey::generated_from_seed_indexed(seed, round as u64); - - let da_proposal = Arc::new(DaProposalMessage { - view_number: ViewNumber::new(round as u64), - txn_commitments, - sender: pub_key, - builder_commitment: block_builder_commitment.clone(), - }); - - let block_header = TestBlockHeader { - block_number: round as u64, - payload_commitment: block_vid_commitment, - builder_commitment: block_builder_commitment, - timestamp: round as u64, - metadata, - random: 1, // arbitrary - }; - - let justify_qc = match prev_quorum_proposal.as_ref() { - None => { - QuorumCertificate::::genesis::( - &TestValidatedState::default(), - 
&TestInstanceState::default(), - ) - .await - } - Some(prev_proposal) => { - let prev_justify_qc = &prev_proposal.justify_qc; - let quorum_data = QuorumData:: { - leaf_commit: Leaf::from_quorum_proposal(prev_proposal).legacy_commit(), - }; - - // form a justify qc - SimpleCertificate::, SuccessThreshold>::new( - quorum_data.clone(), - quorum_data.commit(), - prev_proposal.view_number, - prev_justify_qc.signatures.clone(), - PhantomData, - ) - } - }; - - tracing::debug!("Iteration: {} justify_qc: {:?}", round, justify_qc); - - let quorum_proposal = QuorumProposal:: { - block_header, - view_number: ViewNumber::new(round as u64), - justify_qc: justify_qc.clone(), - upgrade_certificate: None, - proposal_certificate: None, - }; - - let quorum_signature = - ::SignatureKey::sign( - &private_key, - block_vid_commitment.as_ref(), - ) - .expect("Failed to sign payload commitment while preparing Quorum proposal"); - - let quorum_proposal_msg = QuorumProposalMessage:: { - proposal: Arc::new(Proposal { - data: quorum_proposal.clone(), - signature: quorum_signature, - _pd: PhantomData, - }), - sender: pub_key, - }; - let builder_state_id = BuilderStateId { - parent_commitment: block_vid_commitment, - parent_view: ViewNumber::new(round as u64), - }; - ( - quorum_proposal, - quorum_proposal_msg, - da_proposal, - builder_state_id, - ) -} - -/// get request message -/// it contains receiver, builder state id ( which helps looking up builder state in global state) and request message in view number and response channel -async fn get_req_msg( - round: u64, - builder_state_id: BuilderStateId, -) -> ( - UnboundedReceiver>, - BuilderStateId, - MessageType, -) { - let (response_sender, response_receiver) = unbounded(); - let request_message = MessageType::::RequestMessage(RequestMessage { - requested_view_number: ViewNumber::new(round), - response_channel: response_sender, - }); - - (response_receiver, builder_state_id, request_message) -} diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs index 6e5fd520..3baf854f 100644 --- a/crates/marketplace/src/testing/order_test.rs +++ b/crates/marketplace/src/testing/order_test.rs @@ -8,7 +8,7 @@ use marketplace_builder_shared::block::BuilderStateId; use crate::{ builder_state::MessageType, - service::{BuilderHooks, ProxyGlobalState}, + service::{BuilderHooks, MarketplaceBuilderService}, }; use std::{fmt::Debug, sync::Arc}; @@ -160,7 +160,7 @@ async fn test_builder_order() { let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - let proxy_global_state = ProxyGlobalState::new( + let proxy_global_state = MarketplaceBuilderService::new( global_state.clone(), Arc::new(NoOpHooks), BLSPubKey::generated_from_seed_indexed([0; 32], 0), @@ -318,7 +318,7 @@ async fn test_builder_order_chain_fork() { }; let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - let proxy_global_state = ProxyGlobalState::new( + let proxy_global_state = MarketplaceBuilderService::new( global_state.clone(), Arc::new(NoOpHooks), BLSPubKey::generated_from_seed_indexed([0; 32], 0), @@ -507,7 +507,7 @@ async fn test_builder_order_should_fail() { const NUM_STORAGE_NODES: usize = 4; let (senders, global_state) = start_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await; - let proxy_global_state = ProxyGlobalState::new( + let proxy_global_state = MarketplaceBuilderService::new( global_state, Arc::new(NoOpHooks), BLSPubKey::generated_from_seed_indexed([0; 32], 0), diff --git 
a/crates/marketplace/src/utils.rs b/crates/marketplace/src/utils.rs deleted file mode 100644 index 030dc911..00000000 --- a/crates/marketplace/src/utils.rs +++ /dev/null @@ -1,13 +0,0 @@ -use hotshot_types::traits::node_implementation::NodeType; - -// TODO: Update commitment calculation with the new `commit`. -// -pub trait LegacyCommit { - fn legacy_commit(&self) -> committable::Commitment>; -} - -impl LegacyCommit for hotshot_types::data::Leaf { - fn legacy_commit(&self) -> committable::Commitment> { - as committable::Committable>::commit(self) - } -} diff --git a/crates/shared/Cargo.toml b/crates/shared/Cargo.toml index a2a9b3a0..edb9c727 100644 --- a/crates/shared/Cargo.toml +++ b/crates/shared/Cargo.toml @@ -4,30 +4,36 @@ version = { workspace = true } edition = { workspace = true } [dependencies] -hotshot = { workspace = true } -hotshot-builder-api = { workspace = true } -hotshot-events-service = { workspace = true } -hotshot-example-types = { workspace = true } -hotshot-task-impls = { workspace = true } -hotshot-testing = { workspace = true } -hotshot-types = { workspace = true } - anyhow = { workspace = true } async-broadcast = { workspace = true } async-compatibility-layer = { workspace = true, features = ["logging-utils"] } +async-lock = { workspace = true } async-std = { workspace = true } async-trait = { workspace = true } bincode = { workspace = true } chrono = { workspace = true } committable = { workspace = true } +concurrent-map = { workspace = true } +derive_more = { workspace = true, features = ["debug"] } either = { workspace = true } futures = { workspace = true } hex = { workspace = true } +hotshot = { workspace = true } +hotshot-builder-api = { workspace = true } +hotshot-events-service = { workspace = true } +hotshot-example-types = { workspace = true } +hotshot-task-impls = { workspace = true } +hotshot-testing = { workspace = true } +hotshot-types = { workspace = true } + +nonempty-collections = "0.2" rand = { workspace = true } serde = { workspace = true } +sha2 = { workspace = true } surf-disco = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +typenum = { workspace = true } url = { workspace = true } vbs = { workspace = true } vec1 = { workspace = true } @@ -36,5 +42,7 @@ vec1 = { workspace = true } portpicker = "0.1.1" tide-disco = { workspace = true } +jf-vid = { version = "0.1.0", git = "https://github.com/EspressoSystems/jellyfish", tag = "0.4.5" } + [lints] workspace = true diff --git a/crates/shared/src/block.rs b/crates/shared/src/block.rs index 2a11bce3..9a8a0a74 100644 --- a/crates/shared/src/block.rs +++ b/crates/shared/src/block.rs @@ -1,10 +1,54 @@ //! Shared types dealing with block information -use committable::Commitment; +use std::time::Instant; + +use committable::{Commitment, Committable}; +use hotshot_types::data::fake_commitment; +use hotshot_types::traits::node_implementation::ConsensusTime; use hotshot_types::{ - data::Leaf, traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment, + data::Leaf, + traits::{block_contents::Transaction, node_implementation::NodeType}, + utils::BuilderCommitment, + vid::VidCommitment, }; +/// Enum to hold the different sources of the transaction +#[derive(Clone, Debug, PartialEq)] +pub enum TransactionSource { + /// Transaction from private mempool + Private, + /// Transaction from public mempool + Public, +} + +/// [`ReceivedTransaction`] represents receipt information concerning a received +/// [`NodeType::Transaction`]. 
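+///
+/// A minimal construction sketch (not a doctest; assumes a `TestTransaction`-style
+/// type implementing the `Transaction` trait, as used elsewhere in this workspace):
+///
+/// ```ignore
+/// let txn = TestTransaction::new(vec![1, 2, 3]);
+/// let received = ReceivedTransaction::new(txn, TransactionSource::Public);
+/// assert_eq!(received.source, TransactionSource::Public);
+/// ```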
+#[derive(Debug, Clone)]
+pub struct ReceivedTransaction {
+    /// the transaction
+    pub transaction: Types::Transaction,
+    /// transaction's hash
+    pub commit: Commitment,
+    /// transaction's estimated length
+    pub min_block_size: u64,
+    /// transaction's source
+    pub source: TransactionSource,
+    /// received time
+    pub time_in: Instant,
+}
+
+impl ReceivedTransaction {
+    pub fn new(transaction: Types::Transaction, source: TransactionSource) -> Self {
+        Self {
+            commit: transaction.commit(),
+            min_block_size: transaction.minimum_block_size(),
+            source,
+            time_in: Instant::now(),
+            transaction,
+        }
+    }
+}
+
 /// Unique identifier for a block
 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub struct BlockId {
@@ -31,7 +75,7 @@ impl std::fmt::Display for BlockId {
 /// and view of the block it targets to extend, i.e.
 /// builder with given state ID assumes blocks/bundles it's building
 /// are going to be included immediately after the parent block.
-#[derive(Clone, Debug, Hash, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
 pub struct BuilderStateId {
     /// View number of the parent block
     pub parent_view: Types::View,
@@ -50,7 +94,7 @@ impl std::fmt::Display for BuilderStateId {
 }
 
 /// References to the parent block that is extended to spawn the new builder state.
-#[derive(Debug, Clone)]
+#[derive(derive_more::Debug, Clone)]
 pub struct ParentBlockReferences {
     pub view_number: Types::View,
     pub vid_commitment: VidCommitment,
@@ -58,6 +102,44 @@ pub struct ParentBlockReferences {
     pub builder_commitment: BuilderCommitment,
 }
 
+impl ParentBlockReferences
+where
+    Types: NodeType,
+{
+    /// Create mock references for bootstrap (they don't correspond to a real block)
+    pub fn bootstrap() -> Self {
+        Self {
+            view_number: Types::View::genesis(),
+            vid_commitment: VidCommitment::default(),
+            leaf_commit: fake_commitment(),
+            builder_commitment: BuilderCommitment::from_bytes([0; 32]),
+        }
+    }
+
+    #[cfg(test)]
+    /// Generate references for a given view number with random
+    /// commitments for use in testing code
+    pub fn random_with_view(view_number: Types::View) -> Self {
+        use hotshot_types::{data::random_commitment, vid::vid_scheme};
+        use jf_vid::VidScheme;
+        use rand::{distributions::Standard, thread_rng, Rng};
+
+        use crate::testing::constants::TEST_NUM_NODES_IN_VID_COMPUTATION;
+
+        let rng = &mut thread_rng();
+        Self {
+            view_number,
+            leaf_commit: random_commitment(rng),
+            vid_commitment: vid_scheme(TEST_NUM_NODES_IN_VID_COMPUTATION)
+                .commit_only(rng.sample_iter(Standard).take(100).collect::>())
+                .unwrap(),
+            builder_commitment: BuilderCommitment::from_bytes(
+                rng.sample_iter(Standard).take(32).collect::>(),
+            ),
+        }
+    }
+}
+
 // implement display for the derived info
 impl std::fmt::Display for ParentBlockReferences {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
diff --git a/crates/shared/src/coordinator/builder_state_map.rs b/crates/shared/src/coordinator/builder_state_map.rs
new file mode 100644
index 00000000..1a8158d3
--- /dev/null
+++ b/crates/shared/src/coordinator/builder_state_map.rs
@@ -0,0 +1,257 @@
+//! This module contains an optimized implementation of a
+//! [`BuilderStateId`] to [`BuilderState`] map.
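+//!
+//! A minimal usage sketch (not a doctest; assumes a `TestTypes`-style `NodeType`,
+//! as in the tests below):
+//!
+//! ```ignore
+//! let mut map = BuilderStateMap::<TestTypes>::new();
+//! map.insert(state_id, builder_state);
+//! // The first tier is a BTreeMap, so everything below a cutoff view
+//! // can be split off in one operation:
+//! let removed = map.prune(cutoff_view);
+//! ```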
+
+use std::{
+    collections::{btree_map::Entry, BTreeMap},
+    ops::RangeBounds,
+    sync::Arc,
+};
+
+use hotshot_types::{traits::node_implementation::NodeType, vid::VidCommitment};
+use nonempty_collections::{nem, NEMap};
+
+use crate::{block::BuilderStateId, state::BuilderState};
+
+/// A map from [`BuilderStateId`] to [`BuilderState`], implemented as a tiered map
+/// with the first tier being [`BTreeMap`] keyed by view number of [`BuilderStateId`]
+/// and the second [`NEMap`] keyed by VID commitment of [`BuilderStateId`].
+///
+/// Usage of [`BTreeMap`] means that the map has a convenient property of always being
+/// sorted by view number, which makes common operations such as pruning old views more efficient.
+///
+/// The second tier being a non-empty-by-construction [`NEMap`] ensures that we can't
+/// accidentally create phantom entries with empty maps in the first tier.
+pub struct BuilderStateMap(
+    BTreeMap<::View, NEMap>>>,
+);
+
+impl BuilderStateMap {
+    /// Create a new empty map
+    pub fn new() -> Self {
+        Self(BTreeMap::new())
+    }
+
+    /// Returns an iterator visiting all values in this map
+    pub fn values(&self) -> impl Iterator>> {
+        self.0
+            .values()
+            .flat_map(|bucket| bucket.values().into_iter())
+    }
+
+    /// Returns a nested iterator visiting all [`BuilderState`]s for view numbers in the given range
+    pub fn range(
+        &self,
+        range: R,
+    ) -> impl Iterator>>>
+    where
+        R: RangeBounds,
+    {
+        self.0
+            .range(range)
+            .map(|(_, bucket)| bucket.values().into_iter())
+    }
+
+    /// Returns an iterator visiting all [`BuilderState`]s for the given view number
+    pub fn bucket(
+        &self,
+        view_number: &Types::View,
+    ) -> impl Iterator>> {
+        self.0
+            .get(view_number)
+            .into_iter()
+            .flat_map(|bucket| bucket.values().into_iter())
+    }
+
+    /// Returns the number of builder states stored
+    pub fn len(&self) -> usize {
+        self.0.values().map(|bucket| bucket.len().get()).sum()
+    }
+
+    /// Returns whether this map is empty
+    pub fn is_empty(&self) -> bool {
+        self.0.len() == 0
+    }
+
+    /// Get a builder state by ID
+    pub fn get(&self, key: &BuilderStateId) -> Option<&Arc>> {
+        self.0.get(&key.parent_view)?.get(&key.parent_commitment)
+    }
+
+    /// Get the highest-view builder state
+    ///
+    /// Returns `None` if the map is empty
+    pub fn highest_view_builder(&self) -> Option<&Arc>> {
+        Some(&self.0.last_key_value()?.1.head_val)
+    }
+
+    /// Insert a new builder state
+    pub fn insert(&mut self, key: BuilderStateId, value: Arc>) {
+        match self.0.entry(key.parent_view) {
+            Entry::Vacant(entry) => {
+                entry.insert(nem![key.parent_commitment => value]);
+            }
+            Entry::Occupied(mut entry) => {
+                entry.get_mut().insert(key.parent_commitment, value);
+            }
+        }
+    }
+
+    /// Returns the highest view number for which we have a builder state
+    pub fn highest_view(&self) -> Option {
+        Some(*self.0.last_key_value()?.0)
+    }
+
+    /// Returns the lowest view number for which we have a builder state
+    pub fn lowest_view(&self) -> Option {
+        Some(*self.0.first_key_value()?.0)
+    }
+
+    /// Removes every view lower than `cutoff_view` (exclusive) from `self` and returns all removed views.
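+    ///
+    /// A sketch of the split semantics (mirroring `test_pruning` below; not a doctest):
+    ///
+    /// ```ignore
+    /// // With builder states for views 0..10:
+    /// let removed = map.prune(View::new(5));
+    /// // `removed` holds views 0..=4; `map` keeps views 5..=9.
+    /// ```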
+ pub fn prune(&mut self, cutoff_view: Types::View) -> Self { + let high = self.0.split_off(&cutoff_view); + let low = std::mem::replace(&mut self.0, high); + Self(low) + } +} + +impl Default for BuilderStateMap { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use std::{ops::Bound, time::Duration}; + + use crate::block::ParentBlockReferences; + + use super::*; + use async_broadcast::broadcast; + use hotshot_example_types::node_types::TestTypes; + use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime}; + + type View = ViewNumber; + type BuilderStateMap = super::BuilderStateMap; + + #[test] + fn test_new_map() { + let map = BuilderStateMap::new(); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + } + + #[test] + fn test_insert_and_get() { + let mut map = BuilderStateMap::new(); + let view = View::new(1); + let commitment = VidCommitment::default(); + let state_id = BuilderStateId { + parent_view: view, + parent_commitment: commitment, + }; + let builder_state = create_mock_builder_state(view); + + map.insert(state_id.clone(), builder_state.clone()); + + assert!(!map.is_empty()); + assert_eq!(map.len(), 1); + assert!(Arc::ptr_eq(map.get(&state_id).unwrap(), &builder_state)); + } + + #[test] + fn test_range_iteration() { + let mut map = BuilderStateMap::new(); + for i in 0..5 { + let view = View::new(i); + let commitment = VidCommitment::default(); + let state_id = BuilderStateId { + parent_view: view, + parent_commitment: commitment, + }; + map.insert(state_id, create_mock_builder_state(view)); + } + let start = View::new(1); + let end = View::new(3); + + let collected: Vec<_> = map + .range((Bound::Included(start), Bound::Excluded(end))) + .flatten() + .collect(); + + assert_eq!(collected.len(), 2); + assert_eq!(*collected[0].parent_block_references.view_number, 1); + assert_eq!(*collected[1].parent_block_references.view_number, 2); + } + + #[test] + fn test_pruning() { + let mut map = BuilderStateMap::new(); + for i in 0..10 { + let view = View::new(i); + let commitment = VidCommitment::default(); + let state_id = BuilderStateId { + parent_view: view, + parent_commitment: commitment, + }; + map.insert(state_id, create_mock_builder_state(view)); + } + + let pruned_map = map.prune(View::new(5)); + assert_eq!(pruned_map.len(), 5); + assert_eq!(map.len(), 5); + + assert!(pruned_map.bucket(&View::new(4)).next().is_some()); + assert!(map.bucket(&View::new(5)).next().is_some()); + + assert!(pruned_map.bucket(&View::new(5)).next().is_none()); + assert!(map.bucket(&View::new(4)).next().is_none()); + } + + #[test] + fn test_highest_and_lowest_view() { + let mut map = BuilderStateMap::new(); + assert_eq!(map.highest_view(), None); + assert_eq!(map.lowest_view(), None); + + for i in 1..10 { + let view = View::new(i); + let commitment = VidCommitment::default(); + let state_id = BuilderStateId { + parent_view: view, + parent_commitment: commitment, + }; + map.insert(state_id, create_mock_builder_state(view)); + } + + assert_eq!(*map.highest_view().unwrap(), 9); + assert_eq!(*map.lowest_view().unwrap(), 1); + } + + #[test] + fn test_highest_view_builder() { + let mut map = BuilderStateMap::new(); + assert!(map.highest_view_builder().is_none()); + + let view = View::new(10); + let commitment = VidCommitment::default(); + let state_id = BuilderStateId { + parent_view: view, + parent_commitment: commitment, + }; + let builder_state = create_mock_builder_state(view); + + map.insert(state_id, builder_state.clone()); + + assert!(Arc::ptr_eq( + 
map.highest_view_builder().unwrap(), + &builder_state + )); + } + + fn create_mock_builder_state(view: View) -> Arc> { + let references = ParentBlockReferences::random_with_view(view); + let (_, receiver) = broadcast(1); + BuilderState::new(references, Duration::from_secs(1), receiver) + } +} diff --git a/crates/shared/src/coordinator/mod.rs b/crates/shared/src/coordinator/mod.rs new file mode 100644 index 00000000..606eb7b4 --- /dev/null +++ b/crates/shared/src/coordinator/mod.rs @@ -0,0 +1,464 @@ +use std::{ + collections::{hash_map::Entry, HashMap}, + ops::Bound, + sync::Arc, + time::Duration, +}; + +use async_broadcast::Sender; +use async_lock::{Mutex, RwLock}; +use builder_state_map::BuilderStateMap; +use either::Either; +use hotshot::traits::BlockPayload; +use hotshot_builder_api::v0_3::builder::BuildError; +use hotshot_types::{ + data::{DaProposal, QuorumProposal}, + event::LeafInfo, + traits::{ + block_contents::BlockHeader, + node_implementation::{ConsensusTime, NodeType}, + }, +}; +use tracing::{error, info, warn}; + +use crate::{ + block::{BuilderStateId, ParentBlockReferences, ReceivedTransaction}, + state::BuilderState, + utils::ProposalId, +}; + +pub mod builder_state_map; + +type ProposalMap = + HashMap, Either, DaProposal>>; + +/// Result of looking up a builder state by ID. +/// +/// Different from an [`Option`] as it distinguishes between +/// two cases: one where the [`BuilderStateId`] is for a view that has +/// already been marked as decided - meaning there's no way +/// it will exist again - and another where the [`BuilderStateId`] is +/// for a not yet decided view, indicating a chance that +/// this entry may be populated at some point in the future. +#[derive(Clone, Debug)] +pub enum BuilderStateLookup +where + Types: NodeType, +{ + /// Corresponding [`BuilderState`] doesn't exist + NotFound, + /// The view number looked up was already decided + Decided, + /// Successful lookup + Found(Arc>), +} + +/// A coordinator managing the lifecycle of [`BuilderState`]s. +/// +/// Its responsibilities include: +/// - Storing builder states and allowing their lookup +/// - Spawning new builder states +/// - Distributing transactions to builder states through a broadcast channel +/// - Removing outdated builder states +/// +///
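+/// A minimal wiring sketch (the event names follow HotShot's `EventType`, but the
+/// exact shapes and `MyTypes` are assumptions, not part of this crate):
+///
+/// ```ignore
+/// let coordinator = BuilderStateCoordinator::<MyTypes>::new(1024, Duration::from_secs(60));
+/// loop {
+///     match events.recv().await?.event {
+///         EventType::Decide { leaf_chain, .. } => {
+///             coordinator.handle_decide(leaf_chain).await;
+///         }
+///         EventType::QuorumProposal { proposal, .. } => {
+///             coordinator.handle_quorum_proposal(proposal.data).await;
+///         }
+///         EventType::DaProposal { proposal, .. } => {
+///             coordinator.handle_da_proposal(proposal.data).await;
+///         }
+///         EventType::Transactions { transactions } => {
+///             for txn in transactions {
+///                 let _ = coordinator
+///                     .handle_transaction(ReceivedTransaction::new(txn, TransactionSource::Public))
+///                     .await;
+///             }
+///         }
+///         _ => {}
+///     }
+/// }
+/// ```
+///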
+/// +/// Important: [`BuilderState`]s do not automatically remove transactions from the channel. +/// Refer to [`BuilderState::collect_txns`] for more details on manually dequeuing transactions. +/// +///
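+/// A builder-side dequeuing sketch (`id` and `deadline` are assumed to come from
+/// the surrounding builder service; not a doctest):
+///
+/// ```ignore
+/// if let BuilderStateLookup::Found(state) = coordinator.lookup_builder_state(&id).await {
+///     // Moves pending transactions from the shared channel into this state's
+///     // queue; returns whether the queue remained empty.
+///     let queue_empty = state.collect_txns(deadline).await;
+/// }
+/// ```
+///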
+/// +/// For the coordinator to function correctly, the following handler functions +/// must be invoked when receiving corresponding HotShot events: +/// - [`Self::handle_decide`] +/// - [`Self::handle_quorum_proposal`] +/// - [`Self::handle_da_proposal`] +/// - [`Self::handle_transaction`] +pub struct BuilderStateCoordinator +where + Types: NodeType, +{ + builder_states: RwLock>, + transaction_sender: Sender>>, + proposals: Mutex>, +} + +impl BuilderStateCoordinator +where + Types: NodeType, +{ + /// Constructs a new [`BuilderState`] coordinator. + /// `txn_channel_capacity` controls the size of the channel used to distribute transactions to [`BuilderState`]s. + /// `txn_garbage_collect_duration` specifies the duration for which the coordinator retains the hashes of transactions + /// that have been marked as included by its [`BuilderState`]s. Once this duration has elapsed, new [`BuilderState`]s + /// can include duplicates of older transactions should such be received again. + pub fn new(txn_channel_capacity: usize, txn_garbage_collect_duration: Duration) -> Self { + let (txn_sender, txn_receiver) = async_broadcast::broadcast(txn_channel_capacity); + let bootstrap_state = BuilderState::new( + ParentBlockReferences::bootstrap(), + txn_garbage_collect_duration, + txn_receiver, + ); + let mut builder_states = BuilderStateMap::new(); + builder_states.insert(bootstrap_state.id(), bootstrap_state); + + Self { + transaction_sender: txn_sender, + builder_states: RwLock::new(builder_states), + proposals: Mutex::new(ProposalMap::new()), + } + } + + /// This function should be called whenever new decide events are received from HotShot. + /// Its main responsibility is to perform garbage collection of [`BuilderState`]s for older views. + /// The function returns the [`BuilderState`]s that have been garbage collected. + #[tracing::instrument(skip_all)] + pub async fn handle_decide( + &self, + leaf_chain: Arc>>, + ) -> BuilderStateMap { + let latest_decide_view_num = leaf_chain[0].leaf.view_number(); + let mut builder_states = self.builder_states.write().await; + let highest_active_view_num = builder_states + .highest_view() + .unwrap_or(Types::View::genesis()); + let cutoff = Types::View::new( + latest_decide_view_num + .saturating_add(1) + .min(*highest_active_view_num), + ); + builder_states.prune(cutoff) + } + + /// This function should be called whenever new transactions are received from HotShot. + ///
+ /// + /// Important: [`BuilderState`]s do not automatically remove transactions from the channel. + /// Refer to [`BuilderState::collect_txns`] for more details on manually dequeuing transactions. + /// + ///
+    #[tracing::instrument(skip_all, fields(transaction = %transaction.commit))]
+    #[must_use]
+    pub async fn handle_transaction(
+        &self,
+        transaction: ReceivedTransaction,
+    ) -> Result<(), BuildError> {
+        self.transaction_sender
+            .try_broadcast(Arc::new(transaction))
+            .inspect(|val| {
+                if let Some(evicted_txn) = val {
+                    warn!(
+                        "Overflow mode enabled, transaction {} evicted",
+                        evicted_txn.commit
+                    );
+                }
+            })
+            .inspect_err(|err| {
+                warn!("Failed to broadcast txn: {}", err);
+            })
+            .map_err(|e| BuildError::Error(e.to_string()))?;
+        Ok(())
+    }
+
+    /// This function should be called whenever a new DA Proposal is received from HotShot.
+    /// The coordinator uses matching Quorum and DA proposals to track the creation of
+    /// new blocks and to spawn corresponding builder states for them.
+    pub async fn handle_da_proposal(&self, da_proposal: DaProposal) {
+        let builder_commitment = >::from_bytes(
+            &da_proposal.encoded_transactions,
+            &da_proposal.metadata,
+        )
+        .builder_commitment(&da_proposal.metadata);
+
+        let proposal_id = ProposalId {
+            payload_commitment: builder_commitment,
+            view_number: da_proposal.view_number,
+        };
+
+        self.handle_proposal(proposal_id, Either::Right(da_proposal))
+            .await;
+    }
+
+    /// This function should be called whenever a new Quorum Proposal is received from HotShot.
+    /// The coordinator uses matching Quorum and DA proposals to track the creation of
+    /// new blocks and to spawn corresponding builder states for them.
+    pub async fn handle_quorum_proposal(&self, quorum_proposal: QuorumProposal) {
+        let proposal_id = ProposalId {
+            payload_commitment: quorum_proposal.block_header.builder_commitment(),
+            view_number: quorum_proposal.view_number,
+        };
+
+        self.handle_proposal(proposal_id, Either::Left(quorum_proposal))
+            .await;
+    }
+
+    /// Generalized function to handle Quorum and DA proposals. The behavior is as follows:
+    ///
+    /// - If a matching proposal of the other kind exists for this [`ProposalId`], remove it
+    ///   from storage and spawn a new [`BuilderState`] from the resulting proposal pair.
+    /// - If a proposal of the same kind is stored, do nothing.
+    /// - If there are no records for this [`ProposalId`], store it.
+    #[tracing::instrument(skip_all)]
+    async fn handle_proposal(
+        &self,
+        proposal_id: ProposalId,
+        proposal: Either, DaProposal>,
+    ) {
+        match self.proposals.lock().await.entry(proposal_id) {
+            Entry::Occupied(entry) if entry.get().is_left() == proposal.is_left() => {
+                // Duplicate proposal, ignore
+            }
+            Entry::Occupied(entry) => match (entry.remove(), proposal) {
+                (Either::Right(da_proposal), Either::Left(quorum_proposal))
+                | (Either::Left(quorum_proposal), Either::Right(da_proposal)) => {
+                    self.spawn_builder_state(quorum_proposal, da_proposal).await
+                }
+                _ => {
+                    unreachable!()
+                }
+            },
+            Entry::Vacant(entry) => {
+                entry.insert(proposal);
+            }
+        }
+    }
+
+    /// Looks up a [`BuilderState`] by id.
+    ///
+    /// Refer to [`BuilderStateLookup`] for more information on the return value.
+    #[tracing::instrument(skip_all)]
+    #[must_use]
+    pub async fn lookup_builder_state(
+        &self,
+        id: &BuilderStateId,
+    ) -> BuilderStateLookup {
+        if let Some(entry) = self.builder_states.read().await.get(id).cloned() {
+            return BuilderStateLookup::Found(entry);
+        }

+        let lowest_view = self
+            .builder_states
+            .read()
+            .await
+            .lowest_view()
+            .unwrap_or(Types::View::genesis());
+
+        if lowest_view > id.parent_view {
+            return BuilderStateLookup::Decided;
+        }
+
+        BuilderStateLookup::NotFound
+    }
+
+    /// Looks up a [`BuilderState`] by id.
+    /// If it is not found, looks up the builder state with the highest view number.
+    /// If there are no builder states at all, returns [`None`].
+    #[tracing::instrument(skip_all)]
+    #[must_use]
+    pub async fn get_builder_by_key_or_highest_view_builder(
+        &self,
+        key: &BuilderStateId,
+    ) -> Option>> {
+        if let BuilderStateLookup::Found(builder_state) = self.lookup_builder_state(key).await {
+            Some(builder_state)
+        } else {
+            self.builder_states
                .read()
+                .await
+                .highest_view_builder()
+                .cloned()
+                .inspect(|state| {
+                    warn!(
+                        "failed to recover builder for parent {}, using highest view num builder with {}",
+                        key,
+                        state.parent_block_references.view_number,
+                    );
+                })
+        }
+    }
+
+    /// Spawn a new builder state off of a matching pair of Quorum and DA proposals,
+    /// and store it in [`Self::builder_states`].
+    async fn spawn_builder_state(
+        &self,
+        quorum_proposal: QuorumProposal,
+        da_proposal: DaProposal,
+    ) {
+        assert_eq!(quorum_proposal.view_number, da_proposal.view_number);
+
+        let mut candidate_parents = self.find_builder_states_to_extend(&quorum_proposal).await;
+
+        if candidate_parents.is_empty() {
+            error!(
+                ?quorum_proposal,
+                ?da_proposal,
+                "Couldn't find a parent for new builder state"
+            );
+        }
+
+        if candidate_parents.len() > 1 {
+            info!(
+                ?candidate_parents,
+                "Multiple candidates for new builder state parent"
+            );
+        }
+
+        // if we have multiple candidate states, this is the simplest way to choose one
+        let Some(parent_state) = candidate_parents.pop() else {
+            return;
+        };
+
+        let child_state = parent_state
+            .new_child(quorum_proposal.clone(), da_proposal.clone())
+            .await;
+
+        self.builder_states
+            .write()
+            .await
+            .insert(child_state.id(), child_state);
+    }
+
+    /// This is a utility function used to determine which [`BuilderState`]s
+    /// are the best fit to extend from for a given [`QuorumProposal`].
+    ///
+    /// In an ideal circumstance the best [`BuilderState`] to extend from is going to
+    /// be the one that is immediately preceding the [`QuorumProposal`] that we are
+    /// attempting to extend from. However, if all we know is the view number of
+    /// the [`QuorumProposal`] that we are attempting to extend from, then we may end
+    /// up in a scenario where we have multiple [`BuilderState`]s that are all equally
+    /// valid to extend from. When this happens, we have the potential for a data
+    /// race.
+    ///
+    /// The primary cause of this has to do with the interface of the
+    /// [`BuilderStateCoordinator`]'s API. In general, we want to be able to retrieve
+    /// a [`BuilderState`] via the [`BuilderStateId`]. The [`BuilderStateId`] only references
+    /// a [`ViewNumber`](hotshot_types::data::ViewNumber) and a [`VidCommitment`](`hotshot_types::vid::VidCommitment`).
+    /// While this information is available in the [`QuorumProposal`],
+    /// it only helps us to rule out [`BuilderState`]s that already exist.
+    /// It does **NOT** help us to pick a [`BuilderState`] that is the best fit to extend from.
+    ///
+    /// This is where the `justify_qc` comes into consideration. The `justify_qc`
+    /// contains the previous [`ViewNumber`](hotshot_types::data::ViewNumber) that is
+    /// being extended from, and in addition it also contains the previous
+    /// [`Commitment>`](`committable::Commitment`)
+    /// that is being built on top of. Since our [`BuilderState`]s store identifying
+    /// information that contains this same `leaf_commit` we can compare these
+    /// directly to ensure that we are extending from the correct [`BuilderState`].
+ /// + /// This function determines the best [`BuilderState`] in the following steps: + /// + /// 1. If we have a [`BuilderState`] that is already spawned for the current + /// [`QuorumProposal`], then we should should return no states, as one already + /// exists. This will prevent us from attempting to spawn duplicate + /// [`BuilderState`]s. + /// 2. Attempt to find all [`BuilderState`]s that are recorded within + /// coordinator that have matching view number and leaf commitments. There + /// *should* only be one of these. But all would be valid extension points. + /// 3. If we can't find any [`BuilderState`]s that match the view number + /// and leaf commitment, then we should return for the maximum stored view + /// number that is smaller than the current [`QuorumProposal`]. + /// 4. If there is is only one [`BuilderState`] stored in the coordinator, then + /// we should return that [`BuilderState`] as the best fit. + /// 5. If none of the other criteria match, we return an empty result as it is + /// unclear what to do in this case. + /// + ///
+ /// + /// Note: Any time this function returns more than a single entry in its + /// result, there is a potential for a race condition. This is + /// because there are multiple [`BuilderState`]s that are equally valid to + /// extend from. This race could be avoided by just picking one of the + /// entries in the resulting [`Vec`], but this is not done here in order + /// to allow us to highlight the possibility of the race. + /// + ///
+    #[must_use]
+    async fn find_builder_states_to_extend(
+        &self,
+        quorum_proposal: &QuorumProposal,
+    ) -> Vec>> {
+        // This is the ID of the state we want to spawn
+        let current_builder_state_id = BuilderStateId {
+            parent_view: quorum_proposal.view_number,
+            parent_commitment: quorum_proposal.block_header.payload_commitment(),
+        };
+
+        let builder_states = self.builder_states.read().await;
+
+        // The first step is to check if we already have a spawned [BuilderState].
+        // If we do, then we should indicate that there is no best fit, as we
+        // don't want to spawn another [BuilderState].
+        if builder_states.get(&current_builder_state_id).is_some() {
+            // We already have a spawned [BuilderState] for this proposal.
+            // So we should just ignore it.
+            return Vec::new();
+        }
+
+        // Next we want to see if there is an immediate match for a [BuilderState]
+        // that we can extend from. This is the most ideal situation, as it
+        // implies that we are extending from the correct [BuilderState].
+        // We do this by checking the `justify_qc` stored within the
+        // [QuorumProposal], and checking it against the currently spawned
+        // [BuilderState]s.
+        let justify_qc = &quorum_proposal.justify_qc;
+
+        let existing_states = builder_states
+            .bucket(&justify_qc.view_number)
+            .filter(|state| {
+                state.parent_block_references.leaf_commit == justify_qc.data.leaf_commit
+            })
+            .cloned()
+            .collect::>();
+
+        // If we found any matching [BuilderState]s, then we should return them
+        // as the best fit.
+        if !existing_states.is_empty() {
+            return existing_states;
+        }
+
+        warn!("No ideal match for builder state to extend");
+
+        // At this point, we don't have any "ideal" matches or scenarios. So we
+        // need to look for a suitable fall-back. The best fallback condition to
+        // start with is any [BuilderState] that has the maximum spawned view
+        // number whose value is smaller than the current [QuorumProposal].
+        // `range` yields buckets in ascending view order, so the last bucket
+        // below the excluded bound holds that maximum view.
+        if let Some(states) = builder_states
+            .range((
+                Bound::Unbounded,
+                Bound::Excluded(current_builder_state_id.parent_view),
+            ))
+            .last()
+        {
+            // If we have a maximum view number that meets our criteria, then we
+            // should return all [BuilderState]s stored for that view number.
+            // This can lead to multiple [BuilderState]s being returned: every
+            // state at the maximum stored view number smaller than the quorum
+            // proposal's view is an equally good fit.
+            return states.cloned().collect();
+        }
+
+        // This is our last ditch effort to continue making progress. If there is
+        // only one [BuilderState] active, then we should return that as the best
+        // fit, as it will be the only way we can continue making progress with
+        // the builder.
+        if builder_states.len() == 1 {
+            return builder_states
+                .highest_view_builder()
+                .cloned()
+                .into_iter()
+                .collect();
+        }
+        drop(builder_states);
+
+        // This implies that the only active [BuilderState]s are for views larger
+        // than the one we are extending. This is weird: it implies that some sort
+        // of view-wise time travel has occurred. It is unclear what to do in this
+        // situation.
+ warn!("View time-travel"); + Vec::new() + } +} + +#[cfg(test)] +mod tests { + // TODO: +} diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index bd26a468..98b6466f 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -1,3 +1,5 @@ pub mod block; +pub mod coordinator; +pub mod state; pub mod testing; pub mod utils; diff --git a/crates/shared/src/state.rs b/crates/shared/src/state.rs new file mode 100644 index 00000000..e829982c --- /dev/null +++ b/crates/shared/src/state.rs @@ -0,0 +1,198 @@ +use std::{ + collections::{HashSet, VecDeque}, + sync::Arc, + time::{Duration, Instant}, +}; + +use crate::{ + block::{BuilderStateId, ParentBlockReferences, ReceivedTransaction}, + utils::{LegacyCommit, RotatingSet}, +}; +use async_broadcast::Receiver; +use async_lock::{Mutex, RwLock}; +use committable::Commitment; +use hotshot::traits::BlockPayload; +use hotshot_types::{ + data::{DaProposal, Leaf, QuorumProposal}, + traits::{block_contents::BlockHeader, node_implementation::NodeType}, +}; + +#[derive(Debug, Clone)] +pub struct TransactionQueue +where + Types: NodeType, +{ + /// txn commits currently in the `tx_queue`. This is used as a quick + /// check for whether a transaction is already in the `tx_queue` or + /// not. + /// + /// This should be kept up-to-date with the `tx_queue` as it acts as an + /// accessory to the `tx_queue`. + pub commits: HashSet>, + + /// filtered queue of available transactions, taken from `tx_receiver` + pub transactions: VecDeque>>, +} + +impl Default for TransactionQueue +where + Types: NodeType, +{ + fn default() -> Self { + Self::new() + } +} + +impl TransactionQueue +where + Types: NodeType, +{ + pub fn new() -> Self { + Self { + commits: HashSet::new(), + transactions: VecDeque::new(), + } + } + + pub fn prune<'a>(&mut self, commits: impl Iterator>) { + for commit in commits { + self.commits.remove(commit); + } + self.transactions + .retain(|txn| self.commits.contains(&txn.commit)); + } + + pub fn insert(&mut self, transaction: Arc>) -> bool { + if !self.commits.contains(&transaction.commit) { + self.commits.insert(transaction.commit); + self.transactions.push_back(transaction); + true + } else { + false + } + } + + pub fn is_empty(&self) -> bool { + self.commits.is_empty() + } +} + +#[derive(derive_more::Debug)] +pub struct BuilderState { + /// Spawned-from references to the parent block. + pub parent_block_references: ParentBlockReferences, + + /// txns that have been included in recent blocks that have + /// been built. This is used to try and guarantee that a transaction + /// isn't duplicated. + /// Keeps a history of the last 3 proposals. 
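+    /// (Entries are rotated on every `new_child` call and dropped once older than
+    /// the garbage-collect duration passed to [`BuilderState::new`].)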
+ #[debug(skip)] + pub included_txns: RotatingSet>, + + /// transaction queue + #[debug(skip)] + pub txn_queue: RwLock>, + + #[debug(skip)] + pub txn_receiver: Mutex>>>, +} + +impl BuilderState +where + Types: NodeType, +{ + pub fn new( + parent: ParentBlockReferences, + txn_garbage_collect_duration: Duration, + txn_receiver: Receiver>>, + ) -> Arc { + Arc::new(Self { + parent_block_references: parent, + included_txns: RotatingSet::new(txn_garbage_collect_duration), + txn_queue: RwLock::new(TransactionQueue::new()), + txn_receiver: Mutex::new(txn_receiver), + }) + } + + pub fn id(&self) -> BuilderStateId { + BuilderStateId { + parent_view: self.parent_block_references.view_number, + parent_commitment: self.parent_block_references.vid_commitment, + } + } + + pub(crate) async fn new_child( + self: Arc, + quorum_proposal: QuorumProposal, + da_proposal: DaProposal, + ) -> Arc { + let leaf = Leaf::from_quorum_proposal(&quorum_proposal); + + // We replace our parent_block_references with information from the + // quorum proposal. This is identifying the block that this specific + // instance of [BuilderState] is attempting to build for. + let parent_block_references = ParentBlockReferences { + view_number: quorum_proposal.view_number, + vid_commitment: quorum_proposal.block_header.payload_commitment(), + leaf_commit: leaf.legacy_commit(), + builder_commitment: quorum_proposal.block_header.builder_commitment(), + }; + + let mut included_txns = self.included_txns.clone(); + included_txns.rotate(); + + let encoded_txns = &da_proposal.encoded_transactions; + let metadata = &da_proposal.metadata; + + let block_payload = + >::from_bytes(encoded_txns, metadata); + let txn_commitments = block_payload.transaction_commitments(metadata); + + let mut txn_queue = self.txn_queue.read().await.clone(); + txn_queue.prune(txn_commitments.iter()); + + included_txns.extend(txn_commitments.into_iter()); + + Arc::new(BuilderState { + parent_block_references, + included_txns, + txn_queue: RwLock::new(txn_queue), + txn_receiver: Mutex::new(self.txn_receiver.lock().await.clone()), + }) + } + + // collect outstanding transactions + pub async fn collect_txns(&self, timeout_after: Instant) -> bool { + let mut queue_empty = self.txn_queue.read().await.is_empty(); + while Instant::now() <= timeout_after { + let mut receiver_guard = self.txn_receiver.lock().await; + match receiver_guard.try_recv() { + Ok(txn) => { + if self.included_txns.contains(&txn.commit) { + // We've included this transaction in one of our + // recent blocks, and we do not wish to include it + // again. + continue; + } + + self.txn_queue.write().await.insert(txn); + queue_empty = false; + } + + Err(async_broadcast::TryRecvError::Empty) + | Err(async_broadcast::TryRecvError::Closed) => { + // The transaction receiver is empty, or it's been closed. + // If it's closed that's a big problem and we should + // probably indicate it as such. 
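+                    // Either way we stop collecting here; the caller learns whether
+                    // the queue stayed empty from the returned flag.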
+ break; + } + + Err(async_broadcast::TryRecvError::Overflowed(lost)) => { + tracing::warn!("Missed {lost} transactions due to backlog"); + continue; + } + } + } + queue_empty + } +} diff --git a/crates/shared/src/testing/generation.rs b/crates/shared/src/testing/generation.rs index 6bb40d14..7efb0dc9 100644 --- a/crates/shared/src/testing/generation.rs +++ b/crates/shared/src/testing/generation.rs @@ -3,7 +3,7 @@ use std::{ sync::Arc, }; -use async_std::sync::RwLock; +use async_lock::RwLock; use async_trait::async_trait; use chrono::Local; use hotshot::{ diff --git a/crates/shared/src/testing/validation.rs b/crates/shared/src/testing/validation.rs index 1891bc17..c30dbe9f 100644 --- a/crates/shared/src/testing/validation.rs +++ b/crates/shared/src/testing/validation.rs @@ -17,7 +17,7 @@ use hotshot_types::traits::{ }; use anyhow::bail; -use async_std::sync::RwLock; +use async_lock::RwLock; use async_trait::async_trait; use chrono::{DateTime, Local}; diff --git a/crates/shared/src/utils.rs b/crates/shared/src/utils.rs index 9dd2972b..c63299e7 100644 --- a/crates/shared/src/utils.rs +++ b/crates/shared/src/utils.rs @@ -16,6 +16,7 @@ use futures::{Stream, StreamExt}; use hotshot::types::Event; use hotshot_events_service::events::Error as EventStreamError; use hotshot_types::traits::node_implementation::NodeType; +use hotshot_types::utils::BuilderCommitment; use surf_disco::client::HealthStatus; use surf_disco::Client; use tracing::{error, warn}; @@ -345,3 +346,24 @@ mod tests { .expect_err("API is reachable, but is on wrong path"); } } + +// TODO: Update commitment calculation with the new `commit`. +// +pub trait LegacyCommit { + fn legacy_commit(&self) -> committable::Commitment>; +} + +impl LegacyCommit for hotshot_types::data::Leaf { + fn legacy_commit(&self) -> committable::Commitment> { + as committable::Committable>::commit(self) + } +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct ProposalId +where + Types: NodeType, +{ + pub view_number: Types::View, + pub payload_commitment: BuilderCommitment, +}
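+
+// A usage sketch: `legacy_commit` is how `BuilderState::new_child` derives
+// `leaf_commit` for `ParentBlockReferences`, e.g.
+// `Leaf::from_quorum_proposal(&quorum_proposal).legacy_commit()`.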