Skip to content

Commit

Permalink
Do not drop URI params in subscription handling
Browse files Browse the repository at this point in the history
Ensure that a client subscription "/service?a=b" arrives in the worker
as such, and not as "/service".
  • Loading branch information
frankosterfeld committed Oct 20, 2023
1 parent 3ab5a8f commit bf358db
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
11 changes: 5 additions & 6 deletions src/majordomo/include/majordomo/Broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,7 @@ class Broker {
auto [it, inserted] = _subscribedTopics.try_emplace(topic, 0);
it->second++;
if (it->second == 1) {
// auto topicStr = topic.serialized();
auto topicStr = topic.path();
const auto topicStr = topic.serialized();
zmq::invoke(zmq_setsockopt, _subSocket, ZMQ_SUBSCRIBE, topicStr.data(), topicStr.size()).assertSuccess();
}
}
Expand All @@ -525,7 +524,7 @@ class Broker {
if (it != _subscribedTopics.end()) {
it->second--;
if (it->second == 0) {
auto topicStr = topic.serialized();
const auto topicStr = topic.serialized();
zmq::invoke(zmq_setsockopt, _subSocket, ZMQ_UNSUBSCRIBE, topicStr.data(), topicStr.size()).assertSuccess();
_subscribedTopics.erase(it);
}
Expand Down Expand Up @@ -581,7 +580,7 @@ class Broker {
return true;
}
case mdp::Command::Subscribe: {
SubscriptionData subscription(message.serviceName, message.endpoint.str(), {});
const auto subscription = SubscriptionData::fromURIAndServiceName(message.endpoint, message.serviceName);

subscribe(subscription);

Expand All @@ -590,7 +589,7 @@ class Broker {
return true;
}
case mdp::Command::Unsubscribe: {
SubscriptionData subscription(message.serviceName, message.endpoint.str(), {});
const auto subscription = SubscriptionData::fromURIAndServiceName(message.endpoint, message.serviceName);

unsubscribe(subscription);

Expand Down Expand Up @@ -678,7 +677,7 @@ class Broker {
}

void dispatchMessageToMatchingSubscribers(BrokerMessage &&message) {
SubscriptionData subscription(message.serviceName, message.endpoint.str(), {});
const auto subscription = SubscriptionData::fromURIAndServiceName(message.endpoint, message.serviceName);

// TODO avoid clone() for last message sent out
for (const auto &[topic, _] : _subscribedTopics) {
Expand Down
5 changes: 5 additions & 0 deletions src/majordomo/include/majordomo/SubscriptionMatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ struct SubscriptionData {
return SubscriptionData(""s, uri.path().value_or(""s), uri.queryParamMap());
}

template<typename TURI, typename ServiceString>
static SubscriptionData fromURIAndServiceName(const TURI &uri, ServiceString serviceName) {
return SubscriptionData(std::forward<ServiceString>(serviceName), uri.path().value_or(""s), uri.queryParamMap());
}

std::string serialized() const {
return URI<RELAXED>::factory().path(std::string(_path)).setQuery(_params).build().str();
}
Expand Down
11 changes: 5 additions & 6 deletions src/majordomo/include/majordomo/Worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ class BasicWorker {
return true;
}

const auto topicString = data.substr(1);
// const auto topic = URI<RELAXED>(std::string(topicString));
const SubscriptionData subscription(serviceName.data(), topicString, {});
const auto topicString = data.substr(1);
const auto topicUrl = URI<RELAXED>(std::string(topicString));
const auto subscription = SubscriptionData::fromURIAndServiceName(topicUrl, serviceName.data());

// this assumes that the broker does the subscribe/unsubscribe counting
// for multiple clients and sends us a single sub/unsub for each topic
Expand All @@ -274,9 +274,8 @@ class BasicWorker {

bool receiveNotificationMessage() {
if (auto message = zmq::receive<mdp::MessageFormat::WithoutSourceId>(_notifyListenerSocket)) {
// const auto topic = URI<RELAXED>(std::string(message->topic()));
const SubscriptionData currentSubscription(message->serviceName, message->endpoint.str(), {});
const auto matchesNotificationTopic = [this, &currentSubscription](const auto &activeSubscription) {
const auto currentSubscription = SubscriptionData::fromURIAndServiceName(message->endpoint, message->serviceName);
const auto matchesNotificationTopic = [this, &currentSubscription](const auto &activeSubscription) {
return _subscriptionMatcher(currentSubscription, activeSubscription);
};

Expand Down
6 changes: 4 additions & 2 deletions src/majordomo/test/majordomoworker_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ TEST_CASE("MajordomoWorker test using raw messages", "[majordomo][majordomoworke
BrokerMessageNode subClient(broker.context, ZMQ_SUB);
REQUIRE(client.connect(opencmw::majordomo::INTERNAL_ADDRESS_BROKER));
REQUIRE(subClient.connect(opencmw::majordomo::INTERNAL_ADDRESS_PUBLISHER));
const char *testSubscription = "/newAddress?ctx=FAIR.SELECTOR.C%3D1";
constexpr auto testSubscription = std::string_view("/newAddress?ctx=FAIR.SELECTOR.C%3D1");
REQUIRE(subClient.subscribe(testSubscription));

{
Expand Down Expand Up @@ -311,7 +311,9 @@ TEST_CASE("MajordomoWorker test using raw messages", "[majordomo][majordomoworke
};
REQUIRE(worker.notify("/newAddress", TestContext{ .ctx = opencmw::TimingCtx(1), .contentType = opencmw::MIME::JSON }, entry));
REQUIRE(worker.activeSubscriptions().size() == 1);
REQUIRE(std::string_view(testSubscription).starts_with(worker.activeSubscriptions().begin()->serialized()));
REQUIRE(worker.activeSubscriptions().begin()->service() == "addressbook");
REQUIRE(worker.activeSubscriptions().begin()->path() == "/newAddress");
REQUIRE(worker.activeSubscriptions().begin()->params() == SubscriptionData::map{{"ctx", "FAIR.SELECTOR.C=1"}});
}

{
Expand Down

0 comments on commit bf358db

Please sign in to comment.