Skip to content

Commit

Permalink
Sanitizer fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
asherikov committed Aug 10, 2024
1 parent a033cd5 commit dd1cf0d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
52 changes: 28 additions & 24 deletions src/intrometry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
*/

#include <random>
#include <mutex>
#include <unordered_map>
#include <ratio>

#include <ariles2/visitors/namevalue2.h>
#include <thread_supervisor/supervisor.h>
Expand Down Expand Up @@ -195,13 +195,15 @@ namespace intrometry
};


class Publisher::Implementation : public tut::thread::InheritableSupervisor<ROSLogger>
class Publisher::Implementation
{
protected:
using SourceSet = std::unordered_map<std::string, WriterWrapper>;


protected:
std::shared_ptr<rclcpp::Node> node_;
tut::thread::Supervisor<ROSLogger> thread_supervisor_;
uint32_t names_version_;

SourceSet sources_;
Expand All @@ -210,14 +212,6 @@ namespace intrometry

NamesPublisherPtr names_publisher_;
ValuesPublisherPtr values_publisher_;
std::shared_ptr<rclcpp::Node> node_;

protected:
// cppcheck-suppress virtualCallInConstructor
void stopSupervisedThreads() override
{
getThreadSupervisor().stop();
}

public:
Implementation(const std::string &publisher_id, const std::size_t rate)
Expand Down Expand Up @@ -274,7 +268,7 @@ namespace intrometry
.append_parameter_override("use_sim_time", false)
.use_clock_thread(false)
.enable_rosout(false));
getThreadSupervisor().initializeLogger(node_);
thread_supervisor_.initializeLogger(node_);

names_publisher_ = node_->create_publisher<NamesMsg>(
str_concat(topic_prefix, "/names"), rclcpp::QoS(/*history_depth=*/20).reliable().transient_local());
Expand All @@ -283,7 +277,7 @@ namespace intrometry
rclcpp::QoS(/*history_depth=*/20).best_effort().durability_volatile());


this->addSupervisedThread(
thread_supervisor_.add(
tut::thread::Parameters(
tut::thread::Parameters::Restart(/*attempts=*/100, /*sleep_ms=*/50),
tut::thread::Parameters::TerminationPolicy::IGNORE,
Expand All @@ -295,30 +289,40 @@ namespace intrometry

virtual ~Implementation()
{
stopSupervisedThreads();
thread_supervisor_.stop();
}


void spin(const std::size_t rate)
{
rclcpp::WallRate loop_rate(static_cast<double>(rate));
const std::chrono::nanoseconds step(std::nano::den / rate);

while (rclcpp::ok() and not isThreadSupervisorInterrupted())
if (step.count() > 0)
{
if (publish_mutex_.try_lock())
std::chrono::time_point<std::chrono::steady_clock> time_threshold = std::chrono::steady_clock::now();

while (rclcpp::ok() and not thread_supervisor_.isInterrupted())
{
for (std::pair<const std::string, WriterWrapper> &source : sources_)
if (publish_mutex_.try_lock())
{
source.second.publish(names_publisher_, values_publisher_);
for (std::pair<const std::string, WriterWrapper> &source : sources_)
{
source.second.publish(names_publisher_, values_publisher_);
}

publish_mutex_.unlock();
}
rclcpp::spin_some(node_);

publish_mutex_.unlock();
time_threshold += step;
std::this_thread::sleep_until(time_threshold);
}
rclcpp::spin_some(node_);

loop_rate.sleep();
}
getThreadSupervisor().interrupt();
else
{
thread_supervisor_.log("Incorrect spin rate");
}
thread_supervisor_.interrupt();
}


Expand Down Expand Up @@ -346,7 +350,7 @@ namespace intrometry

if (sources_.end() == source_it)
{
getThreadSupervisor().log(
thread_supervisor_.log(
"Measurement source handler is not assigned, skipping id: ", source.arilesDefaultID());
}
else
Expand Down
12 changes: 8 additions & 4 deletions test/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ namespace intrometry_tests
rclcpp::QoS(/*history_depth=*/10).best_effort(),
[this](const t_Message &msg)
{
pal_statistics_msgs::msg::to_block_style_yaml(msg, std::cerr);
std::stringstream output;
pal_statistics_msgs::msg::to_block_style_yaml(msg, output);
std::cerr << output.str();
++counter_;
});
}
Expand All @@ -83,10 +85,10 @@ namespace intrometry_tests
class SubscriberNode : public tut::thread::InheritableSupervisor<>
{
public:
std::shared_ptr<rclcpp::Node> node_;
Subscription<NamesMsg> names_subscription_;
Subscription<ValuesMsg> values_subscription_;
std::thread spinner_;
std::shared_ptr<rclcpp::Node> node_;

protected:
// cppcheck-suppress virtualCallInConstructor
Expand Down Expand Up @@ -127,12 +129,14 @@ namespace intrometry_tests

void spin()
{
rclcpp::WallRate loop_rate(1000);
const std::chrono::nanoseconds step(std::nano::den / 1000);

std::chrono::time_point<std::chrono::steady_clock> time_threshold = std::chrono::steady_clock::now();
while (rclcpp::ok() and not isThreadSupervisorInterrupted())
{
rclcpp::spin_some(node_);
loop_rate.sleep();
time_threshold += step;
std::this_thread::sleep_until(time_threshold);
}
getThreadSupervisor().interrupt();
}
Expand Down

0 comments on commit dd1cf0d

Please sign in to comment.