Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(MultiThreadedExecutor): Added ability to handle exceptions from threads #2382

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ class Executor
virtual void
spin() = 0;

/**
* \sa rclcpp::Executor:spin() for more details
Comment on lines +85 to +86
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a "brief", with a /// ... line before the block comment.

* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use \ everywhere (some other @ have slipped through reviews), but we should tend towards the desired state when possible, which is \ everywhere. I'll fix these up in my version of this pr.

*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
virtual void
spin(const std::function<void(const std::exception & e)> & exception_handler) = 0;
Comment on lines +85 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have liked to have had a non-virtual spin(... exception_handler = nullptr), that called a virtual and protected spin_impl(... exception_handler) = 0, so that we didn't need two signatures. It would be easier to document and wouldn't require every executor to implement two functions, one of which is pretty much always be boilerplate calling the other.

However, I don't know if I have time to do that before the freeze, so I won't block this on it, but I wanted to make the comment somewhere.


/// Add a callback group to an executor.
/**
* An executor can have zero or more callback groups which provide work during `spin` functions.
Expand Down Expand Up @@ -470,6 +482,18 @@ class Executor
void
execute_any_executable(AnyExecutable & any_exec);

/// Find the next available executable and do the work associated with it.
/**
* \param[in] any_exec Union structure that can hold any executable type (timer, subscription,
* service, client).
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
execute_any_executable(
AnyExecutable & any_exec,
const std::function<void(const std::exception & e)> & exception_handler);

/// Run subscription executable.
/**
* Do necessary setup and tear-down as well as executing the subscription.
Expand Down
16 changes: 15 additions & 1 deletion rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,28 @@ class MultiThreadedExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

RCLCPP_PUBLIC
size_t
get_number_of_threads();

protected:
RCLCPP_PUBLIC
void
run(size_t this_thread_number);
run(
size_t this_thread_number,
const std::function<void(const std::exception & e)> & exception_handler);

private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
Expand Down
12 changes: 12 additions & 0 deletions rclcpp/include/rclcpp/executors/single_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ class SingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/// Single-threaded implementation of spin.
/**
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

private:
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/RCLCPP_PUBLIC
virtual void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

/// Static executor implementation of spin some
/**
* This non-blocking function will execute entities that
Expand Down Expand Up @@ -125,6 +135,11 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin_once_impl(std::chrono::nanoseconds timeout) override;

void
spin_once_impl(
std::chrono::nanoseconds timeout,
const std::function<void(const std::exception & e)> & exception_handler);

std::optional<rclcpp::WaitResult<rclcpp::WaitSet>>
collect_and_wait(std::chrono::nanoseconds timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

/// Events executor implementation of spin some
/**
* This non-blocking function will execute the timers and events
Expand Down
14 changes: 12 additions & 2 deletions rclcpp/include/rclcpp/experimental/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,14 @@ class TimersManager
/**
* @brief Starts a thread that takes care of executing the timers stored in this object.
* Function will throw an error if the timers thread was already running.
*
* @param exception_handler if valid, the execution of the timer will be done in a try catch block,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave this @, because this file uses that style, but the whole file should be updated (it mostly got a review pass for being in the experimental namespace.

* and any occurring exception will be passed to the given handler
*/
RCLCPP_PUBLIC
void start();
void start(
const std::function<void(const std::exception & e)> & exception_handler = std::function<void(
const std::exception & e)>());

/**
* @brief Stops the timers thread.
Expand Down Expand Up @@ -511,6 +516,11 @@ class TimersManager
*/
void run_timers();

/**
* @brief calls run_timers with a try catch block.
*/
void run_timers(const std::function<void(const std::exception & e)> & exception_handler);

/**
* @brief Get the amount of time before the next timer triggers.
* This function is not thread safe, acquire a mutex before calling it.
Expand All @@ -528,7 +538,7 @@ class TimersManager
* while keeping the heap correctly sorted.
* This function is not thread safe, acquire the timers_mutex_ before calling it.
*/
void execute_ready_timers_unsafe();
void execute_ready_timers_unsafe(std::function<void(const std::exception & e)> exception_handler);

