From 4ea371215bb96852037ef0083c33e03a0391b37a Mon Sep 17 00:00:00 2001 From: Jen Tak Date: Tue, 13 Aug 2024 12:11:56 +0200 Subject: [PATCH] Cache last published event (#87) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Make pub_sub bench more realistic: use `String` payload instead of zero-sized type The benchmark results are therefore not comparable before/after (they are 10% to 30% slower on my laptop). * Cache last published event and let subscribers receive it upon subscription. Closes https://github.com/tonarino/actor/issues/86 * Try to use dashmap This fixes the performance regression for me, but we need to think about whether that's worth the dependency tradeoff. * Split `Event` and `CacheableEvent` With a small trick (that unfortunately leaks to a public type) we were able to implement a kind of specialization [1] with perfect behavior: `Event`s don't pay the price for caching, and it is not possible to subscribe_and_receive_latest() to bare `Event`s. Also, it means that we didn't have to add bounds to `Event` [2]. [1] I thought the orphan rule would prevent us from adding the blanket implementation, but it does not! [2] I've sneaked in the removal of `'static`, but that's mostly aesthetic: `std::any::Any` requires it anyway. * Extend pub_sub benchmark with CacheableEvent type * Use DashMap also for the subscriber callbacks map ...now that we have it in the dependency tree anyway. Allows us to remove the `RwLock` which in turn allows us to deduplicate subscribe_recipient() vs. subscribe_and_receive_latest(). Let's think about the possible situations of "publishing the first message" vs. "just subscribing and getting latest". 
* Drop `dashmap` and reinstate `RwLock` again, but try to lock efficiently * Deduplicate subscription code into EventSubscribers::subscribe_recipient() * Back to dashmap, but keep deduplication and extend comments * Update src/lib.rs Co-authored-by: Matěj Laitl --------- Co-authored-by: Matěj Laitl --- Cargo.toml | 1 + benches/pub_sub.rs | 17 ++++-- rustfmt.toml | 3 +- src/lib.rs | 125 +++++++++++++++++++++++++++++++++++++++++---- src/timed.rs | 10 ++++ 5 files changed, 140 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 872c92a..61eac38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ keywords = ["actor", "threads"] flume = { version = "0.10", default-features = false, features = ["select"] } log = "0.4" parking_lot = "0.12" +dashmap = "6" [dev-dependencies] anyhow = "1" diff --git a/benches/pub_sub.rs b/benches/pub_sub.rs index a1a55e3..ea4e74b 100644 --- a/benches/pub_sub.rs +++ b/benches/pub_sub.rs @@ -1,10 +1,17 @@ use anyhow::Error; use criterion::{criterion_group, criterion_main, Criterion}; -use std::time::{Duration, Instant}; +use std::{ + hint::black_box, + time::{Duration, Instant}, +}; use tonari_actor::{Actor, Context, Event, Recipient, System}; +const PAYLOAD: &str = "This is the payload that will be used in the test event. It should be of \ + reasonable size for a representative event, which is hard to determine. 
\ + But let's say 3 lines of text is fine for now."; + #[derive(Debug, Clone)] -struct StringEvent; +struct StringEvent(String); impl Event for StringEvent {} @@ -45,7 +52,7 @@ impl Actor for PublisherActor { PublisherMessage::PublishEvents => { let start = Instant::now(); for _i in 0..self.iterations { - context.system_handle.publish(StringEvent)?; + context.system_handle.publish(StringEvent(PAYLOAD.to_string()))?; } let elapsed = start.elapsed(); @@ -86,8 +93,10 @@ impl Actor for SubscriberActor { fn handle( &mut self, _context: &mut Self::Context, - _message: Self::Message, + message: Self::Message, ) -> Result<(), Self::Error> { + // This black_box has a nice side effect that it silences the 'field is never read' warning. + black_box(message.0); Ok(()) } } diff --git a/rustfmt.toml b/rustfmt.toml index bb3c633..8596bc0 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,4 +1,5 @@ -use_small_heuristics="Max" +format_strings = true +use_small_heuristics = "Max" imports_granularity = "Crate" match_block_trailing_comma = true reorder_impl_items = true diff --git a/src/lib.rs b/src/lib.rs index 1d24d1d..1fa978a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -226,11 +226,29 @@ enum SystemState { /// A marker trait for types which participate in the publish-subscribe system /// of the actor framework. -pub trait Event: Clone + std::any::Any + 'static {} +pub trait Event: Clone + std::any::Any + Send + Sync {} #[derive(Default)] struct EventSubscribers { events: HashMap>, + /// We cache the last published value of each event type. + /// Subscribers can request to receive it upon subscription. + /// Use a concurrent map type, benchmarks in #87 have shown that there is a contention on + /// updating the cached value otherwise. 
+ last_value_cache: dashmap::DashMap>, +} + +impl EventSubscribers { + fn subscribe_recipient>(&mut self, recipient: Recipient) { + let subs = self.events.entry(TypeId::of::()).or_default(); + subs.push(Box::new(move |e| { + if let Some(event) = e.downcast_ref::() { + let msg = event.clone(); + recipient.send(msg.into())?; + } + Ok(()) + })); + } } /// Contains the "metadata" of the system, including information about the registry @@ -290,6 +308,22 @@ impl Context { { self.system_handle.subscribe_recipient::(self.myself.clone()); } + + /// Subscribe current actor to event of type `E` and send the last cached event to it. + /// This is part of the event system. You don't need to call this method to receive + /// direct messages sent using [`Addr`] and [`Recipient`]. + /// + /// Note that subscribing twice to the same event would result in duplicate events -- no + /// de-duplication of subscriptions is performed. + /// + /// This method may fail if it is not possible to send the latest event. In this case it is + /// guaranteed that the subscription did not take place. You can safely try again. + pub fn subscribe_and_receive_latest>(&self) -> Result<(), SendError> + where + M: 'static, + { + self.system_handle.subscribe_and_receive_latest::(self.myself.clone()) + } } /// Capacity of actor's normal- and high-priority inboxes. @@ -322,7 +356,8 @@ impl From for Capacity { /// A builder for configuring [`Actor`] spawning. /// You can specify your own [`Addr`] for the Actor, or let the system create /// a new address with either provided or default capacity. 
-#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to configure this builder"] +#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \ + configure this builder"] pub struct SpawnBuilderWithoutAddress<'a, A: Actor, F: FnOnce() -> A> { system: &'a mut System, factory: F, @@ -631,7 +666,11 @@ impl SystemHandle { match *system_state_lock { SystemState::ShuttingDown | SystemState::Stopped => { - debug!("Thread [{}] called system.shutdown() but the system is already shutting down or stopped", current_thread_name); + debug!( + "Thread [{}] called system.shutdown() but the system is already shutting \ + down or stopped", + current_thread_name + ); return Ok(()); }, SystemState::Running => { @@ -710,26 +749,48 @@ impl SystemHandle { /// Subscribe given `recipient` to events of type `E`. See [`Context::subscribe()`]. pub fn subscribe_recipient>(&self, recipient: Recipient) { let mut event_subscribers = self.event_subscribers.write(); + event_subscribers.subscribe_recipient::(recipient); + } - let subs = event_subscribers.events.entry(TypeId::of::()).or_default(); + /// Subscribe given `recipient` to events of type `E` and send the last cached event to it. + /// See [`Context::subscribe_and_receive_latest()`]. + pub fn subscribe_and_receive_latest>( + &self, + recipient: Recipient, + ) -> Result<(), SendError> { + let mut event_subscribers = self.event_subscribers.write(); - subs.push(Box::new(move |e| { - if let Some(event) = e.downcast_ref::() { - let msg = event.clone(); - recipient.send(msg.into())?; + // Send the last cached value if there is one. The cached event sending and adding ourselves + // to the subscriber list needs to be under the same write lock guard to avoid race + // conditions between this method and `publish()` (to guarantee exactly-once delivery). 
+ if let Some(last_cached_value) = event_subscribers.last_value_cache.get(&TypeId::of::()) + { + if let Some(msg) = last_cached_value.downcast_ref::() { + recipient.send(msg.clone().into())?; } + } - Ok(()) - })); + event_subscribers.subscribe_recipient::(recipient); + Ok(()) } /// Publish an event. All actors that have previously subscribed to the type will receive it. /// + /// The event will be also cached. Actors that will subscribe to the type in future may choose + /// to receive the last cached event upon subscription. + /// /// When sending to some subscriber fails, others are still tried and vec of errors is returned. /// For direct, non-[`Clone`] or high-throughput messages please use [`Addr`] or [`Recipient`]. pub fn publish(&self, event: E) -> Result<(), PublishError> { let event_subscribers = self.event_subscribers.read(); - if let Some(subs) = event_subscribers.events.get(&TypeId::of::()) { + let type_id = TypeId::of::(); + + // This value update must be under the read lock (even if it would be possible to factor + // `last_value_cache` outside of the `RwLock`) to prevent race conditions between this and + // `subscribe_and_receive_latest()`. 
+ event_subscribers.last_value_cache.insert(type_id, Box::new(event.clone())); + + if let Some(subs) = event_subscribers.events.get(&type_id) { let errors: Vec = subs .iter() .filter_map(|subscriber_callback| subscriber_callback(&event).err()) @@ -1287,4 +1348,46 @@ mod tests { [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ); } + + #[test] + fn last_cached_event() { + impl Event for () {} + + struct Subscriber; + impl Actor for Subscriber { + type Context = Context; + type Error = (); + type Message = (); + + fn started(&mut self, context: &mut Self::Context) { + context + .subscribe_and_receive_latest::() + .expect("can receive last cached value"); + } + + fn handle( + &mut self, + context: &mut Self::Context, + _: Self::Message, + ) -> Result<(), Self::Error> { + println!("Event received!"); + context.system_handle.shutdown().unwrap(); + Ok(()) + } + + fn name() -> &'static str { + "recipient" + } + } + + let mut system = System::new("last cached event"); + system.publish(()).expect("can publish event"); + + // This test will block indefinitely if the event isn't delivered. + system + .prepare(Subscriber) + .with_addr(Addr::with_capacity(1)) + .run_and_block() + .expect("actor finishes successfully"); + } } diff --git a/src/timed.rs b/src/timed.rs index 0911783..48d1a91 100644 --- a/src/timed.rs +++ b/src/timed.rs @@ -102,6 +102,16 @@ impl TimedContext { // The recipient() call allows conversion from M to TimedMessage. self.system_handle.subscribe_recipient::(self.myself.recipient()); } + + /// Subscribe current actor to event of type `E` and send the last cached event to it. + /// Events will be delivered as instant messages. + /// See [`crate::Context::subscribe()`]. + pub fn subscribe_and_receive_latest>(&self) -> Result<(), SendError> + where + M: 'static, + { + self.system_handle.subscribe_and_receive_latest::(self.myself.recipient()) + } } /// A wrapper around actors to add ability to receive delayed and recurring messages.