From 68ad100bf1f4cc1ebfaf60f10357d1c073b1adb9 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 13 Nov 2024 22:19:09 -0500 Subject: [PATCH] ARTEMIS-5155 Send of AMQP large messages should close the LargeMessageReader before proceeding with the message delivery This is fixing LargeMessageInterruptTest --- .../amqp/proton/ProtonAbstractReceiver.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java index 188eb9eec07..35896575fd1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java @@ -345,25 +345,25 @@ public void onMessageComplete(Delivery delivery, Message message, DeliveryAnnotations deliveryAnnotations) { connection.requireInHandler(); - try { - receiver.advance(); - - Transaction tx = null; - if (delivery.getRemoteState() instanceof TransactionalState) { - TransactionalState txState = (TransactionalState) delivery.getRemoteState(); - try { - tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); - } catch (Exception e) { - this.onExceptionWhileReading(e); - } - } + // we must call messageReader.close() before actualDelivery is called. + // as a network disconnect or any network exceptions would destroy any current pendingLargeMessage. + // so we must set the reader as complete before proceeding with actualDelivery. + this.messageReader.close(); + this.messageReader = null; - actualDelivery(message, delivery, deliveryAnnotations, receiver, tx); - } finally { - // reader is complete, we give it up now - this.messageReader.close(); - this.messageReader = null; + receiver.advance(); + + Transaction tx = null; + if (delivery.getRemoteState() instanceof TransactionalState) { + TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + try { + tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); + } catch (Exception e) { + this.onExceptionWhileReading(e); + } } + + actualDelivery(message, delivery, deliveryAnnotations, receiver, tx); } @Override