diff --git a/FlashMQTests/CMakeLists.txt b/FlashMQTests/CMakeLists.txt index faf04ce..20c6ab6 100644 --- a/FlashMQTests/CMakeLists.txt +++ b/FlashMQTests/CMakeLists.txt @@ -65,6 +65,7 @@ add_executable(flashmq-tests testinitializer.h testinitializer.cpp mainappasfork.h mainappasfork.cpp subscriptionidtests.cpp + bridgeprefixtests.cpp ) diff --git a/FlashMQTests/bridgeprefixtests.cpp b/FlashMQTests/bridgeprefixtests.cpp new file mode 100644 index 0000000..3353456 --- /dev/null +++ b/FlashMQTests/bridgeprefixtests.cpp @@ -0,0 +1,478 @@ +#include "maintests.h" +#include "conffiletemp.h" +#include "flashmqtestclient.h" +#include "mainappasfork.h" +#include "testhelpers.h" + +void waitForMessagesOverBridge(FlashMQTestClient &one, FlashMQTestClient &two, const std::string &topic) +{ + int wait_i = 0; + for(wait_i = 0; wait_i < 10; wait_i++) + { + one.publish(topic, "connectiontest", 2); + + try + { + two.waitForMessageCount(1); + break; + } + catch (std::exception &ex) { } + } + + if (wait_i >= 10) + throw std::runtime_error("Timeout waiting for messages over bridge"); +} + +void MainTests::forkingTestBridgeWithLocalAndRemotePrefix() +{ + for (const std::string &protocol_version : {"mqtt5", "mqtt3.1"}) + { + cleanup(); + + ConfFileTemp confFile; + const std::string config = R"( +allow_anonymous true +log_debug false + +bridge { + address ::1 + port 21883 + subscribe ManglerRemote/shoes 2 + publish ManglerLocal/boots 2 + clientid_prefix Mangler + local_prefix ManglerLocal/ + remote_prefix ManglerRemote/ + protocol_version %s +} +listen { + protocol mqtt + port 51183 +})"; + confFile.writeLine(formatString(config, protocol_version.c_str())); + confFile.closeFile(); + + std::vector args {"--config-file", confFile.getFilePath()}; + + MainAppAsFork app(args); + app.start(); + app.waitForStarted(51183); + + // We will consider the test server initialized here the 'remote' broker. + init(); + + FlashMQTestClient clientToLocalWithBridge; + clientToLocalWithBridge.start(); + + FlashMQTestClient clientToRemote; + clientToRemote.start(); + + clientToLocalWithBridge.connectClient(ProtocolVersion::Mqtt31, 51183); + clientToLocalWithBridge.subscribe("#", 1); + + clientToRemote.connectClient(ProtocolVersion::Mqtt5); + clientToRemote.subscribe("#", 1); + + waitForMessagesOverBridge(clientToLocalWithBridge, clientToRemote, "ManglerLocal/boots"); + + clientToLocalWithBridge.clearReceivedLists(); + clientToRemote.clearReceivedLists(); + + { + clientToLocalWithBridge.publish("ManglerLocal/boots", "asdf", 2); + clientToRemote.waitForMessageCount(1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getTopic(), std::string("ManglerRemote/boots")); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getPayloadView(), "asdf"); + } + + clientToLocalWithBridge.clearReceivedLists(); + clientToRemote.clearReceivedLists(); + + { + clientToRemote.publish("ManglerRemote/shoes", "are made for walking", 2); + clientToLocalWithBridge.waitForMessageCount(1); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getTopic(), std::string("ManglerLocal/shoes")); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getPayloadView(), "are made for walking"); + } + } +} + +void MainTests::forkingTestBridgeWithOnlyRemotePrefix() +{ + for (const std::string &protocol_version : {"mqtt5", "mqtt3.1"}) + { + cleanup(); + + ConfFileTemp confFile; + const std::string config = R"( +allow_anonymous true +log_debug false + +bridge { + address ::1 + port 21883 + subscribe ManglerRemote/shoes 2 + publish boots 2 + clientid_prefix Mangler + remote_prefix ManglerRemote/ + protocol_version %s +} +listen { + protocol mqtt + port 51183 +})"; + confFile.writeLine(formatString(config, protocol_version.c_str())); + confFile.closeFile(); + + std::vector args {"--config-file", confFile.getFilePath()}; + + MainAppAsFork app(args); + app.start(); + app.waitForStarted(51183); + + // We will consider the test server initialized here the 'remote' broker. + init(); + + FlashMQTestClient clientToLocalWithBridge; + clientToLocalWithBridge.start(); + + FlashMQTestClient clientToRemote; + clientToRemote.start(); + + FlashMQTestClient receiverToLocal; + receiverToLocal.start(); + + FlashMQTestClient receiverToLocal2; + receiverToLocal2.start(); + + receiverToLocal2.connectClient(ProtocolVersion::Mqtt31, 51183); + receiverToLocal2.subscribe("#", 1); + + clientToLocalWithBridge.connectClient(ProtocolVersion::Mqtt31, 51183); + clientToLocalWithBridge.subscribe("#", 1); + + clientToRemote.connectClient(ProtocolVersion::Mqtt5); + clientToRemote.subscribe("#", 1); + + waitForMessagesOverBridge(clientToLocalWithBridge, clientToRemote, "boots"); + + clientToLocalWithBridge.clearReceivedLists(); + clientToRemote.clearReceivedLists(); + + receiverToLocal.connectClient(ProtocolVersion::Mqtt31, 51183); + receiverToLocal.subscribe("#", 1); + + { + clientToLocalWithBridge.publish("boots", "asdf", 2); + clientToRemote.waitForMessageCount(1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getTopic(), std::string("ManglerRemote/boots")); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getPayloadView(), "asdf"); + } + + // Make sure other clients connected to the server with prefixes defined get normal topics. I.e. test packet cache. + { + receiverToLocal.waitForMessageCount(1); + FMQ_COMPARE(receiverToLocal.receivedPublishes.front().getTopic(), std::string("boots")); + FMQ_COMPARE(receiverToLocal.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(receiverToLocal.receivedPublishes.front().getPayloadView(), "asdf"); + + receiverToLocal2.waitForMessageCount(1); + FMQ_COMPARE(receiverToLocal2.receivedPublishes.back().getTopic(), std::string("boots")); + FMQ_COMPARE(receiverToLocal2.receivedPublishes.back().getQos(), 1); + FMQ_COMPARE(receiverToLocal2.receivedPublishes.back().getPayloadView(), "asdf"); + } + + clientToLocalWithBridge.clearReceivedLists(); + clientToRemote.clearReceivedLists(); + + { + clientToRemote.publish("ManglerRemote/shoes", "are made for walking", 2); + clientToLocalWithBridge.waitForMessageCount(1); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getTopic(), std::string("shoes")); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getPayloadView(), "are made for walking"); + } + + + } +} + +void MainTests::forkingTestBridgeWithOnlyLocalPrefix() +{ + for (const std::string &protocol_version : {"mqtt5", "mqtt3.1"}) + { + cleanup(); + + ConfFileTemp confFile; + const std::string config = R"( +allow_anonymous true +log_debug false + +bridge { + address ::1 + port 21883 + subscribe shoes 2 + publish ManglerLocal/boots 2 + clientid_prefix Mangler + local_prefix ManglerLocal/ + protocol_version %s +} +listen { + protocol mqtt + port 51183 +})"; + confFile.writeLine(formatString(config, protocol_version.c_str())); + confFile.closeFile(); + + std::vector args {"--config-file", confFile.getFilePath()}; + + MainAppAsFork app(args); + app.start(); + app.waitForStarted(51183); + + // We will consider the test server initialized here the 'remote' broker. + init(); + + FlashMQTestClient clientToLocalWithBridge; + clientToLocalWithBridge.start(); + + FlashMQTestClient clientToRemote; + clientToRemote.start(); + + FlashMQTestClient senderToLocal; + senderToLocal.start(); + + clientToLocalWithBridge.connectClient(ProtocolVersion::Mqtt31, 51183); + clientToLocalWithBridge.subscribe("#", 1); + + clientToRemote.connectClient(ProtocolVersion::Mqtt5); + clientToRemote.subscribe("#", 1); + + waitForMessagesOverBridge(clientToLocalWithBridge, clientToRemote, "ManglerLocal/boots"); + + clientToLocalWithBridge.clearReceivedLists(); + clientToRemote.clearReceivedLists(); + + { + clientToLocalWithBridge.publish("ManglerLocal/boots", "asdf", 2); + clientToRemote.waitForMessageCount(1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getTopic(), std::string("boots")); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getPayloadView(), "asdf"); + } + + clientToLocalWithBridge.clearReceivedLists(); + clientToRemote.clearReceivedLists(); + + { + clientToRemote.publish("shoes", "are made for walking", 2); + clientToLocalWithBridge.waitForMessageCount(1); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getTopic(), std::string("ManglerLocal/shoes")); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getPayloadView(), "are made for walking"); + } + + // Make sure other clients connected to the server with prefixes defined get normal topics. + { + clientToLocalWithBridge.clearReceivedLists(); + senderToLocal.connectClient(ProtocolVersion::Mqtt31, 51183); + senderToLocal.publish("panic", "attack", 0); + clientToLocalWithBridge.waitForMessageCount(1); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getTopic(), std::string("panic")); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getQos(), 0); + FMQ_COMPARE(clientToLocalWithBridge.receivedPublishes.front().getPayloadView(), "attack"); + } + } +} + +void MainTests::forkingTestBridgeWithLocalAndRemotePrefixRetained() +{ + for (const std::string &protocol_version : {"mqtt5", "mqtt3.1"}) + { + cleanup(); + + ConfFileTemp confFile; + const std::string config = R"( +allow_anonymous true +log_debug false + +bridge { + address ::1 + port 21883 + subscribe ManglerRemote/shoes/# 2 + publish ManglerLocal/boots/# 2 + clientid_prefix Mangler + local_prefix ManglerLocal/ + remote_prefix ManglerRemote/ + protocol_version %s +} +listen { + protocol mqtt + port 51883 +})"; + confFile.writeLine(formatString(config, protocol_version.c_str())); + confFile.closeFile(); + + std::vector args {"--config-file", confFile.getFilePath()}; + + MainAppAsFork localServer(args); + localServer.start(); + localServer.waitForStarted(51883); + + FlashMQTestClient clientToLocalWithBridge; + clientToLocalWithBridge.start(); + clientToLocalWithBridge.connectClient(ProtocolVersion::Mqtt5, 51883); + + { + Publish pub("ManglerLocal/boots/retainme", "asdf", 2); + pub.retain = true; + clientToLocalWithBridge.publish(pub); + } + + // Bring the remote on-line + init(); + + FlashMQTestClient clientToRemote; + clientToRemote.start(); + + clientToRemote.connectClient(ProtocolVersion::Mqtt5); + clientToRemote.subscribe("#", 1); + + clientToRemote.waitForMessageCount(1); + FMQ_COMPARE(clientToRemote.receivedPublishes.size(), static_cast(1)); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getTopic(), std::string("ManglerRemote/boots/retainme")); + + clientToRemote.clearReceivedLists(); + + waitForMessagesOverBridge(clientToLocalWithBridge, clientToRemote, "ManglerLocal/boots/connectiontest"); + + FMQ_COMPARE(clientToRemote.receivedPublishes.size(), static_cast(1)); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getTopic(), std::string("ManglerRemote/boots/connectiontest")); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getPayloadView(), std::string("connectiontest")); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getRetain(), false); + + { + FlashMQTestClient clientToRemote; + clientToRemote.start(); + + clientToRemote.connectClient(ProtocolVersion::Mqtt5); + clientToRemote.subscribe("#", 1); + + clientToRemote.waitForPacketCount(1); + FMQ_COMPARE(clientToRemote.receivedPublishes.size(), static_cast(1)); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getTopic(), std::string("ManglerRemote/boots/retainme")); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getPayloadView(), "asdf"); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getRetain(), true); + } + } +} + +/** + * @brief Test if queued QoS messages have have their prefixes applied. Special care is taken in FlashMQ to support + * that (the topic_override is part of the QueuedPublish), so we need to test it. + */ +void MainTests::forkingTestBridgeWithLocalAndRemotePrefixQueuedQoS() +{ + for (const std::string &protocol_version : {"mqtt5", "mqtt3.1"}) + { + cleanup(); + + ConfFileTemp confFile; + const std::string config = R"( +allow_anonymous true +log_debug false + +bridge { + address ::1 + port 21883 + subscribe ManglerRemote/shoes/# 2 + publish ManglerLocal/boots/# 2 + clientid_prefix Mangler + remote_clean_start false + local_clean_start false + remote_session_expiry_interval 300 + local_session_expiry_interval 300 + local_prefix ManglerLocal/ + remote_prefix ManglerRemote/ + protocol_version %s +} +listen { + protocol mqtt + port 51883 +})"; + confFile.writeLine(formatString(config, protocol_version.c_str())); + confFile.closeFile(); + + std::vector args {"--config-file", confFile.getFilePath()}; + + MainAppAsFork localServer(args); + localServer.start(); + localServer.waitForStarted(51883); + + FlashMQTestClient clientToLocalWithBridge; + clientToLocalWithBridge.start(); + clientToLocalWithBridge.connectClient(ProtocolVersion::Mqtt5, 51883); + + // Bring the remote on-line + init(); + + FlashMQTestClient clientToRemote; + clientToRemote.start(); + + clientToRemote.connectClient(ProtocolVersion::Mqtt5); + clientToRemote.subscribe("#", 1); + + waitForMessagesOverBridge(clientToLocalWithBridge, clientToRemote, "ManglerLocal/boots/connectiontest"); + + FMQ_COMPARE(clientToRemote.receivedPublishes.size(), static_cast(1)); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getTopic(), std::string("ManglerRemote/boots/connectiontest")); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getPayloadView(), std::string("connectiontest")); + + clientToRemote.clearReceivedLists(); + + // Stop the remote + cleanup(); + + Publish pub("ManglerLocal/boots/queue_me", "late to the party", 1); + clientToLocalWithBridge.publish(pub); + + std::cout << "Starting remote server again" << std::endl; + + // Start the remote again + init(); + + { + FlashMQTestClient clientToRemote; + clientToRemote.start(); + + clientToRemote.connectClient(ProtocolVersion::Mqtt5, false, 1000, [](Connect &c) { + c.clientid = "QueuedReceiver_666"; + }); + clientToRemote.subscribe("+/boots/queue_me", 1); + + clientToRemote.waitForMessageCount(1, 10); + FMQ_COMPARE(clientToRemote.receivedPublishes.size(), static_cast(1)); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getTopic(), std::string("ManglerRemote/boots/queue_me")); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getQos(), 1); + FMQ_COMPARE(clientToRemote.receivedPublishes.front().getPayloadView(), "late to the party"); + } + } + +} + + + + + + + + + + + + diff --git a/FlashMQTests/maintests.cpp b/FlashMQTests/maintests.cpp index 638d17b..dfe40fa 100644 --- a/FlashMQTests/maintests.cpp +++ b/FlashMQTests/maintests.cpp @@ -172,6 +172,11 @@ MainTests::MainTests() */ REGISTER_FUNCTION(forkingTestForkingTestServer); REGISTER_FUNCTION(forkingTestSaveAndLoadDelayedWill); + REGISTER_FUNCTION(forkingTestBridgeWithLocalAndRemotePrefix); + REGISTER_FUNCTION(forkingTestBridgeWithOnlyRemotePrefix); + REGISTER_FUNCTION(forkingTestBridgeWithOnlyLocalPrefix); + REGISTER_FUNCTION(forkingTestBridgeWithLocalAndRemotePrefixRetained); + REGISTER_FUNCTION(forkingTestBridgeWithLocalAndRemotePrefixQueuedQoS); REGISTER_FUNCTION(testDummy); REGISTER_FUNCTION3(test_circbuf); diff --git a/FlashMQTests/maintests.h b/FlashMQTests/maintests.h index 667f70e..c7e7dda 100644 --- a/FlashMQTests/maintests.h +++ b/FlashMQTests/maintests.h @@ -246,6 +246,11 @@ class MainTests void testSubscriptionIdChange(); void testSubscriptionIdOverlappingSubscriptions(); + void forkingTestBridgeWithLocalAndRemotePrefix(); + void forkingTestBridgeWithOnlyRemotePrefix(); + void forkingTestBridgeWithOnlyLocalPrefix(); + void forkingTestBridgeWithLocalAndRemotePrefixRetained(); + void forkingTestBridgeWithLocalAndRemotePrefixQueuedQoS(); public: MainTests(); diff --git a/FlashMQTests/tst_maintests.cpp b/FlashMQTests/tst_maintests.cpp index c155e97..50946cb 100644 --- a/FlashMQTests/tst_maintests.cpp +++ b/FlashMQTests/tst_maintests.cpp @@ -2185,7 +2185,7 @@ void MainTests::testQoSPublishQueue() { Publish p1("one", "onepayload", 1); - q.queuePublish(std::move(p1), id++); + q.queuePublish(std::move(p1), id++, std::optional()); qp = q.popNext(); QVERIFY(qp); @@ -2198,8 +2198,8 @@ void MainTests::testQoSPublishQueue() { Publish p1("two", "asdf", 1); Publish p2("three", "wer", 1); - q.queuePublish(std::move(p1), id++); - q.queuePublish(std::move(p2), id++); + q.queuePublish(std::move(p1), id++, std::optional()); + q.queuePublish(std::move(p2), id++, std::optional()); qp = q.popNext(); QCOMPARE(qp->getPublish().topic, "two"); @@ -2213,9 +2213,9 @@ void MainTests::testQoSPublishQueue() Publish p1("four", "asdf", 1); Publish p2("five", "wer", 1); Publish p3("six", "wer", 1); - q.queuePublish(std::move(p1), id++); - q.queuePublish(std::move(p2), id++); - q.queuePublish(std::move(p3), id++); + q.queuePublish(std::move(p1), id++, std::optional()); + q.queuePublish(std::move(p2), id++, std::optional()); + q.queuePublish(std::move(p3), id++, std::optional()); qp = q.popNext(); QCOMPARE(qp->getPublish().topic, "four"); @@ -2234,15 +2234,15 @@ void MainTests::testQoSPublishQueue() Publish p1("seven", "asdf", 1); Publish p2("eight", "wer", 1); Publish p3("nine", "wer", 1); - q.queuePublish(std::move(p1), id++); + q.queuePublish(std::move(p1), id++, std::optional()); idToRemove = id; - q.queuePublish(std::move(p2), id++); - q.queuePublish(std::move(p3), id++); + q.queuePublish(std::move(p2), id++, std::optional()); + q.queuePublish(std::move(p3), id++, std::optional()); q.erase(idToRemove); Publish p4("tool2eW7", "wer", 1); - q.queuePublish(std::move(p4), id++); + q.queuePublish(std::move(p4), id++, std::optional()); qp = q.popNext(); QCOMPARE(qp->getPublish().topic, "seven"); @@ -2262,14 +2262,14 @@ void MainTests::testQoSPublishQueue() Publish p2("eleven", "wer", 1); Publish p3("twelve", "wer", 1); idToRemove = id; - q.queuePublish(std::move(p1), id++); - q.queuePublish(std::move(p2), id++); - q.queuePublish(std::move(p3), id++); + q.queuePublish(std::move(p1), id++, std::optional()); + q.queuePublish(std::move(p2), id++, std::optional()); + q.queuePublish(std::move(p3), id++, std::optional()); q.erase(idToRemove); Publish p4("iew2Bie1", "wer", 1); - q.queuePublish(std::move(p4), id++); + q.queuePublish(std::move(p4), id++, std::optional()); qp = q.popNext(); QCOMPARE(qp->getPublish().topic, "eleven"); @@ -2288,15 +2288,15 @@ void MainTests::testQoSPublishQueue() Publish p1("13", "asdf", 1); Publish p2("14", "wer", 1); Publish p3("15", "wer", 1); - q.queuePublish(std::move(p1), id++); - q.queuePublish(std::move(p2), id++); + q.queuePublish(std::move(p1), id++, std::optional()); + q.queuePublish(std::move(p2), id++, std::optional()); idToRemove = id; - q.queuePublish(std::move(p3), id++); + q.queuePublish(std::move(p3), id++, std::optional()); q.erase(idToRemove); Publish p4("16", "wer", 1); - q.queuePublish(std::move(p4), id++); + q.queuePublish(std::move(p4), id++, std::optional()); qp = q.popNext(); QCOMPARE(qp->getPublish().topic, "13"); diff --git a/bridgeconfig.h b/bridgeconfig.h index dc7f715..0585a0b 100644 --- a/bridgeconfig.h +++ b/bridgeconfig.h @@ -60,6 +60,9 @@ class BridgeConfig bool queueForDelete = false; bool tcpNoDelay = false; + std::optional local_prefix; + std::optional remote_prefix; + void setClientId(const std::string &prefix, const std::string &id); const std::string &getClientid() const; void isValid(); diff --git a/client.cpp b/client.cpp index c965ea8..bcb9757 100644 --- a/client.cpp +++ b/client.cpp @@ -326,7 +326,8 @@ PacketDropReason Client::writeMqttPacket(const MqttPacket &packet) } PacketDropReason Client::writeMqttPacketAndBlameThisClient( - PublishCopyFactory ©Factory, uint8_t max_qos, uint16_t packet_id, bool retain, uint32_t subscriptionIdentifier) + PublishCopyFactory ©Factory, uint8_t max_qos, uint16_t packet_id, bool retain, uint32_t subscriptionIdentifier, + const std::optional &topic_override) { uint16_t topic_alias = 0; uint16_t topic_alias_next = 0; @@ -344,11 +345,13 @@ PacketDropReason Client::writeMqttPacketAndBlameThisClient( */ std::unique_lock aliasMutexExtended; + const std::string &topic = topic_override.value_or(copyFactory.getTopic()); + if (protocolVersion >= ProtocolVersion::Mqtt5 && this->maxOutgoingTopicAliasValue > 0) { std::unique_lock aliasMutex(outgoingTopicAliasMutex); - auto alias_pos = this->outgoingTopicAliases.find(copyFactory.getTopic()); + auto alias_pos = this->outgoingTopicAliases.find(topic); if (alias_pos != this->outgoingTopicAliases.end()) { @@ -364,7 +367,7 @@ PacketDropReason Client::writeMqttPacketAndBlameThisClient( } } - MqttPacket *p = copyFactory.getOptimumPacket(max_qos, this->protocolVersion, topic_alias, skip_topic, subscriptionIdentifier); + MqttPacket *p = copyFactory.getOptimumPacket(max_qos, this->protocolVersion, topic_alias, skip_topic, subscriptionIdentifier, topic_override); assert(static_cast(p->getQos()) == static_cast(max_qos)); assert(PublishCopyFactory::getPublishLayoutCompareKey(this->protocolVersion, p->getQos()) == @@ -384,7 +387,7 @@ PacketDropReason Client::writeMqttPacketAndBlameThisClient( if (dropReason == PacketDropReason::Success && topic_alias_next > 0) { - this->outgoingTopicAliases[copyFactory.getTopic()] = topic_alias_next; + this->outgoingTopicAliases[topic] = topic_alias_next; this->curOutgoingTopicAlias = topic_alias_next; } @@ -994,6 +997,22 @@ std::chrono::seconds Client::getSecondsTillKeepAliveAction() const return secondsTillKillTime; } +const std::optional &Client::getLocalPrefix() const +{ + if (!this->session) + throw std::runtime_error("Client has no session in getSession(). It was probably meant to be discarded."); + + return session->getLocalPrefix(); +} + +const std::optional &Client::getRemotePrefix() const +{ + if (!this->session) + throw std::runtime_error("Client has no session in getSession(). It was probably meant to be discarded."); + + return session->getRemotePrefix(); +} + void Client::clearWill() { willPublish.reset(); diff --git a/client.h b/client.h index 323b828..33bc4ab 100644 --- a/client.h +++ b/client.h @@ -186,13 +186,17 @@ class Client std::shared_ptr getSession(); void setDisconnectReason(const std::string &reason); std::chrono::seconds getSecondsTillKeepAliveAction() const; + const std::optional &getLocalPrefix() const; + const std::optional &getRemotePrefix() const; void writeText(const std::string &text); void writePing(); void writePingResp(); void writeLoginPacket(); PacketDropReason writeMqttPacket(const MqttPacket &packet); - PacketDropReason writeMqttPacketAndBlameThisClient(PublishCopyFactory ©Factory, uint8_t max_qos, uint16_t packet_id, bool retain, uint32_t subscriptionIdentifier); + PacketDropReason writeMqttPacketAndBlameThisClient( + PublishCopyFactory ©Factory, uint8_t max_qos, uint16_t packet_id, bool retain, uint32_t subscriptionIdentifier, + const std::optional &topic_override); PacketDropReason writeMqttPacketAndBlameThisClient(const MqttPacket &packet); void writeBufIntoFd(); DisconnectStage getDisconnectStage() const { return disconnectStage; } diff --git a/configfileparser.cpp b/configfileparser.cpp index b893b84..b78694d 100644 --- a/configfileparser.cpp +++ b/configfileparser.cpp @@ -272,6 +272,8 @@ ConfigFileParser::ConfigFileParser(const std::string &path) : validBridgeKeys.insert("max_outgoing_topic_aliases"); validBridgeKeys.insert("max_incoming_topic_aliases"); validBridgeKeys.insert("tcp_nodelay"); + validBridgeKeys.insert("local_prefix"); + validBridgeKeys.insert("remote_prefix"); } std::list ConfigFileParser::readFileRecursively(const std::string &path) const @@ -747,6 +749,20 @@ void ConfigFileParser::loadFile(bool test) { curBridge->tcpNoDelay = true; } + if (testKeyValidity(key, "local_prefix", validBridgeKeys)) + { + if (value.empty()) + throw ConfigFileException("Option '" + key + "' can't be empty."); + + curBridge->local_prefix = value; + } + if (testKeyValidity(key, "remote_prefix", validBridgeKeys)) + { + if (value.empty()) + throw ConfigFileException("Option '" + key + "' can't be empty."); + + curBridge->remote_prefix = value; + } testCorrectNumberOfValues(key, number_of_expected_values, values); continue; diff --git a/mqttpacket.cpp b/mqttpacket.cpp index 0e3d850..5b348c3 100644 --- a/mqttpacket.cpp +++ b/mqttpacket.cpp @@ -107,7 +107,7 @@ MqttPacket::MqttPacket(const UnsubAck &unsubAck) : } MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish) : - MqttPacket(protocolVersion, _publish, _publish.qos, _publish.topicAlias, _publish.skipTopic, _publish.subscriptionIdentifier) + MqttPacket(protocolVersion, _publish, _publish.qos, _publish.topicAlias, _publish.skipTopic, _publish.subscriptionIdentifier, std::optional()) { } @@ -124,13 +124,8 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_pu * if you just want the publish object's data. */ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish, const uint8_t _qos, const uint16_t _topic_alias, - const bool _skip_topic, const uint32_t subscriptionIdentifier) + const bool _skip_topic, const uint32_t subscriptionIdentifier, const std::optional &topic_override) { - if (_publish.topic.length() > 0xFFFF) - { - throw ProtocolError("Topic path too long.", ReasonCodes::ProtocolError); - } - this->protocolVersion = protocolVersion; this->publishData.client_id = _publish.client_id; this->publishData.username = _publish.username; @@ -141,7 +136,12 @@ MqttPacket::MqttPacket(const ProtocolVersion protocolVersion, const Publish &_pu this->packetType = PacketType::PUBLISH; if (!this->publishData.skipTopic) - this->publishData.topic = _publish.topic; + this->publishData.topic = topic_override.value_or(_publish.topic); + + if (this->publishData.topic.length() > 0xFFFF) + { + throw ProtocolError("Topic path too long.", ReasonCodes::ProtocolError); + } first_byte = static_cast(packetType) << 4; first_byte |= (this->publishData.qos << 1); @@ -1947,6 +1947,35 @@ void MqttPacket::handlePublish(std::shared_ptr &sender) ThreadGlobals::getThreadData()->receivedMessageCounter.inc(); + /* + * Topic prefixing. Currently, remote prefix is removed and local prefixes is added before any ACL checks are + * done. I think that makes the most sense. + */ + { + const std::optional &remote_prefix = sender->getRemotePrefix(); + const std::optional &local_prefix = sender->getLocalPrefix(); + + bool resplit = false; + + if (remote_prefix && startsWith(publishData.topic, *remote_prefix)) + { + resplit = true; + publishData.topic.erase(0, remote_prefix->length()); + } + + if (local_prefix) + { + resplit = true; + publishData.topic = *local_prefix + publishData.topic; + } + + if (resplit) + { + dontReuseBites = true; + publishData.resplitTopic(); + } + } + Authentication &authentication = *ThreadGlobals::getAuth(); const Settings *settings = ThreadGlobals::getSettings(); diff --git a/mqttpacket.h b/mqttpacket.h index 86711d2..d3490e1 100644 --- a/mqttpacket.h +++ b/mqttpacket.h @@ -117,7 +117,7 @@ class MqttPacket MqttPacket(const UnsubAck &unsubAck); MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish); MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish, const uint8_t _qos, const uint16_t _topic_alias, - const bool _skip_topic, const uint32_t subscriptionIdentifier); + const bool _skip_topic, const uint32_t subscriptionIdentifier, const std::optional &topic_override); MqttPacket(const PubResponse &pubAck); MqttPacket(const Disconnect &disconnect); MqttPacket(const Auth &auth); diff --git a/publishcopyfactory.cpp b/publishcopyfactory.cpp index d37128c..60eefac 100644 --- a/publishcopyfactory.cpp +++ b/publishcopyfactory.cpp @@ -31,7 +31,8 @@ PublishCopyFactory::PublishCopyFactory(Publish *publish) : } MqttPacket *PublishCopyFactory::getOptimumPacket( - const uint8_t max_qos, const ProtocolVersion protocolVersion, uint16_t topic_alias, bool skip_topic, uint32_t subscriptionIdentifier) + const uint8_t max_qos, const ProtocolVersion protocolVersion, uint16_t topic_alias, bool skip_topic, uint32_t subscriptionIdentifier, + const std::optional &topic_override) { const uint8_t actualQos = getEffectiveQos(max_qos); @@ -44,9 +45,9 @@ MqttPacket *PublishCopyFactory::getOptimumPacket( assert(packet->getPublishData().subscriptionIdentifier == 0); // When the packet contains an data specific to the receiver, we don't cache it. - if ((protocolVersion >= ProtocolVersion::Mqtt5 && topic_alias > 0) || subscriptionIdentifier > 0) + if (topic_override || (protocolVersion >= ProtocolVersion::Mqtt5 && topic_alias > 0) || subscriptionIdentifier > 0) { - this->oneShotPacket.emplace(protocolVersion, packet->getPublishData(), actualQos, topic_alias, skip_topic, subscriptionIdentifier); + this->oneShotPacket.emplace(protocolVersion, packet->getPublishData(), actualQos, topic_alias, skip_topic, subscriptionIdentifier, topic_override); return &*this->oneShotPacket; } @@ -63,7 +64,8 @@ MqttPacket *PublishCopyFactory::getOptimumPacket( if (!cachedPack) { - cachedPack.emplace(protocolVersion, packet->getPublishData(), actualQos, 0, false, 0); + // Don't include arguments that are not part of the cache key. + cachedPack.emplace(protocolVersion, packet->getPublishData(), actualQos, 0, false, 0, std::optional()); } return &*cachedPack; @@ -75,7 +77,7 @@ MqttPacket *PublishCopyFactory::getOptimumPacket( // The incoming topic alias is not relevant after initial conversion and it should not propagate. assert(publish->topicAlias == 0); - this->oneShotPacket.emplace(protocolVersion, *publish, actualQos, topic_alias, skip_topic, subscriptionIdentifier); + this->oneShotPacket.emplace(protocolVersion, *publish, actualQos, topic_alias, skip_topic, subscriptionIdentifier, topic_override); return &*this->oneShotPacket; } @@ -223,4 +225,3 @@ int PublishCopyFactory::getPublishLayoutCompareKey(ProtocolVersion pv, uint8_t q } - diff --git a/publishcopyfactory.h b/publishcopyfactory.h index 5b979f3..a166ea4 100644 --- a/publishcopyfactory.h +++ b/publishcopyfactory.h @@ -43,7 +43,9 @@ class PublishCopyFactory PublishCopyFactory(const PublishCopyFactory &other) = delete; PublishCopyFactory(PublishCopyFactory &&other) = delete; - MqttPacket *getOptimumPacket(const uint8_t max_qos, const ProtocolVersion protocolVersion, uint16_t topic_alias, bool skip_topic, uint32_t subscriptionIdentifier); + MqttPacket *getOptimumPacket( + const uint8_t max_qos, const ProtocolVersion protocolVersion, uint16_t topic_alias, bool skip_topic, uint32_t subscriptionIdentifier, + const std::optional &topic_override); uint8_t getEffectiveQos(uint8_t max_qos) const; bool getEffectiveRetain(bool retainAsPublished) const; const std::string &getTopic() const; diff --git a/qospacketqueue.cpp b/qospacketqueue.cpp index 85cf2ad..7824131 100644 --- a/qospacketqueue.cpp +++ b/qospacketqueue.cpp @@ -14,9 +14,10 @@ See LICENSE for license details. #include "mqttpacket.h" -QueuedPublish::QueuedPublish(Publish &&publish, uint16_t packet_id) : +QueuedPublish::QueuedPublish(Publish &&publish, uint16_t packet_id, const std::optional &topic_override) : publish(std::move(publish)), - packet_id(packet_id) + packet_id(packet_id), + topic_override(topic_override) { } @@ -31,6 +32,11 @@ Publish &QueuedPublish::getPublish() return publish; } +const std::optional &QueuedPublish::getTopicOverride() const +{ + return topic_override; +} + size_t QueuedPublish::getApproximateMemoryFootprint() const { // TODO: hmm, this is possibly very inaccurate with MQTT5 packets. @@ -139,24 +145,34 @@ void QoSPublishQueue::addToHeadOfLinkedList(std::shared_ptr &qp) * subscribe, which an offline client can't do. However, MQTT5 introduces 'retained as published', so it becomes valid. Bridge * mode uses this as well. */ -void QoSPublishQueue::queuePublish(PublishCopyFactory ©Factory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished, const uint32_t subscriptionIdentifier) +void QoSPublishQueue::queuePublish( + PublishCopyFactory ©Factory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished, const uint32_t subscriptionIdentifier, + const std::optional &topic_override) { assert(new_max_qos > 0); assert(id > 0); Publish pub = copyFactory.getNewPublish(new_max_qos, retainAsPublished, subscriptionIdentifier); - std::shared_ptr qp = std::make_shared(std::move(pub), id); + std::shared_ptr qp = std::make_shared(std::move(pub), id, topic_override); addToHeadOfLinkedList(qp); qosQueueBytes += qp->getApproximateMemoryFootprint(); addToExpirationQueue(qp); queue[id] = std::move(qp); } -void QoSPublishQueue::queuePublish(Publish &&pub, uint16_t id) +/** + * @brief QoSPublishQueue::queuePublish moves the publish into the queue. + * @param pub + * @param id + * @param topic_override could/should theoretically also have been an rvalue ref, but that required maintaining + * a various constructors of QueuedPublish with ref and rref arguments, which didn't seem worth it. So far, this + * function is only used for loading from disk, so not the hot path. + */ +void QoSPublishQueue::queuePublish(Publish &&pub, uint16_t id, const std::optional &topic_override) { assert(id > 0); - std::shared_ptr qp = std::make_shared(std::move(pub), id); + std::shared_ptr qp = std::make_shared(std::move(pub), id, topic_override); addToHeadOfLinkedList(qp); qosQueueBytes += qp->getApproximateMemoryFootprint(); addToExpirationQueue(qp); diff --git a/qospacketqueue.h b/qospacketqueue.h index f0575c6..233add1 100644 --- a/qospacketqueue.h +++ b/qospacketqueue.h @@ -26,8 +26,11 @@ class QueuedPublish { Publish publish; uint16_t packet_id = 0; + + // We store this separately because because we need to retain the original publish path for ACL checking upon resending. + std::optional topic_override; public: - QueuedPublish(Publish &&publish, uint16_t packet_id); + QueuedPublish(Publish &&publish, uint16_t packet_id, const std::optional &topic_override); QueuedPublish(const QueuedPublish &other) = delete; std::shared_ptr prev; @@ -36,6 +39,7 @@ class QueuedPublish size_t getApproximateMemoryFootprint() const; uint16_t getPacketId() const; Publish &getPublish(); + const std::optional &getTopicOverride() const; }; class QoSPublishQueue @@ -61,8 +65,10 @@ class QoSPublishQueue bool erase(const uint16_t packet_id); size_t size() const; size_t getByteSize() const; - void queuePublish(PublishCopyFactory ©Factory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished, const uint32_t subscriptionIdentifier); - void queuePublish(Publish &&pub, uint16_t id); + void queuePublish( + PublishCopyFactory ©Factory, uint16_t id, uint8_t new_max_qos, bool retainAsPublished, const uint32_t subscriptionIdentifier, + const std::optional &topic_override); + void queuePublish(Publish &&pub, uint16_t id, const std::optional &topic_override); int clearExpiredMessages(); const std::shared_ptr &getTail() const; std::shared_ptr popNext(); diff --git a/session.cpp b/session.cpp index 4e56234..d63673e 100644 --- a/session.cpp +++ b/session.cpp @@ -140,6 +140,24 @@ PacketDropReason Session::writePacket(PublishCopyFactory ©Factory, const uin const std::shared_ptr c = makeSharedClient(); + std::optional topic_override; + + { + if (local_prefix && startsWith(copyFactory.getTopic(), *local_prefix)) + { + topic_override = copyFactory.getTopic(); + topic_override->erase(0, local_prefix->length()); + } + + if (remote_prefix) + { + if (!topic_override) + topic_override = copyFactory.getTopic(); + + topic_override = *remote_prefix + *topic_override; + } + } + uint16_t pack_id = 0; if (__builtin_expect(effectiveQos > 0, 0)) @@ -180,7 +198,7 @@ PacketDropReason Session::writePacket(PublishCopyFactory ©Factory, const uin pack_id = getNextPacketId(); if (!destroyOnDisconnect) - qosPacketQueue.queuePublish(copyFactory, pack_id, effectiveQos, effectiveRetain, subscriptionIdentifier); + qosPacketQueue.queuePublish(copyFactory, pack_id, effectiveQos, effectiveRetain, subscriptionIdentifier, topic_override); } PacketDropReason return_value = PacketDropReason::ClientOffline; @@ -190,7 +208,7 @@ PacketDropReason Session::writePacket(PublishCopyFactory ©Factory, const uin if (!c->isRetainedAvailable()) effectiveRetain = false; - return_value = c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, pack_id, effectiveRetain, subscriptionIdentifier); + return_value = c->writeMqttPacketAndBlameThisClient(copyFactory, effectiveQos, pack_id, effectiveRetain, subscriptionIdentifier, topic_override); } return return_value; @@ -247,7 +265,7 @@ void Session::sendAllPendingQosData() std::shared_ptr c = makeSharedClient(); if (c) { - std::vector> copiedPublishes; + std::vector, uint16_t>> copiedPublishes; std::vector copiedQoS2Ids; { @@ -276,7 +294,7 @@ void Session::sendAllPendingQosData() flowControlQuota--; - copiedPublishes.emplace_back(pub, qp->getPacketId()); + copiedPublishes.emplace_back(pub, qp->getTopicOverride(), qp->getPacketId()); } for (const uint16_t packet_id : outgoingQoS2MessageIds) @@ -285,11 +303,12 @@ void Session::sendAllPendingQosData() } } - for(std::pair &p : copiedPublishes) + for(std::tuple, uint16_t> &p : copiedPublishes) { - PublishCopyFactory fac(&p.first); - const bool retain = !c->isRetainedAvailable() ? false : p.first.retain; - c->writeMqttPacketAndBlameThisClient(fac, p.first.qos, p.second, retain, p.first.subscriptionIdentifier); + Publish &pub = std::get(p); + PublishCopyFactory fac(&pub); + const bool retain = !c->isRetainedAvailable() ? false : pub.retain; + c->writeMqttPacketAndBlameThisClient(fac, pub.qos, std::get(p), retain, pub.subscriptionIdentifier, std::get>(p)); } for(uint16_t id : copiedQoS2Ids) @@ -458,3 +477,13 @@ uint32_t Session::getCurrentSessionExpiryInterval() return result; } +void Session::setLocalPrefix(const std::optional &s) +{ + this->local_prefix = s; +} + +void Session::setRemotePrefix(const std::optional &s) +{ + this->remote_prefix = s; +} + diff --git a/session.h b/session.h index 63302af..0c58780 100644 --- a/session.h +++ b/session.h @@ -36,6 +36,8 @@ class Session LockedWeakPtr client; const std::string client_id; const std::string username; + std::optional local_prefix; + std::optional remote_prefix; QoSPublishQueue qosPacketQueue; std::set incomingQoS2MessageIds; std::set outgoingQoS2MessageIds; @@ -104,6 +106,11 @@ class Session void setQueuedRemovalAt(); uint32_t getSessionExpiryInterval() const; uint32_t getCurrentSessionExpiryInterval(); + + void setLocalPrefix(const std::optional &s); + void setRemotePrefix(const std::optional &s); + const std::optional &getLocalPrefix() const { return local_prefix; } + const std::optional &getRemotePrefix() const { return remote_prefix; } }; #endif // SESSION_H diff --git a/sessionsandsubscriptionsdb.cpp b/sessionsandsubscriptionsdb.cpp index 1e8bba9..81da618 100644 --- a/sessionsandsubscriptionsdb.cpp +++ b/sessionsandsubscriptionsdb.cpp @@ -79,7 +79,7 @@ SessionsAndSubscriptionsDB::SessionsAndSubscriptionsDB(const std::string &filePa void SessionsAndSubscriptionsDB::openWrite() { - PersistenceFile::openWrite(MAGIC_STRING_SESSION_FILE_V6); + PersistenceFile::openWrite(MAGIC_STRING_SESSION_FILE_V7); } void SessionsAndSubscriptionsDB::openRead() @@ -100,11 +100,13 @@ void SessionsAndSubscriptionsDB::openRead() readVersion = ReadVersion::v5; else if (detectedVersionString == current_magic_string) readVersion = ReadVersion::v6; + else if (detectedVersionString == MAGIC_STRING_SESSION_FILE_V7) + readVersion = ReadVersion::v7; else throw std::runtime_error("Unknown file version."); } -SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV3V4V5V6() +SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV3V4V5V6V7() { const Settings &settings = *ThreadGlobals::getSettings(); @@ -157,6 +159,11 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV3V4V5V6() const std::string sender_clientid = readString(eofFound); const std::string sender_username = readString(eofFound); + const bool has_topic_override = readVersion >= ReadVersion::v7 ? static_cast(readUint8(eofFound)) : false; + std::optional topic_override; + if (has_topic_override) + topic_override = readString(eofFound); + assert(id > 0); cirbuf.reset(); @@ -177,7 +184,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readDataV3V4V5V6() pub.expireInfo->createdAt = timepointFromAge(newPubAge); logger->logf(LOG_DEBUG, "Loaded QoS %d message for topic '%s' for session '%s'.", pub.qos, pub.topic.c_str(), ses->getClientId().c_str()); - ses->qosPacketQueue.queuePublish(std::move(pub), id); + ses->qosPacketQueue.queuePublish(std::move(pub), id, topic_override); } const uint32_t nrOfIncomingPacketIds = readUint32(eofFound); @@ -326,11 +333,9 @@ void SessionsAndSubscriptionsDB::saveData(const std::vector qp = ses->qosPacketQueue.getTail(); while (qp) { - QueuedPublish &p = *qp; - qosPacketsCounted++; - Publish &pub = p.getPublish(); + Publish &pub = qp->getPublish(); assert(!pub.skipTopic); assert(pub.topicAlias == 0); @@ -338,7 +343,7 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorlogf(LOG_DEBUG, "Saving QoS %d message for topic '%s'.", pub.qos, pub.topic.c_str()); MqttPacket pack(ProtocolVersion::Mqtt5, pub); - pack.setPacketId(p.getPacketId()); + pack.setPacketId(qp->getPacketId()); const uint32_t packSize = pack.getSizeIncludingNonPresentHeader(); cirbuf.reset(); cirbuf.ensureFreeSpace(packSize + 32); @@ -347,11 +352,17 @@ void SessionsAndSubscriptionsDB::saveData(const std::vectorgetPacketId()); writeUint32(pubAge); writeUint32(packSize); writeString(pub.client_id); writeString(pub.username); + + const bool hasTopicOverride = qp->getTopicOverride().has_value(); + writeUint8(hasTopicOverride); + if (hasTopicOverride) + writeString(qp->getTopicOverride().value()); + writeCheck(cirbuf.tailPtr(), 1, cirbuf.usedBytes(), f); qp = qp->next; @@ -456,7 +467,7 @@ SessionsAndSubscriptionsResult SessionsAndSubscriptionsDB::readData() if (readVersion == ReadVersion::v2) logger->logf(LOG_WARNING, "File '%s' is version 2, an internal development version that was never finalized. Not reading.", getFilePath().c_str()); if (readVersion >= ReadVersion::v3) - return readDataV3V4V5V6(); + return readDataV3V4V5V6V7(); return defaultResult; } diff --git a/sessionsandsubscriptionsdb.h b/sessionsandsubscriptionsdb.h index 6277707..841eca0 100644 --- a/sessionsandsubscriptionsdb.h +++ b/sessionsandsubscriptionsdb.h @@ -24,6 +24,7 @@ See LICENSE for license details. #define MAGIC_STRING_SESSION_FILE_V4 "FlashMQSessionDBv4" #define MAGIC_STRING_SESSION_FILE_V5 "FlashMQSessionDBv5" #define MAGIC_STRING_SESSION_FILE_V6 "FlashMQSessionDBv6" +#define MAGIC_STRING_SESSION_FILE_V7 "FlashMQSessionDBv7" #define RESERVED_SPACE_SESSIONS_DB_V2 32 /** @@ -64,12 +65,13 @@ class SessionsAndSubscriptionsDB : private PersistenceFile v3, v4, v5, - v6 + v6, + v7 }; ReadVersion readVersion = ReadVersion::unknown; - SessionsAndSubscriptionsResult readDataV3V4V5V6(); + SessionsAndSubscriptionsResult readDataV3V4V5V6V7(); void writeRowHeader(); public: SessionsAndSubscriptionsDB(const std::string &filePath); diff --git a/threaddata.cpp b/threaddata.cpp index 866099b..82f7fd2 100644 --- a/threaddata.cpp +++ b/threaddata.cpp @@ -375,9 +375,13 @@ void ThreadData::bridgeReconnect() else { std::shared_ptr subscriptionStore = MainApp::getMainApp()->getSubscriptionStore(); - bridge->session = subscriptionStore->getBridgeSession(c); + session = subscriptionStore->getBridgeSession(c); + bridge->session = session; } + session->setLocalPrefix(bridge->c.local_prefix); + session->setRemotePrefix(bridge->c.remote_prefix); + c->connectToBridgeTarget(addr); } catch (std::exception &ex)