From 63e62bab815be4cf7385345f45971f8f225391ed Mon Sep 17 00:00:00 2001
From: Jarek Karwowski <chromedivizer@gmail.com>
Date: Wed, 4 Oct 2023 00:09:30 +0200
Subject: [PATCH] `hubero_ros` - `TaskRequestRosApi` extended with an approach
 of task execution performed in a separate thread [#29, #57]

---
 .../include/hubero_ros/task_request_ros_api.h |  67 +++++++++++
 hubero_ros/src/task_request_ros_api.cpp       | 104 ++++++++++++++++++
 2 files changed, 171 insertions(+)

diff --git a/hubero_ros/include/hubero_ros/task_request_ros_api.h b/hubero_ros/include/hubero_ros/task_request_ros_api.h
index d84af8c..0e6b15d 100644
--- a/hubero_ros/include/hubero_ros/task_request_ros_api.h
+++ b/hubero_ros/include/hubero_ros/task_request_ros_api.h
@@ -37,6 +37,9 @@ class TaskRequestRosApi {
 	/// Defines the duration of sleeps between subsequent processings of the callback queue
 	const int CALLBACK_SPINNING_SLEEP_TIME_MS = 10;
 
+	/// Defines the duration of sleeps between subsequent evaluations of task conditions
+	const int THREADED_EXECUTOR_SLEEP_TIME_MS = 100;
+
 	/**
 	 * @brief Constructor of @ref TaskRequestRosApi instance
 	 * @param actor_name actor identifier from simulation
@@ -383,6 +386,46 @@ class TaskRequestRosApi {
 		return actor_name_;
 	}
 
+	/**
+	 * @defgroup execution Methods related to a task execution in a separate thread
+	 */
+	/**
+	 * @brief Starts the execution of a single task defined by the arguments in a separate thread
+	 *
+	 * This method does not stop previously started task in any way.
+	 *
+	 * @param state_checker_fun a reference to a function that allows checking the state of task execution,
+	 * e.g., @ref getTeleopState; reference is a must here - inline lambda won't work
+	 * @param logging_task_name task name used in logs
+	 * @param timeout timeout for task execution (by default - no limit)
+	 * @param state_executing a state in which the task is assumed to be executing
+	 */
+	void startThreadedExecution(
+		const std::function<TaskFeedbackType()>& state_checker_fun,
+		const std::string& logging_task_name,
+		const ros::Duration& timeout = ros::Duration(0.0),
+		TaskFeedbackType state_executing = TaskFeedbackType::TASK_FEEDBACK_ACTIVE
+	);
+
+	/// Evaluates whether the thread that executes the newest task does still perform computations
+	inline bool isThreadExecuting() const {
+		return threaded_task_execution_requested_ && !threaded_task_execution_done_;
+	}
+
+	/**
+	 * @brief Joins the thread that executes the newest task
+	 *
+	 * Can be also treated as a method that executes a task started with @ref startThreadedExecution in a blocking way
+	 */
+	void join();
+
+	/// @brief Calls @ref std::thread::joinable method on the @ref task_executor_ thread
+	inline bool joinable() const {
+		return task_executor_.joinable();
+	}
+
+	/** @} */ // end of execution group
+
 	/**
 	 * @defgroup utils Static functions that may be handy when designing a custom scenario
 	 */
@@ -511,6 +554,23 @@ class TaskRequestRosApi {
 	 */
 	virtual void callbackProcessingThread();
 
+	/**
+	 * Takes care of the execution of a task defined by the arguments.
+	 *
+	 * This should be called immediately after sending a task request.
+	 *
+	 * @param state_checker_fun a function that allows checking the state of task execution, e.g., @ref getTeleopState
+	 * @param logging_task_name task name used in logs
+	 * @param timeout timeout for task execution (by default no limit)
+	 * @param state_executing a state in which the task is assumed to be executing
+	 */
+	void threadedExecutor(
+		const std::function<TaskFeedbackType()>& state_checker_fun,
+		const std::string& logging_task_name,
+		const ros::Duration& timeout,
+		TaskFeedbackType state_executing
+	);
+
 private:
 	/// Name of the actor provided in the constructor
 	std::string actor_name_;
@@ -518,6 +578,13 @@ class TaskRequestRosApi {
 	/// A separate thread for processing callbacks
 	std::thread callback_spinner_;
 
+	/// A thread for executing a task
+	std::thread task_executor_;
+	/// Atomic flag indicating the request of a threaded execution of a task
+	std::atomic<bool> threaded_task_execution_requested_;
+	/// Atomic flag indicating the finish of computations for @ref task_executor_ thread
+	std::atomic<bool> threaded_task_execution_done_;
+
 	/// Flag set in the destructor to join threads
 	std::atomic<bool> destructing_;
 
diff --git a/hubero_ros/src/task_request_ros_api.cpp b/hubero_ros/src/task_request_ros_api.cpp
index bced63e..1f1f8b5 100644
--- a/hubero_ros/src/task_request_ros_api.cpp
+++ b/hubero_ros/src/task_request_ros_api.cpp
@@ -9,6 +9,8 @@ namespace hubero {
 TaskRequestRosApi::TaskRequestRosApi(const std::string& actor_name):
 	node_ptr_(std::make_shared<Node>("task_request_ros_api_node")),
 	callback_spinner_(std::thread(&TaskRequestRosApi::callbackProcessingThread, this)),
+	threaded_task_execution_requested_(false),
+	threaded_task_execution_done_(false),
 	destructing_(false)
 {
 	// assignment - init list does not support string reference copying
@@ -116,6 +118,9 @@ TaskRequestRosApi::TaskRequestRosApi(const std::string& actor_name):
 
 TaskRequestRosApi::~TaskRequestRosApi() {
 	destructing_ = true;
+	if (task_executor_.joinable()) {
+		task_executor_.join();
+	}
 	if (callback_spinner_.joinable()) {
 		callback_spinner_.join();
 	}
@@ -414,6 +419,32 @@ std::string TaskRequestRosApi::getTeleopStateDescription() const {
 	return getActionStateDescription(ac_teleop_ptr_);
 }
 
+void TaskRequestRosApi::startThreadedExecution(
+	const std::function<TaskFeedbackType()>& state_checker_fun,
+	const std::string& logging_task_name,
+	const ros::Duration& timeout,
+	TaskFeedbackType state_executing
+) {
+	threaded_task_execution_requested_ = true;
+	threaded_task_execution_done_ = false;
+	task_executor_ = std::thread(
+		&TaskRequestRosApi::threadedExecutor,
+		this,
+		std::ref(state_checker_fun),
+		logging_task_name,
+		timeout,
+		state_executing
+	);
+}
+
+void TaskRequestRosApi::join() {
+	if (task_executor_.joinable()) {
+		task_executor_.join();
+	}
+	threaded_task_execution_done_ = true;
+	threaded_task_execution_requested_ = false;
+}
+
 // static
 void TaskRequestRosApi::wait(const std::chrono::milliseconds& ms) {
 	std::this_thread::sleep_for(ms);
@@ -447,4 +478,77 @@ void TaskRequestRosApi::callbackProcessingThread() {
 	}
 }
 
+void TaskRequestRosApi::threadedExecutor(
+	const std::function<TaskFeedbackType()>& state_checker_fun,
+	const std::string& logging_task_name,
+	const ros::Duration& timeout,
+	TaskFeedbackType state_executing
+) {
+	auto timeout_start = ros::Time::now();
+
+	// we will be waiting for actor tasks finishes, so we must also know if the task request was processed
+	while (!destructing_ && state_checker_fun() != state_executing) {
+		if (!ros::ok()) {
+			throw std::runtime_error(
+				"[HuBeRo] [TaskRequestRosApi].[threadedExecutor] ROS stopped working during "
+				+ logging_task_name
+				+ " preparation by the "
+				+ getName()
+			);
+		}
+		if (!timeout.isZero() && (ros::Time::now() - timeout_start) >= timeout) {
+			throw std::runtime_error(
+				"[HuBeRo] [TaskRequestRosApi].[threadedExecutor] Timeout of "
+				+ logging_task_name
+				+ " (preparation) for "
+				+ getName()
+				+ " has elapsed!"
+			);
+		}
+		HUBERO_LOG(
+			"[%s].[TaskRequestRosApi] Waiting for the `%s` task to become active. Current state %d...\r\n",
+			getName().c_str(),
+			logging_task_name.c_str(),
+			state_checker_fun()
+		);
+		std::this_thread::sleep_for(std::chrono::milliseconds(THREADED_EXECUTOR_SLEEP_TIME_MS));
+	}
+
+	HUBERO_LOG(
+		"[%s].[TaskRequestRosApi] `%s` task properly activated (state %d), starting the execution...\r\n",
+		getName().c_str(),
+		logging_task_name.c_str(),
+		state_checker_fun()
+	);
+
+	while (!destructing_ && state_checker_fun() == state_executing) {
+		if (!ros::ok()) {
+			throw std::runtime_error(
+				"[HuBeRo] [TaskRequestRosApi].[threadedExecutor] ROS stopped working during "
+				+ logging_task_name
+				+ " execution by the "
+				+ getName()
+			);
+		}
+		if (!timeout.isZero() && (ros::Time::now() - timeout_start) >= timeout) {
+			throw std::runtime_error(
+				"[HuBeRo] [TaskRequestRosApi].[threadedExecutor] Timeout of "
+				+ logging_task_name
+				+ " (execution) for "
+				+ getName()
+				+ " has elapsed!"
+			);
+		}
+		std::this_thread::sleep_for(std::chrono::milliseconds(THREADED_EXECUTOR_SLEEP_TIME_MS));
+	}
+
+	HUBERO_LOG(
+		"[%s].[TaskRequestRosApi] Execution of `%s` task ended with the state %d\r\n",
+		getName().c_str(),
+		logging_task_name.c_str(),
+		state_checker_fun()
+	);
+	threaded_task_execution_done_ = true;
+}
+
 } // namespace hubero