Skip to content

Commit

Permalink
feat(Executor): Added spin API with exception handler.
Browse files Browse the repository at this point in the history
Signed-off-by: Janosch Machowinski <[email protected]>

# Conflicts:
#	rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
#	rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp
  • Loading branch information
Janosch Machowinski committed Apr 3, 2024
1 parent 3edeb90 commit 2a499bb
Show file tree
Hide file tree
Showing 16 changed files with 332 additions and 52 deletions.
24 changes: 24 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ class Executor
virtual void
spin() = 0;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
virtual void
spin(const std::function<void(const std::exception & e)> & exception_handler) = 0;

/// Add a callback group to an executor.
/**
* An executor can have zero or more callback groups which provide work during `spin` functions.
Expand Down Expand Up @@ -470,6 +482,18 @@ class Executor
void
execute_any_executable(AnyExecutable & any_exec);

/// Find the next available executable and do the work associated with it.
/**
* \param[in] any_exec Union structure that can hold any executable type (timer, subscription,
* service, client).
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
execute_any_executable(
AnyExecutable & any_exec,
const std::function<void(const std::exception & e)> & exception_handler);

/// Run subscription executable.
/**
* Do necessary setup and tear-down as well as executing the subscription.
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class MultiThreadedExecutor : public rclcpp::Executor
*/
RCLCPP_PUBLIC
void
spin(std::function<void(const std::exception & e)> exception_handler);
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

RCLCPP_PUBLIC
size_t
Expand All @@ -88,7 +88,9 @@ class MultiThreadedExecutor : public rclcpp::Executor
protected:
RCLCPP_PUBLIC
void
run(size_t this_thread_number, std::function<void(const std::exception & e)> exception_handler);
run(
size_t this_thread_number,
const std::function<void(const std::exception & e)> & exception_handler);

private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
Expand Down
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/executors/single_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ class SingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

private:
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/// Static executor implementation of spin.
/**
* \sa rclcpp::StaticSingleThreadedExecutor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

/// Static executor implementation of spin some
/**
* This non-blocking function will execute entities that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* @param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

/// Events executor implementation of spin some
/**
* This non-blocking function will execute the timers and events
Expand Down
14 changes: 12 additions & 2 deletions rclcpp/include/rclcpp/experimental/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,14 @@ class TimersManager
/**
* @brief Starts a thread that takes care of executing the timers stored in this object.
* Function will throw an error if the timers thread was already running.
*
* @param exception_handler if valid, the execution of the timer will be done in a try catch block,
* and any occurring exception will be passed to the given handler
*/
RCLCPP_PUBLIC
void start();
void start(
const std::function<void(const std::exception & e)> & exception_handler =
std::function<void(const std::exception & e)>());

/**
* @brief Stops the timers thread.
Expand Down Expand Up @@ -509,6 +514,11 @@ class TimersManager
*/
void run_timers();

/**
* @brief calls run_timers with a try catch block.
*/
void run_timers(const std::function<void(const std::exception & e)> & exception_handler);

/**
* @brief Get the amount of time before the next timer triggers.
* This function is not thread safe, acquire a mutex before calling it.
Expand All @@ -526,7 +536,7 @@ class TimersManager
* while keeping the heap correctly sorted.
* This function is not thread safe, acquire the timers_mutex_ before calling it.
*/
void execute_ready_timers_unsafe();
void execute_ready_timers_unsafe(std::function<void(const std::exception & e)> exception_handler);

// Callback to be called when timer is ready
std::function<void(const rclcpp::TimerBase *)> on_ready_callback_;
Expand Down
65 changes: 63 additions & 2 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
return;
}

assert(any_exec.callback_group);

if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
Expand All @@ -403,11 +405,70 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
}

// Reset the callback_group, regardless of type
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
any_exec.callback_group->can_be_taken_from().store(true);
}

template<typename Function>
void execute_guarded(
const Function & fun,
const std::function<void(const std::exception & e)> & exception_handler)
{
try {
fun();
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());

exception_handler(e);
}
}

void
Executor::execute_any_executable(
AnyExecutable & any_exec,
const std::function<void(const std::exception & e)> & exception_handler)
{
if (!spinning.load()) {
return;
}

assert(any_exec.callback_group);

if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_guarded([&any_exec]() {execute_timer(any_exec.timer);}, exception_handler);
}
if (any_exec.subscription) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
execute_guarded(
[&any_exec]() {
execute_subscription(any_exec.subscription);
}, exception_handler);
}
if (any_exec.service) {
execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler);
}
if (any_exec.client) {
execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler);
}
if (any_exec.waitable) {
execute_guarded(
[&any_exec]() {
const std::shared_ptr<void> & const_data = any_exec.data;
any_exec.waitable->execute(const_data);
}, exception_handler);
}

// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
}


template<typename Taker, typename Handler>
static
void
Expand Down
66 changes: 28 additions & 38 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ MultiThreadedExecutor::spin()

void

MultiThreadedExecutor::spin(std::function<void(const std::exception & e)> exception_handler)
MultiThreadedExecutor::spin(const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
Expand Down Expand Up @@ -88,48 +88,38 @@ MultiThreadedExecutor::get_number_of_threads()
void
MultiThreadedExecutor::run(
size_t this_thread_number,
std::function<void(const std::exception & e)> exception_handler)
const std::function<void(const std::exception & e)> & exception_handler)
{
try {
(void)this_thread_number;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
std::lock_guard wait_lock{wait_mutex_};
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
(void)this_thread_number;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
std::lock_guard wait_lock{wait_mutex_};
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
if (yield_before_execute_) {
std::this_thread::yield();
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
}
if (yield_before_execute_) {
std::this_thread::yield();
}

execute_any_executable(any_exec);

if (any_exec.callback_group &&
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
{
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group change: ") + ex.what());
}
}
execute_any_executable(any_exec, exception_handler);

// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
}
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());
// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();

exception_handler(e);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition from execute_any_executable: ") + ex.what());
}
}
}
22 changes: 22 additions & 0 deletions rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,25 @@ SingleThreadedExecutor::spin()
}
}
}


void
SingleThreadedExecutor::spin(
const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

// Clear any previous result and rebuild the waitset
this->wait_result_.reset();
this->entities_need_rebuild_ = true;

while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable)) {
execute_any_executable(any_executable, exception_handler);
}
}
}
32 changes: 32 additions & 0 deletions rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,38 @@ StaticSingleThreadedExecutor::spin()
}
}

void
StaticSingleThreadedExecutor::spin(
const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

while (rclcpp::ok(context_) && spinning.load()) {
// Get executables that are ready now
std::lock_guard<std::mutex> guard(mutex_);

auto wait_result = this->collect_and_wait(std::chrono::nanoseconds(-1));
if (wait_result.has_value()) {
try {
// Execute ready executables
this->execute_ready_executables(
current_collection_,
wait_result.value(),
false);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());

exception_handler(e);
}
}
}
}

void
StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration)
{
Expand Down
Loading

0 comments on commit 2a499bb

Please sign in to comment.