diff --git a/src/events_source.rs b/src/events_source.rs index 398f77f..29506bb 100644 --- a/src/events_source.rs +++ b/src/events_source.rs @@ -1,20 +1,117 @@ use async_broadcast::{broadcast, Receiver as BroadcastReceiver, Sender as BroadcastSender}; use async_trait::async_trait; use futures::stream::{self, BoxStream, Stream, StreamExt}; -use hotshot_types::event::{Event, EventType}; -use hotshot_types::traits::node_implementation::NodeType; +use hotshot_types::{ + data::{DAProposal, QuorumProposal}, + error::HotShotError, + event::{error_adaptor, Event, EventType, LeafChain}, + message::Proposal, + traits::node_implementation::NodeType, +}; +use serde::{Deserialize, Serialize}; +use std::num::NonZeroUsize; use std::sync::Arc; - const RETAINED_EVENTS_COUNT: usize = 4096; + +/// A builder event +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(bound(deserialize = "Types: NodeType"))] +pub struct BuilderEvent { + /// The view number that this event originates from + pub view_number: Types::Time, + + /// The underlying event + pub event: BuilderEventType, +} +pub fn get_builder_event_from_hotshot_event( + hotshot_event: Event, + leader: Option, + vid_nodes: Option, +) -> Option> { + // match the event and generate the builder event + let builder_event = match hotshot_event.event { + EventType::Error { error } => BuilderEventType::HotshotError { error }, + EventType::Transactions { transactions } => { + BuilderEventType::HotshotTransactions { transactions } + } + EventType::Decide { + leaf_chain, + block_size, + .. + } => BuilderEventType::HotshotDecide { + leaf_chain, + block_size, + }, + EventType::DAProposal { proposal, sender } => BuilderEventType::HotshotDAProposal { + proposal, + sender, + leader: leader.unwrap(), + vid_nodes: vid_nodes.unwrap(), + }, + EventType::QuorumProposal { proposal, sender } => BuilderEventType::HotshotQuorumProposal { + proposal, + sender, + leader: leader.unwrap(), + }, + _ => return None, + }; + Some(BuilderEvent { + view_number: hotshot_event.view_number, + event: builder_event, + }) +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(bound(deserialize = "Types: NodeType"))] +pub enum BuilderEventType { + HotshotError { + /// The underlying error + #[serde(with = "error_adaptor")] + error: Arc>, + }, + /// Hotshot public mempool transactions + HotshotTransactions { + /// The list of hotshot transactions + transactions: Vec, + }, + // Decide event with the chain of decided leaves + HotshotDecide { + /// The chain of decided leaves with its corresponding state and VID info. + leaf_chain: Arc>, + /// Optional information of the number of transactions in the block + block_size: Option, + }, + /// DA proposal was received from the network + HotshotDAProposal { + /// Contents of the proposal + proposal: Proposal>, + /// Public key of the leader submitting the proposal + sender: Types::SignatureKey, + /// leader for the view + leader: Types::SignatureKey, + /// nodes in vid calculation + vid_nodes: NonZeroUsize, + }, + /// Quorum proposal was received from the network + HotshotQuorumProposal { + /// Contents of the proposal + proposal: Proposal>, + /// Public key of the leader submitting the proposal + sender: Types::SignatureKey, + /// leader for the view + leader: Types::SignatureKey, + }, +} + #[async_trait] pub trait EventsSource where Types: NodeType, { - type EventStream: Stream>> + Unpin + Send + 'static; + type EventStream: Stream>> + Unpin + Send + 'static; async fn get_event_stream(&self) -> Self::EventStream; - async fn subscribe_events(&self) -> BoxStream<'static, Arc>> { + async fn subscribe_events(&self) -> BoxStream<'static, Arc>> { self.get_event_stream().await.boxed() } } @@ -24,39 +121,31 @@ pub trait EventConsumer where Types: NodeType, { - async fn handle_event(&mut self, event: &Event); + async fn handle_event( + &mut self, + event: Event, + leader: Option, + vid_nodes: Option, + ); } #[derive(Debug)] pub struct EventsStreamer { - to_subscribe_clone_recv: BroadcastReceiver>>, - subscriber_send_channel: BroadcastSender>>, + to_subscribe_clone_recv: BroadcastReceiver>>, + subscriber_send_channel: BroadcastSender>>, } #[async_trait] impl EventConsumer for EventsStreamer { - async fn handle_event(&mut self, event: &Event) { - let filter = match event { - Event { - event: EventType::DAProposal { .. }, - .. - } => true, - Event { - event: EventType::QuorumProposal { .. }, - .. - } => true, - Event { - event: EventType::Transactions { .. }, - .. - } => true, - Event { - event: EventType::Decide { .. }, - .. - } => true, - Event { .. } => false, - }; - if filter { - let event = Arc::new(event.clone()); + async fn handle_event( + &mut self, + event: Event, + leader: Option, + vid_nodes: Option, + ) { + let builder_event = get_builder_event_from_hotshot_event(event, leader, vid_nodes); + if builder_event.is_some() { + let event = Arc::new(builder_event.unwrap().clone()); let _status = self.subscriber_send_channel.broadcast(event).await; } } @@ -64,7 +153,7 @@ impl EventConsumer for EventsStreamer { #[async_trait] impl EventsSource for EventsStreamer { - type EventStream = BoxStream<'static, Arc>>; + type EventStream = BoxStream<'static, Arc>>; async fn get_event_stream(&self) -> Self::EventStream { let recv_channel = self.to_subscribe_clone_recv.clone(); stream::unfold(recv_channel, move |mut recv_channel| async move { @@ -81,7 +170,7 @@ impl EventsSource for EventsStreamer { impl EventsStreamer { pub fn new() -> Self { let (mut subscriber_send_channel, to_subscribe_clone_recv) = - broadcast::>>(RETAINED_EVENTS_COUNT); + broadcast::>>(RETAINED_EVENTS_COUNT); subscriber_send_channel.set_overflow(true); EventsStreamer { subscriber_send_channel, diff --git a/src/test.rs b/src/test.rs index 173b9c5..3707b6b 100644 --- a/src/test.rs +++ b/src/test.rs @@ -5,6 +5,7 @@ mod tests { use crate::events::{define_api, Error, Options}; use async_compatibility_layer::art::async_spawn; use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; + use async_std::stream::IntoStream; use async_std::sync::RwLock; use futures::stream::StreamExt; use hotshot_types::constants::{Version01, STATIC_VER_0_1}; @@ -64,7 +65,12 @@ mod tests { tracing::info!("Connected to server"); // client subscrive to hotshot events - let mut events = client + let mut events: surf_disco::socket::Connection< + Event, + surf_disco::socket::Unsupported, + Error, + versioned_binary_serialization::version::StaticVersion<0, 1>, + > = client .socket("events") .subscribe::>() .await @@ -73,7 +79,7 @@ mod tests { tracing::info!("Subscribed to events"); let total_count = 5; - + //let stream = events.into_stream(); // wait for these events to receive let receive_handle = async_spawn(async move { let mut receive_count = 0; @@ -94,7 +100,11 @@ mod tests { loop { let tx_event = generate_event(send_count); tracing::debug!("Before writing to events_source"); - events_streamer.write().await.handle_event(&tx_event).await; + events_streamer + .write() + .await + .handle_event(tx_event.clone(), None, None) + .await; send_count += 1; tracing::debug!("After writing to events_source"); tracing::info!("Event sent: {:?}", tx_event);