Skip to content

Commit

Permalink
ARTEMIS-5002 AMQP producer not unblock if the disk space is freed
Browse files Browse the repository at this point in the history
  • Loading branch information
howardgao committed Aug 21, 2024
1 parent ac666f3 commit 4dd3911
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,7 @@ public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParame
if (isFull()) {
if (runOnFailure && runWhenAvailable != null) {
addToBlockList(runWhenAvailable, blockedCallback);
pagingManager.addBlockedStore(this);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;

import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(ParameterizedTestExtension.class)
public class GlobalDiskFullFailPolicyTest extends GlobalDiskFullTest {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Parameter(index = 0)
public AddressFullMessagePolicy addressFullPolicy;

@Parameters(name = "addressFullPolicy={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{AddressFullMessagePolicy.FAIL}, {AddressFullMessagePolicy.DROP}
});
}

@Override
protected void configureAddressPolicy(ActiveMQServer server) {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(addressFullPolicy);
server.getConfiguration().addAddressSetting("TEST",addressSettings);
}

@TestTemplate
@Override
public void testProducerOnDiskFull() throws Exception {
FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
final CountDownLatch latch = new CountDownLatch(1);
monitor.addCallback((usableSpace, totalSpace, ok, type) -> {
latch.countDown();
});

assertTrue(latch.await(1, TimeUnit.MINUTES));

AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
AmqpConnection connection = addConnection(client.connect());

try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("TEST");
byte[] payload = new byte[1000];

AmqpSender anonSender = session.createSender();

CountDownLatch sentWithName = new CountDownLatch(1);
CountDownLatch sentAnon = new CountDownLatch(1);

Thread threadWithName = new Thread(() -> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
sender.setSendTimeout(-1);
sender.send(message);
} catch (Exception e) {
logger.warn("Caught exception while sending", e);
} finally {
sentWithName.countDown();
}
});

Thread threadWithAnon = new Thread(() -> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
anonSender.setSendTimeout(-1);
message.setAddress(getQueueName());
anonSender.send(message);
sentAnon.countDown();
} catch (Exception e) {
logger.warn("Caught exception while sending", e);
}
});

ExecutorService pool = Executors.newCachedThreadPool();
runAfter(pool::shutdownNow);

pool.execute(threadWithName);
pool.execute(threadWithAnon);

assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked");
assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked");
monitor.setMaxUsage(100.0);

assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released");
assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released");

} finally {
connection.close();
}
}
}

0 comments on commit 4dd3911

Please sign in to comment.