Skip to content

Commit

Permalink
ARTEMIS-5037: option to limit mirror propagation
Browse files Browse the repository at this point in the history
Add a new option in the Mirror settings to prevent a broker from
propagating messages.

When working with a topology where 4 nodes are forming a square and
where each node in that square mirrors its two neighbors: a message
leaving a corner can reach the opposite corner of the square by two
different routes. This is causing the message ordering to get broken.

example:
1 <-> 2
^     ^
|     |
v     v
4 <-> 3

A message from 1 will reach 3 by 2 and 4. Message duplication checks
will prevent the message from being duplicated but won't help regarding
the order of the messages. This is because a either the route by 2 or 4
can be faster than the other, so whomever wins the race sets the message
first.

Fixing the example:
Using the new option to not forward messages coming from a link, we
break the possibilities to have two routes to reach the opposite corner.

The above example is updated as followed:
* 2 never forwards messages coming from 1
* 1 never forwards messages coming from 2
* 3 never forwards messages coming from 4
* 4 never forwards messages coming from 3

Now, when a messages leaves 1:
* it reaches 2 and stops there
* it reaches 4
* it reaches 3 through 4 and stops there

Now, when a messages leaves 2:
* it reaches 1 and stops there
* it reaches 3
* it reaches 4 through 3 and stops there

Now, when a messages leaves 3:
* it reaches 4 and stops there
* it reaches 2
* it reaches 1 through 2 and stops there

Now, when a messages leaves 4:
* it reaches 3 and stops there
* it reaches 1
* it reaches 2 through 1 and stops there

The new test AMQPSquareMirroringTest.java is testing this exact setup.
  • Loading branch information
lavocatt committed Nov 22, 2024
1 parent 828e112 commit aaec2d8
Show file tree
Hide file tree
Showing 12 changed files with 613 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.connect;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -430,16 +431,19 @@ private void doConnect() {
final Queue queue = server.locateQueue(getMirrorSNF(replica));

final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
final Symbol[] desiredCapabilities;

ArrayList<Symbol> desiredCapabilitiesList = new ArrayList<>();
desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
} else {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}
if (replica.getNoForward()) {
desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
}

final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{});

final Symbol[] requiredOfferedCapabilities = replica.getNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};

connectSender(queue,
queue.getName().toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,9 +90,15 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
public static final Symbol NO_FORWARD_SOURCE = Symbol.getSymbol("amq.no.forward.source");
public static final Symbol RECEIVER_ID_FILTER = Symbol.getSymbol("amq.receiver.id.filter");

public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());
public static final SimpleString INTERNAL_NO_FORWARD_SOURCE = SimpleString.of(NO_FORWARD_SOURCE.toString());
public static final SimpleString INTERNAL_RECEIVER_ID_FILTER = SimpleString.of(RECEIVER_ID_FILTER.toString());

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));

Expand Down Expand Up @@ -230,12 +237,17 @@ public void addAddress(AddressInfo addressInfo) throws Exception {
public void deleteAddress(AddressInfo addressInfo) throws Exception {
logger.trace("{} deleteAddress {}", server, addressInfo);

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
return;
}
if (ignoreAddress(addressInfo.getName())) {
return;
}

if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
routeMirrorCommand(server, message);
Expand All @@ -246,6 +258,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
logger.trace("{} createQueue {}", server, queueConfiguration);

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse()) || queueConfiguration.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("Rejecting ping pong on create {} as isInternal={} and mirror target = {}", queueConfiguration, queueConfiguration.isInternal(), getControllerInUse());
Expand All @@ -264,6 +280,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}
return;
}

if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
routeMirrorCommand(server, message);
Expand All @@ -276,6 +293,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
logger.trace("{} deleteQueue {}/{}", server, address, queue);
}

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse())) {
return;
}
Expand Down Expand Up @@ -310,6 +331,14 @@ private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}

private boolean isBlockedByNoForward() {
return getControllerInUse() != null && getControllerInUse().isNoForward();
}

private boolean isBlockedByNoForward(Message message) {
return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
}

private boolean ignoreAddress(SimpleString address) {
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
return true;
Expand Down Expand Up @@ -338,6 +367,12 @@ Message copyMessageForPaging(Message message) {
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
SimpleString address = context.getAddress(message);

if (isBlockedByNoForward(message)) {
String remoteID = getRemoteMirrorId();
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
return;
}

if (context.isInternal()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
return;
Expand All @@ -353,6 +388,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
return;
}

logger.trace("sendMessage::{} send message {}", server, message);

try {
context.setReusable(false);

Expand Down Expand Up @@ -543,6 +580,17 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason);
}

SimpleString noForwardSource = null;
if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) {
noForwardSource = (SimpleString) ref.getMessage().getBrokerProperty(INTERNAL_NO_FORWARD_SOURCE);
String remoteMirrorId = getRemoteMirrorId();
if (remoteMirrorId != null) {
if (!SimpleString.of(remoteMirrorId).equals(noForwardSource)) {
return;
}
}
}

