-
Notifications
You must be signed in to change notification settings - Fork 425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(MultiThreadedExecutor): Added ability to handle exceptions from threads #2382
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,6 +82,18 @@ class Executor | |
virtual void | ||
spin() = 0; | ||
|
||
/** | ||
* \sa rclcpp::Executor:spin() for more details | ||
* \throws std::runtime_error when spin() called while already spinning | ||
* @param exception_handler will be called for every exception in the processing threads | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't use |
||
* | ||
* The exception_handler can be called from multiple threads at the same time. | ||
* The exception_handler shall rethrow the exception it if wants to terminate the program. | ||
*/ | ||
RCLCPP_PUBLIC | ||
virtual void | ||
spin(const std::function<void(const std::exception & e)> & exception_handler) = 0; | ||
Comment on lines
+85
to
+95
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would have liked to have had a non-virtual However, I don't know if I have time to do that before the freeze, so I won't block this on it, but I wanted to make the comment somewhere. |
||
|
||
/// Add a callback group to an executor. | ||
/** | ||
* An executor can have zero or more callback groups which provide work during `spin` functions. | ||
|
@@ -470,6 +482,18 @@ class Executor | |
void | ||
execute_any_executable(AnyExecutable & any_exec); | ||
|
||
/// Find the next available executable and do the work associated with it. | ||
/** | ||
* \param[in] any_exec Union structure that can hold any executable type (timer, subscription, | ||
* service, client). | ||
* \throws std::runtime_error if there is an issue triggering the guard condition | ||
*/ | ||
RCLCPP_PUBLIC | ||
void | ||
execute_any_executable( | ||
AnyExecutable & any_exec, | ||
const std::function<void(const std::exception & e)> & exception_handler); | ||
|
||
/// Run subscription executable. | ||
/** | ||
* Do necessary setup and tear-down as well as executing the subscription. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,9 +125,14 @@ class TimersManager | |
/** | ||
* @brief Starts a thread that takes care of executing the timers stored in this object. | ||
* Function will throw an error if the timers thread was already running. | ||
* | ||
* @param exception_handler if valid, the execution of the timer will be done in a try catch block, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll leave this |
||
* and any occurring exception will be passed to the given handler | ||
*/ | ||
RCLCPP_PUBLIC | ||
void start(); | ||
void start( | ||
const std::function<void(const std::exception & e)> & exception_handler = std::function<void( | ||
const std::exception & e)>()); | ||
|
||
/** | ||
* @brief Stops the timers thread. | ||
|
@@ -511,6 +516,11 @@ class TimersManager | |
*/ | ||
void run_timers(); | ||
|
||
/** | ||
* @brief calls run_timers with a try catch block. | ||
*/ | ||
void run_timers(const std::function<void(const std::exception & e)> & exception_handler); | ||
|
||
/** | ||
* @brief Get the amount of time before the next timer triggers. | ||
* This function is not thread safe, acquire a mutex before calling it. | ||
|
@@ -528,7 +538,7 @@ class TimersManager | |
* while keeping the heap correctly sorted. | ||
* This function is not thread safe, acquire the timers_mutex_ before calling it. | ||
*/ | ||
void execute_ready_timers_unsafe(); | ||
void execute_ready_timers_unsafe(std::function<void(const std::exception & e)> exception_handler); | ||
|
||
// Callback to be called when timer is ready | ||
std::function<void(const rclcpp::TimerBase *, | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -395,6 +395,69 @@ Executor::execute_any_executable(AnyExecutable & any_exec) | |||||
any_exec.callback_group->can_be_taken_from().store(true); | ||||||
} | ||||||
|
||||||
template<typename Function> | ||||||
void execute_guarded( | ||||||
const Function & fun, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: use full variable names
Suggested change
|
||||||
const std::function<void(const std::exception & e)> & exception_handler) | ||||||
{ | ||||||
try { | ||||||
fun(); | ||||||
} catch (const std::exception & e) { | ||||||
RCLCPP_ERROR_STREAM( | ||||||
rclcpp::get_logger("rclcpp"), | ||||||
"Exception while spinning : " << e.what()); | ||||||
|
||||||
exception_handler(e); | ||||||
} | ||||||
} | ||||||
|
||||||
void | ||||||
Executor::execute_any_executable( | ||||||
AnyExecutable & any_exec, | ||||||
const std::function<void(const std::exception & e)> & exception_handler) | ||||||
{ | ||||||
if (!spinning.load()) { | ||||||
return; | ||||||
} | ||||||
|
||||||
assert( | ||||||
(void("cannot execute an AnyExecutable without a valid callback group"), | ||||||
any_exec.callback_group)); | ||||||
|
||||||
if (any_exec.timer) { | ||||||
TRACETOOLS_TRACEPOINT( | ||||||
rclcpp_executor_execute, | ||||||
static_cast<const void *>(any_exec.timer->get_timer_handle().get())); | ||||||
execute_guarded([&any_exec]() { | ||||||
execute_timer(any_exec.timer, any_exec.data); | ||||||
}, exception_handler); | ||||||
} | ||||||
if (any_exec.subscription) { | ||||||
TRACETOOLS_TRACEPOINT( | ||||||
rclcpp_executor_execute, | ||||||
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get())); | ||||||
execute_guarded( | ||||||
[&any_exec]() { | ||||||
execute_subscription(any_exec.subscription); | ||||||
}, exception_handler); | ||||||
} | ||||||
if (any_exec.service) { | ||||||
execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler); | ||||||
} | ||||||
if (any_exec.client) { | ||||||
execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler); | ||||||
} | ||||||
if (any_exec.waitable) { | ||||||
execute_guarded([&any_exec]() { | ||||||
const std::shared_ptr<void> & const_data = any_exec.data; | ||||||
any_exec.waitable->execute(const_data); | ||||||
}, exception_handler); | ||||||
} | ||||||
// Reset the callback_group, regardless of type | ||||||
any_exec.callback_group->can_be_taken_from().store(true); | ||||||
} | ||||||
|
||||||
|
||||||
template<typename Taker, typename Handler> | ||||||
static | ||||||
void | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,24 @@ StaticSingleThreadedExecutor::spin() | |
} | ||
} | ||
|
||
|
||
void | ||
StaticSingleThreadedExecutor::spin( | ||
const std::function<void(const std::exception & e)> & exception_handler) | ||
{ | ||
if (spinning.exchange(true)) { | ||
throw std::runtime_error("spin() called while already spinning"); | ||
} | ||
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); | ||
|
||
// This is essentially the contents of the rclcpp::Executor::wait_for_work method, | ||
// except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor | ||
// behavior. | ||
while (rclcpp::ok(this->context_) && spinning.load()) { | ||
this->spin_once_impl(std::chrono::nanoseconds(-1), exception_handler); | ||
} | ||
} | ||
|
||
Comment on lines
+46
to
+63
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should not copy-paste like this, we should have one call the other... |
||
void | ||
StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration) | ||
{ | ||
|
@@ -97,12 +115,32 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati | |
|
||
void | ||
StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout) | ||
{ | ||
spin_once_impl(timeout, std::function<void(const std::exception & e)>()); | ||
} | ||
|
||
void | ||
StaticSingleThreadedExecutor::spin_once_impl( | ||
std::chrono::nanoseconds timeout, | ||
const std::function<void(const std::exception & e)> & exception_handler) | ||
{ | ||
if (rclcpp::ok(context_) && spinning.load()) { | ||
std::lock_guard<std::mutex> guard(mutex_); | ||
auto wait_result = this->collect_and_wait(timeout); | ||
if (wait_result.has_value()) { | ||
this->execute_ready_executables(current_collection_, wait_result.value(), true); | ||
if(exception_handler) { | ||
try { | ||
this->execute_ready_executables(current_collection_, wait_result.value(), true); | ||
} catch (const std::exception & e) { | ||
RCLCPP_ERROR_STREAM( | ||
rclcpp::get_logger("rclcpp"), | ||
"Exception while spinning : " << e.what()); | ||
|
||
exception_handler(e); | ||
} | ||
} else { | ||
this->execute_ready_executables(current_collection_, wait_result.value(), true); | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs a "brief", with a
/// ...
line before the block comment.