diff --git a/Cargo.lock b/Cargo.lock index b0a0b9fc..cf8efc28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4530,12 +4530,14 @@ dependencies = [ "committable", "derivative", "futures", + "hex", "hotshot", "hotshot-builder-api", "hotshot-events-service", "hotshot-example-types", "hotshot-task-impls", "hotshot-types", + "lru 0.12.3", "multimap", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 774df168..f82a9d0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,11 +17,13 @@ clap = { version = "4.5", features = ["derive", "env"] } committable = "0.2" derivative = "2.2" futures = "0.3" +hex = "0.4.3" hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.67" } hotshot-builder-api = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.67" } hotshot-events-service = { git = "https://github.com/EspressoSystems/hotshot-events-service.git", tag = "0.1.37" } hotshot-task-impls = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.67" } hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.67" } +lru = "0.12.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.10" @@ -38,3 +40,8 @@ multimap = "0.10.0" [dev-dependencies] hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.67" } + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(async_executor_impl, values("async-std", "tokio"))', +] } diff --git a/src/builder_state.rs b/src/builder_state.rs index 374347bc..f8389170 100644 --- a/src/builder_state.rs +++ b/src/builder_state.rs @@ -12,14 +12,16 @@ use hotshot_types::{ use committable::{Commitment, Committable}; -use crate::service::{GlobalState, ReceivedTransaction}; +use crate::{ + service::{GlobalState, ReceivedTransaction}, + BlockId, BuilderStateId, +}; use async_broadcast::broadcast; use async_broadcast::Receiver as BroadcastReceiver; use async_broadcast::Sender as BroadcastSender; use async_compatibility_layer::channel::UnboundedSender; use async_compatibility_layer::{art::async_sleep, art::async_spawn}; use async_lock::RwLock; -use async_trait::async_trait; use core::panic; use futures::StreamExt; @@ -62,8 +64,8 @@ pub struct QCMessage { } /// Request Message to be put on the request channel #[derive(Clone, Debug)] -pub struct RequestMessage { - pub requested_view_number: u64, +pub struct RequestMessage { + pub requested_view_number: TYPES::Time, pub response_channel: UnboundedSender, } pub enum TriggerStatus { @@ -74,7 +76,7 @@ pub enum TriggerStatus { /// Response Message to be put on the response channel #[derive(Debug)] pub struct BuildBlockInfo { - pub builder_hash: BuilderCommitment, + pub id: BlockId, pub block_size: u64, pub offered_fee: u64, pub block_payload: TYPES::BlockPayload, @@ -163,7 +165,7 @@ pub struct BuilderState { pub total_nodes: NonZeroUsize, /// locally spawned builder Commitements - pub builder_commitments: HashSet<(VidCommitment, BuilderCommitment, TYPES::Time)>, + pub builder_commitments: HashSet<(BuilderStateId, BuilderCommitment)>, /// timeout for maximising the txns in the block pub maximize_txn_capture_timeout: Duration, @@ -185,41 +187,7 @@ pub struct BuilderState { pub next_txn_garbage_collect_time: Instant, } -/// Trait to hold the helper functions for the builder -#[async_trait] -pub trait BuilderProgress { - /// process the DA proposal - async fn process_da_proposal(&mut self, da_msg: Arc>); - /// process the quorum proposal - async fn process_quorum_proposal(&mut self, qc_msg: QCMessage); - - /// process the decide event - async fn process_decide_event(&mut self, decide_msg: DecideMessage) -> Option; - - /// spawn a clone of builder - async fn spawn_clone( - self, - da_proposal: Arc>, - quorum_proposal: Arc>>, - req_sender: BroadcastSender>, - ); - - /// build a block - async fn build_block( - &mut self, - matching_builder_commitment: VidCommitment, - matching_view_number: TYPES::Time, - ) -> Option>; - - /// Event Loop - fn event_loop(self); - - /// process the block request - async fn process_block_request(&mut self, req: RequestMessage); -} - -#[async_trait] -impl BuilderProgress for BuilderState { +impl BuilderState { /// processing the DA proposal #[tracing::instrument(skip_all, name = "process da proposal", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))] @@ -440,8 +408,10 @@ impl BuilderProgress for BuilderState { // register the spawned builder state to spawned_builder_states in the global state self.global_state.write_arc().await.register_builder_state( - self.built_from_proposed_block.vid_commitment, - self.built_from_proposed_block.view_number, + BuilderStateId { + parent_commitment: self.built_from_proposed_block.vid_commitment, + view: self.built_from_proposed_block.view_number, + }, req_sender, ); @@ -453,8 +423,7 @@ impl BuilderProgress for BuilderState { fields(builder_built_from_proposed_block = %self.built_from_proposed_block))] async fn build_block( &mut self, - matching_vid: VidCommitment, - requested_view_number: TYPES::Time, + state_id: BuilderStateId, ) -> Option> { let timeout_after = Instant::now() + self.maximize_txn_capture_timeout; let sleep_interval = self.maximize_txn_capture_timeout / 10; @@ -483,11 +452,8 @@ impl BuilderProgress for BuilderState { let txn_count = payload.num_transactions(&metadata); // insert the recently built block into the builder commitments - self.builder_commitments.insert(( - matching_vid, - builder_hash.clone(), - requested_view_number, - )); + self.builder_commitments + .insert((state_id, builder_hash.clone())); let encoded_txns: Vec = payload.encode().to_vec(); let block_size: u64 = encoded_txns.len() as u64; @@ -501,7 +467,10 @@ impl BuilderProgress for BuilderState { ); Some(BuildBlockInfo { - builder_hash, + id: BlockId { + view: self.built_from_proposed_block.view_number, + hash: builder_hash, + }, block_size, offered_fee, block_payload: payload, @@ -513,9 +482,8 @@ impl BuilderProgress for BuilderState { } } - async fn process_block_request(&mut self, req: RequestMessage) { - let requested_view_number = - <::Time as ConsensusTime>::new(req.requested_view_number); + async fn process_block_request(&mut self, req: RequestMessage) { + let requested_view_number = req.requested_view_number; // If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle if requested_view_number == self.built_from_proposed_block.view_number { tracing::info!( @@ -525,26 +493,28 @@ impl BuilderProgress for BuilderState { requested_view_number ); let response = self - .build_block( - self.built_from_proposed_block.vid_commitment, - requested_view_number, - ) + .build_block(BuilderStateId { + parent_commitment: self.built_from_proposed_block.vid_commitment, + view: requested_view_number, + }) .await; match response { Some(response) => { // form the response message let response_msg = ResponseMessage { - builder_hash: response.builder_hash.clone(), + builder_hash: response.id.hash.clone(), block_size: response.block_size, offered_fee: response.offered_fee, }; - let builder_hash = response.builder_hash.clone(); + let builder_hash = response.id.hash.clone(); self.global_state.write_arc().await.update_global_state( + BuilderStateId { + parent_commitment: self.built_from_proposed_block.vid_commitment, + view: requested_view_number, + }, response, - self.built_from_proposed_block.vid_commitment, - requested_view_number, response_msg.clone(), ); @@ -581,7 +551,7 @@ impl BuilderProgress for BuilderState { } #[tracing::instrument(skip_all, name = "event loop", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))] - fn event_loop(mut self) { + pub fn event_loop(mut self) { let _builder_handle = async_spawn(async move { loop { tracing::debug!( @@ -686,7 +656,7 @@ pub enum MessageType { DecideMessage(DecideMessage), DaProposalMessage(Arc>), QCMessage(QCMessage), - RequestMessage(RequestMessage), + RequestMessage(RequestMessage), } #[allow(clippy::too_many_arguments)] diff --git a/src/lib.rs b/src/lib.rs index f128f98d..0625bf81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,10 @@ // It also provides one API services external users: // 1. Serves a user's request to submit a private transaction +use hotshot_types::{ + traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment, +}; + // providing the core services to support above API services pub mod builder_state; @@ -19,3 +23,32 @@ pub mod service; // tracking the testing pub mod testing; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct BlockId { + hash: BuilderCommitment, + view: TYPES::Time, +} + +impl std::fmt::Display for BlockId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Block({}@{})", + hex::encode(self.hash.as_ref()), + *self.view + ) + } +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct BuilderStateId { + parent_commitment: VidCommitment, + view: TYPES::Time, +} + +impl std::fmt::Display for BuilderStateId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BuilderState({}@{})", self.parent_commitment, *self.view) + } +} diff --git a/src/service.rs b/src/service.rs index cde2580f..18c9e339 100644 --- a/src/service.rs +++ b/src/service.rs @@ -28,13 +28,16 @@ use tracing::error; use std::{fmt::Debug, marker::PhantomData}; -use crate::builder_state::{ - BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QCMessage, ResponseMessage, - TransactionSource, +use crate::{ + builder_state::{ + BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QCMessage, ResponseMessage, + TransactionSource, + }, + BlockId, BuilderStateId, }; use anyhow::anyhow; -use async_broadcast::Sender as BroadcastSender; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; +use async_broadcast::{Sender as BroadcastSender, TrySendError}; use async_lock::RwLock; use async_trait::async_trait; use committable::{Commitment, Committable}; @@ -106,15 +109,14 @@ pub struct ReceivedTransaction { #[derive(Debug)] pub struct GlobalState { // data store for the blocks - pub block_hash_to_block: HashMap<(BuilderCommitment, TYPES::Time), BlockInfo>, + pub blocks: lru::LruCache, BlockInfo>, // registered builder states - pub spawned_builder_states: - HashMap<(VidCommitment, TYPES::Time), BroadcastSender>>, + pub spawned_builder_states: HashMap, BroadcastSender>>, // builder state -> last built block , it is used to respond the client // if the req channel times out during get_available_blocks - pub builder_state_to_last_built_block: HashMap<(VidCommitment, TYPES::Time), ResponseMessage>, + pub builder_state_to_last_built_block: HashMap, ResponseMessage>, // // scheduled GC by view number // pub view_to_cleanup_targets: BTreeMap>, @@ -127,7 +129,7 @@ pub struct GlobalState { pub last_garbage_collected_view_num: TYPES::Time, // highest view running builder task - pub highest_view_num_builder_id: (VidCommitment, TYPES::Time), + pub highest_view_num_builder_id: BuilderStateId, } impl GlobalState { @@ -141,76 +143,73 @@ impl GlobalState { _buffer_view_num_count: u64, ) -> Self { let mut spawned_builder_states = HashMap::new(); - spawned_builder_states.insert( - (bootstrapped_builder_state_id, bootstrapped_view_num), - bootstrap_sender.clone(), - ); + let bootstrap_id = BuilderStateId { + parent_commitment: bootstrapped_builder_state_id, + view: bootstrapped_view_num, + }; + spawned_builder_states.insert(bootstrap_id.clone(), bootstrap_sender.clone()); GlobalState { - block_hash_to_block: Default::default(), + blocks: lru::LruCache::new(NonZeroUsize::new(256).unwrap()), spawned_builder_states, tx_sender, last_garbage_collected_view_num, builder_state_to_last_built_block: Default::default(), - highest_view_num_builder_id: (bootstrapped_builder_state_id, bootstrapped_view_num), + highest_view_num_builder_id: bootstrap_id, } } pub fn register_builder_state( &mut self, - vid_commmit: VidCommitment, - view_num: TYPES::Time, + parent_id: BuilderStateId, request_sender: BroadcastSender>, ) { // register the builder state self.spawned_builder_states - .insert((vid_commmit, view_num), request_sender); + .insert(parent_id.clone(), request_sender); // keep track of the max view number - if view_num > self.highest_view_num_builder_id.1 { - tracing::info!( - "registering builder {:?}@{:?} as highest", - vid_commmit, - view_num - ); - self.highest_view_num_builder_id = (vid_commmit, view_num); + if parent_id.view > self.highest_view_num_builder_id.view { + tracing::info!("registering builder {parent_id} as highest",); + self.highest_view_num_builder_id = parent_id; } else { tracing::warn!( - "builder {:?}@{:?} created; highest registered is {:?}@{:?}", - vid_commmit, - view_num, - self.highest_view_num_builder_id.0, - self.highest_view_num_builder_id.1 + "builder {parent_id} created; highest registered is {}", + self.highest_view_num_builder_id, ); } } pub fn update_global_state( &mut self, + state_id: BuilderStateId, build_block_info: BuildBlockInfo, - builder_vid_commitment: VidCommitment, - view_num: TYPES::Time, response_msg: ResponseMessage, ) { - self.block_hash_to_block - .entry((build_block_info.builder_hash, view_num)) - .or_insert_with(|| BlockInfo { - block_payload: build_block_info.block_payload, - metadata: build_block_info.metadata, - offered_fee: build_block_info.offered_fee, - }); + if self.blocks.contains(&build_block_info.id) { + self.blocks.promote(&build_block_info.id) + } else { + self.blocks.push( + build_block_info.id, + BlockInfo { + block_payload: build_block_info.block_payload, + metadata: build_block_info.metadata, + offered_fee: build_block_info.offered_fee, + }, + ); + } // update the builder state to last built block self.builder_state_to_last_built_block - .insert((builder_vid_commitment, view_num), response_msg); + .insert(state_id, response_msg); } // remove the builder state handles based on the decide event pub fn remove_handles(&mut self, on_decide_view: TYPES::Time) -> TYPES::Time { // remove everything from the spawned builder states when view_num <= on_decide_view; // if we don't have a highest view > decide, use highest view as cutoff. - let cutoff = std::cmp::min(self.highest_view_num_builder_id.1, on_decide_view); + let cutoff = std::cmp::min(self.highest_view_num_builder_id.view, on_decide_view); self.spawned_builder_states - .retain(|(_vid, view_num), _channel| *view_num >= cutoff); + .retain(|id, _| id.view >= cutoff); let cutoff_u64 = cutoff.u64(); let gc_view = if cutoff_u64 > 0 { cutoff_u64 - 1 } else { 0 }; @@ -225,24 +224,22 @@ impl GlobalState { pub async fn submit_client_txns( &self, txns: Vec<::Transaction>, - ) -> Result::Transaction>>, BuildError> { + ) -> Vec::Transaction>, BuildError>> { handle_received_txns(&self.tx_sender, txns, TransactionSource::External).await } pub fn get_channel_for_matching_builder_or_highest_view_buider( &self, - key: &(VidCommitment, TYPES::Time), + key: &BuilderStateId, ) -> Result<&BroadcastSender>, BuildError> { if let Some(channel) = self.spawned_builder_states.get(key) { - tracing::info!("Got matching builder for parent {:?}@{:?}", key.0, key.1); + tracing::info!("Got matching builder for parent {}", key); Ok(channel) } else { tracing::warn!( - "failed to recover builder for parent {:?}@{:?}, using higest view num builder with {:?}@{:?}", - key.0, - key.1, - self.highest_view_num_builder_id.0, - self.highest_view_num_builder_id.1 + "failed to recover builder for parent {}, using higest view num builder with {}", + key, + self.highest_view_num_builder_id, ); // get the sender for the highest view number builder self.spawned_builder_states @@ -258,7 +255,7 @@ impl GlobalState { // iterate over the spawned builder states and check if the view number exists self.spawned_builder_states .iter() - .any(|((_vid, view_num), _sender)| view_num == key) + .any(|(id, _)| id.view == *key) } pub fn should_view_handle_other_proposals( @@ -266,7 +263,7 @@ impl GlobalState { builder_view: &TYPES::Time, proposal_view: &TYPES::Time, ) -> bool { - *builder_view == self.highest_view_num_builder_id.1 + *builder_view == self.highest_view_num_builder_id.view && !self.check_builder_state_existence_for_a_view(proposal_view) } } @@ -374,7 +371,10 @@ impl AcceptsTxnSubmits for ProxyGlobalState { response ); - response + // NOTE: ideally we want to respond with original Vec + // instead of Result not to loose any information, + // but this requires changes to builder API + response.into_iter().collect() } } #[async_trait] @@ -514,14 +514,16 @@ pub async fn run_non_permissioned_standalone_builder_service { let transactions = hooks.process_transactions(transactions).await; - if let Err(e) = handle_received_txns( + for res in handle_received_txns( &senders.transactions, transactions, TransactionSource::HotShot, ) .await { - tracing::warn!("Failed to handle transactions; {:?}", e); + if let Err(e) = res { + tracing::warn!("Failed to handle transactions; {:?}", e); + } } } // decide event @@ -618,14 +620,16 @@ pub async fn run_permissioned_standalone_builder_service< EventType::Transactions { transactions } => { let transactions = hooks.process_transactions(transactions).await; - if let Err(e) = handle_received_txns( + for res in handle_received_txns( &senders.transactions, transactions, TransactionSource::HotShot, ) .await { - tracing::warn!("Failed to handle transactions; {:?}", e); + if let Err(e) = res { + tracing::warn!("Failed to handle transactions; {:?}", e); + } } } // decide event @@ -799,25 +803,39 @@ pub(crate) async fn handle_received_txns( tx_sender: &BroadcastSender>>, txns: Vec, source: TransactionSource, -) -> Result::Transaction>>, BuildError> { +) -> Vec::Transaction>, BuildError>> { let mut results = Vec::with_capacity(txns.len()); let time_in = Instant::now(); for tx in txns.into_iter() { let commit = tx.commit(); - results.push(commit); let res = tx_sender - .broadcast(Arc::new(ReceivedTransaction { + .try_broadcast(Arc::new(ReceivedTransaction { tx, source: source.clone(), commit, time_in, })) - .await; - if res.is_err() { - return Err(BuildError::Error { - message: format!("Failed to broadcast txn with commit {:?}", commit), + .inspect(|val| { + if let Some(evicted_txn) = val { + tracing::warn!( + "Overflow mode enabled, transaction {} evicted", + evicted_txn.commit + ); + } + }) + .map(|_| commit) + .inspect_err(|err| { + tracing::warn!("Failed to broadcast txn with commit {:?}: {}", commit, err); + }) + .map_err(|err| match err { + TrySendError::Full(_) => BuildError::Error { + message: "Too many transactions".to_owned(), + }, + e => BuildError::Error { + message: format!("Internal error when submitting transaction: {}", e), + }, }); - } + results.push(res); } - Ok(results) + results } diff --git a/src/testing/basic_test.rs b/src/testing/basic_test.rs index 932de265..d2a0cdf5 100644 --- a/src/testing/basic_test.rs +++ b/src/testing/basic_test.rs @@ -10,7 +10,7 @@ pub use hotshot_types::{ }, }; -pub use crate::builder_state::{BuilderProgress, BuilderState, MessageType, ResponseMessage}; +pub use crate::builder_state::{BuilderState, MessageType, ResponseMessage}; pub use async_broadcast::{ broadcast, Receiver as BroadcastReceiver, RecvError, Sender as BroadcastSender, TryRecvError, }; @@ -28,7 +28,6 @@ mod tests { simple_vote::QuorumData, traits::block_contents::{vid_commitment, BlockHeader}, utils::BuilderCommitment, - vid::VidCommitment, }; use hotshot_example_types::{ @@ -41,6 +40,7 @@ mod tests { TransactionSource, }; use crate::service::{handle_received_txns, GlobalState, ReceivedTransaction}; + use crate::BuilderStateId; use async_lock::RwLock; use async_std::task; use committable::{Commitment, CommitmentBoundsArkless, Committable}; @@ -127,7 +127,7 @@ mod tests { #[allow(clippy::type_complexity)] let mut sreq_msgs: Vec<( UnboundedReceiver, - (VidCommitment, ViewNumber), + BuilderStateId, MessageType, )> = Vec::new(); // storing response messages @@ -288,9 +288,11 @@ mod tests { // validate the signature before pushing the message to the builder_state channels // currently this step happens in the service.rs, wheneve we receiver an hotshot event tracing::debug!("Sending transaction message: {:?}", tx); - handle_received_txns(&tx_sender, vec![tx.clone()], TransactionSource::HotShot) - .await - .unwrap(); + for res in + handle_received_txns(&tx_sender, vec![tx.clone()], TransactionSource::HotShot).await + { + res.unwrap(); + } da_sender .broadcast(MessageType::DaProposalMessage(Arc::clone(&sda_msg))) .await @@ -306,7 +308,7 @@ mod tests { let (response_sender, response_receiver) = unbounded(); let request_message = MessageType::::RequestMessage(RequestMessage { - requested_view_number: i as u64, + requested_view_number: ViewNumber::new(i as u64), response_channel: response_sender, }); @@ -315,7 +317,10 @@ mod tests { sqc_msgs.push(sqc_msg); sreq_msgs.push(( response_receiver, - (requested_vid_commitment, ViewNumber::new(i as u64)), + BuilderStateId { + parent_commitment: requested_vid_commitment, + view: ViewNumber::new(i as u64), + }, request_message, )); }