Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add VID to webserver #1954

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 148 additions & 4 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ struct Inner<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> {
/// Task map for quorum votes.
vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for vid votes
vid_vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for VID certs
vid_cert_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for VID disperse data
vid_disperse_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for DACs.
dac_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Expand Down Expand Up @@ -169,7 +178,7 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
MessagePurpose::DAC => config::get_da_certificate_route(view_number),
MessagePurpose::VidDisperse => config::get_vid_disperse_route(view_number), // like `Proposal`
MessagePurpose::VidVote => config::get_vid_vote_route(view_number, vote_index), // like `Vote`
MessagePurpose::VidCert => config::get_vid_cert_route(view_number), // like `DAC`
MessagePurpose::VidCert => config::get_vid_certificate_route(view_number), // like `DAC`
};

if message_purpose == MessagePurpose::Data {
Expand Down Expand Up @@ -351,8 +360,10 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
// TODO ED Should add extra error checking here to make sure we are intending to cancel a task
ConsensusIntentEvent::CancelPollForVotes(event_view)
| ConsensusIntentEvent::CancelPollForProposal(event_view)
| ConsensusIntentEvent::CancelPollForVIDVotes(event_view)
| ConsensusIntentEvent::CancelPollForVIDCertificate(event_view)
| ConsensusIntentEvent::CancelPollForDAC(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncCertificate(event_view)
| ConsensusIntentEvent::CancelPollForVIDDisperse(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncVotes(event_view) => {
if view_number == event_view {
debug!("Shutting down polling task for view {}", event_view);
Expand All @@ -371,7 +382,9 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
}
}

_ => unimplemented!(),
_ => {
unimplemented!()
}
}
}
// Nothing on receiving channel
Expand Down Expand Up @@ -528,6 +541,9 @@ impl<
tx_index: Arc::default(),
proposal_task_map: Arc::default(),
vote_task_map: Arc::default(),
vid_vote_task_map: Arc::default(),
vid_cert_task_map: Arc::default(),
vid_disperse_task_map: Arc::default(),
dac_task_map: Arc::default(),
view_sync_cert_task_map: Arc::default(),
view_sync_vote_task_map: Arc::default(),
Expand Down Expand Up @@ -562,7 +578,7 @@ impl<
MessagePurpose::DAC => config::post_da_certificate_route(*view_number),
MessagePurpose::VidVote => config::post_vid_vote_route(*view_number),
MessagePurpose::VidDisperse => config::post_vid_disperse_route(*view_number),
MessagePurpose::VidCert => config::post_vid_cert_route(*view_number),
MessagePurpose::VidCert => config::post_vid_certificate_route(*view_number),
};

let network_msg: SendMsg<M> = SendMsg {
Expand Down Expand Up @@ -822,6 +838,46 @@ impl<
.await;
}
}
ConsensusIntentEvent::PollForVIDDisperse(view_number) => {
// Check if we already have a task for this (we shouldn't)

// Going to do a write lock since mostly likely we will need it - can change to upgradable read in the future
let mut task_map = self.inner.vid_disperse_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);

async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::VidDisperse, view_number)
.await
{
error!(
"Background receive VID disperse polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&view_number.wrapping_sub(2)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDDisperse(
view_number.wrapping_sub(2),
))
.await;
}
}
ConsensusIntentEvent::PollForCurrentProposal => {
// create new task
let (_, receiver) = unbounded();
Expand Down Expand Up @@ -878,6 +934,44 @@ impl<
.await;
}
}
ConsensusIntentEvent::PollForVIDVotes(view_number) => {
let mut task_map = self.inner.vid_vote_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);
async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::VidVote, view_number)
.await
{
error!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
// TODO ED This won't work for vote collection, last task is more than 2 view ago depending on size of network, will need to rely on cancel task from consensus
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDVotes(
view_number.wrapping_sub(2),
))
.await;
}
}

ConsensusIntentEvent::PollForDAC(view_number) => {
let mut task_map = self.inner.dac_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down Expand Up @@ -914,6 +1008,43 @@ impl<
.await;
}
}

ConsensusIntentEvent::PollForVIDCertificate(view_number) => {
let mut task_map = self.inner.vid_cert_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);
async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::VidCert, view_number)
.await
{
error!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDCertificate(
view_number.wrapping_sub(2),
))
.await;
}
}
ConsensusIntentEvent::CancelPollForVotes(view_number) => {
let mut task_map = self.inner.vote_task_map.write().await;

Expand All @@ -927,6 +1058,19 @@ impl<
}
}

ConsensusIntentEvent::CancelPollForVIDVotes(view_number) => {
let mut task_map = self.inner.vid_vote_task_map.write().await;

if let Some((_, sender)) = task_map.remove_entry(&(view_number)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDVotes(view_number))
.await;
}
}

ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => {
let mut task_map = self.inner.view_sync_cert_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down
7 changes: 2 additions & 5 deletions crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,11 +1251,9 @@ where
let view = cert.view_number;
self.vid_certs.insert(view, cert);

// TODO Make sure we aren't voting for an arbitrarily old round for no reason
if self.vote_if_able().await {
self.current_proposal = None;
}
// RM TODO: VOTING
}

