diff --git a/benches/benches.rs b/benches/benches.rs index c874921..c62aa8a 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -8,7 +8,7 @@ struct ChainLink { impl Actor for ChainLink { type Context = Context; - type Error = (); + type Error = String; type Message = u64; fn name() -> &'static str { diff --git a/src/lib.rs b/src/lib.rs index 1fa978a..2648315 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,14 +15,14 @@ //! struct TestActor {} //! impl Actor for TestActor { //! type Context = Context; -//! type Error = (); +//! type Error = String; //! type Message = usize; //! //! fn name() -> &'static str { //! "TestActor" //! } //! -//! fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<(), ()> { +//! fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<(), String> { //! println!("message: {}", message); //! //! Ok(()) @@ -92,13 +92,13 @@ impl fmt::Display for ActorError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { ActorError::SystemStopped { actor_name } => { - write!(f, "The system is not running. The actor {} can not be started.", actor_name) + write!(f, "the system is not running; the actor {} can not be started", actor_name) }, ActorError::SpawnFailed { actor_name } => { - write!(f, "Failed to spawn a thread for the actor {}.", actor_name) + write!(f, "failed to spawn a thread for the actor {}", actor_name) }, ActorError::ActorPanic => { - write!(f, "A panic inside an actor thread. See above for more verbose logs.") + write!(f, "panic inside an actor thread; see above for more verbose logs") }, } } @@ -125,7 +125,7 @@ impl fmt::Display for SendError { SendErrorReason::Full => { write!( f, - "The capacity of {}'s {:?}-priority channel is full.", + "the capacity of {}'s {:?}-priority channel is full", recipient_name, priority ) }, @@ -145,7 +145,7 @@ impl fmt::Display for PublishError { let error_strings: Vec = self.0.iter().map(ToString::to_string).collect(); write!( f, - "Failed to deliver an event to {} subscribers: {}.", + "failed to deliver an event to {} subscribers: {}", self.0.len(), error_strings.join(", ") ) @@ -165,7 +165,7 @@ pub struct DisconnectedError { impl fmt::Display for DisconnectedError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "The recipient of the message ({}) no longer exists.", self.recipient_name) + write!(f, "the recipient of the message ({}) no longer exists", self.recipient_name) } } @@ -608,35 +608,55 @@ impl System { }, Received::Message(msg) => { trace!("[{}] message received by {}", system_handle.name, A::name()); - if let Err(err) = actor.handle(context, msg) { - error!( - "[{}] {} handle error: {:?}, shutting down.", - system_handle.name, - A::name(), - err - ); - let _ = system_handle.shutdown(); - + if let Err(error) = actor.handle(context, msg) { + Self::report_error_shutdown(system_handle, A::name(), "handle", error); return; } }, Received::Timeout => { let deadline = context.receive_deadline.take().expect("implied by timeout"); - if let Err(err) = actor.deadline_passed(context, deadline) { - error!( - "[{}] {} deadline_passed error: {:?}, shutting down.", - system_handle.name, + if let Err(error) = actor.deadline_passed(context, deadline) { + Self::report_error_shutdown( + system_handle, A::name(), - err + "deadline_passed", + error, ); - let _ = system_handle.shutdown(); - return; } }, } } } + + fn report_error_shutdown( + system_handle: &SystemHandle, + actor_name: &str, + method_name: &str, + error: impl std::fmt::Display, + ) { + let is_running = match *system_handle.system_state.read() { + SystemState::Running => true, + SystemState::ShuttingDown | SystemState::Stopped => false, + }; + + let system_name = &system_handle.name; + + // Note that the system may have transitioned from running to stopping (but not the other + // way around) in the mean time. Slightly imprecise log and an extra no-op call is fine. + if is_running { + error!( + "[{system_name}] {actor_name} {method_name}() error: {error:#}. Shutting down the \ + actor system." + ); + let _ = system_handle.shutdown(); + } else { + warn!( + "[{system_name}] {actor_name} {method_name}() error while shutting down: \ + {error:#}. Ignoring." + ); + } + } } impl Drop for System { @@ -658,7 +678,6 @@ impl SystemHandle { pub fn shutdown(&self) -> Result<(), ActorError> { let current_thread = thread::current(); let current_thread_name = current_thread.name().unwrap_or("Unknown thread id"); - info!("Thread [{}] shutting down the actor system", current_thread_name); // Use an inner scope to prevent holding the lock for the duration of shutdown { @@ -667,24 +686,22 @@ impl SystemHandle { match *system_state_lock { SystemState::ShuttingDown | SystemState::Stopped => { debug!( - "Thread [{}] called system.shutdown() but the system is already shutting \ - down or stopped", - current_thread_name + "[{}] thread {} called system.shutdown() but the system is already \ + shutting down or stopped.", + self.name, current_thread_name, ); return Ok(()); }, SystemState::Running => { - debug!( - "Thread [{}] setting the system_state value to ShuttingDown", - current_thread_name + info!( + "[{}] thread {} shutting down the actor system.", + self.name, current_thread_name, ); *system_state_lock = SystemState::ShuttingDown; }, } } - info!("[{}] system shutting down.", self.name); - if let Some(callback) = self.callbacks.preshutdown.as_ref() { info!("[{}] calling pre-shutdown callback.", self.name); if let Err(err) = callback() { @@ -845,7 +862,7 @@ pub trait Actor { // 'static required to create trait object in Addr, https://stackoverflow.com/q/29740488/4345715 type Message: Send + 'static; /// The type to return on error in the handle method. - type Error: std::fmt::Debug; + type Error: std::fmt::Display; /// What kind of context this actor accepts. Usually [`Context`]. type Context; @@ -888,13 +905,13 @@ pub trait Actor { /// # struct TickingActor; /// impl Actor for TickingActor { /// # type Context = Context; - /// # type Error = (); + /// # type Error = String; /// # type Message = (); /// # fn name() -> &'static str { "TickingActor" } - /// # fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), ()> { Ok(()) } + /// # fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> { Ok(()) } /// // ... /// - /// fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<(), ()> { + /// fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<(), String> { /// // do_periodic_housekeeping(); /// /// // A: Schedule one second from now (even if delayed); drifting tick. @@ -1106,14 +1123,14 @@ mod tests { struct TestActor; impl Actor for TestActor { type Context = Context; - type Error = (); + type Error = String; type Message = usize; fn name() -> &'static str { "TestActor" } - fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), ()> { + fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> { println!("message: {}", message); Ok(()) @@ -1167,14 +1184,14 @@ mod tests { } impl Actor for LocalActor { type Context = Context; - type Error = (); + type Error = String; type Message = (); fn name() -> &'static str { "LocalActor" } - fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), ()> { + fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> { Ok(()) } @@ -1204,14 +1221,18 @@ mod tests { impl Actor for TimeoutActor { type Context = Context; - type Error = (); + type Error = String; type Message = Option; fn name() -> &'static str { "TimeoutActor" } - fn handle(&mut self, ctx: &mut Self::Context, msg: Self::Message) -> Result<(), ()> { + fn handle( + &mut self, + ctx: &mut Self::Context, + msg: Self::Message, + ) -> Result<(), String> { self.handle_count.fetch_add(1, Ordering::SeqCst); if msg.is_some() { ctx.receive_deadline = msg; @@ -1219,7 +1240,7 @@ mod tests { Ok(()) } - fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), ()> { + fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), String> { self.timeout_count.fetch_add(1, Ordering::SeqCst); Ok(()) } @@ -1272,7 +1293,7 @@ mod tests { let error = low_capacity_actor.send(123).unwrap_err(); assert_eq!( error.to_string(), - "The capacity of TestActor's Normal-priority channel is full." + "the capacity of TestActor's Normal-priority channel is full" ); assert_eq!( format!("{:?}", error), @@ -1282,7 +1303,7 @@ mod tests { system.shutdown().unwrap(); let error = stopped_actor.send(456usize).unwrap_err(); - assert_eq!(error.to_string(), "The recipient of the message (TestActor) no longer exists."); + assert_eq!(error.to_string(), "the recipient of the message (TestActor) no longer exists"); assert_eq!( format!("{:?}", error), r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Disconnected }"# @@ -1299,7 +1320,7 @@ mod tests { impl Actor for PriorityActor { type Context = Context; - type Error = (); + type Error = String; type Message = usize; fn handle( @@ -1356,7 +1377,7 @@ mod tests { struct Subscriber; impl Actor for Subscriber { type Context = Context; - type Error = (); + type Error = String; type Message = (); fn started(&mut self, context: &mut Self::Context) { diff --git a/src/timed.rs b/src/timed.rs index 48d1a91..eceaf52 100644 --- a/src/timed.rs +++ b/src/timed.rs @@ -292,14 +292,14 @@ mod tests { impl Actor for TimedTestActor { type Context = TimedContext; - type Error = (); + type Error = String; type Message = usize; fn name() -> &'static str { "TimedTestActor" } - fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), ()> { + fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), String> { { let mut guard = self.received.lock().unwrap(); guard.push(message);