This repository has been archived by the owner on Dec 3, 2024. It is now read-only.

Merge pull request #251 from EspressoSystems/ts/fix/write-lock-async
Refactor read and write locks to minimize lock duration
Ayiga authored Aug 23, 2024
2 parents b9316d9 + 657c583 commit 39c87e5
Showing 1 changed file with 102 additions and 32 deletions.
134 changes: 102 additions & 32 deletions src/service.rs
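Before the diff, it may help to see the pattern this commit applies in isolation. The following is a minimal sketch, not the repository's actual code: GlobalState, BlockInfo, and slow_io are simplified stand-ins for the state and I/O in src/service.rs, and it assumes an async_lock-style async RwLock plus the futures executor. The key point: under Rust 2021's temporary-scope rules, a guard created in an `if let` scrutinee lives until the end of the whole `if let` body, so the old code kept the lock held across every `.await` inside; the refactor takes the lock in a small scoping block, clones out only what it needs, and drops the guard before any `.await`.

    use std::{collections::HashMap, sync::Arc};

    use async_lock::RwLock;

    #[derive(Clone)]
    struct BlockInfo {
        payload: String,
    }

    #[derive(Default)]
    struct GlobalState {
        blocks: HashMap<u64, BlockInfo>,
    }

    // Stand-in for broadcasting a message or awaiting a VID receiver.
    async fn slow_io(payload: &str) {
        println!("sent {payload}");
    }

    // Anti-pattern (the old code, Rust 2021): the temporary guard created
    // in the `if let` scrutinee lives until the end of the `if let` body,
    // so the write lock is still held across the `.await` inside it.
    async fn respond_holding_lock(state: &Arc<RwLock<GlobalState>>, id: u64) {
        if let Some(info) = state.write().await.blocks.get(&id) {
            slow_io(&info.payload).await; // lock held across this await
        }
    }

    // Refactored pattern (this commit): take the lock inside a small
    // scoping block, clone out exactly the fields needed, drop the guard,
    // and only then hit any await points.
    async fn respond_scoped_lock(state: &Arc<RwLock<GlobalState>>, id: u64) {
        let info = {
            let guard = state.read().await;
            guard.blocks.get(&id).cloned()
        }; // guard dropped here

        if let Some(info) = info {
            slow_io(&info.payload).await; // no lock held across this await
        }
    }

    fn main() {
        futures::executor::block_on(async {
            let state = Arc::new(RwLock::new(GlobalState::default()));
            state
                .write()
                .await
                .blocks
                .insert(1, BlockInfo { payload: "block".into() });
            respond_holding_lock(&state, 1).await;
            respond_scoped_lock(&state, 1).await;
        });
    }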
@@ -414,16 +414,20 @@ where
let mut sent = false;
while Instant::now() < time_to_wait_for_matching_builder {
// try to broadcast the request to the correct builder state
if let Some(builder) = self
.global_state
.read_arc()
.await
.spawned_builder_states
.get(&state_id)
{
let found_builder_state = {
let global_state_read_lock_guard = self.global_state.read_arc().await;

global_state_read_lock_guard
.spawned_builder_states
.get(&state_id)
.cloned()
};

if let Some(builder) = found_builder_state {
tracing::info!(
"Got matching BlockBuilder for {state_id}, sending get_available_blocks request",
);

if let Err(e) = builder
.broadcast(MessageType::RequestMessage(req_msg.clone()))
.await
@@ -551,10 +555,31 @@ where
}
let (pub_key, sign_key) = self.builder_keys.clone();

if let Some(block_info) = self.global_state.write_arc().await.blocks.get(&block_id) {
let extracted_block_info_option = {
// We bind the write lock guard to a name to make it explicit
// that the lock is held only for the duration of this scoping
// block.
//
// Additionally, the properties we extract from the block_info
// would be cloned anyway if found, so cloning them here avoids
// holding the lock for longer than needed.
let mut global_state_write_lock_guard = self.global_state.write_arc().await;
let block_info_some = global_state_write_lock_guard.blocks.get(&block_id);

block_info_some.map(|block_info| {
(
block_info.vid_trigger.clone(),
block_info.block_payload.clone(),
block_info.metadata.clone(),
)
})
};

if let Some((vid_trigger, block_payload, metadata)) = extracted_block_info_option {
tracing::info!("Trying sending vid trigger info for {block_id}",);

if let Some(trigger_writer) = block_info.vid_trigger.write().await.take() {
if let Some(trigger_writer) = vid_trigger.write().await.take() {
tracing::info!("Sending vid trigger for {block_id}");
trigger_writer.send(TriggerStatus::Start);
tracing::info!("Sent vid trigger for {block_id}");
@@ -563,9 +588,7 @@ where

// sign over the builder commitment, as the proposer can compute it based on the provided block_payload
// and the metadata
let response_block_hash = block_info
.block_payload
.builder_commitment(&block_info.metadata);
let response_block_hash = block_payload.builder_commitment(&metadata);
let signature_over_builder_commitment =
<Types as NodeType>::BuilderSignatureKey::sign_builder_message(
&sign_key,
@@ -576,8 +599,8 @@
})?;

let block_data = AvailableBlockData::<Types> {
block_payload: block_info.block_payload.clone(),
metadata: block_info.metadata.clone(),
block_payload: block_payload.clone(),
metadata: metadata.clone(),
signature: signature_over_builder_commitment,
sender: pub_key.clone(),
};
@@ -612,25 +635,48 @@ where
});
}
let (pub_key, sign_key) = self.builder_keys.clone();
if let Some(block_info) = self.global_state.write_arc().await.blocks.get(&id) {

let extracted_block_info_option = {
// We bind the write lock guard to a name to make it explicit
// that the lock is held only for the duration of this scoping
// block.
//
// Additionally, the properties we extract from the block_info
// would be cloned anyway if found, so cloning them here avoids
// holding the lock for longer than needed.
let mut global_state_write_lock_guard = self.global_state.write_arc().await;
let block_info_some = global_state_write_lock_guard.blocks.get(&id);

block_info_some.map(|block_info| {
(
block_info.vid_receiver.clone(),
block_info.metadata.clone(),
block_info.offered_fee,
block_info.truncated,
)
})
};

if let Some((vid_receiver, metadata, offered_fee, truncated)) = extracted_block_info_option
{
tracing::info!("Waiting for vid commitment for block {id}");

let timeout_after = Instant::now() + self.max_api_waiting_time;
let check_duration = self.max_api_waiting_time / 10;

let response_received = loop {
match async_timeout(check_duration, block_info.vid_receiver.write().await.get())
.await
{
match async_timeout(check_duration, vid_receiver.write().await.get()).await {
Err(_toe) => {
if Instant::now() >= timeout_after {
tracing::warn!("Couldn't get vid commitment in time for block {id}",);
{
// we can't keep up with this block size, reduce max block size
let mut global_state = self.global_state.write_arc().await;
global_state.max_block_size = std::cmp::min(
global_state.max_block_size
- global_state
let mut global_state_write_lock_guard =
self.global_state.write_arc().await;
global_state_write_lock_guard.max_block_size = std::cmp::min(
global_state_write_lock_guard.max_block_size
- global_state_write_lock_guard
.max_block_size
.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
MAX_BLOCK_SIZE_FLOOR,
@@ -659,14 +705,15 @@ where

// This block was truncated, but we got VID in time with margin left.
// Maybe we can handle bigger blocks?
if block_info.truncated
if truncated
&& timeout_after.duration_since(Instant::now())
> self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
{
// Increase max block size
let mut global_state = self.global_state.write_arc().await;
global_state.max_block_size = global_state.max_block_size
+ global_state
let mut global_state_write_lock_guard = self.global_state.write_arc().await;
global_state_write_lock_guard.max_block_size = global_state_write_lock_guard
.max_block_size
+ global_state_write_lock_guard
.max_block_size
.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR);
}
@@ -689,8 +736,8 @@

let signature_over_fee_info = Types::BuilderSignatureKey::sign_fee(
&sign_key,
block_info.offered_fee,
&block_info.metadata,
offered_fee,
&metadata,
&vid_commitment,
)
.map_err(|e| BuildError::Error {
@@ -853,7 +900,12 @@ pub async fn run_non_permissioned_standalone_builder_service<Types: NodeType, V:
let (mut subscribed_events, mut membership) =
connected.context("Failed to connect to events service")?;

let tx_sender = global_state.read_arc().await.tx_sender.clone();
let tx_sender = {
// This scoping block is likely unnecessary, but we want to play it
// safe with our RwLocks.
let global_state_read_lock_guard = global_state.read_arc().await;
global_state_read_lock_guard.tx_sender.clone()
};

loop {
let event = subscribed_events.next().await;
@@ -866,7 +918,13 @@ pub async fn run_non_permissioned_standalone_builder_service<Types: NodeType, V:
}
// tx event
EventType::Transactions { transactions } => {
let max_block_size = global_state.read_arc().await.max_block_size;
let max_block_size = {
// This scoping block is likely unnecessary, but we
// want to play it safe with our RwLocks.
let global_state_read_lock_guard = global_state.read_arc().await;
global_state_read_lock_guard.max_block_size
};

handle_received_txns(
&tx_sender,
transactions,
@@ -954,7 +1012,13 @@ pub async fn run_permissioned_standalone_builder_service<
global_state: Arc<RwLock<GlobalState<Types>>>,
) {
let mut event_stream = hotshot_handle.event_stream();
let tx_sender = global_state.read_arc().await.tx_sender.clone();
let tx_sender = {
// This scoping block is likely unnecessary, but we want to play it
// safe with our RwLocks.
let global_state_read_lock_guard = global_state.read_arc().await;
global_state_read_lock_guard.tx_sender.clone()
};

loop {
tracing::debug!("Waiting for events from HotShot");
match event_stream.next().await {
@@ -969,7 +1033,13 @@ pub async fn run_permissioned_standalone_builder_service<
}
// tx event
EventType::Transactions { transactions } => {
let max_block_size = global_state.read_arc().await.max_block_size;
let max_block_size = {
// This scoping block is likely unnecessary, but we
// want to play it safe with our RwLocks.
let global_state_read_lock_guard = global_state.read_arc().await;
global_state_read_lock_guard.max_block_size
};

handle_received_txns(
&tx_sender,
transactions,
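One more piece of context, separate from the lock refactor: the hunks at @@ -612 and @@ -659 touch the builder's adaptive block sizing. Extracted as a standalone sketch with placeholder constant values (the real MAX_BLOCK_SIZE_CHANGE_DIVISOR and MAX_BLOCK_SIZE_FLOOR are defined elsewhere in src/service.rs), the arithmetic mirrors the diff: shrink max_block_size by one divisor-th (rounded up) when a VID commitment times out, and grow it by the same step when a truncated block still finished with margin to spare.

    // Placeholder values; the real constants live elsewhere in src/service.rs.
    const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;
    const MAX_BLOCK_SIZE_FLOOR: u64 = 5_000;

    // VID-commitment timeout: shrink by ceil(size / DIVISOR). As in the
    // diff, the result is combined with MAX_BLOCK_SIZE_FLOOR via
    // std::cmp::min, so it can never exceed the floor value.
    fn on_vid_timeout(max_block_size: u64) -> u64 {
        std::cmp::min(
            max_block_size - max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
            MAX_BLOCK_SIZE_FLOOR,
        )
    }

    // Truncated block delivered with time to spare: grow by the same step.
    fn on_truncated_with_margin(max_block_size: u64) -> u64 {
        max_block_size + max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR)
    }

    fn main() {
        let mut size: u64 = 100_000;
        size = on_vid_timeout(size); // min(90_000, 5_000) = 5_000
        size = on_truncated_with_margin(size); // 5_000 + 500 = 5_500
        println!("max_block_size = {size}");
    }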
