diff --git a/src/builder_state.rs b/src/builder_state.rs index f9722e09..4402a4f8 100644 --- a/src/builder_state.rs +++ b/src/builder_state.rs @@ -17,14 +17,19 @@ use crate::service::GlobalState; use async_broadcast::broadcast; use async_broadcast::Receiver as BroadcastReceiver; use async_broadcast::Sender as BroadcastSender; -use async_compatibility_layer::art::async_sleep; -use async_compatibility_layer::channel::{unbounded, UnboundedSender}; +use async_compatibility_layer::channel::{oneshot, unbounded, UnboundedSender}; +use async_compatibility_layer::{art::async_sleep, channel::OneShotSender}; use async_compatibility_layer::{art::async_spawn, channel::UnboundedReceiver}; use async_lock::RwLock; use async_trait::async_trait; use core::panic; use futures::StreamExt; +#[cfg(async_executor_impl = "async-std")] +use async_std::task::spawn_blocking; +#[cfg(async_executor_impl = "tokio")] +use tokio::task::{spawn_blocking, JoinHandle}; + use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; @@ -74,6 +79,11 @@ pub struct RequestMessage { pub requested_view_number: u64, pub response_channel: UnboundedSender, } +pub enum TriggerStatus { + Start, + Exit, +} + /// Response Message to be put on the response channel #[derive(Debug)] pub struct BuildBlockInfo { @@ -82,6 +92,7 @@ pub struct BuildBlockInfo { pub offered_fee: u64, pub block_payload: TYPES::BlockPayload, pub metadata: <::BlockPayload as BlockPayload>::Metadata, + pub vid_trigger: OneShotSender, pub vid_receiver: UnboundedReceiver<(VidCommitment, VidPrecomputeData)>, } @@ -702,14 +713,20 @@ impl BuilderProgress for BuilderState { // stored while processing the DA Proposal let vid_num_nodes = self.total_nodes.get(); + let (trigger_send, trigger_recv) = oneshot(); + // spawn a task to calculate the VID commitment, and pass the handle to the global state // later global state can await on it before replying to the proposer let (unbounded_sender, unbounded_receiver) = unbounded(); #[allow(unused_must_use)] async_spawn(async move { - let (vidc, pre_compute_data) = - precompute_vid_commitment(&encoded_txns, vid_num_nodes); - unbounded_sender.send((vidc, pre_compute_data)).await; + if let Ok(TriggerStatus::Start) = trigger_recv.recv().await { + let (vidc, pre_compute_data) = spawn_blocking(move || { + precompute_vid_commitment(&encoded_txns, vid_num_nodes) + }) + .await; + unbounded_sender.send((vidc, pre_compute_data)).await; + } }); tracing::info!( @@ -725,6 +742,7 @@ impl BuilderProgress for BuilderState { offered_fee, block_payload: payload, metadata, + vid_trigger: trigger_send, vid_receiver: unbounded_receiver, }) } else { diff --git a/src/service.rs b/src/service.rs index 6e46e59e..9af3a8ce 100644 --- a/src/service.rs +++ b/src/service.rs @@ -25,14 +25,17 @@ use hotshot_types::{ use crate::builder_state::{ BuildBlockInfo, DaProposalMessage, DecideMessage, QCMessage, TransactionMessage, - TransactionSource, + TransactionSource, TriggerStatus, }; use crate::builder_state::{MessageType, RequestMessage, ResponseMessage}; use crate::WaitAndKeep; use anyhow::anyhow; use async_broadcast::Sender as BroadcastSender; pub use async_broadcast::{broadcast, RecvError, TryRecvError}; -use async_compatibility_layer::{art::async_timeout, channel::unbounded}; +use async_compatibility_layer::{ + art::async_timeout, + channel::{unbounded, OneShotSender}, +}; use async_lock::RwLock; use async_trait::async_trait; use committable::{Commitment, Committable}; @@ -57,6 +60,7 @@ use tide_disco::{method::ReadState, Url}; pub struct BlockInfo { pub block_payload: Types::BlockPayload, pub metadata: <::BlockPayload as BlockPayload>::Metadata, + pub vid_trigger: Arc>>>, pub vid_receiver: Arc>>, pub offered_fee: u64, } @@ -161,6 +165,7 @@ impl GlobalState { .or_insert_with(|| BlockInfo { block_payload: build_block_info.block_payload, metadata: build_block_info.metadata, + vid_trigger: Arc::new(RwLock::new(Some(build_block_info.vid_trigger))), vid_receiver: Arc::new(RwLock::new(WaitAndKeep::Wait( build_block_info.vid_receiver, ))), @@ -520,6 +525,9 @@ where .block_hash_to_block .get(&(block_hash.clone(), view_num)) { + if let Some(trigger_writer) = block_info.vid_trigger.write().await.take() { + trigger_writer.send(TriggerStatus::Start); + } // sign over the builder commitment, as the proposer can computer it based on provide block_payload // and the metata data let response_block_hash = block_info