Skip to content

Commit

Permalink
working startup info
Browse files Browse the repository at this point in the history
  • Loading branch information
move47 committed Mar 27, 2024
1 parent 6319a37 commit 67f34fc
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 79 deletions.
153 changes: 87 additions & 66 deletions src/events_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use hotshot_types::{
error::HotShotError,
event::{error_adaptor, Event, EventType, LeafChain},
message::Proposal,
traits::node_implementation::NodeType,
traits::node_implementation::{ConsensusTime, NodeType},
PeerConfig,
};
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::sync::Arc;
const RETAINED_EVENTS_COUNT: usize = 4096;

Expand All @@ -23,47 +23,46 @@ pub struct BuilderEvent<Types: NodeType> {
/// The underlying event
pub event: BuilderEventType<Types>,
}
pub fn get_builder_event_from_hotshot_event<Types: NodeType>(
hotshot_event: Event<Types>,
leader: Option<Types::SignatureKey>,
vid_nodes: Option<NonZeroUsize>,
) -> Option<BuilderEvent<Types>> {
// match the event and generate the builder event
let builder_event = match hotshot_event.event {
EventType::Error { error } => BuilderEventType::HotshotError { error },
EventType::Transactions { transactions } => {
BuilderEventType::HotshotTransactions { transactions }

// impl From event to builder event
impl<Types: NodeType> From<Event<Types>> for BuilderEvent<Types> {
fn from(event: Event<Types>) -> Self {
BuilderEvent {
view_number: event.view_number,
event: match event.event {
EventType::Error { error } => BuilderEventType::HotshotError { error },
EventType::Transactions { transactions } => {
BuilderEventType::HotshotTransactions { transactions }
}
EventType::Decide {
leaf_chain,
block_size,
..
} => BuilderEventType::HotshotDecide {
leaf_chain,
block_size,
},
EventType::DAProposal { proposal, sender } => {
BuilderEventType::HotshotDAProposal { proposal, sender }
}
EventType::QuorumProposal { proposal, sender } => {
BuilderEventType::HotshotQuorumProposal { proposal, sender }
}
_ => BuilderEventType::Unknown,
},
}
EventType::Decide {
leaf_chain,
block_size,
..
} => BuilderEventType::HotshotDecide {
leaf_chain,
block_size,
},
EventType::DAProposal { proposal, sender } => BuilderEventType::HotshotDAProposal {
proposal,
sender,
leader: leader.unwrap(),
vid_nodes: vid_nodes.unwrap(),
},
EventType::QuorumProposal { proposal, sender } => BuilderEventType::HotshotQuorumProposal {
proposal,
sender,
leader: leader.unwrap(),
},
_ => return None,
};
Some(BuilderEvent {
view_number: hotshot_event.view_number,
event: builder_event,
})
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "Types: NodeType"))]
pub enum BuilderEventType<Types: NodeType> {
// Information required by the builder to create a membership to get view leader
StarupInfo {
known_node_with_stake: Vec<PeerConfig<Types::SignatureKey>>,
non_statekd_node_count: usize,
},
/// Hotshot error
HotshotError {
/// The underlying error
#[serde(with = "error_adaptor")]
Expand All @@ -87,20 +86,15 @@ pub enum BuilderEventType<Types: NodeType> {
proposal: Proposal<Types, DAProposal<Types>>,
/// Public key of the leader submitting the proposal
sender: Types::SignatureKey,
/// leader for the view
leader: Types::SignatureKey,
/// nodes in vid calculation
vid_nodes: NonZeroUsize,
},
/// Quorum proposal was received from the network
HotshotQuorumProposal {
/// Contents of the proposal
proposal: Proposal<Types, QuorumProposal<Types>>,
/// Public key of the leader submitting the proposal
sender: Types::SignatureKey,
/// leader for the view
leader: Types::SignatureKey,
},
Unknown,
}

#[async_trait]
Expand All @@ -121,32 +115,45 @@ pub trait EventConsumer<Types>
where
Types: NodeType,
{
async fn handle_event(
&mut self,
event: Event<Types>,
leader: Option<Types::SignatureKey>,
vid_nodes: Option<NonZeroUsize>,
);
async fn handle_event(&mut self, event: Event<Types>);
}

#[derive(Debug)]
pub struct EventsStreamer<Types: NodeType> {
// required for api subscription
to_subscribe_clone_recv: BroadcastReceiver<Arc<BuilderEvent<Types>>>,
subscriber_send_channel: BroadcastSender<Arc<BuilderEvent<Types>>>,

// required for sending startup info
known_nodes_with_stake: Vec<PeerConfig<Types::SignatureKey>>,
non_staked_node_count: usize,
}

#[async_trait]
impl<Types: NodeType> EventConsumer<Types> for EventsStreamer<Types> {
async fn handle_event(
&mut self,
event: Event<Types>,
leader: Option<Types::SignatureKey>,
vid_nodes: Option<NonZeroUsize>,
) {
let builder_event = get_builder_event_from_hotshot_event(event, leader, vid_nodes);
if builder_event.is_some() {
let event = Arc::new(builder_event.unwrap().clone());
let _status = self.subscriber_send_channel.broadcast(event).await;
async fn handle_event(&mut self, event: Event<Types>) {
let filter = match event {
Event {
event: EventType::DAProposal { .. },
..
} => true,
Event {
event: EventType::QuorumProposal { .. },
..
} => true,
Event {
event: EventType::Transactions { .. },
..
} => true,
Event {
event: EventType::Decide { .. },
..
} => true,
Event { .. } => false,
};
if filter {
let builder_event = Arc::new(BuilderEvent::from(event));
let _status = self.subscriber_send_channel.broadcast(builder_event).await;
}
}
}
Expand All @@ -156,6 +163,13 @@ impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
type EventStream = BoxStream<'static, Arc<BuilderEvent<Types>>>;
async fn get_event_stream(&self) -> Self::EventStream {
let recv_channel = self.to_subscribe_clone_recv.clone();

let startup_event = self.get_startup_event();
self.subscriber_send_channel
.broadcast(Arc::new(startup_event))
.await
.expect("Failed to send startup event");

stream::unfold(recv_channel, move |mut recv_channel| async move {
let event_res = recv_channel.recv().await;
if event_res.is_err() {
Expand All @@ -166,21 +180,28 @@ impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
.boxed()
}
}

impl<Types: NodeType> EventsStreamer<Types> {
pub fn new() -> Self {
pub fn new(
known_nodes_with_stake: Vec<PeerConfig<Types::SignatureKey>>,
non_staked_node_count: usize,
) -> Self {
let (mut subscriber_send_channel, to_subscribe_clone_recv) =
broadcast::<Arc<BuilderEvent<Types>>>(RETAINED_EVENTS_COUNT);
subscriber_send_channel.set_overflow(true);
EventsStreamer {
subscriber_send_channel,
to_subscribe_clone_recv,
known_nodes_with_stake,
non_staked_node_count,
}
}
}

impl<Types: NodeType> Default for EventsStreamer<Types> {
fn default() -> Self {
Self::new()
pub fn get_startup_event(&self) -> BuilderEvent<Types> {
BuilderEvent {
view_number: Types::Time::genesis(),
event: BuilderEventType::StarupInfo {
known_node_with_stake: self.known_nodes_with_stake.clone(),
non_statekd_node_count: self.non_staked_node_count,
},
}
}
}
63 changes: 50 additions & 13 deletions src/test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(test)]
mod tests {
use crate::events_source::{EventConsumer, EventsStreamer}; // EventsUpdater};
//use crate::fetch::Fetch;
use crate::events_source::{BuilderEvent, EventConsumer, EventsStreamer}; // EventsUpdater};
//use crate::fetch::Fetch;
use crate::events::{define_api, Error, Options};
use async_compatibility_layer::art::async_spawn;
use async_compatibility_layer::logging::{setup_backtrace, setup_logging};
Expand Down Expand Up @@ -38,7 +38,12 @@ mod tests {
let port = portpicker::pick_unused_port().expect("Could not find an open port");
let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();

let events_streamer = Arc::new(RwLock::new(EventsStreamer::new()));
let known_nodes_with_stake = vec![];
let non_staked_node_count = 0;
let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
known_nodes_with_stake,
non_staked_node_count,
)));

// Start the web server.
let mut app = App::<_, Error, Version01>::with_state(events_streamer.clone());
Expand All @@ -65,19 +70,33 @@ mod tests {
tracing::info!("Connected to server");

// client subscrive to hotshot events
let mut events: surf_disco::socket::Connection<
Event<TestTypes>,
surf_disco::socket::Unsupported,
Error,
versioned_binary_serialization::version::StaticVersion<0, 1>,
> = client
let mut events = client
.socket("events")
.subscribe::<Event<TestTypes>>()
.subscribe::<BuilderEvent<TestTypes>>()
.await
.unwrap();

tracing::info!("Subscribed to events");

// Start a client.
let client_2 = Client::<Error, Version01>::new(
format!("http://localhost:{}/hotshot_events", port)
.parse()
.unwrap(),
);
assert!(client_2.connect(Some(Duration::from_secs(60))).await);

tracing::info!("C2 Connected to server");

// client subscrive to hotshot events
let mut events_2 = client_2
.socket("events")
.subscribe::<BuilderEvent<TestTypes>>()
.await
.unwrap();

tracing::info!("C2 Subscribed to events");

let total_count = 5;
//let stream = events.into_stream();
// wait for these events to receive
Expand All @@ -87,12 +106,29 @@ mod tests {
let event = events.next().await.unwrap();
tracing::info!("Received event: {:?}", event);
receive_count += 1;
if receive_count >= total_count {
if receive_count >= total_count + 1 {
tracing::info!("Received all sent events, exiting loop");
break;
}
}
// Offest 1 is due to the startup event info
assert_eq!(receive_count, total_count + 1);
});

// wait for these events to receive
let receive_handle_2 = async_spawn(async move {
let mut receive_count = 0;
loop {
let event = events_2.next().await.unwrap();
tracing::info!("Received event: {:?}", event);
receive_count += 1;
if receive_count >= total_count + 1 {
tracing::info!("Received all sent events, exiting loop");
break;
}
}
assert_eq!(receive_count, total_count);
// Offest 1 is due to the startup event info
assert_eq!(receive_count, total_count + 1);
});

let send_handle = async_spawn(async move {
Expand All @@ -103,7 +139,7 @@ mod tests {
events_streamer
.write()
.await
.handle_event(tx_event.clone(), None, None)
.handle_event(tx_event.clone())
.await;
send_count += 1;
tracing::debug!("After writing to events_source");
Expand All @@ -116,5 +152,6 @@ mod tests {

send_handle.await;
receive_handle.await;
receive_handle_2.await;
}
}

0 comments on commit 67f34fc

Please sign in to comment.