Skip to content

Commit

Permalink
Port refactors from legacy builder
Browse files Browse the repository at this point in the history
  • Loading branch information
QuentinI committed Jul 29, 2024
1 parent 194e972 commit 1734e44
Show file tree
Hide file tree
Showing 9 changed files with 821 additions and 134 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ clap = { version = "4.4", features = ["derive", "env"] }
committable = "0.2"
derivative = "2.2"
futures = "0.3"
hex = "0.4.3"
hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }
hotshot-builder-api = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }
hotshot-events-service = { git = "https://github.com/EspressoSystems/hotshot-events-service.git", tag = "rc-0.1.37" }
hotshot-task-impls = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }
hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }
lru = "0.12.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
Expand All @@ -38,3 +40,8 @@ multimap = "0.10.0"

[dev-dependencies]
hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(async_executor_impl, values("async-std", "tokio"))',
] }
93 changes: 33 additions & 60 deletions src/builder_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ use hotshot_types::{

use committable::{Commitment, Committable};

use crate::service::{GlobalState, ReceivedTransaction};
use crate::{
service::{GlobalState, ReceivedTransaction},
BlockId, BuilderStateId,
};
use async_broadcast::broadcast;
use async_broadcast::Receiver as BroadcastReceiver;
use async_broadcast::Sender as BroadcastSender;
use async_compatibility_layer::channel::UnboundedSender;
use async_compatibility_layer::{art::async_sleep, art::async_spawn};
use async_lock::RwLock;
use async_trait::async_trait;
use core::panic;
use futures::StreamExt;

Expand Down Expand Up @@ -62,8 +64,8 @@ pub struct QCMessage<TYPES: NodeType> {
}
/// Request Message to be put on the request channel
#[derive(Clone, Debug)]
pub struct RequestMessage {
pub requested_view_number: u64,
pub struct RequestMessage<TYPES: NodeType> {
pub requested_view_number: TYPES::Time,
pub response_channel: UnboundedSender<ResponseMessage>,
}
pub enum TriggerStatus {
Expand All @@ -74,7 +76,7 @@ pub enum TriggerStatus {
/// Response Message to be put on the response channel
#[derive(Debug)]
pub struct BuildBlockInfo<TYPES: NodeType> {
pub builder_hash: BuilderCommitment,
pub id: BlockId<TYPES>,
pub block_size: u64,
pub offered_fee: u64,
pub block_payload: TYPES::BlockPayload,
Expand Down Expand Up @@ -185,41 +187,7 @@ pub struct BuilderState<TYPES: NodeType> {
pub next_txn_garbage_collect_time: Instant,
}

/// Trait to hold the helper functions for the builder
#[async_trait]
pub trait BuilderProgress<TYPES: NodeType> {
/// process the DA proposal
async fn process_da_proposal(&mut self, da_msg: Arc<DaProposalMessage<TYPES>>);
/// process the quorum proposal
async fn process_quorum_proposal(&mut self, qc_msg: QCMessage<TYPES>);

/// process the decide event
async fn process_decide_event(&mut self, decide_msg: DecideMessage<TYPES>) -> Option<Status>;

/// spawn a clone of builder
async fn spawn_clone(
self,
da_proposal: Arc<DaProposalMessage<TYPES>>,
quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
req_sender: BroadcastSender<MessageType<TYPES>>,
);

/// build a block
async fn build_block(
&mut self,
matching_builder_commitment: VidCommitment,
matching_view_number: TYPES::Time,
) -> Option<BuildBlockInfo<TYPES>>;

/// Event Loop
fn event_loop(self);

/// process the block request
async fn process_block_request(&mut self, req: RequestMessage);
}

#[async_trait]
impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
impl<TYPES: NodeType> BuilderState<TYPES> {
/// processing the DA proposal
#[tracing::instrument(skip_all, name = "process da proposal",
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
Expand Down Expand Up @@ -440,8 +408,10 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {

// register the spawned builder state to spawned_builder_states in the global state
self.global_state.write_arc().await.register_builder_state(
self.built_from_proposed_block.vid_commitment,
self.built_from_proposed_block.view_number,
BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
view: self.built_from_proposed_block.view_number,
},
req_sender,
);

Expand All @@ -453,8 +423,7 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
async fn build_block(
&mut self,
matching_vid: VidCommitment,
requested_view_number: TYPES::Time,
state_id: BuilderStateId<TYPES>,
) -> Option<BuildBlockInfo<TYPES>> {
let timeout_after = Instant::now() + self.maximize_txn_capture_timeout;
let sleep_interval = self.maximize_txn_capture_timeout / 10;
Expand Down Expand Up @@ -484,9 +453,9 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {

// insert the recently built block into the builder commitments
self.builder_commitments.insert((
matching_vid,
state_id.parent_commitment,
builder_hash.clone(),
requested_view_number,
state_id.view,
));

let encoded_txns: Vec<u8> = payload.encode().to_vec();
Expand All @@ -501,7 +470,10 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
);

Some(BuildBlockInfo {
builder_hash,
id: BlockId {
view: self.built_from_proposed_block.view_number,
hash: builder_hash,
},
block_size,
offered_fee,
block_payload: payload,
Expand All @@ -513,9 +485,8 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
}
}

async fn process_block_request(&mut self, req: RequestMessage) {
let requested_view_number =
<<TYPES as NodeType>::Time as ConsensusTime>::new(req.requested_view_number);
async fn process_block_request(&mut self, req: RequestMessage<TYPES>) {
let requested_view_number = req.requested_view_number;
// If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle
if requested_view_number == self.built_from_proposed_block.view_number {
tracing::info!(
Expand All @@ -525,26 +496,28 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
requested_view_number
);
let response = self
.build_block(
self.built_from_proposed_block.vid_commitment,
requested_view_number,
)
.build_block(BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
view: requested_view_number,
})
.await;

match response {
Some(response) => {
// form the response message
let response_msg = ResponseMessage {
builder_hash: response.builder_hash.clone(),
builder_hash: response.id.hash.clone(),
block_size: response.block_size,
offered_fee: response.offered_fee,
};

let builder_hash = response.builder_hash.clone();
let builder_hash = response.id.hash.clone();
self.global_state.write_arc().await.update_global_state(
BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
view: requested_view_number,
},
response,
self.built_from_proposed_block.vid_commitment,
requested_view_number,
response_msg.clone(),
);

Expand Down Expand Up @@ -581,7 +554,7 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
}
#[tracing::instrument(skip_all, name = "event loop",
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
fn event_loop(mut self) {
pub fn event_loop(mut self) {
let _builder_handle = async_spawn(async move {
loop {
tracing::debug!(
Expand Down Expand Up @@ -686,7 +659,7 @@ pub enum MessageType<TYPES: NodeType> {
DecideMessage(DecideMessage<TYPES>),
DaProposalMessage(Arc<DaProposalMessage<TYPES>>),
QCMessage(QCMessage<TYPES>),
RequestMessage(RequestMessage),
RequestMessage(RequestMessage<TYPES>),
}

#[allow(clippy::too_many_arguments)]
Expand Down
79 changes: 79 additions & 0 deletions src/builder_state.rs.rej
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
diff a/src/builder_state.rs b/src/builder_state.rs (rejected hunks)
@@ -65,9 +68,8 @@ pub struct QCMessage<TYPES: NodeType> {
}
/// Request Message to be put on the request channel
#[derive(Clone, Debug)]
-pub struct RequestMessage {
- pub requested_vid_commitment: VidCommitment,
- pub requested_view_number: u64,
+pub struct RequestMessageTYPES: NodeType> {
+ pub state_id: BuilderStateIdTYPES>,
pub response_channel: UnboundedSender<ResponseMessage>,
}
pub enum TriggerStatus {
@@ -555,46 +561,39 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
}
}

- async fn process_block_request(&mut self, req: RequestMessage) {
- let requested_vid_commitment = req.requested_vid_commitment;
- let requested_view_number =
- <<TYPES as NodeType>::Time as ConsensusTime>::new(req.requested_view_number);
+ async fn process_block_request(&mut self, req: RequestMessage<TYPES>) {
// If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle
- if (requested_vid_commitment == self.built_from_proposed_block.vid_commitment
- && requested_view_number == self.built_from_proposed_block.view_number)
+ if (req.state_id.parent_commitment == self.built_from_proposed_block.vid_commitment
+ && req.state_id.view == self.built_from_proposed_block.view_number)
|| (self.built_from_proposed_block.view_number.u64()
== self
.global_state
.read_arc()
.await
.highest_view_num_builder_id
- .1
+ .view
.u64())
{
tracing::info!(
- "Request handled by builder with view {:?} for (parent {:?}, view_num: {:?})",
+ "Request for parent {} handled by builder with view {:?}",
+ req.state_id,
self.built_from_proposed_block.view_number,
- requested_vid_commitment,
- requested_view_number
);
- let response = self
- .build_block(requested_vid_commitment, requested_view_number)
- .await;
+ let response = self.build_block(req.state_id.clone()).await;

match response {
Some(response) => {
// form the response message
let response_msg = ResponseMessage {
- builder_hash: response.builder_hash.clone(),
+ builder_hash: response.id.hash.clone(),
block_size: response.block_size,
offered_fee: response.offered_fee,
};

- let builder_hash = response.builder_hash.clone();
+ let builder_hash = response.id.hash.clone();
self.global_state.write_arc().await.update_global_state(
+ req.state_id.clone(),
response,
- requested_vid_commitment,
- requested_view_number,
response_msg.clone(),
);

@@ -736,7 +735,7 @@ pub enum MessageType<TYPES: NodeType> {
DecideMessage(DecideMessage<TYPES>),
DaProposalMessage(DaProposalMessage<TYPES>),
QCMessage(QCMessage<TYPES>),
- RequestMessage(RequestMessage),
+ RequestMessage(RequestMessage<TYPES>),
}

#[allow(clippy::too_many_arguments)]
33 changes: 33 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
// It also provides one API services external users:
// 1. Serves a user's request to submit a private transaction

use hotshot_types::{
traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment,
};

// providing the core services to support above API services
pub mod builder_state;

Expand All @@ -19,3 +23,32 @@ pub mod service;

// tracking the testing
pub mod testing;

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct BlockId<TYPES: NodeType> {
hash: BuilderCommitment,
view: TYPES::Time,
}

impl<TYPES: NodeType> std::fmt::Display for BlockId<TYPES> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Block({}@{})",
hex::encode(self.hash.as_ref()),
*self.view
)
}
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct BuilderStateId<TYPES: NodeType> {
parent_commitment: VidCommitment,
view: TYPES::Time,
}

impl<TYPES: NodeType> std::fmt::Display for BuilderStateId<TYPES> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BuilderState({}@{})", self.parent_commitment, *self.view)
}
}
Loading

0 comments on commit 1734e44

Please sign in to comment.