diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTSubscriptionManager.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTSubscriptionManager.java index 671357fec..4c39142a2 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTSubscriptionManager.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTSubscriptionManager.java @@ -79,4 +79,27 @@ public List> findMatchedTopic(String topic) { public void removeSubscription(String clientId) { subscriptions.remove(clientId); } + + public boolean removeSubscriptionForTopic(String clientId, String topic) { + List subscriptionsList = this.subscriptions.get(clientId); + if (subscriptionsList == null) { + // return false if no subscriptions are found for this client + return false; + } + synchronized (clientId.intern()) { + List withSubscriptionRemoved = subscriptionsList.stream() + .filter(sub -> !matchSubscription(sub.topicName(), topic)) + .collect(Collectors.toList()); + if (withSubscriptionRemoved.size() == subscriptionsList.size()) { + // if no subscription is removed, return false + return false; + } + this.subscriptions.put(clientId, withSubscriptionRemoved); + } + return true; + } + + private static boolean matchSubscription(String topic1, String topic2) { + return new TopicFilterImpl(topic1).test(topic2); + } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index 2702d96b2..defa43884 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -505,6 +505,11 @@ public void processUnSubscribe(MqttAdapterMessage adapter) { final List topicFilters = msg.payload().topics(); final List> futureList = new ArrayList<>(topicFilters.size()); for (String topicFilter : topicFilters) { + final boolean removed = mqttSubscriptionManager.removeSubscriptionForTopic(clientId, topicFilter); + if (!removed) { + throw new MQTTTopicNotExistedException( + String.format("Can not found topic when %s unsubscribe.", clientId)); + } metricsCollector.removeSub(topicFilter); CompletableFuture> topicListFuture = PulsarTopicUtils.asyncGetTopicListFromTopicSubscription( topicFilter, configuration.getDefaultTenant(), configuration.getDefaultNamespace(), pulsarService,