From fc219f78ec821100315432b973ba88f8f96aa6ef Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 May 2024 07:44:31 +0200 Subject: [PATCH 1/7] MSPublishFilters: improve memory footprint --- .../jms/selectors/JMSPublishFilters.java | 89 +++++++++++++------ 1 file changed, 61 insertions(+), 28 deletions(-) diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index dc4f92af..4f806a98 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -15,19 +15,25 @@ */ package com.datastax.oss.pulsar.jms.selectors; +import static org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist; +import static org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Histogram; import java.io.IOException; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -139,11 +145,7 @@ public void onMessagePublish( } // we must make a copy because the ByteBuf will be released - MessageMetadata messageMetadata = - new MessageMetadata() - .copyFrom( - Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1)); - + ByteBuf messageMetadata = copyMessageMetadata(headersAndPayload); publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata); // as soon as we find a good reason to apply the filters in messageProduced // we can exit @@ -156,6 +158,16 @@ public void onMessagePublish( } } + public static ByteBuf copyMessageMetadata(ByteBuf buffer) { + int readerIndex = buffer.readerIndex(); + skipBrokerEntryMetadataIfExist(buffer); + skipChecksumIfPresent(buffer); + int metadataSize = (int)buffer.readUnsignedInt(); + ByteBuf copy = buffer.slice(readerIndex, metadataSize).copy(); + buffer.readerIndex(readerIndex); + return copy; + } + @Override public void messageProduced( ServerCnx cnx, @@ -167,35 +179,56 @@ public void messageProduced( if (!enabled) { return; } - MessageMetadata messageMetadata = - (MessageMetadata) publishContext.getProperty(JMS_FILTER_METADATA); - if (messageMetadata == null) { + ByteBuf messageMetadataUnparsed = (ByteBuf) publishContext.getProperty(JMS_FILTER_METADATA); + if (messageMetadataUnparsed == null) { return; } - if (messageMetadata.hasNumMessagesInBatch()) { - return; + try { + producer.getTopic().getSubscriptions().forEach((___, subscription) -> { + if (!(subscription instanceof PersistentSubscription)) { + return; + } + Map subscriptionProperties = subscription.getSubscriptionProperties(); + if (!subscriptionProperties.containsKey("jms.selector")) { + return; + } + messageMetadataUnparsed.retain(); + scheduleOnDispatchThread(subscription, new FilterAndAckMessageOperation(ledgerId, entryId, subscription, messageMetadataUnparsed)); + }); + } finally { + messageMetadataUnparsed.release(); } + } - for (Subscription subscription : producer.getTopic().getSubscriptions().values()) { - scheduleOnDispatchThread( - subscription, - () -> { - filterAndAckMessage(producer, ledgerId, entryId, subscription, messageMetadata); - }); + @AllArgsConstructor + private class FilterAndAckMessageOperation implements Runnable { + final long ledgerId; + final long entryId; + final Subscription subscription; + final ByteBuf messageMetadataUnparsed; + + @Override + public void run() { + try { + filterAndAckMessage(ledgerId, entryId, subscription, messageMetadataUnparsed); + } finally { + messageMetadataUnparsed.release(); + } } } private void filterAndAckMessage( - Producer producer, long ledgerId, long entryId, Subscription subscription, - MessageMetadata messageMetadata) { - if (closed.get()) { + ByteBuf messageMetadataUnparsed) { + if (closed.get()) { // the broker is shutting down, we cannot process the entries // this operation has been enqueued before the broker shutdown return; } + MessageMetadata messageMetadata = new MessageMetadata(); + Commands.parseMessageMetadata(messageMetadataUnparsed, messageMetadata); long now = System.nanoTime(); try { FilterContext filterContext = new FilterContext(); @@ -207,23 +240,23 @@ private void filterAndAckMessage( if (filterResult == EntryFilter.FilterResult.REJECT) { if (log.isDebugEnabled()) { log.debug( - "Reject message {}:{} for subscription {}", - ledgerId, - entryId, - subscription.getName()); + "Reject message {}:{} for subscription {}", + ledgerId, + entryId, + subscription.getName()); } // ir is possible that calling this method in this thread may affect // performance // let's keep it simple for now, we can optimize it later subscription.acknowledgeMessage( - Collections.singletonList(new PositionImpl(ledgerId, entryId)), - CommandAck.AckType.Individual, - null); + Collections.singletonList(new PositionImpl(ledgerId, entryId)), + CommandAck.AckType.Individual, + null); } } finally { filterProcessingTimeOnProduce - .labels(producer.getTopic().getName(), subscription.getName()) - .observe(System.nanoTime() - now); + .labels(subscription.getTopic().getName(), subscription.getName()) + .observe(System.nanoTime() - now); } } From 57a077c485eda203a83e1589d9392358f03e611b Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 May 2024 09:06:38 +0200 Subject: [PATCH 2/7] Add memory limit --- .../jms/selectors/JMSPublishFilters.java | 95 ++++++++++++++----- 1 file changed, 71 insertions(+), 24 deletions(-) diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 4f806a98..9cc7ade8 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -19,17 +19,18 @@ import static org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; import java.io.IOException; import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; @@ -74,8 +75,23 @@ public class JMSPublishFilters implements BrokerInterceptor { .labelNames("topic", "subscription") .create(); + private static final Gauge memoryUsed = + Gauge.build() + .name("pulsar_jmsfilter_processing_memory") + .help( + "Current memory held by the JMSPublishFilters interceptor") + .create(); + + private static final Gauge pendingOperations = + Gauge.build() + .name("pulsar_jmsfilter_processing_pending_operations") + .help( + "Number of pending operations in the JMSPublishFilters interceptor") + .create(); + private final JMSFilter filter = new JMSFilter(false); private boolean enabled = false; + private Semaphore memoryLimit; private final AtomicBoolean closed = new AtomicBoolean(); private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers; @@ -120,6 +136,21 @@ public void initialize(PulsarService pulsarService) { // ignore log.info("Filter metrics already registered", alreadyRegistered); } + String memoryLimitString = pulsarService + .getConfiguration() + .getProperties() + .getProperty("jmsFiltersOnPublishMaxMemoryMB", "128"); + + try { + int memoryLimitBytes = Integer.parseInt(pulsarService + .getConfiguration() + .getProperties() + .getProperty("jmsFiltersOnPublishMaxMemoryMB", "64")) * 1024 * 1024; + memoryLimit = new Semaphore(memoryLimitBytes); + log.info("jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid memory limit jmsFiltersOnPublishMaxMemoryMB=" + memoryLimitString, e); + } } @Override @@ -136,16 +167,11 @@ public void onMessagePublish( long now = System.nanoTime(); try { for (Subscription subscription : producer.getTopic().getSubscriptions().values()) { - if (!(subscription instanceof PersistentSubscription)) { + if (!(isPersistentSubscriptionWithSelector(subscription))) { continue; } - Map subscriptionProperties = subscription.getSubscriptionProperties(); - if (!subscriptionProperties.containsKey("jms.selector")) { - continue; - } - // we must make a copy because the ByteBuf will be released - ByteBuf messageMetadata = copyMessageMetadata(headersAndPayload); + ByteBuf messageMetadata = copyMessageMetadataAndAcquireMemory(headersAndPayload); publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata); // as soon as we find a good reason to apply the filters in messageProduced // we can exit @@ -158,11 +184,18 @@ public void onMessagePublish( } } - public static ByteBuf copyMessageMetadata(ByteBuf buffer) { + public ByteBuf copyMessageMetadataAndAcquireMemory(ByteBuf buffer) { int readerIndex = buffer.readerIndex(); skipBrokerEntryMetadataIfExist(buffer); skipChecksumIfPresent(buffer); - int metadataSize = (int)buffer.readUnsignedInt(); + int metadataSize = (int) buffer.readUnsignedInt(); + // this is going to throttle the producer if the memory limit is reached + // please note that this is a blocking operation on the Netty eventpool + // currently we cannnot do better than this, as the interceptor API is blocking + memoryLimit.acquireUninterruptibly(metadataSize); + // please note that Netty would probably retain more memory than this buffer + // but this is the best approximation we can do + memoryUsed.inc(metadataSize); ByteBuf copy = buffer.slice(readerIndex, metadataSize).copy(); buffer.readerIndex(readerIndex); return copy; @@ -176,43 +209,57 @@ public void messageProduced( long ledgerId, long entryId, Topic.PublishContext publishContext) { - if (!enabled) { - return; - } ByteBuf messageMetadataUnparsed = (ByteBuf) publishContext.getProperty(JMS_FILTER_METADATA); if (messageMetadataUnparsed == null) { return; } + if (!enabled) { + return; + } + int memorySize = messageMetadataUnparsed.readableBytes(); + AtomicInteger pending = new AtomicInteger(1); + Runnable onComplete = () -> { + pendingOperations.dec(); + if (pending.decrementAndGet() == 0) { + messageMetadataUnparsed.release(); + memoryLimit.release(memorySize); + memoryUsed.dec(memorySize); + } + }; try { producer.getTopic().getSubscriptions().forEach((___, subscription) -> { - if (!(subscription instanceof PersistentSubscription)) { + if (!(isPersistentSubscriptionWithSelector(subscription))) { return; } - Map subscriptionProperties = subscription.getSubscriptionProperties(); - if (!subscriptionProperties.containsKey("jms.selector")) { - return; - } - messageMetadataUnparsed.retain(); - scheduleOnDispatchThread(subscription, new FilterAndAckMessageOperation(ledgerId, entryId, subscription, messageMetadataUnparsed)); + pending.incrementAndGet(); + pendingOperations.inc(); + scheduleOnDispatchThread(subscription, + new FilterAndAckMessageOperation(ledgerId, entryId, subscription, messageMetadataUnparsed, onComplete)); }); } finally { - messageMetadataUnparsed.release(); + onComplete.run(); } } + private static boolean isPersistentSubscriptionWithSelector(Subscription subscription) { + return subscription instanceof PersistentSubscription + && subscription.getSubscriptionProperties().containsKey("jms.selector"); + } + @AllArgsConstructor private class FilterAndAckMessageOperation implements Runnable { final long ledgerId; final long entryId; final Subscription subscription; final ByteBuf messageMetadataUnparsed; + final Runnable onComplete; @Override public void run() { try { filterAndAckMessage(ledgerId, entryId, subscription, messageMetadataUnparsed); } finally { - messageMetadataUnparsed.release(); + onComplete.run(); } } } From eac3033b2f510b5ebade63513ae7be7d634c5acd Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 May 2024 09:20:31 +0200 Subject: [PATCH 3/7] Register metrics --- .../jms/selectors/JMSPublishFilters.java | 113 ++++++++++-------- 1 file changed, 61 insertions(+), 52 deletions(-) diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 9cc7ade8..460ce5bb 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -17,6 +17,7 @@ import static org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist; import static org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent; + import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; @@ -24,13 +25,11 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.Collections; -import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; @@ -76,18 +75,16 @@ public class JMSPublishFilters implements BrokerInterceptor { .create(); private static final Gauge memoryUsed = - Gauge.build() - .name("pulsar_jmsfilter_processing_memory") - .help( - "Current memory held by the JMSPublishFilters interceptor") - .create(); + Gauge.build() + .name("pulsar_jmsfilter_processing_memory") + .help("Current memory held by the JMSPublishFilters interceptor") + .create(); private static final Gauge pendingOperations = - Gauge.build() - .name("pulsar_jmsfilter_processing_pending_operations") - .help( - "Number of pending operations in the JMSPublishFilters interceptor") - .create(); + Gauge.build() + .name("pulsar_jmsfilter_processing_pending_operations") + .help("Number of pending operations in the JMSPublishFilters interceptor") + .create(); private final JMSFilter filter = new JMSFilter(false); private boolean enabled = false; @@ -132,24 +129,32 @@ public void initialize(PulsarService pulsarService) { log.info("Registering JMSFilter metrics"); CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnPublish); CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnProduce); + CollectorRegistry.defaultRegistry.register(memoryUsed); + CollectorRegistry.defaultRegistry.register(pendingOperations); } catch (IllegalArgumentException alreadyRegistered) { // ignore log.info("Filter metrics already registered", alreadyRegistered); } - String memoryLimitString = pulsarService + String memoryLimitString = + pulsarService .getConfiguration() .getProperties() .getProperty("jmsFiltersOnPublishMaxMemoryMB", "128"); try { - int memoryLimitBytes = Integer.parseInt(pulsarService - .getConfiguration() - .getProperties() - .getProperty("jmsFiltersOnPublishMaxMemoryMB", "64")) * 1024 * 1024; - memoryLimit = new Semaphore(memoryLimitBytes); - log.info("jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes); + int memoryLimitBytes = + Integer.parseInt( + pulsarService + .getConfiguration() + .getProperties() + .getProperty("jmsFiltersOnPublishMaxMemoryMB", "64")) + * 1024 + * 1024; + memoryLimit = new Semaphore(memoryLimitBytes); + log.info("jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes); } catch (NumberFormatException e) { - throw new RuntimeException("Invalid memory limit jmsFiltersOnPublishMaxMemoryMB=" + memoryLimitString, e); + throw new RuntimeException( + "Invalid memory limit jmsFiltersOnPublishMaxMemoryMB=" + memoryLimitString, e); } } @@ -188,7 +193,7 @@ public ByteBuf copyMessageMetadataAndAcquireMemory(ByteBuf buffer) { int readerIndex = buffer.readerIndex(); skipBrokerEntryMetadataIfExist(buffer); skipChecksumIfPresent(buffer); - int metadataSize = (int) buffer.readUnsignedInt(); + int metadataSize = (int) buffer.readUnsignedInt(); // this is going to throttle the producer if the memory limit is reached // please note that this is a blocking operation on the Netty eventpool // currently we cannnot do better than this, as the interceptor API is blocking @@ -218,24 +223,31 @@ public void messageProduced( } int memorySize = messageMetadataUnparsed.readableBytes(); AtomicInteger pending = new AtomicInteger(1); - Runnable onComplete = () -> { - pendingOperations.dec(); - if (pending.decrementAndGet() == 0) { - messageMetadataUnparsed.release(); - memoryLimit.release(memorySize); - memoryUsed.dec(memorySize); - } - }; + Runnable onComplete = + () -> { + pendingOperations.dec(); + if (pending.decrementAndGet() == 0) { + messageMetadataUnparsed.release(); + memoryLimit.release(memorySize); + memoryUsed.dec(memorySize); + } + }; try { - producer.getTopic().getSubscriptions().forEach((___, subscription) -> { - if (!(isPersistentSubscriptionWithSelector(subscription))) { - return; - } - pending.incrementAndGet(); - pendingOperations.inc(); - scheduleOnDispatchThread(subscription, - new FilterAndAckMessageOperation(ledgerId, entryId, subscription, messageMetadataUnparsed, onComplete)); - }); + producer + .getTopic() + .getSubscriptions() + .forEach( + (___, subscription) -> { + if (!(isPersistentSubscriptionWithSelector(subscription))) { + return; + } + pending.incrementAndGet(); + pendingOperations.inc(); + scheduleOnDispatchThread( + subscription, + new FilterAndAckMessageOperation( + ledgerId, entryId, subscription, messageMetadataUnparsed, onComplete)); + }); } finally { onComplete.run(); } @@ -265,11 +277,8 @@ public void run() { } private void filterAndAckMessage( - long ledgerId, - long entryId, - Subscription subscription, - ByteBuf messageMetadataUnparsed) { - if (closed.get()) { + long ledgerId, long entryId, Subscription subscription, ByteBuf messageMetadataUnparsed) { + if (closed.get()) { // the broker is shutting down, we cannot process the entries // this operation has been enqueued before the broker shutdown return; @@ -287,23 +296,23 @@ private void filterAndAckMessage( if (filterResult == EntryFilter.FilterResult.REJECT) { if (log.isDebugEnabled()) { log.debug( - "Reject message {}:{} for subscription {}", - ledgerId, - entryId, - subscription.getName()); + "Reject message {}:{} for subscription {}", + ledgerId, + entryId, + subscription.getName()); } // ir is possible that calling this method in this thread may affect // performance // let's keep it simple for now, we can optimize it later subscription.acknowledgeMessage( - Collections.singletonList(new PositionImpl(ledgerId, entryId)), - CommandAck.AckType.Individual, - null); + Collections.singletonList(new PositionImpl(ledgerId, entryId)), + CommandAck.AckType.Individual, + null); } } finally { filterProcessingTimeOnProduce - .labels(subscription.getTopic().getName(), subscription.getName()) - .observe(System.nanoTime() - now); + .labels(subscription.getTopic().getName(), subscription.getName()) + .observe(System.nanoTime() - now); } } From 17bf86c6935738b8feeafe57b745b1c4ae1d6ea4 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 May 2024 09:32:42 +0200 Subject: [PATCH 4/7] pulsar-jms-filters/ --- .../com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 460ce5bb..3f2e9f50 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -223,6 +223,7 @@ public void messageProduced( } int memorySize = messageMetadataUnparsed.readableBytes(); AtomicInteger pending = new AtomicInteger(1); + pendingOperations.inc(); Runnable onComplete = () -> { pendingOperations.dec(); From f73df6c75388a12998e2c82d6b0d09039a524b4d Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 May 2024 09:42:22 +0200 Subject: [PATCH 5/7] Fix parsing metadata --- .../pulsar/jms/selectors/JMSPublishFilters.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 3f2e9f50..a0773a80 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -53,7 +53,7 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.intercept.InterceptException; -import org.apache.pulsar.common.protocol.Commands; +import org.jetbrains.annotations.NotNull; @Slf4j public class JMSPublishFilters implements BrokerInterceptor { @@ -284,8 +284,7 @@ private void filterAndAckMessage( // this operation has been enqueued before the broker shutdown return; } - MessageMetadata messageMetadata = new MessageMetadata(); - Commands.parseMessageMetadata(messageMetadataUnparsed, messageMetadata); + MessageMetadata messageMetadata = getMessageMetadata(messageMetadataUnparsed); long now = System.nanoTime(); try { FilterContext filterContext = new FilterContext(); @@ -317,6 +316,17 @@ private void filterAndAckMessage( } } + @NotNull + private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed) { + MessageMetadata messageMetadata = new MessageMetadata(); + synchronized (messageMetadataUnparsed) { + int index = messageMetadataUnparsed.readerIndex(); + messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes()); + messageMetadataUnparsed.readerIndex(index); + } + return messageMetadata; + } + private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) { try { Dispatcher dispatcher = subscription.getDispatcher(); From 1a8e4d7d46342a4e53333c79a4b6ae431850157b Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 May 2024 12:27:19 +0200 Subject: [PATCH 6/7] Fix memory copy --- .../jms/selectors/JMSPublishFilters.java | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index a0773a80..3e23c7de 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -19,6 +19,8 @@ import static org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.PooledByteBufAllocator; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; @@ -196,16 +198,30 @@ public ByteBuf copyMessageMetadataAndAcquireMemory(ByteBuf buffer) { int metadataSize = (int) buffer.readUnsignedInt(); // this is going to throttle the producer if the memory limit is reached // please note that this is a blocking operation on the Netty eventpool - // currently we cannnot do better than this, as the interceptor API is blocking + // currently we cannot do better than this, as the interceptor API is blocking memoryLimit.acquireUninterruptibly(metadataSize); // please note that Netty would probably retain more memory than this buffer // but this is the best approximation we can do memoryUsed.inc(metadataSize); - ByteBuf copy = buffer.slice(readerIndex, metadataSize).copy(); + ByteBuf copy = PooledByteBufAllocator.DEFAULT.buffer(metadataSize); + buffer.readBytes(copy); buffer.readerIndex(readerIndex); return copy; } + private static void dumpMetadata(ByteBuf copy, int metadataSize) { + int index = copy.readerIndex(); + MessageMetadata msgMetadata = new MessageMetadata(); + msgMetadata.parseFrom(copy, metadataSize); + msgMetadata + .getPropertiesList() + .forEach( + p -> { + log.info("Property: {}={}", p.getKey(), p.getValue()); + }); + copy.readerIndex(index); + } + @Override public void messageProduced( ServerCnx cnx, @@ -244,10 +260,11 @@ public void messageProduced( } pending.incrementAndGet(); pendingOperations.inc(); - scheduleOnDispatchThread( - subscription, + ByteBuf duplicate = messageMetadataUnparsed.duplicate(); + FilterAndAckMessageOperation filterAndAckMessageOperation = new FilterAndAckMessageOperation( - ledgerId, entryId, subscription, messageMetadataUnparsed, onComplete)); + ledgerId, entryId, subscription, duplicate, onComplete); + scheduleOnDispatchThread(subscription, filterAndAckMessageOperation); }); } finally { onComplete.run(); @@ -319,11 +336,10 @@ private void filterAndAckMessage( @NotNull private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed) { MessageMetadata messageMetadata = new MessageMetadata(); - synchronized (messageMetadataUnparsed) { - int index = messageMetadataUnparsed.readerIndex(); - messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes()); - messageMetadataUnparsed.readerIndex(index); - } + int size = messageMetadataUnparsed.readableBytes(); + log.info("size {}", size); + log.info("here {}", ByteBufUtil.hexDump(messageMetadataUnparsed)); + messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes()); return messageMetadata; } From 7df790816a737e2e3e11d34040f6c98c79af0436 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 May 2024 14:32:45 +0200 Subject: [PATCH 7/7] Clean up --- .../jms/selectors/JMSPublishFilters.java | 45 +++-------- pulsar-jms-integration-tests/pom.xml | 4 +- pulsar-jms/pom.xml | 8 +- .../CompletableFutureCompletionListener.java | 35 ++++++++ .../oss/pulsar/jms/JMSPublishFiltersTest.java | 81 +++++++++++++++++++ 5 files changed, 133 insertions(+), 40 deletions(-) create mode 100644 pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompletableFutureCompletionListener.java diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 3e23c7de..1a121640 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -19,7 +19,6 @@ import static org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.PooledByteBufAllocator; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; @@ -32,6 +31,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; @@ -55,7 +55,6 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.intercept.InterceptException; -import org.jetbrains.annotations.NotNull; @Slf4j public class JMSPublishFilters implements BrokerInterceptor { @@ -144,14 +143,7 @@ public void initialize(PulsarService pulsarService) { .getProperty("jmsFiltersOnPublishMaxMemoryMB", "128"); try { - int memoryLimitBytes = - Integer.parseInt( - pulsarService - .getConfiguration() - .getProperties() - .getProperty("jmsFiltersOnPublishMaxMemoryMB", "64")) - * 1024 - * 1024; + int memoryLimitBytes = Integer.parseInt(memoryLimitString) * 1024 * 1024; memoryLimit = new Semaphore(memoryLimitBytes); log.info("jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes); } catch (NumberFormatException e) { @@ -209,19 +201,6 @@ public ByteBuf copyMessageMetadataAndAcquireMemory(ByteBuf buffer) { return copy; } - private static void dumpMetadata(ByteBuf copy, int metadataSize) { - int index = copy.readerIndex(); - MessageMetadata msgMetadata = new MessageMetadata(); - msgMetadata.parseFrom(copy, metadataSize); - msgMetadata - .getPropertiesList() - .forEach( - p -> { - log.info("Property: {}={}", p.getKey(), p.getValue()); - }); - copy.readerIndex(index); - } - @Override public void messageProduced( ServerCnx cnx, @@ -239,10 +218,12 @@ public void messageProduced( } int memorySize = messageMetadataUnparsed.readableBytes(); AtomicInteger pending = new AtomicInteger(1); - pendingOperations.inc(); - Runnable onComplete = - () -> { - pendingOperations.dec(); + Consumer onComplete = + (mainThread) -> { + if (!mainThread) { + // the main thread doesn't count as a pending operation + pendingOperations.dec(); + } if (pending.decrementAndGet() == 0) { messageMetadataUnparsed.release(); memoryLimit.release(memorySize); @@ -267,7 +248,7 @@ public void messageProduced( scheduleOnDispatchThread(subscription, filterAndAckMessageOperation); }); } finally { - onComplete.run(); + onComplete.accept(true); } } @@ -282,14 +263,14 @@ private class FilterAndAckMessageOperation implements Runnable { final long entryId; final Subscription subscription; final ByteBuf messageMetadataUnparsed; - final Runnable onComplete; + final Consumer onComplete; @Override public void run() { try { filterAndAckMessage(ledgerId, entryId, subscription, messageMetadataUnparsed); } finally { - onComplete.run(); + onComplete.accept(false); } } } @@ -333,12 +314,8 @@ private void filterAndAckMessage( } } - @NotNull private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed) { MessageMetadata messageMetadata = new MessageMetadata(); - int size = messageMetadataUnparsed.readableBytes(); - log.info("size {}", size); - log.info("here {}", ByteBufUtil.hexDump(messageMetadataUnparsed)); messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes()); return messageMetadata; } diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 87d06043..e63e59d8 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -103,8 +103,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 4befd1d3..a72666bb 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -128,10 +128,10 @@ copy filters - - - - + + + + diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompletableFutureCompletionListener.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompletableFutureCompletionListener.java new file mode 100644 index 00000000..faa099fa --- /dev/null +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompletableFutureCompletionListener.java @@ -0,0 +1,35 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import java.util.concurrent.CompletableFuture; +import javax.jms.CompletionListener; +import javax.jms.Message; + +/** Utility class to convert a CompletionListener into a CompletableFuture. */ +public class CompletableFutureCompletionListener extends CompletableFuture + implements CompletionListener { + + @Override + public void onCompletion(Message message) { + complete(message); + } + + @Override + public void onException(Message message, Exception exception) { + completeExceptionally(exception); + } +} diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java index 2872106c..060f25f2 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java @@ -19,17 +19,26 @@ import static org.junit.jupiter.api.Assertions.assertNull; import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import javax.jms.CompletionListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats; +import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.RegisterExtension; @@ -154,4 +163,76 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception { } } } + + @Test + void testManyMessagesWithPartitions() throws Exception { + Map properties = buildProperties(); + String topicName = "persistent://public/default/test-" + UUID.randomUUID(); + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (PulsarConnection connection = factory.createConnection()) { + connection.start(); + try (PulsarSession session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) { + factory.getPulsarAdmin().topics().createPartitionedTopic(topicName, 20); + + Queue destination = session.createQueue(topicName); + + try (PulsarMessageConsumer consumer1 = session.createConsumer(destination); ) { + assertEquals( + SubscriptionType.Shared, ((PulsarMessageConsumer) consumer1).getSubscriptionType()); + + String newSelector = "foo='bar'"; + Map subscriptionProperties = new HashMap<>(); + subscriptionProperties.put("jms.selector", newSelector); + subscriptionProperties.put("jms.filtering", "true"); + + pulsarContainer + .getAdmin() + .topics() + .updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties); + + int numMessages = 10000; + try (MessageProducer producer = session.createProducer(destination); ) { + List futures = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + TextMessage textMessage = session.createTextMessage("foo-" + i); + for (int j = 0; j < 10; j++) { + textMessage.setIntProperty("some" + j, j); + } + // half of the messages pass the filter + if (i % 2 == 0) { + textMessage.setStringProperty("foo", "bar"); + } + CompletableFutureCompletionListener listener = new CompletableFutureCompletionListener(); + futures.add(listener); + producer.send(textMessage, listener); + if (futures.size() == 1000) { + for (CompletableFutureCompletionListener future : futures) { + future.get(); + } + futures.clear(); + } + } + for (CompletableFutureCompletionListener future : futures) { + future.get(); + } + } + + // wait for the filters to be processed in background + Awaitility.await().untilAsserted(() -> { + PartitionedTopicStats partitionedInternalStats = + factory.getPulsarAdmin().topics().getPartitionedStats(topicName, true); + AtomicLong sum = new AtomicLong(); + partitionedInternalStats.getPartitions().forEach((partition, stats) -> { + SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue"); + log.info("backlog for partition {}: {}", partition, subscriptionStats.getMsgBacklog()); + sum.addAndGet(subscriptionStats.getMsgBacklog()); + }); + log.info("total backlog: {}", sum.get()); + assertEquals(numMessages / 2, sum.get()); + }); + } + } + } + } + } }