// Callback to be called when timer is ready
std::function<void(const rclcpp::TimerBase *,
Expand Down
63 changes: 63 additions & 0 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,69 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
any_exec.callback_group->can_be_taken_from().store(true);
}

template<typename Function>
void execute_guarded(
const Function & fun,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: use full variable names

Suggested change
const Function & fun,
const Function & function,

const std::function<void(const std::exception & e)> & exception_handler)
{
try {
fun();
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());

exception_handler(e);
}
}

void
Executor::execute_any_executable(
AnyExecutable & any_exec,
const std::function<void(const std::exception & e)> & exception_handler)
{
if (!spinning.load()) {
return;
}

assert(
(void("cannot execute an AnyExecutable without a valid callback group"),
any_exec.callback_group));

if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_guarded([&any_exec]() {
execute_timer(any_exec.timer, any_exec.data);
}, exception_handler);
}
if (any_exec.subscription) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
execute_guarded(
[&any_exec]() {
execute_subscription(any_exec.subscription);
}, exception_handler);
}
if (any_exec.service) {
execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler);
}
if (any_exec.client) {
execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler);
}
if (any_exec.waitable) {
execute_guarded([&any_exec]() {
const std::shared_ptr<void> & const_data = any_exec.data;
any_exec.waitable->execute(const_data);
}, exception_handler);
}
// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
}


template<typename Taker, typename Handler>
static
void
Expand Down
17 changes: 13 additions & 4 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}

void
MultiThreadedExecutor::spin()
{
spin([](const std::exception & e) {throw e;});
}

void

MultiThreadedExecutor::spin(const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
Expand All @@ -61,12 +68,12 @@ MultiThreadedExecutor::spin()
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler);
threads.emplace_back(func);
}
}

run(thread_id);
run(thread_id, exception_handler);
for (auto & thread : threads) {
thread.join();
}
Expand All @@ -79,7 +86,9 @@ MultiThreadedExecutor::get_number_of_threads()
}

void
MultiThreadedExecutor::run(size_t this_thread_number)
MultiThreadedExecutor::run(
size_t this_thread_number,
const std::function<void(const std::exception & e)> & exception_handler)
{
(void)this_thread_number;
while (rclcpp::ok(this->context_) && spinning.load()) {
Expand All @@ -97,7 +106,7 @@ MultiThreadedExecutor::run(size_t this_thread_number)
std::this_thread::yield();
}

execute_any_executable(any_exec);
execute_any_executable(any_exec, exception_handler);

if (any_exec.callback_group &&
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
Expand Down
22 changes: 22 additions & 0 deletions rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,25 @@ SingleThreadedExecutor::spin()
}
}
}


void
SingleThreadedExecutor::spin(
const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

// Clear any previous result and rebuild the waitset
this->wait_result_.reset();
this->entities_need_rebuild_ = true;

while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable)) {
execute_any_executable(any_executable, exception_handler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ StaticSingleThreadedExecutor::spin()
}
}


void
StaticSingleThreadedExecutor::spin(
const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

// This is essentially the contents of the rclcpp::Executor::wait_for_work method,
// except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor
// behavior.
while (rclcpp::ok(this->context_) && spinning.load()) {
this->spin_once_impl(std::chrono::nanoseconds(-1), exception_handler);
}
}

Comment on lines +46 to +63
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not copy-paste like this, we should have one call the other...

void
StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration)
{
Expand Down Expand Up @@ -97,12 +115,32 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati

void
StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
{
spin_once_impl(timeout, std::function<void(const std::exception & e)>());
}

void
StaticSingleThreadedExecutor::spin_once_impl(
std::chrono::nanoseconds timeout,
const std::function<void(const std::exception & e)> & exception_handler)
{
if (rclcpp::ok(context_) && spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
auto wait_result = this->collect_and_wait(timeout);
if (wait_result.has_value()) {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
if(exception_handler) {
try {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());

exception_handler(e);
}
} else {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
}
}
}
}
Expand Down
Loading