Skip to content

Commit

Permalink
ARTEMIS-5156 Making LargeMessageFrozenTest more reliable
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Nov 15, 2024
1 parent bc077f4 commit 8d2cde2
Showing 1 changed file with 85 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,32 +87,12 @@ public void testFreezeAMQP() throws Exception {
public void testFreeze(String protocol) throws Exception {
startProxy();

ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333");
}

org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
int NUMBER_OF_MESSAGES = 10;

Connection connection = factory.createConnection();
runAfter(connection::close);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
ConnectionFactory proxiedFactory = createProxiedFactory(protocol);
ConnectionFactory regularfactory = createRegularCF(protocol);

assertEquals(1, proxy.getInboundHandlers().size());
assertEquals(1, proxy.getOutbounddHandlers().size());
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));

String body;
{
Expand All @@ -123,17 +103,28 @@ public void testFreeze(String protocol) throws Exception {
body = buffer.toString();
}

int NUMBER_OF_MESSAGES = 10;
try (Connection connection = regularfactory.createConnection()) {
runAfter(connection::close);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());

MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage(body));
}
session.commit();

MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage(body));
}
session.commit();

Connection connection = proxiedFactory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());

assertEquals(1, proxy.getInboundHandlers().size());
assertEquals(1, proxy.getOutbounddHandlers().size());

MessageConsumer consumer = session.createConsumer(queue);
connection.start();

boolean failed = false;

for (int repeat = 0; repeat < 5; repeat++) {
Expand All @@ -152,7 +143,7 @@ public void testFreeze(String protocol) throws Exception {
assertTrue(failed);
server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("forced failure")));

connection = factory.createConnection();
connection = proxiedFactory.createConnection();
connection.start();
runAfter(connection::close);
session = connection.createSession(true, Session.SESSION_TRANSACTED);
Expand All @@ -167,7 +158,6 @@ public void testFreeze(String protocol) throws Exception {
}

Wait.assertEquals(0, () -> {
System.gc();
return server.getConfiguration().getLargeMessagesLocation().listFiles().length;
});
}
Expand All @@ -189,26 +179,12 @@ public void testRemoveConsumerOpenWire() throws Exception {

public void testRemoveConsumer(String protocol) throws Exception {

ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:44444?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:44444?amqp.idleTimeout=300&jms.prefetchPolicy.all=10");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:44444");
}
int NUMBER_OF_MESSAGES = 10;

ConnectionFactory regularCF = createRegularCF(protocol);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));

Connection connection = factory.createConnection();
Connection connection = regularCF.createConnection();
runAfter(connection::close);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
Expand All @@ -217,13 +193,11 @@ public void testRemoveConsumer(String protocol) throws Exception {
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 300 * 1024) {
buffer.append("Not so big, but big!!");
buffer.append("BLA BLA BLA... BLAH BLAH BLAH ... ");
}
body = buffer.toString();
}

int NUMBER_OF_MESSAGES = 10;

MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage(body));
Expand Down Expand Up @@ -256,7 +230,7 @@ public void testRemoveConsumer(String protocol) throws Exception {
}
server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("forced failure")));

connection = factory.createConnection();
connection = regularCF.createConnection();
runAfter(connection::close);

session = connection.createSession(true, Session.SESSION_TRANSACTED);
Expand Down Expand Up @@ -294,33 +268,11 @@ public void testFreezeAutoAckAMQP() throws Exception {
public void testFreezeAutoAck(String protocol) throws Exception {

startProxy();
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333");
}
ConnectionFactory proxiedFactory = createProxiedFactory(protocol);
ConnectionFactory regularCF = createRegularCF(protocol);

org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));

Connection connection = factory.createConnection();
runAfter(connection::close);
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionConsumer.createQueue(getName());

assertEquals(1, proxy.getInboundHandlers().size());
assertEquals(1, proxy.getOutbounddHandlers().size());

String body;
{
StringBuffer buffer = new StringBuffer();
Expand All @@ -330,22 +282,31 @@ public void testFreezeAutoAck(String protocol) throws Exception {
body = buffer.toString();
}

int NUMBER_OF_MESSAGES = 40;
try (Connection connection = regularCF.createConnection()) {
runAfter(connection::close);

try (Session sessionProducer = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
MessageProducer producer = sessionProducer.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(sessionConsumer.createTextMessage(body));
int NUMBER_OF_MESSAGES = 40;

try (Session sessionProducer = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = sessionProducer.createQueue(getName());
MessageProducer producer = sessionProducer.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(sessionProducer.createTextMessage(body));
}
sessionProducer.commit();
}
sessionProducer.commit();
}

MessageConsumer consumer = sessionConsumer.createConsumer(queue);
connection.start();

boolean failed = false;
try (Connection connection = proxiedFactory.createConnection()) {
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionConsumer.createQueue(getName());
MessageConsumer consumer = sessionConsumer.createConsumer(queue);
connection.start();

assertEquals(1, proxy.getInboundHandlers().size());
assertEquals(1, proxy.getOutbounddHandlers().size());

try {
for (int i = 0; i < 10; i++) {
consumer.receive(5000);
}
Expand All @@ -363,26 +324,51 @@ public void testFreezeAutoAck(String protocol) throws Exception {

assertTrue(failed);

connection = factory.createConnection();
connection.start();
runAfter(connection::close);
sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = sessionConsumer.createQueue(getName());
consumer = sessionConsumer.createConsumer(queue);
try (Connection connection = regularCF.createConnection()) {
connection.start();
runAfter(connection::close);
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionConsumer.createQueue(getName());
MessageConsumer consumer = sessionConsumer.createConsumer(queue);

for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
assertNotNull(message);
assertEquals(body, message.getText());
}

for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
assertNotNull(message);
assertEquals(body, message.getText());
assertNull(consumer.receiveNoWait());
}

assertNull(consumer.receiveNoWait());

assertEquals(0L, serverQueue.getMessageCount());

Wait.assertEquals(0, () -> {
System.gc();
return server.getConfiguration().getLargeMessagesLocation().listFiles().length;
});
}

private static ConnectionFactory createRegularCF(String protocol) {
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
}

private static ConnectionFactory createProxiedFactory(String protocol) {
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333");
}
return factory;
}
}

0 comments on commit 8d2cde2

Please sign in to comment.