Skip to content

Commit

Permalink
Add filter system topic when using EventCenter (#1219)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Jan 23, 2024
1 parent 9efcb3e commit 7e4ea20
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.metadata.api.Notification;

Expand Down Expand Up @@ -59,25 +60,28 @@ public void accept(Notification notification) {
return;
}
final String path = notification.getPath();
// give the task to another thread to avoid blocking metadata
callbackExecutor.executeOrdered(path, () -> {
for (PulsarEventListener listener : listeners.stream().filter(listeners ->
listeners.matchPath(path)).collect(Collectors.toList())) {
try {
switch (notification.getType()) {
case Created:
listener.onNodeCreated(path);
break;
case Deleted:
listener.onNodeDeleted(path);
break;
default:
break;
final List<PulsarEventListener> matcherListeners = listeners.stream().filter(listeners ->
listeners.matchPath(path)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(matcherListeners)) {
// give the task to another thread to avoid blocking metadata
callbackExecutor.executeOrdered(path, () -> {
for (PulsarEventListener listener : matcherListeners) {
try {
switch (notification.getType()) {
case Created:
listener.onNodeCreated(path);
break;
case Deleted:
listener.onNodeDeleted(path);
break;
default:
break;
}
} catch (Throwable ex) {
log.warn("notify change {} {} failed.", notification.getType(), notification.getPath(), ex);
}
} catch (Throwable ex) {
log.warn("notify change {} {} failed.", notification.getType(), notification.getPath(), ex);
}
}
});
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
import io.streamnative.pulsar.handlers.mqtt.utils.EventParserUtils;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;

public interface PulsarTopicChangeListener extends PulsarEventListener {
String MANAGED_LEDGER_PATH = "/managed-ledgers";
String REGEX = MANAGED_LEDGER_PATH + "/" + ".*" + "/persistent/";
Pattern PATTERN = Pattern.compile(REGEX);

boolean FILTER_SYSTEM_TOPIC = true;

@Override
default boolean matchPath(String path) {
String listenNamespace = getListenNamespace();
Expand All @@ -41,6 +44,9 @@ default String getListenNamespace() {
default void onNodeCreated(String path) {
try {
TopicName topicName = EventParserUtils.parseTopicNameFromManagedLedgerEvent(path);
if (isFilterSystemTopic() && SystemTopicNames.isSystemTopic(topicName)) {
return;
}
onTopicLoad(topicName);
} catch (IllegalArgumentException ex) {
// NO-OP don't notify
Expand All @@ -51,6 +57,9 @@ default void onNodeCreated(String path) {
default void onNodeDeleted(String path) {
try {
TopicName topicName = EventParserUtils.parseTopicNameFromManagedLedgerEvent(path);
if (isFilterSystemTopic() && SystemTopicNames.isSystemTopic(topicName)) {
return;
}
onTopicUnload(topicName);
} catch (IllegalArgumentException ex) {
// NO-OP don't notify
Expand All @@ -60,4 +69,8 @@ default void onNodeDeleted(String path) {
void onTopicLoad(TopicName topicName);

void onTopicUnload(TopicName topicName);

default boolean isFilterSystemTopic() {
return FILTER_SYSTEM_TOPIC;
}
}

0 comments on commit 7e4ea20

Please sign in to comment.