Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rmw_publisher_count_non_local_matched_subscriptions #727

Open
wants to merge 3 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ rmw_publisher_count_matched_subscriptions(
publisher, subscription_count);
}

rmw_ret_t
rmw_publisher_count_non_local_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * non_local_subscription_count)
{
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(non_local_subscription_count, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_publisher_count_non_local_matched_subscriptions(
publisher, non_local_subscription_count);
}

rmw_ret_t
rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)
{
Expand Down
17 changes: 17 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ rmw_publisher_count_matched_subscriptions(
publisher, subscription_count);
}

rmw_ret_t
rmw_publisher_count_non_local_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * non_local_subscription_count)
{
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(non_local_subscription_count, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_publisher_count_non_local_matched_subscriptions(
publisher, non_local_subscription_count);
}

rmw_ret_t
rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,21 @@ class RMWPublisherEvent final : public EventListenerInterface
* user calls rmw_count_subscribers().
*
* \param[in] guid The GUID of the newly-matched subscription to track.
* \param[in] is_local Whether \c guid belongs to the same participant as this publisher.
*/
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid);
void track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local);

/// Remove a GUID from the internal set of unique subscriptions matched to this publisher.
/**
* This is so we can provide the RMW layer with an accurate count of matched subscriptions if the
* user calls rmw_count_subscribers().
*
* \param[in] guid The GUID of the newly-unmatched subscription to track.
* \param[in] is_local Whether \c guid belongs to the same participant as this publisher.
*/
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid);
void untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local);

/// Return the number of unique subscriptions matched to this publisher.
/**
Expand All @@ -141,6 +143,13 @@ class RMWPublisherEvent final : public EventListenerInterface
RMW_FASTRTPS_SHARED_CPP_PUBLIC
size_t subscription_count() const;

/// Return the number of unique non-local subscriptions matched to this publisher.
/**
* \return Number of unique non-local subscriptions matched to this publisher.
*/
RMW_FASTRTPS_SHARED_CPP_PUBLIC
size_t non_local_subscription_count() const;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void update_deadline(uint32_t total_count, uint32_t total_count_change);

Expand All @@ -165,6 +174,9 @@ class RMWPublisherEvent final : public EventListenerInterface
std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
RCPPUTILS_TSA_GUARDED_BY(subscriptions_mutex_);

std::set<eprosima::fastrtps::rtps::GUID_t> non_local_subscriptions_
RCPPUTILS_TSA_GUARDED_BY(subscriptions_mutex_);

mutable std::mutex subscriptions_mutex_;

bool deadline_changed_
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ __rmw_publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_publisher_count_non_local_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * non_local_subscription_count);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_publisher_get_actual_qos(
Expand Down
30 changes: 23 additions & 7 deletions rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ void CustomDataWriterListener::on_publication_matched(
eprosima::fastdds::dds::DataWriter * writer,
const eprosima::fastdds::dds::PublicationMatchedStatus & status)
{
(void)writer;
eprosima::fastrtps::rtps::GUID_t subscription_guid =
eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle);
bool is_local = writer->guid().guidPrefix == subscription_guid.guidPrefix;

if (status.current_count_change == 1) {
publisher_event_->track_unique_subscription(
eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle));
publisher_event_->track_unique_subscription(subscription_guid, is_local);
} else if (status.current_count_change == -1) {
publisher_event_->untrack_unique_subscription(
eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle));
publisher_event_->untrack_unique_subscription(subscription_guid, is_local);
} else {
return;
}
Expand Down Expand Up @@ -277,16 +277,26 @@ void RMWPublisherEvent::set_on_new_event_callback(
publisher_info_->data_writer_->set_listener(publisher_info_->data_writer_listener_, status_mask);
}

void RMWPublisherEvent::track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid)
void RMWPublisherEvent::track_unique_subscription(
eprosima::fastrtps::rtps::GUID_t guid,
bool is_local)
{
std::lock_guard<std::mutex> lock(subscriptions_mutex_);
subscriptions_.insert(guid);
if (!is_local) {
non_local_subscriptions_.insert(guid);
}
}

void RMWPublisherEvent::untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid)
void RMWPublisherEvent::untrack_unique_subscription(
eprosima::fastrtps::rtps::GUID_t guid,
bool is_local)
{
std::lock_guard<std::mutex> lock(subscriptions_mutex_);
subscriptions_.erase(guid);
if (!is_local) {
non_local_subscriptions_.erase(guid);
}
}

size_t RMWPublisherEvent::subscription_count() const
Expand All @@ -295,6 +305,12 @@ size_t RMWPublisherEvent::subscription_count() const
return subscriptions_.size();
}

size_t RMWPublisherEvent::non_local_subscription_count() const
{
std::lock_guard<std::mutex> lock(subscriptions_mutex_);
return non_local_subscriptions_.size();
}

void RMWPublisherEvent::update_deadline(uint32_t total_count, uint32_t total_count_change)
{
rcpputils::unique_lock<std::mutex> lock_mutex(on_new_event_m_);
Expand Down
12 changes: 12 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ __rmw_publisher_count_matched_subscriptions(
return RMW_RET_OK;
}

rmw_ret_t
__rmw_publisher_count_non_local_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * non_local_subscription_count)
{
auto info = static_cast<CustomPublisherInfo *>(publisher->data);

*non_local_subscription_count = info->publisher_event_->non_local_subscription_count();

return RMW_RET_OK;
}

rmw_ret_t
__rmw_publisher_assert_liveliness(
const char * identifier,
Expand Down