Skip to content

Commit

Permalink
Modified the conventional executors (single/multithreaded) to use the…
Browse files Browse the repository at this point in the history
… infrastructure

proposed in the REP-2017 below.

ros-infrastructure/rep#385

Signed-off-by: Shoji Morita <[email protected]>
  • Loading branch information
smorita-esol committed Dec 14, 2023
1 parent 0e9616a commit 35af6fa
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 66 deletions.
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/executor_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef RCLCPP__EXECUTOR_OPTIONS_HPP_
#define RCLCPP__EXECUTOR_OPTIONS_HPP_

#include <string>

#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand All @@ -36,6 +38,7 @@ struct ExecutorOptions
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy;
rclcpp::Context::SharedPtr context;
size_t max_conditions;
std::string name;
};

} // namespace rclcpp
Expand Down
38 changes: 36 additions & 2 deletions rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
#include <vector>
#include <set>
#include <unordered_map>
#include <optional>

#include "rcutils/thread_attr.h"
#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcpputils/thread/thread_attribute.hpp"

namespace rclcpp
{
Expand Down Expand Up @@ -59,6 +60,21 @@ class MultiThreadedExecutor : public rclcpp::Executor
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
explicit MultiThreadedExecutor(
size_t number_of_threads,
const rcpputils::ThreadAttribute & thread_attr,
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
explicit MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
const rcpputils::ThreadAttribute & thread_attr,
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
virtual ~MultiThreadedExecutor();

Expand All @@ -74,6 +90,16 @@ class MultiThreadedExecutor : public rclcpp::Executor
size_t
get_number_of_threads();

RCLCPP_PUBLIC
const std::optional<rcpputils::ThreadAttribute> &
get_thread_attribute() const
{
return thread_attr_;
}

RCLCPP_PUBLIC
static const char default_name[];

protected:
RCLCPP_PUBLIC
void
Expand All @@ -82,11 +108,19 @@ class MultiThreadedExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

RCLCPP_PUBLIC
explicit MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
std::optional<rcpputils::ThreadAttribute> thread_attr,
bool yield_before_execute,
std::chrono::nanoseconds timeout);

std::mutex wait_mutex_;
size_t number_of_threads_;
std::optional<rcpputils::ThreadAttribute> thread_attr_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;
rcutils_thread_attrs_t * thread_attributes_;
};

} // namespace executors
Expand Down
27 changes: 25 additions & 2 deletions rclcpp/include/rclcpp/executors/single_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
#include <cstdlib>
#include <memory>
#include <vector>
#include <optional>

#include "rcutils/thread_attr.h"
#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcpputils/thread/thread_attribute.hpp"

namespace rclcpp
{
Expand All @@ -50,6 +51,11 @@ class SingleThreadedExecutor : public rclcpp::Executor
explicit SingleThreadedExecutor(
const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());

RCLCPP_PUBLIC
explicit SingleThreadedExecutor(
const rclcpp::ExecutorOptions & options,
const rcpputils::ThreadAttribute & thread_attr);

/// Default destructor.
RCLCPP_PUBLIC
virtual ~SingleThreadedExecutor();
Expand All @@ -66,14 +72,31 @@ class SingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

RCLCPP_PUBLIC
bool has_thread_attribute() const
{
return thread_attr_.has_value();
}

RCLCPP_PUBLIC
const std::optional<rcpputils::ThreadAttribute> &
get_thread_attribute() const
{
return thread_attr_;
}

RCLCPP_PUBLIC
static const char default_name[];

protected:
RCLCPP_PUBLIC
void
run();

private:
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)
rcutils_thread_attrs_t * thread_attributes_;

std::optional<rcpputils::ThreadAttribute> thread_attr_;
};

} // namespace executors
Expand Down
122 changes: 83 additions & 39 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,50 +14,70 @@

#include "rclcpp/executors/multi_threaded_executor.hpp"

#include <algorithm>
#include <chrono>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

#include "rcpputils/scope_exit.hpp"
#include "rcpputils/threads.hpp"
#include "rcpputils/thread.hpp"

#include "rclcpp/logging.hpp"
#include "rclcpp/utilities.hpp"

using rclcpp::executors::MultiThreadedExecutor;

