Skip to content

Commit

Permalink
Cache last published event
Browse files Browse the repository at this point in the history
let subscribers receive it upon subscription of they wish
  • Loading branch information
goodhoko committed Aug 6, 2024
1 parent 4ce67b5 commit d4622ae
Showing 1 changed file with 76 additions and 3 deletions.
79 changes: 76 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,12 @@ enum SystemState {

/// A marker trait for types which participate in the publish-subscribe system
/// of the actor framework.
pub trait Event: Clone + std::any::Any + 'static {}
pub trait Event: Clone + std::any::Any + 'static + Send + Sync {}

#[derive(Default)]
struct EventSubscribers {
events: HashMap<TypeId, Vec<EventCallback>>,
last_value_cache: HashMap<TypeId, Arc<dyn std::any::Any + Send + Sync>>,
}

/// Contains the "metadata" of the system, including information about the registry
Expand Down Expand Up @@ -290,6 +291,13 @@ impl<M> Context<M> {
{
self.system_handle.subscribe_recipient::<M, E>(self.myself.clone());
}

pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError>
where
M: 'static,
{
self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.clone())
}
}

/// Capacity of actor's normal- and high-priority inboxes.
Expand Down Expand Up @@ -723,13 +731,36 @@ impl SystemHandle {
}));
}

/// Subscribe given `recipient` to events of type `E`. See [`Context::subscribe()`].
pub fn subscribe_and_receive_latest<M: 'static, E: Event + Into<M>>(
&self,
recipient: Recipient<M>,
) -> Result<(), SendError> {
// Send the last cached value (if there is one)
if let Some(last_cached_value) =
self.event_subscribers.read().last_value_cache.get(&TypeId::of::<E>())
{
if let Some(msg) = last_cached_value.downcast_ref::<E>() {
recipient.send(msg.clone().into())?;
}
}

self.subscribe_recipient::<M, E>(recipient);

Ok(())
}

/// Publish an event. All actors that have previously subscribed to the type will receive it.
///
/// When sending to some subscriber fails, others are still tried and vec of errors is returned.
/// For direct, non-[`Clone`] or high-throughput messages please use [`Addr`] or [`Recipient`].
pub fn publish<E: Event>(&self, event: E) -> Result<(), PublishError> {
let event_subscribers = self.event_subscribers.read();
if let Some(subs) = event_subscribers.events.get(&TypeId::of::<E>()) {
let mut event_subscribers = self.event_subscribers.write();
let type_id = TypeId::of::<E>();

event_subscribers.last_value_cache.insert(type_id, Arc::new(event.clone()));

if let Some(subs) = event_subscribers.events.get(&type_id) {
let errors: Vec<SendError> = subs
.iter()
.filter_map(|subscriber_callback| subscriber_callback(&event).err())
Expand Down Expand Up @@ -1287,4 +1318,46 @@ mod tests {
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
);
}

#[test]
fn last_cached_event() {
impl Event for () {}

struct Subscriber;
impl Actor for Subscriber {
type Message = ();
type Context = Context<Self::Message>;
type Error = ();

fn started(&mut self, context: &mut Self::Context) {
context
.subscribe_and_receive_latest::<Self::Message>()
.expect("can receive last cached value");
}

fn handle(
&mut self,
context: &mut Self::Context,
_: Self::Message,
) -> Result<(), Self::Error> {
println!("Event received!");
context.system_handle.shutdown().unwrap();
Ok(())
}

fn name() -> &'static str {
"recipient"
}
}

let mut system = System::new("last cached event");
system.publish(()).expect("can publish event");

// This test will block indefinitely if the event isn't delivered.
system
.prepare(Subscriber)
.with_addr(Addr::with_capacity(1))
.run_and_block()
.expect("actor finishes successfully");
}
}

0 comments on commit d4622ae

Please sign in to comment.