diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index b93a1280bd5..17f42748cdc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -758,6 +758,12 @@ public int sendMessage(MessageReference ref, ServerConsumer consumer, int delive } + @Override + public boolean filterRef(MessageReference ref, ServerConsumer consumer) { + ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); + return plugSender.filterRef(ref); + } + @Override public int sendLargeMessage(MessageReference ref, ServerConsumer consumer, diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index dab59c69a30..bfd7cf9fddf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -469,7 +469,7 @@ private void doConnect() { queue.getName().toString(), mirrorControllerSource::setLink, (r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)), - (r) -> mirrorControllerSource.filterMessage(r), + (r) -> mirrorControllerSource.shouldFilterRef(r), server.getNodeID().toString(), desiredCapabilities, null, @@ -776,7 +776,7 @@ private void connectSender(Queue queue, String targetName, java.util.function.Consumer senderConsumer, java.util.function.Consumer beforeDeliver, - java.util.function.Predicate beforeDeliverFiltering, + java.util.function.Predicate shouldFilterRef, String brokerID, Symbol[] desiredCapabilities, Symbol[] targetCapabilities, @@ -837,7 +837,7 @@ private void connectSender(Queue queue, // Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> { - ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering); + ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setShouldFilterRef(shouldFilterRef); try { if (!cancelled.get()) { if (futureTimeout != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index b1c01a94099..dae23699a7f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -506,7 +506,7 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value * than the remoteMirrorID, false otherwise. */ - public boolean filterMessage(MessageReference ref) { + public boolean shouldFilterRef(MessageReference ref) { Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER); if (filterID != null) { String remoteMirrorId = getRemoteMirrorId(); @@ -514,6 +514,7 @@ public boolean filterMessage(MessageReference ref) { if (remoteMirrorId.equals(filterID)) { return false; } else { + logger.trace("filtering message {} as remote mirror ID {} diverges from the wanted receiver {}", ref, remoteMirrorId, filterID); return true; } } @@ -607,7 +608,7 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin noForwardSource = String.valueOf(ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE)); if (remoteMirrorId != null && !remoteMirrorId.equals(noForwardSource)) { if (logger.isInfoEnabled()) { - logger.info("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId); + logger.trace("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId); } return; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index b0a13c08d61..fc2cee081d2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessage; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; @@ -89,7 +90,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private int credits = 0; private AtomicInteger pending = new AtomicInteger(0); private java.util.function.Consumer beforeDelivery; - private java.util.function.Predicate beforeDeliveryFiltering; + private java.util.function.Predicate shouldFilterRef; protected volatile Runnable afterDelivery; protected volatile MessageWriter messageWriter = SenderController.REJECTING_MESSAGE_WRITER; @@ -121,8 +122,8 @@ public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer beforeDeliveryFiltering) { - this.beforeDeliveryFiltering = beforeDeliveryFiltering; + public ProtonServerSenderContext setShouldFilterRef(java.util.function.Predicate shouldFilterRef) { + this.shouldFilterRef = shouldFilterRef; return this; } @@ -447,6 +448,13 @@ private boolean handleExtendedDeliveryOutcomes(Message message, Delivery deliver return handled; } + public boolean filterRef(MessageReference ref) { + if (shouldFilterRef != null) { + return shouldFilterRef.test(ref); + } + return false; + } + private final class ConnectionFlushIOCallback implements IOCallback { @Override @@ -481,12 +489,6 @@ public int deliverMessage(final MessageReference messageReference, final ServerC beforeDelivery.accept(messageReference); } - if (beforeDeliveryFiltering != null) { - if (beforeDeliveryFiltering.test(messageReference)) { - return 0; - } - } - synchronized (creditsLock) { if (sender.getLocalState() == EndpointState.CLOSED) { return 0; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index bab579a5c0b..cc58439ce0a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -428,6 +428,12 @@ public HandleStatus handle(final MessageReference ref) throws Exception { return HandleStatus.NO_MATCH; } + if (callback != null && callback.filterRef(ref, ServerConsumerImpl.this)) { + if (logger.isDebugEnabled()) { + // TODO + } + return HandleStatus.NO_MATCH; + } synchronized (lock) { // If the consumer is stopped then we don't accept the message, it diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index e20f01277f2..7b45d03140a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -98,4 +98,8 @@ default void close(boolean failed) { default Transaction getCurrentTransaction() { return null; } + + default boolean filterRef(MessageReference ref, ServerConsumer consumer) { + return false; + } }