Skip to content

Commit

Permalink
builder event type
Browse files Browse the repository at this point in the history
  • Loading branch information
move47 committed Mar 27, 2024
1 parent 7b62455 commit 6319a37
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 35 deletions.
153 changes: 121 additions & 32 deletions src/events_source.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,117 @@
use async_broadcast::{broadcast, Receiver as BroadcastReceiver, Sender as BroadcastSender};
use async_trait::async_trait;
use futures::stream::{self, BoxStream, Stream, StreamExt};
use hotshot_types::event::{Event, EventType};
use hotshot_types::traits::node_implementation::NodeType;
use hotshot_types::{
data::{DAProposal, QuorumProposal},
error::HotShotError,
event::{error_adaptor, Event, EventType, LeafChain},
message::Proposal,
traits::node_implementation::NodeType,
};
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::sync::Arc;

const RETAINED_EVENTS_COUNT: usize = 4096;

/// A builder event
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "Types: NodeType"))]
pub struct BuilderEvent<Types: NodeType> {
/// The view number that this event originates from
pub view_number: Types::Time,

/// The underlying event
pub event: BuilderEventType<Types>,
}
pub fn get_builder_event_from_hotshot_event<Types: NodeType>(
hotshot_event: Event<Types>,
leader: Option<Types::SignatureKey>,
vid_nodes: Option<NonZeroUsize>,
) -> Option<BuilderEvent<Types>> {
// match the event and generate the builder event
let builder_event = match hotshot_event.event {
EventType::Error { error } => BuilderEventType::HotshotError { error },
EventType::Transactions { transactions } => {
BuilderEventType::HotshotTransactions { transactions }
}
EventType::Decide {
leaf_chain,
block_size,
..
} => BuilderEventType::HotshotDecide {
leaf_chain,
block_size,
},
EventType::DAProposal { proposal, sender } => BuilderEventType::HotshotDAProposal {
proposal,
sender,
leader: leader.unwrap(),
vid_nodes: vid_nodes.unwrap(),
},
EventType::QuorumProposal { proposal, sender } => BuilderEventType::HotshotQuorumProposal {
proposal,
sender,
leader: leader.unwrap(),
},
_ => return None,
};
Some(BuilderEvent {
view_number: hotshot_event.view_number,
event: builder_event,
})
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "Types: NodeType"))]
pub enum BuilderEventType<Types: NodeType> {
HotshotError {
/// The underlying error
#[serde(with = "error_adaptor")]
error: Arc<HotShotError<Types>>,
},
/// Hotshot public mempool transactions
HotshotTransactions {
/// The list of hotshot transactions
transactions: Vec<Types::Transaction>,
},
// Decide event with the chain of decided leaves
HotshotDecide {
/// The chain of decided leaves with its corresponding state and VID info.
leaf_chain: Arc<LeafChain<Types>>,
/// Optional information of the number of transactions in the block
block_size: Option<u64>,
},
/// DA proposal was received from the network
HotshotDAProposal {
/// Contents of the proposal
proposal: Proposal<Types, DAProposal<Types>>,
/// Public key of the leader submitting the proposal
sender: Types::SignatureKey,
/// leader for the view
leader: Types::SignatureKey,
/// nodes in vid calculation
vid_nodes: NonZeroUsize,
},
/// Quorum proposal was received from the network
HotshotQuorumProposal {
/// Contents of the proposal
proposal: Proposal<Types, QuorumProposal<Types>>,
/// Public key of the leader submitting the proposal
sender: Types::SignatureKey,
/// leader for the view
leader: Types::SignatureKey,
},
}

#[async_trait]
pub trait EventsSource<Types>
where
Types: NodeType,
{
type EventStream: Stream<Item = Arc<Event<Types>>> + Unpin + Send + 'static;
type EventStream: Stream<Item = Arc<BuilderEvent<Types>>> + Unpin + Send + 'static;
async fn get_event_stream(&self) -> Self::EventStream;

async fn subscribe_events(&self) -> BoxStream<'static, Arc<Event<Types>>> {
async fn subscribe_events(&self) -> BoxStream<'static, Arc<BuilderEvent<Types>>> {
self.get_event_stream().await.boxed()
}
}
Expand All @@ -24,47 +121,39 @@ pub trait EventConsumer<Types>
where
Types: NodeType,
{
async fn handle_event(&mut self, event: &Event<Types>);
async fn handle_event(
&mut self,
event: Event<Types>,
leader: Option<Types::SignatureKey>,
vid_nodes: Option<NonZeroUsize>,
);
}

