Skip to content

Commit

Permalink
remove subs from subscription manager on unsubscribe (#957)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsturzl authored Dec 29, 2023
1 parent 0e749ee commit 7ee99e9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,27 @@ public List<Pair<String, String>> findMatchedTopic(String topic) {
public void removeSubscription(String clientId) {
subscriptions.remove(clientId);
}

public boolean removeSubscriptionForTopic(String clientId, String topic) {
List<MqttTopicSubscription> subscriptionsList = this.subscriptions.get(clientId);
if (subscriptionsList == null) {
// return false if no subscriptions are found for this client
return false;
}
synchronized (clientId.intern()) {
List<MqttTopicSubscription> withSubscriptionRemoved = subscriptionsList.stream()
.filter(sub -> !matchSubscription(sub.topicName(), topic))
.collect(Collectors.toList());
if (withSubscriptionRemoved.size() == subscriptionsList.size()) {
// if no subscription is removed, return false
return false;
}
this.subscriptions.put(clientId, withSubscriptionRemoved);
}
return true;
}

private static boolean matchSubscription(String topic1, String topic2) {
return new TopicFilterImpl(topic1).test(topic2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,11 @@ public void processUnSubscribe(MqttAdapterMessage adapter) {
final List<String> topicFilters = msg.payload().topics();
final List<CompletableFuture<Void>> futureList = new ArrayList<>(topicFilters.size());
for (String topicFilter : topicFilters) {
final boolean removed = mqttSubscriptionManager.removeSubscriptionForTopic(clientId, topicFilter);
if (!removed) {
throw new MQTTTopicNotExistedException(
String.format("Can not found topic when %s unsubscribe.", clientId));
}
metricsCollector.removeSub(topicFilter);
CompletableFuture<List<String>> topicListFuture = PulsarTopicUtils.asyncGetTopicListFromTopicSubscription(
topicFilter, configuration.getDefaultTenant(), configuration.getDefaultNamespace(), pulsarService,
Expand Down

0 comments on commit 7ee99e9

Please sign in to comment.