From c9be67287d04db1ab9c8332072a5dbad8ce11941 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 18 Nov 2024 17:45:59 -0500 Subject: [PATCH] ARTEMIS-5155 Race on AMQP large message read and close When the final frame of a large message is being written to the file in the session thread and an IO error occurs such that that connection is torn down, the large message reader can be closed before the message is fully processed resulting in corruption. The large message file close logic needs to occur on the session thread so that the processing of the bytes can finish and the message gets added to the Queue and the close can react by not deleting the file when it runs following the read task. --- .../amqp/proton/AMQPLargeMessageReader.java | 41 +++++++++++++++---- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java index 63af7b1418e..04d786d7e73 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.lang.invoke.MethodHandles; + import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage; @@ -25,6 +27,8 @@ import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Reader of {@link AMQPLargeMessage} content which reads all bytes and completes once a @@ -32,6 +36,8 @@ */ public class AMQPLargeMessageReader implements MessageReader { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ProtonAbstractReceiver serverReceiver; private volatile AMQPLargeMessage currentMessage; @@ -51,14 +57,27 @@ public DeliveryAnnotations getDeliveryAnnotations() { public void close() { if (!closed) { try { - AMQPLargeMessage localCurrentMessage = currentMessage; - if (localCurrentMessage != null) { - localCurrentMessage.deleteFile(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + + if (currentMessage != null) { + sessionSPI.execute(() -> { + // Run the file delete on the session thread, this allows processing of the + // last addBytes to complete which might allow the message to be fully read + // in which case currentMessage will be nulled and we won't delete it as it + // will have already been handed to the connection thread for enqueue. + if (currentMessage != null) { + try { + currentMessage.deleteFile(); + } catch (Throwable error) { + ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); + } finally { + currentMessage = null; + } + } + }); } - } catch (Throwable error) { - ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); - } finally { - currentMessage = null; + } catch (Exception ex) { + logger.trace("AMQP Large Message reader close ignored error: ", ex); } deliveryAnnotations = null; @@ -117,6 +136,14 @@ public Message readBytes(Delivery delivery) throws Exception { private void addBytes(Delivery delivery, ReadableBuffer dataBuffer, boolean isPartial) { final AMQPLargeMessage localCurrentMessage = currentMessage; + // Add bytes runs on the session thread and if the close is called and the scheduled file + // delete occurs on the session thread first then current message will be null and we return. + // But if the closed delete hasn't run first we can safely continue processing this message + // in hopes we already read all the bytes before the connection was dropped. + if (localCurrentMessage == null) { + return; + } + try { localCurrentMessage.addBytes(dataBuffer);