diff --git a/src/majordomo/include/majordomo/Broker.hpp b/src/majordomo/include/majordomo/Broker.hpp index 4b6e9596..a1f6ba35 100644 --- a/src/majordomo/include/majordomo/Broker.hpp +++ b/src/majordomo/include/majordomo/Broker.hpp @@ -514,8 +514,7 @@ class Broker { auto [it, inserted] = _subscribedTopics.try_emplace(topic, 0); it->second++; if (it->second == 1) { - // auto topicStr = topic.serialized(); - auto topicStr = topic.path(); + const auto topicStr = topic.serialized(); zmq::invoke(zmq_setsockopt, _subSocket, ZMQ_SUBSCRIBE, topicStr.data(), topicStr.size()).assertSuccess(); } } @@ -525,7 +524,7 @@ class Broker { if (it != _subscribedTopics.end()) { it->second--; if (it->second == 0) { - auto topicStr = topic.serialized(); + const auto topicStr = topic.serialized(); zmq::invoke(zmq_setsockopt, _subSocket, ZMQ_UNSUBSCRIBE, topicStr.data(), topicStr.size()).assertSuccess(); _subscribedTopics.erase(it); } @@ -581,7 +580,7 @@ class Broker { return true; } case mdp::Command::Subscribe: { - SubscriptionData subscription(message.serviceName, message.endpoint.str(), {}); + const auto subscription = SubscriptionData::fromURIAndServiceName(message.endpoint, message.serviceName); subscribe(subscription); @@ -590,7 +589,7 @@ class Broker { return true; } case mdp::Command::Unsubscribe: { - SubscriptionData subscription(message.serviceName, message.endpoint.str(), {}); + const auto subscription = SubscriptionData::fromURIAndServiceName(message.endpoint, message.serviceName); unsubscribe(subscription); @@ -678,7 +677,7 @@ class Broker { } void dispatchMessageToMatchingSubscribers(BrokerMessage &&message) { - SubscriptionData subscription(message.serviceName, message.endpoint.str(), {}); + const auto subscription = SubscriptionData::fromURIAndServiceName(message.endpoint, message.serviceName); // TODO avoid clone() for last message sent out for (const auto &[topic, _] : _subscribedTopics) { diff --git a/src/majordomo/include/majordomo/SubscriptionMatcher.hpp b/src/majordomo/include/majordomo/SubscriptionMatcher.hpp index 243f9089..54edc674 100644 --- a/src/majordomo/include/majordomo/SubscriptionMatcher.hpp +++ b/src/majordomo/include/majordomo/SubscriptionMatcher.hpp @@ -91,6 +91,11 @@ struct SubscriptionData { return SubscriptionData(""s, uri.path().value_or(""s), uri.queryParamMap()); } + template + static SubscriptionData fromURIAndServiceName(const TURI &uri, std::string_view serviceName) { + return SubscriptionData(serviceName, uri.path().value_or(""s), uri.queryParamMap()); + } + std::string serialized() const { return URI::factory().path(std::string(_path)).setQuery(_params).build().str(); } diff --git a/src/majordomo/include/majordomo/Worker.hpp b/src/majordomo/include/majordomo/Worker.hpp index 970feea5..6f81a7df 100644 --- a/src/majordomo/include/majordomo/Worker.hpp +++ b/src/majordomo/include/majordomo/Worker.hpp @@ -255,9 +255,9 @@ class BasicWorker { return true; } - const auto topicString = data.substr(1); - // const auto topic = URI(std::string(topicString)); - const SubscriptionData subscription(serviceName.data(), topicString, {}); + const auto topicString = data.substr(1); + const auto topicUrl = URI(std::string(topicString)); + const auto subscription = SubscriptionData::fromURIAndServiceName(topicUrl, serviceName.data()); // this assumes that the broker does the subscribe/unsubscribe counting // for multiple clients and sends us a single sub/unsub for each topic @@ -274,9 +274,8 @@ class BasicWorker { bool receiveNotificationMessage() { if (auto message = zmq::receive(_notifyListenerSocket)) { - // const auto topic = URI(std::string(message->topic())); - const SubscriptionData currentSubscription(message->serviceName, message->endpoint.str(), {}); - const auto matchesNotificationTopic = [this, ¤tSubscription](const auto &activeSubscription) { + const auto currentSubscription = SubscriptionData::fromURIAndServiceName(message->endpoint, message->serviceName); + const auto matchesNotificationTopic = [this, ¤tSubscription](const auto &activeSubscription) { return _subscriptionMatcher(currentSubscription, activeSubscription); }; diff --git a/src/majordomo/test/majordomoworker_tests.cpp b/src/majordomo/test/majordomoworker_tests.cpp index a217b57a..1beb2060 100644 --- a/src/majordomo/test/majordomoworker_tests.cpp +++ b/src/majordomo/test/majordomoworker_tests.cpp @@ -184,7 +184,7 @@ TEST_CASE("MajordomoWorker test using raw messages", "[majordomo][majordomoworke BrokerMessageNode subClient(broker.context, ZMQ_SUB); REQUIRE(client.connect(opencmw::majordomo::INTERNAL_ADDRESS_BROKER)); REQUIRE(subClient.connect(opencmw::majordomo::INTERNAL_ADDRESS_PUBLISHER)); - const char *testSubscription = "/newAddress?ctx=FAIR.SELECTOR.C%3D1"; + constexpr auto testSubscription = std::string_view("/newAddress?ctx=FAIR.SELECTOR.C%3D1"); REQUIRE(subClient.subscribe(testSubscription)); { @@ -311,7 +311,9 @@ TEST_CASE("MajordomoWorker test using raw messages", "[majordomo][majordomoworke }; REQUIRE(worker.notify("/newAddress", TestContext{ .ctx = opencmw::TimingCtx(1), .contentType = opencmw::MIME::JSON }, entry)); REQUIRE(worker.activeSubscriptions().size() == 1); - REQUIRE(std::string_view(testSubscription).starts_with(worker.activeSubscriptions().begin()->serialized())); + REQUIRE(worker.activeSubscriptions().begin()->service() == "addressbook"); + REQUIRE(worker.activeSubscriptions().begin()->path() == "/newAddress"); + REQUIRE(worker.activeSubscriptions().begin()->params() == SubscriptionData::map{{"ctx", "FAIR.SELECTOR.C=1"}}); } {