This repository has been archived by the owner on Dec 3, 2024. It is now read-only.

Refactor read and write locks to minimize lock duration #251

Merged · 1 commit · Aug 23, 2024
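The whole change applies a single pattern: take the lock inside a small scope, copy or clone out only what is needed, and let the guard drop before any slow or `.await`-ing work runs. A minimal sketch of that pattern, not taken from this PR (the `GlobalState` shape here is a stand-in, and `async_lock::RwLock` is assumed since the diff uses its `read_arc`/`write_arc` API):

```rust
use std::sync::Arc;

use async_lock::RwLock;

struct GlobalState {
    max_block_size: u64,
}

async fn handle(state: Arc<RwLock<GlobalState>>) {
    // Scope the guard: it drops at the end of this block, so the lock is
    // held only long enough to copy one field out.
    let max_block_size = {
        let guard = state.read_arc().await;
        guard.max_block_size
    }; // <- read lock released here

    // The slow part runs with no lock held, so writers are never blocked
    // on our work.
    do_slow_work(max_block_size).await;
}

async fn do_slow_work(_limit: u64) {}
```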
src/service.rs — 134 changes: 102 additions & 32 deletions
@@ -414,16 +414,20 @@ where
         let mut sent = false;
         while Instant::now() < time_to_wait_for_matching_builder {
             // try to broadcast the request to the correct builder state
-            if let Some(builder) = self
-                .global_state
-                .read_arc()
-                .await
-                .spawned_builder_states
-                .get(&state_id)
-            {
+            let found_builder_state = {
+                let global_state_read_lock_guard = self.global_state.read_arc().await;
+
+                global_state_read_lock_guard
+                    .spawned_builder_states
+                    .get(&state_id)
+                    .cloned()
+            };
+
+            if let Some(builder) = found_builder_state {
                 tracing::info!(
                     "Got matching BlockBuilder for {state_id}, sending get_available_blocks request",
                 );
+
                 if let Err(e) = builder
                     .broadcast(MessageType::RequestMessage(req_msg.clone()))
                     .await
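The `.cloned()` in the new code is what makes the early guard drop possible: `get` hands back a reference tied to the read guard, so the handle is cloned into an owned value before the scope ends, and the `broadcast(...).await` below then runs without the read lock, letting writers make progress in the meantime. A hedged sketch of the same shape (the map's value type is a stand-in for whatever handle `spawned_builder_states` really stores, and the sleep stands in for the broadcast):

```rust
use std::{collections::HashMap, sync::Arc, time::Duration};

use async_lock::RwLock;

type BuilderHandle = Arc<String>; // stand-in for the stored channel handle

async fn send_to_builder(states: Arc<RwLock<HashMap<u64, BuilderHandle>>>, id: u64) {
    // `get` returns a reference tied to the guard, so clone the handle out
    // while the guard is alive...
    let found_builder_state = {
        let guard = states.read_arc().await;
        guard.get(&id).cloned()
    }; // ...and the read lock is released here, before any awaiting below.

    if let Some(builder) = found_builder_state {
        // Stand-in for `builder.broadcast(...).await`: slow work that no
        // longer blocks writers, since the guard is already gone.
        async_std::task::sleep(Duration::from_millis(10)).await;
        println!("sent request to builder {builder}");
    }
}
```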
@@ -551,10 +555,31 @@ where
         }
         let (pub_key, sign_key) = self.builder_keys.clone();
 
-        if let Some(block_info) = self.global_state.write_arc().await.blocks.get(&block_id) {
+        let extracted_block_info_option = {
+            // We store this write lock guard separately to make it explicit
+            // that the lock will be held for the duration of this block
+            // expression.
+            //
+            // Additionally, we clone the properties from the block_info
+            // that would end up being cloned anyway if found. Since we
+            // already know this, we can perform the clones here to avoid
+            // holding the lock for longer than needed.
+            let mut global_state_write_lock_guard = self.global_state.write_arc().await;
+            let block_info_some = global_state_write_lock_guard.blocks.get(&block_id);
+
+            block_info_some.map(|block_info| {
+                (
+                    block_info.vid_trigger.clone(),
+                    block_info.block_payload.clone(),
+                    block_info.metadata.clone(),
+                )
+            })
+        };
+
+        if let Some((vid_trigger, block_payload, metadata)) = extracted_block_info_option {
             tracing::info!("Trying sending vid trigger info for {block_id}",);
 
-            if let Some(trigger_writer) = block_info.vid_trigger.write().await.take() {
+            if let Some(trigger_writer) = vid_trigger.write().await.take() {
tracing::info!("Sending vid trigger for {block_id}");
trigger_writer.send(TriggerStatus::Start);
tracing::info!("Sent vid trigger for {block_id}");
@@ -563,9 +588,7 @@ where
 
             // sign over the builder commitment, as the proposer can compute
             // it based on the provided block_payload and the metadata
-            let response_block_hash = block_info
-                .block_payload
-                .builder_commitment(&block_info.metadata);
+            let response_block_hash = block_payload.builder_commitment(&metadata);
             let signature_over_builder_commitment =
                 <Types as NodeType>::BuilderSignatureKey::sign_builder_message(
                     &sign_key,
@@ -576,8 +599,8 @@
             })?;
 
             let block_data = AvailableBlockData::<Types> {
-                block_payload: block_info.block_payload.clone(),
-                metadata: block_info.metadata.clone(),
+                block_payload: block_payload.clone(),
+                metadata: metadata.clone(),
                 signature: signature_over_builder_commitment,
                 sender: pub_key.clone(),
             };
@@ -612,25 +635,48 @@ where
             });
         }
         let (pub_key, sign_key) = self.builder_keys.clone();
-        if let Some(block_info) = self.global_state.write_arc().await.blocks.get(&id) {
+
+        let extracted_block_info_option = {
+            // We store this write lock guard separately to make it explicit
+            // that the lock will be held for the duration of this block
+            // expression.
+            //
+            // Additionally, we clone the properties from the block_info
+            // that would end up being cloned anyway if found. Since we
+            // already know this, we can perform the clones here to avoid
+            // holding the lock for longer than needed.
+            let mut global_state_write_lock_guard = self.global_state.write_arc().await;
+            let block_info_some = global_state_write_lock_guard.blocks.get(&id);
+
+            block_info_some.map(|block_info| {
+                (
+                    block_info.vid_receiver.clone(),
+                    block_info.metadata.clone(),
+                    block_info.offered_fee,
+                    block_info.truncated,
+                )
+            })
+        };
+
+        if let Some((vid_receiver, metadata, offered_fee, truncated)) = extracted_block_info_option
+        {
             tracing::info!("Waiting for vid commitment for block {id}");
 
             let timeout_after = Instant::now() + self.max_api_waiting_time;
             let check_duration = self.max_api_waiting_time / 10;
 
             let response_received = loop {
-                match async_timeout(check_duration, block_info.vid_receiver.write().await.get())
-                    .await
-                {
+                match async_timeout(check_duration, vid_receiver.write().await.get()).await {
                     Err(_toe) => {
                         if Instant::now() >= timeout_after {
                             tracing::warn!("Couldn't get vid commitment in time for block {id}",);
                             {
                                 // we can't keep up with this block size, reduce max block size
-                                let mut global_state = self.global_state.write_arc().await;
-                                global_state.max_block_size = std::cmp::min(
-                                    global_state.max_block_size
-                                        - global_state
+                                let mut global_state_write_lock_guard =
+                                    self.global_state.write_arc().await;
+                                global_state_write_lock_guard.max_block_size = std::cmp::min(
+                                    global_state_write_lock_guard.max_block_size
+                                        - global_state_write_lock_guard
                                             .max_block_size
                                             .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
                                     MAX_BLOCK_SIZE_FLOOR,
@@ -659,14 +705,15 @@
 
             // This block was truncated, but we got VID in time with margin left.
             // Maybe we can handle bigger blocks?
-            if block_info.truncated
+            if truncated
                 && timeout_after.duration_since(Instant::now())
                     > self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
             {
                 // Increase max block size
-                let mut global_state = self.global_state.write_arc().await;
-                global_state.max_block_size = global_state.max_block_size
-                    + global_state
+                let mut global_state_write_lock_guard = self.global_state.write_arc().await;
+                global_state_write_lock_guard.max_block_size = global_state_write_lock_guard
+                    .max_block_size
+                    + global_state_write_lock_guard
                         .max_block_size
                         .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR);
             }
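As a side note on the resize arithmetic these two hunks preserve: the step size is `max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR)`, roughly a 1/DIVISOR multiplicative step that can never round down to zero. A small self-contained check, with a placeholder divisor since the real constant is defined elsewhere in the crate:

```rust
// Placeholder divisor; the real MAX_BLOCK_SIZE_CHANGE_DIVISOR (and the
// MAX_BLOCK_SIZE_FLOOR clamp, omitted here) are constants defined elsewhere
// in this crate.
const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;

fn main() {
    let max_block_size: u64 = 1_000;

    // div_ceil rounds up, so the step is at least 1 and the limit keeps
    // moving even when max_block_size is small.
    let step = max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR);
    assert_eq!(step, 100);

    // VID timeout: shrink by one step (the real code also clamps the
    // result against MAX_BLOCK_SIZE_FLOOR).
    assert_eq!(max_block_size - step, 900);

    // Truncated block finished with margin: grow by one step.
    assert_eq!(max_block_size + step, 1_100);
}
```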
@@ -689,8 +736,8 @@
 
             let signature_over_fee_info = Types::BuilderSignatureKey::sign_fee(
                 &sign_key,
-                block_info.offered_fee,
-                &block_info.metadata,
+                offered_fee,
+                &metadata,
                 &vid_commitment,
             )
             .map_err(|e| BuildError::Error {
@@ -853,7 +900,12 @@ pub async fn run_non_permissioned_standalone_builder_service<Types: NodeType, V:
     let (mut subscribed_events, mut membership) =
         connected.context("Failed to connect to events service")?;
 
-    let tx_sender = global_state.read_arc().await.tx_sender.clone();
+    let tx_sender = {
+        // This block is likely unnecessary, but we want to play it safe
+        // with our RwLocks.
+        let global_state_read_lock_guard = global_state.read_arc().await;
+        global_state_read_lock_guard.tx_sender.clone()
+    };
 
     loop {
         let event = subscribed_events.next().await;
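The "likely unnecessary" hedge in the comment is accurate: in the one-liner form the temporary guard is already dropped at the end of the statement, so the explicit block changes nothing today; it just makes the guard's lifetime visible and robust against a later edit that might extend it. A sketch of the equivalence (again assuming an `async_lock`-style `RwLock`):

```rust
use std::sync::Arc;

use async_lock::RwLock;

struct GlobalState {
    max_block_size: u64,
}

async fn both_forms(state: Arc<RwLock<GlobalState>>) {
    // One-liner: the temporary read guard lives only to the end of this
    // statement, so the lock is already released on the next line.
    let a = state.read_arc().await.max_block_size;

    // Explicit block: identical behavior, but the guard's scope is spelled
    // out, which keeps a later refactor from accidentally extending it.
    let b = {
        let global_state_read_lock_guard = state.read_arc().await;
        global_state_read_lock_guard.max_block_size
    };

    assert_eq!(a, b);
}
```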
@@ -866,7 +918,13 @@
             }
             // tx event
             EventType::Transactions { transactions } => {
-                let max_block_size = global_state.read_arc().await.max_block_size;
+                let max_block_size = {
+                    // This block is likely unnecessary, but we want
+                    // to play it safe with our RwLocks.
+                    let global_state_read_lock_guard = global_state.read_arc().await;
+                    global_state_read_lock_guard.max_block_size
+                };
+
                 handle_received_txns(
                     &tx_sender,
                     transactions,
Expand Down Expand Up @@ -954,7 +1012,13 @@ pub async fn run_permissioned_standalone_builder_service<
global_state: Arc<RwLock<GlobalState<Types>>>,
) {
let mut event_stream = hotshot_handle.event_stream();
let tx_sender = global_state.read_arc().await.tx_sender.clone();
let tx_sender = {
// This closure is likely unnecessary, but we want to play it safe
// with our RWLocks.
let global_state_read_lock_guard = global_state.read_arc().await;
global_state_read_lock_guard.tx_sender.clone()
};

loop {
tracing::debug!("Waiting for events from HotShot");
match event_stream.next().await {
@@ -969,7 +1033,13 @@
                 }
                 // tx event
                 EventType::Transactions { transactions } => {
-                    let max_block_size = global_state.read_arc().await.max_block_size;
+                    let max_block_size = {
+                        // This block is likely unnecessary, but we want
+                        // to play it safe with our RwLocks.
+                        let global_state_read_lock_guard = global_state.read_arc().await;
+                        global_state_read_lock_guard.max_block_size
+                    };
+
                     handle_received_txns(
                         &tx_sender,
                         transactions,