From 27771f0146c9e555d8ae2a47b9ab8fc966fc9549 Mon Sep 17 00:00:00 2001 From: Anton Smolkov Date: Thu, 2 Jan 2025 13:51:46 +0300 Subject: [PATCH] Improve session message injection (#2117) * Improve session message injection methods * Add more tests for session injection * Change return type from MqttPublishPacket to MqttPacketWithIdentifier * Use dedicated type for MattApplicationMessage injection result * Code review minor fixes * Apply project code style * Update release notes --------- Co-authored-by: Anton Smolkov Co-authored-by: christian <6939810+chkr1011@users.noreply.github.com> --- .../MqttPendingMessagesOverflowException.cs | 19 + Source/MQTTnet.Server/Internal/MqttSession.cs | 15 +- .../Options/MqttServerOptionsBuilder.cs | 6 + .../Status/MqttSessionStatus.cs | 53 +- Source/MQTTnet.Tests/BaseTestClass.cs | 7 +- .../MQTTnet.Tests/Mockups/TestEnvironment.cs | 8 +- .../MQTTnet.Tests/Server/Injection_Tests.cs | 454 ++++++++++++++++-- .../InjectMqttApplicationMessageResult.cs | 10 + Source/MQTTnet/Internal/MqttPacketBus.cs | 8 - Source/MQTTnet/Internal/MqttPacketBusItem.cs | 8 +- Source/ReleaseNotes.md | 2 + 11 files changed, 521 insertions(+), 69 deletions(-) create mode 100644 Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs create mode 100644 Source/MQTTnet/InjectMqttApplicationMessageResult.cs diff --git a/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs new file mode 100644 index 000000000..0c045bc8c --- /dev/null +++ b/Source/MQTTnet.Server/Exceptions/MqttPendingMessagesOverflowException.cs @@ -0,0 +1,19 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace MQTTnet.Server.Exceptions; + +public class MqttPendingMessagesOverflowException : Exception +{ + public MqttPendingMessagesOverflowException(string sessionId, MqttPendingMessagesOverflowStrategy overflowStrategy) : base( + $"Send buffer max pending messages overflow occurred for session '{sessionId}'. Strategy: {overflowStrategy}.") + { + SessionId = sessionId; + OverflowStrategy = overflowStrategy; + } + + public MqttPendingMessagesOverflowStrategy OverflowStrategy { get; } + + public string SessionId { get; } +} \ No newline at end of file diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 55814b849..7e3ab1279 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -6,6 +6,7 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server.Exceptions; namespace MQTTnet.Server.Internal; @@ -111,10 +112,11 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem) public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem) { - if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient) + if (PendingDataPacketsCount >= _serverOptions.MaxPendingMessagesPerClient) { if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage) { + packetBusItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy)); return EnqueueDataPacketResult.Dropped; } @@ -123,10 +125,15 @@ public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem // Only drop from the data partition. Dropping from control partition might break the connection // because the client does not receive PINGREQ packets etc. any longer. var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data); - if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers) + if (firstItem != null) { - var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet); - _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false); + firstItem.Fail(new MqttPendingMessagesOverflowException(Id, _serverOptions.PendingMessagesOverflowStrategy)); + + if (_eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers) + { + var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet); + _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false); + } } } } diff --git a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs index 2e86e21eb..c238ea016 100644 --- a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs @@ -115,6 +115,12 @@ public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value) return this; } + public MqttServerOptionsBuilder WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy value) + { + _options.PendingMessagesOverflowStrategy = value; + return this; + } + public MqttServerOptionsBuilder WithoutDefaultEndpoint() { _options.DefaultEndpointOptions.IsEnabled = false; diff --git a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs index ecb8460fb..780c08130 100644 --- a/Source/MQTTnet.Server/Status/MqttSessionStatus.cs +++ b/Source/MQTTnet.Server/Status/MqttSessionStatus.cs @@ -40,22 +40,65 @@ public Task DeleteAsync() return _session.DeleteAsync(); } - public Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage) + /// + /// Delivers an application message immediately to the session. + /// + /// The application message to deliver. + /// + /// A task that represents the asynchronous operation. + /// The result contains the that includes the packet identifier of the enqueued message. + /// + public async Task DeliverApplicationMessageAsync(MqttApplicationMessage applicationMessage) { ArgumentNullException.ThrowIfNull(applicationMessage); - var packetBusItem = new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage)); + var publishPacket = MqttPublishPacketFactory.Create(applicationMessage); + var packetBusItem = new MqttPacketBusItem(publishPacket); _session.EnqueueDataPacket(packetBusItem); - return packetBusItem.WaitAsync(); + await packetBusItem.WaitAsync().ConfigureAwait(false); + + var injectResult = new InjectMqttApplicationMessageResult() + { + PacketIdentifier = publishPacket.PacketIdentifier + }; + + return injectResult; } - public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage) + /// + /// Attempts to enqueue an application message to the session's send buffer. + /// + /// The application message to enqueue. + /// that includes the packet identifier of the enqueued message. + /// true if the message was successfully enqueued; otherwise, false. + /// + /// When is set to , + /// this method always returns true. + /// However, an existing message in the queue may be dropped later to make room for the newly enqueued message. + /// Such dropped messages can be tracked by subscribing to event. + /// + public bool TryEnqueueApplicationMessage(MqttApplicationMessage applicationMessage, out InjectMqttApplicationMessageResult injectResult) { ArgumentNullException.ThrowIfNull(applicationMessage); - _session.EnqueueDataPacket(new MqttPacketBusItem(MqttPublishPacketFactory.Create(applicationMessage))); + var publishPacket = MqttPublishPacketFactory.Create(applicationMessage); + var enqueueDataPacketResult = _session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket)); + + if (enqueueDataPacketResult != EnqueueDataPacketResult.Enqueued) + { + injectResult = null; + return false; + } + injectResult = new InjectMqttApplicationMessageResult() { PacketIdentifier = publishPacket.PacketIdentifier }; + return true; + } + + [Obsolete("This method is obsolete. Use TryEnqueueApplicationMessage instead.")] + public Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage) + { + TryEnqueueApplicationMessage(applicationMessage, out _); return CompletedTask.Instance; } } \ No newline at end of file diff --git a/Source/MQTTnet.Tests/BaseTestClass.cs b/Source/MQTTnet.Tests/BaseTestClass.cs index 8e5248e7f..00291b024 100644 --- a/Source/MQTTnet.Tests/BaseTestClass.cs +++ b/Source/MQTTnet.Tests/BaseTestClass.cs @@ -13,10 +13,11 @@ namespace MQTTnet.Tests public abstract class BaseTestClass { public TestContext TestContext { get; set; } - - protected TestEnvironment CreateTestEnvironment(MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) + + protected TestEnvironment CreateTestEnvironment( + MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true) { - return new TestEnvironment(TestContext, protocolVersion); + return new TestEnvironment(TestContext, protocolVersion, trackUnobservedTaskException); } protected Task LongTestDelay() diff --git a/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs b/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs index 4f1391f15..6f2d8ae73 100644 --- a/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs +++ b/Source/MQTTnet.Tests/Mockups/TestEnvironment.cs @@ -32,12 +32,16 @@ public TestEnvironment() : this(null) { } - public TestEnvironment(TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) + public TestEnvironment( + TestContext testContext, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, bool trackUnobservedTaskException = true) { _protocolVersion = protocolVersion; TestContext = testContext; - TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException; + if (trackUnobservedTaskException) + { + TaskScheduler.UnobservedTaskException += TrackUnobservedTaskException; + } ServerLogger.LogMessagePublished += (s, e) => { diff --git a/Source/MQTTnet.Tests/Server/Injection_Tests.cs b/Source/MQTTnet.Tests/Server/Injection_Tests.cs index cefbc34dd..85ac53414 100644 --- a/Source/MQTTnet.Tests/Server/Injection_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Injection_Tests.cs @@ -1,7 +1,11 @@ +using System; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Internal; +using MQTTnet.Packets; +using MQTTnet.Protocol; using MQTTnet.Server; +using MQTTnet.Server.Exceptions; namespace MQTTnet.Tests.Server { @@ -9,79 +13,443 @@ namespace MQTTnet.Tests.Server public sealed class Injection_Tests : BaseTestClass { [TestMethod] - public async Task Inject_Application_Message_At_Session_Level() + public async Task Enqueue_Application_Message_At_Session_Level() { - using (var testEnvironment = CreateTestEnvironment()) + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer(); + var receiver1 = await testEnvironment.ConnectClient(); + var receiver2 = await testEnvironment.ConnectClient(); + var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1); + var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2); + + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + + await receiver1.SubscribeAsync("#"); + await receiver2.SubscribeAsync("#"); + + var message = new MqttApplicationMessageBuilder() + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) + .WithTopic("InjectedOne").Build(); + + var enqueued = clientStatus.TryEnqueueApplicationMessage(message, out var injectResult); + + Assert.IsTrue(enqueued); + + await LongTestDelay(); + + Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count); + Assert.AreEqual(injectResult.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier); + Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic); + + // The second receiver should NOT receive the message. + Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count); + } + + [TestMethod] + public async Task Enqueue_Application_Message_At_Session_Level_QueueOverflow_DropNewMessageStrategy() + { + using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false); + + var server = await testEnvironment.StartServer( + builder => builder + .WithMaxPendingMessagesPerClient(1) + .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)); + + var receiver = await testEnvironment.ConnectClient(); + + var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource(); + server.InterceptingOutboundPacketAsync += async args => { - var server = await testEnvironment.StartServer(); - var receiver1 = await testEnvironment.ConnectClient(); - var receiver2 = await testEnvironment.ConnectClient(); - var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1); - var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2); + // - The first message is dequeued normally and calls this delay + // - The second message fills the outbound queue + // - The third message overflows the outbound queue + if (args.Packet is MqttPublishPacket) + { + firstMessageOutboundPacketInterceptedTcs.SetResult(); + await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken); + } + }; - var status = await server.GetSessionsAsync(); - var clientStatus = status[0]; + var firstMessageEvicted = false; + var secondMessageEvicted = false; + var thirdMessageEvicted = false; - await receiver1.SubscribeAsync("#"); - await receiver2.SubscribeAsync("#"); + server.QueuedApplicationMessageOverwrittenAsync += args => + { + if (args.Packet is not MqttPublishPacket publishPacket) + { + return Task.CompletedTask; + } + + switch (publishPacket.Topic) + { + case "InjectedOne": + firstMessageEvicted = true; + break; + case "InjectedTwo": + secondMessageEvicted = true; + break; + case "InjectedThree": + thirdMessageEvicted = true; + break; + } - await clientStatus.EnqueueApplicationMessageAsync(new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build()); + return Task.CompletedTask; + }; - await LongTestDelay(); + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + await receiver.SubscribeAsync("#"); - Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count); - Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic); + var firstMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(), out _); + await firstMessageOutboundPacketInterceptedTcs.Task; - // The second receiver should NOT receive the message. - Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count); - } + var secondMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build(), out _); + + var thirdMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build(), out _); + + // Due to the DropNewMessage strategy the third message will not be enqueued. + // As a result, no existing messages in the queue will be dropped (evicted). + Assert.IsTrue(firstMessageEnqueued); + Assert.IsTrue(secondMessageEnqueued); + Assert.IsFalse(thirdMessageEnqueued); + + Assert.IsFalse(firstMessageEvicted); + Assert.IsFalse(secondMessageEvicted); + Assert.IsFalse(thirdMessageEvicted); } + [TestMethod] - public async Task Inject_ApplicationMessage_At_Server_Level() + public async Task Enqueue_Application_Message_At_Session_Level_QueueOverflow_DropOldestQueuedMessageStrategy() { - using (var testEnvironment = CreateTestEnvironment()) + using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false); + + var server = await testEnvironment.StartServer( + builder => builder + .WithMaxPendingMessagesPerClient(1) + .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)); + + var receiver = await testEnvironment.ConnectClient(); + + var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource(); + server.InterceptingOutboundPacketAsync += async args => { - var server = await testEnvironment.StartServer(); + // - The first message is dequeued normally and calls this delay + // - The second message fills the outbound queue + // - The third message overflows the outbound queue + if (args.Packet is MqttPublishPacket) + { + firstMessageOutboundPacketInterceptedTcs.SetResult(); + await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken); + } + }; + + var firstMessageEvicted = false; + var secondMessageEvicted = false; + var thirdMessageEvicted = false; + + server.QueuedApplicationMessageOverwrittenAsync += args => + { + if (args.Packet is not MqttPublishPacket publishPacket) + { + return Task.CompletedTask; + } - var receiver = await testEnvironment.ConnectClient(); + switch (publishPacket.Topic) + { + case "InjectedOne": + firstMessageEvicted = true; + break; + case "InjectedTwo": + secondMessageEvicted = true; + break; + case "InjectedThree": + thirdMessageEvicted = true; + break; + } + + return Task.CompletedTask; + }; - var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver); + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + await receiver.SubscribeAsync("#"); - await receiver.SubscribeAsync("#"); + var firstMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(), out _); + await firstMessageOutboundPacketInterceptedTcs.Task; - var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + var secondMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build(), out _); - await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); + var thirdMessageEnqueued = clientStatus.TryEnqueueApplicationMessage( + new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build(), out _); - await LongTestDelay(); + // Due to the DropOldestQueuedMessage strategy, all messages will be enqueued initially. + // But the second message will eventually be dropped (evicted) to make room for the third one. + Assert.IsTrue(firstMessageEnqueued); + Assert.IsTrue(secondMessageEnqueued); + Assert.IsTrue(thirdMessageEnqueued); - Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count); - Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic); - } + Assert.IsFalse(firstMessageEvicted); + Assert.IsTrue(secondMessageEvicted); + Assert.IsFalse(thirdMessageEvicted); } [TestMethod] - public async Task Intercept_Injected_Application_Message() + public async Task Deliver_Application_Message_At_Session_Level() + { + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer(); + var receiver1 = await testEnvironment.ConnectClient(); + var receiver2 = await testEnvironment.ConnectClient(); + var messageReceivedHandler1 = testEnvironment.CreateApplicationMessageHandler(receiver1); + var messageReceivedHandler2 = testEnvironment.CreateApplicationMessageHandler(receiver2); + + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + + await receiver1.SubscribeAsync("#"); + await receiver2.SubscribeAsync("#"); + + var mqttApplicationMessage = new MqttApplicationMessageBuilder() + .WithTopic("InjectedOne") + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) + .Build(); + var injectResult = await clientStatus.DeliverApplicationMessageAsync(mqttApplicationMessage); + + await LongTestDelay(); + + Assert.AreEqual(1, messageReceivedHandler1.ReceivedEventArgs.Count); + Assert.AreEqual(injectResult.PacketIdentifier, messageReceivedHandler1.ReceivedEventArgs[0].PacketIdentifier); + Assert.AreEqual("InjectedOne", messageReceivedHandler1.ReceivedEventArgs[0].ApplicationMessage.Topic); + + // The second receiver should NOT receive the message. + Assert.AreEqual(0, messageReceivedHandler2.ReceivedEventArgs.Count); + } + + [TestMethod] + public async Task Deliver_Application_Message_At_Session_Level_QueueOverflow_DropNewMessageStrategy() + { + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer( + builder => builder + .WithMaxPendingMessagesPerClient(1) + .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)); + + var receiver = await testEnvironment.ConnectClient(); + + var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource(); + server.InterceptingOutboundPacketAsync += async args => + { + // - The first message is dequeued normally and calls this delay + // - The second message fills the outbound queue + // - The third message overflows the outbound queue + if (args.Packet is MqttPublishPacket) + { + firstMessageOutboundPacketInterceptedTcs.SetResult(); + await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken); + } + }; + + var firstMessageEvicted = false; + var secondMessageEvicted = false; + var thirdMessageEvicted = false; + + server.QueuedApplicationMessageOverwrittenAsync += args => + { + if (args.Packet is not MqttPublishPacket publishPacket) + { + return Task.CompletedTask; + } + + switch (publishPacket.Topic) + { + case "InjectedOne": + firstMessageEvicted = true; + break; + case "InjectedTwo": + secondMessageEvicted = true; + break; + case "InjectedThree": + thirdMessageEvicted = true; + break; + } + + return Task.CompletedTask; + }; + + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + await receiver.SubscribeAsync("#"); + + var firstMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build())); + await LongTestDelay(); + await firstMessageOutboundPacketInterceptedTcs.Task; + + var secondMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build())); + await LongTestDelay(); + + var thirdMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build())); + await LongTestDelay(); + + Task.WaitAny(firstMessageTask, secondMessageTask, thirdMessageTask); + + // Due to the DropNewMessage strategy the third message delivery will fail. + // As a result, no existing messages in the queue will be dropped (evicted). + Assert.AreEqual(firstMessageTask.Status, TaskStatus.WaitingForActivation); + Assert.AreEqual(secondMessageTask.Status, TaskStatus.WaitingForActivation); + Assert.AreEqual(thirdMessageTask.Status, TaskStatus.Faulted); + Assert.IsTrue(thirdMessageTask.Exception?.InnerException is MqttPendingMessagesOverflowException); + + Assert.IsFalse(firstMessageEvicted); + Assert.IsFalse(secondMessageEvicted); + Assert.IsFalse(thirdMessageEvicted); + } + + [TestMethod] + public async Task Deliver_Application_Message_At_Session_Level_QueueOverflow_DropOldestQueuedMessageStrategy() { - using (var testEnvironment = CreateTestEnvironment()) + using var testEnvironment = CreateTestEnvironment(trackUnobservedTaskException: false); + + var server = await testEnvironment.StartServer( + builder => builder + .WithMaxPendingMessagesPerClient(1) + .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)); + + var receiver = await testEnvironment.ConnectClient(); + + var firstMessageOutboundPacketInterceptedTcs = new TaskCompletionSource(); + server.InterceptingOutboundPacketAsync += async args => { - var server = await testEnvironment.StartServer(); + // - The first message is dequeued normally and calls this delay + // - The second message fills the outbound queue + // - The third message overflows the outbound queue + if (args.Packet is MqttPublishPacket) + { + firstMessageOutboundPacketInterceptedTcs.SetResult(); + await Task.Delay(TimeSpan.FromDays(1), args.CancellationToken); + } + }; - MqttApplicationMessage interceptedMessage = null; - server.InterceptingPublishAsync += eventArgs => + var firstMessageEvicted = false; + var secondMessageEvicted = false; + var thirdMessageEvicted = false; + + server.QueuedApplicationMessageOverwrittenAsync += args => + { + if (args.Packet is not MqttPublishPacket publishPacket) + { + return Task.CompletedTask; + } + + switch (publishPacket.Topic) { - interceptedMessage = eventArgs.ApplicationMessage; - return CompletedTask.Instance; - }; + case "InjectedOne": + firstMessageEvicted = true; + break; + case "InjectedTwo": + secondMessageEvicted = true; + break; + case "InjectedThree": + thirdMessageEvicted = true; + break; + } + + return Task.CompletedTask; + }; + + var status = await server.GetSessionsAsync(); + var clientStatus = status[0]; + await receiver.SubscribeAsync("#"); + + var firstMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build())); + await LongTestDelay(); + await firstMessageOutboundPacketInterceptedTcs.Task; + + var secondMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedTwo").Build())); + await LongTestDelay(); + + var thirdMessageTask = Task.Run( + () => clientStatus.DeliverApplicationMessageAsync( + new MqttApplicationMessageBuilder().WithTopic("InjectedThree").Build())); + await LongTestDelay(); + + Task.WaitAny(firstMessageTask, secondMessageTask, thirdMessageTask); + + // Due to the DropOldestQueuedMessage strategy, the second message delivery will fail + // to make room for the third one. + Assert.AreEqual(firstMessageTask.Status, TaskStatus.WaitingForActivation); + Assert.AreEqual(secondMessageTask.Status, TaskStatus.Faulted); + Assert.IsTrue(secondMessageTask.Exception?.InnerException is MqttPendingMessagesOverflowException); + Assert.AreEqual(thirdMessageTask.Status, TaskStatus.WaitingForActivation); + + Assert.IsFalse(firstMessageEvicted); + Assert.IsTrue(secondMessageEvicted); + Assert.IsFalse(thirdMessageEvicted); + } + + [TestMethod] + public async Task Inject_ApplicationMessage_At_Server_Level() + { + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer(); + + var receiver = await testEnvironment.ConnectClient(); + + var messageReceivedHandler = testEnvironment.CreateApplicationMessageHandler(receiver); + + await receiver.SubscribeAsync("#"); + + var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + + await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); + + await LongTestDelay(); + + Assert.AreEqual(1, messageReceivedHandler.ReceivedEventArgs.Count); + Assert.AreEqual("InjectedOne", messageReceivedHandler.ReceivedEventArgs[0].ApplicationMessage.Topic); + } + + [TestMethod] + public async Task Intercept_Injected_Application_Message() + { + using var testEnvironment = CreateTestEnvironment(); + + var server = await testEnvironment.StartServer(); + + MqttApplicationMessage interceptedMessage = null; + server.InterceptingPublishAsync += eventArgs => + { + interceptedMessage = eventArgs.ApplicationMessage; + return CompletedTask.Instance; + }; - var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); - await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); + var injectedApplicationMessage = new MqttApplicationMessageBuilder().WithTopic("InjectedOne").Build(); + await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(injectedApplicationMessage)); - await LongTestDelay(); + await LongTestDelay(); - Assert.IsNotNull(interceptedMessage); - } + Assert.IsNotNull(interceptedMessage); } } } \ No newline at end of file diff --git a/Source/MQTTnet/InjectMqttApplicationMessageResult.cs b/Source/MQTTnet/InjectMqttApplicationMessageResult.cs new file mode 100644 index 000000000..b2e6da802 --- /dev/null +++ b/Source/MQTTnet/InjectMqttApplicationMessageResult.cs @@ -0,0 +1,10 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace MQTTnet; + +public class InjectMqttApplicationMessageResult +{ + public ushort PacketIdentifier { get; init; } +} \ No newline at end of file diff --git a/Source/MQTTnet/Internal/MqttPacketBus.cs b/Source/MQTTnet/Internal/MqttPacketBus.cs index 8aec37565..46e2b3493 100644 --- a/Source/MQTTnet/Internal/MqttPacketBus.cs +++ b/Source/MQTTnet/Internal/MqttPacketBus.cs @@ -141,14 +141,6 @@ public List ExportPackets(MqttPacketBusPartition partition) } } - public int ItemsCount(MqttPacketBusPartition partition) - { - lock (_syncRoot) - { - return _partitions[(int)partition].Count; - } - } - public int PartitionItemsCount(MqttPacketBusPartition partition) { lock (_syncRoot) diff --git a/Source/MQTTnet/Internal/MqttPacketBusItem.cs b/Source/MQTTnet/Internal/MqttPacketBusItem.cs index b94654cf5..6fb5b114d 100644 --- a/Source/MQTTnet/Internal/MqttPacketBusItem.cs +++ b/Source/MQTTnet/Internal/MqttPacketBusItem.cs @@ -10,8 +10,8 @@ namespace MQTTnet.Internal { public sealed class MqttPacketBusItem { - readonly AsyncTaskCompletionSource _promise = new AsyncTaskCompletionSource(); - + readonly AsyncTaskCompletionSource _promise = new AsyncTaskCompletionSource(); + public MqttPacketBusItem(MqttPacket packet) { Packet = packet ?? throw new ArgumentNullException(nameof(packet)); @@ -28,7 +28,7 @@ public void Cancel() public void Complete() { - _promise.TrySetResult(true); + _promise.TrySetResult(Packet); Completed?.Invoke(this, EventArgs.Empty); } @@ -37,7 +37,7 @@ public void Fail(Exception exception) _promise.TrySetException(exception); } - public Task WaitAsync() + public Task WaitAsync() { return _promise.Task; } diff --git a/Source/ReleaseNotes.md b/Source/ReleaseNotes.md index 7c70587e3..e68ff4c39 100644 --- a/Source/ReleaseNotes.md +++ b/Source/ReleaseNotes.md @@ -9,6 +9,7 @@ * Removal of Managed Client **(BREAKING CHANGE)** * Fixed missing release notes in nuget packages. + * Client: MQTT 5.0.0 is now the default version when connecting with a server **(BREAKING CHANGE)** * Client: Fixed enhanced authentication. * Client: Exposed WebSocket compression options in MQTT client options (thanks to @victornor, #2127) @@ -19,3 +20,4 @@ * Server: Set SSL version to "None" which will let the OS choose the version **(BREAKING CHANGE)** * Server: Added API for getting a single session (thanks to @AntonSmolkov, #2131) * Server: Fixed "TryPrivate" (Mosquitto feature) handling (thanks to @victornor, #2125) **(BREAKING CHANGE)** +* Server: Fixed dead lock when awaiting a packet transmission but the packet gets dropped due to quotas (#2117, thanks to @AntonSmolkov)