Skip to content

Commit

Permalink
hubero_ros - TaskRequestRosApi extended with an approach of task …
Browse files Browse the repository at this point in the history
…execution performed in a separate thread [#29, #57]
  • Loading branch information
rayvburn committed Oct 3, 2023
1 parent d0e1adc commit 63e62ba
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 0 deletions.
67 changes: 67 additions & 0 deletions hubero_ros/include/hubero_ros/task_request_ros_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class TaskRequestRosApi {
/// Defines the duration of sleeps between subsequent processings of the callback queue
const int CALLBACK_SPINNING_SLEEP_TIME_MS = 10;

/// Defines the duration of sleeps between subsequent evaluations of task conditions
const int THREADED_EXECUTOR_SLEEP_TIME_MS = 100;

/**
* @brief Constructor of @ref TaskRequestRosApi instance
* @param actor_name actor identifier from simulation
Expand Down Expand Up @@ -383,6 +386,46 @@ class TaskRequestRosApi {
return actor_name_;
}

/**
* @defgroup execution Methods related to a task execution in a separate thread
*/
/**
* @brief Starts the execution of a single task defined by the arguments in a separate thread
*
* This method does not stop previously started task in any way.
*
* @param state_checker_fun a reference to a function that allows checking the state of task execution,
* e.g., @ref getTeleopState; reference is a must here - inline lambda won't work
* @param logging_task_name task name used in logs
* @param timeout timeout for task execution (by default - no limit)
* @param state_executing a state in which the task is assumed to be executing
*/
void startThreadedExecution(
const std::function<TaskFeedbackType()>& state_checker_fun,
const std::string& logging_task_name,
const ros::Duration& timeout = ros::Duration(0.0),
TaskFeedbackType state_executing = TaskFeedbackType::TASK_FEEDBACK_ACTIVE
);

/// Evaluates whether the thread that executes the newest task does still perform computations
inline bool isThreadExecuting() const {
return threaded_task_execution_requested_ && !threaded_task_execution_done_;
}

/**
* @brief Joins the thread that executes the newest task
*
* Can be also treated as a method that executes a task started with @ref startThreadedExecution in a blocking way
*/
void join();

/// @brief Calls @ref std::thread::joinable method on the @ref task_executor_ thread
inline bool joinable() const {
return task_executor_.joinable();
}

/** @} */ // end of execution group

/**
* @defgroup utils Static functions that may be handy when designing a custom scenario
*/
Expand Down Expand Up @@ -511,13 +554,37 @@ class TaskRequestRosApi {
*/
virtual void callbackProcessingThread();

/**
* Takes care of the execution of a task defined by the arguments.
*
* This should be called immediately after sending a task request.
*
* @param state_checker_fun a function that allows checking the state of task execution, e.g., @ref getTeleopState
* @param logging_task_name task name used in logs
* @param timeout timeout for task execution (by default no limit)
* @param state_executing a state in which the task is assumed to be executing
*/
void threadedExecutor(
const std::function<TaskFeedbackType()>& state_checker_fun,
const std::string& logging_task_name,
const ros::Duration& timeout,
TaskFeedbackType state_executing
);

private:
/// Name of the actor provided in the constructor
std::string actor_name_;

/// A separate thread for processing callbacks
std::thread callback_spinner_;

/// A thread for executing a task
std::thread task_executor_;
/// Atomic flag indicating the request of a threaded execution of a task
std::atomic<bool> threaded_task_execution_requested_;
/// Atomic flag indicating the finish of computations for @ref task_executor_ thread
std::atomic<bool> threaded_task_execution_done_;

/// Flag set in the destructor to join threads
std::atomic<bool> destructing_;

Expand Down
104 changes: 104 additions & 0 deletions hubero_ros/src/task_request_ros_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace hubero {
TaskRequestRosApi::TaskRequestRosApi(const std::string& actor_name):
node_ptr_(std::make_shared<Node>("task_request_ros_api_node")),
callback_spinner_(std::thread(&TaskRequestRosApi::callbackProcessingThread, this)),
threaded_task_execution_requested_(false),
threaded_task_execution_done_(false),
destructing_(false)
{
// assignment - init list does not support string reference copying
Expand Down Expand Up @@ -116,6 +118,9 @@ TaskRequestRosApi::TaskRequestRosApi(const std::string& actor_name):

TaskRequestRosApi::~TaskRequestRosApi() {
destructing_ = true;
if (task_executor_.joinable()) {
task_executor_.join();
}
if (callback_spinner_.joinable()) {
callback_spinner_.join();
}
Expand Down Expand Up @@ -414,6 +419,32 @@ std::string TaskRequestRosApi::getTeleopStateDescription() const {
return getActionStateDescription(ac_teleop_ptr_);
}

void TaskRequestRosApi::startThreadedExecution(
const std::function<TaskFeedbackType()>& state_checker_fun,
const std::string& logging_task_name,
const ros::Duration& timeout,
TaskFeedbackType state_executing
) {
threaded_task_execution_requested_ = true;
threaded_task_execution_done_ = false;
task_executor_ = std::thread(
&TaskRequestRosApi::threadedExecutor,
this,
std::ref(state_checker_fun),
logging_task_name,
timeout,
state_executing
);
}

void TaskRequestRosApi::join() {
if (task_executor_.joinable()) {
task_executor_.join();
}
threaded_task_execution_done_ = true;
threaded_task_execution_requested_ = false;
}

// static
void TaskRequestRosApi::wait(const std::chrono::milliseconds& ms) {
std::this_thread::sleep_for(ms);
Expand Down Expand Up @@ -447,4 +478,77 @@ void TaskRequestRosApi::callbackProcessingThread() {
}
}

void TaskRequestRosApi::threadedExecutor(
const std::function<TaskFeedbackType()>& state_checker_fun,
const std::string& logging_task_name,
const ros::Duration& timeout,
TaskFeedbackType state_executing
) {
auto timeout_start = ros::Time::now();

// we will be waiting for actor tasks finishes, so we must also know if the task request was processed
while (!destructing_ && state_checker_fun() != state_executing) {
if (!ros::ok()) {
throw std::runtime_error(
"[HuBeRo] [TaskRequestRosApi].[threadedExecutor] ROS stopped working during "
+ logging_task_name
+ " preparation by the "
+ getName()
);
}
if (!timeout.isZero() && (ros::Time::now() - timeout_start) >= timeout) {
throw std::runtime_error(
"[HuBeRo] [TaskRequestRosApi].[threadedExecutor] Timeout of "
+ logging_task_name
+ " (preparation) for "
+ getName()
+ " has elapsed!"
);
}
HUBERO_LOG(
"[%s].[TaskRequestRosApi] Waiting for the `%s` task to become active. Current state %d...\r\n",
getName().c_str(),
logging_task_name.c_str(),
state_checker_fun()
);
std::this_thread::sleep_for(std::chrono::milliseconds(THREADED_EXECUTOR_SLEEP_TIME_MS));
}

HUBERO_LOG(
"[%s].[TaskRequestRosApi] `%s` task properly activated (state %d), starting the execution...\r\n",
getName().c_str(),
logging_task_name.c_str(),
state_checker_fun()
);

while (!destructing_ && state_checker_fun() == state_executing) {
if (!ros::ok()) {
throw std::runtime_error(
"[HuBeRo] [TaskRequestRosApi].[threadedExecutor] ROS stopped working during "
+ logging_task_name
+ " execution by the "
+ getName()
);
}
if (!timeout.isZero() && (ros::Time::now() - timeout_start) >= timeout) {
throw std::runtime_error(
"[HuBeRo] [TaskRequestRosApi].[threadedExecutor] Timeout of "
+ logging_task_name
+ " (execution) for "
+ getName()
+ " has elapsed!"
);
}
std::this_thread::sleep_for(std::chrono::milliseconds(THREADED_EXECUTOR_SLEEP_TIME_MS));
}

HUBERO_LOG(
"[%s].[TaskRequestRosApi] Execution of `%s` task ended with the state %d\r\n",
getName().c_str(),
logging_task_name.c_str(),
state_checker_fun()
);
threaded_task_execution_done_ = true;
}

} // namespace hubero

0 comments on commit 63e62ba

Please sign in to comment.