diff --git a/hubero_ros/include/hubero_ros/task_request_ros_api.h b/hubero_ros/include/hubero_ros/task_request_ros_api.h index d84af8c..0e6b15d 100644 --- a/hubero_ros/include/hubero_ros/task_request_ros_api.h +++ b/hubero_ros/include/hubero_ros/task_request_ros_api.h @@ -37,6 +37,9 @@ class TaskRequestRosApi { /// Defines the duration of sleeps between subsequent processings of the callback queue const int CALLBACK_SPINNING_SLEEP_TIME_MS = 10; + /// Defines the duration of sleeps between subsequent evaluations of task conditions + const int THREADED_EXECUTOR_SLEEP_TIME_MS = 100; + /** * @brief Constructor of @ref TaskRequestRosApi instance * @param actor_name actor identifier from simulation @@ -383,6 +386,46 @@ class TaskRequestRosApi { return actor_name_; } + /** + * @defgroup execution Methods related to a task execution in a separate thread + */ + /** + * @brief Starts the execution of a single task defined by the arguments in a separate thread + * + * This method does not stop previously started task in any way. + * + * @param state_checker_fun a reference to a function that allows checking the state of task execution, + * e.g., @ref getTeleopState; reference is a must here - inline lambda won't work + * @param logging_task_name task name used in logs + * @param timeout timeout for task execution (by default - no limit) + * @param state_executing a state in which the task is assumed to be executing + */ + void startThreadedExecution( + const std::function& state_checker_fun, + const std::string& logging_task_name, + const ros::Duration& timeout = ros::Duration(0.0), + TaskFeedbackType state_executing = TaskFeedbackType::TASK_FEEDBACK_ACTIVE + ); + + /// Evaluates whether the thread that executes the newest task does still perform computations + inline bool isThreadExecuting() const { + return threaded_task_execution_requested_ && !threaded_task_execution_done_; + } + + /** + * @brief Joins the thread that executes the newest task + * + * Can be also treated as a method that executes a task started with @ref startThreadedExecution in a blocking way + */ + void join(); + + /// @brief Calls @ref std::thread::joinable method on the @ref task_executor_ thread + inline bool joinable() const { + return task_executor_.joinable(); + } + + /** @} */ // end of execution group + /** * @defgroup utils Static functions that may be handy when designing a custom scenario */ @@ -511,6 +554,23 @@ class TaskRequestRosApi { */ virtual void callbackProcessingThread(); + /** + * Takes care of the execution of a task defined by the arguments. + * + * This should be called immediately after sending a task request. + * + * @param state_checker_fun a function that allows checking the state of task execution, e.g., @ref getTeleopState + * @param logging_task_name task name used in logs + * @param timeout timeout for task execution (by default no limit) + * @param state_executing a state in which the task is assumed to be executing + */ + void threadedExecutor( + const std::function& state_checker_fun, + const std::string& logging_task_name, + const ros::Duration& timeout, + TaskFeedbackType state_executing + ); + private: /// Name of the actor provided in the constructor std::string actor_name_; @@ -518,6 +578,13 @@ class TaskRequestRosApi { /// A separate thread for processing callbacks std::thread callback_spinner_; + /// A thread for executing a task + std::thread task_executor_; + /// Atomic flag indicating the request of a threaded execution of a task + std::atomic threaded_task_execution_requested_; + /// Atomic flag indicating the finish of computations for @ref task_executor_ thread + std::atomic threaded_task_execution_done_; + /// Flag set in the destructor to join threads std::atomic destructing_; diff --git a/hubero_ros/src/task_request_ros_api.cpp b/hubero_ros/src/task_request_ros_api.cpp index bced63e..1f1f8b5 100644 --- a/hubero_ros/src/task_request_ros_api.cpp +++ b/hubero_ros/src/task_request_ros_api.cpp @@ -9,6 +9,8 @@ namespace hubero { TaskRequestRosApi::TaskRequestRosApi(const std::string& actor_name): node_ptr_(std::make_shared("task_request_ros_api_node")), callback_spinner_(std::thread(&TaskRequestRosApi::callbackProcessingThread, this)), + threaded_task_execution_requested_(false), + threaded_task_execution_done_(false), destructing_(false) { // assignment - init list does not support string reference copying @@ -116,6 +118,9 @@ TaskRequestRosApi::TaskRequestRosApi(const std::string& actor_name): TaskRequestRosApi::~TaskRequestRosApi() { destructing_ = true; + if (task_executor_.joinable()) { + task_executor_.join(); + } if (callback_spinner_.joinable()) { callback_spinner_.join(); } @@ -414,6 +419,32 @@ std::string TaskRequestRosApi::getTeleopStateDescription() const { return getActionStateDescription(ac_teleop_ptr_); } +void TaskRequestRosApi::startThreadedExecution( + const std::function& state_checker_fun, + const std::string& logging_task_name, + const ros::Duration& timeout, + TaskFeedbackType state_executing +) { + threaded_task_execution_requested_ = true; + threaded_task_execution_done_ = false; + task_executor_ = std::thread( + &TaskRequestRosApi::threadedExecutor, + this, + std::ref(state_checker_fun), + logging_task_name, + timeout, + state_executing + ); +} + +void TaskRequestRosApi::join() { + if (task_executor_.joinable()) { + task_executor_.join(); + } + threaded_task_execution_done_ = true; + threaded_task_execution_requested_ = false; +} + // static void TaskRequestRosApi::wait(const std::chrono::milliseconds& ms) { std::this_thread::sleep_for(ms); @@ -447,4 +478,77 @@ void TaskRequestRosApi::callbackProcessingThread() { } } +void TaskRequestRosApi::threadedExecutor( + const std::function& state_checker_fun, + const std::string& logging_task_name, + const ros::Duration& timeout, + TaskFeedbackType state_executing +) { + auto timeout_start = ros::Time::now(); + + // we will be waiting for actor tasks finishes, so we must also know if the task request was processed + while (!destructing_ && state_checker_fun() != state_executing) { + if (!ros::ok()) { + throw std::runtime_error( + "[HuBeRo] [TaskRequestRosApi].[threadedExecutor] ROS stopped working during " + + logging_task_name + + " preparation by the " + + getName() + ); + } + if (!timeout.isZero() && (ros::Time::now() - timeout_start) >= timeout) { + throw std::runtime_error( + "[HuBeRo] [TaskRequestRosApi].[threadedExecutor] Timeout of " + + logging_task_name + + " (preparation) for " + + getName() + + " has elapsed!" + ); + } + HUBERO_LOG( + "[%s].[TaskRequestRosApi] Waiting for the `%s` task to become active. Current state %d...\r\n", + getName().c_str(), + logging_task_name.c_str(), + state_checker_fun() + ); + std::this_thread::sleep_for(std::chrono::milliseconds(THREADED_EXECUTOR_SLEEP_TIME_MS)); + } + + HUBERO_LOG( + "[%s].[TaskRequestRosApi] `%s` task properly activated (state %d), starting the execution...\r\n", + getName().c_str(), + logging_task_name.c_str(), + state_checker_fun() + ); + + while (!destructing_ && state_checker_fun() == state_executing) { + if (!ros::ok()) { + throw std::runtime_error( + "[HuBeRo] [TaskRequestRosApi].[threadedExecutor] ROS stopped working during " + + logging_task_name + + " execution by the " + + getName() + ); + } + if (!timeout.isZero() && (ros::Time::now() - timeout_start) >= timeout) { + throw std::runtime_error( + "[HuBeRo] [TaskRequestRosApi].[threadedExecutor] Timeout of " + + logging_task_name + + " (execution) for " + + getName() + + " has elapsed!" + ); + } + std::this_thread::sleep_for(std::chrono::milliseconds(THREADED_EXECUTOR_SLEEP_TIME_MS)); + } + + HUBERO_LOG( + "[%s].[TaskRequestRosApi] Execution of `%s` task ended with the state %d\r\n", + getName().c_str(), + logging_task_name.c_str(), + state_checker_fun() + ); + threaded_task_execution_done_ = true; +} + } // namespace hubero