const char MultiThreadedExecutor::default_name[] = "RCLCPP_EXECUTOR_MULTI_THREADED";

static std::optional<rcpputils::ThreadAttribute>
default_thread_attr(const rclcpp::ExecutorOptions & options);

MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: MultiThreadedExecutor(
options, number_of_threads, default_thread_attr(options),
yield_before_execute, next_exec_timeout) {}

MultiThreadedExecutor::MultiThreadedExecutor(
size_t number_of_threads,
rcpputils::ThreadAttribute const & thread_attr,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: MultiThreadedExecutor(
rclcpp::ExecutorOptions(), number_of_threads, std::optional(thread_attr),
yield_before_execute, next_exec_timeout) {}

MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
rcpputils::ThreadAttribute const & thread_attr,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: MultiThreadedExecutor(
options, number_of_threads, std::optional(thread_attr),
yield_before_execute, next_exec_timeout) {}

MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
std::optional<rcpputils::ThreadAttribute> thread_attr,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: rclcpp::Executor(options),
thread_attr_(std::move(thread_attr)),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout),
thread_attributes_(nullptr)
next_exec_timeout_(next_exec_timeout)
{
bool has_number_of_threads_arg = number_of_threads > 0;

number_of_threads_ = number_of_threads > 0 ?
number_of_threads :
std::max(rcpputils::Thread::hardware_concurrency(), 2U);

if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs(
options.context->get_rcl_context().get()))
{
thread_attributes_ = attrs;
}

if (has_number_of_threads_arg && thread_attributes_ &&
thread_attributes_->num_attributes != number_of_threads)
{
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"The number of threads argument passed to the MultiThreadedExecutor"
" is different from the number of thread attributes.\n"
"The executor runs using the thread attributes and ignores the former.");
} else if (number_of_threads_ == 1) {
if (number_of_threads_ == 1) {
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"MultiThreadedExecutor is used with a single thread.\n"
Expand All @@ -77,21 +97,12 @@ MultiThreadedExecutor::spin()
std::vector<rcpputils::Thread> threads;
size_t thread_id = 0;

if (thread_attributes_) {
rcpputils::Thread::Attribute thread_attr;
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) {
thread_attr.set_thread_attribute(
thread_attributes_->attributes[thread_id]);
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(rcpputils::Thread(thread_attr, func));
}
if (thread_attr_) {
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(thread_attr_.value(), func);
}
thread_attr.set_thread_attribute(
thread_attributes_->attributes[thread_id]);
rcpputils::this_thread::run_with_thread_attribute(
thread_attr, &MultiThreadedExecutor::run, this, thread_id);
} else {
{
std::lock_guard wait_lock{wait_mutex_};
Expand All @@ -111,11 +122,7 @@ MultiThreadedExecutor::spin()
size_t
MultiThreadedExecutor::get_number_of_threads()
{
if (thread_attributes_) {
return thread_attributes_->num_attributes;
} else {
return number_of_threads_;
}
return number_of_threads_;
}

void
Expand Down Expand Up @@ -144,3 +151,40 @@ MultiThreadedExecutor::run(size_t this_thread_number)
any_exec.callback_group.reset();
}
}

std::optional<rcpputils::ThreadAttribute>
default_thread_attr(rclcpp::ExecutorOptions const & options)
{
const rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs(
options.context->get_rcl_context().get());
if (!attrs) {
return std::nullopt;
}

std::string name;
bool name_specified = !options.name.empty();
if (name_specified) {
name = options.name;
} else {
name = MultiThreadedExecutor::default_name;
}

const rcutils_thread_attr_t * attrs_beg = attrs->attributes;
const rcutils_thread_attr_t * attrs_end = attrs->attributes + attrs->num_attributes;
const rcutils_thread_attr_t * attr = std::find_if(
attrs_beg, attrs_end,
[&](const auto & attr) {
return attr.name == name;
});
if (attr != attrs_end) {
return rcpputils::ThreadAttribute(*attr);
} else {
if (name_specified) {
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"MultiThreadedExecutor is named \"%s\", but not found corresponding thread attribute.",
name.c_str());
}
return std::nullopt;
}
}
Loading

0 comments on commit 35af6fa

Please sign in to comment.