This repository has been archived by the owner on Dec 3, 2024. It is now read-only.

Add block size limits #238

Merged · 5 commits · Aug 16, 2024
Changes from 2 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
@@ -13,6 +13,7 @@ async-compatibility-layer = { version = "1.1", default-features = false, feature
async-lock = "2.8"
async-std = { version = "1.9.0", features = ["unstable", "attributes"] }
async-trait = "0.1"
bincode = "1.3"
clap = { version = "4.5", features = ["derive", "env"] }
committable = "0.2"
derivative = "2.2"
@@ -37,4 +38,6 @@ hex = "0.4.3"
hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.70" }

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(async_executor_impl, values("async-std", "tokio"))'] }
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(async_executor_impl, values("async-std", "tokio"))',
] }
50 changes: 43 additions & 7 deletions src/builder_state.rs
@@ -32,7 +32,7 @@ use async_std::task::spawn_blocking;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::spawn_blocking;

use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;
@@ -87,6 +87,8 @@ pub struct BuildBlockInfo<TYPES: NodeType> {
pub metadata: <<TYPES as NodeType>::BlockPayload as BlockPayload<TYPES>>::Metadata,
pub vid_trigger: OneShotSender<TriggerStatus>,
pub vid_receiver: UnboundedReceiver<(VidCommitment, VidPrecomputeData)>,
// Could we have included more transactions, but chose not to?
pub truncated: bool,
}

/// Response Message to be put on the response channel
@@ -169,7 +171,7 @@ pub struct BuilderState<TYPES: NodeType> {
pub tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<TYPES>>>,

/// filtered queue of available transactions, taken from tx_receiver
pub tx_queue: Vec<Arc<ReceivedTransaction<TYPES>>>,
pub tx_queue: VecDeque<Arc<ReceivedTransaction<TYPES>>>,

/// global state handle, defined in the service.rs
pub global_state: Arc<RwLock<GlobalState<TYPES>>>,
@@ -490,17 +492,50 @@ impl<TYPES: NodeType> BuilderState<TYPES> {

async_sleep(sleep_interval).await
}

// Don't build an empty block
if self.tx_queue.is_empty() {
return None;
}

let max_block_size = self.global_state.read_arc().await.max_block_size;
let transactions_to_include = self.tx_queue.iter().scan(0, |total_size, tx| {
if *total_size >= max_block_size {
None
} else {
// This way we will include one transaction over our target block length.
// This is done on purpose, otherwise we'd have a possible failure state
// where a single transaction larger than the target block size is stuck in
// the queue and we just build empty blocks
Member:
Two questions that don't necessarily need modifications:

  1. Intuitively I'd assume that we'd skip a transaction larger than the max size, i.e., we'd check if tx.len > max_block_size, and if so, we'd just skip it and try the next transaction. I understand that we may instead want to build for all transactions, but just want to bring this up in case this option hasn't been discussed yet.

  2. Would it be better to include a transaction over the target length only at the beginning? E.g., if the target length is 10, and the length of the first transaction is 12, we will include it. But if we've added a transaction with length 4, and the second transaction has length 12, we won't include it for this block (but it can be added to the next block). I don't think this is strictly better than the current setting, but just feel it's closer to the purpose of avoiding empty blocks without making the length too large.

Contributor Author:

I don't think we should silently drop transactions on the basis that they're too big for us to handle. What we can do is immediately reject them when they're added to the mempool; I'll add a thing for that.

As for dropping transactions too big for the sequencer to handle, see my response to your comment below.

Member:

I still think the second point above is an improvement we could make, but I don't think it's urgent. And your replies to other questions/comments all make sense! I'll approve once the incrementation thing is fixed and will add this discussion to our tech debt.

Contributor Author (@QuentinI, Aug 16, 2024):

I agree on the second point, addressed it
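For concreteness, here is a minimal sketch of the policy from the second point, under the assumption that an over-limit transaction is admitted only when it is the first one in the block. This is a hypothetical standalone helper over (length, id) pairs, not this PR's code:

```rust
/// Select transactions for a block: the first transaction is always
/// admitted, so a single oversized transaction can't stall the queue;
/// after that, one is only admitted if the block stays within the limit.
fn select_transactions(queue: &[(u64, u32)], max_block_size: u64) -> Vec<u32> {
    let mut total = 0u64;
    let mut selected = Vec::new();
    for &(len, id) in queue {
        if !selected.is_empty() && total + len > max_block_size {
            break;
        }
        total += len;
        selected.push(id);
    }
    selected
}

fn main() {
    // With a limit of 10: a first transaction of length 12 is admitted,
    // but after one of length 4, a transaction of length 12 is deferred.
    assert_eq!(select_transactions(&[(12, 0), (3, 1)], 10), vec![0]);
    assert_eq!(select_transactions(&[(4, 0), (12, 1)], 10), vec![0]);
}
```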

*total_size += tx.len;
Some(tx.tx.clone())
}
});

if let Ok((payload, metadata)) =
<TYPES::BlockPayload as BlockPayload<TYPES>>::from_transactions(
self.tx_queue.iter().map(|tx| tx.tx.clone()),
transactions_to_include,
&self.validated_state,
&self.instance_state,
)
.await
{
let builder_hash = payload.builder_commitment(&metadata);
// count the number of txns
let txn_count = self.tx_queue.len();
let actual_txn_count = payload.num_transactions(&metadata);

// Payload is empty despite us checking that tx_queue isn't empty earlier.
// This indicates that the block was truncated due to sequencer block length
// limits and we have a transaction too big to ever be included in the head of
// our queue. We need to drop it and mark as "included" so that if we receive
// it again we don't even bother with it.
Member:

Is the "sequencer block length limits" the same as max_block_size? The comment and code here make sense to me, but see my first question above--I'm wondering whether we can skip the large transaction there so that we don't need the list updates here.

Contributor Author:

Sequencer block length limits are different from our max_block_size. max_block_size is our estimation of what we can calculate VID for in time, while sequencer limits are determined by chainconfig.

Unfortunately, we can't check if a transaction is over the sequencer's limit immediately, because there's no way for us to access the chainconfig, as we don't know anything about the sequencer types. Moreover, even if we did, the sequencer's maximum block size != maximum transaction size, because blocks have overhead on namespaces and other stuff. Moreover moreover, even if we had a way to get a clearly-defined transaction size from the sequencer, we can't even know the actual length of TYPES::Transaction!

So this code block detects if a single transaction is big enough to single-handedly go over the sequencer's block size limit by observing the following scenario:

  • we have some transactions in queue
  • we build a block from those by calling <TYPES::BlockPayload as BlockPayload<TYPES>>::from_transactions
  • the sequencer's definition of that silently truncates the block at the first transaction that makes it go over the sequencer's limit
  • <TYPES::BlockPayload as BlockPayload<TYPES>>::from_transactions returns a block, but calling num_transactions on it shows that it's completely empty
    This means that the first transaction that we tried to include is too big, so we'll drop it from our queue.

Member:

Thanks for the detailed explanation. Makes sense now!

if actual_txn_count == 0 {
if let Some(txn) = self.tx_queue.pop_front() {
self.txns_in_queue.remove(&txn.commit);
self.included_txns.insert(txn.commit);
};
return None;
}

// insert the recently built block into the builder commitments
self.builder_commitments
@@ -536,7 +571,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
tracing::info!(
"Builder view num {:?}, building block with {:?} txns, with builder hash {:?}",
self.built_from_proposed_block.view_number,
txn_count,
actual_txn_count,
builder_hash
);

@@ -551,6 +586,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
metadata,
vid_trigger: trigger_send,
vid_receiver: unbounded_receiver,
truncated: actual_txn_count < self.tx_queue.len(),
})
} else {
tracing::warn!("build block, returning None");
@@ -744,7 +780,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
qc_receiver: BroadcastReceiver<MessageType<TYPES>>,
req_receiver: BroadcastReceiver<MessageType<TYPES>>,
tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<TYPES>>>,
tx_queue: Vec<Arc<ReceivedTransaction<TYPES>>>,
tx_queue: VecDeque<Arc<ReceivedTransaction<TYPES>>>,
global_state: Arc<RwLock<GlobalState<TYPES>>>,
num_nodes: NonZeroUsize,
maximize_txn_capture_timeout: Duration,
@@ -841,7 +877,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
continue;
}
self.txns_in_queue.insert(tx.commit);
self.tx_queue.push(tx);
self.tx_queue.push_back(tx);
}
Err(async_broadcast::TryRecvError::Empty)
| Err(async_broadcast::TryRecvError::Closed) => {
56 changes: 54 additions & 2 deletions src/service.rs
@@ -60,6 +60,18 @@ use std::{fmt::Display, time::Instant};
use tagged_base64::TaggedBase64;
use tide_disco::{method::ReadState, Url};

// Start assuming we're fine calculating VID for 100 kilobyte blocks
const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
// Never go lower than 10 kilobytes
const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
// When adjusting max block size, it will be decremented or incremented
// by current value / [`MAX_BLOCK_SIZE_CHANGE_DIVISOR`]
const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;
// We will only increment max block size if we manage to serve a response with
// a margin of at least [`ProxyGlobalState::max_api_waiting_time`] /
// [`VID_RESPONSE_TARGET_MARGIN_DIVISOR`] left before the request times out
const VID_RESPONSE_TARGET_MARGIN_DIVISOR: u32 = 10;

// It holds all the necessary information for a block
#[derive(Debug)]
pub struct BlockInfo<Types: NodeType> {
@@ -68,6 +80,8 @@ pub struct BlockInfo<Types: NodeType> {
pub vid_trigger: Arc<RwLock<Option<OneShotSender<TriggerStatus>>>>,
pub vid_receiver: Arc<RwLock<WaitAndKeep<(VidCommitment, VidPrecomputeData)>>>,
pub offered_fee: u64,
// Could we have included more transactions with this block, but chose not to?
pub truncated: bool,
}

// It holds the information for the proposed block
@@ -105,9 +119,11 @@ pub struct BuilderStatesInfo<Types: NodeType> {
pub struct ReceivedTransaction<Types: NodeType> {
// the transaction
pub tx: Types::Transaction,
// its hash
// transaction's hash
pub commit: Commitment<Types::Transaction>,
// its source
// transaction's estimated length
pub len: u64,
// transaction's source
pub source: TransactionSource,
// received time
pub time_in: Instant,
@@ -135,6 +151,9 @@ pub struct GlobalState<Types: NodeType> {

// highest view running builder task
pub highest_view_num_builder_id: BuilderStateId<Types>,

// estimated maximum block size we can build in time
pub max_block_size: u64,
}

impl<Types: NodeType> GlobalState<Types> {
@@ -160,6 +179,7 @@ impl<Types: NodeType> GlobalState<Types> {
last_garbage_collected_view_num,
builder_state_to_last_built_block: Default::default(),
highest_view_num_builder_id: bootstrap_id,
max_block_size: INITIAL_MAX_BLOCK_SIZE,
}
}

@@ -203,6 +223,7 @@ impl<Types: NodeType> GlobalState<Types> {
build_block_info.vid_receiver,
))),
offered_fee: build_block_info.offered_fee,
truncated: build_block_info.truncated,
},
);
}
@@ -599,6 +620,17 @@ where
Err(_toe) => {
if Instant::now() >= timeout_after {
tracing::warn!("Couldn't get vid commitment in time for block {id}",);
{
// we can't keep up with this block size, reduce max block size
let mut global_state = self.global_state.write_arc().await;
global_state.max_block_size = std::cmp::min(
global_state.max_block_size
- global_state
.max_block_size
.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
MAX_BLOCK_SIZE_FLOOR,
);
Member (on lines +632 to +638):
IIUC MAX_BLOCK_SIZE_FLOOR is the minimum block size we accept, so I think this should be cmp::max to make sure we don't set a size lower than MAX_BLOCK_SIZE_FLOOR.
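For illustration, a standalone sketch of the decrement with the reviewer's fix applied, reusing the constants this PR defines:

```rust
const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;

/// Shrink the estimate by current / MAX_BLOCK_SIZE_CHANGE_DIVISOR,
/// clamping with cmp::max so it never drops below the floor.
fn decreased_max_block_size(current: u64) -> u64 {
    std::cmp::max(
        current - current.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
        MAX_BLOCK_SIZE_FLOOR,
    )
}

fn main() {
    // 100_000 -> 90_000, and the floor holds: 10_500 -> 10_000, not 9_450.
    assert_eq!(decreased_max_block_size(100_000), 90_000);
    assert_eq!(decreased_max_block_size(10_500), 10_000);
}
```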

}
break Err(BuildError::Error {
message: "Couldn't get vid commitment in time".to_string(),
});
@@ -619,6 +651,24 @@
};

tracing::info!("Got vid commitment for block {id}",);

// This block was truncated, but we got VID in time with margin left.
// Maybe we can handle bigger blocks?
if block_info.truncated
&& timeout_after.duration_since(Instant::now())
> self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
{
// Increase max block size
let mut global_state = self.global_state.write_arc().await;
global_state.max_block_size = std::cmp::min(
global_state.max_block_size
+ global_state
.max_block_size
.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
MAX_BLOCK_SIZE_FLOOR,
Member:

Shouldn't this be MAX_BLOCK_SIZE_CEIL (not sure if we have such value)?

Contributor Author:

Oh, good catch! There's no reason to make such a ceiling; the whole min call is a copypasta error.
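And a matching sketch of the increment with the stray min dropped; the saturating_add is my own assumption, added only to guard against u64 overflow:

```rust
const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;

/// Grow the estimate by current / MAX_BLOCK_SIZE_CHANGE_DIVISOR with no
/// ceiling; growth is only attempted after a truncated block was served
/// with time to spare, so it is naturally bounded by what VID keeps up with.
fn increased_max_block_size(current: u64) -> u64 {
    current.saturating_add(current.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR))
}
```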

);
}

if response_received.is_ok() {
let (vid_commitment, vid_precompute_data) =
response_received.map_err(|err| BuildError::Error {
@@ -1062,12 +1112,14 @@ pub(crate) async fn handle_received_txns<Types: NodeType>(
let time_in = Instant::now();
for tx in txns.into_iter() {
let commit = tx.commit();
let len = bincode::serialized_size(&tx).unwrap_or_default();
Contributor Author:

This is the only method we have of estimating a transaction's size, short of actually serializing it. We should add one in HotShot.
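For reference, a self-contained sketch of this estimation with bincode 1.3 and serde; FakeTransaction is a stand-in type, not anything from HotShot:

```rust
use serde::Serialize;

// Stand-in for TYPES::Transaction; any Serialize type works here.
#[derive(Serialize)]
struct FakeTransaction {
    payload: Vec<u8>,
}

fn main() {
    let tx = FakeTransaction { payload: vec![0u8; 512] };
    // bincode 1.x computes the serialized length without producing the
    // full byte buffer; with its default fixed-int encoding a Vec<u8>
    // costs an 8-byte length prefix plus its elements.
    let len = bincode::serialized_size(&tx).unwrap_or_default();
    assert_eq!(len, 8 + 512);
}
```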

let res = tx_sender
.try_broadcast(Arc::new(ReceivedTransaction {
tx,
source: source.clone(),
commit,
time_in,
len,
}))
.inspect(|val| {
if let Some(evicted_txn) = val {
3 changes: 2 additions & 1 deletion src/testing/basic_test.rs
@@ -18,6 +18,7 @@ pub use async_broadcast::{
#[cfg(test)]
mod tests {
use super::*;
use std::collections::VecDeque;
use std::{hash::Hash, marker::PhantomData, num::NonZeroUsize};

use async_compatibility_layer::channel::unbounded;
@@ -100,7 +101,7 @@ mod tests {
let (tx_sender, tx_receiver) = broadcast::<Arc<ReceivedTransaction<TestTypes>>>(
num_test_messages * multiplication_factor,
);
let tx_queue = Vec::new();
let tx_queue = VecDeque::new();
// generate the keys for the builder
let seed = [201_u8; 32];
let (_builder_pub_key, _builder_private_key) =