diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 132b8150fe..7b58379fa2 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -82,6 +82,18 @@ class Executor virtual void spin() = 0; + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + virtual void + spin(const std::function & exception_handler) = 0; + /// Add a callback group to an executor. /** * An executor can have zero or more callback groups which provide work during `spin` functions. @@ -470,6 +482,18 @@ class Executor void execute_any_executable(AnyExecutable & any_exec); + /// Find the next available executable and do the work associated with it. + /** + * \param[in] any_exec Union structure that can hold any executable type (timer, subscription, + * service, client). + * \throws std::runtime_error if there is an issue triggering the guard condition + */ + RCLCPP_PUBLIC + void + execute_any_executable( + AnyExecutable & any_exec, + const std::function & exception_handler); + /// Run subscription executable. /** * Do necessary setup and tear-down as well as executing the subscription. diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index b63ea2c41b..a75e7af8cc 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -79,7 +79,7 @@ class MultiThreadedExecutor : public rclcpp::Executor */ RCLCPP_PUBLIC void - spin(std::function exception_handler); + spin(const std::function & exception_handler) override; RCLCPP_PUBLIC size_t @@ -88,7 +88,9 @@ class MultiThreadedExecutor : public rclcpp::Executor protected: RCLCPP_PUBLIC void - run(size_t this_thread_number, std::function exception_handler); + run( + size_t this_thread_number, + const std::function & exception_handler); private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 9dc6dec57b..7b7d889547 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -65,6 +65,18 @@ class SingleThreadedExecutor : public rclcpp::Executor void spin() override; + /// Single-threaded implementation of spin. + /** + * \sa rclcpp::SingleThreadedExecutor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + void + spin(const std::function & exception_handler) override; + private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) }; diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index 6f22909caf..b35bbbad5c 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -68,6 +68,16 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::SingleThreadedExecutor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */RCLCPP_PUBLIC + virtual void + spin(const std::function & exception_handler) override; + /// Static executor implementation of spin some /** * This non-blocking function will execute entities that diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp index dd5b1ebe63..a028f0e7d9 100644 --- a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp @@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor void spin() override; + /** + * \sa rclcpp::Executor:spin() for more details + * \throws std::runtime_error when spin() called while already spinning + * @param exception_handler will be called for every exception in the processing threads + * + * The exception_handler can be called from multiple threads at the same time. + * The exception_handler shall rethrow the exception it if wants to terminate the program. + */ + RCLCPP_PUBLIC + void + spin(const std::function & exception_handler) override; + /// Events executor implementation of spin some /** * This non-blocking function will execute the timers and events diff --git a/rclcpp/include/rclcpp/experimental/timers_manager.hpp b/rclcpp/include/rclcpp/experimental/timers_manager.hpp index af3337bfd6..23580f505b 100644 --- a/rclcpp/include/rclcpp/experimental/timers_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/timers_manager.hpp @@ -125,9 +125,14 @@ class TimersManager /** * @brief Starts a thread that takes care of executing the timers stored in this object. * Function will throw an error if the timers thread was already running. + * + * @param exception_handler if valid, the execution of the timer will be done in a try catch block, + * and any occurring exception will be passed to the given handler */ RCLCPP_PUBLIC - void start(); + void start( + const std::function & exception_handler = std::function()); /** * @brief Stops the timers thread. @@ -511,6 +516,11 @@ class TimersManager */ void run_timers(); + /** + * @brief calls run_timers with a try catch block. + */ + void run_timers(const std::function & exception_handler); + /** * @brief Get the amount of time before the next timer triggers. * This function is not thread safe, acquire a mutex before calling it. @@ -528,7 +538,7 @@ class TimersManager * while keeping the heap correctly sorted. * This function is not thread safe, acquire the timers_mutex_ before calling it. */ - void execute_ready_timers_unsafe(); + void execute_ready_timers_unsafe(std::function exception_handler); // Callback to be called when timer is ready std::function +void execute_guarded( + const Function & fun, + const std::function & exception_handler) +{ + try { + fun(); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } +} + +void +Executor::execute_any_executable( + AnyExecutable & any_exec, + const std::function & exception_handler) +{ + if (!spinning.load()) { + return; + } + + if (any_exec.timer) { + TRACETOOLS_TRACEPOINT( + rclcpp_executor_execute, + static_cast(any_exec.timer->get_timer_handle().get())); + execute_guarded([&any_exec]() { + execute_timer(any_exec.timer, any_exec.data); + }, exception_handler); + } + if (any_exec.subscription) { + TRACETOOLS_TRACEPOINT( + rclcpp_executor_execute, + static_cast(any_exec.subscription->get_subscription_handle().get())); + execute_guarded( + [&any_exec]() { + execute_subscription(any_exec.subscription); + }, exception_handler); + } + if (any_exec.service) { + execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler); + } + if (any_exec.client) { + execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler); + } + if (any_exec.waitable) { + execute_guarded([&any_exec]() { + any_exec.waitable->execute(static_cast>(any_exec.data)); + }, exception_handler); + } + // Reset the callback_group, regardless of type + any_exec.callback_group->can_be_taken_from().store(true); + // Wake the wait, because it may need to be recalculated or work that + // was previously blocked is now available. + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition from execute_any_executable: ") + ex.what()); + } +} + + template static void diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 7ae9dd2cad..67b0855b72 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -57,7 +57,7 @@ MultiThreadedExecutor::spin() void -MultiThreadedExecutor::spin(std::function exception_handler) +MultiThreadedExecutor::spin(const std::function & exception_handler) { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); @@ -88,49 +88,40 @@ MultiThreadedExecutor::get_number_of_threads() void MultiThreadedExecutor::run( size_t this_thread_number, - std::function exception_handler) + const std::function & exception_handler) { - try { - - (void)this_thread_number; - while (rclcpp::ok(this->context_) && spinning.load()) { - rclcpp::AnyExecutable any_exec; - { - std::lock_guard wait_lock{wait_mutex_}; - if (!rclcpp::ok(this->context_) || !spinning.load()) { - return; - } - if (!get_next_executable(any_exec, next_exec_timeout_)) { - continue; - } + (void)this_thread_number; + while (rclcpp::ok(this->context_) && spinning.load()) { + rclcpp::AnyExecutable any_exec; + { + std::lock_guard wait_lock{wait_mutex_}; + if (!rclcpp::ok(this->context_) || !spinning.load()) { + return; } - if (yield_before_execute_) { - std::this_thread::yield(); + if (!get_next_executable(any_exec, next_exec_timeout_)) { + continue; } + } + if (yield_before_execute_) { + std::this_thread::yield(); + } - execute_any_executable(any_exec); - - if (any_exec.callback_group && - any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive) - { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group change: ") + ex.what()); - } + execute_any_executable(any_exec, exception_handler); + + if (any_exec.callback_group && + any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive) + { + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group change: ") + ex.what()); } - - // Clear the callback_group to prevent the AnyExecutable destructor from - // resetting the callback group `can_be_taken_from` - any_exec.callback_group.reset(); } - } catch (const std::exception & e) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rclcpp"), - "Exception while spinning : " << e.what()); - exception_handler(e); + // Clear the callback_group to prevent the AnyExecutable destructor from + // resetting the callback group `can_be_taken_from` + any_exec.callback_group.reset(); } } diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 975733b497..5b56cdcf42 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -43,3 +43,20 @@ SingleThreadedExecutor::spin() } } } + + +void +SingleThreadedExecutor::spin( + const std::function & exception_handler) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + rclcpp::AnyExecutable any_executable; + if (get_next_executable(any_executable)) { + execute_any_executable(any_executable, exception_handler); + } + } +} diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 2d837103e5..2d7d08a380 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -43,6 +43,33 @@ StaticSingleThreadedExecutor::spin() } } + +void +StaticSingleThreadedExecutor::spin( + const std::function & exception_handler) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + if (rclcpp::ok(context_) && spinning.load()) { + std::lock_guard guard(mutex_); + auto wait_result = this->collect_and_wait(std::chrono::nanoseconds(-1)); + if (wait_result.has_value()) { + try { + this->execute_ready_executables(current_collection_, wait_result.value(), true); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } + } +} + void StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration) { diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index ce6a103ab2..9f91ca0827 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -126,6 +126,35 @@ EventsExecutor::spin() } } +void +EventsExecutor::spin(const std::function & exception_handler) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + timers_manager_->start(exception_handler); + RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); ); + + while (rclcpp::ok(context_) && spinning.load()) { + // Wait until we get an event + ExecutorEvent event; + bool has_event = events_queue_->dequeue(event); + if (has_event) { + try { + this->execute_event(event); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } + } +} + void EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) { diff --git a/rclcpp/src/rclcpp/experimental/timers_manager.cpp b/rclcpp/src/rclcpp/experimental/timers_manager.cpp index 2caa0a6b15..735fb4b9ef 100644 --- a/rclcpp/src/rclcpp/experimental/timers_manager.cpp +++ b/rclcpp/src/rclcpp/experimental/timers_manager.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "rcpputils/scope_exit.hpp" @@ -71,14 +72,18 @@ void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer) } } -void TimersManager::start() +RCLCPP_PUBLIC +void TimersManager::start(const std::function & exception_handler) { // Make sure that the thread is not already running if (running_.exchange(true)) { throw std::runtime_error("TimersManager::start() can't start timers thread as already running"); } - timers_thread_ = std::thread(&TimersManager::run_timers, this); + timers_thread_ = std::thread( + [this, exception_handler]() { + run_timers(exception_handler); + }); } void TimersManager::stop() @@ -203,7 +208,8 @@ std::optional TimersManager::get_head_timeout_unsafe() return head_timer->time_until_trigger(); } -void TimersManager::execute_ready_timers_unsafe() +void TimersManager::execute_ready_timers_unsafe( + std::function exception_handler) { // We start by locking the timers TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); @@ -226,7 +232,19 @@ void TimersManager::execute_ready_timers_unsafe() if (on_ready_callback_) { on_ready_callback_(head_timer.get(), data); } else { - head_timer->execute_callback(data); + if (exception_handler) { + try { + head_timer->execute_callback(data); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "Exception while spinning : " << e.what()); + + exception_handler(e); + } + } else { + head_timer->execute_callback(data); + } } } else { // someone canceled the timer between is_ready and call @@ -245,7 +263,8 @@ void TimersManager::execute_ready_timers_unsafe() weak_timers_heap_.store(locked_heap); } -void TimersManager::run_timers() +void TimersManager::run_timers( + const std::function & exception_handler) { // Make sure the running flag is set to false when we exit from this function // to allow restarting the timers thread. @@ -289,7 +308,7 @@ void TimersManager::run_timers() timers_updated_ = false; // Execute timers - this->execute_ready_timers_unsafe(); + this->execute_ready_timers_unsafe(exception_handler); } } diff --git a/rclcpp/test/benchmark/benchmark_parameter_client.cpp b/rclcpp/test/benchmark/benchmark_parameter_client.cpp index 0d9a6fd9c9..b59c029dd0 100644 --- a/rclcpp/test/benchmark/benchmark_parameter_client.cpp +++ b/rclcpp/test/benchmark/benchmark_parameter_client.cpp @@ -46,7 +46,10 @@ class RemoteNodeTest : public benchmark::Fixture remote_node_name, rclcpp::NodeOptions().context(remote_context)); remote_executor->add_node(remote_node); - remote_thread = std::thread(&rclcpp::executors::SingleThreadedExecutor::spin, remote_executor); + remote_thread = + std::thread( + static_cast(&rclcpp:: + executors::SingleThreadedExecutor::spin), remote_executor); } void TearDown(benchmark::State &) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1a538eaa30..cf654575e7 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -133,6 +133,52 @@ TYPED_TEST(TestExecutors, emptyExecutor) spinner.join(); } +TYPED_TEST(TestExecutors, catch_exception) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::shared_ptr node = + std::make_shared("test_executor_catch_exception"); + + const std::string test_reason = "test exception"; + + std::atomic_bool timer_executed_after_exception = false; + + size_t cnt = 0; + + auto timer = node->create_wall_timer( + std::chrono::milliseconds(1), [test_reason, &timer_executed_after_exception, &executor, &cnt]() + { + if (cnt == 0) { + cnt++; + throw std::runtime_error(test_reason); + } + + timer_executed_after_exception = true; + + executor.cancel(); + }); + + std::atomic_bool caught_exception = false; + + executor.add_node(node); + executor.spin( + [&caught_exception, &test_reason](const std::exception & e) + { + const std::runtime_error * runtime_error = dynamic_cast(&e); + ASSERT_NE(runtime_error, nullptr); + + ASSERT_EQ(runtime_error->what(), test_reason); + + caught_exception = true; + } + ); + + ASSERT_TRUE(caught_exception); + ASSERT_TRUE(timer_executed_after_exception); +} + + // Check executor throws properly if the same node is added a second time TYPED_TEST(TestExecutors, addNodeTwoExecutors) { diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index b4729bde4c..b6fd2c3fda 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -96,48 +96,4 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) { auto timer = node->create_wall_timer(PERIOD_MS, timer_callback, cbg); executor.add_node(node); executor.spin(); - -} - -TEST_F(TestMultiThreadedExecutor, catch_exception) { - - rclcpp::executors::MultiThreadedExecutor executor; - - std::shared_ptr node = - std::make_shared("test_multi_threaded_executor_catch_exception"); - - const std::string test_reason = "test exception"; - - std::atomic_bool timer_executed_after_exception = false; - - auto timer = node->create_wall_timer( - std::chrono::milliseconds(1), [test_reason, &timer_executed_after_exception, &executor]() - { - static size_t cnt = 0; - if (cnt == 0) { - cnt++; - throw std::runtime_error(test_reason); - } - - timer_executed_after_exception = true; - executor.cancel(); - }); - - std::atomic_bool caught_exception = false; - - executor.add_node(node); - executor.spin( - [&caught_exception, &test_reason](const std::exception & e) - { - const std::runtime_error * runtime_error = dynamic_cast(&e); - ASSERT_NE(runtime_error, nullptr); - - ASSERT_EQ(runtime_error->what(), test_reason); - - caught_exception = true; - } - ); - - ASSERT_TRUE(caught_exception); - ASSERT_TRUE(timer_executed_after_exception); } diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index 668ab96797..63c15192f8 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -42,6 +42,10 @@ class DummyExecutor : public rclcpp::Executor { } + void spin(const std::function & exception_handler) override + { + } + void spin_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { spin_node_once_nanoseconds(node, std::chrono::milliseconds(100));