From 1734e44bdc1707d6098b59ae13b5aeda8db1795c Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Mon, 29 Jul 2024 23:12:34 +0200 Subject: [PATCH] Port refactors from legacy builder --- Cargo.lock | 2 + Cargo.toml | 7 + src/builder_state.rs | 93 ++---- src/builder_state.rs.rej | 79 +++++ src/lib.rs | 33 ++ src/service.rs | 150 +++++---- src/service.rs.diff | 555 ++++++++++++++++++++++++++++++++++ src/testing/basic_test.rs | 21 +- src/testing/basic_test.rs.rej | 15 + 9 files changed, 821 insertions(+), 134 deletions(-) create mode 100644 src/builder_state.rs.rej create mode 100644 src/service.rs.diff create mode 100644 src/testing/basic_test.rs.rej diff --git a/Cargo.lock b/Cargo.lock index 582caefa..0f640a3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4539,12 +4539,14 @@ dependencies = [ "committable", "derivative", "futures", + "hex", "hotshot", "hotshot-builder-api", "hotshot-events-service", "hotshot-example-types", "hotshot-task-impls", "hotshot-types", + "lru 0.12.3", "multimap", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 024ed28c..63f1fc88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,11 +17,13 @@ clap = { version = "4.4", features = ["derive", "env"] } committable = "0.2" derivative = "2.2" futures = "0.3" +hex = "0.4.3" hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" } hotshot-builder-api = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" } hotshot-events-service = { git = "https://github.com/EspressoSystems/hotshot-events-service.git", tag = "rc-0.1.37" } hotshot-task-impls = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" } hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" } +lru = "0.12.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.10" @@ -38,3 +40,8 @@ multimap = "0.10.0" [dev-dependencies] hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" } + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(async_executor_impl, values("async-std", "tokio"))', +] } diff --git a/src/builder_state.rs b/src/builder_state.rs index 374347bc..cb1c305a 100644 --- a/src/builder_state.rs +++ b/src/builder_state.rs @@ -12,14 +12,16 @@ use hotshot_types::{ use committable::{Commitment, Committable}; -use crate::service::{GlobalState, ReceivedTransaction}; +use crate::{ + service::{GlobalState, ReceivedTransaction}, + BlockId, BuilderStateId, +}; use async_broadcast::broadcast; use async_broadcast::Receiver as BroadcastReceiver; use async_broadcast::Sender as BroadcastSender; use async_compatibility_layer::channel::UnboundedSender; use async_compatibility_layer::{art::async_sleep, art::async_spawn}; use async_lock::RwLock; -use async_trait::async_trait; use core::panic; use futures::StreamExt; @@ -62,8 +64,8 @@ pub struct QCMessage { } /// Request Message to be put on the request channel #[derive(Clone, Debug)] -pub struct RequestMessage { - pub requested_view_number: u64, +pub struct RequestMessage { + pub requested_view_number: TYPES::Time, pub response_channel: UnboundedSender, } pub enum TriggerStatus { @@ -74,7 +76,7 @@ pub enum TriggerStatus { /// Response Message to be put on the response channel #[derive(Debug)] pub struct BuildBlockInfo { - pub builder_hash: BuilderCommitment, + pub id: BlockId, pub block_size: u64, pub offered_fee: u64, pub block_payload: TYPES::BlockPayload, @@ -185,41 +187,7 @@ pub struct BuilderState { pub next_txn_garbage_collect_time: Instant, } -/// Trait to hold the helper functions for the builder -#[async_trait] -pub trait BuilderProgress { - /// process the DA proposal - async fn process_da_proposal(&mut self, da_msg: Arc>); - /// process the quorum proposal - async fn process_quorum_proposal(&mut self, qc_msg: QCMessage); - - /// process the decide event - async fn process_decide_event(&mut self, decide_msg: DecideMessage) -> Option; - - /// spawn a clone of builder - async fn spawn_clone( - self, - da_proposal: Arc>, - quorum_proposal: Arc>>, - req_sender: BroadcastSender>, - ); - - /// build a block - async fn build_block( - &mut self, - matching_builder_commitment: VidCommitment, - matching_view_number: TYPES::Time, - ) -> Option>; - - /// Event Loop - fn event_loop(self); - - /// process the block request - async fn process_block_request(&mut self, req: RequestMessage); -} - -#[async_trait] -impl BuilderProgress for BuilderState { +impl BuilderState { /// processing the DA proposal #[tracing::instrument(skip_all, name = "process da proposal", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))] @@ -440,8 +408,10 @@ impl BuilderProgress for BuilderState { // register the spawned builder state to spawned_builder_states in the global state self.global_state.write_arc().await.register_builder_state( - self.built_from_proposed_block.vid_commitment, - self.built_from_proposed_block.view_number, + BuilderStateId { + parent_commitment: self.built_from_proposed_block.vid_commitment, + view: self.built_from_proposed_block.view_number, + }, req_sender, ); @@ -453,8 +423,7 @@ impl BuilderProgress for BuilderState { fields(builder_built_from_proposed_block = %self.built_from_proposed_block))] async fn build_block( &mut self, - matching_vid: VidCommitment, - requested_view_number: TYPES::Time, + state_id: BuilderStateId, ) -> Option> { let timeout_after = Instant::now() + self.maximize_txn_capture_timeout; let sleep_interval = self.maximize_txn_capture_timeout / 10; @@ -484,9 +453,9 @@ impl BuilderProgress for BuilderState { // insert the recently built block into the builder commitments self.builder_commitments.insert(( - matching_vid, + state_id.parent_commitment, builder_hash.clone(), - requested_view_number, + state_id.view, )); let encoded_txns: Vec = payload.encode().to_vec(); @@ -501,7 +470,10 @@ impl BuilderProgress for BuilderState { ); Some(BuildBlockInfo { - builder_hash, + id: BlockId { + view: self.built_from_proposed_block.view_number, + hash: builder_hash, + }, block_size, offered_fee, block_payload: payload, @@ -513,9 +485,8 @@ impl BuilderProgress for BuilderState { } } - async fn process_block_request(&mut self, req: RequestMessage) { - let requested_view_number = - <::Time as ConsensusTime>::new(req.requested_view_number); + async fn process_block_request(&mut self, req: RequestMessage) { + let requested_view_number = req.requested_view_number; // If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle if requested_view_number == self.built_from_proposed_block.view_number { tracing::info!( @@ -525,26 +496,28 @@ impl BuilderProgress for BuilderState { requested_view_number ); let response = self - .build_block( - self.built_from_proposed_block.vid_commitment, - requested_view_number, - ) + .build_block(BuilderStateId { + parent_commitment: self.built_from_proposed_block.vid_commitment, + view: requested_view_number, + }) .await; match response { Some(response) => { // form the response message let response_msg = ResponseMessage { - builder_hash: response.builder_hash.clone(), + builder_hash: response.id.hash.clone(), block_size: response.block_size, offered_fee: response.offered_fee, }; - let builder_hash = response.builder_hash.clone(); + let builder_hash = response.id.hash.clone(); self.global_state.write_arc().await.update_global_state( + BuilderStateId { + parent_commitment: self.built_from_proposed_block.vid_commitment, + view: requested_view_number, + }, response, - self.built_from_proposed_block.vid_commitment, - requested_view_number, response_msg.clone(), ); @@ -581,7 +554,7 @@ impl BuilderProgress for BuilderState { } #[tracing::instrument(skip_all, name = "event loop", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))] - fn event_loop(mut self) { + pub fn event_loop(mut self) { let _builder_handle = async_spawn(async move { loop { tracing::debug!( @@ -686,7 +659,7 @@ pub enum MessageType { DecideMessage(DecideMessage), DaProposalMessage(Arc>), QCMessage(QCMessage), - RequestMessage(RequestMessage), + RequestMessage(RequestMessage), } #[allow(clippy::too_many_arguments)] diff --git a/src/builder_state.rs.rej b/src/builder_state.rs.rej new file mode 100644 index 00000000..c584062a --- /dev/null +++ b/src/builder_state.rs.rej @@ -0,0 +1,79 @@ +diff a/src/builder_state.rs b/src/builder_state.rs (rejected hunks) +@@ -65,9 +68,8 @@ pub struct QCMessage { + } + /// Request Message to be put on the request channel + #[derive(Clone, Debug)] +-pub struct RequestMessage { +- pub requested_vid_commitment: VidCommitment, +- pub requested_view_number: u64, ++pub struct RequestMessageTYPES: NodeType> { ++ pub state_id: BuilderStateIdTYPES>, + pub response_channel: UnboundedSender, + } + pub enum TriggerStatus { +@@ -555,46 +561,39 @@ impl BuilderState { + } + } + +- async fn process_block_request(&mut self, req: RequestMessage) { +- let requested_vid_commitment = req.requested_vid_commitment; +- let requested_view_number = +- <::Time as ConsensusTime>::new(req.requested_view_number); ++ async fn process_block_request(&mut self, req: RequestMessage) { + // If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle +- if (requested_vid_commitment == self.built_from_proposed_block.vid_commitment +- && requested_view_number == self.built_from_proposed_block.view_number) ++ if (req.state_id.parent_commitment == self.built_from_proposed_block.vid_commitment ++ && req.state_id.view == self.built_from_proposed_block.view_number) + || (self.built_from_proposed_block.view_number.u64() + == self + .global_state + .read_arc() + .await + .highest_view_num_builder_id +- .1 ++ .view + .u64()) + { + tracing::info!( +- "Request handled by builder with view {:?} for (parent {:?}, view_num: {:?})", ++ "Request for parent {} handled by builder with view {:?}", ++ req.state_id, + self.built_from_proposed_block.view_number, +- requested_vid_commitment, +- requested_view_number + ); +- let response = self +- .build_block(requested_vid_commitment, requested_view_number) +- .await; ++ let response = self.build_block(req.state_id.clone()).await; + + match response { + Some(response) => { + // form the response message + let response_msg = ResponseMessage { +- builder_hash: response.builder_hash.clone(), ++ builder_hash: response.id.hash.clone(), + block_size: response.block_size, + offered_fee: response.offered_fee, + }; + +- let builder_hash = response.builder_hash.clone(); ++ let builder_hash = response.id.hash.clone(); + self.global_state.write_arc().await.update_global_state( ++ req.state_id.clone(), + response, +- requested_vid_commitment, +- requested_view_number, + response_msg.clone(), + ); + +@@ -736,7 +735,7 @@ pub enum MessageType { + DecideMessage(DecideMessage), + DaProposalMessage(DaProposalMessage), + QCMessage(QCMessage), +- RequestMessage(RequestMessage), ++ RequestMessage(RequestMessage), + } + + #[allow(clippy::too_many_arguments)] diff --git a/src/lib.rs b/src/lib.rs index f128f98d..0625bf81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,10 @@ // It also provides one API services external users: // 1. Serves a user's request to submit a private transaction +use hotshot_types::{ + traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment, +}; + // providing the core services to support above API services pub mod builder_state; @@ -19,3 +23,32 @@ pub mod service; // tracking the testing pub mod testing; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct BlockId { + hash: BuilderCommitment, + view: TYPES::Time, +} + +impl std::fmt::Display for BlockId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Block({}@{})", + hex::encode(self.hash.as_ref()), + *self.view + ) + } +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct BuilderStateId { + parent_commitment: VidCommitment, + view: TYPES::Time, +} + +impl std::fmt::Display for BuilderStateId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BuilderState({}@{})", self.parent_commitment, *self.view) + } +} diff --git a/src/service.rs b/src/service.rs index a7c93166..2b8a29c2 100644 --- a/src/service.rs +++ b/src/service.rs @@ -27,13 +27,16 @@ use tracing::error; use std::{fmt::Debug, marker::PhantomData}; -use crate::builder_state::{ - BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QCMessage, ResponseMessage, - TransactionSource, +use crate::{ + builder_state::{ + BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QCMessage, ResponseMessage, + TransactionSource, + }, + BlockId, BuilderStateId, }; use anyhow::anyhow; -use async_broadcast::Sender as BroadcastSender; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; +use async_broadcast::{Sender as BroadcastSender, TrySendError}; use async_lock::RwLock; use async_trait::async_trait; use committable::{Commitment, Committable}; @@ -105,15 +108,14 @@ pub struct ReceivedTransaction { #[derive(Debug)] pub struct GlobalState { // data store for the blocks - pub block_hash_to_block: HashMap<(BuilderCommitment, TYPES::Time), BlockInfo>, + pub blocks: lru::LruCache, BlockInfo>, // registered builder states - pub spawned_builder_states: - HashMap<(VidCommitment, TYPES::Time), BroadcastSender>>, + pub spawned_builder_states: HashMap, BroadcastSender>>, // builder state -> last built block , it is used to respond the client // if the req channel times out during get_available_blocks - pub builder_state_to_last_built_block: HashMap<(VidCommitment, TYPES::Time), ResponseMessage>, + pub builder_state_to_last_built_block: HashMap, ResponseMessage>, // // scheduled GC by view number // pub view_to_cleanup_targets: BTreeMap>, @@ -126,7 +128,7 @@ pub struct GlobalState { pub last_garbage_collected_view_num: TYPES::Time, // highest view running builder task - pub highest_view_num_builder_id: (VidCommitment, TYPES::Time), + pub highest_view_num_builder_id: BuilderStateId, } impl GlobalState { @@ -140,76 +142,73 @@ impl GlobalState { _buffer_view_num_count: u64, ) -> Self { let mut spawned_builder_states = HashMap::new(); - spawned_builder_states.insert( - (bootstrapped_builder_state_id, bootstrapped_view_num), - bootstrap_sender.clone(), - ); + let bootstrap_id = BuilderStateId { + parent_commitment: bootstrapped_builder_state_id, + view: bootstrapped_view_num, + }; + spawned_builder_states.insert(bootstrap_id.clone(), bootstrap_sender.clone()); GlobalState { - block_hash_to_block: Default::default(), + blocks: lru::LruCache::new(NonZeroUsize::new(256).unwrap()), spawned_builder_states, tx_sender, last_garbage_collected_view_num, builder_state_to_last_built_block: Default::default(), - highest_view_num_builder_id: (bootstrapped_builder_state_id, bootstrapped_view_num), + highest_view_num_builder_id: bootstrap_id, } } pub fn register_builder_state( &mut self, - vid_commmit: VidCommitment, - view_num: TYPES::Time, + parent_id: BuilderStateId, request_sender: BroadcastSender>, ) { // register the builder state self.spawned_builder_states - .insert((vid_commmit, view_num), request_sender); + .insert(parent_id.clone(), request_sender); // keep track of the max view number - if view_num > self.highest_view_num_builder_id.1 { - tracing::info!( - "registering builder {:?}@{:?} as highest", - vid_commmit, - view_num - ); - self.highest_view_num_builder_id = (vid_commmit, view_num); + if parent_id.view > self.highest_view_num_builder_id.view { + tracing::info!("registering builder {parent_id} as highest",); + self.highest_view_num_builder_id = parent_id; } else { tracing::warn!( - "builder {:?}@{:?} created; highest registered is {:?}@{:?}", - vid_commmit, - view_num, - self.highest_view_num_builder_id.0, - self.highest_view_num_builder_id.1 + "builder {parent_id} created; highest registered is {}", + self.highest_view_num_builder_id, ); } } pub fn update_global_state( &mut self, + state_id: BuilderStateId, build_block_info: BuildBlockInfo, - builder_vid_commitment: VidCommitment, - view_num: TYPES::Time, response_msg: ResponseMessage, ) { - self.block_hash_to_block - .entry((build_block_info.builder_hash, view_num)) - .or_insert_with(|| BlockInfo { - block_payload: build_block_info.block_payload, - metadata: build_block_info.metadata, - offered_fee: build_block_info.offered_fee, - }); + if self.blocks.contains(&build_block_info.id) { + self.blocks.promote(&build_block_info.id) + } else { + self.blocks.push( + build_block_info.id, + BlockInfo { + block_payload: build_block_info.block_payload, + metadata: build_block_info.metadata, + offered_fee: build_block_info.offered_fee, + }, + ); + } // update the builder state to last built block self.builder_state_to_last_built_block - .insert((builder_vid_commitment, view_num), response_msg); + .insert(state_id, response_msg); } // remove the builder state handles based on the decide event pub fn remove_handles(&mut self, on_decide_view: TYPES::Time) -> TYPES::Time { // remove everything from the spawned builder states when view_num <= on_decide_view; // if we don't have a highest view > decide, use highest view as cutoff. - let cutoff = std::cmp::min(self.highest_view_num_builder_id.1, on_decide_view); + let cutoff = std::cmp::min(self.highest_view_num_builder_id.view, on_decide_view); self.spawned_builder_states - .retain(|(_vid, view_num), _channel| *view_num >= cutoff); + .retain(|id, _| id.view >= cutoff); let cutoff_u64 = cutoff.u64(); let gc_view = if cutoff_u64 > 0 { cutoff_u64 - 1 } else { 0 }; @@ -224,24 +223,22 @@ impl GlobalState { pub async fn submit_client_txns( &self, txns: Vec<::Transaction>, - ) -> Result::Transaction>>, BuildError> { + ) -> Vec::Transaction>, BuildError>> { handle_received_txns(&self.tx_sender, txns, TransactionSource::External).await } pub fn get_channel_for_matching_builder_or_highest_view_buider( &self, - key: &(VidCommitment, TYPES::Time), + key: &BuilderStateId, ) -> Result<&BroadcastSender>, BuildError> { if let Some(channel) = self.spawned_builder_states.get(key) { - tracing::info!("Got matching builder for parent {:?}@{:?}", key.0, key.1); + tracing::info!("Got matching builder for parent {}", key); Ok(channel) } else { tracing::warn!( - "failed to recover builder for parent {:?}@{:?}, using higest view num builder with {:?}@{:?}", - key.0, - key.1, - self.highest_view_num_builder_id.0, - self.highest_view_num_builder_id.1 + "failed to recover builder for parent {}, using higest view num builder with {}", + key, + self.highest_view_num_builder_id, ); // get the sender for the highest view number builder self.spawned_builder_states @@ -257,7 +254,7 @@ impl GlobalState { // iterate over the spawned builder states and check if the view number exists self.spawned_builder_states .iter() - .any(|((_vid, view_num), _sender)| view_num == key) + .any(|(id, _)| id.view == *key) } pub fn should_view_handle_other_proposals( @@ -265,7 +262,7 @@ impl GlobalState { builder_view: &TYPES::Time, proposal_view: &TYPES::Time, ) -> bool { - *builder_view == self.highest_view_num_builder_id.1 + *builder_view == self.highest_view_num_builder_id.view && !self.check_builder_state_existence_for_a_view(proposal_view) } } @@ -373,7 +370,10 @@ impl AcceptsTxnSubmits for ProxyGlobalState { response ); - response + // NOTE: ideally we want to respond with original Vec + // instead of Result not to loose any information, + // but this requires changes to builder API + response.into_iter().collect() } } #[async_trait] @@ -512,14 +512,16 @@ pub async fn run_non_permissioned_standalone_builder_service { let transactions = hooks.process_transactions(transactions).await; - if let Err(e) = handle_received_txns( + for res in handle_received_txns( &senders.transactions, transactions, TransactionSource::HotShot, ) .await { - tracing::warn!("Failed to handle transactions; {:?}", e); + if let Err(e) = res { + tracing::warn!("Failed to handle transactions; {:?}", e); + } } } // decide event @@ -616,14 +618,16 @@ pub async fn run_permissioned_standalone_builder_service< EventType::Transactions { transactions } => { let transactions = hooks.process_transactions(transactions).await; - if let Err(e) = handle_received_txns( + for res in handle_received_txns( &senders.transactions, transactions, TransactionSource::HotShot, ) .await { - tracing::warn!("Failed to handle transactions; {:?}", e); + if let Err(e) = res { + tracing::warn!("Failed to handle transactions; {:?}", e); + } } } // decide event @@ -797,25 +801,39 @@ pub(crate) async fn handle_received_txns( tx_sender: &BroadcastSender>>, txns: Vec, source: TransactionSource, -) -> Result::Transaction>>, BuildError> { +) -> Vec::Transaction>, BuildError>> { let mut results = Vec::with_capacity(txns.len()); let time_in = Instant::now(); for tx in txns.into_iter() { let commit = tx.commit(); - results.push(commit); let res = tx_sender - .broadcast(Arc::new(ReceivedTransaction { + .try_broadcast(Arc::new(ReceivedTransaction { tx, source: source.clone(), commit, time_in, })) - .await; - if res.is_err() { - return Err(BuildError::Error { - message: format!("Failed to broadcast txn with commit {:?}", commit), + .inspect(|val| { + if let Some(evicted_txn) = val { + tracing::warn!( + "Overflow mode enabled, transaction {} evicted", + evicted_txn.commit + ); + } + }) + .map(|_| commit) + .inspect_err(|err| { + tracing::warn!("Failed to broadcast txn with commit {:?}: {}", commit, err); + }) + .map_err(|err| match err { + TrySendError::Full(_) => BuildError::Error { + message: "Too many transactions".to_owned(), + }, + e => BuildError::Error { + message: format!("Internal error when submitting transaction: {}", e), + }, }); - } + results.push(res); } - Ok(results) + results } diff --git a/src/service.rs.diff b/src/service.rs.diff new file mode 100644 index 00000000..704f4cb3 --- /dev/null +++ b/src/service.rs.diff @@ -0,0 +1,555 @@ +diff a/src/service.rs b/src/service.rs (rejected hunks) +@@ -2,7 +2,7 @@ use hotshot::{ + traits::{election::static_committee::GeneralStaticCommittee, NodeImplementation}, + types::{Event, SystemContextHandle}, + }; +-use hotshot_builder_api::v0_2::{ ++use hotshot_builder_api::v0_1::{ + block_info::{AvailableBlockData, AvailableBlockHeaderInput, AvailableBlockInfo}, + builder::BuildError, + data_source::{AcceptsTxnSubmits, BuilderDataSource}, +@@ -21,12 +21,20 @@ use hotshot_types::{ + utils::BuilderCommitment, + vid::{VidCommitment, VidPrecomputeData}, + }; ++use lru::LruCache; + +-use crate::builder_state::{ +- BuildBlockInfo, DaProposalMessage, DecideMessage, QCMessage, TransactionSource, TriggerStatus, +-}; +-use crate::builder_state::{MessageType, RequestMessage, ResponseMessage}; + use crate::WaitAndKeep; ++use crate::{ ++ builder_state::{ ++ BuildBlockInfo, DaProposalMessage, DecideMessage, QCMessage, TransactionSource, ++ TriggerStatus, ++ }, ++ BlockId, ++}; ++use crate::{ ++ builder_state::{MessageType, RequestMessage, ResponseMessage}, ++ BuilderStateId, ++}; + use anyhow::{anyhow, Context}; + pub use async_broadcast::{broadcast, RecvError, TryRecvError}; + use async_broadcast::{Sender as BroadcastSender, TrySendError}; +@@ -108,15 +116,14 @@ pub struct ReceivedTransactionTYPES: NodeType> { + #[derive(Debug)] + pub struct GlobalStateTYPES: NodeType> { + // data store for the blocks +- pub block_hash_to_block: HashMap<(BuilderCommitment, Types::Time), BlockInfoTYPES>>, ++ pub blocks: lru::LruCache, BlockInfoTYPES>>, + + // registered builder states +- pub spawned_builder_states: +- HashMap<(VidCommitment, Types::Time), BroadcastSender>>, ++ pub spawned_builder_states: HashMap, BroadcastSender>>, + + // builder state -> last built block , it is used to respond the client + // if the req channel times out during get_available_blocks +- pub builder_state_to_last_built_block: HashMap<(VidCommitment, Types::Time), ResponseMessage>, ++ pub builder_state_to_last_built_block: HashMap, ResponseMessage>, + + // sending a transaction from the hotshot/private mempool to the builder states + // NOTE: Currently, we don't differentiate between the transactions from the hotshot and the private mempool +@@ -126,7 +133,7 @@ pub struct GlobalStateTYPES: NodeType> { + pub last_garbage_collected_view_num: Types::Time, + + // highest view running builder task +- pub highest_view_num_builder_id: (VidCommitment, Types::Time), ++ pub highest_view_num_builder_id: BuilderStateIdTYPES>, + } + + implTYPES: NodeType> GlobalStateTYPES> { +@@ -140,80 +147,77 @@ implTYPES: NodeType> GlobalStateTYPES> { + _buffer_view_num_count: u64, + ) -> Self { + let mut spawned_builder_states = HashMap::new(); +- spawned_builder_states.insert( +- (bootstrapped_builder_state_id, bootstrapped_view_num), +- bootstrap_sender.clone(), +- ); ++ let bootstrap_id = BuilderStateId { ++ parent_commitment: bootstrapped_builder_state_id, ++ view: bootstrapped_view_num, ++ }; ++ spawned_builder_states.insert(bootstrap_id.clone(), bootstrap_sender.clone()); + GlobalState { +- block_hash_to_block: Default::default(), ++ blocks: LruCache::new(NonZeroUsize::new(256).unwrap()), + spawned_builder_states, + tx_sender, + last_garbage_collected_view_num, + builder_state_to_last_built_block: Default::default(), +- highest_view_num_builder_id: (bootstrapped_builder_state_id, bootstrapped_view_num), ++ highest_view_num_builder_id: bootstrap_id, + } + } + + pub fn register_builder_state( + &mut self, +- vid_commmit: VidCommitment, +- view_num: Types::Time, ++ parent_id: BuilderStateIdTYPES>, + request_sender: BroadcastSender>, + ) { + // register the builder state + self.spawned_builder_states +- .insert((vid_commmit, view_num), request_sender); ++ .insert(parent_id.clone(), request_sender); + + // keep track of the max view number +- if view_num > self.highest_view_num_builder_id.1 { +- tracing::info!( +- "registering builder {:?}@{:?} as highest", +- vid_commmit, +- view_num +- ); +- self.highest_view_num_builder_id = (vid_commmit, view_num); ++ if parent_id.view > self.highest_view_num_builder_id.view { ++ tracing::info!("registering builder {parent_id} as highest",); ++ self.highest_view_num_builder_id = parent_id; + } else { + tracing::warn!( +- "builder {:?}@{:?} created; highest registered is {:?}@{:?}", +- vid_commmit, +- view_num, +- self.highest_view_num_builder_id.0, +- self.highest_view_num_builder_id.1 ++ "builder {parent_id} created; highest registered is {}", ++ self.highest_view_num_builder_id, + ); + } + } + + pub fn update_global_state( + &mut self, ++ state_id: BuilderStateIdTYPES>, + build_block_info: BuildBlockInfoTYPES>, +- builder_vid_commitment: VidCommitment, +- view_num: Types::Time, + response_msg: ResponseMessage, + ) { +- self.block_hash_to_block +- .entry((build_block_info.builder_hash, view_num)) +- .or_insert_with(|| BlockInfo { +- block_payload: build_block_info.block_payload, +- metadata: build_block_info.metadata, +- vid_trigger: Arc::new(RwLock::new(Some(build_block_info.vid_trigger))), +- vid_receiver: Arc::new(RwLock::new(WaitAndKeep::Wait( +- build_block_info.vid_receiver, +- ))), +- offered_fee: build_block_info.offered_fee, +- }); ++ if self.blocks.contains(&build_block_info.id) { ++ self.blocks.promote(&build_block_info.id) ++ } else { ++ self.blocks.push( ++ build_block_info.id, ++ BlockInfo { ++ block_payload: build_block_info.block_payload, ++ metadata: build_block_info.metadata, ++ vid_trigger: Arc::new(RwLock::new(Some(build_block_info.vid_trigger))), ++ vid_receiver: Arc::new(RwLock::new(WaitAndKeep::Wait( ++ build_block_info.vid_receiver, ++ ))), ++ offered_fee: build_block_info.offered_fee, ++ }, ++ ); ++ } + + // update the builder state to last built block + self.builder_state_to_last_built_block +- .insert((builder_vid_commitment, view_num), response_msg); ++ .insert(state_id, response_msg); + } + + // remove the builder state handles based on the decide event + pub fn remove_handles(&mut self, on_decide_view: Types::Time) -> Types::Time { + // remove everything from the spawned builder states when view_num <= on_decide_view; + // if we don't have a highest view > decide, use highest view as cutoff. +- let cutoff = std::cmp::min(self.highest_view_num_builder_id.1, on_decide_view); ++ let cutoff = std::cmp::min(self.highest_view_num_builder_id.view, on_decide_view); + self.spawned_builder_states +- .retain(|(_vid, view_num), _channel| *view_num >= cutoff); ++ .retain(|id, _| id.view >= cutoff); + + let cutoff_u64 = cutoff.u64(); + let gc_view = if cutoff_u64 > 0 { cutoff_u64 - 1 } else { 0 }; +@@ -234,18 +238,16 @@ implTYPES: NodeType> GlobalStateTYPES> { + + pub fn get_channel_for_matching_builder_or_highest_view_buider( + &self, +- key: &(VidCommitment, Types::Time), ++ key: &BuilderStateIdTYPES>, + ) -> Result<&BroadcastSender>, BuildError> { + if let Some(channel) = self.spawned_builder_states.get(key) { +- tracing::info!("Got matching builder for parent {:?}@{:?}", key.0, key.1); ++ tracing::info!("Got matching builder for parent {}", key); + Ok(channel) + } else { + tracing::warn!( +- "failed to recover builder for parent {:?}@{:?}, using higest view num builder with {:?}@{:?}", +- key.0, +- key.1, +- self.highest_view_num_builder_id.0, +- self.highest_view_num_builder_id.1 ++ "failed to recover builder for parent {}, using higest view num builder with {}", ++ key, ++ self.highest_view_num_builder_id, + ); + // get the sender for the highest view number builder + self.spawned_builder_states +@@ -269,7 +271,7 @@ implTYPES: NodeType> GlobalStateTYPES> { + builder_view: &Types::Time, + proposal_view: &Types::Time, + ) -> bool { +- *builder_view == self.highest_view_num_builder_id.1 ++ *builder_view == self.highest_view_num_builder_id.view + && !self.check_builder_state_existence_for_a_view(proposal_view) + } + } +@@ -327,21 +329,22 @@ where + ) -> Result>, BuildError> { + let starting_time = Instant::now(); + ++ let state_id = BuilderStateId { ++ parent_commitment: *for_parent, ++ view: Types::Time::new(view_number), ++ }; ++ + // verify the signature +- if !sender.validate(signature, for_parent.as_ref()) { ++ if !sender.validate(signature, state_id.parent_commitment.as_ref()) { + tracing::error!("Signature validation failed in get_available_blocks"); + return Err(BuildError::Error { + message: "Signature validation failed in get_available_blocks".to_string(), + }); + } + +- tracing::info!( +- "Requesting available blocks for (parent {:?}, view_num: {:?})", +- for_parent, +- view_number +- ); ++ tracing::info!("Requesting available blocks for {state_id}",); + +- let view_num = ::Time as ConsensusTime>::new(view_number); ++ let view_num = state_id.view; + // check in the local spawned builder states + // if it doesn't exist; there are three cases + // 1) it has already been garbage collected (view < decide) and we should return an error +@@ -353,14 +356,14 @@ where + // If this `BlockBuilder` hasn't been reaped, it should have been. + let global_state = self.global_state.read_arc().await; + if view_num < global_state.last_garbage_collected_view_num +- && global_state.highest_view_num_builder_id.1 ++ && global_state.highest_view_num_builder_id.view + != global_state.last_garbage_collected_view_num + { + tracing::warn!( + "Requesting for view {:?}, last decide-triggered cleanup on view {:?}, highest view num is {:?}", + view_num, + global_state.last_garbage_collected_view_num, +- global_state.highest_view_num_builder_id.1 ++ global_state.highest_view_num_builder_id.view + ); + return Err(BuildError::Error { + message: +@@ -372,8 +375,7 @@ where + + let (response_sender, response_receiver) = unbounded(); + let req_msg = RequestMessage { +- requested_vid_commitment: (*for_parent), +- requested_view_number: view_number, ++ state_id: state_id.clone(), + response_channel: response_sender, + }; + let timeout_after = starting_time + self.max_api_waiting_time; +@@ -382,7 +384,6 @@ where + let time_to_wait_for_matching_builder = starting_time + self.max_api_waiting_time / 2; + + let mut sent = false; +- let key = (*for_parent, view_num); + while !sent && Instant::now() < time_to_wait_for_matching_builder { + // try to broadcast the request to the correct builder state + if let Some(builder) = self +@@ -390,26 +391,21 @@ where + .read_arc() + .await + .spawned_builder_states +- .get(&key) ++ .get(&state_id) + { + tracing::info!( +- "Got matching BlockBuilder for {:?}@{view_number}, sending get_available_blocks request", +- req_msg.requested_vid_commitment ++ "Got matching BlockBuilder for {state_id}, sending get_available_blocks request", + ); + if let Err(e) = builder + .broadcast(MessageType::RequestMessage(req_msg.clone())) + .await + { +- tracing::warn!( +- "Error {e} sending get_available_blocks request for parent {:?}@{view_number}", +- req_msg.requested_vid_commitment +- ); ++ tracing::warn!("Error {e} sending get_available_blocks request for {state_id}",); + } + sent = true; + } else { + tracing::info!( +- "Failed to get matching BlockBuilder for {:?}@{view_number}, will try again", +- req_msg.requested_vid_commitment ++ "Failed to get matching BlockBuilder for {state_id}, will try again", + ); + async_sleep(check_duration).await + } +@@ -421,39 +417,32 @@ where + .global_state + .read_arc() + .await +- .get_channel_for_matching_builder_or_highest_view_buider(&(*for_parent, view_num))? ++ .get_channel_for_matching_builder_or_highest_view_buider(&state_id)? + .broadcast(MessageType::RequestMessage(req_msg.clone())) + .await + { + tracing::warn!( +- "Error {e} sending get_available_blocks request for parent {:?}@{view_number}", +- req_msg.requested_vid_commitment ++ "Error {e} sending get_available_blocks request for parent {state_id}", + ); + } + } + +- tracing::debug!( +- "Waiting for response for get_available_blocks with parent {:?}@{view_number}", +- req_msg.requested_vid_commitment +- ); ++ tracing::debug!("Waiting for response for get_available_blocks with parent {state_id}",); + + let response_received = loop { + match async_timeout(check_duration, response_receiver.recv()).await { + Err(toe) => { + if Instant::now() >= timeout_after { +- tracing::warn!(%toe, "Couldn't get available blocks in time for parent {:?}@{view_number}", req_msg.requested_vid_commitment); ++ tracing::warn!(%toe, "Couldn't get available blocks in time for parent {state_id}"); + // lookup into the builder_state_to_last_built_block, if it contains the result, return that otherwise return error + if let Some(last_built_block) = self + .global_state + .read_arc() + .await + .builder_state_to_last_built_block +- .get(&(*for_parent, view_num)) ++ .get(&state_id) + { +- tracing::info!( +- "Returning last built block for parent {:?}@{view_number}", +- req_msg.requested_vid_commitment +- ); ++ tracing::info!("Returning last built block for parent {state_id}",); + break Ok(last_built_block.clone()); + } + break Err(BuildError::Error { +@@ -464,7 +453,7 @@ where + } + Ok(recv_attempt) => { + if let Err(ref e) = recv_attempt { +- tracing::error!(%e, "Channel closed while getting available blocks for parent {:?}@{view_number}", req_msg.requested_vid_commitment); ++ tracing::error!(%e, "Channel closed while getting available blocks for parent {state_id}"); + } + break recv_attempt.map_err(|_| BuildError::Error { + message: "channel unexpectedly closed".to_string(), +@@ -499,9 +488,7 @@ where + _phantom: Default::default(), + }; + tracing::info!( +- "Sending available Block info response for (parent {:?}, view_num: {:?}) with block hash: {:?}", +- req_msg.requested_vid_commitment, +- view_number, ++ "Sending available Block info response for {state_id} with block hash: {:?}", + response.builder_hash + ); + Ok(vec![initial_block_info]) +@@ -509,10 +496,7 @@ where + + // We failed to get available blocks + Err(e) => { +- tracing::warn!( +- "Failed to get available blocks for parent {:?}@{view_number}", +- req_msg.requested_vid_commitment +- ); ++ tracing::warn!("Failed to get available blocks for parent {state_id}",); + Err(e) + } + } +@@ -520,49 +504,35 @@ where + + async fn claim_block( + &self, +- block_hash: &BuilderCommitment, +- view_number: u64, ++ _block_hash: &BuilderCommitment, ++ _view_number: u64, + sender: Types::SignatureKey, + signature: &::SignatureKey as SignatureKey>::PureAssembledSignatureType, + ) -> Result, BuildError> { +- tracing::info!( +- "Received request for claiming block for (block_hash {:?}, view_num: {:?})", +- block_hash, +- view_number +- ); ++ let block_id = BlockId { ++ hash: _block_hash.clone(), ++ view: Types::Time::new(_view_number), ++ }; ++ ++ tracing::info!("Received request for claiming block {block_id}",); + // verify the signature +- if !sender.validate(signature, block_hash.as_ref()) { ++ if !sender.validate(signature, block_id.hash.as_ref()) { + tracing::error!("Signature validation failed in claim block"); + return Err(BuildError::Error { + message: "Signature validation failed in claim block".to_string(), + }); + } + let (pub_key, sign_key) = self.builder_keys.clone(); +- let view_num = ::Time as ConsensusTime>::new(view_number); + +- if let Some(block_info) = self +- .global_state +- .read_arc() +- .await +- .block_hash_to_block +- .get(&(block_hash.clone(), view_num)) +- { +- tracing::info!( +- "Trying sending vid trigger info for {:?}@{:?}", +- block_hash, +- view_num +- ); ++ if let Some(block_info) = self.global_state.write_arc().await.blocks.get(&block_id) { ++ tracing::info!("Trying sending vid trigger info for {block_id}",); + + if let Some(trigger_writer) = block_info.vid_trigger.write().await.take() { +- tracing::info!("Sending vid trigger for {:?}@{:?}", block_hash, view_num); ++ tracing::info!("Sending vid trigger for {block_id}"); + trigger_writer.send(TriggerStatus::Start); +- tracing::info!("Sent vid trigger for {:?}@{:?}", block_hash, view_num); ++ tracing::info!("Sent vid trigger for {block_id}"); + } +- tracing::info!( +- "Done Trying sending vid trigger info for {:?}@{:?}", +- block_hash, +- view_num +- ); ++ tracing::info!("Done Trying sending vid trigger info for {block_id}",); + + // sign over the builder commitment, as the proposer can computer it based on provide block_payload + // and the metata data +@@ -584,11 +554,7 @@ where + signature: signature_over_builder_commitment, + sender: pub_key.clone(), + }; +- tracing::info!( +- "Sending Claim Block data for (block_hash {:?}, view_num: {:?})", +- block_hash, +- view_number +- ); ++ tracing::info!("Sending Claim Block data for {block_id}",); + Ok(block_data) + } else { + tracing::warn!("Claim Block not found"); +@@ -600,33 +566,27 @@ where + + async fn claim_block_header_input( + &self, +- block_hash: &BuilderCommitment, +- view_number: u64, ++ _block_hash: &BuilderCommitment, ++ _view_number: u64, + sender: Types::SignatureKey, + signature: &::SignatureKey as SignatureKey>::PureAssembledSignatureType, + ) -> Result, BuildError> { +- tracing::info!( +- "Received request for claiming block header input for (block_hash {:?}, view_num: {:?})", +- block_hash, +- view_number +- ); ++ let id = BlockId { ++ hash: _block_hash.clone(), ++ view: Types::Time::new(_view_number), ++ }; ++ ++ tracing::info!("Received request for claiming block header input for block {id}"); + // verify the signature +- if !sender.validate(signature, block_hash.as_ref()) { ++ if !sender.validate(signature, id.hash.as_ref()) { + tracing::error!("Signature validation failed in claim block header input"); + return Err(BuildError::Error { + message: "Signature validation failed in claim block header input".to_string(), + }); + } + let (pub_key, sign_key) = self.builder_keys.clone(); +- let view_num = ::Time as ConsensusTime>::new(view_number); +- if let Some(block_info) = self +- .global_state +- .read_arc() +- .await +- .block_hash_to_block +- .get(&(block_hash.clone(), view_num)) +- { +- tracing::info!("Waiting for vid commitment for block {:?}", block_hash); ++ if let Some(block_info) = self.global_state.write_arc().await.blocks.get(&id) { ++ tracing::info!("Waiting for vid commitment for block {id}"); + + let timeout_after = Instant::now() + self.max_api_waiting_time; + let check_duration = self.max_api_waiting_time / 10; +@@ -637,10 +597,7 @@ where + { + Err(_toe) => { + if Instant::now() >= timeout_after { +- tracing::warn!( +- "Couldn't get vid commitment in time for block {:?}", +- block_hash +- ); ++ tracing::warn!("Couldn't get vid commitment in time for block {id}",); + break Err(BuildError::Error { + message: "Couldn't get vid commitment in time".to_string(), + }); +@@ -650,8 +607,7 @@ where + Ok(recv_attempt) => { + if let Err(ref _e) = recv_attempt { + tracing::error!( +- "Channel closed while getting vid commitment for block {:?}", +- block_hash ++ "Channel closed while getting vid commitment for block {id}", + ); + } + break recv_attempt.map_err(|_| BuildError::Error { +@@ -661,11 +617,7 @@ where + } + }; + +- tracing::info!( +- "Got vid commitment for block {:?}@{:?}", +- block_hash, +- view_number +- ); ++ tracing::info!("Got vid commitment for block {id}",); + if response_received.is_ok() { + let (vid_commitment, vid_precompute_data) = + response_received.map_err(|err| BuildError::Error { +@@ -699,11 +651,7 @@ where + message_signature: signature_over_vid_commitment, + sender: pub_key.clone(), + }; +- tracing::info!( +- "Sending Claim Block Header Input response for (block_hash {:?}, view_num: {:?})", +- block_hash, +- view_number +- ); ++ tracing::info!("Sending Claim Block Header Input response for {id}",); + Ok(response) + } else { + tracing::warn!("Claim Block Header Input not found"); diff --git a/src/testing/basic_test.rs b/src/testing/basic_test.rs index 932de265..d2a0cdf5 100644 --- a/src/testing/basic_test.rs +++ b/src/testing/basic_test.rs @@ -10,7 +10,7 @@ pub use hotshot_types::{ }, }; -pub use crate::builder_state::{BuilderProgress, BuilderState, MessageType, ResponseMessage}; +pub use crate::builder_state::{BuilderState, MessageType, ResponseMessage}; pub use async_broadcast::{ broadcast, Receiver as BroadcastReceiver, RecvError, Sender as BroadcastSender, TryRecvError, }; @@ -28,7 +28,6 @@ mod tests { simple_vote::QuorumData, traits::block_contents::{vid_commitment, BlockHeader}, utils::BuilderCommitment, - vid::VidCommitment, }; use hotshot_example_types::{ @@ -41,6 +40,7 @@ mod tests { TransactionSource, }; use crate::service::{handle_received_txns, GlobalState, ReceivedTransaction}; + use crate::BuilderStateId; use async_lock::RwLock; use async_std::task; use committable::{Commitment, CommitmentBoundsArkless, Committable}; @@ -127,7 +127,7 @@ mod tests { #[allow(clippy::type_complexity)] let mut sreq_msgs: Vec<( UnboundedReceiver, - (VidCommitment, ViewNumber), + BuilderStateId, MessageType, )> = Vec::new(); // storing response messages @@ -288,9 +288,11 @@ mod tests { // validate the signature before pushing the message to the builder_state channels // currently this step happens in the service.rs, wheneve we receiver an hotshot event tracing::debug!("Sending transaction message: {:?}", tx); - handle_received_txns(&tx_sender, vec![tx.clone()], TransactionSource::HotShot) - .await - .unwrap(); + for res in + handle_received_txns(&tx_sender, vec![tx.clone()], TransactionSource::HotShot).await + { + res.unwrap(); + } da_sender .broadcast(MessageType::DaProposalMessage(Arc::clone(&sda_msg))) .await @@ -306,7 +308,7 @@ mod tests { let (response_sender, response_receiver) = unbounded(); let request_message = MessageType::::RequestMessage(RequestMessage { - requested_view_number: i as u64, + requested_view_number: ViewNumber::new(i as u64), response_channel: response_sender, }); @@ -315,7 +317,10 @@ mod tests { sqc_msgs.push(sqc_msg); sreq_msgs.push(( response_receiver, - (requested_vid_commitment, ViewNumber::new(i as u64)), + BuilderStateId { + parent_commitment: requested_vid_commitment, + view: ViewNumber::new(i as u64), + }, request_message, )); } diff --git a/src/testing/basic_test.rs.rej b/src/testing/basic_test.rs.rej new file mode 100644 index 00000000..47b995e5 --- /dev/null +++ b/src/testing/basic_test.rs.rej @@ -0,0 +1,15 @@ +diff a/src/testing/basic_test.rs b/src/testing/basic_test.rs (rejected hunks) +@@ -324,8 +324,11 @@ mod tests { + + let (response_sender, response_receiver) = unbounded(); + let request_message = MessageType::::RequestMessage(RequestMessage { +- requested_vid_commitment, +- requested_view_number: i as u64, ++ state_id: crate::BuilderStateId { ++ parent_commitment: requested_vid_commitment, ++ view: ::Time::new(i as u64), ++ }, ++ + response_channel: response_sender, + }); +