diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java index 64408315..607c2acd 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java @@ -566,8 +566,7 @@ private static class PropertyEvaluator implements Function { private Object getProperty(String name) { if (messageMetadataCache != null) { - return messageMetadataCache.getProperty( - name, n -> JMSFilter.getProperty(propertiesCount, propertiesList, n)); + return messageMetadataCache.getProperty(name); } return JMSFilter.getProperty(propertiesCount, propertiesList, name); } @@ -729,6 +728,20 @@ private static Object getProperty( return getObjectProperty(value, type); } + static Object getProperty(Map cacheProperties, String name) { + if (cacheProperties.isEmpty()) { + return null; + } + // we don't write the type for system fields + // we pre-compute the type in order to avoid to scan the list to fine the type + String type = SYSTEM_PROPERTIES_TYPES.get(name); + if (type == null) { + type = cacheProperties.get(propertyType(name)); + } + String value = cacheProperties.get(name); + return getObjectProperty(value, type); + } + @Override public void close() { selectors.clear(); diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 1e171098..499fe303 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -411,7 +411,7 @@ private void filterAndAckMessage( // if we have more than one subscription we can save a lot of resources by caching the // properties MessageMetadataCache messageMetadataCache = - subscriptions.size() > 1 ? new MessageMetadataCache() : null; + subscriptions.size() > 1 ? new MessageMetadataCache(messageMetadata) : null; for (Subscription subscription : subscriptions) { if (closed.get()) { // the broker is shutting down, we cannot process the entries diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java index c0b2f685..bb8134cb 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java @@ -17,12 +17,36 @@ import java.util.HashMap; import java.util.Map; -import java.util.function.Function; +import org.apache.pulsar.common.api.proto.MessageMetadata; class MessageMetadataCache { + final Map rawProperties = new HashMap<>(); final Map properties = new HashMap<>(); + private static final Object CACHED_NULL = new Object(); - Object getProperty(String key, Function compute) { - return properties.computeIfAbsent(key, compute); + MessageMetadataCache(MessageMetadata metadata) { + // computing this is expensive because it involves a lot of string manipulation + // protobuf has to parse the bytes and then convert them to Strings + // so we want to do it only once + // please note that when a selector references a property that is not + // in the message we would end up in scanning the whole list + metadata.getPropertiesList().forEach(p -> rawProperties.put(p.getKey(), p.getValue())); + } + + Object getProperty(String key) { + Object cached = properties.get(key); + if (cached == CACHED_NULL) { + return null; + } + if (cached != null) { + return cached; + } + Object result = JMSFilter.getProperty(rawProperties, key); + if (result == null) { + properties.put(key, CACHED_NULL); + } else { + properties.put(key, result); + } + return result; } }