#[derive(Debug)]
pub struct EventsStreamer<Types: NodeType> {
to_subscribe_clone_recv: BroadcastReceiver<Arc<Event<Types>>>,
subscriber_send_channel: BroadcastSender<Arc<Event<Types>>>,
to_subscribe_clone_recv: BroadcastReceiver<Arc<BuilderEvent<Types>>>,
subscriber_send_channel: BroadcastSender<Arc<BuilderEvent<Types>>>,
}

#[async_trait]
impl<Types: NodeType> EventConsumer<Types> for EventsStreamer<Types> {
async fn handle_event(&mut self, event: &Event<Types>) {
let filter = match event {
Event {
event: EventType::DAProposal { .. },
..
} => true,
Event {
event: EventType::QuorumProposal { .. },
..
} => true,
Event {
event: EventType::Transactions { .. },
..
} => true,
Event {
event: EventType::Decide { .. },
..
} => true,
Event { .. } => false,
};
if filter {
let event = Arc::new(event.clone());
async fn handle_event(
&mut self,
event: Event<Types>,
leader: Option<Types::SignatureKey>,
vid_nodes: Option<NonZeroUsize>,
) {
let builder_event = get_builder_event_from_hotshot_event(event, leader, vid_nodes);
if builder_event.is_some() {
let event = Arc::new(builder_event.unwrap().clone());
let _status = self.subscriber_send_channel.broadcast(event).await;
}
}
}

#[async_trait]
impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
type EventStream = BoxStream<'static, Arc<Event<Types>>>;
type EventStream = BoxStream<'static, Arc<BuilderEvent<Types>>>;
async fn get_event_stream(&self) -> Self::EventStream {
let recv_channel = self.to_subscribe_clone_recv.clone();
stream::unfold(recv_channel, move |mut recv_channel| async move {
Expand All @@ -81,7 +170,7 @@ impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
impl<Types: NodeType> EventsStreamer<Types> {
pub fn new() -> Self {
let (mut subscriber_send_channel, to_subscribe_clone_recv) =
broadcast::<Arc<Event<Types>>>(RETAINED_EVENTS_COUNT);
broadcast::<Arc<BuilderEvent<Types>>>(RETAINED_EVENTS_COUNT);
subscriber_send_channel.set_overflow(true);
EventsStreamer {
subscriber_send_channel,
Expand Down
16 changes: 13 additions & 3 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod tests {
use crate::events::{define_api, Error, Options};
use async_compatibility_layer::art::async_spawn;
use async_compatibility_layer::logging::{setup_backtrace, setup_logging};
use async_std::stream::IntoStream;
use async_std::sync::RwLock;
use futures::stream::StreamExt;
use hotshot_types::constants::{Version01, STATIC_VER_0_1};
Expand Down Expand Up @@ -64,7 +65,12 @@ mod tests {
tracing::info!("Connected to server");

// client subscrive to hotshot events
let mut events = client
let mut events: surf_disco::socket::Connection<
Event<TestTypes>,
surf_disco::socket::Unsupported,
Error,
versioned_binary_serialization::version::StaticVersion<0, 1>,
> = client
.socket("events")
.subscribe::<Event<TestTypes>>()
.await
Expand All @@ -73,7 +79,7 @@ mod tests {
tracing::info!("Subscribed to events");

let total_count = 5;

//let stream = events.into_stream();
// wait for these events to receive
let receive_handle = async_spawn(async move {
let mut receive_count = 0;
Expand All @@ -94,7 +100,11 @@ mod tests {
loop {
let tx_event = generate_event(send_count);
tracing::debug!("Before writing to events_source");
events_streamer.write().await.handle_event(&tx_event).await;
events_streamer
.write()
.await
.handle_event(tx_event.clone(), None, None)
.await;
send_count += 1;
tracing::debug!("After writing to events_source");
tracing::info!("Event sent: {:?}", tx_event);
Expand Down

0 comments on commit 6319a37

Please sign in to comment.