Skip to content
This repository has been archived by the owner on Dec 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #196 from EspressoSystems/nfy/reducing_duplicate_txns
Browse files Browse the repository at this point in the history
Give BlockBuilder time to register; fix same block duplicate txns regression
  • Loading branch information
nyospe authored Jul 1, 2024
2 parents 5ad88e9 + bf10e84 commit 778fc05
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 60 deletions.
15 changes: 14 additions & 1 deletion src/builder_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ pub struct BuilderState<TYPES: NodeType> {
/// Expiring txs to be garbage collected
pub included_txns_expiring: HashSet<Commitment<TYPES::Transaction>>,

/// txns currently in the tx_queue
pub txns_in_queue: HashSet<Commitment<TYPES::Transaction>>,

/// da_proposal_payload_commit to (da_proposal, node_count)
#[allow(clippy::type_complexity)]
pub da_proposal_payload_commit_to_da_proposal:
Expand Down Expand Up @@ -491,9 +494,14 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
let block_payload =
<TYPES::BlockPayload as BlockPayload<TYPES>>::from_bytes(encoded_txns, metadata);
let txn_commitments = block_payload.transaction_commitments(metadata);

for tx in txn_commitments.iter() {
self.txns_in_queue.remove(tx);
}

self.included_txns.extend(txn_commitments.iter());
self.tx_queue
.retain(|tx| !self.included_txns.contains(&tx.commit));
.retain(|tx| self.txns_in_queue.contains(&tx.commit));

// register the spawned builder state to spawned_builder_states in the global state
self.global_state.write_arc().await.register_builder_state(
Expand Down Expand Up @@ -785,10 +793,12 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
txn_garbage_collect_duration: Duration,
validated_state: Arc<TYPES::ValidatedState>,
) -> Self {
let txns_in_queue: HashSet<_> = tx_queue.iter().map(|tx| tx.commit).collect();
BuilderState {
included_txns: HashSet::new(),
included_txns_old: HashSet::new(),
included_txns_expiring: HashSet::new(),
txns_in_queue,
built_from_proposed_block,
decide_receiver,
da_proposal_receiver,
Expand Down Expand Up @@ -836,6 +846,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
included_txns,
included_txns_old,
included_txns_expiring,
txns_in_queue: self.txns_in_queue.clone(),
built_from_proposed_block: self.built_from_proposed_block.clone(),
decide_receiver: self.decide_receiver.clone(),
da_proposal_receiver: self.da_proposal_receiver.clone(),
Expand Down Expand Up @@ -865,9 +876,11 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
if self.included_txns.contains(&tx.commit)
|| self.included_txns_old.contains(&tx.commit)
|| self.included_txns_expiring.contains(&tx.commit)
|| self.txns_in_queue.contains(&tx.commit)
{
continue;
}
self.txns_in_queue.insert(tx.commit);
self.tx_queue.push(tx);
}
Err(async_broadcast::TryRecvError::Empty)
Expand Down
157 changes: 98 additions & 59 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use anyhow::{anyhow, Context};
use async_broadcast::Sender as BroadcastSender;
pub use async_broadcast::{broadcast, RecvError, TryRecvError};
use async_compatibility_layer::{
art::async_sleep,
art::async_timeout,
channel::{unbounded, OneShotSender},
};
Expand Down Expand Up @@ -303,6 +304,7 @@ impl<Types: NodeType> GlobalState<Types> {
key: &(VidCommitment, Types::Time),
) -> Result<&BroadcastSender<MessageType<Types>>, BuildError> {
if let Some(channel) = self.spawned_builder_states.get(key) {
tracing::info!("Got matching builder for parent {:?}@{:?}", key.0, key.1);
Ok(channel)
} else {
tracing::warn!(
Expand Down Expand Up @@ -390,6 +392,8 @@ where
sender: Types::SignatureKey,
signature: &<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType,
) -> Result<Vec<AvailableBlockInfo<Types>>, BuildError> {
let starting_time = Instant::now();

// verify the signature
if !sender.validate(signature, for_parent.as_ref()) {
tracing::error!("Signature validation failed in get_available_blocks");
Expand All @@ -405,40 +409,74 @@ where
);

let view_num = <<Types as NodeType>::Time as ConsensusTime>::new(view_number);
// check in the local spawned builder states, if it doesn't exist it means there could be two cases
// it has been sent to garbed collected, or never exists, in later case let higest view num builder state build a block for it
let just_return_with_this = {
//let global_state = self.global_state.read_arc().await;
if self
.global_state
.read_arc()
.await
.spawned_builder_states
.contains_key(&(*for_parent, view_num))
// check in the local spawned builder states
// if it doesn't exist; there are three cases
// 1) it has already been garbage collected (view < decide) and we should return an error
// 2) it has not yet been created, and we should try to wait
// 3) we missed the triggering event, and should use the BuilderState with the highest available view

{
// 1st case: Decide event received, and not bootstrapping.
// If this `BlockBuilder` hasn't been reaped, it should have been.
let global_state = self.global_state.read_arc().await;
if global_state.last_garbage_collected_view_num >= view_num
&& global_state.highest_view_num_builder_id.1 != view_num
{
None
} else {
self.global_state
.read_arc()
.await
.builder_state_to_last_built_block
.get(&(*for_parent, view_num))
.cloned()
return Err(BuildError::Error {
message:
"Request for available blocks for a view that has already been decided."
.to_string(),
});
}
};
}

let (response_sender, response_receiver) = unbounded();
let req_msg = RequestMessage {
requested_vid_commitment: (*for_parent),
requested_view_number: view_number,
response_channel: response_sender,
};
let response_received = if let Some(response_cached) = just_return_with_this {
Ok(response_cached)
} else {
let timeout_after = Instant::now() + self.max_api_waiting_time;
let check_duration = self.max_api_waiting_time / 10;
let timeout_after = starting_time + self.max_api_waiting_time;
let check_duration = self.max_api_waiting_time / 10;

let time_to_wait_for_matching_builder = starting_time + self.max_api_waiting_time / 2;

let mut sent = false;
let key = (*for_parent, view_num);
while !sent && Instant::now() < time_to_wait_for_matching_builder {
// try to broadcast the request to the correct builder state
if let Some(builder) = self
.global_state
.read_arc()
.await
.spawned_builder_states
.get(&key)
{
tracing::info!(
"Got matching BlockBuilder for {:?}@{view_number}, sending get_available_blocks request",
req_msg.requested_vid_commitment
);
if let Err(e) = builder
.broadcast(MessageType::RequestMessage(req_msg.clone()))
.await
{
tracing::warn!(
"Error {e} sending get_available_blocks request for parent {:?}@{view_number}",
req_msg.requested_vid_commitment
);
}
sent = true;
} else {
tracing::info!(
"Failed to get matching BlockBuilder for {:?}@{view_number}, will try again",
req_msg.requested_vid_commitment
);
async_sleep(check_duration).await
}
}

// broadcast the request to the builder states
if !sent {
// broadcast the request to the best fallback builder state
if let Err(e) = self
.global_state
.read_arc()
Expand All @@ -452,48 +490,49 @@ where
req_msg.requested_vid_commitment
);
}
}

tracing::debug!(
"Waiting for response for get_available_blocks with parent {:?}@{view_number}",
req_msg.requested_vid_commitment
);
tracing::debug!(
"Waiting for response for get_available_blocks with parent {:?}@{view_number}",
req_msg.requested_vid_commitment
);

loop {
match async_timeout(check_duration, response_receiver.recv()).await {
Err(toe) => {
if Instant::now() >= timeout_after {
tracing::warn!(%toe, "Couldn't get available blocks in time for parent {:?}", req_msg.requested_vid_commitment);
// lookup into the builder_state_to_last_built_block, if it contains the result, return that otherwise return error
if let Some(last_built_block) = self
.global_state
.read_arc()
.await
.builder_state_to_last_built_block
.get(&(*for_parent, view_num))
{
tracing::info!(
"Returning last built block for parent {:?}",
req_msg.requested_vid_commitment
);
break Ok(last_built_block.clone());
}
break Err(BuildError::Error {
message: "No blocks available".to_string(),
});
}
continue;
}
Ok(recv_attempt) => {
if let Err(ref e) = recv_attempt {
tracing::error!(%e, "Channel closed while getting available blocks for parent {:?}", req_msg.requested_vid_commitment);
let response_received = loop {
match async_timeout(check_duration, response_receiver.recv()).await {
Err(toe) => {
if Instant::now() >= timeout_after {
tracing::warn!(%toe, "Couldn't get available blocks in time for parent {:?}", req_msg.requested_vid_commitment);
// lookup into the builder_state_to_last_built_block, if it contains the result, return that otherwise return error
if let Some(last_built_block) = self
.global_state
.read_arc()
.await
.builder_state_to_last_built_block
.get(&(*for_parent, view_num))
{
tracing::info!(
"Returning last built block for parent {:?}",
req_msg.requested_vid_commitment
);
break Ok(last_built_block.clone());
}
break recv_attempt.map_err(|_| BuildError::Error {
message: "channel unexpectedly closed".to_string(),
break Err(BuildError::Error {
message: "No blocks available".to_string(),
});
}
continue;
}
Ok(recv_attempt) => {
if let Err(ref e) = recv_attempt {
tracing::error!(%e, "Channel closed while getting available blocks for parent {:?}", req_msg.requested_vid_commitment);
}
break recv_attempt.map_err(|_| BuildError::Error {
message: "channel unexpectedly closed".to_string(),
});
}
}
};
// };

match response_received {
Ok(response) => {
Expand Down

0 comments on commit 778fc05

Please sign in to comment.