diff --git a/src/intrometry.cpp b/src/intrometry.cpp index 512605f..e1e62b4 100644 --- a/src/intrometry.cpp +++ b/src/intrometry.cpp @@ -7,8 +7,8 @@ */ #include -#include #include +#include #include #include @@ -195,13 +195,15 @@ namespace intrometry }; - class Publisher::Implementation : public tut::thread::InheritableSupervisor + class Publisher::Implementation { protected: using SourceSet = std::unordered_map; protected: + std::shared_ptr node_; + tut::thread::Supervisor thread_supervisor_; uint32_t names_version_; SourceSet sources_; @@ -210,14 +212,6 @@ namespace intrometry NamesPublisherPtr names_publisher_; ValuesPublisherPtr values_publisher_; - std::shared_ptr node_; - - protected: - // cppcheck-suppress virtualCallInConstructor - void stopSupervisedThreads() override - { - getThreadSupervisor().stop(); - } public: Implementation(const std::string &publisher_id, const std::size_t rate) @@ -274,7 +268,7 @@ namespace intrometry .append_parameter_override("use_sim_time", false) .use_clock_thread(false) .enable_rosout(false)); - getThreadSupervisor().initializeLogger(node_); + thread_supervisor_.initializeLogger(node_); names_publisher_ = node_->create_publisher( str_concat(topic_prefix, "/names"), rclcpp::QoS(/*history_depth=*/20).reliable().transient_local()); @@ -283,7 +277,7 @@ namespace intrometry rclcpp::QoS(/*history_depth=*/20).best_effort().durability_volatile()); - this->addSupervisedThread( + thread_supervisor_.add( tut::thread::Parameters( tut::thread::Parameters::Restart(/*attempts=*/100, /*sleep_ms=*/50), tut::thread::Parameters::TerminationPolicy::IGNORE, @@ -295,30 +289,40 @@ namespace intrometry virtual ~Implementation() { - stopSupervisedThreads(); + thread_supervisor_.stop(); } void spin(const std::size_t rate) { - rclcpp::WallRate loop_rate(static_cast(rate)); + const std::chrono::nanoseconds step(std::nano::den / rate); - while (rclcpp::ok() and not isThreadSupervisorInterrupted()) + if (step.count() > 0) { - if (publish_mutex_.try_lock()) + std::chrono::time_point time_threshold = std::chrono::steady_clock::now(); + + while (rclcpp::ok() and not thread_supervisor_.isInterrupted()) { - for (std::pair &source : sources_) + if (publish_mutex_.try_lock()) { - source.second.publish(names_publisher_, values_publisher_); + for (std::pair &source : sources_) + { + source.second.publish(names_publisher_, values_publisher_); + } + + publish_mutex_.unlock(); } + rclcpp::spin_some(node_); - publish_mutex_.unlock(); + time_threshold += step; + std::this_thread::sleep_until(time_threshold); } - rclcpp::spin_some(node_); - - loop_rate.sleep(); } - getThreadSupervisor().interrupt(); + else + { + thread_supervisor_.log("Incorrect spin rate"); + } + thread_supervisor_.interrupt(); } @@ -346,7 +350,7 @@ namespace intrometry if (sources_.end() == source_it) { - getThreadSupervisor().log( + thread_supervisor_.log( "Measurement source handler is not assigned, skipping id: ", source.arilesDefaultID()); } else diff --git a/test/common.h b/test/common.h index 55ddd93..5d7b091 100644 --- a/test/common.h +++ b/test/common.h @@ -68,7 +68,9 @@ namespace intrometry_tests rclcpp::QoS(/*history_depth=*/10).best_effort(), [this](const t_Message &msg) { - pal_statistics_msgs::msg::to_block_style_yaml(msg, std::cerr); + std::stringstream output; + pal_statistics_msgs::msg::to_block_style_yaml(msg, output); + std::cerr << output.str(); ++counter_; }); } @@ -83,10 +85,10 @@ namespace intrometry_tests class SubscriberNode : public tut::thread::InheritableSupervisor<> { public: + std::shared_ptr node_; Subscription names_subscription_; Subscription values_subscription_; std::thread spinner_; - std::shared_ptr node_; protected: // cppcheck-suppress virtualCallInConstructor @@ -127,12 +129,14 @@ namespace intrometry_tests void spin() { - rclcpp::WallRate loop_rate(1000); + const std::chrono::nanoseconds step(std::nano::den / 1000); + std::chrono::time_point time_threshold = std::chrono::steady_clock::now(); while (rclcpp::ok() and not isThreadSupervisorInterrupted()) { rclcpp::spin_some(node_); - loop_rate.sleep(); + time_threshold += step; + std::this_thread::sleep_until(time_threshold); } getThreadSupervisor().interrupt(); }