Skip to content

Commit

Permalink
Cache last published event (#87)
Browse files Browse the repository at this point in the history
* Make pub_sub bench more realistic: use `String` payload instead of zero-sized-type

The benchmark results are therefore not comparable before/after (they are 10% to 30% slower on my laptop).

* Cache last published event

and let subscribers receive it upon subscription.

Closes #86

* Try to use dashmap

This fixes the performance regression for me, but we need to think whether that's worth the dependency tradeoff.

* Split `Event` and `CacheableEvent`

With a small trick (that unfortunately leaks to a public type) we were able to implement a kind of specialization [1] with perfect behavior: `Event`s don't pay the price for caching, and it is not possible to subscribe_and_receive_latest() to bare `Event`s.

Also, it means that we didn't have to add bounds to `Event` [2].

[1] I thought the orphan rule would prevent us from adding the blanked implementation, but not!
[2] I've sneaked in the removal of `'static`, but that's mostly aesthetic: `std::any::Any` requires it anyway.

* Extend pub_sub benchmark with CacheableEvent type

* Use DashMap also for the subscriber callbacks map

...now that we have it in the dependency tree anyway.

Allows us to remove the `RwLock` which in turn allows us to deduplicate subscribe_recipient() vs. subscribe_and_receive_latest().

Let's think about the possible situations of "publishing the first message" vs. "just subscribing and getting latest".

* Drop `dashmap` and reinstate `RwLock` again, but try to lock efficiently

* Deduplicate subscription code into EventSubscribers::subscribe_recipient()

* Back to dashmap, but keep deduplication and extend comments

* Update src/lib.rs

Co-authored-by: Matěj Laitl <[email protected]>

---------

Co-authored-by: Matěj Laitl <[email protected]>
  • Loading branch information
goodhoko and strohel authored Aug 13, 2024
1 parent 4ce67b5 commit 4ea3712
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ keywords = ["actor", "threads"]
flume = { version = "0.10", default-features = false, features = ["select"] }
log = "0.4"
parking_lot = "0.12"
dashmap = "6"

[dev-dependencies]
anyhow = "1"
Expand Down
17 changes: 13 additions & 4 deletions benches/pub_sub.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use anyhow::Error;
use criterion::{criterion_group, criterion_main, Criterion};
use std::time::{Duration, Instant};
use std::{
hint::black_box,
time::{Duration, Instant},
};
use tonari_actor::{Actor, Context, Event, Recipient, System};

const PAYLOAD: &str = "This is the payload that will be used in the test event. It should be of \
reasonable size for a representative event, which is hard to determine. \
But let's say 3 lines of text is fine for now.";

#[derive(Debug, Clone)]
struct StringEvent;
struct StringEvent(String);

impl Event for StringEvent {}

Expand Down Expand Up @@ -45,7 +52,7 @@ impl Actor for PublisherActor {
PublisherMessage::PublishEvents => {
let start = Instant::now();
for _i in 0..self.iterations {
context.system_handle.publish(StringEvent)?;
context.system_handle.publish(StringEvent(PAYLOAD.to_string()))?;
}
let elapsed = start.elapsed();

Expand Down Expand Up @@ -86,8 +93,10 @@ impl Actor for SubscriberActor {
fn handle(
&mut self,
_context: &mut Self::Context,
_message: Self::Message,
message: Self::Message,
) -> Result<(), Self::Error> {
// This black_box has a nice side effect that it silences the 'field is never read' warning.
black_box(message.0);
Ok(())
}
}
Expand Down
3 changes: 2 additions & 1 deletion rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use_small_heuristics="Max"
format_strings = true
use_small_heuristics = "Max"
imports_granularity = "Crate"
match_block_trailing_comma = true
reorder_impl_items = true
Expand Down
125 changes: 114 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,29 @@ enum SystemState {

/// A marker trait for types which participate in the publish-subscribe system
/// of the actor framework.
pub trait Event: Clone + std::any::Any + 'static {}
pub trait Event: Clone + std::any::Any + Send + Sync {}

#[derive(Default)]
struct EventSubscribers {
events: HashMap<TypeId, Vec<EventCallback>>,
/// We cache the last published value of each event type.
/// Subscribers can request to receive it upon subscription.
/// Use a concurrent map type, benchmarks in #87 have shown that there is a contention on
/// updating the cached value otherwise.
last_value_cache: dashmap::DashMap<TypeId, Box<dyn std::any::Any + Send + Sync>>,
}

impl EventSubscribers {
fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&mut self, recipient: Recipient<M>) {
let subs = self.events.entry(TypeId::of::<E>()).or_default();
subs.push(Box::new(move |e| {
if let Some(event) = e.downcast_ref::<E>() {
let msg = event.clone();
recipient.send(msg.into())?;
}
Ok(())
}));
}
}

/// Contains the "metadata" of the system, including information about the registry
Expand Down Expand Up @@ -290,6 +308,22 @@ impl<M> Context<M> {
{
self.system_handle.subscribe_recipient::<M, E>(self.myself.clone());
}

/// Subscribe current actor to event of type `E` and send the last cached event to it.
/// This is part of the event system. You don't need to call this method to receive
/// direct messages sent using [`Addr`] and [`Recipient`].
///
/// Note that subscribing twice to the same event would result in duplicate events -- no
/// de-duplication of subscriptions is performed.
///
/// This method may fail if it is not possible to send the latest event. In this case it is
/// guaranteed that the subscription did not take place. You can safely try again.
pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError>
where
M: 'static,
{
self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.clone())
}
}

/// Capacity of actor's normal- and high-priority inboxes.
Expand Down Expand Up @@ -322,7 +356,8 @@ impl From<usize> for Capacity {
/// A builder for configuring [`Actor`] spawning.
/// You can specify your own [`Addr`] for the Actor, or let the system create
/// a new address with either provided or default capacity.
#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to configure this builder"]
#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \
configure this builder"]
pub struct SpawnBuilderWithoutAddress<'a, A: Actor, F: FnOnce() -> A> {
system: &'a mut System,
factory: F,
Expand Down Expand Up @@ -631,7 +666,11 @@ impl SystemHandle {

match *system_state_lock {
SystemState::ShuttingDown | SystemState::Stopped => {
debug!("Thread [{}] called system.shutdown() but the system is already shutting down or stopped", current_thread_name);
debug!(
"Thread [{}] called system.shutdown() but the system is already shutting \
down or stopped",
current_thread_name
);
return Ok(());
},
SystemState::Running => {
Expand Down Expand Up @@ -710,26 +749,48 @@ impl SystemHandle {
/// Subscribe given `recipient` to events of type `E`. See [`Context::subscribe()`].
pub fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&self, recipient: Recipient<M>) {
let mut event_subscribers = self.event_subscribers.write();
event_subscribers.subscribe_recipient::<M, E>(recipient);
}

let subs = event_subscribers.events.entry(TypeId::of::<E>()).or_default();
/// Subscribe given `recipient` to events of type `E` and send the last cached event to it.
/// See [`Context::subscribe_and_receive_latest()`].
pub fn subscribe_and_receive_latest<M: 'static, E: Event + Into<M>>(
&self,
recipient: Recipient<M>,
) -> Result<(), SendError> {
let mut event_subscribers = self.event_subscribers.write();

subs.push(Box::new(move |e| {
if let Some(event) = e.downcast_ref::<E>() {
let msg = event.clone();
recipient.send(msg.into())?;
// Send the last cached value if there is one. The cached event sending and adding ourselves
// to the subscriber list needs to be under the same write lock guard to avoid race
// conditions between this method and `publish()` (to guarantee exactly-once delivery).
if let Some(last_cached_value) = event_subscribers.last_value_cache.get(&TypeId::of::<E>())
{
if let Some(msg) = last_cached_value.downcast_ref::<E>() {
recipient.send(msg.clone().into())?;
}
}

Ok(())
}));
event_subscribers.subscribe_recipient::<M, E>(recipient);
Ok(())
}

/// Publish an event. All actors that have previously subscribed to the type will receive it.
///
/// The event will be also cached. Actors that will subscribe to the type in future may choose
/// to receive the last cached event upon subscription.
///
/// When sending to some subscriber fails, others are still tried and vec of errors is returned.
/// For direct, non-[`Clone`] or high-throughput messages please use [`Addr`] or [`Recipient`].
pub fn publish<E: Event>(&self, event: E) -> Result<(), PublishError> {
let event_subscribers = self.event_subscribers.read();
if let Some(subs) = event_subscribers.events.get(&TypeId::of::<E>()) {
let type_id = TypeId::of::<E>();

// This value update must be under the read lock (even if it would be possible to factor
// `last_value_cache` outside of the `RwLock`) to prevent race conditions between this and
// `subscribe_and_receive_latest()`.
event_subscribers.last_value_cache.insert(type_id, Box::new(event.clone()));

if let Some(subs) = event_subscribers.events.get(&type_id) {
let errors: Vec<SendError> = subs
.iter()
.filter_map(|subscriber_callback| subscriber_callback(&event).err())
Expand Down Expand Up @@ -1287,4 +1348,46 @@ mod tests {
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
);
}

#[test]
fn last_cached_event() {
impl Event for () {}

struct Subscriber;
impl Actor for Subscriber {
type Context = Context<Self::Message>;
type Error = ();
type Message = ();

fn started(&mut self, context: &mut Self::Context) {
context
.subscribe_and_receive_latest::<Self::Message>()
.expect("can receive last cached value");
}

fn handle(
&mut self,
context: &mut Self::Context,
_: Self::Message,
) -> Result<(), Self::Error> {
println!("Event received!");
context.system_handle.shutdown().unwrap();
Ok(())
}

fn name() -> &'static str {
"recipient"
}
}

let mut system = System::new("last cached event");
system.publish(()).expect("can publish event");

// This test will block indefinitely if the event isn't delivered.
system
.prepare(Subscriber)
.with_addr(Addr::with_capacity(1))
.run_and_block()
.expect("actor finishes successfully");
}
}
10 changes: 10 additions & 0 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ impl<M> TimedContext<M> {
// The recipient() call allows conversion from M to TimedMessage<M>.
self.system_handle.subscribe_recipient::<M, E>(self.myself.recipient());
}

/// Subscribe current actor to event of type `E` and send the last cached event to it.
/// Events will be delivered as instant messages.
/// See [`crate::Context::subscribe()`].
pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError>
where
M: 'static,
{
self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.recipient())
}
}

/// A wrapper around actors to add ability to receive delayed and recurring messages.
Expand Down

0 comments on commit 4ea3712

Please sign in to comment.