Skip to content

Commit

Permalink
Unify subscription topic handling and fix it for REST (#328)
Browse files Browse the repository at this point in the history
Do not lose params when subscribing to a topic via REST.

Bump cpp-httplib to newest version, as it fixes an encoding error
with query parameters encoded in query parameters (SubscriptionContext)
when used with redirects.

To unify subscription handling (different client/servers were making
slightly different assumptions), the following is now implemented (to
be revised later):

 - Service names are a simple alpha-numerical strings, valid are e.g.
   "someService" and "some.service", but not "Some/Service" not
   "/SomeService".
 - The topic can be of the form "topic", "?param1=..&param2",
   "topic?param1=..&param2".
 - When serialized to a ZMQ topic, i.e. the string-based PUB/SUB topic
   matching,  "service/topic?params" is used. service is *not* optional
   right now. The params are alphabetically sorted by their keys so
   that a subscription "?a&b" and "?b&a" match the same notifications.
 - Valid REST URIs are e.g. "http://localhost:8080/service/topic?a&b"
   or http://localhost:8080/service?a&b", i.e. the service is a
   mandatory part of the URI. mds:// and mdp:// URIs are analogous.
 - On the ZMQ message level, the "serviceName" in above examples would
   be "service" and the "endpoint" would be "topic?a&b" (here param
   order is not enforced).
  • Loading branch information
frankosterfeld committed Nov 29, 2023
1 parent c19531d commit 914d77d
Show file tree
Hide file tree
Showing 20 changed files with 362 additions and 209 deletions.
2 changes: 1 addition & 1 deletion cmake/DependenciesNative.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ option(WITH_PERF_TOOL "Build with perf-tools" OFF)
FetchContent_Declare(
cpp-httplib
GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git
GIT_TAG v0.11.2 # latest v0.11.2
GIT_TAG v0.14.1 # latest v0.14.1
)

# zlib: optional httplib dependency
Expand Down
4 changes: 1 addition & 3 deletions concepts/client/RestSubscription_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ class EventDispatcher {
std::unique_lock lk(_mutex);
int id = std::atomic_load_explicit(&_id, std::memory_order_acquire);
_condition.wait(lk, [id, this] { return _cid == id; });
if (sink.is_writable()) {
sink.write(_message.data(), _message.size());
}
sink.write(_message.data(), _message.size());
}

void send_event(const std::string_view &message) {
Expand Down
6 changes: 3 additions & 3 deletions concepts/majordomo/FilterSubscription_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ int main() {
std::cerr << "Could not bind to broker address" << std::endl;
return 1;
}
AcquisitionWorker<"/DeviceName/Acquisition"> acquisitionWorker(broker);
AcquisitionWorker<"Acquisition"> acquisitionWorker(broker);

// start broker and worker as background processes
std::jthread brokerThread([&broker] { broker.run(); });
Expand All @@ -153,11 +153,11 @@ int main() {

std::atomic<int> receivedA{ 0 };
std::atomic<int> receivedAB{ 0 };
client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A"), [&receivedA](const opencmw::mdp::Message &update) {
client.subscribe(URI("mds://127.0.0.1:12345/Acquisition?signalFilter=A"), [&receivedA](const opencmw::mdp::Message &update) {
fmt::print("Client('A') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.endpoint.str());
receivedA++;
});
client.subscribe(URI("mds://127.0.0.1:12345/DeviceName/Acquisition?signalFilter=A,B"), [&receivedAB](const opencmw::mdp::Message &update) {
client.subscribe(URI("mds://127.0.0.1:12345/Acquisition?signalFilter=A%2CB"), [&receivedAB](const opencmw::mdp::Message &update) {
fmt::print("Client('A,B') received message from service '{}' for endpoint '{}'\n", update.serviceName, update.endpoint.str());
receivedAB++;
});
Expand Down
14 changes: 5 additions & 9 deletions concepts/majordomo/helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,26 +281,22 @@ class TestNode {
return zmq::invoke(zmq_bind, _socket, mdp::toZeroMQEndpoint(address).data()).isValid();
}

bool connect(const opencmw::URI<opencmw::STRICT> &address, const mdp::SubscriptionTopic &subscription = {}) {
bool connect(const opencmw::URI<opencmw::STRICT> &address, const std::string_view subscriptionTopic = {}) {
auto result = zmq::invoke(zmq_connect, _socket, mdp::toZeroMQEndpoint(address).data());
if (!result) return false;

if (!subscription.empty()) {
return subscribe(subscription);
if (!subscriptionTopic.empty()) {
return subscribe(subscriptionTopic);
}

return true;
}

bool subscribe(const mdp::SubscriptionTopic &subscription) {
const auto topic = subscription.toZmqTopic();
assert(!topic.empty());
bool subscribe(std::string_view topic) {
return zmq::invoke(zmq_setsockopt, _socket, ZMQ_SUBSCRIBE, topic.data(), topic.size()).isValid();
}

bool unsubscribe(const mdp::SubscriptionTopic &subscription) {
const auto topic = subscription.toZmqTopic();
assert(!topic.empty());
bool unsubscribe(const std::string_view topic) {
return zmq::invoke(zmq_setsockopt, _socket, ZMQ_UNSUBSCRIBE, topic.data(), topic.size()).isValid();
}

Expand Down
45 changes: 32 additions & 13 deletions src/client/include/Client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,26 +96,26 @@ class Client : public MDClientBase {

void get(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Get, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Get, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

void set(const URI<STRICT> &uri, std::string_view req_id, const std::span<const std::byte> &request) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Set, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Set, uri, req_id);
message.data = IoBuffer(reinterpret_cast<const char *>(request.data()), request.size());
zmq::send(std::move(message), con._socket).assertSuccess();
}

void subscribe(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Subscribe, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Subscribe, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

void unsubscribe(const URI<STRICT> &uri, std::string_view req_id) override {
const auto &con = findConnection(uri);
auto message = createRequestTemplate(mdp::Command::Unsubscribe, uri.relativeRefNoFragment().value(), req_id);
auto message = createRequestTemplate(mdp::Command::Unsubscribe, uri, req_id);
zmq::send(std::move(message), con._socket).assertSuccess();
}

Expand All @@ -134,7 +134,7 @@ class Client : public MDClientBase {

static bool handleMessage(mdp::Message &message) {
if (message.command == mdp::Command::Notify || message.command == mdp::Command::Final) {
message.arrivalTime = std::chrono::system_clock::now();
message.arrivalTime = std::chrono::system_clock::now();
const auto requestId_sv = message.clientRequestID.asString();
if (auto result = std::from_chars(requestId_sv.data(), requestId_sv.data() + requestId_sv.size(), message.id); result.ec == std::errc::invalid_argument || result.ec == std::errc::result_out_of_range) {
message.id = 0;
Expand Down Expand Up @@ -184,11 +184,30 @@ class Client : public MDClientBase {
}

private:
static mdp::Message createRequestTemplate(mdp::Command command, std::string_view serviceName, std::string_view req_id) {
static mdp::Message createRequestTemplate(mdp::Command command, const URI<STRICT> &uri, std::string_view req_id) {
const auto ref = uri.relativeRefNoFragment();
const auto &[serviceName, endpoint] = [&ref] {
if (!ref) {
return std::pair{ std::string_view{}, URI<>{ "" } };
}
auto view = std::string_view{ *ref };
while (view.starts_with("/")) {
view.remove_prefix(1);
}
const auto serviceEnd = view.find_first_of("/?");
if (serviceEnd == std::string_view::npos) {
return std::pair{ view, URI<>{ "" } };
}
auto service = view.substr(0, serviceEnd);
view.remove_prefix(service.size());
return std::pair{ service, URI<>{ std::string(view) } };
}();

mdp::Message req;
req.protocolName = mdp::clientProtocol;
req.command = command;
req.serviceName = std::string(serviceName);
req.endpoint = std::move(endpoint);
req.clientRequestID = IoBuffer(req_id.data(), req_id.size());

return req;
Expand Down Expand Up @@ -247,16 +266,16 @@ class SubscriptionClient : public MDClientBase {

void subscribe(const URI<STRICT> &uri, std::string_view /*reqId*/) override {
auto &con = findConnection(uri);
const auto serviceName = mdp::SubscriptionTopic::fromURI(uri).toZmqTopic();
assert(!serviceName.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_SUBSCRIBE, serviceName.data(), serviceName.size()).assertSuccess();
const auto topic = mdp::SubscriptionTopic::fromServiceAndEndpointString(uri.relativeRefNoFragment().value_or("")).toZmqTopic();
assert(!topic.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_SUBSCRIBE, topic.data(), topic.size()).assertSuccess();
}

void unsubscribe(const URI<STRICT> &uri, std::string_view /*reqId*/) override {
auto &con = findConnection(uri);
const auto serviceName = mdp::SubscriptionTopic::fromURI(uri).toZmqTopic();
assert(!serviceName.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_UNSUBSCRIBE, serviceName.data(), serviceName.size()).assertSuccess();
const auto topic = mdp::SubscriptionTopic::fromServiceAndEndpointString(uri.relativeRefNoFragment().value_or("")).toZmqTopic();
assert(!topic.empty());
opencmw::zmq::invoke(zmq_setsockopt, con._socket, ZMQ_UNSUBSCRIBE, topic.data(), topic.size()).assertSuccess();
}

bool disconnect(detail::Connection &con) {
Expand Down Expand Up @@ -392,7 +411,7 @@ class MDClientCtx : public ClientBase {
_requests.insert({ req_id, Request{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
} else if (cmd.command == mdp::Command::Subscribe) {
req_id = _request_id++;
_subscriptions.insert({ mdp::SubscriptionTopic::fromURI(cmd.endpoint).toZmqTopic(), Subscription{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
_subscriptions.insert({ mdp::SubscriptionTopic::fromEndpoint(cmd.endpoint).toZmqTopic(), Subscription{ .uri = cmd.endpoint, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } });
} else if (cmd.command == mdp::Command::Unsubscribe) {
_requests.erase(0); // todo: lookup correct subscription
}
Expand Down
7 changes: 4 additions & 3 deletions src/client/include/MockServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,16 @@ class MockServer {
return true;
}

void notify(const mdp::SubscriptionTopic &topic, std::string_view value) {
void notify(std::string_view topic, std::string_view value) {
static const auto brokerName = "";
static const auto serviceName = "a.service";
const auto subscription = mdp::SubscriptionTopic::fromServiceAndEndpoint(serviceName, URI<>(std::string(topic)));
mdp::BasicMessage<mdp::MessageFormat::WithSourceId> notify;
notify.protocolName = mdp::clientProtocol;
notify.command = mdp::Command::Final;
notify.serviceName = serviceName;
notify.endpoint = topic.toEndpoint();
notify.sourceId = topic.toZmqTopic();
notify.endpoint = subscription.toEndpoint();
notify.sourceId = subscription.toZmqTopic();
notify.clientRequestID = IoBuffer(brokerName);
notify.data = IoBuffer(value.data(), value.size());
zmq::send(std::move(notify), _pubSocket.value()).assertSuccess();
Expand Down
8 changes: 5 additions & 3 deletions src/client/include/RestClientNative.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class RestClient : public ClientBase {
return {};
}

const auto httpError = httplib::detail::status_message(result->status);
const auto httpError = httplib::status_message(result->status);

Check warning on line 232 in src/client/include/RestClientNative.hpp

View check run for this annotation

Codecov / codecov/patch

src/client/include/RestClientNative.hpp#L232

Added line #L232 was not covered by tests
return fmt::format("{} - {}:{}", result->status, httpError, errorMsgExt.empty() ? result->body : errorMsgExt);
}();

Expand Down Expand Up @@ -266,6 +266,7 @@ class RestClient : public ClientBase {
auto endpoint = endpointBuilder.build();

auto callback = [&cmd, &preferredHeader, &endpoint]<typename ClientType>(ClientType &client) {
client.set_follow_location(true);
client.set_read_timeout(cmd.timeout); // default keep-alive value
if (const httplib::Result &result = client.Get(endpoint.relativeRef()->data(), preferredHeader)) {
returnMdpMessage(cmd, result);
Expand Down Expand Up @@ -316,9 +317,10 @@ class RestClient : public ClientBase {
) {
auto it = _subscription1.find(cmd.endpoint);
if (it == _subscription1.end()) {
auto &client = _subscription1.try_emplace(cmd.endpoint, httplib::Client(cmd.endpoint.hostName().value(), cmd.endpoint.port().value())).first->second;
auto &client = _subscription1.try_emplace(cmd.endpoint, httplib::Client(cmd.endpoint.hostName().value(), cmd.endpoint.port().value())).first->second;
client.set_follow_location(true);

auto longPollingEndpoint = [&] {
auto longPollingEndpoint = [&] {

Check warning on line 323 in src/client/include/RestClientNative.hpp

View check run for this annotation

Codecov / codecov/patch

src/client/include/RestClientNative.hpp#L323

Added line #L323 was not covered by tests
if (!cmd.endpoint.queryParamMap().contains(LONG_POLLING_IDX_TAG)) {
return URI<>::factory(cmd.endpoint).addQueryParameter(LONG_POLLING_IDX_TAG, "Next").build();
} else {
Expand Down
5 changes: 3 additions & 2 deletions src/client/test/ClientPublisher_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TEST_CASE("Basic subscription test", "[ClientContext]") {
ClientContext clientContext{ std::move(clients) };
std::this_thread::sleep_for(100ms);
// subscription
auto endpoint = URI<STRICT>::factory(URI<STRICT>(server.addressSub())).scheme("mds").path("/a.topic").addQueryParameter("C", "2").build();
auto endpoint = URI<STRICT>::factory(URI<STRICT>(server.addressSub())).scheme("mds").path("/a.service/a.topic").addQueryParameter("C", "2").build();
std::atomic<int> received{ 0 };
clientContext.subscribe(endpoint, [&received, &payload](const Message &update) {
if (update.data.size() == payload.size()) {
Expand All @@ -84,7 +84,8 @@ TEST_CASE("Basic subscription test", "[ClientContext]") {
std::this_thread::sleep_for(100ms); // allow for the subscription request to be processed
// send notifications
for (int i = 0; i < 100; i++) {
server.notify(mdp::SubscriptionTopic::fromURI(endpoint), payload);
server.notify("/a.topic?C=2", payload); // received by client
server.notify("/a.topic?C=3", payload); // not received by client
}
std::this_thread::sleep_for(10ms); // allow for all the notifications to reach the client
fmt::print("received notifications {}\n", received.load());
Expand Down
6 changes: 3 additions & 3 deletions src/client/test/CmwClient_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ TEST_CASE("Basic Client Subscription Test", "[Client]") {
subscriptionClient.connect(uri);
subscriptionClient.housekeeping(std::chrono::system_clock::now());

const auto endpoint = URI<uri_check::STRICT>::UriFactory(uri).path("a.topic").build();
const auto endpoint = URI<uri_check::STRICT>::UriFactory(uri).path("a.service/a.topic").build();

std::string reqId = "2";
subscriptionClient.subscribe(endpoint, reqId);
std::this_thread::sleep_for(50ms); // allow for subscription to be established

server.notify(SubscriptionTopic("a.topic?ctx=test_ctx1"), "101");
server.notify(SubscriptionTopic("a.topic?ctx=test_ctx2"), "102");
server.notify("/a.topic?ctx=test_ctx1", "101");
server.notify("/a.topic?ctx=test_ctx2", "102");

Message resultOfNotify1;
REQUIRE(subscriptionClient.receive(resultOfNotify1));
Expand Down
8 changes: 4 additions & 4 deletions src/client/test/MockServerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ TEST_CASE("MockServer Subscription Test", "[mock-server][lambda_handler]") {
BrokerMessageNode client(context, ZMQ_SUB);
REQUIRE(client.connect(opencmw::URI<>(server.addressSub())));

client.subscribe(mdp::SubscriptionTopic("a.topic"));
client.subscribe("/a.service/a.topic");
std::this_thread::sleep_for(10ms); // wait for the subscription to be set-up. todo: investigate more clever way

server.notify(mdp::SubscriptionTopic("a.topic"), "100");
server.notify(mdp::SubscriptionTopic("a.topic"), "23");
server.notify("/a.topic", "100");
server.notify("/a.topic", "23");

{
auto reply = client.tryReadOne();
Expand All @@ -85,7 +85,7 @@ TEST_CASE("MockServer Subscription Test", "[mock-server][lambda_handler]") {
REQUIRE(reply->endpoint.str() == "/a.topic");
}

server.notify(mdp::SubscriptionTopic("a.topic"), "10");
server.notify("/a.topic", "10");
{
auto reply = client.tryReadOne();
fmt::print("{}\n", reply.has_value());
Expand Down
4 changes: 1 addition & 3 deletions src/client/test/RestClient_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,7 @@ class EventDispatcher {
std::unique_lock lk(_mutex);
int id = _id;
_condition.wait(lk, [&id, this] { return _cid == id; });
if (sink.is_writable()) {
sink.write(_message.data(), _message.size());
}
sink.write(_message.data(), _message.size());
}

void send_event(const std::string_view &message) {
Expand Down
Loading

0 comments on commit 914d77d

Please sign in to comment.