Skip to content

Commit

Permalink
Fix/extend comments based on review + elsewhere
Browse files Browse the repository at this point in the history
  • Loading branch information
strohel committed Jan 16, 2024
1 parent 66fcd74 commit 6e473f3
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,21 @@ use std::{

/// A message that can be enqueued now, at certain time and optionally repeatedly.
pub enum TimedMessage<M> {
Instant { message: M },
Delayed { message: M, enqueue_at: Instant },
Recurring { factory: Box<dyn FnMut() -> M + Send>, enqueue_at: Instant, interval: Duration },
/// Instant message `handle()`d by the wrapped actor right away.
Instant {
message: M,
},
/// Request to setup a delayed message. Goes to internal [`Timed`] wrapper queue and then gets
/// sent to ourselves as an `Instant` message at the specified time.
Delayed {
message: M,
enqueue_at: Instant,
},
Recurring {
factory: Box<dyn FnMut() -> M + Send>,
enqueue_at: Instant,
interval: Duration,
},
}

/// This implementation allows sending direct unwrapped messages to wrapped actors.
Expand Down Expand Up @@ -121,9 +133,10 @@ where

/// Process any pending messages in the internal queue, calling wrapped actor's `handle()`.
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), SendError> {
// If the message on top of the queue is due, send it to the regular actor queue.
// No problem if there are multiple such messages, it's handle() will call process_queue()
// again.
// If the message on top of the queue is due, send it to ourselves as `Instant` to enqueue
// it in the regular actor queue.
// No problem if there are multiple such messages, the next Timed::handle() will call
// process_queue() again.
if self.queue.peek().map(|m| m.enqueue_at <= Instant::now()).unwrap_or(false) {
let item = self.queue.pop().expect("heap is non-empty, we have just peeked");

Expand Down Expand Up @@ -201,8 +214,10 @@ where
match message {
// Use underlying message priority for instant messages.
TimedMessage::Instant { message } => A::priority(message),
// Recurring and Delayed messages are only added to the queue when handled, and then go
// through actors priority inboxes again when actually enqueued.
// These priorities apply to the *set-up* of Delayed and Recurring messages and we
// want to handle that pronto.
// The resulting inner message then comes back as `Instant` and is prioritized per its
// underlying priority.
TimedMessage::Recurring { .. } | TimedMessage::Delayed { .. } => Priority::High,
}
}
Expand Down Expand Up @@ -303,7 +318,7 @@ mod tests {
context.myself.send_now(3).unwrap();
}

// Message 2 is a recurring one, sleep based on a paremeter.
// Message 2 is a recurring one, sleep based on a parameter.
if message == 2 {
thread::sleep(self.recurring_message_sleep);
}
Expand Down

0 comments on commit 6e473f3

Please sign in to comment.