diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index dc4f92af..1a121640 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -15,19 +15,27 @@ */ package com.datastax.oss.pulsar.jms.selectors; +import static org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist; +import static org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; import java.io.IOException; import java.lang.reflect.Field; import java.util.Collections; -import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -47,7 +55,6 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.intercept.InterceptException; -import org.apache.pulsar.common.protocol.Commands; @Slf4j public class JMSPublishFilters implements BrokerInterceptor { @@ -68,8 +75,21 @@ public class JMSPublishFilters implements BrokerInterceptor { .labelNames("topic", "subscription") .create(); + private static final Gauge memoryUsed = + Gauge.build() + .name("pulsar_jmsfilter_processing_memory") + .help("Current memory held by the JMSPublishFilters interceptor") + .create(); + + private static final Gauge pendingOperations = + Gauge.build() + .name("pulsar_jmsfilter_processing_pending_operations") + .help("Number of pending operations in the JMSPublishFilters interceptor") + .create(); + private final JMSFilter filter = new JMSFilter(false); private boolean enabled = false; + private Semaphore memoryLimit; private final AtomicBoolean closed = new AtomicBoolean(); private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers; @@ -110,10 +130,26 @@ public void initialize(PulsarService pulsarService) { log.info("Registering JMSFilter metrics"); CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnPublish); CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnProduce); + CollectorRegistry.defaultRegistry.register(memoryUsed); + CollectorRegistry.defaultRegistry.register(pendingOperations); } catch (IllegalArgumentException alreadyRegistered) { // ignore log.info("Filter metrics already registered", alreadyRegistered); } + String memoryLimitString = + pulsarService + .getConfiguration() + .getProperties() + .getProperty("jmsFiltersOnPublishMaxMemoryMB", "128"); + + try { + int memoryLimitBytes = Integer.parseInt(memoryLimitString) * 1024 * 1024; + memoryLimit = new Semaphore(memoryLimitBytes); + log.info("jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes); + } catch (NumberFormatException e) { + throw new RuntimeException( + "Invalid memory limit jmsFiltersOnPublishMaxMemoryMB=" + memoryLimitString, e); + } } @Override @@ -130,20 +166,11 @@ public void onMessagePublish( long now = System.nanoTime(); try { for (Subscription subscription : producer.getTopic().getSubscriptions().values()) { - if (!(subscription instanceof PersistentSubscription)) { + if (!(isPersistentSubscriptionWithSelector(subscription))) { continue; } - Map subscriptionProperties = subscription.getSubscriptionProperties(); - if (!subscriptionProperties.containsKey("jms.selector")) { - continue; - } - // we must make a copy because the ByteBuf will be released - MessageMetadata messageMetadata = - new MessageMetadata() - .copyFrom( - Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1)); - + ByteBuf messageMetadata = copyMessageMetadataAndAcquireMemory(headersAndPayload); publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata); // as soon as we find a good reason to apply the filters in messageProduced // we can exit @@ -156,6 +183,24 @@ public void onMessagePublish( } } + public ByteBuf copyMessageMetadataAndAcquireMemory(ByteBuf buffer) { + int readerIndex = buffer.readerIndex(); + skipBrokerEntryMetadataIfExist(buffer); + skipChecksumIfPresent(buffer); + int metadataSize = (int) buffer.readUnsignedInt(); + // this is going to throttle the producer if the memory limit is reached + // please note that this is a blocking operation on the Netty eventpool + // currently we cannot do better than this, as the interceptor API is blocking + memoryLimit.acquireUninterruptibly(metadataSize); + // please note that Netty would probably retain more memory than this buffer + // but this is the best approximation we can do + memoryUsed.inc(metadataSize); + ByteBuf copy = PooledByteBufAllocator.DEFAULT.buffer(metadataSize); + buffer.readBytes(copy); + buffer.readerIndex(readerIndex); + return copy; + } + @Override public void messageProduced( ServerCnx cnx, @@ -164,38 +209,80 @@ public void messageProduced( long ledgerId, long entryId, Topic.PublishContext publishContext) { - if (!enabled) { + ByteBuf messageMetadataUnparsed = (ByteBuf) publishContext.getProperty(JMS_FILTER_METADATA); + if (messageMetadataUnparsed == null) { return; } - MessageMetadata messageMetadata = - (MessageMetadata) publishContext.getProperty(JMS_FILTER_METADATA); - if (messageMetadata == null) { + if (!enabled) { return; } - if (messageMetadata.hasNumMessagesInBatch()) { - return; + int memorySize = messageMetadataUnparsed.readableBytes(); + AtomicInteger pending = new AtomicInteger(1); + Consumer onComplete = + (mainThread) -> { + if (!mainThread) { + // the main thread doesn't count as a pending operation + pendingOperations.dec(); + } + if (pending.decrementAndGet() == 0) { + messageMetadataUnparsed.release(); + memoryLimit.release(memorySize); + memoryUsed.dec(memorySize); + } + }; + try { + producer + .getTopic() + .getSubscriptions() + .forEach( + (___, subscription) -> { + if (!(isPersistentSubscriptionWithSelector(subscription))) { + return; + } + pending.incrementAndGet(); + pendingOperations.inc(); + ByteBuf duplicate = messageMetadataUnparsed.duplicate(); + FilterAndAckMessageOperation filterAndAckMessageOperation = + new FilterAndAckMessageOperation( + ledgerId, entryId, subscription, duplicate, onComplete); + scheduleOnDispatchThread(subscription, filterAndAckMessageOperation); + }); + } finally { + onComplete.accept(true); } + } + + private static boolean isPersistentSubscriptionWithSelector(Subscription subscription) { + return subscription instanceof PersistentSubscription + && subscription.getSubscriptionProperties().containsKey("jms.selector"); + } - for (Subscription subscription : producer.getTopic().getSubscriptions().values()) { - scheduleOnDispatchThread( - subscription, - () -> { - filterAndAckMessage(producer, ledgerId, entryId, subscription, messageMetadata); - }); + @AllArgsConstructor + private class FilterAndAckMessageOperation implements Runnable { + final long ledgerId; + final long entryId; + final Subscription subscription; + final ByteBuf messageMetadataUnparsed; + final Consumer onComplete; + + @Override + public void run() { + try { + filterAndAckMessage(ledgerId, entryId, subscription, messageMetadataUnparsed); + } finally { + onComplete.accept(false); + } } } private void filterAndAckMessage( - Producer producer, - long ledgerId, - long entryId, - Subscription subscription, - MessageMetadata messageMetadata) { + long ledgerId, long entryId, Subscription subscription, ByteBuf messageMetadataUnparsed) { if (closed.get()) { // the broker is shutting down, we cannot process the entries // this operation has been enqueued before the broker shutdown return; } + MessageMetadata messageMetadata = getMessageMetadata(messageMetadataUnparsed); long now = System.nanoTime(); try { FilterContext filterContext = new FilterContext(); @@ -222,11 +309,17 @@ private void filterAndAckMessage( } } finally { filterProcessingTimeOnProduce - .labels(producer.getTopic().getName(), subscription.getName()) + .labels(subscription.getTopic().getName(), subscription.getName()) .observe(System.nanoTime() - now); } } + private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed) { + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes()); + return messageMetadata; + } + private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) { try { Dispatcher dispatcher = subscription.getDispatcher(); diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 87d06043..e63e59d8 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -103,8 +103,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 4befd1d3..a72666bb 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -128,10 +128,10 @@ copy filters - - - - + + + + diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompletableFutureCompletionListener.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompletableFutureCompletionListener.java new file mode 100644 index 00000000..faa099fa --- /dev/null +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/CompletableFutureCompletionListener.java @@ -0,0 +1,35 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import java.util.concurrent.CompletableFuture; +import javax.jms.CompletionListener; +import javax.jms.Message; + +/** Utility class to convert a CompletionListener into a CompletableFuture. */ +public class CompletableFutureCompletionListener extends CompletableFuture + implements CompletionListener { + + @Override + public void onCompletion(Message message) { + complete(message); + } + + @Override + public void onException(Message message, Exception exception) { + completeExceptionally(exception); + } +} diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java index 2872106c..060f25f2 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java @@ -19,17 +19,26 @@ import static org.junit.jupiter.api.Assertions.assertNull; import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import javax.jms.CompletionListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats; +import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.RegisterExtension; @@ -154,4 +163,76 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception { } } } + + @Test + void testManyMessagesWithPartitions() throws Exception { + Map properties = buildProperties(); + String topicName = "persistent://public/default/test-" + UUID.randomUUID(); + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (PulsarConnection connection = factory.createConnection()) { + connection.start(); + try (PulsarSession session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) { + factory.getPulsarAdmin().topics().createPartitionedTopic(topicName, 20); + + Queue destination = session.createQueue(topicName); + + try (PulsarMessageConsumer consumer1 = session.createConsumer(destination); ) { + assertEquals( + SubscriptionType.Shared, ((PulsarMessageConsumer) consumer1).getSubscriptionType()); + + String newSelector = "foo='bar'"; + Map subscriptionProperties = new HashMap<>(); + subscriptionProperties.put("jms.selector", newSelector); + subscriptionProperties.put("jms.filtering", "true"); + + pulsarContainer + .getAdmin() + .topics() + .updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties); + + int numMessages = 10000; + try (MessageProducer producer = session.createProducer(destination); ) { + List futures = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + TextMessage textMessage = session.createTextMessage("foo-" + i); + for (int j = 0; j < 10; j++) { + textMessage.setIntProperty("some" + j, j); + } + // half of the messages pass the filter + if (i % 2 == 0) { + textMessage.setStringProperty("foo", "bar"); + } + CompletableFutureCompletionListener listener = new CompletableFutureCompletionListener(); + futures.add(listener); + producer.send(textMessage, listener); + if (futures.size() == 1000) { + for (CompletableFutureCompletionListener future : futures) { + future.get(); + } + futures.clear(); + } + } + for (CompletableFutureCompletionListener future : futures) { + future.get(); + } + } + + // wait for the filters to be processed in background + Awaitility.await().untilAsserted(() -> { + PartitionedTopicStats partitionedInternalStats = + factory.getPulsarAdmin().topics().getPartitionedStats(topicName, true); + AtomicLong sum = new AtomicLong(); + partitionedInternalStats.getPartitions().forEach((partition, stats) -> { + SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue"); + log.info("backlog for partition {}: {}", partition, subscriptionStats.getMsgBacklog()); + sum.addAndGet(subscriptionStats.getMsgBacklog()); + }); + log.info("total backlog: {}", sum.get()); + assertEquals(numMessages / 2, sum.get()); + }); + } + } + } + } + } }