From 2d5ed8461f7352b7524664144dcde1bdf231b5c7 Mon Sep 17 00:00:00 2001 From: Theodore Schnepper Date: Mon, 9 Sep 2024 14:56:22 -0600 Subject: [PATCH] Fix BuilderState unnecessary forking The current `BuilderState` extending behavior fails to anticipate enough of the various edge cases that determine whether the current `BuilderState` is the best, or one of the only `BuilderState`s that should be extended. Due to this behavior many more `BuilderState`s are spawned than really should be spawned. This causes a lot of unnecessary computation and memory usage that will never get cleaned up as the `Sender`s that are registered for them get overwritten by subsequent clones that have the same `BuilderStateId`. This effectively causes an async memory leak, async processing leak, and a potential fork-bomb that will effectively prevent the builder from being responsive. Additionally, the race condition that stems from multiple `BuilderState`s spawning a clone that extends them for the next `QuorumProposal` means that we often receive an unexpected list of transactions as a result. This behavior can be best seen by the test `marketplace::testing::order_test::test_builder_order_chain_fork`. To fix these issues a few changes have been made. First, a new method has been added called `spawn_clone_that_extends_self` that ensures a standard way of spawning a new `BuilderState` that extends the current `BuilderState`. Another method has been added called `am_i_the_best_builder_state_to_extend` that attempts to ensure that we are the best fit, or one of the only fits that can extend the current `BuilderState` based on the given `QuorumProposal`. Finally, as a safeguard, a check has been added when calling `spawn_clone` that ensures that the same `BuilderStateId` will not be registered more than once. I.e., if there is already an async process spawned, there is no need to spawn another. 
Remove newly added asserts as they fail to represent the correct anticipated intermediate state --- crates/marketplace/src/builder_state.rs | 242 ++++++++++++++----- crates/marketplace/src/testing/order_test.rs | 8 - 2 files changed, 185 insertions(+), 65 deletions(-) diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs index 9b1b5cb9..0e39af89 100644 --- a/crates/marketplace/src/builder_state.rs +++ b/crates/marketplace/src/builder_state.rs @@ -173,6 +173,111 @@ pub struct BuilderState { } impl BuilderState { + /// [am_i_the_best_builder_state_to_extend] is a utility method that + /// attempts to determine the best [BuilderState] to extend from, given a + /// [QuorumProposal]. + /// + /// In an ideal case the [BuilderState] whose recorded view and + /// vid_commitment that match the [QuorumProposal]'s justify_qc should be + /// the best fit. However, if there is no exact match, then the best fit + /// is the [BuilderState] with the largest view number smaller than the + /// [QuorumProposal]'s view number. + /// + /// If there are no [BuilderState]s with a view number smaller than the + /// [QuorumProposal]'s view number, then the best fit is the only + /// [BuilderState] active. + async fn am_i_the_best_builder_state_to_extend( + &self, + quorum_proposal: Arc>>, + ) -> bool { + let justify_qc = &quorum_proposal.data.justify_qc; + let parent_view_number = justify_qc.view_number; + let parent_commitment = quorum_proposal.data.block_header.payload_commitment(); + + if quorum_proposal.data.view_number == self.built_from_proposed_block.view_number { + // We are being asked to extend ourselves. This is likely a + // spawning initial condition or a test condition. + return false; + } + + if parent_view_number == self.built_from_proposed_block.view_number + && parent_commitment == self.built_from_proposed_block.vid_commitment + { + // This is us exactly, so we should be the best one to extend. 
+ return true; + } + + let desired_builder_state_id = BuilderStateId:: { + parent_commitment, + parent_view: parent_view_number, + }; + + // Alright, we weren't the immediate best fit, let's see if there's a better fit out there. + + let global_state_read_lock = self.global_state.read_arc().await; + + if global_state_read_lock + .spawned_builder_states + .contains_key(&desired_builder_state_id) + { + // There is an exact match that isn't us, so we should not extend, + // and we should wait for that match to extend instead. + return false; + } + + // There is no exact match that we are aware of. This ultimately means + // that we do not have visibility on the previously extended + // [BuilderState]. + // + // It would be best if we could determine which was the best one to + // extend from, but there's a very real possibility that we just do + // not have any previous [BuilderState]s to extend from. This would + // mean that we should extend from the oldest [BuilderState] that we + // have locally in order to ensure that we don't drop any transactions + // that we believe may be pending with other calls that have been made. + + let leaf = Leaf::::from_quorum_proposal(&quorum_proposal.data); + if self.built_from_proposed_block.leaf_commit == leaf.commit() { + // We are the oldest [BuilderState] that we have locally, so we + // should extend from ourselves. + return true; + } + + // At this point we don't have any way to inspecting the other + // [BuilderState]'s `build_from_proposed_block` values to determine + // if another [BuilderState] will be extending from the same parent. + // So we'll do some other checks to see if we can make some safe + // assumptions. 
+ + let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock + .spawned_builder_states + .keys() + .map(|builder_state_id| *builder_state_id.parent_view) + .filter(|view_number| view_number < &quorum_proposal.data.view_number) + .max(); + + if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) = + maximum_stored_view_number_smaller_than_quorum_proposal + { + // If we are the maximum stored view number smaller than the quorum + // proposal's view number, then we are the best fit. + return maximum_stored_view_number_smaller_than_quorum_proposal + == self.built_from_proposed_block.view_number.u64(); + } + + // If there are no stored view numbers smaller than the quorum proposal + // Are we the only builder state? + if global_state_read_lock.spawned_builder_states.len() == 1 { + return true; + } + + // This implies that there are only larger [BuilderState]s active than + // the one we are. This is weird, it implies that some sort of time + // travel has occurred view-wise. It is unclear what to do in this + // situation. 
+ + true + } /// processing the DA proposal #[tracing::instrument(skip_all, name = "process da proposal", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))] @@ -215,9 +320,7 @@ impl BuilderState { self.quorum_proposal_payload_commit_to_quorum_proposal .remove(&(da_msg.builder_commitment.clone(), da_msg.view_number)); - let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity()); - self.clone_with_receiver(req_receiver) - .spawn_clone(da_msg, quorum_proposal, req_sender) + self.spawn_clone_that_extends_self(da_msg, quorum_proposal) .await; } else { tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers"); @@ -246,43 +349,6 @@ impl BuilderState { // To handle both cases, we can have the highest view number builder state running // and only doing the insertion if and only if intended builder state for a particulat view is not present // check the presence of quorum_proposal.data.view_number-1 in the spawned_builder_states list - if qc_msg.proposal.data.justify_qc.view_number != self.built_from_proposed_block.view_number - { - tracing::debug!( - "View number {:?} from justify qc does not match for builder {:?}", - qc_msg.proposal.data.justify_qc.view_number, - self.built_from_proposed_block - ); - if !self - .global_state - .read_arc() - .await - .should_view_handle_other_proposals( - &self.built_from_proposed_block.view_number, - &qc_msg.proposal.data.justify_qc.view_number, - ) - { - tracing::debug!( - "Builder {:?} is not currently bootstrapping.", - self.built_from_proposed_block - ); - // if we have the matching da proposal, we now know we don't need to keep it. 
- self.da_proposal_payload_commit_to_da_proposal.remove(&( - qc_msg - .proposal - .data - .block_header - .builder_commitment() - .clone(), - qc_msg.proposal.data.view_number, - )); - return; - } - tracing::debug!( - "Builder {:?} handling proposal as bootstrap.", - self.built_from_proposed_block - ); - } let quorum_proposal = &qc_msg.proposal; let view_number = quorum_proposal.data.view_number; let payload_builder_commitment = quorum_proposal.data.block_header.builder_commitment(); @@ -314,9 +380,7 @@ impl BuilderState { view_number ); - let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity()); - self.clone_with_receiver(req_receiver) - .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender) + self.spawn_clone_that_extends_self(da_proposal_info, quorum_proposal.clone()) .await; } else { tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they corresponds to different view numbers"); @@ -329,6 +393,46 @@ impl BuilderState { } } + /// [spawn_a_clone_that_extends_self] is a helper function that is used by + /// both [process_da_proposal] and [process_quorum_proposal] to spawn a + /// new [BuilderState] that extends from the current [BuilderState]. + /// + /// This helper function also adds additional checks in order to ensure + /// that the [BuilderState] that is being spawned is the best fit for the + /// [QuorumProposal] that is being extended from. 
+ async fn spawn_clone_that_extends_self( + &mut self, + da_proposal_info: Arc>, + quorum_proposal: Arc>>, + ) { + if !self + .am_i_the_best_builder_state_to_extend(quorum_proposal.clone()) + .await + { + tracing::debug!( + "{} is not the best fit for forking, {}@{}, so ignoring the QC proposal, and leaving it to another BuilderState", + self.built_from_proposed_block, + quorum_proposal.data.block_header.payload_commitment(), + quorum_proposal.data.view_number.u64(), + ); + return; + } + + let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity()); + + tracing::debug!( + "extending BuilderState with a clone from {} with new proposal {}@{}", + self.built_from_proposed_block, + quorum_proposal.data.block_header.payload_commitment(), + quorum_proposal.data.view_number.u64() + ); + + // We literally fork ourselves + self.clone_with_receiver(req_receiver) + .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender) + .await; + } + /// processing the decide event #[tracing::instrument(skip_all, name = "process decide event", fields(builder_built_from_proposed_block = %self.built_from_proposed_block))] @@ -371,14 +475,39 @@ impl BuilderState { quorum_proposal: Arc>>, req_sender: BroadcastSender>, ) { - self.built_from_proposed_block.view_number = quorum_proposal.data.view_number; - self.built_from_proposed_block.vid_commitment = - quorum_proposal.data.block_header.payload_commitment(); - self.built_from_proposed_block.builder_commitment = - quorum_proposal.data.block_header.builder_commitment(); let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data); - self.built_from_proposed_block.leaf_commit = leaf.commit(); + // We replace our built_from_proposed_block with information from the + // quorum proposal. This is identifying the block that this specific + // instance of [BuilderState] is attempting to build for. 
+ self.built_from_proposed_block = BuiltFromProposedBlock { + view_number: quorum_proposal.data.view_number, + vid_commitment: quorum_proposal.data.block_header.payload_commitment(), + leaf_commit: leaf.commit(), + builder_commitment: quorum_proposal.data.block_header.builder_commitment(), + }; + + let builder_state_id = BuilderStateId { + parent_commitment: self.built_from_proposed_block.vid_commitment, + parent_view: self.built_from_proposed_block.view_number, + }; + + { + // Let's ensure that we don't already have one of these BuilderStates + // running already. + + let global_state_read_lock = self.global_state.read_arc().await; + if global_state_read_lock + .spawned_builder_states + .contains_key(&builder_state_id) + { + tracing::warn!( + "Aborting spawn_clone, builder state already exists in spawned_builder_states: {:?}", + builder_state_id + ); + return; + } + } for tx in da_proposal_info.txn_commitments.iter() { self.txns_in_queue.remove(tx); @@ -389,14 +518,13 @@ impl BuilderState { self.tx_queue .retain(|tx| self.txns_in_queue.contains(&tx.commit)); - // register the spawned builder state to spawned_builder_states in the global state - self.global_state.write_arc().await.register_builder_state( - BuilderStateId { - parent_commitment: self.built_from_proposed_block.vid_commitment, - parent_view: self.built_from_proposed_block.view_number, - }, - req_sender, - ); + // register the spawned builder state to spawned_builder_states in the + // global state We register this new child within the global_state, so + // that it can be looked up via the [BuilderStateId] in the future. 
+ self.global_state + .write_arc() + .await + .register_builder_state(builder_state_id, req_sender); self.event_loop(); } diff --git a/crates/marketplace/src/testing/order_test.rs b/crates/marketplace/src/testing/order_test.rs index 99625ed8..25a59080 100644 --- a/crates/marketplace/src/testing/order_test.rs +++ b/crates/marketplace/src/testing/order_test.rs @@ -457,14 +457,6 @@ async fn test_builder_order_chain_fork() { .await .unwrap(); - if prev_proposed_transactions_branch_1 == prev_proposed_transactions_branch_2 { - assert_eq!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions are the same, then the new transactions should also be the same"); - } else if prev_proposed_transactions_branch_2.map(|txs| txs.len()) == Some(0) { - assert_ne!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions differ and the previous proposed transactions is empty, then the new transactions should also differ"); - } else { - assert_eq!(transactions_branch_1, transactions_branch_2, "if the previous proposed transactions differ, then the new transactions should be the same, as they should now have been repaired"); - } - let transactions_branch_2 = fork_round_behavior.process_transactions(transactions_branch_2); if transactions_branch_2 != transactions_branch_1 { tracing::debug!("Fork Exist.")