diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index e491706d802..d35cad4304e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -682,6 +682,10 @@ default Queue locateQueue(String queueName) { return locateQueue(SimpleString.of(queueName)); } + default Queue locateQueue(String address, String queue) throws Exception { + return null; + } + default BindingQueryResult bindingQuery(SimpleString address) throws Exception { return bindingQuery(address, true); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index eeaf45b258b..383c01feaaf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -111,6 +111,7 @@ import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; @@ -2385,6 +2386,27 @@ public Queue locateQueue(SimpleString queueName) { return (Queue) binding.getBindable(); } + @Override + public Queue locateQueue(String address, String queue) throws Exception { + Bindings bindings = postOffice.getBindingsForAddress(SimpleString.of(address)); + if (bindings == null) { + return null; + } + + Binding binding = bindings.getBinding(queue); + if (binding == null) { + return null; + } + + Bindable bindingContent = binding.getBindable(); + + if (!(bindingContent instanceof Queue)) { + throw new IllegalStateException("locateQueue should only be used to locate queues"); + } + + return (Queue) bindingContent; + } + @Deprecated @Override public Queue deployQueue(final SimpleString address, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 39b4186acb3..616040bff8f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws Exception { * hence no information about delivering statistics should be updated. */ @Override public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception { - if (addressSettings.getExpiryAddress() != null) { - createExpiryResources(); + if (logger.isDebugEnabled()) { + logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress()); + } + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); + if (settingsToUse.getExpiryAddress() != null) { + createExpiryResources(ref.getMessage().getAddress(), settingsToUse); if (logger.isTraceEnabled()) { logger.trace("moving expired reference {} to address = {} from queue={}", ref, addressSettings.getExpiryAddress(), name); } - move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering); + move(null, settingsToUse.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering); } else { logger.trace("expiry is null, just acking expired message for reference {} from queue={}", ref, name); @@ -2147,10 +2151,71 @@ public void expire(final MessageReference ref, final ServerConsumer consumer, bo refCountForConsumers.check(); if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer)); + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer)); + } + } + + + AddressSettings getMessageAddressSettings(Message message) { + if (message.getAddress().equals(String.valueOf(address))) { + return addressSettings; + } else { + return server.getAddressSettingsRepository().getMatch(message.getAddress()); } } + private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress()); + } + + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); + SimpleString expiryAddress = settingsToUse.getExpiryAddress(); + + if (expiryAddress != null && expiryAddress.length() != 0) { + + createExpiryResources(ref.getMessage().getAddress(), settingsToUse); + + Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress); + + if (bindingList == null || bindingList.getBindings().isEmpty()) { + ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress); + acknowledge(tx, ref, AckReason.EXPIRED, null, delivering); + } else { + move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, null, delivering); + } + } else { + if (!printErrorExpiring) { + printErrorExpiring = true; + // print this only once + ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name); + } + + acknowledge(tx, ref, AckReason.EXPIRED, null, delivering); + } + + if (server != null && server.hasBrokerMessagePlugins()) { + ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER); + if (expiryLogger == null) { + expiryLogger = new ExpiryLogger(); + tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger); + tx.addOperation(expiryLogger); + } + + expiryLogger.addExpiry(address, ref); + } + + // potentially auto-delete this queue if this expired the last message + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + refCountForConsumers.check(); + } + }); + } + + + @Override public SimpleString getExpiryAddress() { return this.addressSettings.getExpiryAddress(); @@ -3844,51 +3909,6 @@ private Message makeCopy(final MessageReference ref, return LargeServerMessageImpl.checkLargeMessage(copy, storageManager); } - private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception { - SimpleString expiryAddress = addressSettings.getExpiryAddress(); - - if (expiryAddress != null && expiryAddress.length() != 0) { - - createExpiryResources(); - - Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress); - - if (bindingList == null || bindingList.getBindings().isEmpty()) { - ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress); - acknowledge(tx, ref, AckReason.EXPIRED, null, delivering); - } else { - move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, null, delivering); - } - } else { - if (!printErrorExpiring) { - printErrorExpiring = true; - // print this only once - ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name); - } - - acknowledge(tx, ref, AckReason.EXPIRED, null, delivering); - } - - if (server != null && server.hasBrokerMessagePlugins()) { - ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER); - if (expiryLogger == null) { - expiryLogger = new ExpiryLogger(); - tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger); - tx.addOperation(expiryLogger); - } - - expiryLogger.addExpiry(address, ref); - } - - // potentially auto-delete this queue if this expired the last message - tx.addOperation(new TransactionOperationAbstract() { - @Override - public void afterCommit(Transaction tx) { - refCountForConsumers.check(); - } - }); - } - private class ExpiryLogger extends TransactionOperationAbstract { List> expiries = new LinkedList<>(); @@ -3947,22 +3967,23 @@ private boolean sendToDeadLetterAddress(final Transaction tx, private void createDeadLetterResources() throws Exception { AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString()); - createResources(addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix()); + createResources(String.valueOf(getAddress()), addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix()); } - private void createExpiryResources() throws Exception { - AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString()); - createResources(addressSettings.isAutoCreateExpiryResources(), addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), addressSettings.getExpiryQueueSuffix()); + private void createExpiryResources(String address, AddressSettings messageAddressSettings) throws Exception { + createResources(address, messageAddressSettings.isAutoCreateExpiryResources(), messageAddressSettings.getExpiryAddress(), messageAddressSettings.getExpiryQueuePrefix(), messageAddressSettings.getExpiryQueueSuffix()); } - private void createResources(boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception { - if (isAutoCreate && !getAddress().equals(destinationAddress)) { + private void createResources(String address, boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception { + if (isAutoCreate && !address.equals(destinationAddress)) { if (destinationAddress != null && destinationAddress.length() != 0) { - SimpleString destinationQueueName = prefix.concat(getAddress()).concat(suffix); + SimpleString destinationQueueName = prefix.concat(address).concat(suffix); SimpleString filter = SimpleString.of(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress())); try { + logger.debug("Creating Resource queue {}", destinationQueueName); server.createQueue(QueueConfiguration.of(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true), true); } catch (ActiveMQQueueExistsException e) { + logger.debug("resource {} already existed, ignoring outcome", destinationQueueName); // ignore } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java new file mode 100644 index 00000000000..dd1af574ca8 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.expiry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ClusteredExpiryTest extends ClusterTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + Queue snfPaused; + + @Test + public void testExpiryOnSNF() throws Exception { + setupServer(0, true, true); + setupServer(1, true, true); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0); + + servers[0].getConfiguration().setMessageExpiryScanPeriod(10); + + startServers(0, 1); + + servers[0].getAddressSettingsRepository().clear(); + servers[0].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry"))); + servers[0].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + servers[1].getAddressSettingsRepository().clear(); + servers[1].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry"))); + servers[1].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + Queue serverQueue0 = servers[0].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[0].createQueue(QueueConfiguration.of("Expiry" + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of("Expiry." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + waitForBindings(0, "queues." + getName(), 1, 0, true); + waitForBindings(1, "queues." + getName(), 1, 0, true); + + waitForBindings(0, "queues." + getName(), 1, 0, false); + waitForBindings(1, "queues." + getName(), 1, 0, false); + + // pausing the SNF queue to keep messages stuck on the queue + servers[0].getPostOffice().getAllBindings().filter(f -> f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue); + assertNotNull(snfPaused); + + long NUMBER_OF_MESSAGES = 100; + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61617"); + try (Connection connection = factory.createConnection()) { + Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session1.createProducer(session1.createQueue("queues." + getName())); + producer.setTimeToLive(2_000); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session1.createTextMessage("hello")); + } + session1.commit(); + } + Wait.assertEquals(0L, serverQueue0::getMessageCount, 50_000, 100); + Wait.assertEquals(0L, snfPaused::getMessageCount, 50_000, 100); + Queue expiryQueue = servers[0].locateQueue("Expiry", "EXP.queues." + getName() + ".Expiry"); + assertNotNull(expiryQueue); + Wait.assertEquals(NUMBER_OF_MESSAGES, expiryQueue::getMessageCount, 5000, 100); + + } + + private void pauseQueue(Binding binding) { + assertNull(snfPaused); + if (binding instanceof LocalQueueBinding) { + logger.info("Pausing {}", binding.getUniqueName()); + snfPaused = ((LocalQueueBinding) binding).getQueue(); + snfPaused.pause(); + } + } + +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java new file mode 100644 index 00000000000..a750e7978eb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.server; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSContext; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.CompositeAddress; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ExpireQueueSuffixTest extends ActiveMQTestBase { + + public final SimpleString queueA = SimpleString.of("queueA"); + public final SimpleString queueB = SimpleString.of("queueB"); + public final SimpleString expiryAddress = SimpleString.of("myExpiry"); + + public final SimpleString expirySuffix = SimpleString.of(".expSuffix"); + public final long EXPIRY_DELAY = 10L; + + private ActiveMQServer server; + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + server.getConfiguration().setAddressQueueScanPeriod(50L).setMessageExpiryScanPeriod(50L); + + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY).setExpiryQueueSuffix(expirySuffix)); + + server.start(); + + server.createQueue(QueueConfiguration.of(queueA).setRoutingType(RoutingType.ANYCAST)); + server.createQueue(QueueConfiguration.of(queueB).setRoutingType(RoutingType.ANYCAST)); + } + + @Test + public void testAutoCreationOfExpiryResources() throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + long sendA = 7; + long sendB = 11; + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(queueA.toString())); + producer.setTimeToLive(100); + + for (int i = 0; i < sendA; i++) { + producer.send(session.createTextMessage("queueA")); + } + session.commit(); + + producer = session.createProducer(session.createQueue(queueB.toString())); + producer.setTimeToLive(100); + for (int i = 0; i < sendB; i++) { + producer.send(session.createTextMessage("queueB")); + } + session.commit(); + } + + Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." + queueA + expirySuffix) != null, 5000); + Queue expA = server.locateQueue(expiryAddress.toString(), "EXP." + queueA + expirySuffix); + assertNotNull(expA); + + Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." + queueB + expirySuffix) != null, 5000); + Queue expB = server.locateQueue(expiryAddress.toString(), "EXP." + queueB + expirySuffix); + assertNotNull(expB); + + Wait.assertEquals(sendA, expA::getMessageCount, 5000, 100); + Wait.assertEquals(sendB, expB::getMessageCount, 5000, 100); + } +} +