Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement] Do not throw error for Temporary Topic/Queue creation and deletion without admin privileges #154

Merged
merged 7 commits into from
Nov 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -912,11 +912,7 @@ public TemporaryQueue createTemporaryQueue(PulsarSession session) throws JMSExce
checkNotClosed();
String name =
"persistent://" + factory.getSystemNamespace() + "/jms-temp-queue-" + UUID.randomUUID();
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (Exception err) {
throw Utils.handleException(err);
}
createPulsarTemporaryTopic(name);
PulsarTemporaryQueue res = new PulsarTemporaryQueue(name, session);
temporaryDestinations.add(res);
return res;
Expand All @@ -926,11 +922,7 @@ public TemporaryTopic createTemporaryTopic(PulsarSession session) throws JMSExce
checkNotClosed();
String name =
"persistent://" + factory.getSystemNamespace() + "/jms-temp-topic-" + UUID.randomUUID();
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (Exception err) {
throw Utils.handleException(err);
}
createPulsarTemporaryTopic(name);
PulsarTemporaryTopic res = new PulsarTemporaryTopic(name, session);
temporaryDestinations.add(res);
return res;
Expand Down Expand Up @@ -996,6 +988,22 @@ private ConnectionConsumer buildConnectionConsumer(
return connectionConsumer;
}

private void createPulsarTemporaryTopic(String name) throws JMSException {
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (IllegalStateException err) {
if (!factory.isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
}
log.warn(
"Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true",
name,
err);
} catch (Exception err) {
sandeep-ctds marked this conversation as resolved.
Show resolved Hide resolved
throw Utils.handleException(err);
}
}

void refreshServerSideSelectors() {
sessions.forEach(
s -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public class PulsarConnectionFactory
private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared;
private transient long waitForServerStartupTimeout = 60000;
private transient boolean usePulsarAdmin = true;
private transient boolean allowTemporaryTopicWithoutAdmin = false;
private transient boolean precreateQueueSubscription = true;
private transient int precreateQueueSubscriptionConsumerQueueSize = 0;
private transient boolean initialized;
Expand Down Expand Up @@ -330,6 +331,11 @@ private synchronized void ensureInitialized(String connectUsername, String conne
this.usePulsarAdmin =
Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", configurationCopy));

this.allowTemporaryTopicWithoutAdmin =
Boolean.parseBoolean(
getAndRemoveString(
"jms.allowTemporaryTopicWithoutAdmin", "false", configurationCopy));

this.precreateQueueSubscription =
Boolean.parseBoolean(
getAndRemoveString("jms.precreateQueueSubscription", "true", configurationCopy));
Expand Down Expand Up @@ -1726,6 +1732,10 @@ public boolean isAcknowledgeRejectedMessages() {
return acknowledgeRejectedMessages;
}

public boolean isAllowTemporaryTopicWithoutAdmin() {
return allowTemporaryTopicWithoutAdmin;
}

public synchronized boolean isClosed() {
return closed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package com.datastax.oss.pulsar.jms;

import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.TopicStats;

Expand All @@ -44,8 +46,20 @@ public final void delete() throws JMSException {
log.info("Deleting {}", this);
String topicName = getInternalTopicName();
String fullQualifiedTopicName = session.getFactory().applySystemNamespace(topicName);
TopicStats stats =
session.getFactory().getPulsarAdmin().topics().getStats(fullQualifiedTopicName);
PulsarAdmin pulsarAdmin;
try {
pulsarAdmin = session.getFactory().getPulsarAdmin();
} catch (IllegalStateException err) {
if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
}
log.warn(
"Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true",
this,
err);
return;
}
TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName);
log.info("Stats {}", stats);

int numConsumers =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

@Slf4j
public class TemporaryDestinationsNonAdminTest {

@RegisterExtension
static PulsarContainerExtension pulsarContainer =
new PulsarContainerExtension()
.withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true")
.withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned")
.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false");

@Test
public void allowTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
properties.put("jms.allowTemporaryTopicWithoutAdmin", "true");
useTemporaryDestinationNonAdminTest(properties, false);
}

@Test
public void forbidTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
useTemporaryDestinationNonAdminTest(properties, true);
}

@NotNull
private static Map<String, Object> getJmsProperties() {
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
properties.put("jms.forceDeleteTemporaryDestinations", "true");
properties.put("jms.usePulsarAdmin", "false");
return properties;
}

private void useTemporaryDestinationNonAdminTest(
Map<String, Object> properties, boolean expectAdminErrors) throws Exception {

try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
try (Connection connection = factory.createConnection()) {
connection.start();
try (Session session = connection.createSession()) {
if (expectAdminErrors) {
assertThrows(JMSException.class, session::createTemporaryTopic);
return;
}
Destination clientAddress = session.createTemporaryTopic();
testProducerAndConsumer(session, clientAddress);
}
}
}
}

private static void testProducerAndConsumer(Session session, Destination clientAddress)
throws JMSException {
try (MessageProducer producerClient = session.createProducer(clientAddress)) {
// subscribe on the temporary queue
try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) {

String testMessage = "message";
// produce a message
producerClient.send(session.createTextMessage(testMessage));

// on the consumer receive the message
Message theResponse = consumerClient.receive();
assertEquals(testMessage, theResponse.getBody(String.class));
}
}
}
}
Loading