Skip to content
This repository has been archived by the owner on Dec 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #222 from EspressoSystems/ag/size-limit
Browse files Browse the repository at this point in the history
Backpressure for private mempool & some refactorings
  • Loading branch information
QuentinI authored Jul 31, 2024
2 parents d0d7637 + 095b006 commit 5643cd6
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 280 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ tide-disco = "0.9"
tokio = "1"
tracing = "0.1"
vbs = "0.1"
lru = "0.12.3"
hex = "0.4.3"

[dev-dependencies]
hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.67" }

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(async_executor_impl, values("async-std"))', 'cfg(async_executor_impl, values("tokio"))'] }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(async_executor_impl, values("async-std", "tokio"))'] }
105 changes: 32 additions & 73 deletions src/builder_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ use hotshot_types::{

use committable::{Commitment, Committable};

use crate::service::{GlobalState, ReceivedTransaction};
use crate::{
service::{GlobalState, ReceivedTransaction},
BlockId, BuilderStateId,
};
use async_broadcast::broadcast;
use async_broadcast::Receiver as BroadcastReceiver;
use async_broadcast::Sender as BroadcastSender;
use async_compatibility_layer::channel::{oneshot, unbounded, UnboundedSender};
use async_compatibility_layer::{art::async_sleep, channel::OneShotSender};
use async_compatibility_layer::{art::async_spawn, channel::UnboundedReceiver};
use async_lock::RwLock;
use async_trait::async_trait;
use core::panic;
use futures::StreamExt;

Expand Down Expand Up @@ -66,9 +68,8 @@ pub struct QCMessage<TYPES: NodeType> {
}
/// Request Message to be put on the request channel
#[derive(Clone, Debug)]
pub struct RequestMessage {
pub requested_vid_commitment: VidCommitment,
pub requested_view_number: u64,
pub struct RequestMessage<Types: NodeType> {
pub state_id: BuilderStateId<Types>,
pub response_channel: UnboundedSender<ResponseMessage>,
}
pub enum TriggerStatus {
Expand All @@ -79,7 +80,7 @@ pub enum TriggerStatus {
/// Response Message to be put on the response channel
#[derive(Debug)]
pub struct BuildBlockInfo<TYPES: NodeType> {
pub builder_hash: BuilderCommitment,
pub id: BlockId<TYPES>,
pub block_size: u64,
pub offered_fee: u64,
pub block_payload: TYPES::BlockPayload,
Expand Down Expand Up @@ -177,7 +178,7 @@ pub struct BuilderState<TYPES: NodeType> {
pub total_nodes: NonZeroUsize,

/// locally spawned builder Commitements
pub builder_commitments: HashSet<(VidCommitment, BuilderCommitment, TYPES::Time)>,
pub builder_commitments: HashSet<(BuilderStateId<TYPES>, BuilderCommitment)>,

/// timeout for maximising the txns in the block
pub maximize_txn_capture_timeout: Duration,
Expand All @@ -199,43 +200,7 @@ pub struct BuilderState<TYPES: NodeType> {
pub next_txn_garbage_collect_time: Instant,
}

/// Trait to hold the helper functions for the builder
#[async_trait]
pub trait BuilderProgress<TYPES: NodeType> {
/// process the DA proposal
async fn process_da_proposal(&mut self, da_msg: DaProposalMessage<TYPES>);

/// process the quorum proposal
async fn process_quorum_proposal(&mut self, qc_msg: QCMessage<TYPES>);

/// process the decide event
async fn process_decide_event(&mut self, decide_msg: DecideMessage<TYPES>) -> Option<Status>;

/// spawn a clone of builder
async fn spawn_clone(
self,
da_proposal: DAProposalInfo<TYPES>,
quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
leader: TYPES::SignatureKey,
req_sender: BroadcastSender<MessageType<TYPES>>,
);

/// build a block
async fn build_block(
&mut self,
matching_builder_commitment: VidCommitment,
matching_view_number: TYPES::Time,
) -> Option<BuildBlockInfo<TYPES>>;

/// Event Loop
fn event_loop(self);

/// process the block request
async fn process_block_request(&mut self, req: RequestMessage);
}

#[async_trait]
impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
impl<TYPES: NodeType> BuilderState<TYPES> {
/// processing the DA proposal
#[tracing::instrument(skip_all, name = "process da proposal",
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
Expand Down Expand Up @@ -494,8 +459,10 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {

// register the spawned builder state to spawned_builder_states in the global state
self.global_state.write_arc().await.register_builder_state(
self.built_from_proposed_block.vid_commitment,
self.built_from_proposed_block.view_number,
BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
view: self.built_from_proposed_block.view_number,
},
req_sender,
);

Expand All @@ -507,8 +474,7 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
async fn build_block(
&mut self,
matching_vid: VidCommitment,
requested_view_number: TYPES::Time,
state_id: BuilderStateId<TYPES>,
) -> Option<BuildBlockInfo<TYPES>> {
let timeout_after = Instant::now() + self.maximize_txn_capture_timeout;
let sleep_interval = self.maximize_txn_capture_timeout / 10;
Expand Down Expand Up @@ -537,11 +503,8 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
let txn_count = self.tx_queue.len();

// insert the recently built block into the builder commitments
self.builder_commitments.insert((
matching_vid,
builder_hash.clone(),
requested_view_number,
));
self.builder_commitments
.insert((state_id, builder_hash.clone()));

let encoded_txns: Vec<u8> = payload.encode().to_vec();
let block_size: u64 = encoded_txns.len() as u64;
Expand Down Expand Up @@ -578,7 +541,10 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
);

Some(BuildBlockInfo {
builder_hash,
id: BlockId {
view: self.built_from_proposed_block.view_number,
hash: builder_hash,
},
block_size,
offered_fee,
block_payload: payload,
Expand All @@ -592,46 +558,39 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
}
}

async fn process_block_request(&mut self, req: RequestMessage) {
let requested_vid_commitment = req.requested_vid_commitment;
let requested_view_number =
<<TYPES as NodeType>::Time as ConsensusTime>::new(req.requested_view_number);
async fn process_block_request(&mut self, req: RequestMessage<TYPES>) {
// If a spawned clone is active then it will handle the request, otherwise the highest view num builder will handle
if (requested_vid_commitment == self.built_from_proposed_block.vid_commitment
&& requested_view_number == self.built_from_proposed_block.view_number)
if (req.state_id.parent_commitment == self.built_from_proposed_block.vid_commitment
&& req.state_id.view == self.built_from_proposed_block.view_number)
|| (self.built_from_proposed_block.view_number.u64()
== self
.global_state
.read_arc()
.await
.highest_view_num_builder_id
.1
.view
.u64())
{
tracing::info!(
"Request handled by builder with view {:?} for (parent {:?}, view_num: {:?})",
"Request for parent {} handled by builder with view {:?}",
req.state_id,
self.built_from_proposed_block.view_number,
requested_vid_commitment,
requested_view_number
);
let response = self
.build_block(requested_vid_commitment, requested_view_number)
.await;
let response = self.build_block(req.state_id.clone()).await;

match response {
Some(response) => {
// form the response message
let response_msg = ResponseMessage {
builder_hash: response.builder_hash.clone(),
builder_hash: response.id.hash.clone(),
block_size: response.block_size,
offered_fee: response.offered_fee,
};

let builder_hash = response.builder_hash.clone();
let builder_hash = response.id.hash.clone();
self.global_state.write_arc().await.update_global_state(
req.state_id.clone(),
response,
requested_vid_commitment,
requested_view_number,
response_msg.clone(),
);

Expand Down Expand Up @@ -668,7 +627,7 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
}
#[tracing::instrument(skip_all, name = "event loop",
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
fn event_loop(mut self) {
pub fn event_loop(mut self) {
let _builder_handle = async_spawn(async move {
loop {
tracing::debug!(
Expand Down Expand Up @@ -773,7 +732,7 @@ pub enum MessageType<TYPES: NodeType> {
DecideMessage(DecideMessage<TYPES>),
DaProposalMessage(DaProposalMessage<TYPES>),
QCMessage(QCMessage<TYPES>),
RequestMessage(RequestMessage),
RequestMessage(RequestMessage<TYPES>),
}

#[allow(clippy::too_many_arguments)]
Expand Down
35 changes: 34 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ pub mod service;
pub mod testing;

use async_compatibility_layer::channel::UnboundedReceiver;
use hotshot_builder_api::v0_2::builder::BuildError;
use hotshot_builder_api::v0_1::builder::BuildError;
use hotshot_types::{
traits::node_implementation::NodeType, utils::BuilderCommitment, vid::VidCommitment,
};

#[derive(Debug)]
pub enum WaitAndKeep<T> {
Keep(T),
Expand All @@ -45,3 +49,32 @@ impl<T: Clone> WaitAndKeep<T> {
}
}
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct BlockId<Types: NodeType> {
hash: BuilderCommitment,
view: Types::Time,
}

impl<Types: NodeType> std::fmt::Display for BlockId<Types> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Block({}@{})",
hex::encode(self.hash.as_ref()),
*self.view
)
}
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct BuilderStateId<Types: NodeType> {
parent_commitment: VidCommitment,
view: Types::Time,
}

impl<Types: NodeType> std::fmt::Display for BuilderStateId<Types> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BuilderState({}@{})", self.parent_commitment, *self.view)
}
}
Loading

0 comments on commit 5643cd6

Please sign in to comment.