Skip to content

Commit

Permalink
Seperate proxy and broker a single module (#1510)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Oct 24, 2024
1 parent 8121554 commit c2431a6
Show file tree
Hide file tree
Showing 207 changed files with 1,683 additions and 176 deletions.
2 changes: 1 addition & 1 deletion .ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ ARG MOP_VERSION
USER root
RUN rm -rf /pulsar/protocols/pulsar-protocol-handler-mqtt-*.nar

COPY ../mqtt-impl/target/pulsar-protocol-handler-mqtt-${MOP_VERSION}.nar /pulsar/protocols
COPY ../mqtt-broker/target/pulsar-protocol-handler-mqtt-${MOP_VERSION}.nar /pulsar/protocols
4 changes: 2 additions & 2 deletions .github/workflows/pr_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
id: list-test
run: |
TESTS=`find tests/src/test/java/io/streamnative/pulsar/handlers/mqtt \
mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt -name "*Test.java" | \
mqtt-broker/src/test/java/io/streamnative/pulsar/handlers/mqtt -name "*Test.java" | \
awk -F "/" '{ print $NF }' | \
awk -F "." '{ print $1 }' | \
jq -R -s -c 'split("\n") | map(select(. != ""))'`
Expand Down Expand Up @@ -146,7 +146,7 @@ jobs:
- name: Download jacoco artifact
uses: actions/download-artifact@v3
with:
path: mqtt-impl/target
path: mqtt-broker/target

- name: Merge jacoco report
run: mvn jacoco:merge
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ MQTT-on-Pulsar (aka MoP) is developed to support MQTT protocol natively on Apach
3. The NAR file can be found at this location.

```bash
./mqtt-impl/target/pulsar-protocol-handler-mqtt-${version}.nar
./mqtt-broker/target/pulsar-protocol-handler-mqtt-${version}.nar
```

### Install MoP protocol handler
Expand Down
7 changes: 7 additions & 0 deletions mqtt-impl/pom.xml → mqtt-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
<name>StreamNative :: Pulsar Protocol Handler :: MQTT</name>
<description>MQTT on Pulsar implemented using Pulsar Protocol Handler</description>

<dependencies>
<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-mqtt-proxy</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,17 @@ public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverCo
this.brokerService = brokerService;
this.pulsarService = brokerService.pulsar();
this.serverConfiguration = serverConfiguration;
this.pskConfiguration = new PSKConfiguration(serverConfiguration);
this.pskConfiguration = new PSKConfiguration(serverConfiguration.getMqttTlsPskIdentityHint(),
serverConfiguration.getMqttTlsPskIdentity(), serverConfiguration.getMqttTlsPskIdentityFile(),
serverConfiguration.getMqttTlsProtocols(), serverConfiguration.getMqttTlsCiphers());
this.authorizationService = brokerService.getAuthorizationService();
this.bundleOwnershipListener = new MQTTNamespaceBundleOwnershipListener(pulsarService.getNamespaceService());
this.metricsCollector = new MQTTMetricsCollector(serverConfiguration);
this.metricsProvider = new MQTTMetricsProvider(metricsCollector);
this.pulsarService.addPrometheusRawMetricsProvider(metricsProvider);
this.authenticationService = serverConfiguration.isMqttAuthenticationEnabled()
? new MQTTAuthenticationService(brokerService,
serverConfiguration.getMqttAuthenticationMethods(),
serverConfiguration.isMqttProxyMTlsAuthenticationEnabled()) : null;
serverConfiguration.getMqttAuthenticationMethods()) : null;
this.connectionManager = new MQTTConnectionManager(pulsarService.getAdvertisedAddress());
this.subscriptionManager = new MQTTSubscriptionManager();
if (getServerConfiguration().isMqttProxyEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.channel;

import static com.google.common.base.Preconditions.checkArgument;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
import io.streamnative.pulsar.handlers.mqtt.broker.processor.MQTTBrokerProtocolMethodProcessor;
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonInboundHandler;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;

/**
* MQTT in bound handler.
*/
@Sharable
@Slf4j
public class MQTTBrokerInboundHandler extends MQTTCommonInboundHandler {

public static final String NAME = "handler";

private final MQTTService mqttService;

public MQTTBrokerInboundHandler(MQTTService mqttService) {
this.mqttService = mqttService;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object message) {
checkArgument(message instanceof MqttAdapterMessage);
MqttAdapterMessage adapterMsg = (MqttAdapterMessage) message;
processors.computeIfAbsent(adapterMsg.getClientId(), key -> {
MQTTBrokerProtocolMethodProcessor p = new MQTTBrokerProtocolMethodProcessor(mqttService, ctx);
CompletableFuture<Void> inactiveFuture = p.getInactiveFuture();
inactiveFuture.whenComplete((id, ex) -> {
processors.remove(adapterMsg.getClientId());
});
return p;
});
super.channelRead(ctx, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MqttAdapterEncoder.NAME, MqttAdapterEncoder.INSTANCE);
// Handler
ch.pipeline().addLast(CombineAdapterHandler.NAME, new CombineAdapterHandler());
ch.pipeline().addLast(MQTTInboundHandler.NAME, new MQTTInboundHandler(mqttService));
ch.pipeline().addLast(MQTTBrokerInboundHandler.NAME, new MQTTBrokerInboundHandler(mqttService));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.common.authentication.mtls;
package io.streamnative.pulsar.handlers.mqtt.broker.channel;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.codec;
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.FastThreadLocal;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.PacketIdGenerator;
import io.streamnative.pulsar.handlers.mqtt.common.utils.MessageBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Future;
import io.streamnative.pulsar.handlers.mqtt.broker.channel.MQTTServerCnx;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.PacketIdGenerator;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.PulsarMessageConverter;
import io.streamnative.pulsar.handlers.mqtt.broker.metric.MQTTMetricsCollector;
import io.streamnative.pulsar.handlers.mqtt.common.Connection;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.PacketIdGenerator;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ClientRestrictions;
import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.metric;
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
import io.streamnative.pulsar.handlers.mqtt.broker.channel.MQTTServerCnx;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.MQTTSubscriptionManager;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.PacketIdGenerator;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.consumer.MQTTConsumer;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.consumer.OutstandingPacket;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.consumer.OutstandingPacketContainer;
Expand All @@ -58,6 +57,7 @@
import io.streamnative.pulsar.handlers.mqtt.common.messages.codes.mqtt5.Mqtt5PubReasonCode;
import io.streamnative.pulsar.handlers.mqtt.common.messages.codes.mqtt5.Mqtt5UnsubReasonCode;
import io.streamnative.pulsar.handlers.mqtt.common.messages.properties.PulsarProperties;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.PacketIdGenerator;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.RetainedMessageHandler;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ClientRestrictions;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ServerRestrictions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.processor;
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.TopicAliasManager;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.consumer.MessagePublishContext;
import io.streamnative.pulsar.handlers.mqtt.common.Connection;
import io.streamnative.pulsar.handlers.mqtt.common.exception.MQTTNoMatchingSubscriberException;
import io.streamnative.pulsar.handlers.mqtt.common.exception.MQTTTopicAliasExceedsLimitException;
import io.streamnative.pulsar.handlers.mqtt.common.exception.MQTTTopicAliasNotFoundException;
import io.streamnative.pulsar.handlers.mqtt.common.messages.MqttPropertyUtils;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.RetainedMessageHandler;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.TopicAliasManager;
import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.rest;
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import io.streamnative.pulsar.handlers.mqtt.proxy.MessageAckTracker;
import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MessageAckTracker;
import org.testng.annotations.Test;

public class MessageAckTrackerTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package io.streamnative.pulsar.handlers.mqtt.untils;

import io.streamnative.pulsar.handlers.mqtt.broker.impl.PacketIdGenerator;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.PacketIdGenerator;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down
File renamed without changes.
27 changes: 27 additions & 0 deletions mqtt-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>3.4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-protocol-handler-mqtt-common</artifactId>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.ssl.SslHandler;
import io.streamnative.pulsar.handlers.mqtt.broker.processor.ProtocolMethodProcessor;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.common.authentication.MQTTAuthenticationService;
import io.streamnative.pulsar.handlers.mqtt.common.exception.MQTTAuthException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.timeout.IdleStateHandler;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.TopicAliasManager;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.TopicSubscriptionManager;
import io.streamnative.pulsar.handlers.mqtt.broker.processor.ProtocolMethodProcessor;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.common.exception.restrictions.InvalidSessionExpireIntervalException;
import io.streamnative.pulsar.handlers.mqtt.common.messages.ack.MqttAck;
import io.streamnative.pulsar.handlers.mqtt.common.messages.ack.MqttConnectAck;
import io.streamnative.pulsar.handlers.mqtt.common.messages.codes.mqtt5.Mqtt5DisConnReasonCode;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.TopicAliasManager;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.TopicSubscriptionManager;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ClientRestrictions;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ServerRestrictions;
import io.streamnative.pulsar.handlers.mqtt.common.utils.FutureUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.streamnative.pulsar.handlers.mqtt.common;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.checkState;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -23,14 +24,9 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
import io.streamnative.pulsar.handlers.mqtt.broker.processor.MQTTBrokerProtocolMethodProcessor;
import io.streamnative.pulsar.handlers.mqtt.broker.processor.ProtocolMethodProcessor;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.common.utils.NettyUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -42,25 +38,16 @@ public class MQTTCommonInboundHandler extends ChannelInboundHandlerAdapter {

public static final String NAME = "InboundHandler";

@Setter
protected MQTTService mqttService;

protected final ConcurrentHashMap<String, ProtocolMethodProcessor> processors = new ConcurrentHashMap<>();

@Override
public void channelRead(ChannelHandlerContext ctx, Object message) {
checkArgument(message instanceof MqttAdapterMessage);
MqttAdapterMessage adapterMsg = (MqttAdapterMessage) message;
MqttMessage mqttMessage = adapterMsg.getMqttMessage();
final ProtocolMethodProcessor processor = processors.computeIfAbsent(adapterMsg.getClientId(), key -> {
MQTTBrokerProtocolMethodProcessor p = new MQTTBrokerProtocolMethodProcessor(mqttService, ctx);
CompletableFuture<Void> inactiveFuture = p.getInactiveFuture();
inactiveFuture.whenComplete((id, ex) -> {
processors.remove(adapterMsg.getClientId());
});
return p;
});
final ProtocolMethodProcessor processor = processors.get(adapterMsg.getClientId());
try {
checkNotNull(processor);
checkState(mqttMessage);
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.processor;
package io.streamnative.pulsar.handlers.mqtt.common;

import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;

Expand Down
Loading

0 comments on commit c2431a6

Please sign in to comment.