From 657c583b6af77cbe4f429c95ff7b345452403700 Mon Sep 17 00:00:00 2001
From: Theodore Schnepper
Date: Fri, 23 Aug 2024 06:24:14 -0600
Subject: [PATCH] Refactor read and write locks to minimize lock duration

Addresses issue #249

There is some lock acquisition behavior in `service.rs` that leads to
non-obvious lock retention lifetimes. The pattern-matching syntactic
sugar that Rust allows ends up hiding the lifetimes of references
borrowed from the locked global state. These locks end up being held
across async boundaries unnecessarily, which leads to lock contention
where none is needed.

Additionally, in at least one case, the lock lifetimes result in a
deadlock: a `write_arc` is attempted while the write lock is already
held by an enclosing scope (`service.rs:630`).

To reduce the lock contention and avoid the potential deadlock, the
lock guards have been made explicit and scoped inside their own blocks,
ensuring that each guard is dropped as soon as the information needed
from it has been read. Any information needed outside of the lock is
cloned so that no further locking is required. The data cloned / copied
out of the locked fields was already being cloned / copied regardless.

The end result should be a clearer representation of each lock guard's
lifetime, and clearer boundaries around the blocks that hold one.
---
 src/service.rs | 134 +++++++++++++++++++++++++++++++++++++------------
 1 file changed, 102 insertions(+), 32 deletions(-)

diff --git a/src/service.rs b/src/service.rs
index e968f8b..96c5da3 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -414,16 +414,20 @@ where
         let mut sent = false;
         while Instant::now() < time_to_wait_for_matching_builder {
             // try to broadcast the request to the correct builder state
-            if let Some(builder) = self
-                .global_state
-                .read_arc()
-                .await
-                .spawned_builder_states
-                .get(&state_id)
-            {
+            let found_builder_state = {
+                let global_state_read_lock_guard = self.global_state.read_arc().await;
+
+                global_state_read_lock_guard
+                    .spawned_builder_states
+                    .get(&state_id)
+                    .cloned()
+            };
+
+            if let Some(builder) = found_builder_state {
                 tracing::info!(
                     "Got matching BlockBuilder for {state_id}, sending get_available_blocks request",
                 );
+
                 if let Err(e) = builder
                     .broadcast(MessageType::RequestMessage(req_msg.clone()))
                     .await
@@ -551,10 +555,31 @@ where
         }

         let (pub_key, sign_key) = self.builder_keys.clone();
-        if let Some(block_info) = self.global_state.write_arc().await.blocks.get(&block_id) {
+        let extracted_block_info_option = {
+            // We store this write lock guard separately to make it explicit
+            // that this will end up holding a lock for the duration of this
+            // block.
+            //
+            // Additionally, we clone the properties from the block_info that
+            // end up being cloned if found anyway. Since we know this already
+            // we can perform the clone here to avoid holding the lock for
+            // longer than needed.
+            let mut global_state_write_lock_guard = self.global_state.write_arc().await;
+            let block_info_some = global_state_write_lock_guard.blocks.get(&block_id);
+
+            block_info_some.map(|block_info| {
+                (
+                    block_info.vid_trigger.clone(),
+                    block_info.block_payload.clone(),
+                    block_info.metadata.clone(),
+                )
+            })
+        };
+
+        if let Some((vid_trigger, block_payload, metadata)) = extracted_block_info_option {
             tracing::info!("Trying sending vid trigger info for {block_id}",);

-            if let Some(trigger_writer) = block_info.vid_trigger.write().await.take() {
+            if let Some(trigger_writer) = vid_trigger.write().await.take() {
                 tracing::info!("Sending vid trigger for {block_id}");
                 trigger_writer.send(TriggerStatus::Start);
                 tracing::info!("Sent vid trigger for {block_id}");
@@ -563,9 +588,7 @@ where

         // sign over the builder commitment, as the proposer can computer it based on provide block_payload
         // and the metata data
-        let response_block_hash = block_info
-            .block_payload
-            .builder_commitment(&block_info.metadata);
+        let response_block_hash = block_payload.builder_commitment(&metadata);
         let signature_over_builder_commitment =
             <Types as NodeType>::BuilderSignatureKey::sign_builder_message(
                 &sign_key,
@@ -576,8 +599,8 @@ where
             })?;

         let block_data = AvailableBlockData::<Types> {
-            block_payload: block_info.block_payload.clone(),
-            metadata: block_info.metadata.clone(),
+            block_payload: block_payload.clone(),
+            metadata: metadata.clone(),
             signature: signature_over_builder_commitment,
             sender: pub_key.clone(),
         };
@@ -612,25 +635,48 @@ where
             });
         }
         let (pub_key, sign_key) = self.builder_keys.clone();
-        if let Some(block_info) = self.global_state.write_arc().await.blocks.get(&id) {
+
+        let extracted_block_info_option = {
+            // We store this write lock guard separately to make it explicit
+            // that this will end up holding a lock for the duration of this
+            // block.
+            //
+            // Additionally, we clone the properties from the block_info that
+            // end up being cloned if found anyway. Since we know this already
+            // we can perform the clone here to avoid holding the lock for
+            // longer than needed.
+            let mut global_state_write_lock_guard = self.global_state.write_arc().await;
+            let block_info_some = global_state_write_lock_guard.blocks.get(&id);
+
+            block_info_some.map(|block_info| {
+                (
+                    block_info.vid_receiver.clone(),
+                    block_info.metadata.clone(),
+                    block_info.offered_fee,
+                    block_info.truncated,
+                )
+            })
+        };
+
+        if let Some((vid_receiver, metadata, offered_fee, truncated)) = extracted_block_info_option
+        {
             tracing::info!("Waiting for vid commitment for block {id}");

             let timeout_after = Instant::now() + self.max_api_waiting_time;
             let check_duration = self.max_api_waiting_time / 10;

             let response_received = loop {
-                match async_timeout(check_duration, block_info.vid_receiver.write().await.get())
-                    .await
-                {
+                match async_timeout(check_duration, vid_receiver.write().await.get()).await {
                     Err(_toe) => {
                         if Instant::now() >= timeout_after {
                             tracing::warn!("Couldn't get vid commitment in time for block {id}",);
                             {
                                 // we can't keep up with this block size, reduce max block size
-                                let mut global_state = self.global_state.write_arc().await;
-                                global_state.max_block_size = std::cmp::min(
-                                    global_state.max_block_size
-                                        - global_state
+                                let mut global_state_write_lock_guard =
+                                    self.global_state.write_arc().await;
+                                global_state_write_lock_guard.max_block_size = std::cmp::min(
+                                    global_state_write_lock_guard.max_block_size
+                                        - global_state_write_lock_guard
                                             .max_block_size
                                             .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
                                     MAX_BLOCK_SIZE_FLOOR,
@@ -659,14 +705,15 @@ where

                 // This block was truncated, but we got VID in time with margin left.
                 // Maybe we can handle bigger blocks?
-                if block_info.truncated
+                if truncated
                     && timeout_after.duration_since(Instant::now())
                         > self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
                 {
                     // Increase max block size
-                    let mut global_state = self.global_state.write_arc().await;
-                    global_state.max_block_size = global_state.max_block_size
-                        + global_state
+                    let mut global_state_write_lock_guard = self.global_state.write_arc().await;
+                    global_state_write_lock_guard.max_block_size = global_state_write_lock_guard
+                        .max_block_size
+                        + global_state_write_lock_guard
                             .max_block_size
                             .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR);
                 }
@@ -689,8 +736,8 @@ where

         let signature_over_fee_info = Types::BuilderSignatureKey::sign_fee(
             &sign_key,
-            block_info.offered_fee,
-            &block_info.metadata,
+            offered_fee,
+            &metadata,
             &vid_commitment,
         )
         .map_err(|e| BuildError::Error {
@@ -853,7 +900,12 @@ pub async fn run_non_permissioned_standalone_builder_service<
             // tx event
             EventType::Transactions { transactions } => {
-                let max_block_size = global_state.read_arc().await.max_block_size;
+                let max_block_size = {
+                    // This block is likely unnecessary, but we want
+                    // to play it safe with our RWLocks.
+                    let global_state_read_lock_guard = global_state.read_arc().await;
+                    global_state_read_lock_guard.max_block_size
+                };
+
                 handle_received_txns(
                     &tx_sender,
                     transactions,
@@ -954,7 +1012,13 @@ pub async fn run_permissioned_standalone_builder_service<
     global_state: Arc<RwLock<GlobalState<Types>>>,
 ) {
     let mut event_stream = hotshot_handle.event_stream();
-    let tx_sender = global_state.read_arc().await.tx_sender.clone();
+    let tx_sender = {
+        // This block is likely unnecessary, but we want to play it safe
+        // with our RWLocks.
+        let global_state_read_lock_guard = global_state.read_arc().await;
+        global_state_read_lock_guard.tx_sender.clone()
+    };
+
     loop {
         tracing::debug!("Waiting for events from HotShot");
         match event_stream.next().await {
@@ -969,7 +1033,13 @@ pub async fn run_permissioned_standalone_builder_service<
                 }
                 // tx event
                 EventType::Transactions { transactions } => {
-                    let max_block_size = global_state.read_arc().await.max_block_size;
+                    let max_block_size = {
+                        // This block is likely unnecessary, but we want
+                        // to play it safe with our RWLocks.
+                        let global_state_read_lock_guard = global_state.read_arc().await;
+                        global_state_read_lock_guard.max_block_size
+                    };
+
                     handle_received_txns(
                         &tx_sender,
                         transactions,
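
The pattern above is easy to demonstrate in isolation. The sketch below is a minimal, self-contained illustration of the scoped-guard idiom the patch applies, not code from the repository: `GlobalState` and `max_block_size` are stand-ins for the real types in `service.rs`, and it assumes the `async-lock` crate (which provides the `read_arc`/`write_arc` guards used above) together with `futures-lite` for a blocking executor.

```rust
use std::sync::Arc;

use async_lock::RwLock;

// Stand-in for the real GlobalState in service.rs.
struct GlobalState {
    max_block_size: u64,
}

async fn halve_max_block_size(global_state: &Arc<RwLock<GlobalState>>) -> u64 {
    // The guard is bound inside its own block, only the needed field is
    // copied out, and the guard drops at the closing brace -- so the read
    // lock is never held across a later `.await` point.
    let max_block_size = {
        let global_state_read_lock_guard = global_state.read_arc().await;
        global_state_read_lock_guard.max_block_size
    }; // read guard dropped here

    // Taking the write lock is now safe. Had the read guard above been
    // bound with a plain `let` at function scope, it would still be alive
    // here and this `write_arc().await` would never resolve -- the same
    // shape as the deadlock noted at `service.rs:630`.
    let mut global_state_write_lock_guard = global_state.write_arc().await;
    global_state_write_lock_guard.max_block_size = max_block_size / 2;
    global_state_write_lock_guard.max_block_size
}

fn main() {
    let state = Arc::new(RwLock::new(GlobalState {
        max_block_size: 1024,
    }));
    let new_size = futures_lite::future::block_on(halve_max_block_size(&state));
    assert_eq!(new_size, 512);
}
```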
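A second detail worth calling out is why `.cloned()` (and the `.map(...)` over `block_info` fields) appears inside those blocks: `get` returns a reference that borrows from the guard, so the data must be detached from the lock before the guard can drop. Another hedged sketch, with `BuilderStateHandle` as a hypothetical stand-in for the entries of `spawned_builder_states`:

```rust
use std::{collections::HashMap, sync::Arc};

use async_lock::RwLock;

// Hypothetical stand-in for the broadcast handles stored in
// `spawned_builder_states` in the first hunk.
#[derive(Clone, Debug, PartialEq)]
struct BuilderStateHandle {
    id: u64,
}

struct GlobalState {
    spawned_builder_states: HashMap<u64, BuilderStateHandle>,
}

async fn find_builder_state(
    global_state: &Arc<RwLock<GlobalState>>,
    state_id: u64,
) -> Option<BuilderStateHandle> {
    // `get` yields an Option<&BuilderStateHandle> borrowed from the guard;
    // `.cloned()` converts it to an owned Option so the borrow ends and the
    // read lock is released when this block's guard drops.
    {
        let global_state_read_lock_guard = global_state.read_arc().await;
        global_state_read_lock_guard
            .spawned_builder_states
            .get(&state_id)
            .cloned()
    }
}

fn main() {
    let mut spawned_builder_states = HashMap::new();
    spawned_builder_states.insert(7, BuilderStateHandle { id: 7 });
    let state = Arc::new(RwLock::new(GlobalState {
        spawned_builder_states,
    }));

    let found = futures_lite::future::block_on(find_builder_state(&state, 7));
    assert_eq!(found, Some(BuilderStateHandle { id: 7 }));
}
```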