HotShotEvent::ViewChange(new_view) => {
debug!("View Change event for view {}", *new_view);

Expand Down Expand Up @@ -1541,7 +1539,6 @@ pub fn consensus_event_filter<TYPES: NodeType, I: NodeImplementation<TYPES>>(
| HotShotEvent::QuorumVoteRecv(_)
| HotShotEvent::QCFormed(_)
| HotShotEvent::DACRecv(_)
| HotShotEvent::VidCertRecv(_)
| HotShotEvent::ViewChange(_)
| HotShotEvent::SendDABlockData(_)
| HotShotEvent::Timeout(_)
Expand Down
6 changes: 1 addition & 5 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,7 @@ where
}
}
HotShotEvent::DAVoteRecv(vote) => {
// warn!(
// "DA vote recv, Main Task {:?}, key: {:?}",
// vote.current_view,
// self.committee_exchange.public_key()
// );
debug!("DA vote recv, Main Task {:?}", vote.current_view,);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice!

// Check if we are the leader and the vote is from the sender.
let view = vote.current_view;
if !self.committee_exchange.is_leader(view) {
Expand Down
1 change: 1 addition & 0 deletions crates/task-impls/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ where

// TODO (Keyao) Determine and update where to publish VidDisperseSend.
// <https://github.com/EspressoSystems/HotShot/issues/1817>
debug!("publishing VID disperse for view {}", *view + 1);
self.event_stream
.publish(HotShotEvent::VidDisperseSend(
Proposal {
Expand Down
58 changes: 49 additions & 9 deletions crates/task-impls/src/vid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use hotshot_task::{
task::{FilterEvent, HandleEvent, HotShotTaskCompleted, HotShotTaskTypes, TS},
task_impls::{HSTWithEvent, TaskBuilder},
};
use hotshot_types::vote::VoteType;
use hotshot_types::traits::network::CommunicationChannel;
use hotshot_types::traits::network::ConsensusIntentEvent;
use hotshot_types::{
certificate::VIDCertificate, traits::election::SignedCertificate, vote::VIDVoteAccumulator,
};
Expand Down Expand Up @@ -141,13 +142,16 @@ where
{
match event {
HotShotEvent::VidVoteRecv(vote) => {
// TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690
debug!("VID vote recv, collection task {:?}", vote.current_view);
// panic!("Vote handle received VID vote for view {}", *vote.current_view);

debug!("VID vote recv, collection task {:?}", vote.get_view());
// panic!("Vote handle received DA vote for view {}", *vote.current_view);
// For the case where we receive votes after we've made a certificate
if state.accumulator.is_right() {
debug!("VID accumulator finished view: {:?}", state.cur_view);
return (None, state);
}

let accumulator = state.accumulator.left().unwrap();

match state
.vid_exchange
.accumulate_vote(accumulator, &vote, &vote.block_commitment)
Expand All @@ -167,13 +171,19 @@ where
.await;

state.accumulator = Right(vid_cert.clone());
state
.vid_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::CancelPollForVIDVotes(
*vid_cert.view_number,
))
.await;

// Return completed at this point
return (Some(HotShotTaskCompleted::ShutDown), state);
}
}
}
HotShotEvent::Shutdown => return (Some(HotShotTaskCompleted::ShutDown), state),
_ => {
error!("unexpected event {:?}", event);
}
Expand Down Expand Up @@ -206,12 +216,10 @@ where
) -> Option<HotShotTaskCompleted> {
match event {
HotShotEvent::VidVoteRecv(vote) => {
// TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690

// warn!(
// "VID vote recv, Main Task {:?}, key: {:?}",
// vote.current_view,
// self.vid_exchange.public_key()
// self.committee_exchange.public_key()
// );
// Check if we are the leader and the vote is from the sender.
let view = vote.current_view;
Expand Down Expand Up @@ -361,6 +369,9 @@ where
}
}
}
HotShotEvent::VidCertRecv(_) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think we'd want to put this event into the consensus task, since replicas need to check if they have it before voting on a quorum proposal. But this can be done in a separate issue.

// RM TODO
}
HotShotEvent::ViewChange(view) => {
if *self.cur_view >= *view {
return None;
Expand All @@ -371,6 +382,35 @@ where
}
self.cur_view = view;

// Start polling for VID disperse for the new view
self.vid_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForVIDDisperse(
*self.cur_view + 1,
))
.await;

self.vid_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForVIDCertificate(
*self.cur_view + 1,
))
.await;

// If we are not the next leader, we should exit
if !self.vid_exchange.is_leader(self.cur_view + 1) {
// panic!("We are not the DA leader for view {}", *self.cur_view + 1);
return None;
}

// Start polling for VID votes for the "next view"
self.vid_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForVIDVotes(
*self.cur_view + 1,
))
.await;

return None;
}

Expand Down
Loading
Loading