diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index 9408f1677a40..f83cfa760d79 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -279,6 +279,12 @@ public ActiveMQException createException(String msg) { public ActiveMQException createException(String msg) { return new ActiveMQTimeoutException(msg); } + }, + INVALID_MESSAGE_EXCEPTION(224) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQInvalidMessageException(msg); + } }; private static final Map TYPE_MAP; diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidMessageException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidMessageException.java new file mode 100644 index 000000000000..11be3c2e7453 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidMessageException.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +public class ActiveMQInvalidMessageException extends ActiveMQException { + + public ActiveMQInvalidMessageException(String message) { + super(ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION, message); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 2c6beaefda64..b21bbe47eb22 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -173,6 +173,11 @@ public interface Message { */ SimpleString HDR_INGRESS_TIMESTAMP = SimpleString.of("_AMQ_INGRESS_TIMESTAMP"); + /** + * This gives extra information as to why the messages is sent to DLQ + */ + SimpleString HDR_ROUTE_DLQ_DETAIL = SimpleString.of("_AMQ_DLQ_DETAIL"); + /** * The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore * the prefix when the message is consumed. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 0064bac1d1fc..e7bf4c06a32b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -559,4 +559,6 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName, @Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}") ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState); + @Message(id = 229256, value = "Missing header {}") + String messageMissingHeader(SimpleString idsHeaderName); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 33e2887a1286..c0af55ba8888 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -641,9 +641,8 @@ void slowConsumerDetected(String sessionID, @LogMessage(id = 222109, value = "Timed out waiting for write lock on consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN) void timeoutLockingConsumer(String consumer, String remoteAddress); - @LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN) + @LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, props={}", level = LogMessage.Level.WARN) void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, - org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString idsHeaderName); @LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 01ded75a40d5..0b49e43d1dbe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -506,7 +506,7 @@ public void failed(Throwable t) { } /* Hook for processing message before forwarding */ - protected Message beforeForward(Message message, final SimpleString forwardingAddress) { + protected Message beforeForward(Message message, final SimpleString forwardingAddress) throws ActiveMQException { message = message.copy(); ((RefCountMessage)message).setParentRef((RefCountMessage)message); @@ -605,7 +605,19 @@ public HandleStatus handle(final MessageReference ref) throws Exception { dest = ref.getMessage().getAddressSimpleString(); } - final Message message = beforeForward(ref.getMessage(), dest); + final Message message; + try { + message = beforeForward(ref.getMessage(), dest); + } catch (ActiveMQException ex) { + if (ex.getType() == ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION) { + ref.getMessage().putStringProperty(Message.HDR_ROUTE_DLQ_DETAIL, SimpleString.of(ex.getMessage())); + ref.getQueue().sendToDeadLetterAddress(null, ref); + refs.remove(ref.getMessageID()); + return HandleStatus.HANDLED; + } else { + throw ex; + } + } pendingAcks.countUp(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index 3cedd46da706..63f122887f37 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.BindingType; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; @@ -185,7 +186,7 @@ protected ClientSessionFactoryInternal createSessionFactory() throws Exception { } @Override - protected Message beforeForward(final Message message, final SimpleString forwardingAddress) { + protected Message beforeForward(final Message message, final SimpleString forwardingAddress) throws ActiveMQException { // We make a copy of the message, then we strip out the unwanted routing id headers and leave // only // the one pertinent for the address node - this is important since different queues on different @@ -200,11 +201,9 @@ protected Message beforeForward(final Message message, final SimpleString forwar Set propNames = new HashSet<>(messageCopy.getPropertyNames()); byte[] queueIds = message.getExtraBytesProperty(idsHeaderName); - if (queueIds == null) { - // Sanity check only - ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName); - throw new IllegalStateException("no queueIDs defined"); + ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, idsHeaderName); + throw ActiveMQExceptionType.createException(ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION.getCode(), ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idsHeaderName)); } for (SimpleString propName : propNames) { diff --git a/docs/user-manual/clusters.adoc b/docs/user-manual/clusters.adoc index 6312a287bdb0..53294374ce07 100644 --- a/docs/user-manual/clusters.adoc +++ b/docs/user-manual/clusters.adoc @@ -715,6 +715,11 @@ The default value is `-1`. It often makes sense to introduce a delay before redistributing as it's a common case that a consumer closes but another one quickly is created on the same queue, in such a case you probably don't want to redistribute immediately since the new consumer will arrive shortly. +[WARNING] +==== +The broker uses internal store and forward queues to handle message redistribution. Be aware that any clients should not directly send messages to the sore and forward queues. If a client sends messages to a store and forward queue, the messages will be sent to dead letter address. If security is enabled, make sure the clients do not have `send` permission on any store and forward queues. (The name pattern for a store and forward queue is .sf.. where the default internal-naming-prefix is `$.activemq.internal`, the cluster-name is the name of the cluster-connection, and the nodeID is the target node's ID) +==== + == Cluster topologies Apache ActiveMQ Artemis clusters can be connected together in many different topologies, let's consider the two most common ones here diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java index 62fecf764db9..fdceade37c6c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor; @@ -413,6 +414,106 @@ public void testPauseAddressBlockingSnFQueue() throws Exception { stopServers(0, 1); } + @Test + public void testBadClientSendMessagesToSnFQueue() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + String dla = "DLA"; + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setDeadLetterAddress(SimpleString.of(dla)); + + servers[0].getAddressSettingsRepository().addMatch("#", addressSettings); + servers[1].getAddressSettingsRepository().addMatch("#", addressSettings); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, dla, dla, null, true); + createQueue(1, dla, dla, null, true); + + waitForBindings(0, dla, 1, 0, true); + waitForBindings(1, dla, 1, 0, true); + + ClientSession session0 = sfs[0].createSession(); + ClientSession session1 = sfs[1].createSession(); + + session0.start(); + session1.start(); + + final int num = 10; + + SimpleString nodeId1 = servers[1].getNodeID(); + ClusterConnectionImpl cc0 = (ClusterConnectionImpl) servers[0].getClusterManager().getClusterConnection("cluster0"); + SimpleString snfQueue0 = cc0.getSfQueueName(nodeId1.toString()); + + ClientProducer badProducer0 = session0.createProducer(snfQueue0); + for (int i = 0; i < num; i++) { + Message msg = session0.createMessage(true); + msg.putStringProperty("origin", "from producer 0"); + badProducer0.send(msg); + } + + //add a remote queue and consumer to enable message to flow from node 0 to node 1 + createQueue(1, "queues.testaddress", "queue0", null, true); + ClientConsumer consumer1 = session1.createConsumer("queue0"); + + waitForBindings(0, "queues.testaddress", 0, 0, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 1, 1, false); + waitForBindings(1, "queues.testaddress", 0, 0, false); + + ClientConsumer dlqConsumer = session0.createConsumer(dla); + + for (int i = 0; i < num; i++) { + Message msg = session0.createMessage(true); + msg.putStringProperty("origin", "from producer 0"); + badProducer0.send(msg); + } + + //messages will never reache the consumer + assertNull(consumer1.receiveImmediate()); + + SimpleString idHeadersName = Message.HDR_ROUTE_TO_IDS.concat(snfQueue0); + for (int i = 0; i < num * 2; i++) { + ClientMessage m = dlqConsumer.receive(5000); + assertNotNull(m); + String propValue = m.getStringProperty("origin"); + assertEquals("from producer 0", propValue); + propValue = m.getStringProperty(Message.HDR_ROUTE_DLQ_DETAIL); + assertEquals(ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idHeadersName), propValue); + m.acknowledge(); + } + assertNull(dlqConsumer.receiveImmediate()); + + //normal message flow should work + ClientProducer goodProducer0 = session0.createProducer("queues.testaddress"); + for (int i = 0; i < num; i++) { + Message msg = session0.createMessage(true); + msg.putStringProperty("origin", "from producer 0"); + goodProducer0.send(msg); + } + + //consumer1 can receive from node0 + for (int i = 0; i < num; i++) { + ClientMessage m = consumer1.receive(5000); + assertNotNull(m); + String propValue = m.getStringProperty("origin"); + assertEquals("from producer 0", propValue); + m.acknowledge(); + } + assertNull(consumer1.receiveImmediate()); + + stopServers(0, 1); + } + @Override @AfterEach public void tearDown() throws Exception {