diff --git a/Cargo.lock b/Cargo.lock
index 2edc8c7..a597c0e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3031,6 +3031,7 @@ dependencies = [
  "async-lock 2.8.0",
  "async-std",
  "async-trait",
+ "bincode",
  "clap",
  "committable",
  "derivative",
diff --git a/Cargo.toml b/Cargo.toml
index b26048e..b0ef843 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ async-compatibility-layer = { version = "1.1", default-features = false, feature
 async-lock = "2.8"
 async-std = { version = "1.9.0", features = ["unstable", "attributes"] }
 async-trait = "0.1"
+bincode = "1.3"
 clap = { version = "4.5", features = ["derive", "env"] }
 committable = "0.2"
 derivative = "2.2"
@@ -37,4 +38,6 @@ hex = "0.4.3"
 hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.70" }
 
 [lints.rust]
-unexpected_cfgs = { level = "warn", check-cfg = ['cfg(async_executor_impl, values("async-std", "tokio"))'] }
+unexpected_cfgs = { level = "warn", check-cfg = [
+    'cfg(async_executor_impl, values("async-std", "tokio"))',
+] }
diff --git a/src/builder_state.rs b/src/builder_state.rs
index 28756ed..3e68c52 100644
--- a/src/builder_state.rs
+++ b/src/builder_state.rs
@@ -32,7 +32,7 @@ use async_std::task::spawn_blocking;
 #[cfg(async_executor_impl = "tokio")]
 use tokio::task::spawn_blocking;
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::fmt::Debug;
 use std::sync::Arc;
 use std::time::Instant;
@@ -87,6 +87,8 @@ pub struct BuildBlockInfo<TYPES: NodeType> {
     pub metadata: <<TYPES as NodeType>::BlockPayload as BlockPayload<TYPES>>::Metadata,
     pub vid_trigger: OneShotSender<TriggerStatus>,
     pub vid_receiver: UnboundedReceiver<(VidCommitment, VidPrecomputeData)>,
+    // Could we have included more transactions, but chose not to?
+    pub truncated: bool,
 }
 
 /// Response Message to be put on the response channel
@@ -169,7 +171,7 @@ pub struct BuilderState<TYPES: NodeType> {
     pub tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<TYPES>>>,
 
     /// filtered queue of available transactions, taken from tx_receiver
-    pub tx_queue: Vec<Arc<ReceivedTransaction<TYPES>>>,
+    pub tx_queue: VecDeque<Arc<ReceivedTransaction<TYPES>>>,
 
     /// global state handle, defined in the service.rs
     pub global_state: Arc<RwLock<GlobalState<TYPES>>>,
@@ -490,9 +492,30 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
             async_sleep(sleep_interval).await
         }
 
+        // Don't build an empty block
+        if self.tx_queue.is_empty() {
+            return None;
+        }
+
+        let max_block_size = self.global_state.read_arc().await.max_block_size;
+        let transactions_to_include = self.tx_queue.iter().scan(0, |total_size, tx| {
+            let prev_size = *total_size;
+            *total_size += tx.len;
+            // We will include one transaction over our target block length
+            // if it's the first transaction in the queue; otherwise we'd have a possible
+            // failure state where a single transaction larger than the target block size
+            // is stuck in the queue and we just build empty blocks forever
+            if *total_size >= max_block_size && prev_size != 0 {
+                None
+            } else {
+                Some(tx.tx.clone())
+            }
+        });
+
         if let Ok((payload, metadata)) =
             <TYPES::BlockPayload as BlockPayload<TYPES>>::from_transactions(
-                self.tx_queue.iter().map(|tx| tx.tx.clone()),
+                transactions_to_include,
                 &self.validated_state,
                 &self.instance_state,
             )
@@ -500,7 +523,27 @@
         {
             let builder_hash = payload.builder_commitment(&metadata);
             // count the number of txns
-            let txn_count = self.tx_queue.len();
+            let actual_txn_count = payload.num_transactions(&metadata);
+
+            // Payload is empty despite us checking that tx_queue isn't empty earlier.
+            //
+            // This means that the block was truncated due to *sequencer* block length
+            // limits, which are different from our `max_block_size`. There's no good way
+            // for us to check for this in advance, so we detect transactions too big for
+            // the sequencer indirectly, by observing that we passed some transactions
+            // to `<TYPES::BlockPayload as BlockPayload<TYPES>>::from_transactions`, but
+            // it returned an empty block.
+            // Thus we deduce that the first transaction in our queue is too big to *ever*
+            // be included, because it alone goes over the sequencer's block size limit.
+            // We need to drop it and mark it as "included" so that if we receive
+            // it again we don't even bother with it.
+            if actual_txn_count == 0 {
+                if let Some(txn) = self.tx_queue.pop_front() {
+                    self.txns_in_queue.remove(&txn.commit);
+                    self.included_txns.insert(txn.commit);
+                };
+                return None;
+            }
 
             // insert the recently built block into the builder commitments
             self.builder_commitments
@@ -536,7 +579,7 @@
             tracing::info!(
                 "Builder view num {:?}, building block with {:?} txns, with builder hash {:?}",
                 self.built_from_proposed_block.view_number,
-                txn_count,
+                actual_txn_count,
                 builder_hash
             );
@@ -551,6 +594,7 @@
                 metadata,
                 vid_trigger: trigger_send,
                 vid_receiver: unbounded_receiver,
+                truncated: actual_txn_count < self.tx_queue.len(),
             })
         } else {
             tracing::warn!("build block, returning None");
@@ -744,7 +788,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
         qc_receiver: BroadcastReceiver<MessageType<TYPES>>,
         req_receiver: BroadcastReceiver<MessageType<TYPES>>,
         tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<TYPES>>>,
-        tx_queue: Vec<Arc<ReceivedTransaction<TYPES>>>,
+        tx_queue: VecDeque<Arc<ReceivedTransaction<TYPES>>>,
         global_state: Arc<RwLock<GlobalState<TYPES>>>,
         num_nodes: NonZeroUsize,
         maximize_txn_capture_timeout: Duration,
@@ -841,7 +885,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
                     continue;
                 }
                 self.txns_in_queue.insert(tx.commit);
-                self.tx_queue.push(tx);
+                self.tx_queue.push_back(tx);
             }
             Err(async_broadcast::TryRecvError::Empty)
             | Err(async_broadcast::TryRecvError::Closed) => {
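Aside (not part of the patch): the `build_block` changes above combine two policies: cut the block off once the running size estimate reaches `max_block_size` (always admitting the head transaction), and permanently retire the head transaction if the payload builder returns an empty block anyway. A self-contained sketch of both, where `QueuedTx`, `select_to_include` and `drop_oversized_head` are hypothetical stand-ins for `Arc<ReceivedTransaction<TYPES>>` and the real builder state:

```rust
use std::collections::{HashSet, VecDeque};

// Hypothetical stand-in for Arc<ReceivedTransaction<TYPES>>.
#[derive(Clone)]
struct QueuedTx {
    commit: u64, // stand-in for the transaction commitment
    len: u64,    // estimated encoded length
}

/// Take transactions from the front of the queue until the running total
/// reaches `max_block_size`. The first transaction is always admitted so a
/// single oversized transaction cannot stall the builder forever.
fn select_to_include(queue: &VecDeque<QueuedTx>, max_block_size: u64) -> Vec<QueuedTx> {
    queue
        .iter()
        .scan(0u64, |total, tx| {
            let prev = *total;
            *total += tx.len;
            if *total >= max_block_size && prev != 0 {
                None // stop: budget exhausted and this isn't the first tx
            } else {
                Some(tx.clone())
            }
        })
        .collect()
}

/// If the payload builder returned zero transactions despite a non-empty
/// queue, the head transaction can never fit; drop it and remember it as
/// "included" so it is not retried.
fn drop_oversized_head(
    queue: &mut VecDeque<QueuedTx>,
    in_queue: &mut HashSet<u64>,
    included: &mut HashSet<u64>,
) {
    if let Some(tx) = queue.pop_front() {
        in_queue.remove(&tx.commit);
        included.insert(tx.commit);
    }
}

fn main() {
    let mut queue: VecDeque<QueuedTx> = (1..=4)
        .map(|i| QueuedTx { commit: i, len: 40 })
        .collect();
    // With a 100-byte budget only the first two transactions make the cut.
    assert_eq!(select_to_include(&queue, 100).len(), 2);

    let (mut in_queue, mut included): (HashSet<u64>, HashSet<u64>) =
        (queue.iter().map(|t| t.commit).collect(), HashSet::new());
    drop_oversized_head(&mut queue, &mut in_queue, &mut included);
    assert_eq!(queue.len(), 3);
    assert!(included.contains(&1));
}
```

The `prev != 0` guard is what keeps a single oversized transaction from wedging the queue: it is admitted once, and if even alone it cannot produce a payload, the second helper retires it.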
diff --git a/src/service.rs b/src/service.rs
index 536b050..d2ea211 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -60,6 +60,18 @@ use std::{fmt::Display, time::Instant};
 use tagged_base64::TaggedBase64;
 use tide_disco::{method::ReadState, Url};
 
+// Start assuming we're fine calculating VID for 100 kilobyte blocks
+const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
+// Never go lower than 10 kilobytes
+const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
+// When adjusting max block size, it will be decremented or incremented
+// by current value / [`MAX_BLOCK_SIZE_CHANGE_DIVISOR`]
+const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;
+// We will not increment max block size unless we served the response with a
+// margin of at least [`ProxyGlobalState::max_api_waiting_time`] /
+// `VID_RESPONSE_TARGET_MARGIN_DIVISOR` left before the deadline
+const VID_RESPONSE_TARGET_MARGIN_DIVISOR: u32 = 10;
+
 // It holds all the necessary information for a block
 #[derive(Debug)]
 pub struct BlockInfo<Types: NodeType> {
@@ -68,6 +80,8 @@ pub struct BlockInfo<Types: NodeType> {
     pub vid_trigger: Arc<RwLock<Option<OneShotSender<TriggerStatus>>>>,
     pub vid_receiver: Arc<RwLock<WaitAndKeep<(VidCommitment, VidPrecomputeData)>>>,
     pub offered_fee: u64,
+    // Could we have included more transactions with this block, but chose not to?
+    pub truncated: bool,
 }
 
 // It holds the information for the proposed block
@@ -105,9 +119,11 @@ pub struct BuilderStatesInfo<Types: NodeType> {
 pub struct ReceivedTransaction<Types: NodeType> {
     // the transaction
     pub tx: Types::Transaction,
-    // its hash
+    // transaction's hash
     pub commit: Commitment<Types::Transaction>,
-    // its source
+    // transaction's estimated length
+    pub len: u64,
+    // transaction's source
     pub source: TransactionSource,
     // received time
     pub time_in: Instant,
@@ -135,6 +151,9 @@ pub struct GlobalState<Types: NodeType> {
 
     // highest view running builder task
     pub highest_view_num_builder_id: BuilderStateId<Types>,
+
+    // estimated maximum block size we can build in time
+    pub max_block_size: u64,
 }
 
 impl<Types: NodeType> GlobalState<Types> {
@@ -160,6 +179,7 @@
             last_garbage_collected_view_num,
             builder_state_to_last_built_block: Default::default(),
             highest_view_num_builder_id: bootstrap_id,
+            max_block_size: INITIAL_MAX_BLOCK_SIZE,
         }
     }
 
@@ -203,6 +223,7 @@
                     build_block_info.vid_receiver,
                 ))),
                 offered_fee: build_block_info.offered_fee,
+                truncated: build_block_info.truncated,
             },
         );
     }
@@ -234,7 +255,13 @@
         &self,
         txns: Vec<<Types as NodeType>::Transaction>,
     ) -> Vec<Result<Commitment<<Types as NodeType>::Transaction>, BuildError>> {
-        handle_received_txns(&self.tx_sender, txns, TransactionSource::External).await
+        handle_received_txns(
+            &self.tx_sender,
+            txns,
+            TransactionSource::External,
+            self.max_block_size,
+        )
+        .await
     }
 
     pub fn get_channel_for_matching_builder_or_highest_view_buider(
@@ -599,6 +626,17 @@
             Err(_toe) => {
                 if Instant::now() >= timeout_after {
                     tracing::warn!("Couldn't get vid commitment in time for block {id}",);
+                    {
+                        // we can't keep up with this block size, reduce max block size
+                        let mut global_state = self.global_state.write_arc().await;
+                        global_state.max_block_size = std::cmp::max(
+                            global_state.max_block_size
+                                - global_state
+                                    .max_block_size
+                                    .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
+                            MAX_BLOCK_SIZE_FLOOR,
+                        );
+                    }
                     break Err(BuildError::Error {
                         message: "Couldn't get vid commitment in time".to_string(),
                     });
@@ -619,6 +657,21 @@
         };
 
         tracing::info!("Got vid commitment for block {id}",);
+
+        // This block was truncated, but we got VID in time with margin left.
+        // Maybe we can handle bigger blocks?
+        if block_info.truncated
+            && timeout_after.duration_since(Instant::now())
+                > self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
+        {
+            // Increase max block size
+            let mut global_state = self.global_state.write_arc().await;
+            global_state.max_block_size = global_state.max_block_size
+                + global_state
+                    .max_block_size
+                    .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR);
+        }
+
         if response_received.is_ok() {
             let (vid_commitment, vid_precompute_data) =
                 response_received.map_err(|err| BuildError::Error {
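Aside (not part of the patch): the two adjustments above amount to shrinking the estimate by roughly a tenth on a VID timeout, never below the floor, and growing it by roughly a tenth when a truncated block was served with time to spare. A sketch of just that arithmetic, reusing the constant values from this diff, with free functions standing in for the in-place updates the service performs under its `GlobalState` write lock:

```rust
// Same values as the constants introduced in src/service.rs; `shrink` and
// `grow` are illustrative names, not part of the patch.
const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;

/// Called when VID computation missed the deadline: back off by ~10%,
/// but never below the floor.
fn shrink(max_block_size: u64) -> u64 {
    std::cmp::max(
        max_block_size - max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
        MAX_BLOCK_SIZE_FLOOR,
    )
}

/// Called when a truncated block was served with margin to spare: probe
/// upwards by ~10%.
fn grow(max_block_size: u64) -> u64 {
    max_block_size + max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR)
}

fn main() {
    let mut size = 100_000u64;
    size = shrink(size); // 90_000
    size = shrink(size); // 81_000
    size = grow(size);   // 89_100
    assert_eq!(size, 89_100);

    // Repeated misses converge on the floor instead of collapsing to zero.
    let mut worst = 11_000u64;
    for _ in 0..10 {
        worst = shrink(worst);
    }
    assert_eq!(worst, MAX_BLOCK_SIZE_FLOOR);
}
```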
@@ -785,11 +838,11 @@ pub async fn run_non_permissioned_standalone_builder_service<
     // sending a DA proposal from the hotshot to the builder states
     da_sender: BroadcastSender<MessageType<Types>>,
 
-    // shared accumulated transactions handle
-    tx_sender: BroadcastSender<Arc<ReceivedTransaction<Types>>>,
-
     // Url to (re)connect to for the events stream
     hotshot_events_api_url: Url,
+
+    // Global state
+    global_state: Arc<RwLock<GlobalState<Types>>>,
 ) -> Result<(), anyhow::Error> {
     // connection to the events stream
     let connected = connect_to_events_service::(hotshot_events_api_url.clone()).await;
@@ -801,6 +854,8 @@ pub async fn run_non_permissioned_standalone_builder_service<
             // tx event
             EventType::Transactions { transactions } => {
-                handle_received_txns(&tx_sender, transactions, TransactionSource::HotShot)
-                    .await;
+                let max_block_size = global_state.read_arc().await.max_block_size;
+                handle_received_txns(
+                    &tx_sender,
+                    transactions,
+                    TransactionSource::HotShot,
+                    max_block_size,
+                )
+                .await;
             }
             // decide event
             EventType::Decide {
@@ -878,9 +939,6 @@ pub async fn run_permissioned_standalone_builder_service<
     I: NodeImplementation<Types>,
     V: Versions,
 >(
-    // sending received transactions
-    tx_sender: BroadcastSender<Arc<ReceivedTransaction<Types>>>,
-
     // sending a DA proposal from the hotshot to the builder states
     da_sender: BroadcastSender<MessageType<Types>>,
 
@@ -892,8 +950,12 @@ pub async fn run_permissioned_standalone_builder_service<
     // hotshot context handle
     hotshot_handle: Arc<SystemContextHandle<Types, I, V>>,
+
+    // Global state
+    global_state: Arc<RwLock<GlobalState<Types>>>,
 ) {
     let mut event_stream = hotshot_handle.event_stream();
+    let tx_sender = global_state.read_arc().await.tx_sender.clone();
     loop {
         tracing::debug!("Waiting for events from HotShot");
         match event_stream.next().await {
@@ -908,8 +970,14 @@ pub async fn run_permissioned_standalone_builder_service<
             }
             // tx event
             EventType::Transactions { transactions } => {
-                handle_received_txns(&tx_sender, transactions, TransactionSource::HotShot)
-                    .await;
+                let max_block_size = global_state.read_arc().await.max_block_size;
+                handle_received_txns(
+                    &tx_sender,
+                    transactions,
+                    TransactionSource::HotShot,
+                    max_block_size,
+                )
+                .await;
             }
             // decide event
             EventType::Decide { leaf_chain, .. } => {
@@ -1057,17 +1125,30 @@ pub(crate) async fn handle_received_txns<Types: NodeType>(
     tx_sender: &BroadcastSender<Arc<ReceivedTransaction<Types>>>,
     txns: Vec<Types::Transaction>,
     source: TransactionSource,
+    max_txn_len: u64,
 ) -> Vec<Result<Commitment<<Types as NodeType>::Transaction>, BuildError>> {
     let mut results = Vec::with_capacity(txns.len());
     let time_in = Instant::now();
     for tx in txns.into_iter() {
         let commit = tx.commit();
+        // This is a rough estimate, but we don't have any other way to get the
+        // real encoded transaction length. Luckily, this being roughly
+        // proportional to encoded length is enough, because we only use this
+        // value to estimate our limitations on computing the VID in time.
+        let len = bincode::serialized_size(&tx).unwrap_or_default();
+        if len > max_txn_len {
+            results.push(Err(BuildError::Error {
+                message: format!("Transaction too big (estimated length {len}, currently accepting <= {max_txn_len})"),
+            }));
+            continue;
+        }
         let res = tx_sender
             .try_broadcast(Arc::new(ReceivedTransaction {
                 tx,
                 source: source.clone(),
                 commit,
                 time_in,
+                len,
             }))
             .inspect(|val| {
                 if let Some(evicted_txn) = val {
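Aside (not part of the patch): a minimal sketch of the length gate in `handle_received_txns`, assuming `serde` with the `derive` feature and the `bincode = "1.3"` dependency added in Cargo.toml above; `DemoTx` and `check_len` are made-up names for illustration only:

```rust
use serde::Serialize;

// Made-up transaction type; the real Types::Transaction only needs to be
// Serialize for this estimate to work.
#[derive(Serialize)]
struct DemoTx {
    payload: Vec<u8>,
}

/// Estimate the encoded length the same way handle_received_txns does and
/// reject anything over the current per-transaction budget.
fn check_len(tx: &DemoTx, max_txn_len: u64) -> Result<u64, String> {
    // bincode's size is only an approximation of the sequencer's own encoding,
    // but it scales with the payload, which is all the VID timing heuristic needs.
    let len = bincode::serialized_size(tx).unwrap_or_default();
    if len > max_txn_len {
        Err(format!(
            "Transaction too big (estimated length {len}, currently accepting <= {max_txn_len})"
        ))
    } else {
        Ok(len)
    }
}

fn main() {
    let small = DemoTx { payload: vec![0u8; 100] };
    let large = DemoTx { payload: vec![0u8; 200_000] };
    assert!(check_len(&small, 100_000).is_ok());
    assert!(check_len(&large, 100_000).is_err());
}
```

Passing `u64::MAX` for the limit, as the updated test below does, effectively disables the check.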
diff --git a/src/testing/basic_test.rs b/src/testing/basic_test.rs
index 4781ab9..1e2c955 100644
--- a/src/testing/basic_test.rs
+++ b/src/testing/basic_test.rs
@@ -18,6 +18,7 @@ pub use async_broadcast::{
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::collections::VecDeque;
     use std::{hash::Hash, marker::PhantomData, num::NonZeroUsize};
 
     use async_compatibility_layer::channel::unbounded;
@@ -100,7 +101,7 @@
         let (tx_sender, tx_receiver) = broadcast::<Arc<ReceivedTransaction<TestTypes>>>(
             num_test_messages * multiplication_factor,
         );
-        let tx_queue = Vec::new();
+        let tx_queue = VecDeque::new();
         // generate the keys for the builder
         let seed = [201_u8; 32];
         let (_builder_pub_key, _builder_private_key) =
@@ -311,8 +312,13 @@
             // validate the signature before pushing the message to the builder_state channels
             // currently this step happens in the service.rs, whenever we receive a hotshot event
             tracing::debug!("Sending transaction message: {:?}", tx);
-            for res in
-                handle_received_txns(&tx_sender, vec![tx.clone()], TransactionSource::HotShot).await
+            for res in handle_received_txns(
+                &tx_sender,
+                vec![tx.clone()],
+                TransactionSource::HotShot,
+                u64::MAX,
+            )
+            .await
             {
                 res.unwrap();
             }