Skip to content

Commit

Permalink
sim-rs: use better vote request model to reduce network traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Nov 26, 2024
1 parent 1562519 commit 7bce079
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 71 deletions.
10 changes: 8 additions & 2 deletions sim-rs/sim-cli/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl EventMonitor {
let mut ibs_containing_tx: BTreeMap<TransactionId, f64> = BTreeMap::new();
let mut ebs_containing_ib: BTreeMap<InputBlockId, f64> = BTreeMap::new();
let mut pending_ibs: BTreeSet<InputBlockId> = BTreeSet::new();
let mut votes_per_bundle: BTreeMap<(u64, NodeId), f64> = BTreeMap::new();
let mut eb_votes: BTreeMap<EndorserBlockId, f64> = BTreeMap::new();

let mut last_timestamp = Timestamp(Duration::from_secs(0));
Expand Down Expand Up @@ -246,8 +247,9 @@ impl EventMonitor {
Event::EndorserBlockReceived { .. } => {
eb_messages.received += 1;
}
Event::Vote { eb, .. } => {
Event::Vote { eb, producer, slot } => {
total_votes += 1;
*votes_per_bundle.entry((slot, producer)).or_default() += 1.0;
*eb_votes.entry(eb).or_default() += 1.0;
}
Event::NoVote { .. } => {}
Expand Down Expand Up @@ -373,8 +375,12 @@ impl EventMonitor {
expired_ibs, generated_ibs,
);
info!("{} total votes were generated.", total_votes);
info!("Each EB received an average of {:.3} vote(s) (stddev {:3})..",
info!("Each EB received an average of {:.3} vote(s) (stddev {:.3}).",
votes_per_eb.mean, votes_per_eb.std_dev);
let bundle_count = votes_per_bundle.len();
let votes_per_bundle = compute_stats(votes_per_bundle.into_values());
info!("There were {bundle_count} bundle(s) of votes. Each bundle contained {:.3} vote(s) (stddev {:.3}).",
votes_per_bundle.mean, votes_per_bundle.std_dev);
});

info_span!("network").in_scope(|| {
Expand Down
28 changes: 9 additions & 19 deletions sim-rs/sim-core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
config::NodeId,
model::{
Block, EndorserBlock, EndorserBlockId, InputBlock, InputBlockHeader, InputBlockId,
NoVoteReason, Transaction, TransactionId, Vote,
NoVoteReason, Transaction, TransactionId, VoteBundle,
},
};

Expand Down Expand Up @@ -217,12 +217,8 @@ impl EventTracker {
});
}

pub fn track_vote(&self, vote: &Vote) {
self.send(Event::Vote {
slot: vote.slot,
producer: vote.producer,
eb: vote.eb,
});
pub fn track_vote(&self, slot: u64, producer: NodeId, eb: EndorserBlockId) {
self.send(Event::Vote { slot, producer, eb });
}

pub fn track_no_vote(
Expand All @@ -240,25 +236,19 @@ impl EventTracker {
});
}

pub fn track_votes_sent(&self, votes: &[Vote], sender: NodeId, recipient: NodeId) {
let Some(Vote { slot, producer, .. }) = votes.first() else {
return;
};
pub fn track_votes_sent(&self, votes: &VoteBundle, sender: NodeId, recipient: NodeId) {
self.send(Event::VotesSent {
slot: *slot,
producer: *producer,
slot: votes.slot,
producer: votes.producer,
sender,
recipient,
});
}

pub fn track_votes_received(&self, votes: &[Vote], sender: NodeId, recipient: NodeId) {
let Some(Vote { slot, producer, .. }) = votes.first() else {
return;
};
pub fn track_votes_received(&self, votes: &VoteBundle, sender: NodeId, recipient: NodeId) {
self.send(Event::VotesReceived {
slot: *slot,
producer: *producer,
slot: votes.slot,
producer: votes.producer,
sender,
recipient,
});
Expand Down
4 changes: 2 additions & 2 deletions sim-rs/sim-core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ impl EndorserBlock {
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
pub struct Vote {
pub struct VoteBundle {
pub slot: u64,
pub producer: NodeId,
pub eb: EndorserBlockId,
pub ebs: Vec<EndorserBlockId>, // contains duplicates
}

#[derive(Debug, Clone, Serialize)]
Expand Down
10 changes: 7 additions & 3 deletions sim-rs/sim-core/src/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
events::EventTracker,
model::{
Block, EndorserBlock, EndorserBlockId, InputBlock, InputBlockHeader, InputBlockId,
Transaction, TransactionId, Vote,
Transaction, TransactionId, VoteBundle,
},
network::Network,
};
Expand Down Expand Up @@ -224,7 +224,9 @@ enum SimulationMessage {
RequestEB(EndorserBlockId),
EB(Arc<EndorserBlock>),
// Get out the vote
Votes(Arc<Vec<Vote>>),
AnnounceVotes(u64, NodeId),
RequestVotes(u64, NodeId),
Votes(Arc<VoteBundle>),
}

impl HasBytesSize for SimulationMessage {
Expand All @@ -250,7 +252,9 @@ impl HasBytesSize for SimulationMessage {
Self::RequestEB(_) => 8,
Self::EB(_) => 32,

Self::Votes(v) => 8 * v.len() as u64,
Self::AnnounceVotes(_, _) => 8,
Self::RequestVotes(_, _) => 8,
Self::Votes(v) => 8 * v.ebs.len() as u64,
}
}
}
98 changes: 53 additions & 45 deletions sim-rs/sim-core/src/sim/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
events::EventTracker,
model::{
Block, EndorserBlock, EndorserBlockId, InputBlock, InputBlockHeader, InputBlockId,
NoVoteReason, Transaction, TransactionId, Vote,
NoVoteReason, Transaction, TransactionId, VoteBundle,
},
network::{NetworkSink, NetworkSource},
};
Expand Down Expand Up @@ -68,8 +68,8 @@ struct NodeLeiosState {
ibs_by_slot: BTreeMap<u64, Vec<InputBlockId>>,
ebs: BTreeMap<EndorserBlockId, Arc<EndorserBlock>>,
ebs_by_slot: BTreeMap<u64, Vec<EndorserBlockId>>,
voter_slots_seen: BTreeMap<NodeId, BTreeSet<u64>>,
votes_by_eb: BTreeMap<EndorserBlockId, Vec<Vote>>,
votes_by_eb: BTreeMap<EndorserBlockId, u64>,
votes: BTreeMap<(u64, NodeId), Arc<VoteBundle>>,
}

enum InputBlockState {
Expand Down Expand Up @@ -210,6 +210,12 @@ impl Node {
}

// Voting
SimulationMessage::AnnounceVotes(slot, producer) => {
self.receive_announce_votes(from, slot, producer)?;
}
SimulationMessage::RequestVotes(slot, producer) => {
self.receive_request_votes(from, slot, producer)?;
}
SimulationMessage::Votes(votes) => {
self.receive_votes(from, votes)?;
}
Expand Down Expand Up @@ -325,17 +331,14 @@ impl Node {
// For every VRF lottery you won, you can vote for every EB
vrf_wins * ebs.len()
};
let votes = ebs
.iter()
.cycle()
.map(|eb_id| Vote {
slot,
producer: self.id,
eb: *eb_id,
})
.take(votes_allowed)
.collect();
self.send_votes(votes)?;
let votes = VoteBundle {
slot,
producer: self.id,
ebs: ebs.iter().cloned().cycle().take(votes_allowed).collect(),
};
if !votes.ebs.is_empty() {
self.send_votes(votes)?;
}
Ok(())
}

Expand Down Expand Up @@ -616,33 +619,42 @@ impl Node {
Ok(())
}

fn receive_votes(&mut self, from: NodeId, votes: Arc<Vec<Vote>>) -> Result<()> {
fn receive_announce_votes(&mut self, from: NodeId, slot: u64, producer: NodeId) -> Result<()> {
if !self.leios.votes.contains_key(&(slot, producer)) {
self.send_to(from, SimulationMessage::RequestVotes(slot, producer))?;
}
Ok(())
}

fn receive_request_votes(&mut self, from: NodeId, slot: u64, producer: NodeId) -> Result<()> {
if let Some(votes) = self.leios.votes.get(&(slot, producer)) {
self.tracker.track_votes_sent(votes, self.id, from);
self.send_to(from, SimulationMessage::Votes(votes.clone()))?;
}
Ok(())
}

fn receive_votes(&mut self, from: NodeId, votes: Arc<VoteBundle>) -> Result<()> {
let slot = votes.slot;
let producer = votes.producer;
self.tracker.track_votes_received(&votes, from, self.id);
let producer = votes[0].producer;
let slot = votes[0].slot;
if !self
if self
.leios
.voter_slots_seen
.entry(producer)
.or_default()
.insert(slot)
.votes
.insert((votes.slot, votes.producer), votes.clone())
.is_some()
{
return Ok(());
}
for vote in votes.iter() {
self.leios
.votes_by_eb
.entry(vote.eb)
.or_default()
.push(vote.clone());
for eb in votes.ebs.iter() {
*self.leios.votes_by_eb.entry(*eb).or_default() += 1;
}
// We haven't seen these votes before, so propagate them to our neighbors
for peer in &self.peers {
if *peer == from {
continue;
}
self.tracker.track_votes_sent(&votes, self.id, *peer);
self.send_to(*peer, SimulationMessage::Votes(votes.clone()))?;
self.send_to(*peer, SimulationMessage::AnnounceVotes(slot, producer))?;
}
Ok(())
}
Expand Down Expand Up @@ -761,24 +773,20 @@ impl Node {
Ok(())
}

fn send_votes(&mut self, votes: Vec<Vote>) -> Result<()> {
for vote in &votes {
self.tracker.track_vote(vote);
self.leios
.votes_by_eb
.entry(vote.eb)
.or_default()
.push(vote.clone());
fn send_votes(&mut self, votes: VoteBundle) -> Result<()> {
for eb in &votes.ebs {
self.tracker.track_vote(votes.slot, votes.producer, *eb);
*self.leios.votes_by_eb.entry(*eb).or_default() += 1;
}
self.leios
.voter_slots_seen
.entry(self.id)
.or_default()
.insert(votes[0].slot);
let votes = Arc::new(votes);
self.leios
.votes
.insert((votes.slot, votes.producer), votes.clone());
for peer in &self.peers {
self.tracker.track_votes_sent(&votes, self.id, *peer);
self.send_to(*peer, SimulationMessage::Votes(votes.clone()))?;
self.send_to(
*peer,
SimulationMessage::AnnounceVotes(votes.slot, votes.producer),
)?;
}
Ok(())
}
Expand Down

0 comments on commit 7bce079

Please sign in to comment.