Skip to content

Commit

Permalink
ARTEMIS-5119 Expired Messages on Cluster SNF should to to the origina…
Browse files Browse the repository at this point in the history
…l Expiry Queue
  • Loading branch information
clebertsuconic committed Oct 30, 2024
1 parent 9f1681a commit 47e931a
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ default Queue locateQueue(String queueName) {
return locateQueue(SimpleString.of(queueName));
}

default Queue locateQueue(String address, String queue) throws Exception {
return null;
}

default BindingQueryResult bindingQuery(SimpleString address) throws Exception {
return bindingQuery(address, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
Expand Down Expand Up @@ -2385,6 +2386,27 @@ public Queue locateQueue(SimpleString queueName) {
return (Queue) binding.getBindable();
}

@Override
public Queue locateQueue(String address, String queue) throws Exception {
Bindings bindings = postOffice.getBindingsForAddress(SimpleString.of(address));
if (bindings == null) {
return null;
}

Binding binding = bindings.getBinding(queue);
if (binding == null) {
return null;
}

Bindable bindingContent = binding.getBindable();

if (!(bindingContent instanceof Queue)) {
throw new IllegalStateException("locateQueue should only be used to locate queues");
}

return (Queue) bindingContent;
}

@Deprecated
@Override
public Queue deployQueue(final SimpleString address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws Exception {
* hence no information about delivering statistics should be updated. */
@Override
public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception {
if (addressSettings.getExpiryAddress() != null) {
createExpiryResources();
if (logger.isDebugEnabled()) {
logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress());
}
AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage());
if (settingsToUse.getExpiryAddress() != null) {
createExpiryResources(ref.getMessage().getAddress(), settingsToUse);

if (logger.isTraceEnabled()) {
logger.trace("moving expired reference {} to address = {} from queue={}", ref, addressSettings.getExpiryAddress(), name);
}

move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering);
move(null, settingsToUse.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering);
} else {
logger.trace("expiry is null, just acking expired message for reference {} from queue={}", ref, name);

Expand All @@ -2147,10 +2151,71 @@ public void expire(final MessageReference ref, final ServerConsumer consumer, bo
refCountForConsumers.check();

if (server != null && server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer));
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer));
}
}


AddressSettings getMessageAddressSettings(Message message) {
if (message.getAddress().equals(String.valueOf(address))) {
return addressSettings;
} else {
return server.getAddressSettingsRepository().getMatch(message.getAddress());
}
}

private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress());
}

AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage());
SimpleString expiryAddress = settingsToUse.getExpiryAddress();

if (expiryAddress != null && expiryAddress.length() != 0) {

createExpiryResources(ref.getMessage().getAddress(), settingsToUse);

Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

if (bindingList == null || bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
} else {
move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, null, delivering);
}
} else {
if (!printErrorExpiring) {
printErrorExpiring = true;
// print this only once
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
}

acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
}

if (server != null && server.hasBrokerMessagePlugins()) {
ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
if (expiryLogger == null) {
expiryLogger = new ExpiryLogger();
tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
tx.addOperation(expiryLogger);
}

expiryLogger.addExpiry(address, ref);
}

// potentially auto-delete this queue if this expired the last message
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
refCountForConsumers.check();
}
});
}



@Override
public SimpleString getExpiryAddress() {
return this.addressSettings.getExpiryAddress();
Expand Down Expand Up @@ -3844,51 +3909,6 @@ private Message makeCopy(final MessageReference ref,
return LargeServerMessageImpl.checkLargeMessage(copy, storageManager);
}

private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception {
SimpleString expiryAddress = addressSettings.getExpiryAddress();

if (expiryAddress != null && expiryAddress.length() != 0) {

createExpiryResources();

Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

if (bindingList == null || bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
} else {
move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, null, delivering);
}
} else {
if (!printErrorExpiring) {
printErrorExpiring = true;
// print this only once
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
}

acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
}

if (server != null && server.hasBrokerMessagePlugins()) {
ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
if (expiryLogger == null) {
expiryLogger = new ExpiryLogger();
tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
tx.addOperation(expiryLogger);
}

expiryLogger.addExpiry(address, ref);
}

// potentially auto-delete this queue if this expired the last message
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
refCountForConsumers.check();
}
});
}

