diff --git a/Cargo.lock b/Cargo.lock index ad5beecf..ca6e4ea0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4693,12 +4693,13 @@ dependencies = [ "anyhow", "async-broadcast", "async-compatibility-layer", - "async-lock 3.4.0", + "async-lock 2.8.0", "async-std", "async-trait", "clap", "committable", "derivative", + "either", "futures", "hex", "hotshot", diff --git a/Cargo.toml b/Cargo.toml index e901e0ed..f9c07811 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,12 +10,13 @@ async-broadcast = "0.7" async-compatibility-layer = { version = "1.2.1", default-features = false, features = [ "logging-utils", ] } -async-lock = "3.4" +async-lock = "2.2" async-std = { version = "1.9.0", features = ["unstable", "attributes"] } async-trait = "0.1" clap = { version = "4.5", features = ["derive", "env"] } committable = "0.2" derivative = "2.2" +either = "1.13" futures = "0.3" hex = "0.4.3" hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.71" } diff --git a/src/builder_state.rs b/src/builder_state.rs index 9706c208..9b1b5cb9 100644 --- a/src/builder_state.rs +++ b/src/builder_state.rs @@ -25,11 +25,11 @@ use async_lock::RwLock; use core::panic; use futures::StreamExt; +use std::cmp::PartialEq; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; use std::time::Instant; -use std::{cmp::PartialEq, num::NonZeroUsize}; use std::{collections::hash_map::Entry, time::Duration}; pub type TxTimeStamp = u128; @@ -51,7 +51,6 @@ pub struct DecideMessage { pub struct DaProposalMessage { pub view_number: TYPES::Time, pub txn_commitments: Vec>, - pub num_nodes: usize, pub sender: ::SignatureKey, pub builder_commitment: BuilderCommitment, } @@ -156,9 +155,6 @@ pub struct BuilderState { /// global state handle, defined in the service.rs pub global_state: Arc>>, - /// total nodes required for the VID computation as part of block header input response - pub num_nodes: NonZeroUsize, - /// locally spawned builder Commitements pub builder_commitments: HashSet<(BuilderStateId, BuilderCommitment)>, @@ -375,7 +371,6 @@ impl BuilderState { quorum_proposal: Arc>>, req_sender: BroadcastSender>, ) { - self.num_nodes = NonZeroUsize::new(da_proposal_info.num_nodes).unwrap_or(self.num_nodes); self.built_from_proposed_block.view_number = quorum_proposal.data.view_number; self.built_from_proposed_block.vid_commitment = quorum_proposal.data.block_header.payload_commitment(); @@ -659,7 +654,6 @@ impl BuilderState { req_receiver: BroadcastReceiver>, tx_queue: Vec>>, global_state: Arc>>, - num_nodes: NonZeroUsize, maximize_txn_capture_timeout: Duration, base_fee: u64, instance_state: Arc, @@ -668,7 +662,6 @@ impl BuilderState { ) -> Self { let txns_in_queue: HashSet<_> = tx_queue.iter().map(|tx| tx.commit).collect(); BuilderState { - num_nodes, txns_in_queue, built_from_proposed_block, req_receiver, @@ -706,7 +699,6 @@ impl BuilderState { tx_queue: self.tx_queue.clone(), global_state: self.global_state.clone(), builder_commitments: self.builder_commitments.clone(), - num_nodes: self.num_nodes, maximize_txn_capture_timeout: self.maximize_txn_capture_timeout, base_fee: self.base_fee, instance_state: self.instance_state.clone(), diff --git a/src/service.rs b/src/service.rs index 4b52faaa..b6e252fb 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,26 +1,24 @@ -use anyhow::Context; -use hotshot::{traits::election::static_committee::GeneralStaticCommittee, types::Event}; +use anyhow::bail; +use hotshot::types::Event; use hotshot_builder_api::v0_3::{ builder::BuildError, data_source::{AcceptsTxnSubmits, BuilderDataSource}, }; +use hotshot_types::bundle::Bundle; use hotshot_types::traits::block_contents::BuilderFee; -use hotshot_types::{bundle::Bundle, traits::node_implementation::Versions}; use hotshot_types::{ data::{DaProposal, Leaf, QuorumProposal, ViewNumber}, event::EventType, message::Proposal, traits::{ block_contents::BlockPayload, - election::Membership, - network::Topic, node_implementation::{ConsensusTime, NodeType}, signature_key::{BuilderSignatureKey, SignatureKey}, }, utils::BuilderCommitment, vid::VidCommitment, }; -use tracing::error; +use tracing::{error, instrument}; use std::{fmt::Debug, marker::PhantomData, time::Duration}; @@ -31,24 +29,21 @@ use crate::{ }, utils::{BlockId, BuilderStateId}, }; -use anyhow::anyhow; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; use async_broadcast::{InactiveReceiver, Sender as BroadcastSender, TrySendError}; use async_lock::RwLock; use async_trait::async_trait; use committable::{Commitment, Committable}; use derivative::Derivative; -use futures::future::BoxFuture; use futures::stream::StreamExt; -use hotshot_events_service::{events::Error as EventStreamError, events_source::StartupInfo}; +use futures::{future::BoxFuture, Stream}; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::Arc; use std::{fmt::Display, time::Instant}; -use surf_disco::Client; use tagged_base64::TaggedBase64; -use tide_disco::{method::ReadState, Url}; +use tide_disco::method::ReadState; // It holds all the necessary information for a block #[derive(Debug)] @@ -132,8 +127,6 @@ impl GlobalState { tx_sender: BroadcastSender>>, bootstrapped_builder_state_id: VidCommitment, bootstrapped_view_num: TYPES::Time, - last_garbage_collected_view_num: TYPES::Time, - _buffer_view_num_count: u64, ) -> Self { let mut spawned_builder_states = HashMap::new(); let bootstrap_id = BuilderStateId { @@ -145,7 +138,7 @@ impl GlobalState { blocks: lru::LruCache::new(NonZeroUsize::new(256).unwrap()), spawned_builder_states, tx_sender, - last_garbage_collected_view_num, + last_garbage_collected_view_num: bootstrapped_view_num, builder_state_to_last_built_block: Default::default(), highest_view_num_builder_id: bootstrap_id, } @@ -568,66 +561,8 @@ pub struct BroadcastSenders { pub decide: BroadcastSender>, } -async fn connect_to_events_service( - hotshot_events_api_url: Url, -) -> Option<( - surf_disco::socket::Connection< - Event, - surf_disco::socket::Unsupported, - EventStreamError, - V::Base, - >, - GeneralStaticCommittee::SignatureKey>, -)> { - let client = Client::::new( - hotshot_events_api_url.clone(), - ); - - if !(client.connect(None).await) { - return None; - } - - tracing::info!("Builder client connected to the hotshot events api"); - - // client subscrive to hotshot events - let subscribed_events = client - .socket("hotshot-events/events") - .subscribe::>() - .await - .ok()?; - - // handle the startup event at the start - let membership = if let Ok(response) = client - .get::>("hotshot-events/startup_info") - .send() - .await - { - let StartupInfo { - known_node_with_stake, - non_staked_node_count, - } = response; - let membership: GeneralStaticCommittee::SignatureKey> = - GeneralStaticCommittee::::SignatureKey>::create_election( - known_node_with_stake.clone(), - known_node_with_stake.clone(), - Topic::Global, - 0, - ); - - tracing::info!( - "Startup info: Known nodes with stake: {:?}, Non-staked node count: {:?}", - known_node_with_stake, - non_staked_node_count - ); - Some(membership) - } else { - None - }; - membership.map(|membership| (subscribed_events, membership)) -} - #[async_trait] -pub trait BuilderHooks: Sync + Send { +pub trait BuilderHooks: Sync + Send + 'static { #[inline(always)] async fn process_transactions( self: &Arc, @@ -644,114 +579,60 @@ pub struct NoHooks(pub PhantomData); impl BuilderHooks for NoHooks {} -/* -Running Non-Permissioned Builder Service -*/ -pub async fn run_non_permissioned_standalone_builder_service< - TYPES: NodeType