Skip to content

Commit

Permalink
ARTEMIS-5148 Simplifying and making ClusteredLargeMessageInterruptTes…
Browse files Browse the repository at this point in the history
…t more reliable
  • Loading branch information
clebertsuconic committed Nov 12, 2024
1 parent 935690a commit 8645575
Showing 1 changed file with 58 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,13 @@ private static String createBody() {
Process serverProcess2;

public ConnectionFactory createConnectionFactory(int broker, String protocol) {

int portUsed = 61616 + broker * 100;

if (protocol.equals("CORE")) {
switch (broker) {
// I need the connections stable in the selected server
case 0:
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
case 1:
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
default:
logger.warn("undefined argument {}", broker);
throw new IllegalArgumentException("undefined");
}
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:" + portUsed + "?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
} else {
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + (61616 + broker * 100) + "?ha=false");
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + portUsed);
}
}

Expand All @@ -145,52 +139,27 @@ private Process startServer1() throws Exception {

@Test
public void testLargeMessageAMQPTX() throws Throwable {
testInterrupt("AMQP", true, false);
}

@Test
public void testLargeMessageAMQPTXKill() throws Throwable {
testInterrupt("AMQP", true, true);
testInterrupt("AMQP", true);
}

@Test
public void testInterruptAMQPNonTX() throws Throwable {
testInterrupt("AMQP", false, false);
}

@Test
public void testInterruptAMQPNonTXKill() throws Throwable {
testInterrupt("AMQP", false, true);
testInterrupt("AMQP", false);
}

@Test
public void testInterruptCORETX() throws Throwable {
testInterrupt("CORE", true, false);
}

@Test
public void testInterruptCORETXKill() throws Throwable {
testInterrupt("CORE", true, true);
testInterrupt("CORE", true);
}

@Test
public void testInterruptOPENWIRETX() throws Throwable {
testInterrupt("OPENWIRE", true, false);
}

@Test
public void testInterruptOPENWIRETXKill() throws Throwable {
testInterrupt("OPENWIRE", true, true);
testInterrupt("OPENWIRE", true);
}

@Test
public void testInterruptCORENonTX() throws Throwable {
testInterrupt("CORE", false, false);
}

@Test
public void testInterruptCORENonTXKill() throws Throwable {
testInterrupt("CORE", false, true);
testInterrupt("CORE", false);
}

private CountDownLatch startSendingThreads(Executor executor, String protocol, int broker, int threads, boolean tx, String queueName) {
Expand Down Expand Up @@ -242,8 +211,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol,
for (int i = 0; i < threads; i++) {
executor.execute(() -> {
int numberOfMessages = 0;
try {
Connection connection = factory.createConnection();
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
Expand All @@ -265,6 +233,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol,
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
logger.info("Done sending");
done.countDown();
Expand All @@ -279,7 +248,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol,

// this test has sleeps as the test will send while still active
// we keep sending all the time.. so the testInterruptLM acts like a controller telling the threads when to stop
private void testInterrupt(String protocol, boolean tx, boolean useKill) throws Throwable {
private void testInterrupt(String protocol, boolean tx) throws Throwable {
final int SENDING_THREADS = 10;
final int CONSUMING_THREADS = 10;
final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server
Expand All @@ -289,55 +258,50 @@ private void testInterrupt(String protocol, boolean tx, boolean useKill) throws
ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);

CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName);

Thread.sleep(2000);

killProcess(serverProcess, useKill);
runningSend = false;
runningConsumer = false;
assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
assertTrue(sendDone.await(10, TimeUnit.SECONDS));

logger.info("All receivers and senders are done!!!");

serverProcess = startServer0();

Thread.sleep(2000);

sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);

killProcess(serverProcess2, useKill);
assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
runningSend = false;
runningConsumer = false;
assertTrue(sendDone.await(1, TimeUnit.MINUTES));
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));

serverProcess2 = startServer1();

sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);

Thread.sleep(2000);
runningSend = false;
assertTrue(sendDone.await(10, TimeUnit.SECONDS));

QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);

File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");

Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0);
{
CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);

// let it producing for a while
Thread.sleep(2000);

runningSend = false;
assertTrue(sendDone.await(10, TimeUnit.SECONDS));

killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer0();
runningConsumer = false;
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));

long timeout = System.currentTimeMillis() + 60_000;

ConnectionFactory factory = createConnectionFactory(1, protocol);

// This will flush all messages, making sure everything is consumed.
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
connection.start();
while (System.currentTimeMillis() < timeout) {
TextMessage message = (TextMessage)consumer.receive(1_000);
if (message == null) {
if (lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0) {
break;
}
} else {
assertTrue(message.getText().startsWith(largebody));
}
}
}

}

runningConsumer = false;
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
logger.info("All receivers and senders are done!!!");

// no need to use wait here, the previous check should have checked that already
assertEquals(0, lmFolder.listFiles().length);
assertEquals(0, lmFolder2.listFiles().length);
assertEquals(0, errors.get());
Expand All @@ -353,12 +317,8 @@ public void testBridgeFailureCORE() throws Throwable {
testInterruptFailOnBridge("CORE", false);
}

private void killProcess(Process process, boolean useKill) throws Exception {
if (useKill) {
Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
} else {
process.destroyForcibly();
}
private void killProcess(Process process) throws Exception {
process.destroyForcibly();
}


Expand All @@ -375,21 +335,20 @@ private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throw
ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);

// only start the sender for a while
CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);

Thread.sleep(2000);

runningSend = runningConsumer = false;

killProcess(serverProcess, false);
assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
assertTrue(sendDone.await(10, TimeUnit.SECONDS));

sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
killProcess(serverProcess, false);
assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer0();

Thread.sleep(5000);
Expand Down

0 comments on commit 8645575

Please sign in to comment.