diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index d6ded52be8..49e3b24c7d 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -37,10 +37,16 @@ namespace executors /// Events executor implementation /** - * This executor is a variation of the standard one that does not uses a waitset. - * The executor uses an events queue and a timers manager to execute entities from its + * This executor uses an events queue and a timers manager to execute entities from its * associated nodes and callback groups. - * This provides improved performance as it allows to skip all the waitset maintenance operations. + * The RMW listener APIs are used to collect new events. + * + * This executor tries to reduce as much as possible the amount of maintenance operations. + * This allows to use customized `EventsQueue` classes to achieve different goals such + * as very low CPU usage, bounded memory requirement, determinism, etc. + * + * The executor uses a weak ownership model and it locks entities only while executing + * their related events. * * To run this executor: * rclcpp::executors::EventsExecutor executor; @@ -204,8 +210,6 @@ class EventsExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(EventsExecutor) - using EventQueue = std::queue; - // Executor callback: Push new events into the queue and trigger cv. // This function is called by the DDS entities when an event happened, // like a subscription receiving a message. @@ -229,11 +233,6 @@ class EventsExecutor : public rclcpp::Executor this_executor->events_queue_cv_.notify_one(); } - /// Extract and execute events from the queue until it is empty - RCLCPP_PUBLIC - void - consume_all_events(EventQueue & queue); - // Execute a single event RCLCPP_PUBLIC void diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp index 77a5efd695..625d8c7651 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -17,8 +17,8 @@ #include -#include "rclcpp/executors/events_executor_entities_collector.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/visibility_control.hpp" #include "rmw/listener_event_types.h" @@ -30,10 +30,14 @@ namespace buffers { /** - * @brief This abstract class is intended to be used as - * a wrapper around a queue. The derived classes should chose - * which container to use and the strategies for push and prune - * events from the queue. + * @brief This abstract class can be used to implement different types of queues + * where `rmw_listener_event_t` can be stored. + * The derived classes should choose which underlying container to use and + * the strategy for pushing and popping events. + * For example a queue implementation may be bounded or unbounded and have + * different pruning strategies. + * Implementations may or may not check the validity of events and decide how to handle + * the situation where an event is not valid anymore (e.g. a subscription history cache overruns) */ class EventsQueue { @@ -56,9 +60,7 @@ class EventsQueue push(const rmw_listener_event_t & event) = 0; /** - * @brief removes front element from the queue - * The element removed is the "oldest" element in the queue whose - * value can be retrieved by calling member front(). + * @brief removes front element from the queue. */ RCLCPP_PUBLIC virtual @@ -101,14 +103,13 @@ class EventsQueue init() = 0; /** - * @brief gets a queue with all events accumulated on it since - * the last call. The member queue is empty when the call returns. + * @brief pops out all events stored in the object into an output queue. * @return queue with events */ RCLCPP_PUBLIC virtual std::queue - get_all_events() = 0; + pop_all_events() = 0; }; } // namespace buffers diff --git a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp index 5bb0a85b51..278e50d7be 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -28,11 +28,10 @@ namespace buffers { /** - * @brief This class provides a simple queue implementation - * based on a std::queue. As the objective is having a CPU peformant - * queue, it does not performs any checks about the size of - * the queue, so the queue size could grow unbounded. - * It does not implement any pruning mechanisms. + * @brief This class implements an EventsQueue as a simple wrapper around a std::queue. + * It does not perform any checks about the size of queue, which can grow + * unbounded without being pruned. + * The simplicity of this implementation makes it suitable for optimizing CPU usage. */ class SimpleEventsQueue : public EventsQueue { @@ -53,9 +52,7 @@ class SimpleEventsQueue : public EventsQueue } /** - * @brief removes front element from the queue - * The element removed is the "oldest" element in the queue whose - * value can be retrieved by calling member front(). + * @brief removes front event from the queue. */ RCLCPP_PUBLIC virtual @@ -114,7 +111,6 @@ class SimpleEventsQueue : public EventsQueue std::swap(event_queue_, local_queue); } - /** * @brief gets a queue with all events accumulated on it since * the last call. The member queue is empty when the call returns. @@ -123,7 +119,7 @@ class SimpleEventsQueue : public EventsQueue RCLCPP_PUBLIC virtual std::queue - get_all_events() + pop_all_events() { std::queue local_queue; std::swap(event_queue_, local_queue); diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp index d4dcdd8b90..31f8039a43 100644 --- a/rclcpp/src/rclcpp/executors/events_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -66,13 +66,19 @@ EventsExecutor::spin() std::unique_lock push_lock(push_mutex_); // We wait here until something has been pushed to the event queue events_queue_cv_.wait(push_lock, has_event_predicate); - // Local event queue to allow entities to push events while we execute them - EventQueue execution_event_queue = events_queue_->get_all_events(); + // Move all events into a local events queue to allow entities to push while we execute them + std::queue execution_events_queue = events_queue_->pop_all_events(); // Unlock the mutex push_lock.unlock(); - // Consume all available events, this queue will be empty at the end of the function - this->consume_all_events(execution_event_queue); + // Consume all available events + while (!execution_events_queue.empty()) { + rmw_listener_event_t event = execution_events_queue.front(); + execution_events_queue.pop(); + this->execute_event(event); + } } + + // Stop the timers manager thread when we are done spinning timers_manager_->stop(); } @@ -235,17 +241,6 @@ EventsExecutor::remove_node(std::shared_ptr node_ptr, bool notify) this->remove_node(node_ptr->get_node_base_interface(), notify); } -void -EventsExecutor::consume_all_events(EventQueue & event_queue) -{ - while (!event_queue.empty()) { - rmw_listener_event_t event = event_queue.front(); - event_queue.pop(); - - this->execute_event(event); - } -} - void EventsExecutor::execute_event(const rmw_listener_event_t & event) { diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp index 4d22a5eb1e..9f540687d0 100644 --- a/rclcpp/test/rclcpp/executors/test_events_queue.cpp +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -21,21 +21,7 @@ using namespace std::chrono_literals; -class TestEventsQueue : public ::testing::Test -{ -public: - void SetUp() - { - rclcpp::init(0, nullptr); - } - - void TearDown() - { - rclcpp::shutdown(); - } -}; - -TEST_F(TestEventsQueue, SimpleQueueTest) +TEST(TestEventsQueue, SimpleQueueTest) { // Create a SimpleEventsQueue and a local queue auto simple_queue = std::make_unique(); @@ -54,7 +40,7 @@ TEST_F(TestEventsQueue, SimpleQueueTest) // Pop one message simple_queue->pop(); - local_events_queue = simple_queue->get_all_events(); + local_events_queue = simple_queue->pop_all_events(); // We should have (11 - 1) events in the local queue size_t local_queue_size = local_events_queue.size();