diff --git a/Cargo.lock b/Cargo.lock index a2fed974..c423073e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2989,7 +2989,7 @@ dependencies = [ [[package]] name = "hs-builder-api" version = "0.1.0" -source = "git+https://github.com/EspressoSystems/hs-builder-api?branch=main#7ed5ba213f3dacdb57a52ab35d221323c4aaf839" +source = "git+https://github.com/EspressoSystems/hs-builder-api?branch=main#8b3c4c686b710b5a9693cfb65237b218b109288b" dependencies = [ "async-trait", "clap", @@ -3026,6 +3026,7 @@ dependencies = [ "serde", "sha2 0.10.8", "tagged-base64", + "tide-disco", "tracing", ] @@ -6668,7 +6669,7 @@ dependencies = [ [[package]] name = "tide-disco" version = "0.4.6" -source = "git+https://github.com/EspressoSystems/tide-disco.git?tag=v0.4.6#4ced31cef8aae769994098d245e026c127801b74" +source = "git+https://github.com/EspressoSystems/tide-disco?tag=v0.4.6#4ced31cef8aae769994098d245e026c127801b74" dependencies = [ "async-std", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 4afbca96..c0738d8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,5 +26,6 @@ hotshot-utils = { git = "https://github.com/EspressoSystems/HotShot.git", tag = hs-builder-api = { git = "https://github.com/EspressoSystems/hs-builder-api", branch = "main" } sha2 = "0.10" serde = { version = "1.0", features = ["derive"] } +tide-disco = { git = "https://github.com/EspressoSystems/tide-disco", tag = "v0.4.6" } tagged-base64 = { git = "https://github.com/EspressoSystems/tagged-base64", tag = "0.3.4" } tracing = "0.1" diff --git a/src/builder_state.rs b/src/builder_state.rs index 3a265f04..34c3ff55 100644 --- a/src/builder_state.rs +++ b/src/builder_state.rs @@ -4,18 +4,14 @@ #![allow(clippy::redundant_field_names)] #![allow(clippy::too_many_arguments)] use hotshot_types::{ - data::{test_srs, DAProposal, Leaf, QuorumProposal, VidCommitment, VidScheme, VidSchemeTrait}, + data::{DAProposal, Leaf, QuorumProposal, VidCommitment}, event::LeafChain, message::Proposal, simple_certificate::QuorumCertificate, + traits::block_contents::{BlockHeader, BlockPayload}, traits::{ block_contents::vid_commitment, - node_implementation::{ConsensusTime, NodeType as BuilderType}, - signature_key::SignatureKey, - }, - traits::{ - block_contents::{BlockHeader, BlockPayload}, - election::Membership, + node_implementation::{ConsensusTime, NodeType}, }, utils::BuilderCommitment, vote::Certificate, @@ -33,12 +29,12 @@ use async_trait::async_trait; use futures::StreamExt; //::select_all; use core::panic; -use std::cmp::PartialEq; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; use std::time::SystemTime; +use std::{cmp::PartialEq, num::NonZeroUsize}; use crate::service::GlobalState; @@ -53,27 +49,27 @@ pub enum TransactionSource { /// Transaction Message to be put on the tx channel #[derive(Clone, Debug, PartialEq)] -pub struct TransactionMessage { +pub struct TransactionMessage { pub tx: TYPES::Transaction, pub tx_type: TransactionSource, } /// Decide Message to be put on the decide channel #[derive(Clone, Debug, PartialEq)] -pub struct DecideMessage { +pub struct DecideMessage { pub leaf_chain: Arc>, pub qc: Arc>, pub block_size: Option, } /// DA Proposal Message to be put on the da proposal channel #[derive(Clone, Debug, PartialEq)] -pub struct DAProposalMessage { +pub struct DAProposalMessage { pub proposal: Proposal>, pub sender: TYPES::SignatureKey, pub total_nodes: usize, } /// QC Message to be put on the quorum proposal channel #[derive(Clone, Debug, PartialEq)] -pub struct QCMessage { +pub struct QCMessage { pub proposal: Proposal>, pub sender: TYPES::SignatureKey, } @@ -84,17 +80,22 @@ pub struct RequestMessage { //pub total_nodes: usize } /// Response Message to be put on the response channel -#[derive(Debug, Clone)] -pub struct ResponseMessage { - pub block_hash: BuilderCommitment, //TODO: Need to pull out from hotshot +#[derive(Debug)] +pub struct BuildBlockInfo { + pub builder_hash: BuilderCommitment, //TODO: Need to pull out from hotshot pub block_size: u64, pub offered_fee: u64, pub block_payload: TYPES::BlockPayload, - pub metadata: <::BlockPayload as BlockPayload>::Metadata, - pub join_handle: Arc>, - pub signature: - <::SignatureKey as SignatureKey>::PureAssembledSignatureType, - pub sender: TYPES::SignatureKey, + pub metadata: <::BlockPayload as BlockPayload>::Metadata, + pub join_handle: Arc>, +} + +/// Response Message to be put on the response channel +#[derive(Debug, Clone)] +pub struct ResponseMessage { + pub builder_hash: BuilderCommitment, //TODO: Need to pull out from hotshot + pub block_size: u64, + pub offered_fee: u64, } /// Enum to hold the status out of the decide event pub enum Status { @@ -103,13 +104,7 @@ pub enum Status { } #[derive(Debug, Clone)] -pub struct BuilderState { - // identity keys for the builder - pub builder_keys: ( - TYPES::SignatureKey, - <::SignatureKey as SignatureKey>::PrivateKey, - ), //TODO (pub,priv) key of the builder, may be good to keep a ref - +pub struct BuilderState { // timestamp to tx hash, used for ordering for the transactions pub timestamp_to_tx: BTreeMap>, @@ -157,17 +152,19 @@ pub struct BuilderState { pub global_state: Arc>>, // response sender - pub response_sender: UnboundedSender>, + pub response_sender: UnboundedSender, - // quorum membership - pub quorum_membership: Arc, + // total nodes required for the VID computation + // Since a builder state exists for potential block building, therefore it gets + // populated based on num nodes received in DA Proposal message + pub total_nodes: NonZeroUsize, // builder Commitements pub builder_commitments: Vec, } /// Trait to hold the helper functions for the builder #[async_trait] -pub trait BuilderProgress { +pub trait BuilderProgress { /// process the external transaction fn process_external_transaction(&mut self, tx: TYPES::Transaction); @@ -192,7 +189,7 @@ pub trait BuilderProgress { ); /// build a block - fn build_block(&mut self, matching_vid: VidCommitment) -> Option>; + fn build_block(&mut self, matching_vid: VidCommitment) -> Option>; /// Event Loop fn event_loop(self); @@ -203,7 +200,7 @@ pub trait BuilderProgress { #[async_trait] //#[tracing::instrument(skip_all)] -impl BuilderProgress for BuilderState { +impl BuilderProgress for BuilderState { /// processing the external i.e private mempool transaction fn process_external_transaction(&mut self, tx: TYPES::Transaction) { // PRIVATE MEMPOOL TRANSACTION PROCESSING @@ -287,7 +284,7 @@ impl BuilderProgress for BuilderState { let view_number = da_proposal_data.view_number; let encoded_txns = da_proposal_data.encoded_transactions; - let metadata: <::BlockPayload as BlockPayload>::Metadata = + let metadata: <::BlockPayload as BlockPayload>::Metadata = da_proposal_data.metadata; // generate the vid commitment; num nodes are received through hotshot api in service.rs and passed along with message onto channel @@ -297,6 +294,10 @@ impl BuilderProgress for BuilderState { encoded_txns, total_nodes ); + + // set the total nodes required for the VID computation // later required in the build_block + self.total_nodes = NonZeroUsize::new(total_nodes).unwrap(); + let payload_vid_commitment = vid_commitment(&encoded_txns, total_nodes); tracing::debug!( "Generated payload commitment from the da proposal: {:?}", @@ -324,8 +325,6 @@ impl BuilderProgress for BuilderState { tracing::info!("Spawning a clone"); self.clone() .spawn_clone(da_proposal_data, qc_proposal_data, sender); - // register the clone to the global state - //self.global_state.get_mut().vid_to_potential_builder_state.insert(payload_vid_commitment, self_clone); } else { tracing::info!("Not spawning a clone despite matching DA and QC proposals, as they corresponds to bootstrapping phase"); } @@ -385,8 +384,6 @@ impl BuilderProgress for BuilderState { tracing::info!("Spawning a clone"); self.clone() .spawn_clone(da_proposal_data, qc_proposal_data, sender); - // registed the clone to the global state - //self.global_state.get_mut().vid_to_potential_builder_state.insert(payload_vid_commitment, self_clone); } else { tracing::info!("Not spawning a clone despite matching DA and QC proposals, as they corresponds to bootstrapping phase"); } @@ -446,7 +443,6 @@ impl BuilderProgress for BuilderState { let metadata = leaf_chain[0].0.get_block_header().metadata(); let transactions_commitments = block_payload.transaction_commitments(metadata); // iterate over the transactions and remove them from tx_hash_to_tx and timestamp_to_tx - //let transactions:Vec = vec![]; for tx_hash in transactions_commitments.iter() { // remove the transaction from the timestamp_to_tx map if let Some((timestamp, _, _)) = self.tx_hash_to_available_txns.get(tx_hash) @@ -518,7 +514,7 @@ impl BuilderProgress for BuilderState { fields(builder_view=%self.built_from_view_vid_leaf.0.get_u64(), builder_vid=%self.built_from_view_vid_leaf.1.clone(), builder_leaf=%self.built_from_view_vid_leaf.2.clone()))] - fn build_block(&mut self, _matching_vid: VidCommitment) -> Option> { + fn build_block(&mut self, _matching_vid: VidCommitment) -> Option> { if let Ok((payload, metadata)) = ::from_transactions( self.timestamp_to_tx.iter().filter_map(|(_ts, tx_hash)| { self.tx_hash_to_available_txns @@ -526,10 +522,10 @@ impl BuilderProgress for BuilderState { .map(|(_ts, tx, _source)| tx.clone()) }), ) { - let block_hash = payload.builder_commitment(&metadata); + let builder_hash = payload.builder_commitment(&metadata); // add the local builder commitment list - self.builder_commitments.push(block_hash.clone()); + self.builder_commitments.push(builder_hash.clone()); //let num_txns = ::txn_count(&payload); let encoded_txns: Vec = payload.encode().unwrap().collect(); @@ -538,51 +534,24 @@ impl BuilderProgress for BuilderState { let offered_fee: u64 = 0; - // get the number of quorum committee members to be used for VID calculation - //let quo_membership = Arc::into_inner(self.quorum_membership).unwrap();//.clone(); + // get the total nodes from the builder state. + // stored while processing the DA Proposal + let vid_num_nodes = self.total_nodes.get(); - let num_quorum_committee = self.quorum_membership.total_nodes(); + // convert vid_num_nodes to usize + // spawn a task to calculate the VID commitment, and pass the builder handle to the global state + // later global state can await on it before replying to the proposer + let join_handle = + task::spawn(async move { vid_commitment(&encoded_txns, vid_num_nodes) }); - // TODO - let srs = test_srs(num_quorum_committee); - - // calculate the last power of two - // TODO change after https://github.com/EspressoSystems/jellyfish/issues/339 - // issue: https://github.com/EspressoSystems/HotShot/issues/2152 - let chunk_size = 1 << num_quorum_committee.ilog2(); - - let join_handle = task::spawn(async move { - // TODO: Disperse Operation: May be talk to @Gus about it // https://github.com/EspressoSystems/HotShot/blob/main/crates/task-impls/src/vid.rs#L97-L98 - // calculate vid shares - let vid = VidScheme::new(chunk_size, num_quorum_committee, &srs).unwrap(); - vid.disperse(encoded_txns).unwrap(); - }); - //self.global_state.write().block_hash_to_block.insert(block_hash, (payload, metadata, join_handle)); - //let mut global_state = self.global_state.write().unwrap(); - //self.global_state.write_arc().await.block_hash_to_block.insert(block_hash.clone(), (payload, metadata, join_handle)); - - // to sign combine the block_hash i.e builder commitment, block size and offered fee - let mut combined_bytes: Vec = Vec::new(); - // TODO: see why it is signing is not working with 48 bytes, however it is working with 32 bytes - //combined_bytes.extend_from_slice(&block_size.to_ne_bytes()); - combined_bytes.extend_from_slice(block_hash.as_ref()); - //combined_bytes.extend_from_slice(&offered_fee.to_ne_bytes()); - - let signature_over_block_info = ::SignatureKey::sign( - &self.builder_keys.1, - combined_bytes.as_ref(), - ) - .expect("Failed to sign tx hash"); //let signature = self.builder_keys.0.sign(&block_hash); - return Some(ResponseMessage { - block_hash: block_hash, + return Some(BuildBlockInfo { + builder_hash: builder_hash, block_size: block_size, offered_fee: offered_fee, block_payload: payload, metadata: metadata, join_handle: Arc::new(join_handle), - signature: signature_over_block_info, - sender: self.builder_keys.0.clone(), }); }; @@ -603,21 +572,25 @@ impl BuilderProgress for BuilderState { response, req ); - // send the response back - self.response_sender.send(response.clone()).await.unwrap(); + + // form the response message and send it back + let response_msg = ResponseMessage { + builder_hash: response.builder_hash.clone(), + block_size: response.block_size, + offered_fee: response.offered_fee, + }; + self.response_sender.send(response_msg).await.unwrap(); // write to global state as well self.global_state .write_arc() .await .block_hash_to_block .insert( - response.block_hash, + response.builder_hash, ( response.block_payload, response.metadata, response.join_handle, - response.signature, - response.sender, ), ); } @@ -636,6 +609,7 @@ impl BuilderProgress for BuilderState { fn event_loop(mut self) { let _builder_handle = async_spawn(async move { loop { + tracing::debug!("Builder event loop"); while let Ok(req) = self.req_receiver.try_recv() { tracing::info!( "Received request msg in builder {:?}: {:?}", @@ -739,7 +713,7 @@ impl BuilderProgress for BuilderState { } /// Unifies the possible messages that can be received by the builder #[derive(Debug, Clone)] -pub enum MessageType { +pub enum MessageType { TransactionMessage(TransactionMessage), DecideMessage(DecideMessage), DAProposalMessage(DAProposalMessage), @@ -747,12 +721,8 @@ pub enum MessageType { RequestMessage(RequestMessage), } -impl BuilderState { +impl BuilderState { pub fn new( - builder_keys: ( - TYPES::SignatureKey, - <::SignatureKey as SignatureKey>::PrivateKey, - ), view_vid_leaf: (TYPES::Time, VidCommitment, Commitment>), tx_receiver: BroadcastReceiver>, decide_receiver: BroadcastReceiver>, @@ -760,11 +730,10 @@ impl BuilderState { qc_receiver: BroadcastReceiver>, req_receiver: BroadcastReceiver>, global_state: Arc>>, - response_sender: UnboundedSender>, - quorum_membership: Arc, + response_sender: UnboundedSender, + num_nodes: NonZeroUsize, ) -> Self { BuilderState { - builder_keys: builder_keys, timestamp_to_tx: BTreeMap::new(), tx_hash_to_available_txns: HashMap::new(), included_txns: HashSet::new(), @@ -779,8 +748,8 @@ impl BuilderState { quorum_proposal_payload_commit_to_quorum_proposal: HashMap::new(), global_state: global_state, response_sender: response_sender, - quorum_membership: quorum_membership, builder_commitments: vec![], + total_nodes: num_nodes, } } } diff --git a/src/data_source.rs b/src/data_source.rs deleted file mode 100644 index c8bd4099..00000000 --- a/src/data_source.rs +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright (c) 2024 Espresso Systems (espressosys.com) -// This file is part of the HotShot Builder Protocol. -// - -//! Builder Phase 1 -//! It mainly provides two API services to external users: -//! 1. Serves a proposer(leader)'s request to provide blocks information -//! 2. Serves a proposer(leader)'s request to provide the full blocks information -//! 3. Serves a request to submit a transaction externally i.e outside the HotShot network - -//! To support the above two services, it uses the following core services: -//! 1. To facilitate the acceptance of the transactions i.e. private and public mempool -//! 2. Actions to be taken on hearning of: -//! a. DA Proposal -//! b. Quorum Proposal -//! c. Decide Event -//! -#![allow(unused_imports)] - -pub use hotshot::{ - traits::NodeImplementation, types::Event, types::SystemContextHandle, SystemContext, -}; -//use async_compatibility_layer::channel::UnboundedStream; -use async_lock::RwLock; -use commit::Committable; - -use hotshot_task_impls::events::HotShotEvent; -use hotshot_types::simple_vote::QuorumData; -use hotshot_types::{ - consensus::Consensus, - error::HotShotError, - event::EventType, - message::{MessageKind, SequencingMessage}, - traits::{election::Membership, node_implementation::NodeType, storage::Storage}, -}; -use hotshot_types::{data::Leaf, simple_certificate::QuorumCertificate}; - -use std::{collections::HashSet, sync::Arc, time::Instant}; -use tracing::error; - -//use crate::builder_state::{BuilderState, BuilderType, GlobalId}; - -/* - - -// process the hotshot transaction event -pub async fn process_hotshot_transaction>( - event_stream: ChannelStream>, - handle: SystemContextHandle, -) -> TaskRunner -{ - let transactions_event_handler = HandleEvent(Arc::new( - move |event, mut state: TransactionTaskState>| { - async move { - let completion_status = state.handle_event(event).await; - (completion_status, state) - } - .boxed() - }, - )); -} - - -// Used by the third API service i.e. to submit a transaction externally (priavate mempool) -async fn process_external_transaction(builder_info: &mut BuilderState, tx_hash: T::TransactionCommit, tx: T::Transaction){ - - // go through the builderinfo.transactionspool btree map check if it already exits based on its transaction hash - // if it does not exist, then add it to the builderinfo.transactionspool btree map - // if it exists, then ignore it - - if let Ok(mut txid_to_tx) = builder_info.txid_to_tx.lock() { - if txid_to_tx.contains_key(&tx_hash) { - println!("Transaction already exists in the builderinfo.txid_to_tx hashmap"); - } else { - // get the current time - //let current_time = Instant::now(); - let next_global_id = next_id(); - // now check if the current time already exists in the builderinfo.time_to_txid btree map - if let Ok(mut time_to_txid) = builder_info.time_to_txid.lock(){ - // if it exists, then add the transaction hash to the existing set - if time_to_txid.contains_key(¤t_time) { - let mut existing_set = time_to_txid.get(¤t_time).unwrap().clone(); - existing_set.insert(tx_hash.clone()); - time_to_txid.insert(current_time, existing_set); - } else { - // if it does not exist, then create a new set and add the transaction hash to it - let mut new_set = HashSet::new(); - new_set.insert(tx_hash.clone()); - time_to_txid.insert(current_time, new_set); - } - } - - txid_to_tx.insert(tx_hash, tx); - - } - } - // Code to handle when the transaction hash exists in the map -} - - - -*/ diff --git a/src/lib.rs b/src/lib.rs index 3b75b2bd..f128f98d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,22 +2,14 @@ // This file is part of the HotShot Builder Protocol. // -//! Builder Phase 1 -//! It mainly provides two API services to external users: -//! 1. Serves a proposer(leader)'s request to provide blocks information -//! 2. Serves a proposer(leader)'s request to provide the full blocks information -//! 3. Serves a request to submit a transaction externally i.e outside the HotShot network +// Builder Phase 1 +// It mainly provides three API services to hotshot proposers: +// 1. Serves a proposer(leader)'s request to provide blocks information +// 2. Serves a proposer(leader)'s request to provide the full blocks information +// 3. Serves a proposer(leader)'s request to provide the block header information -//! To support the above two services, it uses the following core services: -//! 1. To facilitate the acceptance of the transactions i.e. private and public mempool -//! 2. Actions to be taken on hearning of: -//! a. DA Proposal -//! b. Quorum Proposal -//! c. Decide Event -//! - -// providing the API services to the external users -// pub mod api; +// It also provides one API services external users: +// 1. Serves a user's request to submit a private transaction // providing the core services to support above API services pub mod builder_state; @@ -25,9 +17,5 @@ pub mod builder_state; // Core interaction with the HotShot network pub mod service; -// tracking the transactions in both Private and Public mempools -pub mod data_source; - // tracking the testing - pub mod testing; diff --git a/src/service.rs b/src/service.rs index 4f93c3ad..8d6c4dca 100644 --- a/src/service.rs +++ b/src/service.rs @@ -2,19 +2,6 @@ // This file is part of the HotShot Builder Protocol. // -//! Builder Phase 1 -//! It mainly provides two API services to external users: -//! 1. Serves a proposer(leader)'s request to provide blocks information -//! 2. Serves a proposer(leader)'s request to provide the full blocks information -//! 3. Serves a request to submit a transaction externally i.e outside the HotShot network - -//! To support the above two services, it uses the following core services: -//! 1. To facilitate the acceptance of the transactions i.e. private and public mempool -//! 2. Actions to be taken on hearning of: -//! a. DA Proposal -//! b. Quorum Proposal -//! c. Decide Event -//! #![allow(unused_variables)] #![allow(clippy::redundant_field_names)] use hotshot::{traits::NodeImplementation, types::SystemContextHandle, HotShotConsensusApi}; @@ -24,33 +11,31 @@ use hotshot_types::{ traits::{ block_contents::{BlockHeader, BlockPayload}, consensus_api::ConsensusApi, - node_implementation::NodeType as BuilderType, + node_implementation::NodeType, signature_key::SignatureKey, }, utils::BuilderCommitment, }; use hs_builder_api::{ - block_info::{AvailableBlockData, AvailableBlockInfo}, + block_info::{AvailableBlockData, AvailableBlockHeaderInput, AvailableBlockInfo}, builder::BuildError, - data_source::BuilderDataSource, + data_source::{AcceptsTxnSubmits, BuilderDataSource}, }; use async_broadcast::Sender as BroadcastSender; use async_compatibility_layer::channel::UnboundedReceiver; +use async_lock::RwLock; use async_std::task::JoinHandle; use async_trait::async_trait; use futures::stream::StreamExt; -use sha2::{Digest, Sha256}; -use std::{collections::HashMap, sync::Arc}; -use tagged_base64::TaggedBase64; -use tracing::error; - use crate::builder_state::{ DAProposalMessage, DecideMessage, QCMessage, TransactionMessage, TransactionSource, }; use crate::builder_state::{MessageType, RequestMessage, ResponseMessage}; - +use sha2::{Digest, Sha256}; +use std::{collections::HashMap, sync::Arc}; +use tracing::error; #[derive(clap::Args, Default)] pub struct Options { #[clap(short, long, env = "ESPRESSO_BUILDER_PORT")] @@ -59,22 +44,55 @@ pub struct Options { // #[allow(clippy::type_complexity)] #[derive(Debug)] -pub struct GlobalState { +pub struct GlobalState { + // identity keys for the builder + // May be ideal place as GlobalState interacts with hotshot apis + // and then can sign on responsers as desired + pub builder_keys: ( + Types::SignatureKey, // pub key + <::SignatureKey as SignatureKey>::PrivateKey, // private key + ), + + // data store for the blocks pub block_hash_to_block: HashMap< BuilderCommitment, ( Types::BlockPayload, - <::BlockPayload as BlockPayload>::Metadata, - Arc>, - <::SignatureKey as SignatureKey>::PureAssembledSignatureType, - Types::SignatureKey, + <::BlockPayload as BlockPayload>::Metadata, + Arc>, ), >, + // sending a request from the hotshot to the builder states pub request_sender: BroadcastSender>, - pub response_receiver: UnboundedReceiver>, + + // getting a response from the builder states based on the request sent by the hotshot + pub response_receiver: UnboundedReceiver, + + // sending a transaction from the hotshot/private mempool to the builder states + // NOTE: Currently, we don't differentiate between the transactions from the hotshot and the private mempool + pub tx_sender: BroadcastSender>, } -impl GlobalState { +impl GlobalState { + pub fn new( + builder_keys: ( + Types::SignatureKey, + <::SignatureKey as SignatureKey>::PrivateKey, + ), + request_sender: BroadcastSender>, + response_receiver: UnboundedReceiver, + tx_sender: BroadcastSender>, + ) -> Self { + GlobalState { + builder_keys: builder_keys, + block_hash_to_block: Default::default(), + request_sender: request_sender, + response_receiver: response_receiver, + tx_sender: tx_sender, + } + } + + // remove the builder state handles based on the decide event pub fn remove_handles( &mut self, vidcommitment: VidCommitment, @@ -85,25 +103,160 @@ impl GlobalState { self.block_hash_to_block.remove(&block_hash); } } - pub fn new( - request_sender: BroadcastSender>, - response_receiver: UnboundedReceiver>, - ) -> Self { - GlobalState { - block_hash_to_block: Default::default(), - request_sender: request_sender, - response_receiver: response_receiver, + // private mempool submit txn + // Currenlty, we don't differentiate between the transactions from the hotshot and the private mempool + pub async fn submit_txn( + &self, + txn: ::Transaction, + ) -> Result<(), BuildError> { + let tx_msg = TransactionMessage:: { + tx: txn, + tx_type: TransactionSource::HotShot, + }; + self.tx_sender + .broadcast(MessageType::TransactionMessage(tx_msg)) + .await + .map(|a| ()) + .map_err(|e| BuildError::Error { + message: "failed to send txn".to_string(), + }) + } + /// Listen to the events from the HotShot and pass onto to the builder states + pub async fn run_standalone_builder_service>( + &self, + // sending a DA proposal from the hotshot to the builder states + da_sender: BroadcastSender>, + + // sending a QC proposal from the hotshot to the builder states + qc_sender: BroadcastSender>, + + // sending a Decide event from the hotshot to the builder states + decide_sender: BroadcastSender>, + + // hotshot context handle + hotshot: SystemContextHandle, + ) -> Result<(), ()> { + loop { + tracing::debug!("Waiting for events from HotShot"); + let mut event_stream = hotshot.get_event_stream(); + match event_stream.next().await { + None => { + //TODO should we panic here? + //TODO or should we just continue just because we might trasaxtions from private mempool + panic!("Didn't receive any event from the HotShot event stream"); + } + Some(event) => { + match event.event { + // error event + EventType::Error { error } => { + error!("Error event in HotShot: {:?}", error); + } + // tx event + EventType::Transactions { transactions } => { + // iterate over the transactions and send them to the tx_sender, might get duplicate transactions but builder needs to filter them + // TODO: check do we need to change the type or struct of the transaction here + for tx_message in transactions { + let tx_msg = TransactionMessage:: { + tx: tx_message, + tx_type: TransactionSource::HotShot, + }; + self.tx_sender + .broadcast(MessageType::TransactionMessage(tx_msg)) + .await + .unwrap(); + } + } + // DA proposal event + EventType::DAProposal { proposal, sender } => { + // process the DA proposal + // get the leader for current view + let leader = hotshot.get_leader(proposal.data.view_number).await; + // get the encoded transactions hash + let encoded_txns_hash = + Sha256::digest(&proposal.data.encoded_transactions); + // check if the sender is the leader and the signature is valid; if yes, broadcast the DA proposal + if leader == sender + && sender.validate(&proposal.signature, &encoded_txns_hash) + { + // get the num of VID nodes + let c_api: HotShotConsensusApi = HotShotConsensusApi { + inner: hotshot.hotshot.inner.clone(), + }; + + let total_nodes = c_api.total_nodes(); + + let da_msg = DAProposalMessage:: { + proposal: proposal, + sender: sender, + total_nodes: total_nodes.into(), + }; + da_sender + .broadcast(MessageType::DAProposalMessage(da_msg)) + .await + .unwrap(); + } + } + // QC proposal event + EventType::QuorumProposal { proposal, sender } => { + // process the QC proposal + // get the leader for current view + let leader = hotshot.get_leader(proposal.data.view_number).await; + // get the payload commitment + let payload_commitment = + proposal.data.block_header.payload_commitment(); + // check if the sender is the leader and the signature is valid; if yes, broadcast the QC proposal + if sender == leader + && sender.validate(&proposal.signature, payload_commitment.as_ref()) + { + let qc_msg = QCMessage:: { + proposal: proposal, + sender: sender, + }; + qc_sender + .broadcast(MessageType::QCMessage(qc_msg)) + .await + .unwrap(); + } + } + // decide event + EventType::Decide { + leaf_chain, + qc, + block_size, + } => { + let decide_msg: DecideMessage = DecideMessage:: { + leaf_chain: leaf_chain, + qc: qc, + block_size: block_size, + }; + decide_sender + .broadcast(MessageType::DecideMessage(decide_msg)) + .await + .unwrap(); + } + // not sure whether we need it or not //TODO + EventType::ViewFinished { view_number } => { + tracing::info!( + "View Finished Event for view number: {:?}", + view_number + ); + unimplemented!("View Finished Event"); + } + _ => { + unimplemented!(); + } + } + } + } } } } #[async_trait] -impl BuilderDataSource for GlobalState +impl BuilderDataSource for GlobalState where - for<'a> <::SignatureKey as SignatureKey>::PureAssembledSignatureType: - From<&'a tagged_base64::TaggedBase64>, - TaggedBase64: - From<<::SignatureKey as SignatureKey>::PureAssembledSignatureType>, + <::SignatureKey as SignatureKey>::PureAssembledSignatureType: + for<'a> TryFrom<&'a tagged_base64::TaggedBase64> + Into, { async fn get_available_blocks( &self, @@ -116,19 +269,33 @@ where .broadcast(MessageType::RequestMessage(req_msg)) .await .unwrap(); + let response_received = self.response_receiver.recv().await; match response_received { Ok(response) => { - let block_metadata = AvailableBlockInfo:: { - block_hash: response.block_hash, + // to sign combine the block_hash i.e builder commitment, block size and offered fee + let mut combined_bytes: Vec = Vec::new(); + // TODO: see why signing is not working with 48 bytes, however it is working with 32 bytes + combined_bytes.extend_from_slice(response.block_size.to_be_bytes().as_ref()); + combined_bytes.extend_from_slice(response.offered_fee.to_be_bytes().as_ref()); + combined_bytes.extend_from_slice(response.builder_hash.as_ref()); + + let signature_over_block_info = ::SignatureKey::sign( + &self.builder_keys.1, + combined_bytes.as_ref(), + ) + .expect("Available block info signing failed"); + + let initial_block_info = AvailableBlockInfo:: { + block_hash: response.builder_hash, block_size: response.block_size, offered_fee: response.offered_fee, - signature: response.signature, - sender: response.sender, + signature: signature_over_block_info, + sender: self.builder_keys.0.clone(), _phantom: Default::default(), }; - Ok(vec![block_metadata]) + Ok(vec![initial_block_info]) } _ => Err(BuildError::Error { message: "No blocks available".to_string(), @@ -138,16 +305,20 @@ where async fn claim_block( &self, block_hash: &BuilderCommitment, - signature: &<::SignatureKey as SignatureKey>::PureAssembledSignatureType, + signature: &<::SignatureKey as SignatureKey>::PureAssembledSignatureType, ) -> Result, BuildError> { - // TODO: Verify the signature?? - + // TODO, Verify the signature over the proposer request if let Some(block) = self.block_hash_to_block.get(block_hash) { - //Ok(block.0.clone()) + // sign over the builder commitment, as the proposer can computer it based on provide block_payload + // and the metata data + let signature_over_builder_commitment = + ::SignatureKey::sign(&self.builder_keys.1, block_hash.as_ref()) + .expect("Claim block signing failed"); let block_data = AvailableBlockData:: { block_payload: block.0.clone(), - signature: block.3.clone(), - sender: block.4.clone(), + metadata: block.1.clone(), + signature: signature_over_builder_commitment, + sender: self.builder_keys.0.clone(), _phantom: Default::default(), }; Ok(block_data) @@ -158,131 +329,52 @@ where } // TODO: should we remove the block from the hashmap? } - async fn submit_txn(&self, txn: ::Transaction) -> Result<(), BuildError> { - unimplemented!() + async fn claim_block_header_input( + &self, + block_hash: &BuilderCommitment, + signature: &<::SignatureKey as SignatureKey>::PureAssembledSignatureType, + ) -> Result, BuildError> { + if let Some(block) = self.block_hash_to_block.get(block_hash) { + // wait on the handle for the vid computation before returning the response + + // clone the arc handle + let vid_handle = Arc::into_inner(block.2.clone()).unwrap(); + let vid_commitement = vid_handle.await; + + let signature_over_vid_commitment = ::SignatureKey::sign( + &self.builder_keys.1, + vid_commitement.as_ref(), + ) + .expect("Claim block header input signing failed"); + let reponse = AvailableBlockHeaderInput:: { + vid_commitment: vid_commitement, + signature: signature_over_vid_commitment, + sender: self.builder_keys.0.clone(), + _phantom: Default::default(), + }; + Ok(reponse) + } else { + Err(BuildError::Error { + message: "Block not found".to_string(), + }) + } } } -// impl api // from the hs-builder-api/src/ -/// Run an instance of the default Espresso builder service. -pub async fn run_standalone_builder_service, D>( - options: Options, - data_source: D, // contains both the tx's and blocks local pool - hotshot: SystemContextHandle, - tx_sender: BroadcastSender>, - decide_sender: BroadcastSender>, - da_sender: BroadcastSender>, - qc_sender: BroadcastSender>, -) -> Result<(), ()> -//where //TODO - //Payload: availability::QueryablePayload - // Might need to bound D with something... -{ - loop { - let mut event_stream = hotshot.get_event_stream(); - match event_stream.next().await { - None => { - //TODO should we panic here? - //TODO or should we just continue just because we might trasaxtions from private mempool - panic!("Didn't receive any event from the HotShot event stream"); - } - Some(event) => { - match event.event { - // error event - EventType::Error { error } => { - error!("Error event in HotShot: {:?}", error); - } - // tx event - EventType::Transactions { transactions } => { - // iterate over the transactions and send them to the tx_sender, might get duplicate transactions but builder needs to filter them - // TODO: check do we need to change the type or struct of the transaction here - for tx_message in transactions { - let tx_msg = TransactionMessage:: { - tx: tx_message, - tx_type: TransactionSource::HotShot, - }; - tx_sender - .broadcast(MessageType::TransactionMessage(tx_msg)) - .await - .unwrap(); - } - } - // DA proposal event - EventType::DAProposal { proposal, sender } => { - // process the DA proposal - // get the leader for current view - let leader = hotshot.get_leader(proposal.data.view_number).await; - // get the encoded transactions hash - let encoded_txns_hash = Sha256::digest(&proposal.data.encoded_transactions); - // check if the sender is the leader and the signature is valid; if yes, broadcast the DA proposal - if leader == sender - && sender.validate(&proposal.signature, &encoded_txns_hash) - { - // get the num of VID nodes - let c_api: HotShotConsensusApi = HotShotConsensusApi { - inner: hotshot.hotshot.inner.clone(), - }; - - let total_nodes = c_api.total_nodes(); +pub struct GlobalStateTxnSubmitter { + pub global_state_handle: Arc>>, +} - let da_msg = DAProposalMessage:: { - proposal: proposal, - sender: sender, - total_nodes: total_nodes.into(), - }; - da_sender - .broadcast(MessageType::DAProposalMessage(da_msg)) - .await - .unwrap(); - } - } - // QC proposal event - EventType::QuorumProposal { proposal, sender } => { - // process the QC proposal - // get the leader for current view - let leader = hotshot.get_leader(proposal.data.view_number).await; - // get the payload commitment - let payload_commitment = proposal.data.block_header.payload_commitment(); - // check if the sender is the leader and the signature is valid; if yes, broadcast the QC proposal - if sender == leader - && sender.validate(&proposal.signature, payload_commitment.as_ref()) - { - let qc_msg = QCMessage:: { - proposal: proposal, - sender: sender, - }; - qc_sender - .broadcast(MessageType::QCMessage(qc_msg)) - .await - .unwrap(); - } - } - // decide event - EventType::Decide { - leaf_chain, - qc, - block_size, - } => { - let decide_msg: DecideMessage = DecideMessage:: { - leaf_chain: leaf_chain, - qc: qc, - block_size: block_size, - }; - decide_sender - .broadcast(MessageType::DecideMessage(decide_msg)) - .await - .unwrap(); - } - // not sure whether we need it or not //TODO - EventType::ViewFinished { view_number } => { - tracing::info!("View Finished Event for view number: {:?}", view_number); - unimplemented!("View Finished Event"); - } - _ => { - unimplemented!(); - } - } - } - } +#[async_trait] +impl AcceptsTxnSubmits for GlobalStateTxnSubmitter { + async fn submit_txn( + &mut self, + txn: ::Transaction, + ) -> Result<(), BuildError> { + self.global_state_handle + .read_arc() + .await + .submit_txn(txn) + .await } } diff --git a/src/testing.rs b/src/testing.rs index 3fdff719..f1f6cd5d 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -2,8 +2,4 @@ // This file is part of the HotShot Builder Protocol. // -//! Builder Phase 1 Testing -//! -//! - pub mod basic_test; diff --git a/src/testing/basic_test.rs b/src/testing/basic_test.rs index 92738297..568cbaa2 100644 --- a/src/testing/basic_test.rs +++ b/src/testing/basic_test.rs @@ -2,8 +2,9 @@ // This file is part of the HotShot Builder Protocol. // -//! Builder Phase 1 Testing -//! +// Builder Phase 1 Testing +// + #![allow(unused_imports)] #![allow(clippy::redundant_field_names)] use async_std::task; @@ -17,7 +18,7 @@ pub use hotshot_types::{ simple_certificate::{QuorumCertificate, SimpleCertificate, SuccessThreshold}, traits::{ block_contents::BlockPayload, - node_implementation::{ConsensusTime, NodeType as BuilderType, NodeType}, + node_implementation::{ConsensusTime, NodeType}, }, }; use sha2::{Digest, Sha256}; @@ -35,7 +36,7 @@ use tracing; #[cfg(test)] mod tests { - use std::{hash::Hash, marker::PhantomData}; + use std::{hash::Hash, marker::PhantomData, num::NonZeroUsize}; use async_compatibility_layer::channel::unbounded; use commit::Committable; @@ -73,10 +74,10 @@ mod tests { use super::*; use serde::{Deserialize, Serialize}; - /// This test simulates multiple builders receiving messages from the channels and processing them + /// This test simulates multiple builder states receiving messages from the channels and processing them #[async_std::test] //#[instrument] - async fn test_channel() { + async fn test_builder() { async_compatibility_layer::logging::setup_logging(); async_compatibility_layer::logging::setup_backtrace(); tracing::info!("Testing the builder core with multiple messages from the channels"); @@ -94,7 +95,7 @@ mod tests { Deserialize, )] struct TestTypes; - impl BuilderType for TestTypes { + impl NodeType for TestTypes { type Time = ViewNumber; type BlockHeader = TestBlockHeader; type BlockPayload = TestBlockPayload; @@ -106,7 +107,7 @@ mod tests { type Membership = GeneralStaticCommittee; } // no of test messages to send - let num_test_messages = 10; + let num_test_messages = 5; let multiplication_factor = 5; const TEST_NUM_NODES_IN_VID_COMPUTATION: usize = 4; @@ -123,6 +124,19 @@ mod tests { broadcast::>(num_test_messages * multiplication_factor); let (res_sender, res_receiver) = unbounded(); + //let global_state_clone = global_state.clone(); + // generate the keys for the buidler + let seed = [201_u8; 32]; + let (builder_pub_key, builder_private_key) = + BLSPubKey::generated_from_seed_indexed(seed, 2011_u64); + // instantiate the global state also + let global_state = GlobalState::::new( + (builder_pub_key, builder_private_key), + req_sender, + res_receiver, + tx_sender.clone(), + ); + // to store all the sent messages let mut stx_msgs: Vec> = Vec::new(); let mut sdecide_msgs: Vec> = Vec::new(); @@ -130,7 +144,7 @@ mod tests { let mut sqc_msgs: Vec> = Vec::new(); let mut sreq_msgs: Vec> = Vec::new(); // storing response messages - let mut rres_msgs: Vec> = Vec::new(); + let mut rres_msgs: Vec = Vec::new(); // generate num_test messages for each type and send it to the respective channels; for i in 0..num_test_messages as u32 { @@ -272,9 +286,9 @@ mod tests { let payload_commitment = qc_proposal.block_header.payload_commitment(); let qc_signature = ::SignatureKey::sign( - &private_key, - payload_commitment.as_ref(), - ).expect("Failed to sign payload commitment while preparing QC proposal"); + &private_key, + payload_commitment.as_ref(), + ).expect("Failed to sign payload commitment while preparing QC proposal"); let sqc_msg = QCMessage:: { proposal: Proposal { @@ -344,46 +358,12 @@ mod tests { sqc_msgs.push(sqc_msg); sreq_msgs.push(request_message); } - // form the quorum election config, required for the VID computation inside the builder_state - let quorum_election_config = <::Membership as Membership< - TestTypes, - >>::default_election_config( - TEST_NUM_NODES_IN_VID_COMPUTATION as u64 - ); - - let mut commitee_stake_table_entries = vec![]; - for i in 0..TEST_NUM_NODES_IN_VID_COMPUTATION { - let (pub_key, _private_key) = - BLSPubKey::generated_from_seed_indexed([i as u8; 32], i as u64); - let stake = i as u64; - commitee_stake_table_entries.push(pub_key.get_stake_table_entry(stake)); - } - - let quorum_membership = - <::Membership as Membership>::create_election( - commitee_stake_table_entries, - quorum_election_config, - ); - - assert_eq!( - quorum_membership.total_nodes(), - TEST_NUM_NODES_IN_VID_COMPUTATION - ); - - // instantiate the global state also - let global_state = Arc::new(RwLock::new(GlobalState::::new( - req_sender.clone(), - res_receiver, - ))); - let global_state_clone = global_state.clone(); - // generate the keys for the buidler - let seed = [201_u8; 32]; - let (builder_pub_key, builder_private_key) = - BLSPubKey::generated_from_seed_indexed(seed, 2011_u64); + //let global_state_clone = arc_rwlock_global_state.clone(); + let arc_rwlock_global_state = Arc::new(RwLock::new(global_state)); + let arc_rwlock_global_state_clone = arc_rwlock_global_state.clone(); let handle = async_spawn(async move { let builder_state = BuilderState::::new( - (builder_pub_key, builder_private_key), ( ViewNumber::new(0), genesis_vid_commitment(), @@ -394,9 +374,9 @@ mod tests { da_receiver, qc_receiver, req_receiver, - global_state, + arc_rwlock_global_state_clone, res_sender, - Arc::new(quorum_membership), + NonZeroUsize::new(TEST_NUM_NODES_IN_VID_COMPUTATION).unwrap(), ); //builder_state.event_loop().await; @@ -408,7 +388,13 @@ mod tests { // go through the request messages in sreq_msgs and send the request message for req_msg in sreq_msgs.iter() { task::sleep(std::time::Duration::from_secs(1)).await; - req_sender.broadcast(req_msg.clone()).await.unwrap(); + arc_rwlock_global_state + .read_arc() + .await + .request_sender + .broadcast(req_msg.clone()) + .await + .unwrap(); } task::sleep(std::time::Duration::from_secs(2)).await; @@ -421,8 +407,8 @@ mod tests { .unwrap(); } - while let Ok(res_msg) = global_state_clone - .write_arc() + while let Ok(res_msg) = arc_rwlock_global_state + .read_arc() .await .response_receiver .try_recv()