From a88a4e7ae1b55c7d2ac1ba8f7e6a942ed78fa568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C4=9Bj=20Laitl?= Date: Mon, 12 Aug 2024 22:58:28 +0200 Subject: [PATCH] Make Actor::started(), stopped() fallible --- benches/pub_sub.rs | 20 ++++++---------- examples/actor_wrapping.rs | 21 +++++++---------- examples/pub_sub_example.rs | 29 ++++++++--------------- examples/simple_timer.rs | 17 ++++---------- src/lib.rs | 47 +++++++++++++++++++++++-------------- src/timed.rs | 8 +++---- 6 files changed, 65 insertions(+), 77 deletions(-) diff --git a/benches/pub_sub.rs b/benches/pub_sub.rs index ea4e74b..5bf14a2 100644 --- a/benches/pub_sub.rs +++ b/benches/pub_sub.rs @@ -1,4 +1,4 @@ -use anyhow::Error; +use anyhow::{Error, Result}; use criterion::{criterion_group, criterion_main, Criterion}; use std::{ hint::black_box, @@ -36,11 +36,7 @@ impl Actor for PublisherActor { "PublisherActor" } - fn handle( - &mut self, - context: &mut Self::Context, - message: Self::Message, - ) -> Result<(), Self::Error> { + fn handle(&mut self, context: &mut Self::Context, message: Self::Message) -> Result<()> { match message { PublisherMessage::SubscriberStarted => { self.subscriber_count = self.subscriber_count.checked_sub(1).unwrap(); @@ -83,18 +79,16 @@ impl Actor for SubscriberActor { "SubscriberActor" } - fn started(&mut self, context: &mut Self::Context) { + fn started(&mut self, context: &mut Self::Context) -> Result<()> { context.subscribe::(); for publisher_addr in self.publisher_addrs.iter() { - publisher_addr.send(PublisherMessage::SubscriberStarted).unwrap(); + publisher_addr.send(PublisherMessage::SubscriberStarted)?; } + + Ok(()) } - fn handle( - &mut self, - _context: &mut Self::Context, - message: Self::Message, - ) -> Result<(), Self::Error> { + fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> { // This black_box has a nice side effect that it silences the 'field is never read' warning. black_box(message.0); Ok(()) diff --git a/examples/actor_wrapping.rs b/examples/actor_wrapping.rs index 766c4cb..d7ec67e 100644 --- a/examples/actor_wrapping.rs +++ b/examples/actor_wrapping.rs @@ -1,4 +1,4 @@ -use anyhow::Error; +use anyhow::{Error, Result}; use env_logger::Env; use log::debug; use std::time::{Duration, Instant}; @@ -34,12 +34,12 @@ impl Actor for LoggingAdapter { A::priority(message) } - fn started(&mut self, context: &mut Self::Context) { + fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> { debug!("LoggingAdapter: started()"); self.inner.started(context) } - fn stopped(&mut self, context: &mut Self::Context) { + fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> { debug!("LoggingAdapter: stopped()"); self.inner.stopped(context) } @@ -61,7 +61,7 @@ impl Actor for TestActor { type Error = Error; type Message = String; - fn handle(&mut self, context: &mut Self::Context, message: String) -> Result<(), Error> { + fn handle(&mut self, context: &mut Self::Context, message: String) -> Result<()> { println!("Got a message: {}. Shuting down.", message); context.system_handle.shutdown().map_err(Error::from) } @@ -70,20 +70,17 @@ impl Actor for TestActor { "TestActor" } - fn started(&mut self, context: &mut Self::Context) { - context.set_timeout(Some(Duration::from_millis(100))) + fn started(&mut self, context: &mut Self::Context) -> Result<()> { + context.set_timeout(Some(Duration::from_millis(100))); + Ok(()) } - fn deadline_passed( - &mut self, - context: &mut Self::Context, - deadline: Instant, - ) -> Result<(), Error> { + fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> { context.myself.send(format!("deadline was {:?}", deadline)).map_err(Error::from) } } -fn main() -> Result<(), Error> { +fn main() -> Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("debug")).init(); let mut system = System::new("Actor Wrapping Example"); diff --git a/examples/pub_sub_example.rs b/examples/pub_sub_example.rs index 392fe07..86eed4b 100644 --- a/examples/pub_sub_example.rs +++ b/examples/pub_sub_example.rs @@ -1,4 +1,4 @@ -use anyhow::Error; +use anyhow::{Error, Result}; use env_logger::Env; use std::time::{Duration, Instant}; use tonari_actor::{Actor, Context, Event, System}; @@ -40,9 +40,10 @@ impl Actor for PublisherActor { "PublisherActor" } - fn started(&mut self, context: &mut Self::Context) { + fn started(&mut self, context: &mut Self::Context) -> Result<()> { context.set_deadline(Some(self.started_at + Duration::from_millis(1500))); context.subscribe::(); + Ok(()) } fn handle( @@ -71,11 +72,7 @@ impl Actor for PublisherActor { Ok(()) } - fn deadline_passed( - &mut self, - context: &mut Self::Context, - deadline: Instant, - ) -> Result<(), Error> { + fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> { context.myself.send(PublisherMessage::Periodic)?; context.set_deadline(Some(deadline + Duration::from_secs(1))); Ok(()) @@ -104,15 +101,12 @@ impl Actor for SubscriberActor1 { "SubscriberActor1" } - fn started(&mut self, context: &mut Self::Context) { + fn started(&mut self, context: &mut Self::Context) -> Result<()> { context.subscribe::(); + Ok(()) } - fn handle( - &mut self, - _context: &mut Self::Context, - message: Self::Message, - ) -> Result<(), Self::Error> { + fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> { match message { SubscriberMessage::Text(text) => { println!("SubscriberActor1 got a text message: {:?}", text); @@ -132,15 +126,12 @@ impl Actor for SubscriberActor2 { "SubscriberActor1" } - fn started(&mut self, context: &mut Self::Context) { + fn started(&mut self, context: &mut Self::Context) -> Result<()> { context.subscribe::(); + Ok(()) } - fn handle( - &mut self, - _context: &mut Self::Context, - message: Self::Message, - ) -> Result<(), Self::Error> { + fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> { match message { SubscriberMessage::Text(text) => { println!("SubscriberActor2 got a text message: {:?}", text); diff --git a/examples/simple_timer.rs b/examples/simple_timer.rs index 8f8036a..c2813b8 100644 --- a/examples/simple_timer.rs +++ b/examples/simple_timer.rs @@ -1,4 +1,4 @@ -use anyhow::Error; +use anyhow::{Error, Result}; use env_logger::Env; use std::time::{Duration, Instant}; use tonari_actor::{Actor, Context, System}; @@ -27,24 +27,17 @@ impl Actor for TimerExampleActor { "TimerExampleActor" } - fn started(&mut self, context: &mut Self::Context) { + fn started(&mut self, context: &mut Self::Context) -> Result<()> { context.set_deadline(Some(self.started_at + Duration::from_millis(1500))); + Ok(()) } - fn handle( - &mut self, - _context: &mut Self::Context, - message: Self::Message, - ) -> Result<(), Self::Error> { + fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> { println!("Got a message: {:?} at {:?}", message, self.started_at.elapsed()); Ok(()) } - fn deadline_passed( - &mut self, - context: &mut Self::Context, - deadline: Instant, - ) -> Result<(), Error> { + fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> { context.myself.send(TimerMessage::Periodic)?; context.set_deadline(Some(deadline + Duration::from_secs(1))); Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 7e636f5..b733855 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -495,7 +495,10 @@ impl System { .spawn(move || { let mut actor = factory(); - actor.started(&mut context); + if let Err(error) = actor.started(&mut context) { + Self::report_error_shutdown(&system_handle, A::name(), "started", error); + return; + } debug!("[{}] started actor: {}", system_handle.name, A::name()); Self::run_actor_select_loop(actor, addr, &mut context, &system_handle); @@ -535,9 +538,13 @@ impl System { self.handle.registry.lock().push(RegistryEntry::CurrentThread(addr.control_tx.clone())); - actor.started(&mut context); - debug!("[{}] started actor: {}", system_handle.name, A::name()); - Self::run_actor_select_loop(actor, addr, &mut context, system_handle); + match actor.started(&mut context) { + Ok(()) => { + debug!("[{}] started actor: {}", system_handle.name, A::name()); + Self::run_actor_select_loop(actor, addr, &mut context, system_handle); + }, + Err(error) => Self::report_error_shutdown(system_handle, A::name(), "started", error), + } // Wait for the system to shutdown before we exit, otherwise the process // would exit before the system is completely shutdown @@ -602,7 +609,10 @@ impl System { // Process the event. Returning ends actor loop, the normal operation is to fall through. match received { Received::Control(Control::Stop) => { - actor.stopped(context); + if let Err(error) = actor.stopped(context) { + // FWIW this should always hit the "while shutting down" variant. + Self::report_error_shutdown(system_handle, A::name(), "stopped", error); + } debug!("[{}] stopped actor: {}", system_handle.name, A::name()); return; }, @@ -885,10 +895,14 @@ pub trait Actor { } /// An optional callback when the Actor has been started. - fn started(&mut self, _context: &mut Self::Context) {} + fn started(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> { + Ok(()) + } /// An optional callback when the Actor has been stopped. - fn stopped(&mut self, _context: &mut Self::Context) {} + fn stopped(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> { + Ok(()) + } /// An optional callback when a deadline has passed. /// @@ -1129,16 +1143,17 @@ mod tests { fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> { println!("message: {}", message); - Ok(()) } - fn started(&mut self, _: &mut Self::Context) { + fn started(&mut self, _: &mut Self::Context) -> Result<(), String> { println!("started"); + Ok(()) } - fn stopped(&mut self, _: &mut Self::Context) { + fn stopped(&mut self, _: &mut Self::Context) -> Result<(), String> { println!("stopped"); + Ok(()) } } @@ -1193,8 +1208,8 @@ mod tests { } /// We just need this test to compile, not run. - fn started(&mut self, ctx: &mut Self::Context) { - ctx.system_handle.shutdown().unwrap(); + fn started(&mut self, ctx: &mut Self::Context) -> Result<(), String> { + ctx.system_handle.shutdown().map_err(|e| e.to_string()) } } @@ -1374,13 +1389,11 @@ mod tests { struct Subscriber; impl Actor for Subscriber { type Context = Context; - type Error = (); + type Error = String; type Message = (); - fn started(&mut self, context: &mut Self::Context) { - context - .subscribe_and_receive_latest::() - .expect("can receive last cached value"); + fn started(&mut self, context: &mut Self::Context) -> Result<(), String> { + context.subscribe_and_receive_latest::().map_err(|e| e.to_string()) } fn handle( diff --git a/src/timed.rs b/src/timed.rs index eceaf52..e8b8e77 100644 --- a/src/timed.rs +++ b/src/timed.rs @@ -216,11 +216,11 @@ impl, Message = M>> Actor } } - fn started(&mut self, context: &mut Self::Context) { + fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> { self.inner.started(&mut TimedContext::from_context(context)) } - fn stopped(&mut self, context: &mut Self::Context) { + fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> { self.inner.stopped(&mut TimedContext::from_context(context)) } @@ -314,7 +314,7 @@ mod tests { Ok(()) } - fn started(&mut self, context: &mut Self::Context) { + fn started(&mut self, context: &mut Self::Context) -> Result<(), String> { context .myself .send_recurring( @@ -322,7 +322,7 @@ mod tests { Instant::now() + Duration::from_millis(50), Duration::from_millis(100), ) - .unwrap() + .map_err(|e| e.to_string()) } }