Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port refactors from legacy builder #30

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may consider updating to HotShot 0.5.67 without the rc_ but this can be in a separate PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh actually, looks like the tag update is needed for conflict resolving.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I'll bump the tag!

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ clap = { version = "4.4", features = ["derive", "env"] }
committable = "0.2"
derivative = "2.2"
futures = "0.3"
hex = "0.4.3"
hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }
hotshot-builder-api = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }
hotshot-events-service = { git = "https://github.com/EspressoSystems/hotshot-events-service.git", tag = "rc-0.1.37" }
hotshot-task-impls = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }
hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }
lru = "0.12.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
Expand All @@ -38,3 +40,8 @@ multimap = "0.10.0"

[dev-dependencies]
hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "rc-0.5.67" }

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(async_executor_impl, values("async-std", "tokio"))',
] }
98 changes: 34 additions & 64 deletions src/builder_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ use hotshot_types::{

use committable::{Commitment, Committable};

use crate::service::{GlobalState, ReceivedTransaction};
use crate::{
service::{GlobalState, ReceivedTransaction},
BlockId, BuilderStateId,
};
use async_broadcast::broadcast;
use async_broadcast::Receiver as BroadcastReceiver;
use async_broadcast::Sender as BroadcastSender;
use async_compatibility_layer::channel::UnboundedSender;
use async_compatibility_layer::{art::async_sleep, art::async_spawn};
use async_lock::RwLock;
use async_trait::async_trait;
use core::panic;
use futures::StreamExt;

Expand Down Expand Up @@ -62,8 +64,8 @@ pub struct QCMessage<TYPES: NodeType> {
}
/// Request Message to be put on the request channel
#[derive(Clone, Debug)]
pub struct RequestMessage {
pub requested_view_number: u64,
pub struct RequestMessage<TYPES: NodeType> {
pub requested_view_number: TYPES::Time,
pub response_channel: UnboundedSender<ResponseMessage>,
}
pub enum TriggerStatus {
Expand All @@ -74,7 +76,7 @@ pub enum TriggerStatus {
/// Response Message to be put on the response channel
#[derive(Debug)]
pub struct BuildBlockInfo<TYPES: NodeType> {
pub builder_hash: BuilderCommitment,
pub id: BlockId<TYPES>,
pub block_size: u64,
pub offered_fee: u64,
pub block_payload: TYPES::BlockPayload,
Expand Down Expand Up @@ -163,7 +165,7 @@ pub struct BuilderState<TYPES: NodeType> {
pub total_nodes: NonZeroUsize,

/// locally spawned builder Commitements
pub builder_commitments: HashSet<(VidCommitment, BuilderCommitment, TYPES::Time)>,
pub builder_commitments: HashSet<(BuilderStateId<TYPES>, BuilderCommitment)>,

/// timeout for maximising the txns in the block
pub maximize_txn_capture_timeout: Duration,
Expand All @@ -185,41 +187,7 @@ pub struct BuilderState<TYPES: NodeType> {
pub next_txn_garbage_collect_time: Instant,
}

/// Trait to hold the helper functions for the builder
#[async_trait]
pub trait BuilderProgress<TYPES: NodeType> {
/// process the DA proposal
async fn process_da_proposal(&mut self, da_msg: Arc<DaProposalMessage<TYPES>>);
/// process the quorum proposal
async fn process_quorum_proposal(&mut self, qc_msg: QCMessage<TYPES>);

/// process the decide event
async fn process_decide_event(&mut self, decide_msg: DecideMessage<TYPES>) -> Option<Status>;

/// spawn a clone of builder
async fn spawn_clone(
self,
da_proposal: Arc<DaProposalMessage<TYPES>>,
quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
req_sender: BroadcastSender<MessageType<TYPES>>,
);

/// build a block
async fn build_block(
&mut self,
matching_builder_commitment: VidCommitment,
matching_view_number: TYPES::Time,
) -> Option<BuildBlockInfo<TYPES>>;

/// Event Loop
fn event_loop(self);

/// process the block request
async fn process_block_request(&mut self, req: RequestMessage);
}

#[async_trait]
impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
impl<TYPES: NodeType> BuilderState<TYPES> {
/// processing the DA proposal
#[tracing::instrument(skip_all, name = "process da proposal",
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
Expand Down Expand Up @@ -440,8 +408,10 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {

// register the spawned builder state to spawned_builder_states in the global state
self.global_state.write_arc().await.register_builder_state(
self.built_from_proposed_block.vid_commitment,
self.built_from_proposed_block.view_number,
BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
view: self.built_from_proposed_block.view_number,
},
req_sender,
);

Expand All @@ -453,8 +423,7 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
async fn build_block(
&mut self,
matching_vid: VidCommitment,
requested_view_number: TYPES::Time,
state_id: BuilderStateId<TYPES>,
) -> Option<BuildBlockInfo<TYPES>> {
let timeout_after = Instant::now() + self.maximize_txn_capture_timeout;
let sleep_interval = self.maximize_txn_capture_timeout / 10;
Expand Down Expand Up @@ -483,11 +452,8 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
let txn_count = payload.num_transactions(&metadata);

// insert the recently built block into the builder commitments
self.builder_commitments.insert((
matching_vid,
builder_hash.clone(),
requested_view_number,
));
self.builder_commitments
.insert((state_id, builder_hash.clone()));

let encoded_txns: Vec<u8> = payload.encode().to_vec();
let block_size: u64 = encoded_txns.len() as u64;
Expand All @@ -501,7 +467,10 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
);

Some(BuildBlockInfo {
builder_hash,
id: BlockId {
view: self.built_from_proposed_block.view_number,
hash: builder_hash,
},
block_size,
offered_fee,
block_payload: payload,
Expand All @@ -513,9 +482,8 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
}
}

async fn process_block_request(&mut self, req: RequestMessage) {
let requested_view_number =
<<TYPES as NodeType>::Time as ConsensusTime>::new(req.requested_view_number);
async fn process_block_request(&mut self, req: RequestMessage<TYPES>) {
let requested_view_number = req.requested_view_number;
// If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle
if requested_view_number == self.built_from_proposed_block.view_number {
tracing::info!(
Expand All @@ -525,26 +493,28 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
requested_view_number
);
let response = self
.build_block(
self.built_from_proposed_block.vid_commitment,
requested_view_number,
)
.build_block(BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
view: requested_view_number,
})
.await;

match response {
Some(response) => {
// form the response message
let response_msg = ResponseMessage {
builder_hash: response.builder_hash.clone(),
builder_hash: response.id.hash.clone(),
block_size: response.block_size,
offered_fee: response.offered_fee,
};

let builder_hash = response.builder_hash.clone();
let builder_hash = response.id.hash.clone();
self.global_state.write_arc().await.update_global_state(
BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
view: requested_view_number,
},
response,
self.built_from_proposed_block.vid_commitment,
requested_view_number,
response_msg.clone(),
);

Expand Down Expand Up @@ -581,7 +551,7 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
}
#[tracing::instrument(skip_all, name = "event loop",
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
fn event_loop(mut self) {
pub fn event_loop(mut self) {
let _builder_handle = async_spawn(async move {
loop {
tracing::debug!(
Expand Down Expand Up @@ -686,7 +656,7 @@ pub enum MessageType<TYPES: NodeType> {
DecideMessage(DecideMessage<TYPES>),
DaProposalMessage(Arc<DaProposalMessage<TYPES>>),
QCMessage(QCMessage<TYPES>),
RequestMessage(RequestMessage),
RequestMessage(RequestMessage<TYPES>),
}

#[allow(clippy::too_many_arguments)]
Expand Down
33 changes: 33 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
// It also provides one API services external users:
// 1. Serves a user's request to submit a private transaction

use hotshot_types::{
traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment,
};

// providing the core services to support above API services
pub mod builder_state;

Expand All @@ -19,3 +23,32 @@ pub mod service;

// tracking the testing
pub mod testing;

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct BlockId<TYPES: NodeType> {
hash: BuilderCommitment,
view: TYPES::Time,
}

impl<TYPES: NodeType> std::fmt::Display for BlockId<TYPES> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Block({}@{})",
hex::encode(self.hash.as_ref()),
*self.view
)
}
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct BuilderStateId<TYPES: NodeType> {
parent_commitment: VidCommitment,
view: TYPES::Time,
}

impl<TYPES: NodeType> std::fmt::Display for BuilderStateId<TYPES> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BuilderState({}@{})", self.parent_commitment, *self.view)
}
}
Loading
Loading