This repository has been archived by the owner on Dec 3, 2024. It is now read-only.

Commit: Estimate max block size

QuentinI committed Aug 15, 2024
1 parent 9fdf328 commit 197b918
Showing 5 changed files with 89 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
@@ -13,6 +13,7 @@ async-compatibility-layer = { version = "1.1", default-features = false, feature
async-lock = "2.8"
async-std = { version = "1.9.0", features = ["unstable", "attributes"] }
async-trait = "0.1"
bincode = "1.3"
clap = { version = "4.5", features = ["derive", "env"] }
committable = "0.2"
derivative = "2.2"
@@ -37,4 +38,6 @@ hex = "0.4.3"
hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.70" }

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(async_executor_impl, values("async-std", "tokio"))'] }
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(async_executor_impl, values("async-std", "tokio"))',
] }
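
The new bincode dependency is used in src/service.rs below to estimate each transaction's serialized length via bincode::serialized_size. A minimal sketch of that pattern, assuming serde with the derive feature and a hypothetical Tx type standing in for the crate's actual transaction type:

use serde::Serialize;

#[derive(Serialize)]
struct Tx {
    payload: Vec<u8>,
}

// bincode 1.x computes the encoded size without producing the bytes;
// fall back to 0 if the value cannot be serialized.
fn estimated_len<T: Serialize>(value: &T) -> u64 {
    bincode::serialized_size(value).unwrap_or_default()
}

fn main() {
    let tx = Tx { payload: vec![0u8; 1024] };
    println!("estimated size: {} bytes", estimated_len(&tx));
}
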
50 changes: 43 additions & 7 deletions src/builder_state.rs
@@ -32,7 +32,7 @@ use async_std::task::spawn_blocking;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::spawn_blocking;

use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;
@@ -87,6 +87,8 @@ pub struct BuildBlockInfo<TYPES: NodeType> {
pub metadata: <<TYPES as NodeType>::BlockPayload as BlockPayload<TYPES>>::Metadata,
pub vid_trigger: OneShotSender<TriggerStatus>,
pub vid_receiver: UnboundedReceiver<(VidCommitment, VidPrecomputeData)>,
// Could we have included more transactions, but chose not to?
pub truncated: bool,
}

/// Response Message to be put on the response channel
@@ -169,7 +171,7 @@ pub struct BuilderState<TYPES: NodeType> {
pub tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<TYPES>>>,

/// filtered queue of available transactions, taken from tx_receiver
pub tx_queue: Vec<Arc<ReceivedTransaction<TYPES>>>,
pub tx_queue: VecDeque<Arc<ReceivedTransaction<TYPES>>>,

/// global state handle, defined in the service.rs
pub global_state: Arc<RwLock<GlobalState<TYPES>>>,
@@ -490,17 +492,50 @@ impl<TYPES: NodeType> BuilderState<TYPES> {

async_sleep(sleep_interval).await
}

// Don't build an empty block
if self.tx_queue.is_empty() {
return None;
}

let max_block_size = self.global_state.read_arc().await.max_block_size;
let transactions_to_include = self.tx_queue.iter().scan(0, |total_size, tx| {
if *total_size >= max_block_size {
None
} else {
// This deliberately lets the total run one transaction past the target block
// length. Otherwise we'd have a possible failure state where a single
// transaction larger than the target block size gets stuck at the head of the
// queue and we just build empty blocks.
*total_size += tx.len;
Some(tx.tx.clone())
}
});

if let Ok((payload, metadata)) =
<TYPES::BlockPayload as BlockPayload<TYPES>>::from_transactions(
self.tx_queue.iter().map(|tx| tx.tx.clone()),
transactions_to_include,
&self.validated_state,
&self.instance_state,
)
.await
{
let builder_hash = payload.builder_commitment(&metadata);
// count the number of txns
let txn_count = self.tx_queue.len();
let actual_txn_count = payload.num_transactions(&metadata);

// Payload is empty despite us checking earlier that tx_queue isn't empty.
// This indicates that the block was truncated due to sequencer block length
// limits and that the transaction at the head of our queue is too big to ever
// be included. Drop it and mark it as "included" so that we don't even bother
// with it if we receive it again.
if actual_txn_count == 0 {
if let Some(txn) = self.tx_queue.pop_front() {
self.txns_in_queue.remove(&txn.commit);
self.included_txns.insert(txn.commit);
};
return None;
}

// insert the recently built block into the builder commitments
self.builder_commitments
@@ -536,7 +571,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
tracing::info!(
"Builder view num {:?}, building block with {:?} txns, with builder hash {:?}",
self.built_from_proposed_block.view_number,
txn_count,
actual_txn_count,
builder_hash
);

@@ -551,6 +586,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
metadata,
vid_trigger: trigger_send,
vid_receiver: unbounded_receiver,
truncated: actual_txn_count < self.tx_queue.len(),
})
} else {
tracing::warn!("build block, returning None");
@@ -744,7 +780,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
qc_receiver: BroadcastReceiver<MessageType<TYPES>>,
req_receiver: BroadcastReceiver<MessageType<TYPES>>,
tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<TYPES>>>,
tx_queue: Vec<Arc<ReceivedTransaction<TYPES>>>,
tx_queue: VecDeque<Arc<ReceivedTransaction<TYPES>>>,
global_state: Arc<RwLock<GlobalState<TYPES>>>,
num_nodes: NonZeroUsize,
maximize_txn_capture_timeout: Duration,
@@ -841,7 +877,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
continue;
}
self.txns_in_queue.insert(tx.commit);
self.tx_queue.push(tx);
self.tx_queue.push_back(tx);
}
Err(async_broadcast::TryRecvError::Empty)
| Err(async_broadcast::TryRecvError::Closed) => {
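
The selection above greedily walks tx_queue from the front and stops once the running size reaches max_block_size, deliberately letting the crossing transaction through so that an oversized transaction at the head of the queue cannot stall block production. A standalone sketch of that behavior, with a simplified QueuedTx standing in for ReceivedTransaction:

use std::collections::VecDeque;

struct QueuedTx {
    tx: Vec<u8>, // simplified stand-in for the real transaction type
    len: u64,    // estimated serialized length in bytes
}

// Take transactions from the front until the running total reaches
// max_block_size; the transaction that crosses the limit is still included,
// so a single oversized transaction cannot block progress forever.
fn select_transactions(queue: &VecDeque<QueuedTx>, max_block_size: u64) -> Vec<Vec<u8>> {
    queue
        .iter()
        .scan(0u64, |total_size, tx| {
            if *total_size >= max_block_size {
                None
            } else {
                *total_size += tx.len;
                Some(tx.tx.clone())
            }
        })
        .collect()
}

fn main() {
    let queue: VecDeque<QueuedTx> =
        (0..5u8).map(|i| QueuedTx { tx: vec![i; 400], len: 400 }).collect();
    // Totals run 0, 400, 800 before the check, so three transactions are taken
    // (the third crosses the 1000-byte target) and the rest are left queued.
    assert_eq!(select_transactions(&queue, 1000).len(), 3);
}
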
41 changes: 39 additions & 2 deletions src/service.rs
@@ -60,6 +60,11 @@ use std::{fmt::Display, time::Instant};
use tagged_base64::TaggedBase64;
use tide_disco::{method::ReadState, Url};

// Start by assuming we're fine calculating VID for 100-kilobyte blocks
const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
// Never go lower than 10 kilobytes
const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;

// It holds all the necessary information for a block
#[derive(Debug)]
pub struct BlockInfo<Types: NodeType> {
@@ -68,6 +73,8 @@ pub struct BlockInfo<Types: NodeType> {
pub vid_trigger: Arc<RwLock<Option<OneShotSender<TriggerStatus>>>>,
pub vid_receiver: Arc<RwLock<WaitAndKeep<(VidCommitment, VidPrecomputeData)>>>,
pub offered_fee: u64,
// Could we have included more transactions with this block, but chose not to?
pub truncated: bool,
}

// It holds the information for the proposed block
@@ -105,9 +112,11 @@ pub struct BuilderStatesInfo<Types: NodeType> {
pub struct ReceivedTransaction<Types: NodeType> {
// the transaction
pub tx: Types::Transaction,
// its hash
// transaction's hash
pub commit: Commitment<Types::Transaction>,
// its source
// transaction's estimated length
pub len: u64,
// transaction's source
pub source: TransactionSource,
// received time
pub time_in: Instant,
@@ -135,6 +144,9 @@ pub struct GlobalState<Types: NodeType> {

// highest view running builder task
pub highest_view_num_builder_id: BuilderStateId<Types>,

// estimated maximum block size we can build in time
pub max_block_size: u64,
}

impl<Types: NodeType> GlobalState<Types> {
@@ -160,6 +172,7 @@ impl<Types: NodeType> GlobalState<Types> {
last_garbage_collected_view_num,
builder_state_to_last_built_block: Default::default(),
highest_view_num_builder_id: bootstrap_id,
max_block_size: INITIAL_MAX_BLOCK_SIZE,
}
}

@@ -203,6 +216,7 @@ impl<Types: NodeType> GlobalState<Types> {
build_block_info.vid_receiver,
))),
offered_fee: build_block_info.offered_fee,
truncated: build_block_info.truncated,
},
);
}
@@ -599,6 +613,15 @@
Err(_toe) => {
if Instant::now() >= timeout_after {
tracing::warn!("Couldn't get vid commitment in time for block {id}",);
{
// We can't keep up with this block size; reduce max block size by 10%,
// but never go below the floor.
let mut global_state = self.global_state.write_arc().await;
global_state.max_block_size = std::cmp::max(
global_state.max_block_size
- global_state.max_block_size.div_ceil(10),
MAX_BLOCK_SIZE_FLOOR,
);
}
break Err(BuildError::Error {
message: "Couldn't get vid commitment in time".to_string(),
});
@@ -619,6 +642,18 @@
};

tracing::info!("Got vid commitment for block {id}",);

// This block was truncated, but we got VID in time.
// Maybe we can handle bigger blocks?
if block_info.truncated {
// Increase max block size by 10%
let mut global_state = self.global_state.write_arc().await;
global_state.max_block_size +=
global_state.max_block_size.div_ceil(10);
}

if response_received.is_ok() {
let (vid_commitment, vid_precompute_data) =
response_received.map_err(|err| BuildError::Error {
@@ -1062,12 +1097,14 @@ pub(crate) async fn handle_received_txns<Types: NodeType>(
let time_in = Instant::now();
for tx in txns.into_iter() {
let commit = tx.commit();
let len = bincode::serialized_size(&tx).unwrap_or_default();
let res = tx_sender
.try_broadcast(Arc::new(ReceivedTransaction {
tx,
source: source.clone(),
commit,
time_in,
len,
}))
.inspect(|val| {
if let Some(evicted_txn) = val {
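
Taken together, the two adjustments in this file form a simple feedback loop on max_block_size: shrink by roughly 10% whenever VID computation misses its deadline, grow by roughly 10% whenever a truncated block still finishes in time, and never drop below MAX_BLOCK_SIZE_FLOOR. A sketch of that policy in isolation (this assumes the floor is meant strictly as a lower bound; the BlockSizeLimits wrapper is illustrative, not a type from this crate):

const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;

struct BlockSizeLimits {
    max_block_size: u64,
}

impl BlockSizeLimits {
    fn new() -> Self {
        Self { max_block_size: INITIAL_MAX_BLOCK_SIZE }
    }

    // VID commitment timed out: back off by ~10%, but never below the floor.
    fn on_vid_timeout(&mut self) {
        self.max_block_size = std::cmp::max(
            self.max_block_size - self.max_block_size.div_ceil(10),
            MAX_BLOCK_SIZE_FLOOR,
        );
    }

    // A truncated block still made it in time: try ~10% larger blocks.
    fn on_truncated_success(&mut self) {
        self.max_block_size += self.max_block_size.div_ceil(10);
    }
}

fn main() {
    let mut limits = BlockSizeLimits::new();
    limits.on_vid_timeout();
    assert_eq!(limits.max_block_size, 90_000);
    limits.on_truncated_success();
    assert_eq!(limits.max_block_size, 99_000);
}
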
3 changes: 2 additions & 1 deletion src/testing/basic_test.rs
@@ -18,6 +18,7 @@ pub use async_broadcast::{
#[cfg(test)]
mod tests {
use super::*;
use std::collections::VecDeque;
use std::{hash::Hash, marker::PhantomData, num::NonZeroUsize};

use async_compatibility_layer::channel::unbounded;
@@ -100,7 +101,7 @@ mod tests {
let (tx_sender, tx_receiver) = broadcast::<Arc<ReceivedTransaction<TestTypes>>>(
num_test_messages * multiplication_factor,
);
let tx_queue = Vec::new();
let tx_queue = VecDeque::new();
// generate the keys for the builder
let seed = [201_u8; 32];
let (_builder_pub_key, _builder_private_key) =