MirrorController controllerInUse = getControllerInUse();

// Retried ACKs are not forwarded.
Expand Down Expand Up @@ -578,6 +626,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
if (noForwardSource != null) {
messageCommand.setBrokerProperty(INTERNAL_RECEIVER_ID_FILTER, noForwardSource);
}
if (sync) {
OperationContext operationContext;
operationContext = OperationContextImpl.getContext(server.getExecutorFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
Expand All @@ -53,6 +53,7 @@
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
Expand All @@ -77,29 +78,39 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD_SOURCE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;

public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<TargetMirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();

public static void setControllerInUse(MirrorController controller) {
public static void setControllerInUse(TargetMirrorController controller) {
CONTROLLER_THREAD_LOCAL.set(controller);
}

public static MirrorController getControllerInUse() {
public static TargetMirrorController getControllerInUse() {
return CONTROLLER_THREAD_LOCAL.get();
}

private boolean noMessageForwarding = false;

@Override
public boolean isNoForward() {
return noMessageForwarding;
}

/**
* Objects of this class can be used by either transaction or by OperationContext.
* It is important that when you're using the transactions you clear any references to
Expand Down Expand Up @@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
this.configuration = server.getConfiguration();
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
this.noMessageForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD);
}

@Override
Expand Down Expand Up @@ -534,6 +546,10 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
if (noMessageForwarding) {
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
message.setBrokerProperty(INTERNAL_NO_FORWARD_SOURCE, getRemoteMirrorId());
}

if (internalAddress != null) {
message.setAddress(internalAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
Expand Down Expand Up @@ -237,7 +237,7 @@ private boolean isEmpty(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Que

// to be used with the same executor as the PagingStore executor
public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
TargetMirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
logger.trace("retrying address {} on server {}", address, server);
try {
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
Expand Down Expand Up @@ -518,7 +518,7 @@ private void deliveryAsync(JournalHashMap<AckRetry, AckRetry, Queue> map) {



private static class DisabledAckMirrorController implements MirrorController {
private static class DisabledAckMirrorController implements TargetMirrorController {

@Override
public boolean isRetryACK() {
Expand Down Expand Up @@ -564,5 +564,10 @@ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reaso
public String getRemoteMirrorId() {
return null;
}

@Override
public boolean isNoForward() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.slf4j.Logger;
Expand All @@ -34,7 +34,7 @@ public class MirrorTransaction extends TransactionImpl {

boolean allowPageTransaction;

MirrorController controlInUse;
TargetMirrorController controlInUse;

public MirrorTransaction(StorageManager storageManager) {
super(storageManager);
Expand All @@ -44,7 +44,7 @@ public MirrorTransaction(StorageManager storageManager) {

@Override
protected synchronized void afterCommit(List<TransactionOperation> operationsToComplete) {
MirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse();
TargetMirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse();
AMQPMirrorControllerTarget.setControllerInUse(controlInUse);
try {
super.afterCommit(operationsToComplete);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.proton;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -458,14 +459,19 @@ private void handleReplicaTargetLinkOpened(AMQPSessionContext protonSession, Rec
return;
}

ArrayList<Symbol> offeredCapabilitiesList = new ArrayList<>();
offeredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
// We need to check if the remote desires to send us tunneled core messages or not, and if
// we support that we need to offer that back so it knows it can actually do core tunneling.
if (verifyDesiredCapability(receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) {
receiver.setOfferedCapabilities(new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT});
} else {
receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
offeredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}

// If the remote wants us to not forward any messages to other mirrors we need to offer that capability
if (verifyDesiredCapability(receiver, AMQPMirrorControllerSource.NO_FORWARD)) {
offeredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
}
receiver.setOfferedCapabilities((Symbol[]) offeredCapabilitiesList.toArray(new Symbol[]{}));

protonSession.addReplicaTarget(receiver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme

boolean queueCreation = true;

boolean noForward = false;

boolean queueRemoval = true;

boolean messageAcknowledgements = true;
Expand Down Expand Up @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation)
return this;
}

public boolean getNoForward() {
return noForward;
}

public AMQPMirrorBrokerConnectionElement setNoForward(boolean noForward) {
this.noForward = noForward;
return this;
}

public boolean isQueueRemoval() {
return queueRemoval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2194,10 +2194,11 @@ private void parseAMQPBrokerConnections(final Element e,
boolean durable = getBooleanAttribute(e2, "durable", true);
boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true);
boolean sync = getBooleanAttribute(e2, "sync", false);
boolean noForward = !getBooleanAttribute(e2, "no-forward", false);
String addressFilter = getAttributeValue(e2, "address-filter");

AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement();
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync);
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync).setNoForward(noForward);
connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);

Expand Down
Loading

0 comments on commit aaec2d8

Please sign in to comment.