Skip to content

Commit

Permalink
ARTEMIS-4954 AddressControl.pause() can pause the snf queue
Browse files Browse the repository at this point in the history
  • Loading branch information
howardgao authored and clebertsuconic committed Jul 25, 2024
1 parent 284ce80 commit 096a869
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;

import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonNumber;
Expand All @@ -35,7 +36,6 @@
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
Expand Down Expand Up @@ -216,8 +216,8 @@ public synchronized void reloadPause(long recordID) {
Bindings bindings = postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
((QueueBinding) binding).getQueue().pause(false);
if (binding instanceof LocalQueueBinding) {
((LocalQueueBinding) binding).getQueue().pause(false);
}
}
}
Expand Down Expand Up @@ -250,8 +250,8 @@ public synchronized void pause(boolean persist) {
Bindings bindings = postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
((QueueBinding) binding).getQueue().pause(false);
if (binding instanceof LocalQueueBinding) {
((LocalQueueBinding) binding).getQueue().pause(false);
}
}
}
Expand All @@ -278,8 +278,8 @@ public synchronized void resume() {
Bindings bindings = postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
((QueueBinding) binding).getQueue().resume();
if (binding instanceof LocalQueueBinding) {
((LocalQueueBinding) binding).getQueue().resume();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
Expand All @@ -28,17 +30,26 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -305,6 +316,102 @@ public void testClusterBridgeAddRemoteBinding() throws Exception {
stopServers(0, 1);
}

@Test
public void testPauseAddressBlockingSnFQueue() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());

setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);

setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

AddressSettings addressSettings = new AddressSettings();
addressSettings.setRedistributionDelay(0);

servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);

startServers(0, 1);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

ClientSession session0 = sfs[0].createSession();
ClientSession session1 = sfs[1].createSession();

session0.start();
session1.start();

createQueue(0, "queues.testaddress", "queue1", null, true);
createQueue(1, "queues.testaddress", "queue1", null, true);
ClientConsumer consumer1 = session1.createConsumer("queue1");

waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);

waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);

final int num = 10;
//normal message flow should work
ClientProducer goodProducer0 = session0.createProducer("queues.testaddress");
for (int i = 0; i < num; i++) {
Message msg = session0.createMessage(true);
msg.putStringProperty("origin", "from producer 0");
goodProducer0.send(msg);
}

//consumer1 can receive from node0
for (int i = 0; i < num; i++) {
ClientMessage m = consumer1.receive(5000);
assertNotNull(m);
String propValue = m.getStringProperty("origin");
assertEquals("from producer 0", propValue);
m.acknowledge();
}
assertNull(consumer1.receiveImmediate());

//pause address from node0
String addressControlResourceName = ResourceNames.ADDRESS + "queues.testaddress";
Object resource = servers[0].getManagementService().getResource(addressControlResourceName);
AddressControl addressControl0 = (AddressControl) resource;
addressControl0.pause();

Bindings bindings0 = servers[0].getPostOffice().getBindingsForAddress(SimpleString.of("queues.testaddress"));
assertNotNull(bindings0);
assertEquals(2, bindings0.getBindings().size());
boolean localBindingPaused = false;
boolean remoteBindingPaused = true;
for (Binding bd : bindings0.getBindings()) {
if (bd instanceof LocalQueueBinding) {
localBindingPaused = ((LocalQueueBinding)bd).getQueue().isPaused();
}
if (bd instanceof RemoteQueueBinding) {
remoteBindingPaused = ((RemoteQueueBinding)bd).getQueue().isPaused();
}
}
assertTrue(localBindingPaused);
assertFalse(remoteBindingPaused);

//now message should flow to node 1 regardless of the pause
for (int i = 0; i < num; i++) {
Message msg = session0.createMessage(true);
msg.putStringProperty("origin", "from producer 0");
goodProducer0.send(msg);
}

//consumer1 can receive from node0
for (int i = 0; i < num; i++) {
ClientMessage m = consumer1.receive(5000);
assertNotNull(m);
String propValue = m.getStringProperty("origin");
assertEquals("from producer 0", propValue);
m.acknowledge();
}
assertNull(consumer1.receiveImmediate());

stopServers(0, 1);
}

@Override
@AfterEach
Expand Down

0 comments on commit 096a869

Please sign in to comment.