Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMSPublishFilter: improve memory footprint and impose a limit #142

Merged
merged 7 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@
*/
package com.datastax.oss.pulsar.jms.selectors;

import static org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist;
import static org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand All @@ -47,7 +55,6 @@
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.protocol.Commands;

@Slf4j
public class JMSPublishFilters implements BrokerInterceptor {
Expand All @@ -68,8 +75,21 @@ public class JMSPublishFilters implements BrokerInterceptor {
.labelNames("topic", "subscription")
.create();

private static final Gauge memoryUsed =
Gauge.build()
.name("pulsar_jmsfilter_processing_memory")
.help("Current memory held by the JMSPublishFilters interceptor")
.create();

private static final Gauge pendingOperations =
Gauge.build()
.name("pulsar_jmsfilter_processing_pending_operations")
.help("Number of pending operations in the JMSPublishFilters interceptor")
.create();

private final JMSFilter filter = new JMSFilter(false);
private boolean enabled = false;
private Semaphore memoryLimit;
private final AtomicBoolean closed = new AtomicBoolean();

private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers;
Expand Down Expand Up @@ -110,10 +130,26 @@ public void initialize(PulsarService pulsarService) {
log.info("Registering JMSFilter metrics");
CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnPublish);
CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnProduce);
CollectorRegistry.defaultRegistry.register(memoryUsed);
CollectorRegistry.defaultRegistry.register(pendingOperations);
} catch (IllegalArgumentException alreadyRegistered) {
// ignore
log.info("Filter metrics already registered", alreadyRegistered);
}
String memoryLimitString =
pulsarService
.getConfiguration()
.getProperties()
.getProperty("jmsFiltersOnPublishMaxMemoryMB", "128");

try {
int memoryLimitBytes = Integer.parseInt(memoryLimitString) * 1024 * 1024;
memoryLimit = new Semaphore(memoryLimitBytes);
log.info("jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes);
} catch (NumberFormatException e) {
throw new RuntimeException(
"Invalid memory limit jmsFiltersOnPublishMaxMemoryMB=" + memoryLimitString, e);
}
}

@Override
Expand All @@ -130,20 +166,11 @@ public void onMessagePublish(
long now = System.nanoTime();
try {
for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
if (!(subscription instanceof PersistentSubscription)) {
if (!(isPersistentSubscriptionWithSelector(subscription))) {
continue;
}
Map<String, String> subscriptionProperties = subscription.getSubscriptionProperties();
if (!subscriptionProperties.containsKey("jms.selector")) {
continue;
}

// we must make a copy because the ByteBuf will be released
MessageMetadata messageMetadata =
new MessageMetadata()
.copyFrom(
Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1));

ByteBuf messageMetadata = copyMessageMetadataAndAcquireMemory(headersAndPayload);
publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata);
// as soon as we find a good reason to apply the filters in messageProduced
// we can exit
Expand All @@ -156,6 +183,24 @@ public void onMessagePublish(
}
}

public ByteBuf copyMessageMetadataAndAcquireMemory(ByteBuf buffer) {
int readerIndex = buffer.readerIndex();
skipBrokerEntryMetadataIfExist(buffer);
skipChecksumIfPresent(buffer);
int metadataSize = (int) buffer.readUnsignedInt();
// this is going to throttle the producer if the memory limit is reached
// please note that this is a blocking operation on the Netty eventpool
// currently we cannot do better than this, as the interceptor API is blocking
memoryLimit.acquireUninterruptibly(metadataSize);
// please note that Netty would probably retain more memory than this buffer
// but this is the best approximation we can do
memoryUsed.inc(metadataSize);
ByteBuf copy = PooledByteBufAllocator.DEFAULT.buffer(metadataSize);
buffer.readBytes(copy);
buffer.readerIndex(readerIndex);
return copy;
}

