Skip to content

Commit

Permalink
JMS Selectors: apply selectors on the write path (initial prototype) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Apr 11, 2024
1 parent 5041029 commit 2bd0fd9
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms.selectors;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.protocol.Commands;

@Slf4j
public class JMSPublishFilters implements BrokerInterceptor {
private static final String JMS_FILTERED_PROPERTY = "jms-filtered";
private final JMSFilter filter = new JMSFilter();
private boolean enabled = false;

@Override
public void initialize(PulsarService pulsarService) {
enabled =
Boolean.parseBoolean(
pulsarService
.getConfiguration()
.getProperties()
.getProperty("jmsApplyFiltersOnPublish", "true"));
log.info("jmsApplyFiltersOnPublish={}", enabled);
}

@Override
public void onMessagePublish(
Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
if (!enabled) {
return;
}
if (publishContext.isMarkerMessage()
|| publishContext.isChunked()
|| publishContext.getNumberOfMessages() > 1) {
return;
}

MessageMetadata messageMetadata =
Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1);
if (messageMetadata.hasNumMessagesInBatch()) {
return;
}
producer
.getTopic()
.getSubscriptions()
.forEach(
(name, subscription) -> {
if (!(subscription instanceof PersistentSubscription)) {
return;
}
Map<String, String> subscriptionProperties = subscription.getSubscriptionProperties();
if (!subscriptionProperties.containsKey("jms.selector")) {
return;
}
FilterContext filterContext = new FilterContext();
filterContext.setSubscription(subscription);
filterContext.setMsgMetadata(messageMetadata);
filterContext.setConsumer(null);
Entry entry = null; // we would need the Entry only in case of batch messages
EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext);
if (filterResult == EntryFilter.FilterResult.REJECT) {
String property = "filter-result-" + name + "@" + subscription.getTopicName();
publishContext.setProperty(property, filterResult);
publishContext.setProperty(JMS_FILTERED_PROPERTY, true);
}
});
}

@Override
public void messageProduced(
ServerCnx cnx,
Producer producer,
long startTimeNs,
long ledgerId,
long entryId,
Topic.PublishContext publishContext) {
if (!enabled || publishContext.getProperty(JMS_FILTERED_PROPERTY) == null) {
return;
}
producer
.getTopic()
.getSubscriptions()
.forEach(
(name, subscription) -> {
String property = "filter-result-" + name + "@" + subscription.getTopicName();
EntryFilter.FilterResult filterResult =
(EntryFilter.FilterResult) publishContext.getProperty(property);
if (filterResult == EntryFilter.FilterResult.REJECT) {
if (log.isDebugEnabled()) {
log.debug("Reject message {}:{} for subscription {}", ledgerId, entryId, name);
}
// ir is possible that calling this method in this thread may affect performance
// let's keep it simple for now, we can optimize it later
subscription.acknowledgeMessage(
Collections.singletonList(new PositionImpl(ledgerId, entryId)),
CommandAck.AckType.Individual,
null);
}
});
}

@Override
public void close() {
filter.close();
}

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {}

@Override
public void onConnectionClosed(ServerCnx cnx) {}

@Override
public void onWebserviceRequest(ServletRequest request)
throws IOException, ServletException, InterceptException {}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response)
throws IOException, ServletException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
interceptorClass: com.datastax.oss.pulsar.jms.selectors.JMSPublishFilters
name: jms-publish-filters
description: Starlight for JMS - support for server side filters on the publish path
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
6 changes: 4 additions & 2 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,10 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<mkdir dir="${project.build.outputDirectory}/interceptors"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;

