diff --git a/src/builder_state.rs b/src/builder_state.rs index cdd58611..94f1946a 100644 --- a/src/builder_state.rs +++ b/src/builder_state.rs @@ -142,6 +142,9 @@ pub struct BuilderState { /// Expiring txs to be garbage collected pub included_txns_expiring: HashSet>, + /// txns currently in the tx_queue + pub txns_in_queue: HashSet>, + /// da_proposal_payload_commit to (da_proposal, node_count) #[allow(clippy::type_complexity)] pub da_proposal_payload_commit_to_da_proposal: @@ -491,9 +494,14 @@ impl BuilderProgress for BuilderState { let block_payload = >::from_bytes(encoded_txns, metadata); let txn_commitments = block_payload.transaction_commitments(metadata); + + for tx in txn_commitments.iter() { + self.txns_in_queue.remove(tx); + } + self.included_txns.extend(txn_commitments.iter()); self.tx_queue - .retain(|tx| !self.included_txns.contains(&tx.commit)); + .retain(|tx| self.txns_in_queue.contains(&tx.commit)); // register the spawned builder state to spawned_builder_states in the global state self.global_state.write_arc().await.register_builder_state( @@ -785,10 +793,12 @@ impl BuilderState { txn_garbage_collect_duration: Duration, validated_state: Arc, ) -> Self { + let txns_in_queue: HashSet<_> = tx_queue.iter().map(|tx| tx.commit).collect(); BuilderState { included_txns: HashSet::new(), included_txns_old: HashSet::new(), included_txns_expiring: HashSet::new(), + txns_in_queue, built_from_proposed_block, decide_receiver, da_proposal_receiver, @@ -836,6 +846,7 @@ impl BuilderState { included_txns, included_txns_old, included_txns_expiring, + txns_in_queue: self.txns_in_queue.clone(), built_from_proposed_block: self.built_from_proposed_block.clone(), decide_receiver: self.decide_receiver.clone(), da_proposal_receiver: self.da_proposal_receiver.clone(), @@ -865,9 +876,11 @@ impl BuilderState { if self.included_txns.contains(&tx.commit) || self.included_txns_old.contains(&tx.commit) || self.included_txns_expiring.contains(&tx.commit) + || self.txns_in_queue.contains(&tx.commit) { continue; } + self.txns_in_queue.insert(tx.commit); self.tx_queue.push(tx); } Err(async_broadcast::TryRecvError::Empty) diff --git a/src/service.rs b/src/service.rs index 052feac1..85a6716b 100644 --- a/src/service.rs +++ b/src/service.rs @@ -32,6 +32,7 @@ use anyhow::{anyhow, Context}; use async_broadcast::Sender as BroadcastSender; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; use async_compatibility_layer::{ + art::async_sleep, art::async_timeout, channel::{unbounded, OneShotSender}, }; @@ -303,6 +304,7 @@ impl GlobalState { key: &(VidCommitment, Types::Time), ) -> Result<&BroadcastSender>, BuildError> { if let Some(channel) = self.spawned_builder_states.get(key) { + tracing::info!("Got matching builder for parent {:?}@{:?}", key.0, key.1); Ok(channel) } else { tracing::warn!( @@ -390,6 +392,8 @@ where sender: Types::SignatureKey, signature: &::PureAssembledSignatureType, ) -> Result>, BuildError> { + let starting_time = Instant::now(); + // verify the signature if !sender.validate(signature, for_parent.as_ref()) { tracing::error!("Signature validation failed in get_available_blocks"); @@ -405,40 +409,74 @@ where ); let view_num = <::Time as ConsensusTime>::new(view_number); - // check in the local spawned builder states, if it doesn't exist it means there could be two cases - // it has been sent to garbed collected, or never exists, in later case let higest view num builder state build a block for it - let just_return_with_this = { - //let global_state = self.global_state.read_arc().await; - if self - .global_state - .read_arc() - .await - .spawned_builder_states - .contains_key(&(*for_parent, view_num)) + // check in the local spawned builder states + // if it doesn't exist; there are three cases + // 1) it has already been garbage collected (view < decide) and we should return an error + // 2) it has not yet been created, and we should try to wait + // 3) we missed the triggering event, and should use the BuilderState with the highest available view + + { + // 1st case: Decide event received, and not bootstrapping. + // If this `BlockBuilder` hasn't been reaped, it should have been. + let global_state = self.global_state.read_arc().await; + if global_state.last_garbage_collected_view_num >= view_num + && global_state.highest_view_num_builder_id.1 != view_num { - None - } else { - self.global_state - .read_arc() - .await - .builder_state_to_last_built_block - .get(&(*for_parent, view_num)) - .cloned() + return Err(BuildError::Error { + message: + "Request for available blocks for a view that has already been decided." + .to_string(), + }); } - }; + } + let (response_sender, response_receiver) = unbounded(); let req_msg = RequestMessage { requested_vid_commitment: (*for_parent), requested_view_number: view_number, response_channel: response_sender, }; - let response_received = if let Some(response_cached) = just_return_with_this { - Ok(response_cached) - } else { - let timeout_after = Instant::now() + self.max_api_waiting_time; - let check_duration = self.max_api_waiting_time / 10; + let timeout_after = starting_time + self.max_api_waiting_time; + let check_duration = self.max_api_waiting_time / 10; + + let time_to_wait_for_matching_builder = starting_time + self.max_api_waiting_time / 2; + + let mut sent = false; + let key = (*for_parent, view_num); + while !sent && Instant::now() < time_to_wait_for_matching_builder { + // try to broadcast the request to the correct builder state + if let Some(builder) = self + .global_state + .read_arc() + .await + .spawned_builder_states + .get(&key) + { + tracing::info!( + "Got matching BlockBuilder for {:?}@{view_number}, sending get_available_blocks request", + req_msg.requested_vid_commitment + ); + if let Err(e) = builder + .broadcast(MessageType::RequestMessage(req_msg.clone())) + .await + { + tracing::warn!( + "Error {e} sending get_available_blocks request for parent {:?}@{view_number}", + req_msg.requested_vid_commitment + ); + } + sent = true; + } else { + tracing::info!( + "Failed to get matching BlockBuilder for {:?}@{view_number}, will try again", + req_msg.requested_vid_commitment + ); + async_sleep(check_duration).await + } + } - // broadcast the request to the builder states + if !sent { + // broadcast the request to the best fallback builder state if let Err(e) = self .global_state .read_arc() @@ -452,48 +490,49 @@ where req_msg.requested_vid_commitment ); } + } - tracing::debug!( - "Waiting for response for get_available_blocks with parent {:?}@{view_number}", - req_msg.requested_vid_commitment - ); + tracing::debug!( + "Waiting for response for get_available_blocks with parent {:?}@{view_number}", + req_msg.requested_vid_commitment + ); - loop { - match async_timeout(check_duration, response_receiver.recv()).await { - Err(toe) => { - if Instant::now() >= timeout_after { - tracing::warn!(%toe, "Couldn't get available blocks in time for parent {:?}", req_msg.requested_vid_commitment); - // lookup into the builder_state_to_last_built_block, if it contains the result, return that otherwise return error - if let Some(last_built_block) = self - .global_state - .read_arc() - .await - .builder_state_to_last_built_block - .get(&(*for_parent, view_num)) - { - tracing::info!( - "Returning last built block for parent {:?}", - req_msg.requested_vid_commitment - ); - break Ok(last_built_block.clone()); - } - break Err(BuildError::Error { - message: "No blocks available".to_string(), - }); - } - continue; - } - Ok(recv_attempt) => { - if let Err(ref e) = recv_attempt { - tracing::error!(%e, "Channel closed while getting available blocks for parent {:?}", req_msg.requested_vid_commitment); + let response_received = loop { + match async_timeout(check_duration, response_receiver.recv()).await { + Err(toe) => { + if Instant::now() >= timeout_after { + tracing::warn!(%toe, "Couldn't get available blocks in time for parent {:?}", req_msg.requested_vid_commitment); + // lookup into the builder_state_to_last_built_block, if it contains the result, return that otherwise return error + if let Some(last_built_block) = self + .global_state + .read_arc() + .await + .builder_state_to_last_built_block + .get(&(*for_parent, view_num)) + { + tracing::info!( + "Returning last built block for parent {:?}", + req_msg.requested_vid_commitment + ); + break Ok(last_built_block.clone()); } - break recv_attempt.map_err(|_| BuildError::Error { - message: "channel unexpectedly closed".to_string(), + break Err(BuildError::Error { + message: "No blocks available".to_string(), }); } + continue; + } + Ok(recv_attempt) => { + if let Err(ref e) = recv_attempt { + tracing::error!(%e, "Channel closed while getting available blocks for parent {:?}", req_msg.requested_vid_commitment); + } + break recv_attempt.map_err(|_| BuildError::Error { + message: "channel unexpectedly closed".to_string(), + }); } } }; + // }; match response_received { Ok(response) => {