Skip to content

Commit

Permalink
Cache Message Metadata in case of multiple subscriptipons
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 12, 2024
1 parent 93abcfb commit 141a1f0
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public JMSFilter() {
public FilterResult filterEntry(Entry entry, FilterContext context) {
long start = System.nanoTime();
try {
return filterEntry(entry, context, false);
return filterEntry(entry, context, false, null);
} finally {
filterProcessingTime
.labels(context.getSubscription().getTopicName(), context.getSubscription().getName())
Expand Down Expand Up @@ -206,7 +206,11 @@ private boolean isHandleOnlySelectors(FilterContext context) {
return handleOnlySelectors;
}

public FilterResult filterEntry(Entry entry, FilterContext context, boolean onMessagePublish) {
public FilterResult filterEntry(
Entry entry,
FilterContext context,
boolean onMessagePublish,
MessageMetadataCache messageMetadataCache) {
Consumer consumer = context.getConsumer();
Map<String, String> consumerMetadata =
consumer != null ? consumer.getMetadata() : Collections.emptyMap();
Expand Down Expand Up @@ -279,7 +283,8 @@ public FilterResult filterEntry(Entry entry, FilterContext context, boolean onMe
selectorOnSubscription,
selector,
subscription,
consumerMetadata);
consumerMetadata,
messageMetadataCache);
}
} catch (Throwable err) {
log.error("Error while processing entry " + err, err);
Expand Down Expand Up @@ -325,13 +330,19 @@ private FilterResult processSingleMessageEntry(
SelectorSupport selectorOnSubscription,
SelectorSupport selector,
Subscription subscription,
Map<String, String> consumerMetadata)
Map<String, String> consumerMetadata,
MessageMetadataCache messageMetadataCache)
throws JMSException {
// here we are dealing with a single message,
// so we can reject the message more easily
PropertyEvaluator typedProperties =
new PropertyEvaluator(
metadata.getPropertiesCount(), metadata.getPropertiesList(), null, metadata, context);
metadata.getPropertiesCount(),
metadata.getPropertiesList(),
null,
metadata,
context,
messageMetadataCache);

if (selectorOnSubscription != null) {
boolean matchesSubscriptionFilter = matches(typedProperties, selectorOnSubscription);
Expand Down Expand Up @@ -414,7 +425,8 @@ private FilterResult processBatchEntry(
singleMessageMetadata.getPropertiesList(),
singleMessageMetadata,
null,
context);
context,
null);

// noLocal filter
// all the messages in the batch come from the Producer/Connection
Expand Down Expand Up @@ -545,8 +557,13 @@ private static class PropertyEvaluator implements Function<String, Object> {
private SingleMessageMetadata singleMessageMetadata;
private MessageMetadata metadata;
private FilterContext context;
private MessageMetadataCache messageMetadataCache;

private Object getProperty(String name) {
if (messageMetadataCache != null) {
return messageMetadataCache.getProperty(
name, n -> JMSFilter.getProperty(propertiesCount, propertiesList, n));
}
return JMSFilter.getProperty(propertiesCount, propertiesList, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
Expand Down Expand Up @@ -64,6 +67,7 @@
public class JMSPublishFilters implements BrokerInterceptor {
private static final String JMS_FILTER_METADATA = "jms-msg-metadata";
private static final ByteBuf COULDNOT_ACQUIRE_MEMORY_PLACEHOLDER = Unpooled.EMPTY_BUFFER;
private static final int TIMEOUT_READ_ENTRY = 10000; // 10 seconds to read

private static final Histogram filterProcessingTimeOnPublish =
Histogram.build()
Expand Down Expand Up @@ -95,10 +99,10 @@ public class JMSPublishFilters implements BrokerInterceptor {
.create();

private static final Counter readFromLedger =
Counter.build()
.name("pulsar_jmsfilter_entries_read_from_ledger")
.help("Number of entries read from ledgers by JMSPublishFilters interceptor")
.create();
Counter.build()
.name("pulsar_jmsfilter_entries_read_from_ledger")
.help("Number of entries read from ledgers by JMSPublishFilters interceptor")
.create();

private final JMSFilter filter = new JMSFilter(false);
private boolean enabled = false;
Expand Down Expand Up @@ -313,7 +317,16 @@ private void filterAndAckMessage(
try {
final MessageMetadata messageMetadata;
if (messageMetadataUnparsed == COULDNOT_ACQUIRE_MEMORY_PLACEHOLDER) {
entryReadFromBookie = readSingleEntry(ledgerId, entryId, topic).join();
try {
entryReadFromBookie =
readSingleEntry(ledgerId, entryId, topic).get(TIMEOUT_READ_ENTRY, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
} catch (TimeoutException timeoutException) {
// timeout
} catch (ExecutionException err) {
throw err.getCause();
}
if (entryReadFromBookie == null) {
log.error("Could not read entry {}:{} from topic {}", ledgerId, entryId, topic);
return;
Expand All @@ -327,6 +340,11 @@ private void filterAndAckMessage(
messageMetadata =
getMessageMetadata(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes());
}

// if we have more than one subscription we can save a lot of resources by caching the
// properties
MessageMetadataCache messageMetadataCache =
subscriptions.size() > 1 ? new MessageMetadataCache() : null;
for (Subscription subscription : subscriptions) {
if (closed.get()) {
// the broker is shutting down, we cannot process the entries
Expand All @@ -340,7 +358,8 @@ private void filterAndAckMessage(
filterContext.setMsgMetadata(messageMetadata);
filterContext.setConsumer(null);
Entry entry = null; // we would need the Entry only in case of batch messages
EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext, true);
EntryFilter.FilterResult filterResult =
filter.filterEntry(entry, filterContext, true, messageMetadataCache);
if (filterResult == EntryFilter.FilterResult.REJECT) {
if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -380,6 +399,8 @@ private static CompletableFuture<ByteBuf> readSingleEntry(

PositionImpl position = new PositionImpl(ledgerId, entryId);
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
// asyncReadEntry reads from the Broker cache, and falls bach to the Bookie
// is also leverage bookie read deduplication
managedLedger.asyncReadEntry(
position,
new AsyncCallbacks.ReadEntryCallback() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms.selectors;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class MessageMetadataCache {
final Map<String, Object> properties = new HashMap<>();

Object getProperty(String key, Function<String, Object> compute) {
return properties.computeIfAbsent(key, compute);
}
}
8 changes: 4 additions & 4 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<mkdir dir="${project.build.outputDirectory}/interceptors" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<mkdir dir="${project.build.outputDirectory}/interceptors"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down

0 comments on commit 141a1f0

Please sign in to comment.