Skip to content

Commit

Permalink
ARTEMIS-5173 Improving reliability on SimpleNotificationService
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Dec 4, 2024
1 parent 9620f82 commit 8b8d552
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
*/
package org.apache.activemq.artemis.tests.integration;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;

import org.apache.activemq.artemis.api.core.management.NotificationType;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleNotificationService implements NotificationService {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final List<NotificationListener> listeners = new ArrayList<>();

Expand Down Expand Up @@ -55,16 +60,56 @@ public void sendNotification(final Notification notification) throws Exception {

public static class Listener implements NotificationListener {

public synchronized int count(NotificationType... interestingTypes) {
if (logger.isDebugEnabled()) {
logger.debug("count for {}", stringOf(interestingTypes));
}
return (int) notifications.stream().filter(n -> matchTypes(n, interestingTypes)).count();
}

private static String stringOf(NotificationType[] types) {
StringBuilder builder = new StringBuilder();
builder.append("types[" + types.length + "] = {");
for (int i = 0; i < types.length; i++) {
builder.append(types[i]);
if (i + 1 < types.length) {
builder.append(",");
}
}
builder.append("}");
return builder.toString();
}

public synchronized int size() {
return notifications.size();
}

private boolean matchTypes(Notification notification, NotificationType... interestingTypes) {
logger.debug("matching {}", notification);
for (NotificationType t : interestingTypes) {
logger.debug("looking to match {} with type parameter {}", notification, t);
if (notification.getType() == t) {
return true;
}
}
return false;
}

public synchronized Notification findAny(NotificationType notificationType) {
return notifications.stream().filter(n -> n.getType() == notificationType).findAny().get();
}

////////////////////////////////////////////////////////////////////////////////////
// Note: Do not expose this collection directly.
// Instead, filter notifications by the types you are interested in.
// Previous tests validated whether this collection was empty, but later, new notifications were added.
// These tests became flaky as they received irrelevant notifications.
private final List<Notification> notifications = new ArrayList<>();

@Override
public void onNotification(final Notification notification) {
public synchronized void onNotification(final Notification notification) {
notifications.add(notification);
}

public List<Notification> getNotifications() {
return notifications;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -917,20 +918,18 @@ public void testDiscoveryGroupNotifications() throws Exception {

dg = newDiscoveryGroup(RandomUtil.randomString(), RandomUtil.randomString(), null, groupAddress, groupPort, timeout, notifService);

assertEquals(0, notifListener.getNotifications().size());
assertEquals(0, notifListener.count(CoreNotificationType.DISCOVERY_GROUP_STARTED));

dg.start();

assertEquals(1, notifListener.getNotifications().size());
Notification notif = notifListener.getNotifications().get(0);
Wait.assertEquals(1, () -> notifListener.count(CoreNotificationType.DISCOVERY_GROUP_STARTED), 5000, 100);
Notification notif = notifListener.findAny(CoreNotificationType.DISCOVERY_GROUP_STARTED);
assertEquals(CoreNotificationType.DISCOVERY_GROUP_STARTED, notif.getType());
assertEquals(dg.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("name")).toString());

dg.stop();

assertEquals(2, notifListener.getNotifications().size());
notif = notifListener.getNotifications().get(1);
assertEquals(CoreNotificationType.DISCOVERY_GROUP_STOPPED, notif.getType());
assertEquals(2, notifListener.count(CoreNotificationType.DISCOVERY_GROUP_STARTED, CoreNotificationType.DISCOVERY_GROUP_STOPPED));
assertEquals(dg.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("name")).toString());
}

Expand All @@ -947,19 +946,18 @@ public void testBroadcastGroupNotifications() throws Exception {

bg.setNotificationService(notifService);

assertEquals(0, notifListener.getNotifications().size());
assertEquals(0, notifListener.count(CoreNotificationType.BROADCAST_GROUP_STARTED));

bg.start();
Wait.assertEquals(1, () -> notifListener.count(CoreNotificationType.BROADCAST_GROUP_STARTED), 5000, 100);

assertEquals(1, notifListener.getNotifications().size());
Notification notif = notifListener.getNotifications().get(0);
assertEquals(CoreNotificationType.BROADCAST_GROUP_STARTED, notif.getType());
Notification notif = notifListener.findAny(CoreNotificationType.BROADCAST_GROUP_STARTED);
assertEquals(bg.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("name")).toString());

bg.stop();

assertEquals(2, notifListener.getNotifications().size());
notif = notifListener.getNotifications().get(1);
Wait.assertEquals(2, () -> notifListener.count(CoreNotificationType.BROADCAST_GROUP_STARTED, CoreNotificationType.BROADCAST_GROUP_STOPPED), 5000, 100);
notif = notifListener.findAny(CoreNotificationType.BROADCAST_GROUP_STOPPED);
assertEquals(CoreNotificationType.BROADCAST_GROUP_STOPPED, notif.getType());
assertEquals(bg.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("name")).toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,11 @@
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Test;

public class AcceptorControlTest extends ManagementTestBase {



public boolean usingCore() {
return false;
}

@Test
public void testAttributes() throws Exception {
TransportConfiguration acceptorConfig = new TransportConfiguration(InVMAcceptorFactory.class.getName(), new HashMap<>(), RandomUtil.randomString());
Expand Down Expand Up @@ -134,38 +129,22 @@ public void testNotifications() throws Exception {

service.getManagementService().addNotificationListener(notifListener);

assertEquals(0, notifListener.getNotifications().size());
assertEquals(0, notifListener.count(CoreNotificationType.ACCEPTOR_STOPPED));

acceptorControl.stop();

assertEquals(usingCore() ? 7 : 1, notifListener.getNotifications().size());

int i = findNotification(notifListener, CoreNotificationType.ACCEPTOR_STOPPED);

Notification notif = notifListener.getNotifications().get(i);
Notification notif = notifListener.findAny(CoreNotificationType.ACCEPTOR_STOPPED);
assertEquals(CoreNotificationType.ACCEPTOR_STOPPED, notif.getType());
assertEquals(NettyAcceptorFactory.class.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("factory")).toString());

acceptorControl.start();

i = findNotification(notifListener, CoreNotificationType.ACCEPTOR_STARTED);
notif = notifListener.getNotifications().get(i);
Wait.assertEquals(1, () -> notifListener.count(CoreNotificationType.ACCEPTOR_STARTED), 5000, 100);
notif = notifListener.findAny(CoreNotificationType.ACCEPTOR_STARTED);
assertEquals(CoreNotificationType.ACCEPTOR_STARTED, notif.getType());
assertEquals(NettyAcceptorFactory.class.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("factory")).toString());
}

private int findNotification(SimpleNotificationService.Listener notifListener, CoreNotificationType type) {
int i = 0;
for (i = 0; i < notifListener.getNotifications().size(); i++) {
if (notifListener.getNotifications().get(i).getType().equals(type)) {
break;
}
}
assertTrue(i < notifListener.getNotifications().size());
return i;
}



protected AcceptorControl createManagementControl(final String name) throws Exception {
return ManagementControlHelper.createAcceptorControl(name, mbeanServer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ public void stop() throws Exception {
}


@Override
public boolean usingCore() {
return true;
}

@Override
@Test
public void testStartStop() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -102,19 +103,19 @@ public void testNotifications() throws Exception {

server_0.getManagementService().addNotificationListener(notifListener);

assertEquals(0, notifListener.getNotifications().size());
assertEquals(0, notifListener.count(CoreNotificationType.BRIDGE_STOPPED));

bridgeControl.stop();

assertEquals(1, notifListener.getNotifications().size());
Notification notif = notifListener.getNotifications().get(0);
Wait.assertEquals(1, () -> notifListener.count(CoreNotificationType.BRIDGE_STOPPED), 5000, 100);
Notification notif = notifListener.findAny(CoreNotificationType.BRIDGE_STOPPED);
assertEquals(CoreNotificationType.BRIDGE_STOPPED, notif.getType());
assertEquals(bridgeControl.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("name")).toString());

bridgeControl.start();

assertEquals(2, notifListener.getNotifications().size());
notif = notifListener.getNotifications().get(1);
Wait.assertEquals(2, () -> notifListener.count(CoreNotificationType.BRIDGE_STOPPED, CoreNotificationType.BRIDGE_STARTED), 5000, 100);
notif = notifListener.findAny(CoreNotificationType.BRIDGE_STARTED);
assertEquals(CoreNotificationType.BRIDGE_STARTED, notif.getType());
assertEquals(bridgeControl.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("name")).toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -151,41 +152,21 @@ public void testNotifications() throws Exception {
ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig1.getName());

server_0.getManagementService().addNotificationListener(notifListener);

assertEquals(0, notifListener.getNotifications().size());

assertEquals(0, notifListener.count(CoreNotificationType.CLUSTER_CONNECTION_STOPPED));
clusterConnectionControl.stop();

assertTrue(notifListener.getNotifications().size() > 0);
Notification notif = getFirstNotificationOfType(notifListener.getNotifications(), CoreNotificationType.CLUSTER_CONNECTION_STOPPED);
Wait.assertEquals(1, () -> notifListener.count(CoreNotificationType.CLUSTER_CONNECTION_STOPPED), 5000, 100);
Notification notif = notifListener.findAny(CoreNotificationType.CLUSTER_CONNECTION_STOPPED);
assertNotNull(notif);
assertEquals(clusterConnectionControl.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("name")).toString());

clusterConnectionControl.start();

assertTrue(notifListener.getNotifications().size() > 0);
notif = getFirstNotificationOfType(notifListener.getNotifications(), CoreNotificationType.CLUSTER_CONNECTION_STARTED);
Wait.assertTrue(() -> notifListener.size() > 0, 5000, 100);
notif = notifListener.findAny(CoreNotificationType.CLUSTER_CONNECTION_STARTED);
assertNotNull(notif);
assertEquals(clusterConnectionControl.getName(), notif.getProperties().getSimpleStringProperty(SimpleString.of("name")).toString());
}

private Notification getFirstNotificationOfType(List<Notification> notifications, CoreNotificationType type) {
Notification result = null;

// the notifications can change while we're looping
List<Notification> notificationsClone = new ArrayList<>(notifications);

for (Notification notification : notificationsClone) {
if (notification.getType().equals(type)) {
result = notification;
}
}

return result;
}



@Override
@BeforeEach
public void setUp() throws Exception {
Expand Down

0 comments on commit 8b8d552

Please sign in to comment.