Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into master-jakarta
Browse files Browse the repository at this point in the history
  • Loading branch information
mukesh-ctds committed Nov 19, 2024
2 parents 1966914 + ca2713d commit 829d888
Show file tree
Hide file tree
Showing 18 changed files with 159 additions and 27 deletions.
2 changes: 1 addition & 1 deletion activemq-filters/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-jms-activemq-filters</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion examples/payara-micro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<groupId>com.datastax.oss</groupId>
<artifactId>pulsar-jms-parent</artifactId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
Expand Down
2 changes: 1 addition & 1 deletion examples/spring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>com.example</groupId>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>com.datastax.oss</groupId>
<artifactId>pulsar-jms-parent</artifactId>
<packaging>pom</packaging>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<name>DataStax Starlight for JMS</name>
<description>Implementation of the Java Messaging Service Client API around Apache Pulsar Java Client</description>
<url>https://github.com/datastax/pulsar-jms</url>
Expand Down Expand Up @@ -439,7 +439,7 @@ limitations under the License.]]></inlineHeader>
<connection>scm:git:[email protected]:datastax/pulsar-jms.git</connection>
<developerConnection>scm:git:[email protected]:datastax/pulsar-jms.git</developerConnection>
<url>https://github.com/datastax/pulsar-jms</url>
<tag>6.0.0</tag>
<tag>HEAD</tag>
</scm>
<developers>
<developer>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms-admin-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms-admin-ext/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-jms-all</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms-filters/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-jms-filters</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-jms</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,11 +912,7 @@ public TemporaryQueue createTemporaryQueue(PulsarSession session) throws JMSExce
checkNotClosed();
String name =
"persistent://" + factory.getSystemNamespace() + "/jms-temp-queue-" + UUID.randomUUID();
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (Exception err) {
throw Utils.handleException(err);
}
createPulsarTemporaryTopic(name);
PulsarTemporaryQueue res = new PulsarTemporaryQueue(name, session);
temporaryDestinations.add(res);
return res;
Expand All @@ -926,11 +922,7 @@ public TemporaryTopic createTemporaryTopic(PulsarSession session) throws JMSExce
checkNotClosed();
String name =
"persistent://" + factory.getSystemNamespace() + "/jms-temp-topic-" + UUID.randomUUID();
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (Exception err) {
throw Utils.handleException(err);
}
createPulsarTemporaryTopic(name);
PulsarTemporaryTopic res = new PulsarTemporaryTopic(name, session);
temporaryDestinations.add(res);
return res;
Expand Down Expand Up @@ -996,6 +988,22 @@ private ConnectionConsumer buildConnectionConsumer(
return connectionConsumer;
}

private void createPulsarTemporaryTopic(String name) throws JMSException {
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (IllegalStateException err) {
if (!factory.isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
}
log.warn(
"Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true",
name,
err);
} catch (Exception err) {
throw Utils.handleException(err);
}
}

void refreshServerSideSelectors() {
sessions.forEach(
s -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public class PulsarConnectionFactory
private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared;
private transient long waitForServerStartupTimeout = 60000;
private transient boolean usePulsarAdmin = true;
private transient boolean allowTemporaryTopicWithoutAdmin = false;
private transient boolean precreateQueueSubscription = true;
private transient int precreateQueueSubscriptionConsumerQueueSize = 0;
private transient boolean initialized;
Expand Down Expand Up @@ -330,6 +331,11 @@ private synchronized void ensureInitialized(String connectUsername, String conne
this.usePulsarAdmin =
Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", configurationCopy));

this.allowTemporaryTopicWithoutAdmin =
Boolean.parseBoolean(
getAndRemoveString(
"jms.allowTemporaryTopicWithoutAdmin", "false", configurationCopy));

this.precreateQueueSubscription =
Boolean.parseBoolean(
getAndRemoveString("jms.precreateQueueSubscription", "true", configurationCopy));
Expand Down Expand Up @@ -1726,6 +1732,10 @@ public boolean isAcknowledgeRejectedMessages() {
return acknowledgeRejectedMessages;
}

public boolean isAllowTemporaryTopicWithoutAdmin() {
return allowTemporaryTopicWithoutAdmin;
}

public synchronized boolean isClosed() {
return closed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package com.datastax.oss.pulsar.jms;

import jakarta.jms.IllegalStateException;
import jakarta.jms.InvalidDestinationException;
import jakarta.jms.JMSException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.TopicStats;

Expand All @@ -44,8 +46,20 @@ public final void delete() throws JMSException {
log.info("Deleting {}", this);
String topicName = getInternalTopicName();
String fullQualifiedTopicName = session.getFactory().applySystemNamespace(topicName);
TopicStats stats =
session.getFactory().getPulsarAdmin().topics().getStats(fullQualifiedTopicName);
PulsarAdmin pulsarAdmin;
try {
pulsarAdmin = session.getFactory().getPulsarAdmin();
} catch (IllegalStateException err) {
if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
}
log.warn(
"Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true",
this,
err);
return;
}
TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName);
log.info("Stats {}", stats);

int numConsumers =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

@Slf4j
public class TemporaryDestinationsNonAdminTest {

@RegisterExtension
static PulsarContainerExtension pulsarContainer =
new PulsarContainerExtension()
.withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true")
.withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned")
.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false");

@Test
public void allowTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
properties.put("jms.allowTemporaryTopicWithoutAdmin", "true");
useTemporaryDestinationNonAdminTest(properties, false);
}

@Test
public void forbidTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
useTemporaryDestinationNonAdminTest(properties, true);
}

@NotNull
private static Map<String, Object> getJmsProperties() {
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
properties.put("jms.forceDeleteTemporaryDestinations", "true");
properties.put("jms.usePulsarAdmin", "false");
return properties;
}

private void useTemporaryDestinationNonAdminTest(
Map<String, Object> properties, boolean expectAdminErrors) throws Exception {

try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
try (Connection connection = factory.createConnection()) {
connection.start();
try (Session session = connection.createSession()) {
if (expectAdminErrors) {
assertThrows(JMSException.class, session::createTemporaryTopic);
return;
}
Destination clientAddress = session.createTemporaryTopic();
testProducerAndConsumer(session, clientAddress);
}
}
}
}

private static void testProducerAndConsumer(Session session, Destination clientAddress)
throws JMSException {
try (MessageProducer producerClient = session.createProducer(clientAddress)) {
// subscribe on the temporary queue
try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) {

String testMessage = "message";
// produce a message
producerClient.send(session.createTextMessage(testMessage));

// on the consumer receive the message
Message theResponse = consumerClient.receive();
assertEquals(testMessage, theResponse.getBody(String.class));
}
}
}
}
2 changes: 1 addition & 1 deletion resource-adapter-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion resource-adapter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion tck-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-jms-parent</artifactId>
<groupId>com.datastax.oss</groupId>
<version>6.0.1-SNAPSHOT</version>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>tck-executor</artifactId>
Expand Down

0 comments on commit 829d888

Please sign in to comment.