@Override
public void messageProduced(
ServerCnx cnx,
Expand All @@ -164,38 +209,80 @@ public void messageProduced(
long ledgerId,
long entryId,
Topic.PublishContext publishContext) {
if (!enabled) {
ByteBuf messageMetadataUnparsed = (ByteBuf) publishContext.getProperty(JMS_FILTER_METADATA);
if (messageMetadataUnparsed == null) {
return;
}
MessageMetadata messageMetadata =
(MessageMetadata) publishContext.getProperty(JMS_FILTER_METADATA);
if (messageMetadata == null) {
if (!enabled) {
return;
}
if (messageMetadata.hasNumMessagesInBatch()) {
return;
int memorySize = messageMetadataUnparsed.readableBytes();
AtomicInteger pending = new AtomicInteger(1);
Consumer<Boolean> onComplete =
(mainThread) -> {
if (!mainThread) {
// the main thread doesn't count as a pending operation
pendingOperations.dec();
}
if (pending.decrementAndGet() == 0) {
messageMetadataUnparsed.release();
memoryLimit.release(memorySize);
memoryUsed.dec(memorySize);
}
};
try {
producer
.getTopic()
.getSubscriptions()
.forEach(
(___, subscription) -> {
if (!(isPersistentSubscriptionWithSelector(subscription))) {
return;
}
pending.incrementAndGet();
pendingOperations.inc();
ByteBuf duplicate = messageMetadataUnparsed.duplicate();
FilterAndAckMessageOperation filterAndAckMessageOperation =
new FilterAndAckMessageOperation(
ledgerId, entryId, subscription, duplicate, onComplete);
scheduleOnDispatchThread(subscription, filterAndAckMessageOperation);
});
} finally {
onComplete.accept(true);
}
}

private static boolean isPersistentSubscriptionWithSelector(Subscription subscription) {
return subscription instanceof PersistentSubscription
&& subscription.getSubscriptionProperties().containsKey("jms.selector");
}

for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
scheduleOnDispatchThread(
subscription,
() -> {
filterAndAckMessage(producer, ledgerId, entryId, subscription, messageMetadata);
});
@AllArgsConstructor
private class FilterAndAckMessageOperation implements Runnable {
final long ledgerId;
final long entryId;
final Subscription subscription;
final ByteBuf messageMetadataUnparsed;
final Consumer<Boolean> onComplete;

@Override
public void run() {
try {
filterAndAckMessage(ledgerId, entryId, subscription, messageMetadataUnparsed);
} finally {
onComplete.accept(false);
}
}
}

private void filterAndAckMessage(
Producer producer,
long ledgerId,
long entryId,
Subscription subscription,
MessageMetadata messageMetadata) {
long ledgerId, long entryId, Subscription subscription, ByteBuf messageMetadataUnparsed) {
if (closed.get()) {
// the broker is shutting down, we cannot process the entries
// this operation has been enqueued before the broker shutdown
return;
}
MessageMetadata messageMetadata = getMessageMetadata(messageMetadataUnparsed);
long now = System.nanoTime();
try {
FilterContext filterContext = new FilterContext();
Expand All @@ -222,11 +309,17 @@ private void filterAndAckMessage(
}
} finally {
filterProcessingTimeOnProduce
.labels(producer.getTopic().getName(), subscription.getName())
.labels(subscription.getTopic().getName(), subscription.getName())
.observe(System.nanoTime() - now);
}
}

private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed) {
MessageMetadata messageMetadata = new MessageMetadata();
messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes());
return messageMetadata;
}

private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) {
try {
Dispatcher dispatcher = subscription.getDispatcher();
Expand Down
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
8 changes: 4 additions & 4 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<mkdir dir="${project.build.outputDirectory}/interceptors" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<mkdir dir="${project.build.outputDirectory}/interceptors"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import java.util.concurrent.CompletableFuture;
import javax.jms.CompletionListener;
import javax.jms.Message;

/** Utility class to convert a CompletionListener into a CompletableFuture. */
public class CompletableFutureCompletionListener extends CompletableFuture<Message>
implements CompletionListener {

@Override
public void onCompletion(Message message) {
complete(message);
}

@Override
public void onException(Message message, Exception exception) {
completeExceptionally(exception);
}
}
Loading
Loading