diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java index 2c4ed313c44..8afb103a317 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java @@ -110,19 +110,13 @@ private static String createBody() { Process serverProcess2; public ConnectionFactory createConnectionFactory(int broker, String protocol) { + + int portUsed = 61616 + broker * 100; + if (protocol.equals("CORE")) { - switch (broker) { - // I need the connections stable in the selected server - case 0: - return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000"); - case 1: - return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000"); - default: - logger.warn("undefined argument {}", broker); - throw new IllegalArgumentException("undefined"); - } + return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:" + portUsed + "?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000"); } else { - return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + (61616 + broker * 100) + "?ha=false"); + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + portUsed); } } @@ -145,52 +139,27 @@ private Process startServer1() throws Exception { @Test public void testLargeMessageAMQPTX() throws Throwable { - testInterrupt("AMQP", true, false); - } - - @Test - public void testLargeMessageAMQPTXKill() throws Throwable { - testInterrupt("AMQP", true, true); + testInterrupt("AMQP", true); } @Test public void testInterruptAMQPNonTX() throws Throwable { - testInterrupt("AMQP", false, false); - } - - @Test - public void testInterruptAMQPNonTXKill() throws Throwable { - testInterrupt("AMQP", false, true); + testInterrupt("AMQP", false); } @Test public void testInterruptCORETX() throws Throwable { - testInterrupt("CORE", true, false); - } - - @Test - public void testInterruptCORETXKill() throws Throwable { - testInterrupt("CORE", true, true); + testInterrupt("CORE", true); } @Test public void testInterruptOPENWIRETX() throws Throwable { - testInterrupt("OPENWIRE", true, false); - } - - @Test - public void testInterruptOPENWIRETXKill() throws Throwable { - testInterrupt("OPENWIRE", true, true); + testInterrupt("OPENWIRE", true); } @Test public void testInterruptCORENonTX() throws Throwable { - testInterrupt("CORE", false, false); - } - - @Test - public void testInterruptCORENonTXKill() throws Throwable { - testInterrupt("CORE", false, true); + testInterrupt("CORE", false); } private CountDownLatch startSendingThreads(Executor executor, String protocol, int broker, int threads, boolean tx, String queueName) { @@ -242,8 +211,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol, for (int i = 0; i < threads; i++) { executor.execute(() -> { int numberOfMessages = 0; - try { - Connection connection = factory.createConnection(); + try (Connection connection = factory.createConnection()) { connection.start(); Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); @@ -265,6 +233,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol, } } } catch (Exception e) { + logger.warn(e.getMessage(), e); } finally { logger.info("Done sending"); done.countDown(); @@ -279,7 +248,7 @@ private CountDownLatch startConsumingThreads(Executor executor, String protocol, // this test has sleeps as the test will send while still active // we keep sending all the time.. so the testInterruptLM acts like a controller telling the threads when to stop - private void testInterrupt(String protocol, boolean tx, boolean useKill) throws Throwable { + private void testInterrupt(String protocol, boolean tx) throws Throwable { final int SENDING_THREADS = 10; final int CONSUMING_THREADS = 10; final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server @@ -289,55 +258,50 @@ private void testInterrupt(String protocol, boolean tx, boolean useKill) throws ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS); runAfter(executorService::shutdownNow); - CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); - CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - - killProcess(serverProcess, useKill); - runningSend = false; - runningConsumer = false; - assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - logger.info("All receivers and senders are done!!!"); - - serverProcess = startServer0(); - - Thread.sleep(2000); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - killProcess(serverProcess2, useKill); - assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS)); - runningSend = false; - runningConsumer = false; - assertTrue(sendDone.await(1, TimeUnit.MINUTES)); - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); - - serverProcess2 = startServer1(); - - sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); - receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - - Thread.sleep(2000); - runningSend = false; - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); - - QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000); - QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000); - File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages"); File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages"); - Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0); + { + CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); + CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); + + // let it producing for a while + Thread.sleep(2000); + + runningSend = false; + assertTrue(sendDone.await(1, TimeUnit.MINUTES)); + + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); + serverProcess = startServer0(); + runningConsumer = false; + assertTrue(receiverDone.await(1, TimeUnit.MINUTES)); + + long timeout = System.currentTimeMillis() + 60_000; + + ConnectionFactory factory = createConnectionFactory(1, protocol); + + // This will flush all messages, making sure everything is consumed. + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + connection.start(); + while (System.currentTimeMillis() < timeout) { + TextMessage message = (TextMessage)consumer.receive(100); + if (message == null) { + if (lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0) { + break; + } + } else { + assertTrue(message.getText().startsWith(largebody)); + } + } + } - runningConsumer = false; - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); + } + + logger.info("All receivers and senders are done!!!"); - // no need to use wait here, the previous check should have checked that already assertEquals(0, lmFolder.listFiles().length); assertEquals(0, lmFolder2.listFiles().length); assertEquals(0, errors.get()); @@ -353,12 +317,8 @@ public void testBridgeFailureCORE() throws Throwable { testInterruptFailOnBridge("CORE", false); } - private void killProcess(Process process, boolean useKill) throws Exception { - if (useKill) { - Runtime.getRuntime().exec("kill -SIGINT " + process.pid()); - } else { - process.destroyForcibly(); - } + private void killProcess(Process process) throws Exception { + process.destroyForcibly(); } @@ -375,26 +335,25 @@ private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throw ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS); runAfter(executorService::shutdownNow); - // only start the sender for a while CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName); Thread.sleep(2000); runningSend = runningConsumer = false; - killProcess(serverProcess, false); - assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES)); - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); + assertTrue(sendDone.await(1, TimeUnit. MINUTES)); sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName); CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName); - killProcess(serverProcess, false); - assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS)); + killProcess(serverProcess); + assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); serverProcess = startServer0(); Thread.sleep(5000); runningSend = false; - assertTrue(sendDone.await(10, TimeUnit.SECONDS)); + assertTrue(sendDone.await(1, TimeUnit.MINUTES)); QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000); QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000); @@ -405,7 +364,7 @@ private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throw Wait.assertTrue(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0); runningConsumer = false; - assertTrue(receiverDone.await(10, TimeUnit.SECONDS)); + assertTrue(receiverDone.await(1, TimeUnit.MINUTES)); Wait.assertEquals(0, () -> lmFolder.listFiles().length);