Skip to content

Commit

Permalink
WIP: testing another placement for filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
lavocatt committed Dec 4, 2024
1 parent 745193b commit 17dd899
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,12 @@ public int sendMessage(MessageReference ref, ServerConsumer consumer, int delive

}

@Override
public boolean filterRef(MessageReference ref, ServerConsumer consumer) {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
return plugSender.filterRef(ref);
}

@Override
public int sendLargeMessage(MessageReference ref,
ServerConsumer consumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ private void doConnect() {
queue.getName().toString(),
mirrorControllerSource::setLink,
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
(r) -> mirrorControllerSource.filterMessage(r),
(r) -> mirrorControllerSource.shouldFilterRef(r),
server.getNodeID().toString(),
desiredCapabilities,
null,
Expand Down Expand Up @@ -776,7 +776,7 @@ private void connectSender(Queue queue,
String targetName,
java.util.function.Consumer<Sender> senderConsumer,
java.util.function.Consumer<? super MessageReference> beforeDeliver,
java.util.function.Predicate<? super MessageReference> beforeDeliverFiltering,
java.util.function.Predicate<? super MessageReference> shouldFilterRef,
String brokerID,
Symbol[] desiredCapabilities,
Symbol[] targetCapabilities,
Expand Down Expand Up @@ -837,7 +837,7 @@ private void connectSender(Queue queue,

// Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened
sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering);
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setShouldFilterRef(shouldFilterRef);
try {
if (!cancelled.get()) {
if (futureTimeout != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,14 +506,15 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
* than the remoteMirrorID, false otherwise.
*/
public boolean filterMessage(MessageReference ref) {
public boolean shouldFilterRef(MessageReference ref) {
Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER);
if (filterID != null) {
String remoteMirrorId = getRemoteMirrorId();
if (remoteMirrorId != null) {
if (remoteMirrorId.equals(filterID)) {
return false;
} else {
logger.trace("filtering message {} as remote mirror ID {} diverges from the wanted receiver {}", ref, remoteMirrorId, filterID);
return true;
}
}
Expand Down Expand Up @@ -607,7 +608,7 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
noForwardSource = String.valueOf(ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE));
if (remoteMirrorId != null && !remoteMirrorId.equals(noForwardSource)) {
if (logger.isInfoEnabled()) {
logger.info("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId);
logger.trace("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private int credits = 0;
private AtomicInteger pending = new AtomicInteger(0);
private java.util.function.Consumer<? super MessageReference> beforeDelivery;
private java.util.function.Predicate<? super MessageReference> beforeDeliveryFiltering;
private java.util.function.Predicate<? super MessageReference> shouldFilterRef;

protected volatile Runnable afterDelivery;
protected volatile MessageWriter messageWriter = SenderController.REJECTING_MESSAGE_WRITER;
Expand Down Expand Up @@ -121,8 +121,8 @@ public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer<?
return this;
}

public ProtonServerSenderContext setBeforeDeliveryFiltering(java.util.function.Predicate<? super MessageReference> beforeDeliveryFiltering) {
this.beforeDeliveryFiltering = beforeDeliveryFiltering;
public ProtonServerSenderContext setShouldFilterRef(java.util.function.Predicate<? super MessageReference> shouldFilterRef) {
this.shouldFilterRef = shouldFilterRef;
return this;
}

Expand Down Expand Up @@ -447,6 +447,13 @@ private boolean handleExtendedDeliveryOutcomes(Message message, Delivery deliver
return handled;
}

public boolean filterRef(MessageReference ref) {
if (shouldFilterRef != null) {
return shouldFilterRef.test(ref);
}
return false;
}

private final class ConnectionFlushIOCallback implements IOCallback {

@Override
Expand Down Expand Up @@ -481,12 +488,6 @@ public int deliverMessage(final MessageReference messageReference, final ServerC
beforeDelivery.accept(messageReference);
}

if (beforeDeliveryFiltering != null) {
if (beforeDeliveryFiltering.test(messageReference)) {
return 0;
}
}

synchronized (creditsLock) {
if (sender.getLocalState() == EndpointState.CLOSED) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,12 @@ public HandleStatus handle(final MessageReference ref) throws Exception {

return HandleStatus.NO_MATCH;
}
if (callback != null && callback.filterRef(ref, ServerConsumerImpl.this)) {
if (logger.isDebugEnabled()) {
logger.trace("Reference {} is not allowed to be consumed by {} due to message filtering.", ref, this);
}
return HandleStatus.NO_MATCH;
}

synchronized (lock) {
// If the consumer is stopped then we don't accept the message, it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,8 @@ default void close(boolean failed) {
default Transaction getCurrentTransaction() {
return null;
}

default boolean filterRef(MessageReference ref, ServerConsumer consumer) {
return false;
}
}

0 comments on commit 17dd899

Please sign in to comment.