@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class JMSPublishFiltersTest {

@RegisterExtension
static PulsarContainerExtension pulsarContainer =
new PulsarContainerExtension()
.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false")
.withEnv("PULSAR_PREFIX_brokerInterceptorsDirectory", "/pulsar/interceptors")
.withEnv("PULSAR_PREFIX_brokerInterceptors", "jms-publish-filters")
.withEnv("PULSAR_PREFIX_jmsApplyFiltersOnPublish", "true")
.withLogContainerOutput(true);

private Map<String, Object> buildProperties() {
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
properties.put("jms.useServerSideFiltering", true);
properties.put("jms.enableClientSideEmulation", false);

Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("batchingEnabled", false);
properties.put("producerConfig", producerConfig);
return properties;
}

@Test
public void sendMessageReceiveFromQueue() throws Exception {
Map<String, Object> properties = buildProperties();

String topicName = "persistent://public/default/test-" + UUID.randomUUID();
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (PulsarConnection connection = factory.createConnection()) {
connection.start();
try (PulsarSession session = connection.createSession(); ) {
Queue destination = session.createQueue(topicName);

try (PulsarMessageConsumer consumer1 = session.createConsumer(destination); ) {
assertEquals(
SubscriptionType.Shared, ((PulsarMessageConsumer) consumer1).getSubscriptionType());

String newSelector = "lastMessage=TRUE";
Map<String, String> subscriptionProperties = new HashMap<>();
subscriptionProperties.put("jms.selector", newSelector);
subscriptionProperties.put("jms.filtering", "true");

pulsarContainer
.getAdmin()
.topics()
.updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties);

try (MessageProducer producer = session.createProducer(destination); ) {
for (int i = 0; i < 10; i++) {
TextMessage textMessage = session.createTextMessage("foo-" + i);
if (i == 9) {
textMessage.setBooleanProperty("lastMessage", true);
}
producer.send(textMessage);
}
}

TextMessage textMessage = (TextMessage) consumer1.receive();
assertEquals("foo-9", textMessage.getText());

assertEquals(1, consumer1.getReceivedMessages());
assertEquals(0, consumer1.getSkippedMessages());

// no more messages
assertNull(consumer1.receiveNoWait());

// ensure that the filter didn't reject any message while dispatching to the consumer
// because the filter has been already applied on the write path
TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName);
SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue");
assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1);
assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
}

// create a message that doesn't match the filter
// verify that the back log is accurate (0)

try (MessageProducer producer = session.createProducer(destination); ) {
TextMessage textMessage = session.createTextMessage("backlog");
producer.send(textMessage);

TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName);
SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue");
assertEquals(0, subscriptionStats.getMsgBacklog());
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,8 @@ public void testConsumerPriorityQueue(String mapping) throws Exception {
try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) {
Queue destination = session.createQueue("test-" + UUID.randomUUID());

pulsarContainer.getAdmin()
pulsarContainer
.getAdmin()
.topics()
.createPartitionedTopic(factory.getPulsarTopicName(destination), 10);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@

@Slf4j
public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback {
public static final String PULSAR_IMAGE = "apachepulsar/pulsar:3.0.0";
public static final String PULSAR_IMAGE = "datastax/lunastreaming:3.1_3.1";
private PulsarContainer pulsarContainer;
private Consumer<PulsarContainerExtension> onContainerReady;
private Map<String, String> env = new HashMap<>();

private Network network;

private PulsarAdmin admin;
private boolean logContainerOutput = false;

public PulsarContainerExtension() {
env.put("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true");
Expand Down Expand Up @@ -72,8 +73,11 @@ public void afterAll(ExtensionContext extensionContext) {
public void beforeAll(ExtensionContext extensionContext) {
network = Network.newNetwork();
CountDownLatch pulsarReady = new CountDownLatch(1);
log.info("ENV: {}", env);
pulsarContainer =
new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE))
new PulsarContainer(
DockerImageName.parse(PULSAR_IMAGE)
.asCompatibleSubstituteFor("apachepulsar/pulsar"))
.withNetwork(network)
.withEnv(env)
.withLogConsumer(
Expand All @@ -82,10 +86,16 @@ public void beforeAll(ExtensionContext extensionContext) {
if (text.contains("messaging service is ready")) {
pulsarReady.countDown();
}
log.debug(text);
if (logContainerOutput) {
log.info(text);
} else {
log.debug(text);
}
})
.withCopyFileToContainer(
MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters");
MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters")
.withCopyFileToContainer(
MountableFile.forHostPath("target/classes/interceptors"), "/pulsar/interceptors");
// start Pulsar and wait for it to be ready to accept requests
pulsarContainer.start();
assertTrue(pulsarReady.await(1, TimeUnit.MINUTES));
Expand All @@ -104,6 +114,11 @@ public PulsarContainerExtension withOnContainerReady(
return this;
}

public PulsarContainerExtension withLogContainerOutput(boolean logContainerOutput) {
this.logContainerOutput = logContainerOutput;
return this;
}

public PulsarContainerExtension withEnv(String key, String value) {
this.env.put(key, value);
return this;
Expand Down

0 comments on commit 2bd0fd9

Please sign in to comment.