private class ExpiryLogger extends TransactionOperationAbstract {

List<Pair<SimpleString, MessageReference>> expiries = new LinkedList<>();
Expand Down Expand Up @@ -3947,22 +3967,23 @@ private boolean sendToDeadLetterAddress(final Transaction tx,

private void createDeadLetterResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
createResources(addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix());
createResources(String.valueOf(getAddress()), addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix());
}

private void createExpiryResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
createResources(addressSettings.isAutoCreateExpiryResources(), addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), addressSettings.getExpiryQueueSuffix());
private void createExpiryResources(String address, AddressSettings messageAddressSettings) throws Exception {
createResources(address, messageAddressSettings.isAutoCreateExpiryResources(), messageAddressSettings.getExpiryAddress(), messageAddressSettings.getExpiryQueuePrefix(), messageAddressSettings.getExpiryQueueSuffix());
}

private void createResources(boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
if (isAutoCreate && !getAddress().equals(destinationAddress)) {
private void createResources(String address, boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
if (isAutoCreate && !address.equals(destinationAddress)) {
if (destinationAddress != null && destinationAddress.length() != 0) {
SimpleString destinationQueueName = prefix.concat(getAddress()).concat(suffix);
SimpleString destinationQueueName = prefix.concat(address).concat(suffix);
SimpleString filter = SimpleString.of(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
try {
logger.debug("Creating Resource queue {}", destinationQueueName);
server.createQueue(QueueConfiguration.of(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true), true);
} catch (ActiveMQQueueExistsException e) {
logger.debug("resource {} already existed, ignoring outcome", destinationQueueName);
// ignore
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.expiry;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.lang.invoke.MethodHandles;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

public class ClusteredExpiryTest extends ClusterTestBase {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Queue snfPaused;

@Test
public void testExpiryOnSNF() throws Exception {
setupServer(0, true, true);
setupServer(1, true, true);

setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1);

setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0);

servers[0].getConfiguration().setMessageExpiryScanPeriod(10);

startServers(0, 1);

servers[0].getAddressSettingsRepository().clear();
servers[0].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
servers[0].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));

servers[1].getAddressSettingsRepository().clear();
servers[1].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
servers[1].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));

Queue serverQueue0 = servers[0].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
servers[1].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
servers[0].createQueue(QueueConfiguration.of("Expiry" + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
servers[1].createQueue(QueueConfiguration.of("Expiry." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));

waitForBindings(0, "queues." + getName(), 1, 0, true);
waitForBindings(1, "queues." + getName(), 1, 0, true);

waitForBindings(0, "queues." + getName(), 1, 0, false);
waitForBindings(1, "queues." + getName(), 1, 0, false);

// pausing the SNF queue to keep messages stuck on the queue
servers[0].getPostOffice().getAllBindings().filter(f -> f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue);
assertNotNull(snfPaused);

long NUMBER_OF_MESSAGES = 100;

ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61617");
try (Connection connection = factory.createConnection()) {
Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session1.createProducer(session1.createQueue("queues." + getName()));
producer.setTimeToLive(2_000);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session1.createTextMessage("hello"));
}
session1.commit();
}
Wait.assertEquals(0L, serverQueue0::getMessageCount, 50_000, 100);
Wait.assertEquals(0L, snfPaused::getMessageCount, 50_000, 100);
Queue expiryQueue = servers[0].locateQueue("Expiry", "EXP.queues." + getName() + ".Expiry");
assertNotNull(expiryQueue);
Wait.assertEquals(NUMBER_OF_MESSAGES, expiryQueue::getMessageCount, 5000, 100);

}

private void pauseQueue(Binding binding) {
assertNull(snfPaused);
if (binding instanceof LocalQueueBinding) {
logger.info("Pausing {}", binding.getUniqueName());
snfPaused = ((LocalQueueBinding) binding).getQueue();
snfPaused.pause();
}
}

}
Loading

0 comments on commit 47e931a

Please sign in to comment.