diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java index 63af7b1418e..04d786d7e73 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.lang.invoke.MethodHandles; + import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage; @@ -25,6 +27,8 @@ import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Reader of {@link AMQPLargeMessage} content which reads all bytes and completes once a @@ -32,6 +36,8 @@ */ public class AMQPLargeMessageReader implements MessageReader { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ProtonAbstractReceiver serverReceiver; private volatile AMQPLargeMessage currentMessage; @@ -51,14 +57,27 @@ public DeliveryAnnotations getDeliveryAnnotations() { public void close() { if (!closed) { try { - AMQPLargeMessage localCurrentMessage = currentMessage; - if (localCurrentMessage != null) { - localCurrentMessage.deleteFile(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + + if (currentMessage != null) { + sessionSPI.execute(() -> { + // Run the file delete on the session thread, this allows processing of the + // last addBytes to complete which might allow the message to be fully read + // in which case currentMessage will be nulled and we won't delete it as it + // will have already been handed to the connection thread for enqueue. + if (currentMessage != null) { + try { + currentMessage.deleteFile(); + } catch (Throwable error) { + ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); + } finally { + currentMessage = null; + } + } + }); } - } catch (Throwable error) { - ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); - } finally { - currentMessage = null; + } catch (Exception ex) { + logger.trace("AMQP Large Message reader close ignored error: ", ex); } deliveryAnnotations = null; @@ -117,6 +136,14 @@ public Message readBytes(Delivery delivery) throws Exception { private void addBytes(Delivery delivery, ReadableBuffer dataBuffer, boolean isPartial) { final AMQPLargeMessage localCurrentMessage = currentMessage; + // Add bytes runs on the session thread and if the close is called and the scheduled file + // delete occurs on the session thread first then current message will be null and we return. + // But if the closed delete hasn't run first we can safely continue processing this message + // in hopes we already read all the bytes before the connection was dropped. + if (localCurrentMessage == null) { + return; + } + try { localCurrentMessage.addBytes(dataBuffer);