From a9c78900271d7acbbaaff88952e31a23e8fec5fd Mon Sep 17 00:00:00 2001 From: nikhil-ctds Date: Wed, 30 Oct 2024 20:12:06 +0530 Subject: [PATCH 1/6] Updated Temporary topics and queues create and delete logic to create only when Admin. Responsibility to create and delete will be delegated to the server. --- .../oss/pulsar/jms/PulsarConnection.java | 20 +++++++++---------- .../jms/PulsarTemporaryDestination.java | 10 +++++++++- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java index 593bfdf3..dce1777f 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java @@ -912,11 +912,7 @@ public TemporaryQueue createTemporaryQueue(PulsarSession session) throws JMSExce checkNotClosed(); String name = "persistent://" + factory.getSystemNamespace() + "/jms-temp-queue-" + UUID.randomUUID(); - try { - factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); - } catch (Exception err) { - throw Utils.handleException(err); - } + createNonPartitionedTopic(name); PulsarTemporaryQueue res = new PulsarTemporaryQueue(name, session); temporaryDestinations.add(res); return res; @@ -926,11 +922,7 @@ public TemporaryTopic createTemporaryTopic(PulsarSession session) throws JMSExce checkNotClosed(); String name = "persistent://" + factory.getSystemNamespace() + "/jms-temp-topic-" + UUID.randomUUID(); - try { - factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); - } catch (Exception err) { - throw Utils.handleException(err); - } + createNonPartitionedTopic(name); PulsarTemporaryTopic res = new PulsarTemporaryTopic(name, session); temporaryDestinations.add(res); return res; @@ -996,6 +988,14 @@ private ConnectionConsumer buildConnectionConsumer( return connectionConsumer; } + private void createNonPartitionedTopic(String name) { + try { + factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); + } catch (Exception err) { + log.warn("Skipping creation of nonPartitionedTopic {}", name, err); + } + } + void refreshServerSideSelectors() { sessions.forEach( s -> { diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java index 7d766202..f047e560 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java @@ -18,6 +18,7 @@ import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.TopicStats; @@ -44,8 +45,15 @@ public final void delete() throws JMSException { log.info("Deleting {}", this); String topicName = getInternalTopicName(); String fullQualifiedTopicName = session.getFactory().applySystemNamespace(topicName); + PulsarAdmin pulsarAdmin; + try { + pulsarAdmin = session.getFactory().getPulsarAdmin(); + } catch (Exception e) { + log.warn("Cannot delete a temporary destination {}", this, e); + return; + } TopicStats stats = - session.getFactory().getPulsarAdmin().topics().getStats(fullQualifiedTopicName); + pulsarAdmin.topics().getStats(fullQualifiedTopicName); log.info("Stats {}", stats); int numConsumers = From 1aec59a94abd66275227fc1a54d5cebaef1420bb Mon Sep 17 00:00:00 2001 From: nikhil-ctds Date: Wed, 30 Oct 2024 20:32:08 +0530 Subject: [PATCH 2/6] Formatting change --- .../datastax/oss/pulsar/jms/PulsarTemporaryDestination.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java index f047e560..82aed7d2 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java @@ -52,8 +52,7 @@ public final void delete() throws JMSException { log.warn("Cannot delete a temporary destination {}", this, e); return; } - TopicStats stats = - pulsarAdmin.topics().getStats(fullQualifiedTopicName); + TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName); log.info("Stats {}", stats); int numConsumers = From 4d271713d61d96bf397f74fb1a1aa61f5bee06cf Mon Sep 17 00:00:00 2001 From: sandeep-ctds Date: Mon, 4 Nov 2024 18:23:40 +0530 Subject: [PATCH 3/6] Added jms.allowTemporaryTopicWithoutAdmin parameter for allowing temporary topic flow without admin privileges and changed exception handling to be specific. --- .../oss/pulsar/jms/PulsarConnection.java | 16 ++++++++++++---- .../oss/pulsar/jms/PulsarConnectionFactory.java | 10 ++++++++++ .../pulsar/jms/PulsarTemporaryDestination.java | 11 +++++++++-- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java index dce1777f..9ed65c20 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java @@ -912,7 +912,7 @@ public TemporaryQueue createTemporaryQueue(PulsarSession session) throws JMSExce checkNotClosed(); String name = "persistent://" + factory.getSystemNamespace() + "/jms-temp-queue-" + UUID.randomUUID(); - createNonPartitionedTopic(name); + createPulsarTemporaryTopic(name); PulsarTemporaryQueue res = new PulsarTemporaryQueue(name, session); temporaryDestinations.add(res); return res; @@ -922,7 +922,7 @@ public TemporaryTopic createTemporaryTopic(PulsarSession session) throws JMSExce checkNotClosed(); String name = "persistent://" + factory.getSystemNamespace() + "/jms-temp-topic-" + UUID.randomUUID(); - createNonPartitionedTopic(name); + createPulsarTemporaryTopic(name); PulsarTemporaryTopic res = new PulsarTemporaryTopic(name, session); temporaryDestinations.add(res); return res; @@ -988,11 +988,19 @@ private ConnectionConsumer buildConnectionConsumer( return connectionConsumer; } - private void createNonPartitionedTopic(String name) { + private void createPulsarTemporaryTopic(String name) throws JMSException { try { factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); + } catch (IllegalStateException err) { + if (!factory.isAllowTemporaryTopicWithoutAdmin()) { + throw Utils.handleException(err); + } + log.warn( + "Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true", + name, + err); } catch (Exception err) { - log.warn("Skipping creation of nonPartitionedTopic {}", name, err); + throw Utils.handleException(err); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 8cbc0bca..d79c8671 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -148,6 +148,7 @@ public class PulsarConnectionFactory private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared; private transient long waitForServerStartupTimeout = 60000; private transient boolean usePulsarAdmin = true; + private transient boolean allowTemporaryTopicWithoutAdmin = true; private transient boolean precreateQueueSubscription = true; private transient int precreateQueueSubscriptionConsumerQueueSize = 0; private transient boolean initialized; @@ -330,6 +331,11 @@ private synchronized void ensureInitialized(String connectUsername, String conne this.usePulsarAdmin = Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", configurationCopy)); + this.allowTemporaryTopicWithoutAdmin = + Boolean.parseBoolean( + getAndRemoveString( + "jms.allowTemporaryTopicWithoutAdmin", "false", configurationCopy)); + this.precreateQueueSubscription = Boolean.parseBoolean( getAndRemoveString("jms.precreateQueueSubscription", "true", configurationCopy)); @@ -1726,6 +1732,10 @@ public boolean isAcknowledgeRejectedMessages() { return acknowledgeRejectedMessages; } + public boolean isAllowTemporaryTopicWithoutAdmin() { + return allowTemporaryTopicWithoutAdmin; + } + public synchronized boolean isClosed() { return closed; } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java index 82aed7d2..9015c75a 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java @@ -15,6 +15,7 @@ */ package com.datastax.oss.pulsar.jms; +import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import lombok.extern.slf4j.Slf4j; @@ -48,8 +49,14 @@ public final void delete() throws JMSException { PulsarAdmin pulsarAdmin; try { pulsarAdmin = session.getFactory().getPulsarAdmin(); - } catch (Exception e) { - log.warn("Cannot delete a temporary destination {}", this, e); + } catch (IllegalStateException err) { + if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) { + throw Utils.handleException(err); + } + log.warn( + "Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true", + this, + err); return; } TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName); From 0d8dcd5267144d1c3aec8679523d12498db56283 Mon Sep 17 00:00:00 2001 From: sandeep-ctds Date: Mon, 4 Nov 2024 20:58:04 +0530 Subject: [PATCH 4/6] Added tests for jms.allowTemporaryTopicWithoutAdmin parameter --- .../TemporaryDestinationsNonAdminTest.java | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java new file mode 100644 index 00000000..99e3a7c7 --- /dev/null +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java @@ -0,0 +1,100 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import java.util.Map; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +@Slf4j +public class TemporaryDestinationsNonAdminTest { + + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true") + .withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned") + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); + + @Test + public void allowTemporaryTopicWithoutAdminTest() throws Exception { + Map properties = getJmsProperties(); + properties.put("jms.allowTemporaryTopicWithoutAdmin", "true"); + useTemporaryDestinationNonAdminTest(properties, false); + } + + @Test + public void forbidTemporaryTopicWithoutAdminTest() throws Exception { + Map properties = getJmsProperties(); + useTemporaryDestinationNonAdminTest(properties, true); + } + + @NotNull + private static Map getJmsProperties() { + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("jms.forceDeleteTemporaryDestinations", "true"); + properties.put("jms.usePulsarAdmin", "false"); + return properties; + } + + private void useTemporaryDestinationNonAdminTest(Map properties, boolean expectAdminErrors) + throws Exception { + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) { + try (Connection connection = factory.createConnection()) { + connection.start(); + try (Session session = connection.createSession()) { + if (expectAdminErrors) { + assertThrows(IllegalStateException.class, session::createTemporaryTopic); + return; + } + Destination clientAddress = session.createTemporaryTopic(); + testProducerAndConsumer(session, clientAddress); + } + } + } + } + + private static void testProducerAndConsumer(Session session, Destination clientAddress) + throws JMSException { + try (MessageProducer producerClient = session.createProducer(clientAddress)) { + // subscribe on the temporary queue + try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) { + + String testMessage = "message"; + // produce a message + producerClient.send(session.createTextMessage(testMessage)); + + // on the consumer receive the message + Message theResponse = consumerClient.receive(); + assertEquals(testMessage, theResponse.getBody(String.class)); + } + } + } +} From dcd478cd62d4bd1e9e873e75b5d810df7ce512ad Mon Sep 17 00:00:00 2001 From: sandeep-ctds Date: Mon, 4 Nov 2024 22:11:42 +0530 Subject: [PATCH 5/6] Updated test and default value for instance variable. --- .../com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java | 2 +- .../oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index d79c8671..df306905 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -148,7 +148,7 @@ public class PulsarConnectionFactory private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared; private transient long waitForServerStartupTimeout = 60000; private transient boolean usePulsarAdmin = true; - private transient boolean allowTemporaryTopicWithoutAdmin = true; + private transient boolean allowTemporaryTopicWithoutAdmin = false; private transient boolean precreateQueueSubscription = true; private transient int precreateQueueSubscriptionConsumerQueueSize = 0; private transient boolean initialized; diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java index 99e3a7c7..192b598c 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java @@ -71,7 +71,7 @@ private void useTemporaryDestinationNonAdminTest(Map properties, connection.start(); try (Session session = connection.createSession()) { if (expectAdminErrors) { - assertThrows(IllegalStateException.class, session::createTemporaryTopic); + assertThrows(JMSException.class, session::createTemporaryTopic); return; } Destination clientAddress = session.createTemporaryTopic(); From 889010522b4e3682257b64dff86ffb1eae05308a Mon Sep 17 00:00:00 2001 From: sandeep-ctds Date: Mon, 4 Nov 2024 22:13:42 +0530 Subject: [PATCH 6/6] formatting change --- .../TemporaryDestinationsNonAdminTest.java | 104 +++++++++--------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java index 192b598c..87965adc 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java @@ -17,11 +17,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; + import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.Map; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -35,66 +35,66 @@ @Slf4j public class TemporaryDestinationsNonAdminTest { - @RegisterExtension - static PulsarContainerExtension pulsarContainer = - new PulsarContainerExtension() - .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true") - .withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned") - .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true") + .withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned") + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); - @Test - public void allowTemporaryTopicWithoutAdminTest() throws Exception { - Map properties = getJmsProperties(); - properties.put("jms.allowTemporaryTopicWithoutAdmin", "true"); - useTemporaryDestinationNonAdminTest(properties, false); - } + @Test + public void allowTemporaryTopicWithoutAdminTest() throws Exception { + Map properties = getJmsProperties(); + properties.put("jms.allowTemporaryTopicWithoutAdmin", "true"); + useTemporaryDestinationNonAdminTest(properties, false); + } - @Test - public void forbidTemporaryTopicWithoutAdminTest() throws Exception { - Map properties = getJmsProperties(); - useTemporaryDestinationNonAdminTest(properties, true); - } + @Test + public void forbidTemporaryTopicWithoutAdminTest() throws Exception { + Map properties = getJmsProperties(); + useTemporaryDestinationNonAdminTest(properties, true); + } - @NotNull - private static Map getJmsProperties() { - Map properties = pulsarContainer.buildJMSConnectionProperties(); - properties.put("jms.forceDeleteTemporaryDestinations", "true"); - properties.put("jms.usePulsarAdmin", "false"); - return properties; - } + @NotNull + private static Map getJmsProperties() { + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("jms.forceDeleteTemporaryDestinations", "true"); + properties.put("jms.usePulsarAdmin", "false"); + return properties; + } - private void useTemporaryDestinationNonAdminTest(Map properties, boolean expectAdminErrors) - throws Exception { + private void useTemporaryDestinationNonAdminTest( + Map properties, boolean expectAdminErrors) throws Exception { - try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) { - try (Connection connection = factory.createConnection()) { - connection.start(); - try (Session session = connection.createSession()) { - if (expectAdminErrors) { - assertThrows(JMSException.class, session::createTemporaryTopic); - return; - } - Destination clientAddress = session.createTemporaryTopic(); - testProducerAndConsumer(session, clientAddress); - } - } + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) { + try (Connection connection = factory.createConnection()) { + connection.start(); + try (Session session = connection.createSession()) { + if (expectAdminErrors) { + assertThrows(JMSException.class, session::createTemporaryTopic); + return; + } + Destination clientAddress = session.createTemporaryTopic(); + testProducerAndConsumer(session, clientAddress); } + } } + } - private static void testProducerAndConsumer(Session session, Destination clientAddress) - throws JMSException { - try (MessageProducer producerClient = session.createProducer(clientAddress)) { - // subscribe on the temporary queue - try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) { + private static void testProducerAndConsumer(Session session, Destination clientAddress) + throws JMSException { + try (MessageProducer producerClient = session.createProducer(clientAddress)) { + // subscribe on the temporary queue + try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) { - String testMessage = "message"; - // produce a message - producerClient.send(session.createTextMessage(testMessage)); + String testMessage = "message"; + // produce a message + producerClient.send(session.createTextMessage(testMessage)); - // on the consumer receive the message - Message theResponse = consumerClient.receive(); - assertEquals(testMessage, theResponse.getBody(String.class)); - } - } + // on the consumer receive the message + Message theResponse = consumerClient.receive(); + assertEquals(testMessage, theResponse.getBody(String.class)); + } } + } }