diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/event/PulsarEventCenterImpl.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/event/PulsarEventCenterImpl.java index 811b9f35f..da6f9fdc1 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/event/PulsarEventCenterImpl.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/event/PulsarEventCenterImpl.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.metadata.api.Notification; @@ -59,25 +60,28 @@ public void accept(Notification notification) { return; } final String path = notification.getPath(); - // give the task to another thread to avoid blocking metadata - callbackExecutor.executeOrdered(path, () -> { - for (PulsarEventListener listener : listeners.stream().filter(listeners -> - listeners.matchPath(path)).collect(Collectors.toList())) { - try { - switch (notification.getType()) { - case Created: - listener.onNodeCreated(path); - break; - case Deleted: - listener.onNodeDeleted(path); - break; - default: - break; + final List matcherListeners = listeners.stream().filter(listeners -> + listeners.matchPath(path)).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(matcherListeners)) { + // give the task to another thread to avoid blocking metadata + callbackExecutor.executeOrdered(path, () -> { + for (PulsarEventListener listener : matcherListeners) { + try { + switch (notification.getType()) { + case Created: + listener.onNodeCreated(path); + break; + case Deleted: + listener.onNodeDeleted(path); + break; + default: + break; + } + } catch (Throwable ex) { + log.warn("notify change {} {} failed.", notification.getType(), notification.getPath(), ex); } - } catch (Throwable ex) { - log.warn("notify change {} {} failed.", notification.getType(), notification.getPath(), ex); } - } - }); + }); + } } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/event/PulsarTopicChangeListener.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/event/PulsarTopicChangeListener.java index d3a859f6d..62a4baf69 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/event/PulsarTopicChangeListener.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/event/PulsarTopicChangeListener.java @@ -16,6 +16,7 @@ import io.streamnative.pulsar.handlers.mqtt.utils.EventParserUtils; import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; public interface PulsarTopicChangeListener extends PulsarEventListener { @@ -23,6 +24,8 @@ public interface PulsarTopicChangeListener extends PulsarEventListener { String REGEX = MANAGED_LEDGER_PATH + "/" + ".*" + "/persistent/"; Pattern PATTERN = Pattern.compile(REGEX); + boolean FILTER_SYSTEM_TOPIC = true; + @Override default boolean matchPath(String path) { String listenNamespace = getListenNamespace(); @@ -41,6 +44,9 @@ default String getListenNamespace() { default void onNodeCreated(String path) { try { TopicName topicName = EventParserUtils.parseTopicNameFromManagedLedgerEvent(path); + if (isFilterSystemTopic() && SystemTopicNames.isSystemTopic(topicName)) { + return; + } onTopicLoad(topicName); } catch (IllegalArgumentException ex) { // NO-OP don't notify @@ -51,6 +57,9 @@ default void onNodeCreated(String path) { default void onNodeDeleted(String path) { try { TopicName topicName = EventParserUtils.parseTopicNameFromManagedLedgerEvent(path); + if (isFilterSystemTopic() && SystemTopicNames.isSystemTopic(topicName)) { + return; + } onTopicUnload(topicName); } catch (IllegalArgumentException ex) { // NO-OP don't notify @@ -60,4 +69,8 @@ default void onNodeDeleted(String path) { void onTopicLoad(TopicName topicName); void onTopicUnload(TopicName topicName); + + default boolean isFilterSystemTopic() { + return FILTER_SYSTEM_TOPIC; + } }