diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 5bf29c32c53e..982c6e388053 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1096,6 +1096,7 @@ public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParame if (isFull()) { if (runOnFailure && runWhenAvailable != null) { addToBlockList(runWhenAvailable, blockedCallback); + pagingManager.addBlockedStore(this); } return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java index bf260770e488..34db6e7e3f2d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java @@ -19,22 +19,57 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.lang.invoke.MethodHandles; import java.net.URI; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter; +import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters; +import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +@ExtendWith(ParameterizedTestExtension.class) public class GlobalDiskFullTest extends AmqpClientTestSupport { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Parameter(index = 0) + public AddressFullMessagePolicy addressFullPolicy; + + @Parameters(name = "addressFullPolicy={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {AddressFullMessagePolicy.FAIL}, {AddressFullMessagePolicy.DROP}, {AddressFullMessagePolicy.PAGE} + }); + } + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAddressFullMessagePolicy(addressFullPolicy); + server.getConfiguration().addAddressSetting(getQueueName(), addressSettings); + } @Override protected void addConfiguration(ActiveMQServer server) { @@ -42,15 +77,41 @@ protected void addConfiguration(ActiveMQServer server) { serverConfig.setDiskScanPeriod(100); } - @Test + @TestTemplate public void testProducerOnDiskFull() throws Exception { - FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0); - final CountDownLatch latch = new CountDownLatch(1); + + FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor(); + + AtomicBoolean diskUsageOk = new AtomicBoolean(true); + AtomicInteger checkValid = new AtomicInteger(0); + monitor.addCallback((usableSpace, totalSpace, ok, type) -> { - latch.countDown(); + + if (checkValid.get() == -1) { + return; + } + + checkValid.incrementAndGet(); + + double usage = FileStoreMonitor.calculateUsage(usableSpace, totalSpace); + + if (type == FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage && ok && usage < monitor.getMaxUsage()) { + diskUsageOk.set(true); + } else if (type == FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage && !ok && usage >= monitor.getMaxUsage()) { + diskUsageOk.set(false); + } else { + logger.warn("invalid state, usableSpace: {}, totalSpace: {}, ok: {}, type: {}", usableSpace, totalSpace, ok, type); + checkValid.set(-1); + } }); - assertTrue(latch.await(1, TimeUnit.MINUTES)); + Wait.assertTrue(() -> checkValid.get() > 0, 1000); + Wait.assertTrue(() -> diskUsageOk.get(), 1000); + + monitor.setMaxUsage(0.0); + + Wait.assertTrue(() -> checkValid.get() > 0, 1000); + Wait.assertTrue(() -> !diskUsageOk.get(), 1000); AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT)); AmqpConnection connection = addConnection(client.connect()); @@ -60,59 +121,52 @@ public void testProducerOnDiskFull() throws Exception { AmqpSender sender = session.createSender(getQueueName()); byte[] payload = new byte[1000]; - AmqpSender anonSender = session.createSender(); CountDownLatch sentWithName = new CountDownLatch(1); CountDownLatch sentAnon = new CountDownLatch(1); - Thread threadWithName = new Thread(() -> { + ExecutorService pool = Executors.newCachedThreadPool(); + runAfter(pool::shutdownNow); + pool.execute(() -> { try { final AmqpMessage message = new AmqpMessage(); message.setBytes(payload); sender.setSendTimeout(-1); sender.send(message); } catch (Exception e) { - e.printStackTrace(); + logger.warn("Caught exception while sending", e); } finally { sentWithName.countDown(); } }); - - threadWithName.start(); - - - Thread threadWithAnon = new Thread(() -> { + pool.execute(()-> { try { final AmqpMessage message = new AmqpMessage(); message.setBytes(payload); anonSender.setSendTimeout(-1); message.setAddress(getQueueName()); anonSender.send(message); - } catch (Exception e) { - e.printStackTrace(); - } finally { sentAnon.countDown(); + } catch (Exception e) { + logger.warn("Caught exception while sending", e); } }); - threadWithAnon.start(); - assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked"); assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked"); monitor.setMaxUsage(100.0); + Wait.assertTrue(() -> checkValid.get() > 0, 1000); + Wait.assertTrue(() -> diskUsageOk.get(), 1000); + assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released"); assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released"); - - threadWithName.join(TimeUnit.SECONDS.toMillis(30)); - threadWithAnon.join(TimeUnit.SECONDS.toMillis(30)); - assertFalse(threadWithName.isAlive()); - assertFalse(threadWithAnon.isAlive()); } finally { connection.close(); } } + }