diff --git a/rclcpp/include/rclcpp/executor_options.hpp b/rclcpp/include/rclcpp/executor_options.hpp index 066f29eb4e..54948d618d 100644 --- a/rclcpp/include/rclcpp/executor_options.hpp +++ b/rclcpp/include/rclcpp/executor_options.hpp @@ -15,6 +15,8 @@ #ifndef RCLCPP__EXECUTOR_OPTIONS_HPP_ #define RCLCPP__EXECUTOR_OPTIONS_HPP_ +#include + #include "rclcpp/context.hpp" #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/memory_strategies.hpp" @@ -36,6 +38,7 @@ struct ExecutorOptions rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy; rclcpp::Context::SharedPtr context; size_t max_conditions; + std::string name; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 9ec7ace383..88dd303b8d 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -21,12 +21,13 @@ #include #include #include +#include -#include "rcutils/thread_attr.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/thread/thread_attribute.hpp" namespace rclcpp { @@ -59,6 +60,21 @@ class MultiThreadedExecutor : public rclcpp::Executor bool yield_before_execute = false, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + size_t number_of_threads, + const rcpputils::ThreadAttribute & thread_attr, + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + const rcpputils::ThreadAttribute & thread_attr, + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + RCLCPP_PUBLIC virtual ~MultiThreadedExecutor(); @@ -74,6 +90,16 @@ class MultiThreadedExecutor : public rclcpp::Executor size_t get_number_of_threads(); + RCLCPP_PUBLIC + const std::optional & + get_thread_attribute() const + { + return thread_attr_; + } + + RCLCPP_PUBLIC + static const char default_name[]; + protected: RCLCPP_PUBLIC void @@ -82,11 +108,19 @@ class MultiThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + std::optional thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds timeout); + std::mutex wait_mutex_; size_t number_of_threads_; + std::optional thread_attr_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; - rcutils_thread_attrs_t * thread_attributes_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 95c2818a61..819587052f 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -21,8 +21,8 @@ #include #include #include +#include -#include "rcutils/thread_attr.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -30,6 +30,7 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/rate.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/thread/thread_attribute.hpp" namespace rclcpp { @@ -50,6 +51,11 @@ class SingleThreadedExecutor : public rclcpp::Executor explicit SingleThreadedExecutor( const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); + RCLCPP_PUBLIC + explicit SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options, + const rcpputils::ThreadAttribute & thread_attr); + /// Default destructor. RCLCPP_PUBLIC virtual ~SingleThreadedExecutor(); @@ -66,6 +72,22 @@ class SingleThreadedExecutor : public rclcpp::Executor void spin() override; + RCLCPP_PUBLIC + bool has_thread_attribute() const + { + return thread_attr_.has_value(); + } + + RCLCPP_PUBLIC + const std::optional & + get_thread_attribute() const + { + return thread_attr_; + } + + RCLCPP_PUBLIC + static const char default_name[]; + protected: RCLCPP_PUBLIC void @@ -73,7 +95,8 @@ class SingleThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) - rcutils_thread_attrs_t * thread_attributes_; + + std::optional thread_attr_; }; } // namespace executors diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 079f5ddb5b..8e61746f93 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -14,50 +14,70 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" +#include #include #include #include +#include #include #include "rcpputils/scope_exit.hpp" -#include "rcpputils/threads.hpp" +#include "rcpputils/thread.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/utilities.hpp" using rclcpp::executors::MultiThreadedExecutor; +const char MultiThreadedExecutor::default_name[] = "RCLCPP_EXECUTOR_MULTI_THREADED"; + +static std::optional +default_thread_attr(const rclcpp::ExecutorOptions & options); + MultiThreadedExecutor::MultiThreadedExecutor( const rclcpp::ExecutorOptions & options, size_t number_of_threads, bool yield_before_execute, std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + options, number_of_threads, default_thread_attr(options), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + size_t number_of_threads, + rcpputils::ThreadAttribute const & thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + rclcpp::ExecutorOptions(), number_of_threads, std::optional(thread_attr), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + rcpputils::ThreadAttribute const & thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + options, number_of_threads, std::optional(thread_attr), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + std::optional thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) : rclcpp::Executor(options), + thread_attr_(std::move(thread_attr)), yield_before_execute_(yield_before_execute), - next_exec_timeout_(next_exec_timeout), - thread_attributes_(nullptr) + next_exec_timeout_(next_exec_timeout) { - bool has_number_of_threads_arg = number_of_threads > 0; - number_of_threads_ = number_of_threads > 0 ? number_of_threads : std::max(rcpputils::Thread::hardware_concurrency(), 2U); - if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( - options.context->get_rcl_context().get())) - { - thread_attributes_ = attrs; - } - - if (has_number_of_threads_arg && thread_attributes_ && - thread_attributes_->num_attributes != number_of_threads) - { - RCLCPP_WARN( - rclcpp::get_logger("rclcpp"), - "The number of threads argument passed to the MultiThreadedExecutor" - " is different from the number of thread attributes.\n" - "The executor runs using the thread attributes and ignores the former."); - } else if (number_of_threads_ == 1) { + if (number_of_threads_ == 1) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "MultiThreadedExecutor is used with a single thread.\n" @@ -77,21 +97,12 @@ MultiThreadedExecutor::spin() std::vector threads; size_t thread_id = 0; - if (thread_attributes_) { - rcpputils::Thread::Attribute thread_attr; - { - std::lock_guard wait_lock{wait_mutex_}; - for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) { - thread_attr.set_thread_attribute( - thread_attributes_->attributes[thread_id]); - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); - threads.emplace_back(rcpputils::Thread(thread_attr, func)); - } + if (thread_attr_) { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < number_of_threads_; ++thread_id) { + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(thread_attr_.value(), func); } - thread_attr.set_thread_attribute( - thread_attributes_->attributes[thread_id]); - rcpputils::this_thread::run_with_thread_attribute( - thread_attr, &MultiThreadedExecutor::run, this, thread_id); } else { { std::lock_guard wait_lock{wait_mutex_}; @@ -111,11 +122,7 @@ MultiThreadedExecutor::spin() size_t MultiThreadedExecutor::get_number_of_threads() { - if (thread_attributes_) { - return thread_attributes_->num_attributes; - } else { - return number_of_threads_; - } + return number_of_threads_; } void @@ -144,3 +151,40 @@ MultiThreadedExecutor::run(size_t this_thread_number) any_exec.callback_group.reset(); } } + +std::optional +default_thread_attr(rclcpp::ExecutorOptions const & options) +{ + const rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get()); + if (!attrs) { + return std::nullopt; + } + + std::string name; + bool name_specified = !options.name.empty(); + if (name_specified) { + name = options.name; + } else { + name = MultiThreadedExecutor::default_name; + } + + const rcutils_thread_attr_t * attrs_beg = attrs->attributes; + const rcutils_thread_attr_t * attrs_end = attrs->attributes + attrs->num_attributes; + const rcutils_thread_attr_t * attr = std::find_if( + attrs_beg, attrs_end, + [&](const auto & attr) { + return attr.name == name; + }); + if (attr != attrs_end) { + return rcpputils::ThreadAttribute(*attr); + } else { + if (name_specified) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "MultiThreadedExecutor is named \"%s\", but not found corresponding thread attribute.", + name.c_str()); + } + return std::nullopt; + } +} diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index c4de845c51..8d3212efc9 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -12,43 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rcpputils/scope_exit.hpp" -#include "rcpputils/threads.hpp" +#include "rcpputils/thread.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/any_executable.hpp" using rclcpp::executors::SingleThreadedExecutor; -SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & options) +const char SingleThreadedExecutor::default_name[] = "RCLCPP_EXECUTOR_SINGLE_THREADED"; + +static std::optional +default_thread_attr(const rclcpp::ExecutorOptions & options); + +SingleThreadedExecutor::SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options) : rclcpp::Executor(options), - thread_attributes_(nullptr) -{ - if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( - options.context->get_rcl_context().get())) - { - thread_attributes_ = attrs; - } + thread_attr_(default_thread_attr(options)) {} - if (thread_attributes_ && thread_attributes_->num_attributes != 1) { - RCLCPP_WARN( - rclcpp::get_logger("rclcpp"), - "Specified thread attributes contains multiple configurations.\n" - "The executor runs only using first configuration."); - } -} +SingleThreadedExecutor::SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options, + const rcpputils::ThreadAttribute & thread_attr) +: rclcpp::Executor(options), + thread_attr_(thread_attr) {} SingleThreadedExecutor::~SingleThreadedExecutor() {} void SingleThreadedExecutor::spin() { - if (thread_attributes_) { - rcpputils::Thread::Attribute thread_attr; - thread_attr.set_thread_attribute( - thread_attributes_->attributes[0]); - rcpputils::this_thread::run_with_thread_attribute( - thread_attr, &SingleThreadedExecutor::run, this); + if (thread_attr_) { + rcpputils::Thread thread(thread_attr_.value(), &SingleThreadedExecutor::run, this); + thread.join(); } else { run(); } @@ -68,3 +65,40 @@ SingleThreadedExecutor::run() } } } + +std::optional +default_thread_attr(rclcpp::ExecutorOptions const & options) +{ + const rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get()); + if (!attrs) { + return std::nullopt; + } + + std::string name; + bool name_specified = !options.name.empty(); + if (name_specified) { + name = options.name; + } else { + name = SingleThreadedExecutor::default_name; + } + + const rcutils_thread_attr_t * attrs_beg = attrs->attributes; + const rcutils_thread_attr_t * attrs_end = attrs->attributes + attrs->num_attributes; + const rcutils_thread_attr_t * attr = std::find_if( + attrs_beg, attrs_end, + [&](const auto & attr) { + return attr.name == name; + }); + if (attr != attrs_end) { + return rcpputils::ThreadAttribute(*attr); + } else { + if (name_specified) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "SingleThreadedExecutor is named \"%s\", but not found corresponding thread attribute.", + name.c_str()); + } + return std::nullopt; + } +} diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index b6fd2c3fda..8b552a4c04 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -14,10 +14,16 @@ #include +#include + +#include #include +#include #include #include +#include +#include "rcpputils/thread.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/node.hpp" #include "rclcpp/rclcpp.hpp" @@ -28,10 +34,14 @@ using namespace std::chrono_literals; class TestMultiThreadedExecutor : public ::testing::Test { protected: - static void SetUpTestCase() + void SetUp() override { rclcpp::init(0, nullptr); } + void TearDown() override + { + rclcpp::shutdown(); + } }; constexpr std::chrono::milliseconds PERIOD_MS = 1000ms; @@ -97,3 +107,124 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) { executor.add_node(node); executor.spin(); } + +TEST_F(TestMultiThreadedExecutor, thread_attribute_apply) { + using rcpputils::Thread; + using rcpputils::ThreadAttribute; + using rcpputils::ThreadId; + using rcpputils::SchedPolicy; + + ThreadAttribute attr; + unsigned thread_count = 10; + +#if __linux__ + ThreadId parent_thread = rcpputils::this_thread::get_id(); + { + sched_param param = {0}; + int r = pthread_setschedparam(pthread_self(), SCHED_OTHER, ¶m); + EXPECT_EQ(0, r); + // SCHED_FIFO, SCHED_RR are require the privilege on linux + attr.set_sched_policy(SchedPolicy::batch); + } +#endif + + rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), thread_count, attr); + + std::atomic succ_count = 0; + auto node = std::make_shared("node", "ns"); + auto timer = node->create_wall_timer( + std::chrono::milliseconds(1), [&]() { +#if __linux__ + int policy; + sched_param param; + ThreadId worker_thread = rcpputils::this_thread::get_id(); + int r = pthread_getschedparam(pthread_self(), &policy, ¶m); + SchedPolicy rclcpp_policy = SchedPolicy(0x8000'0000 | policy); + EXPECT_EQ(0, r); + EXPECT_EQ(SchedPolicy::batch, rclcpp_policy); + EXPECT_NE(worker_thread, parent_thread); +#endif + unsigned n = succ_count.fetch_add(1); + if (n == thread_count - 1) { + executor.cancel(); + } + }); + + executor.add_node(node); + executor.spin(); + EXPECT_EQ(succ_count, thread_count); +} + +constexpr char const * argv[] = { + "test_multi_threaded_executor", + "--ros-args", + "--thread-attrs-value", + R"( +- name: RCLCPP_EXECUTOR_MULTI_THREADED + scheduling_policy: FIFO + priority: 10 + core_affinity: [] +- name: executor-1 + scheduling_policy: RR + priority: 20 + core_affinity: [0] + )", + NULL, +}; +constexpr int argc = sizeof(argv) / sizeof(*argv) - 1; + +class TestMultiThreadedExecutorAttribute : public ::testing::Test +{ +protected: + void SetUp() override + { + rclcpp::init(argc, argv); + } + void TearDown() override + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_default_name) { + using rcpputils::SchedPolicy; + rclcpp::executors::MultiThreadedExecutor executor; + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_TRUE(opt_attr); + + const auto & attr = opt_attr.value(); + EXPECT_EQ(SchedPolicy::fifo, attr.get_sched_policy()); + EXPECT_EQ(10, attr.get_priority()); + + const auto & affinity = attr.get_affinity(); + EXPECT_EQ(0, affinity.count()); +} + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_specified_name) { + using rcpputils::SchedPolicy; + auto options = rclcpp::ExecutorOptions(); + options.name = "executor-1"; + rclcpp::executors::MultiThreadedExecutor executor(options); + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_TRUE(opt_attr); + + const auto & attr = opt_attr.value(); + EXPECT_EQ(SchedPolicy::rr, attr.get_sched_policy()); + EXPECT_EQ(20, attr.get_priority()); + + const auto & affinity = attr.get_affinity(); + EXPECT_EQ(1, affinity.count()); + EXPECT_TRUE(affinity.is_set(0)); +} + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_not_found) { + using rcpputils::SchedPolicy; + auto options = rclcpp::ExecutorOptions(); + options.name = "executor-not-exists"; + rclcpp::executors::MultiThreadedExecutor executor(options); + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_FALSE(opt_attr); +}