Skip to content

Commit

Permalink
Fix BuilderState unecessary forking
Browse files Browse the repository at this point in the history
The current `BuilderState` extending behavior fails to anticipate enough of the
various edge cases that determine whether the current `BuilderState` is the best,
or one of the only `BuilderState`s that should be extended.  Due to this behavior
many more `BuilderState`s are spawned that really should not be spawned.  This
causes a lot of unecessary computation and memory usage that will never get cleaned
up as the `Sender`s that are registered for them get overwritten by subsequent
clones that have the same `BuilderStateId`. This effectively causes an async
memory leak, async processing leak, and a potential fork-bomb that will effectively
prevent the builder from being responsive.

Additionally, the race condition that stems from multiple `BuilderState`s spawning
a clone that extends them for the next `QuorumProposal` means that we often
recieve an unexpected list of transactions as a result.  This behavior can be
best seen by the test
marketplace::testing::order_test::test_builder_order_chain_fork`.

To fix these issues a few changes have been maded.  First a new method has been
added called `spawn_clone_that_extends_self` that ensures a standard way of
spawning a new `BuilderState` that extends the current `BuilderState`. Another
method has been added called `am_i_the_best_builder_state_to_extend` that
attempts to ensure that we are the best fit, or one of the only fits that can
extend the current `BuilderState` based on the given `QuorumProposal`. Finally,
as a safe guard, a check has been added when calling `spawn_clone` that ensures
that the same `BuilderStateId` will not be registered more than once.  IE, if
there is already an async process spawned, there is no need to spawn another.

Remove newly added asserts as they fail to represent the correct anticipated intermediate state
  • Loading branch information
Ayiga committed Sep 10, 2024
1 parent a96d8d3 commit 2d5ed84
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 65 deletions.
242 changes: 185 additions & 57 deletions crates/marketplace/src/builder_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,111 @@ pub struct BuilderState<TYPES: NodeType> {
}

impl<TYPES: NodeType> BuilderState<TYPES> {
/// [am_i_the_best_builder_state_to_extend] is a utility method that
/// attempts to determine the best [BuilderState] to extend from, given a
/// [QuorumProposal].
///
/// In an ideal case the [BuilderState] whose recorded view and
/// vid_commitment that match the [QuorumProposal]'s justify_qc should be
/// the best fit. However, if there is no exact match, then the best fit
/// is the [BuilderState] with the largest view number smaller than the
/// [QuorumProposal]'s view number.
///
/// If there are no [BuilderState]s with a view number smaller than the
/// [QuorumProposal]'s view number, then the best fit is the only
/// [BuilderState] active.
async fn am_i_the_best_builder_state_to_extend(
&self,
quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
) -> bool {
let justify_qc = &quorum_proposal.data.justify_qc;
let parent_view_number = justify_qc.view_number;
let parent_commitment = quorum_proposal.data.block_header.payload_commitment();

if quorum_proposal.data.view_number == self.built_from_proposed_block.view_number {
// We are being asked to extend ourselves. This is likely a
// spawning initial condition or a test condition.
return false;
}

if parent_view_number == self.built_from_proposed_block.view_number
&& parent_commitment == self.built_from_proposed_block.vid_commitment
{
// This is us exactly, so we should be the best one to extend.
return true;
}

let desired_builder_state_id = BuilderStateId::<TYPES> {
parent_commitment,
parent_view: parent_view_number,
};

// Alright, we weren't the immediate best fit, let's see if there's a better fit out there.

let global_state_read_lock = self.global_state.read_arc().await;

if global_state_read_lock
.spawned_builder_states
.contains_key(&desired_builder_state_id)
{
// There is an exact match that isn't us, so we should not extend,
// and we should wait for that match to extend instead.
return false;
}

// There is no exact match that we are aware of. This ultimately means
// that we do not have visibility on the previously extended
// [BuilderState].
//
// It would be best if we could determine which was the best one to
// extend from, but there's a very real possibility that we just do
// not have any previous [BuilderState]s to extend from. This would
// mean that we should extend from the oldest [BuilderState] that we
// have locally in order to ensure that we don't drop any transactions
// that we believe may be pending with other calls that have been made.

let leaf = Leaf::<TYPES>::from_quorum_proposal(&quorum_proposal.data);
if self.built_from_proposed_block.leaf_commit == leaf.commit() {
// We are the oldest [BuilderState] that we have locally, so we
// should extend from ourselves.
return true;
}

// At this point we don't have any way to inspecting the other
// [BuilderState]'s `build_from_proposed_block` values to determine
// if another [BuilderState] will be extending from the same parent.
// So we'll do some other checks to see if we can make some safe
// assumptions.

let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock
.spawned_builder_states
.keys()
.map(|builder_state_id| *builder_state_id.parent_view)
.filter(|view_number| view_number < &quorum_proposal.data.view_number)
.max();

if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) =
maximum_stored_view_number_smaller_than_quorum_proposal
{
// If we are the maximum stored view number smaller than the quorum
// proposal's view number, then we are the best fit.
return maximum_stored_view_number_smaller_than_quorum_proposal
== self.built_from_proposed_block.view_number.u64();
}

// If there are no stored view numbers smaller than the quorum proposal
// Are we the only builder state?
if global_state_read_lock.spawned_builder_states.len() == 1 {
return true;
}

// This implies that there are only larger [BuilderState]s active than
// the one we are. This is weird, it implies that some sort of time
// travel has occurred view-wise. It is unclear what to do in this
// situation.

true
}
/// processing the DA proposal
#[tracing::instrument(skip_all, name = "process da proposal",
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
Expand Down Expand Up @@ -215,9 +320,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
self.quorum_proposal_payload_commit_to_quorum_proposal
.remove(&(da_msg.builder_commitment.clone(), da_msg.view_number));

let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
self.clone_with_receiver(req_receiver)
.spawn_clone(da_msg, quorum_proposal, req_sender)
self.spawn_clone_that_extends_self(da_msg, quorum_proposal)
.await;
} else {
tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers");
Expand Down Expand Up @@ -246,43 +349,6 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
// To handle both cases, we can have the highest view number builder state running
// and only doing the insertion if and only if intended builder state for a particulat view is not present
// check the presence of quorum_proposal.data.view_number-1 in the spawned_builder_states list
if qc_msg.proposal.data.justify_qc.view_number != self.built_from_proposed_block.view_number
{
tracing::debug!(
"View number {:?} from justify qc does not match for builder {:?}",
qc_msg.proposal.data.justify_qc.view_number,
self.built_from_proposed_block
);
if !self
.global_state
.read_arc()
.await
.should_view_handle_other_proposals(
&self.built_from_proposed_block.view_number,
&qc_msg.proposal.data.justify_qc.view_number,
)
{
tracing::debug!(
"Builder {:?} is not currently bootstrapping.",
self.built_from_proposed_block
);
// if we have the matching da proposal, we now know we don't need to keep it.
self.da_proposal_payload_commit_to_da_proposal.remove(&(
qc_msg
.proposal
.data
.block_header
.builder_commitment()
.clone(),
qc_msg.proposal.data.view_number,
));
return;
}
tracing::debug!(
"Builder {:?} handling proposal as bootstrap.",
self.built_from_proposed_block
);
}
let quorum_proposal = &qc_msg.proposal;
let view_number = quorum_proposal.data.view_number;
let payload_builder_commitment = quorum_proposal.data.block_header.builder_commitment();
Expand Down Expand Up @@ -314,9 +380,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
view_number
);

let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
self.clone_with_receiver(req_receiver)
.spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender)
self.spawn_clone_that_extends_self(da_proposal_info, quorum_proposal.clone())
.await;
} else {
tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers");
Expand All @@ -329,6 +393,46 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
}
}

/// [spawn_a_clone_that_extends_self] is a helper function that is used by
/// both [process_da_proposal] and [process_quorum_proposal] to spawn a
/// new [BuilderState] that extends from the current [BuilderState].
///
/// This helper function also adds additional checks in order to ensure
/// that the [BuilderState] that is being spawned is the best fit for the
/// [QuorumProposal] that is being extended from.
async fn spawn_clone_that_extends_self(
&mut self,
da_proposal_info: Arc<DaProposalMessage<TYPES>>,
quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
) {
if !self
.am_i_the_best_builder_state_to_extend(quorum_proposal.clone())
.await
{
tracing::debug!(
"{} is not the best fit for forking, {}@{}, so ignoring the QC proposal, and leaving it to another BuilderState",
self.built_from_proposed_block,
quorum_proposal.data.block_header.payload_commitment(),
quorum_proposal.data.view_number.u64(),
);
return;
}

let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());

tracing::debug!(
"extending BuilderState with a clone from {} with new proposal {}@{}",
self.built_from_proposed_block,
quorum_proposal.data.block_header.payload_commitment(),
quorum_proposal.data.view_number.u64()
);

// We literally fork ourselves
self.clone_with_receiver(req_receiver)
.spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender)
.await;
}

/// processing the decide event
#[tracing::instrument(skip_all, name = "process decide event",
fields(builder_built_from_proposed_block = %self.built_from_proposed_block))]
Expand Down Expand Up @@ -371,14 +475,39 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
quorum_proposal: Arc<Proposal<TYPES, QuorumProposal<TYPES>>>,
req_sender: BroadcastSender<MessageType<TYPES>>,
) {
self.built_from_proposed_block.view_number = quorum_proposal.data.view_number;
self.built_from_proposed_block.vid_commitment =
quorum_proposal.data.block_header.payload_commitment();
self.built_from_proposed_block.builder_commitment =
quorum_proposal.data.block_header.builder_commitment();
let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data);

self.built_from_proposed_block.leaf_commit = leaf.commit();
// We replace our built_from_proposed_block with information from the
// quorum proposal. This is identifying the block that this specific
// instance of [BuilderState] is attempting to build for.
self.built_from_proposed_block = BuiltFromProposedBlock {
view_number: quorum_proposal.data.view_number,
vid_commitment: quorum_proposal.data.block_header.payload_commitment(),
leaf_commit: leaf.commit(),
builder_commitment: quorum_proposal.data.block_header.builder_commitment(),
};

let builder_state_id = BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
parent_view: self.built_from_proposed_block.view_number,
};

{
// Let's ensure that we don't already have one of these BuilderStates
// running already.

let global_state_read_lock = self.global_state.read_arc().await;
if global_state_read_lock
.spawned_builder_states
.contains_key(&builder_state_id)
{
tracing::warn!(
"Aborting spawn_clone, builder state already exists in spawned_builder_states: {:?}",
builder_state_id
);
return;
}
}

for tx in da_proposal_info.txn_commitments.iter() {
self.txns_in_queue.remove(tx);
Expand All @@ -389,14 +518,13 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
self.tx_queue
.retain(|tx| self.txns_in_queue.contains(&tx.commit));

// register the spawned builder state to spawned_builder_states in the global state
self.global_state.write_arc().await.register_builder_state(
BuilderStateId {
parent_commitment: self.built_from_proposed_block.vid_commitment,
parent_view: self.built_from_proposed_block.view_number,
},
req_sender,
);
// register the spawned builder state to spawned_builder_states in the
// global state We register this new child within the global_state, so
// that it can be looked up via the [BuilderStateId] in the future.
self.global_state
.write_arc()
.await
.register_builder_state(builder_state_id, req_sender);

self.event_loop();
}
Expand Down
8 changes: 0 additions & 8 deletions crates/marketplace/src/testing/order_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,6 @@ async fn test_builder_order_chain_fork() {
.await
.unwrap();

if prev_proposed_transactions_branch_1 == prev_proposed_transactions_branch_2 {
assert_eq!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions are the same, then the new transactions should also be the same");
} else if prev_proposed_transactions_branch_2.map(|txs| txs.len()) == Some(0) {
assert_ne!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions differ and the previous proposed transactions is empty, then the new transactions should also differ");
} else {
assert_eq!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions differ, then the new transactions should be the same, as they should now have been repaired");
}

let transactions_branch_2 = fork_round_behavior.process_transactions(transactions_branch_2);
if transactions_branch_2 != transactions_branch_1 {
tracing::debug!("Fork Exist.")
Expand Down

0 comments on commit 2d5ed84

Please sign in to comment.