From 6e473f36050bbc7b31a0e9c481a2fbb9831e6b0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C4=9Bj=20Laitl?= Date: Tue, 16 Jan 2024 22:46:42 +0100 Subject: [PATCH] Fix/extend comments based on review + elsewhere --- src/timed.rs | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/timed.rs b/src/timed.rs index 46e6ca2..0d2bb21 100644 --- a/src/timed.rs +++ b/src/timed.rs @@ -26,9 +26,21 @@ use std::{ /// A message that can be enqueued now, at certain time and optionally repeatedly. pub enum TimedMessage { - Instant { message: M }, - Delayed { message: M, enqueue_at: Instant }, - Recurring { factory: Box M + Send>, enqueue_at: Instant, interval: Duration }, + /// Instant message `handle()`d by the wrapped actor right away. + Instant { + message: M, + }, + /// Request to setup a delayed message. Goes to internal [`Timed`] wrapper queue and then gets + /// sent to ourselves as an `Instant` message at the specified time. + Delayed { + message: M, + enqueue_at: Instant, + }, + Recurring { + factory: Box M + Send>, + enqueue_at: Instant, + interval: Duration, + }, } /// This implementation allows sending direct unwrapped messages to wrapped actors. @@ -121,9 +133,10 @@ where /// Process any pending messages in the internal queue, calling wrapped actor's `handle()`. fn process_queue(&mut self, context: &mut ::Context) -> Result<(), SendError> { - // If the message on top of the queue is due, send it to the regular actor queue. - // No problem if there are multiple such messages, it's handle() will call process_queue() - // again. + // If the message on top of the queue is due, send it to ourselves as `Instant` to enqueue + // it in the regular actor queue. + // No problem if there are multiple such messages, the next Timed::handle() will call + // process_queue() again. if self.queue.peek().map(|m| m.enqueue_at <= Instant::now()).unwrap_or(false) { let item = self.queue.pop().expect("heap is non-empty, we have just peeked"); @@ -201,8 +214,10 @@ where match message { // Use underlying message priority for instant messages. TimedMessage::Instant { message } => A::priority(message), - // Recurring and Delayed messages are only added to the queue when handled, and then go - // through actors priority inboxes again when actually enqueued. + // These priorities apply to the *set-up* of Delayed and Recurring messages and we + // want to handle that pronto. + // The resulting inner message then comes back as `Instant` and is prioritized per its + // underlying priority. TimedMessage::Recurring { .. } | TimedMessage::Delayed { .. } => Priority::High, } } @@ -303,7 +318,7 @@ mod tests { context.myself.send_now(3).unwrap(); } - // Message 2 is a recurring one, sleep based on a paremeter. + // Message 2 is a recurring one, sleep based on a parameter. if message == 2 { thread::sleep(self.recurring_message_sleep); }