From 17dd8995dcc00ffcea73f848e17eb802c96e948c Mon Sep 17 00:00:00 2001 From: Thomas Lavocat Date: Tue, 3 Dec 2024 15:50:56 +0100 Subject: [PATCH] WIP: testing another placement for filtering --- .../amqp/broker/AMQPSessionCallback.java | 6 ++++++ .../amqp/connect/AMQPBrokerConnection.java | 6 +++--- .../mirror/AMQPMirrorControllerSource.java | 5 +++-- .../proton/ProtonServerSenderContext.java | 19 ++++++++++--------- .../core/server/impl/ServerConsumerImpl.java | 6 ++++++ .../spi/core/protocol/SessionCallback.java | 4 ++++ 6 files changed, 32 insertions(+), 14 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index b93a1280bd5..17f42748cdc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -758,6 +758,12 @@ public int sendMessage(MessageReference ref, ServerConsumer consumer, int delive } + @Override + public boolean filterRef(MessageReference ref, ServerConsumer consumer) { + ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); + return plugSender.filterRef(ref); + } + @Override public int sendLargeMessage(MessageReference ref, ServerConsumer consumer, diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index dab59c69a30..bfd7cf9fddf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -469,7 +469,7 @@ private void doConnect() { queue.getName().toString(), mirrorControllerSource::setLink, (r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)), - (r) -> mirrorControllerSource.filterMessage(r), + (r) -> mirrorControllerSource.shouldFilterRef(r), server.getNodeID().toString(), desiredCapabilities, null, @@ -776,7 +776,7 @@ private void connectSender(Queue queue, String targetName, java.util.function.Consumer senderConsumer, java.util.function.Consumer beforeDeliver, - java.util.function.Predicate beforeDeliverFiltering, + java.util.function.Predicate shouldFilterRef, String brokerID, Symbol[] desiredCapabilities, Symbol[] targetCapabilities, @@ -837,7 +837,7 @@ private void connectSender(Queue queue, // Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> { - ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering); + ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setShouldFilterRef(shouldFilterRef); try { if (!cancelled.get()) { if (futureTimeout != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index b1c01a94099..dae23699a7f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -506,7 +506,7 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value * than the remoteMirrorID, false otherwise. */ - public boolean filterMessage(MessageReference ref) { + public boolean shouldFilterRef(MessageReference ref) { Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER); if (filterID != null) { String remoteMirrorId = getRemoteMirrorId(); @@ -514,6 +514,7 @@ public boolean filterMessage(MessageReference ref) { if (remoteMirrorId.equals(filterID)) { return false; } else { + logger.trace("filtering message {} as remote mirror ID {} diverges from the wanted receiver {}", ref, remoteMirrorId, filterID); return true; } } @@ -607,7 +608,7 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin noForwardSource = String.valueOf(ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE)); if (remoteMirrorId != null && !remoteMirrorId.equals(noForwardSource)) { if (logger.isInfoEnabled()) { - logger.info("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId); + logger.trace("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId); } return; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index b0a13c08d61..767c67cf6eb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -89,7 +89,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private int credits = 0; private AtomicInteger pending = new AtomicInteger(0); private java.util.function.Consumer beforeDelivery; - private java.util.function.Predicate beforeDeliveryFiltering; + private java.util.function.Predicate shouldFilterRef; protected volatile Runnable afterDelivery; protected volatile MessageWriter messageWriter = SenderController.REJECTING_MESSAGE_WRITER; @@ -121,8 +121,8 @@ public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer beforeDeliveryFiltering) { - this.beforeDeliveryFiltering = beforeDeliveryFiltering; + public ProtonServerSenderContext setShouldFilterRef(java.util.function.Predicate shouldFilterRef) { + this.shouldFilterRef = shouldFilterRef; return this; } @@ -447,6 +447,13 @@ private boolean handleExtendedDeliveryOutcomes(Message message, Delivery deliver return handled; } + public boolean filterRef(MessageReference ref) { + if (shouldFilterRef != null) { + return shouldFilterRef.test(ref); + } + return false; + } + private final class ConnectionFlushIOCallback implements IOCallback { @Override @@ -481,12 +488,6 @@ public int deliverMessage(final MessageReference messageReference, final ServerC beforeDelivery.accept(messageReference); } - if (beforeDeliveryFiltering != null) { - if (beforeDeliveryFiltering.test(messageReference)) { - return 0; - } - } - synchronized (creditsLock) { if (sender.getLocalState() == EndpointState.CLOSED) { return 0; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index bab579a5c0b..a470bc635fb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -428,6 +428,12 @@ public HandleStatus handle(final MessageReference ref) throws Exception { return HandleStatus.NO_MATCH; } + if (callback != null && callback.filterRef(ref, ServerConsumerImpl.this)) { + if (logger.isDebugEnabled()) { + logger.trace("Reference {} is not allowed to be consumed by {} due to message filtering.", ref, this); + } + return HandleStatus.NO_MATCH; + } synchronized (lock) { // If the consumer is stopped then we don't accept the message, it diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index e20f01277f2..7b45d03140a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -98,4 +98,8 @@ default void close(boolean failed) { default Transaction getCurrentTransaction() { return null; } + + default boolean filterRef(MessageReference ref, ServerConsumer consumer) { + return false; + } }