Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-5148 Simplifying and making ClusteredLargeMessageInterruptTest more reliable #5338

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,13 @@ private static String createBody() {
Process serverProcess2;

public ConnectionFactory createConnectionFactory(int broker, String protocol) {

int portUsed = 61616 + broker * 100;

if (protocol.equals("CORE")) {
switch (broker) {
// I need the connections stable in the selected server
case 0:
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
case 1:
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
default:
logger.warn("undefined argument {}", broker);
throw new IllegalArgumentException("undefined");
}
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:" + portUsed + "?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
} else {
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + (61616 + broker * 100) + "?ha=false");
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + portUsed);
}
}

Expand All @@ -145,52 +139,27 @@ private Process startServer1() throws Exception {

@Test
public void testLargeMessageAMQPTX() throws Throwable {
testInterrupt("AMQP", true, false);
}

@Test
public void testLargeMessageAMQPTXKill() throws Throwable {
testInterrupt("AMQP", true, true);
testInterrupt("AMQP", true);
}

@Test
public void testInterruptAMQPNonTX() throws Throwable {
testInterrupt("AMQP", false, false);
}

@Test
public void testInterruptAMQPNonTXKill() throws Throwable {
testInterrupt("AMQP", false, true);
testInterrupt("AMQP", false);
}

@Test
public void testInterruptCORETX() throws Throwable {
testInterrupt("CORE", true, false);
}

@Test
public void testInterruptCORETXKill() throws Throwable {
testInterrupt("CORE", true, true);
testInterrupt("CORE", true);
}

@Test
public void testInterruptOPENWIRETX() throws Throwable {
testInterrupt("OPENWIRE", true, false);
}

@Test
public void testInterruptOPENWIRETXKill() throws Throwable {
testInterrupt("OPENWIRE", true, true);
testInterrupt("OPENWIRE", true);
}

@Test
public void testInterruptCORENonTX() throws Throwable {
testInterrupt("CORE", false, false);
}

@Test
public void testInterruptCORENonTXKill() throws Throwable {
testInterrupt("CORE", false, true);
testInterrupt("CORE", false);
}

private CountDownLatch startSendingThreads(Executor executor, String protocol, int broker, int threads, boolean tx, String queueName) {
Expand Down Expand Up @@ -242,8 +211,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol,
for (int i = 0; i < threads; i++) {
executor.execute(() -> {
int numberOfMessages = 0;
try {
Connection connection = factory.createConnection();
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
Expand All @@ -265,6 +233,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol,
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
logger.info("Done sending");
done.countDown();
Expand All @@ -279,7 +248,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol,

// this test has sleeps as the test will send while still active
// we keep sending all the time.. so the testInterruptLM acts like a controller telling the threads when to stop
private void testInterrupt(String protocol, boolean tx, boolean useKill) throws Throwable {
private void testInterrupt(String protocol, boolean tx) throws Throwable {
final int SENDING_THREADS = 10;
final int CONSUMING_THREADS = 10;
final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server
Expand All @@ -289,55 +258,50 @@ private void testInterrupt(String protocol, boolean tx, boolean useKill) throws
ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);

CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName);

Thread.sleep(2000);

killProcess(serverProcess, useKill);
runningSend = false;
runningConsumer = false;
assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
assertTrue(sendDone.await(10, TimeUnit.SECONDS));

logger.info("All receivers and senders are done!!!");

serverProcess = startServer0();

Thread.sleep(2000);

sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);

killProcess(serverProcess2, useKill);
assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
runningSend = false;
runningConsumer = false;
assertTrue(sendDone.await(1, TimeUnit.MINUTES));
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));

serverProcess2 = startServer1();

sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);

Thread.sleep(2000);
runningSend = false;
assertTrue(sendDone.await(10, TimeUnit.SECONDS));

QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);

File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");

Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0);
{
CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);

// let it producing for a while
Thread.sleep(2000);

runningSend = false;
assertTrue(sendDone.await(1, TimeUnit.MINUTES));

killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
clebertsuconic marked this conversation as resolved.
Show resolved Hide resolved
serverProcess = startServer0();
runningConsumer = false;
assertTrue(receiverDone.await(1, TimeUnit.MINUTES));

long timeout = System.currentTimeMillis() + 60_000;

ConnectionFactory factory = createConnectionFactory(1, protocol);

// This will flush all messages, making sure everything is consumed.
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
connection.start();
while (System.currentTimeMillis() < timeout) {
TextMessage message = (TextMessage)consumer.receive(100);
if (message == null) {
if (lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0) {
break;
}
} else {
assertTrue(message.getText().startsWith(largebody));
}
}
}

runningConsumer = false;
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
}

logger.info("All receivers and senders are done!!!");

// no need to use wait here, the previous check should have checked that already
assertEquals(0, lmFolder.listFiles().length);
assertEquals(0, lmFolder2.listFiles().length);
assertEquals(0, errors.get());
Expand All @@ -353,12 +317,8 @@ public void testBridgeFailureCORE() throws Throwable {
testInterruptFailOnBridge("CORE", false);
}

private void killProcess(Process process, boolean useKill) throws Exception {
if (useKill) {
Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
} else {
process.destroyForcibly();
}
private void killProcess(Process process) throws Exception {
process.destroyForcibly();
}


Expand All @@ -375,26 +335,25 @@ private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throw
ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);

// only start the sender for a while
CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);

Thread.sleep(2000);

runningSend = runningConsumer = false;

killProcess(serverProcess, false);
assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
assertTrue(sendDone.await(10, TimeUnit.SECONDS));
killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
clebertsuconic marked this conversation as resolved.
Show resolved Hide resolved
assertTrue(sendDone.await(1, TimeUnit. MINUTES));

sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
killProcess(serverProcess, false);
assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
clebertsuconic marked this conversation as resolved.
Show resolved Hide resolved
serverProcess = startServer0();

Thread.sleep(5000);
runningSend = false;
assertTrue(sendDone.await(10, TimeUnit.SECONDS));
assertTrue(sendDone.await(1, TimeUnit.MINUTES));

QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);
Expand All @@ -405,7 +364,7 @@ private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throw
Wait.assertTrue(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0);

runningConsumer = false;
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
assertTrue(receiverDone.await(1, TimeUnit.MINUTES));


Wait.assertEquals(0, () -> lmFolder.listFiles().length);
Expand Down
Loading