From ae84aa43d25fea1f60d9e67d17c07983643bfc55 Mon Sep 17 00:00:00 2001 From: Christian <6939810+chkr1011@users.noreply.github.com> Date: Sat, 25 Nov 2023 16:17:16 +0100 Subject: [PATCH] 1846 server mqttserverstopasync doesnt indicate correct reason (#1872) --- .github/workflows/ReleaseNotes.md | 7 +- Source/MQTTnet.AspnetCore/MqttHostedServer.cs | 20 +++- .../ServiceCollectionExtensions.cs | 110 +++++++++++------- .../LowLevelMqttClient_Tests.cs | 1 + .../Clients/MqttClient/MqttClient_Tests.cs | 1 + Source/MQTTnet.Tests/MQTTv5/Server_Tests.cs | 1 + Source/MQTTnet.Tests/Server/General.cs | 8 +- .../MQTTnet.Tests/Server/Publishing_Tests.cs | 1 + Source/MQTTnet.Tests/Server/Session_Tests.cs | 2 +- Source/MQTTnet.Tests/Server/Tls_Tests.cs | 20 ++-- .../Formatter/MqttDisconnectPacketFactory.cs | 86 ++++++++++++-- Source/MQTTnet/MQTTnet.csproj.DotSettings | 1 + Source/MQTTnet/MqttFactory.cs | 11 ++ .../MqttServerClientDisconnectOptions.cs | 33 ++++++ ...qttServerClientDisconnectOptionsBuilder.cs | 55 +++++++++ Source/MQTTnet/Server/Internal/MqttClient.cs | 41 ++++--- .../Internal/MqttClientSessionsManager.cs | 30 +++-- .../Server/Internal/MqttClientStatistics.cs | 2 +- .../Internal/MqttServerKeepAliveMonitor.cs | 21 +++- Source/MQTTnet/Server/MqttServer.cs | 60 +++++++--- Source/MQTTnet/Server/MqttServerExtensions.cs | 21 ++++ .../MQTTnet/Server/Status/MqttClientStatus.cs | 41 ++++--- .../Status/MqttClientStatusExtensions.cs | 32 +++++ .../Server/Stopping/MqttServerStopOptions.cs | 24 ++++ .../Stopping/MqttServerStopOptionsBuilder.cs | 39 +++++++ 25 files changed, 521 insertions(+), 147 deletions(-) create mode 100644 Source/MQTTnet/Server/Disconnecting/MqttServerClientDisconnectOptions.cs create mode 100644 Source/MQTTnet/Server/Disconnecting/MqttServerClientDisconnectOptionsBuilder.cs create mode 100644 Source/MQTTnet/Server/Status/MqttClientStatusExtensions.cs create mode 100644 Source/MQTTnet/Server/Stopping/MqttServerStopOptions.cs create mode 100644 Source/MQTTnet/Server/Stopping/MqttServerStopOptionsBuilder.cs diff --git a/.github/workflows/ReleaseNotes.md b/.github/workflows/ReleaseNotes.md index b6c54605f..fe734924a 100644 --- a/.github/workflows/ReleaseNotes.md +++ b/.github/workflows/ReleaseNotes.md @@ -1,4 +1,7 @@ -* [Server] Fixed not working _UpdateRetainedMessageAsync_ public api (#1858, thanks to @kimdiego2098). +* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min). * [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min). * [Client] Fixed handling of unobserved tasks exceptions (#1871). -* [Client] Fixed not specified ReasonCode when using _SendExtendedAuthenticationExchangeDataAsync_ (#1882, thanks to @rido-min). \ No newline at end of file +* [Client] Fixed not specified ReasonCode when using _SendExtendedAuthenticationExchangeDataAsync_ (#1882, thanks to @rido-min). +* [Server] Fixed not working _UpdateRetainedMessageAsync_ public api (#1858, thanks to @kimdiego2098). +* [Server] Added support for custom DISCONNECT packets when stopping the server or disconnect a client (BREAKING CHANGE!, #1846). +* [Server] Added new property to stop the server from accepting new connections even if it is running (#1846). diff --git a/Source/MQTTnet.AspnetCore/MqttHostedServer.cs b/Source/MQTTnet.AspnetCore/MqttHostedServer.cs index 81d0eee7b..c6c64ffd3 100644 --- a/Source/MQTTnet.AspnetCore/MqttHostedServer.cs +++ b/Source/MQTTnet.AspnetCore/MqttHostedServer.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -14,20 +15,27 @@ namespace MQTTnet.AspNetCore { public sealed class MqttHostedServer : MqttServer, IHostedService { - public MqttHostedServer(MqttServerOptions options, IEnumerable adapters, IMqttNetLogger logger) - : base(options, adapters, logger) + readonly MqttFactory _mqttFactory; + + public MqttHostedServer(MqttFactory mqttFactory, MqttServerOptions options, IEnumerable adapters, IMqttNetLogger logger) : base( + options, + adapters, + logger) { + _mqttFactory = mqttFactory ?? throw new ArgumentNullException(nameof(mqttFactory)); } - public Task StartAsync(CancellationToken cancellationToken) + public async Task StartAsync(CancellationToken cancellationToken) { - _ = StartAsync(); - return Task.CompletedTask; + // The yield makes sure that the hosted service is considered up and running. + await Task.Yield(); + + _ = StartAsync(); } public Task StopAsync(CancellationToken cancellationToken) { - return StopAsync(); + return StopAsync(_mqttFactory.CreateMqttServerStopOptionsBuilder().Build()); } } } \ No newline at end of file diff --git a/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs b/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs index 90f251574..9b46ad19f 100644 --- a/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs +++ b/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs @@ -4,7 +4,7 @@ using System; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.DependencyInjection.Extensions; using MQTTnet.Adapter; using MQTTnet.Diagnostics; using MQTTnet.Implementations; @@ -14,76 +14,102 @@ namespace MQTTnet.AspNetCore { public static class ServiceCollectionExtensions { - public static IServiceCollection AddMqttServer(this IServiceCollection serviceCollection, Action configure = null) + public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, MqttServerOptions options) { - if (serviceCollection is null) + if (services == null) { - throw new ArgumentNullException(nameof(serviceCollection)); + throw new ArgumentNullException(nameof(services)); } - serviceCollection.AddMqttConnectionHandler(); - serviceCollection.AddHostedMqttServer(configure); - - return serviceCollection; - } - - public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, MqttServerOptions options) - { - if (options == null) throw new ArgumentNullException(nameof(options)); + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } services.AddSingleton(options); - services.AddHostedMqttServer(); return services; } - public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, Action configure = null) + public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, Action configure) { - services.AddSingleton(s => + if (services == null) { - var serverOptionsBuilder = new MqttServerOptionsBuilder(); - configure?.Invoke(serverOptionsBuilder); - return serverOptionsBuilder.Build(); - }); + throw new ArgumentNullException(nameof(services)); + } - services.AddHostedMqttServer(); + if (configure == null) + { + throw new ArgumentNullException(nameof(configure)); + } - return services; + var serverOptionsBuilder = new MqttServerOptionsBuilder(); + configure.Invoke(serverOptionsBuilder); + var options = serverOptionsBuilder.Build(); + + return AddHostedMqttServer(services, options); + } + + public static void AddHostedMqttServer(this IServiceCollection services) + { + // The user may have these services already registered. + services.TryAddSingleton(MqttNetNullLogger.Instance); + services.TryAddSingleton(new MqttFactory()); + + services.AddSingleton(); + services.AddHostedService(); } public static IServiceCollection AddHostedMqttServerWithServices(this IServiceCollection services, Action configure) { - services.AddSingleton(s => + if (services == null) { - var builder = new AspNetMqttServerOptionsBuilder(s); - configure(builder); - return builder.Build(); - }); + throw new ArgumentNullException(nameof(services)); + } + + services.AddSingleton( + s => + { + var builder = new AspNetMqttServerOptionsBuilder(s); + configure(builder); + return builder.Build(); + }); services.AddHostedMqttServer(); return services; } - static IServiceCollection AddHostedMqttServer(this IServiceCollection services) + public static IServiceCollection AddMqttConnectionHandler(this IServiceCollection services) { - var logger = new MqttNetEventLogger(); - - services.AddSingleton(logger); - services.AddSingleton(); - services.AddSingleton(s => s.GetService()); - services.AddSingleton(s => s.GetService()); + services.AddSingleton(); + services.AddSingleton(s => s.GetService()); return services; } - public static IServiceCollection AddMqttWebSocketServerAdapter(this IServiceCollection services) + public static void AddMqttLogger(this IServiceCollection services, IMqttNetLogger logger) { - services.AddSingleton(); - services.AddSingleton(s => s.GetService()); + if (services == null) + { + throw new ArgumentNullException(nameof(services)); + } - return services; + services.AddSingleton(logger); + } + + public static IServiceCollection AddMqttServer(this IServiceCollection serviceCollection, Action configure = null) + { + if (serviceCollection is null) + { + throw new ArgumentNullException(nameof(serviceCollection)); + } + + serviceCollection.AddMqttConnectionHandler(); + serviceCollection.AddHostedMqttServer(configure); + + return serviceCollection; } public static IServiceCollection AddMqttTcpServerAdapter(this IServiceCollection services) @@ -94,12 +120,12 @@ public static IServiceCollection AddMqttTcpServerAdapter(this IServiceCollection return services; } - public static IServiceCollection AddMqttConnectionHandler(this IServiceCollection services) + public static IServiceCollection AddMqttWebSocketServerAdapter(this IServiceCollection services) { - services.AddSingleton(); - services.AddSingleton(s => s.GetService()); + services.AddSingleton(); + services.AddSingleton(s => s.GetService()); return services; } } -} +} \ No newline at end of file diff --git a/Source/MQTTnet.Tests/Clients/LowLevelMqttClient/LowLevelMqttClient_Tests.cs b/Source/MQTTnet.Tests/Clients/LowLevelMqttClient/LowLevelMqttClient_Tests.cs index 6e9485f23..f8a1b394d 100644 --- a/Source/MQTTnet.Tests/Clients/LowLevelMqttClient/LowLevelMqttClient_Tests.cs +++ b/Source/MQTTnet.Tests/Clients/LowLevelMqttClient/LowLevelMqttClient_Tests.cs @@ -13,6 +13,7 @@ using MQTTnet.LowLevelClient; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server; namespace MQTTnet.Tests.Clients.LowLevelMqttClient { diff --git a/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs b/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs index 6894b0102..c6421edb1 100644 --- a/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs +++ b/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs @@ -17,6 +17,7 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server; using MQTTnet.Tests.Mockups; // ReSharper disable InconsistentNaming diff --git a/Source/MQTTnet.Tests/MQTTv5/Server_Tests.cs b/Source/MQTTnet.Tests/MQTTv5/Server_Tests.cs index 42dbb882e..29587c3da 100644 --- a/Source/MQTTnet.Tests/MQTTv5/Server_Tests.cs +++ b/Source/MQTTnet.Tests/MQTTv5/Server_Tests.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using MQTTnet.Internal; using MQTTnet.Protocol; +using MQTTnet.Server; namespace MQTTnet.Tests.MQTTv5 { diff --git a/Source/MQTTnet.Tests/Server/General.cs b/Source/MQTTnet.Tests/Server/General.cs index 20bc013cd..b8056fbb8 100644 --- a/Source/MQTTnet.Tests/Server/General.cs +++ b/Source/MQTTnet.Tests/Server/General.cs @@ -303,7 +303,7 @@ public async Task Intercept_Message() var server = await testEnvironment.StartServer(); server.InterceptingPublishAsync += e => { - e.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended"); + e.ApplicationMessage.PayloadSegment = new ArraySegment(Encoding.ASCII.GetBytes("extended")); return CompletedTask.Instance; }; @@ -314,7 +314,7 @@ public async Task Intercept_Message() var isIntercepted = false; c2.ApplicationMessageReceivedAsync += e => { - isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(e.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; + isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray()), StringComparison.Ordinal) == 0; return CompletedTask.Instance; }; @@ -425,7 +425,7 @@ await server.InjectApplicationMessage( new MqttApplicationMessage { Topic = "/test/1", - Payload = Encoding.UTF8.GetBytes("true"), + PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes("true")), QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce }) { @@ -780,7 +780,7 @@ public async Task Send_Long_Body() var client1 = await testEnvironment.ConnectClient(); client1.ApplicationMessageReceivedAsync += e => { - receivedBody = e.ApplicationMessage.Payload; + receivedBody = e.ApplicationMessage.PayloadSegment.ToArray(); return CompletedTask.Instance; }; diff --git a/Source/MQTTnet.Tests/Server/Publishing_Tests.cs b/Source/MQTTnet.Tests/Server/Publishing_Tests.cs index cccdbdf1e..9ac14aa55 100644 --- a/Source/MQTTnet.Tests/Server/Publishing_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Publishing_Tests.cs @@ -8,6 +8,7 @@ using MQTTnet.Formatter; using MQTTnet.Internal; using MQTTnet.Protocol; +using MQTTnet.Server; namespace MQTTnet.Tests.Server { diff --git a/Source/MQTTnet.Tests/Server/Session_Tests.cs b/Source/MQTTnet.Tests/Server/Session_Tests.cs index 5a6ab80ce..6a3f76f9e 100644 --- a/Source/MQTTnet.Tests/Server/Session_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Session_Tests.cs @@ -283,7 +283,7 @@ public async Task Set_Session_Item() server.InterceptingPublishAsync += e => { - e.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(e.SessionItems["default_payload"] as string ?? string.Empty); + e.ApplicationMessage.PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes(e.SessionItems["default_payload"] as string ?? string.Empty)); return CompletedTask.Instance; }; diff --git a/Source/MQTTnet.Tests/Server/Tls_Tests.cs b/Source/MQTTnet.Tests/Server/Tls_Tests.cs index 5095f9ec0..482a37cd4 100644 --- a/Source/MQTTnet.Tests/Server/Tls_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Tls_Tests.cs @@ -101,7 +101,7 @@ await firstClient.PublishAsync( new MqttApplicationMessage { Topic = "TestTopic1", - Payload = new byte[] { 1, 2, 3, 4 } + PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) }); await testEnvironment.Server.InjectApplicationMessage( @@ -109,7 +109,7 @@ await testEnvironment.Server.InjectApplicationMessage( new MqttApplicationMessage { Topic = "TestTopic1", - Payload = new byte[] { 1, 2, 3, 4 } + PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) })); certificateProvider.CurrentCertificate = CreateCertificate(secondOid); @@ -137,7 +137,7 @@ await firstClient.PublishAsync( new MqttApplicationMessage { Topic = "TestTopic2", - Payload = new byte[] { 1, 2, 3, 4 } + PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) }); await testEnvironment.Server.InjectApplicationMessage( @@ -145,7 +145,7 @@ await testEnvironment.Server.InjectApplicationMessage( new MqttApplicationMessage { Topic = "TestTopic2", - Payload = new byte[] { 1, 2, 3, 4 } + PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) })); // Ensure first client still works @@ -153,7 +153,7 @@ await firstClient.PublishAsync( new MqttApplicationMessage { Topic = "TestTopic1", - Payload = new byte[] { 1, 2, 3, 4 } + PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) }); await testEnvironment.Server.InjectApplicationMessage( @@ -161,7 +161,7 @@ await testEnvironment.Server.InjectApplicationMessage( new MqttApplicationMessage { Topic = "TestTopic1", - Payload = new byte[] { 1, 2, 3, 4 } + PayloadSegment = new ArraySegment(new byte[] { 1, 2, 3, 4 }) })); await Task.Delay(1000); @@ -178,12 +178,10 @@ static async Task ConnectClientAsync(TestEnvironment testEnvironmen var clientOptionsBuilder = testEnvironment.Factory.CreateClientOptionsBuilder(); clientOptionsBuilder.WithClientId(Guid.NewGuid().ToString()) .WithTcpServer("localhost", 8883) - .WithTls( - tls => + .WithTlsOptions( + o => { - tls.UseTls = true; - tls.SslProtocol = SslProtocols.Tls12; - tls.CertificateValidationHandler = certValidator; + o.WithSslProtocols(SslProtocols.Tls12).WithCertificateValidationHandler(certValidator); }); var clientOptions = clientOptionsBuilder.Build(); diff --git a/Source/MQTTnet/Formatter/MqttDisconnectPacketFactory.cs b/Source/MQTTnet/Formatter/MqttDisconnectPacketFactory.cs index 1c72c039a..ff2810e32 100644 --- a/Source/MQTTnet/Formatter/MqttDisconnectPacketFactory.cs +++ b/Source/MQTTnet/Formatter/MqttDisconnectPacketFactory.cs @@ -5,45 +5,107 @@ using MQTTnet.Client; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server; +using MQTTnet.Server.Disconnecting; namespace MQTTnet.Formatter { public sealed class MqttDisconnectPacketFactory { - readonly MqttDisconnectPacket _normalDisconnection = new MqttDisconnectPacket + static readonly MqttDisconnectPacket DefaultNormalDisconnection = new MqttDisconnectPacket { - ReasonCode = MqttDisconnectReasonCode.NormalDisconnection + ReasonCode = MqttDisconnectReasonCode.NormalDisconnection, + UserProperties = null, + ReasonString = null, + ServerReference = null, + SessionExpiryInterval = 0 + }; + + static readonly MqttDisconnectPacket DefaultServerShuttingDown = new MqttDisconnectPacket + { + ReasonCode = MqttDisconnectReasonCode.ServerShuttingDown, + UserProperties = null, + ReasonString = null, + ServerReference = null, + SessionExpiryInterval = 0 + }; + + static readonly MqttDisconnectPacket DefaultUnspecifiedError = new MqttDisconnectPacket + { + ReasonCode = MqttDisconnectReasonCode.UnspecifiedError, + UserProperties = null, + ReasonString = null, + ServerReference = null, + SessionExpiryInterval = 0 }; public MqttDisconnectPacket Create(MqttDisconnectReasonCode reasonCode) { if (reasonCode == MqttDisconnectReasonCode.NormalDisconnection) { - return _normalDisconnection; + return DefaultNormalDisconnection; + } + + if (reasonCode == MqttDisconnectReasonCode.ServerShuttingDown) + { + return DefaultServerShuttingDown; + } + + if (reasonCode == MqttDisconnectReasonCode.UnspecifiedError) + { + return DefaultUnspecifiedError; } return new MqttDisconnectPacket { - ReasonCode = reasonCode + ReasonCode = reasonCode, + UserProperties = null, + ReasonString = null, + ServerReference = null, + SessionExpiryInterval = 0 }; } - public MqttDisconnectPacket Create(MqttClientDisconnectOptions clientDisconnectOptions) + public MqttDisconnectPacket Create(MqttServerStopOptions serverStopOptions) { - var packet = new MqttDisconnectPacket(); + if (serverStopOptions == null) + { + return DefaultServerShuttingDown; + } + return Create(serverStopOptions.DefaultClientDisconnectOptions); + } + + public MqttDisconnectPacket Create(MqttServerClientDisconnectOptions clientDisconnectOptions) + { if (clientDisconnectOptions == null) { - packet.ReasonCode = MqttDisconnectReasonCode.NormalDisconnection; + return DefaultNormalDisconnection; } - else + + return new MqttDisconnectPacket { - packet.ReasonCode = (MqttDisconnectReasonCode)clientDisconnectOptions.Reason; - packet.UserProperties = clientDisconnectOptions.UserProperties; - packet.SessionExpiryInterval = clientDisconnectOptions.SessionExpiryInterval; + ReasonCode = clientDisconnectOptions.ReasonCode, + UserProperties = clientDisconnectOptions.UserProperties, + ReasonString = clientDisconnectOptions.ReasonString, + ServerReference = clientDisconnectOptions.ServerReference, + SessionExpiryInterval = 0 // TODO: Not yet supported! + }; + } + + public MqttDisconnectPacket Create(MqttClientDisconnectOptions clientDisconnectOptions) + { + if (clientDisconnectOptions == null) + { + return DefaultNormalDisconnection; } - return packet; + return new MqttDisconnectPacket + { + ReasonCode = (MqttDisconnectReasonCode)clientDisconnectOptions.Reason, + UserProperties = clientDisconnectOptions.UserProperties, + SessionExpiryInterval = clientDisconnectOptions.SessionExpiryInterval + }; } } } \ No newline at end of file diff --git a/Source/MQTTnet/MQTTnet.csproj.DotSettings b/Source/MQTTnet/MQTTnet.csproj.DotSettings index a756552ce..f8a9b730b 100644 --- a/Source/MQTTnet/MQTTnet.csproj.DotSettings +++ b/Source/MQTTnet/MQTTnet.csproj.DotSettings @@ -17,4 +17,5 @@ True True True + True True \ No newline at end of file diff --git a/Source/MQTTnet/MqttFactory.cs b/Source/MQTTnet/MqttFactory.cs index 09e6bd852..e3a0ea8b2 100644 --- a/Source/MQTTnet/MqttFactory.cs +++ b/Source/MQTTnet/MqttFactory.cs @@ -11,6 +11,7 @@ using MQTTnet.Implementations; using MQTTnet.LowLevelClient; using MQTTnet.Server; +using MQTTnet.Server.Disconnecting; using MqttClient = MQTTnet.Client.MqttClient; namespace MQTTnet @@ -175,6 +176,16 @@ public MqttServer CreateMqttServer(MqttServerOptions options, IEnumerable + /// The reason string is sent to every client via a DISCONNECT packet. + /// MQTT 5.0.0+ feature. + /// + public string ReasonString { get; set; } + + /// + /// The server reference is sent to every client via a DISCONNECT packet. + /// MQTT 5.0.0+ feature. + /// + public string ServerReference { get; set; } + + /// + /// These user properties are sent to every client via a DISCONNECT packet. + /// MQTT 5.0.0+ feature. + /// + public List UserProperties { get; set; } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Disconnecting/MqttServerClientDisconnectOptionsBuilder.cs b/Source/MQTTnet/Server/Disconnecting/MqttServerClientDisconnectOptionsBuilder.cs new file mode 100644 index 000000000..362585c65 --- /dev/null +++ b/Source/MQTTnet/Server/Disconnecting/MqttServerClientDisconnectOptionsBuilder.cs @@ -0,0 +1,55 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using MQTTnet.Packets; +using MQTTnet.Protocol; + +namespace MQTTnet.Server.Disconnecting +{ + public sealed class MqttServerClientDisconnectOptionsBuilder + { + readonly MqttServerClientDisconnectOptions _options = new MqttServerClientDisconnectOptions(); + + public MqttServerClientDisconnectOptions Build() + { + return _options; + } + + public MqttServerClientDisconnectOptionsBuilder WithReasonCode(MqttDisconnectReasonCode value) + { + _options.ReasonCode = value; + return this; + } + + public MqttServerClientDisconnectOptionsBuilder WithReasonString(string value) + { + _options.ReasonString = value; + return this; + } + + public MqttServerClientDisconnectOptionsBuilder WithServerReference(string value) + { + _options.ServerReference = value; + return this; + } + + public MqttServerClientDisconnectOptionsBuilder WithUserProperties(List value) + { + _options.UserProperties = value; + return this; + } + + public MqttServerClientDisconnectOptionsBuilder WithUserProperty(string name, string value) + { + if (_options.UserProperties == null) + { + _options.UserProperties = new List(); + } + + _options.UserProperties.Add(new MqttUserProperty(name, value)); + return this; + } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Internal/MqttClient.cs b/Source/MQTTnet/Server/Internal/MqttClient.cs index 2cfde8877..305b65c0d 100644 --- a/Source/MQTTnet/Server/Internal/MqttClient.cs +++ b/Source/MQTTnet/Server/Internal/MqttClient.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Adapter; @@ -14,6 +15,7 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server.Disconnecting; namespace MQTTnet.Server { @@ -95,15 +97,16 @@ public async Task RunAsync() { var cancellationToken = _cancellationToken.Token; IsRunning = true; - - _ = Task.Factory.StartNew(() => SendPacketsLoop(cancellationToken), cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default).ConfigureAwait(false); + + _ = Task.Factory.StartNew(() => SendPacketsLoop(cancellationToken), cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default) + .ConfigureAwait(false); await ReceivePackagesLoop(cancellationToken).ConfigureAwait(false); } finally { IsRunning = false; - + Session.DisconnectedTimestamp = DateTime.UtcNow; _cancellationToken?.TryCancel(); @@ -141,7 +144,7 @@ public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellat Statistics.HandleSentPacket(packet); } - public async Task StopAsync(MqttDisconnectReasonCode reason) + public async Task StopAsync(MqttServerClientDisconnectOptions disconnectOptions) { IsRunning = false; @@ -150,16 +153,20 @@ public async Task StopAsync(MqttDisconnectReasonCode reason) // Sending DISCONNECT packets from the server to the client is only supported when using MQTTv5+. if (ChannelAdapter.PacketFormatterAdapter.ProtocolVersion == MqttProtocolVersion.V500) { - // The Client or Server MAY send a DISCONNECT packet before closing the Network Connection. - // This library does not sent a DISCONNECT packet for a normal disconnection. Maybe adding - // a configuration option is requested in the future. - if (reason != MqttDisconnectReasonCode.NormalDisconnection) + // From RFC: The Client or Server MAY send a DISCONNECT packet before closing the Network Connection. + // This library does not sent a DISCONNECT packet for a normal disconnection. + // TODO: Maybe adding a configuration option is requested in the future. + if (disconnectOptions != null) { - // Is is very important to send the DISCONNECT packet here BEFORE cancelling the - // token because the entire connection is closed (disposed) as soon as the cancellation - // token is cancelled. To there is no chance that the DISCONNECT packet will ever arrive - // at the client! - await TrySendDisconnectPacket(reason).ConfigureAwait(false); + if (disconnectOptions.ReasonCode != MqttDisconnectReasonCode.NormalDisconnection || disconnectOptions.UserProperties?.Any() == true || + !string.IsNullOrEmpty(disconnectOptions.ReasonString) || !string.IsNullOrEmpty(disconnectOptions.ServerReference)) + { + // Is is very important to send the DISCONNECT packet here BEFORE cancelling the + // token because the entire connection is closed (disposed) as soon as the cancellation + // token is cancelled. To there is no chance that the DISCONNECT packet will ever arrive + // at the client! + await TrySendDisconnectPacket(disconnectOptions).ConfigureAwait(false); + } } } } @@ -219,7 +226,7 @@ async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, Cancella if (dispatchApplicationMessageResult.CloseConnection) { - await StopAsync(MqttDisconnectReasonCode.UnspecifiedError); + await StopAsync(new MqttServerClientDisconnectOptions { ReasonCode = MqttDisconnectReasonCode.UnspecifiedError }); return; } @@ -550,14 +557,14 @@ void StopInternal() _cancellationToken?.TryCancel(); } - async Task TrySendDisconnectPacket(MqttDisconnectReasonCode reasonCode) + async Task TrySendDisconnectPacket(MqttServerClientDisconnectOptions options) { try { // This also indicates that it was tried at least! _disconnectPacketSent = true; - var disconnectPacket = MqttPacketFactories.Disconnect.Create(reasonCode); + var disconnectPacket = MqttPacketFactories.Disconnect.Create(options); using (var timeout = new CancellationTokenSource(_serverOptions.DefaultCommunicationTimeout)) { @@ -566,7 +573,7 @@ async Task TrySendDisconnectPacket(MqttDisconnectReasonCode reasonCode) } catch (Exception exception) { - _logger.Warning(exception, "Client '{0}': Error while sending DISCONNECT packet (ReasonCode = {1})", Id, reasonCode); + _logger.Warning(exception, "Client '{0}': Error while sending DISCONNECT packet (ReasonCode = {1})", Id, options.ReasonCode); } } } diff --git a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index 22d9e28ec..19fc18a69 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -16,6 +16,7 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server.Disconnecting; namespace MQTTnet.Server { @@ -32,11 +33,11 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification readonly MqttRetainedMessagesManager _retainedMessagesManager; readonly IMqttNetLogger _rootLogger; + readonly ReaderWriterLockSlim _sessionsManagementLock = new ReaderWriterLockSlim(); + // The _sessions dictionary contains all session, the _subscriberSessions hash set contains subscriber sessions only. // See the MqttSubscription object for a detailed explanation. readonly MqttSessionsStorage _sessionsStorage = new MqttSessionsStorage(); - - readonly ReaderWriterLockSlim _sessionsManagementLock = new ReaderWriterLockSlim(); readonly HashSet _subscriberSessions = new HashSet(); public MqttClientSessionsManager( @@ -58,8 +59,13 @@ public MqttClientSessionsManager( _eventContainer = eventContainer ?? throw new ArgumentNullException(nameof(eventContainer)); } - public async Task CloseAllConnections() + public async Task CloseAllConnections(MqttServerClientDisconnectOptions options) { + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } + List connections; lock (_clients) { @@ -69,7 +75,7 @@ public async Task CloseAllConnections() foreach (var connection in connections) { - await connection.StopAsync(MqttDisconnectReasonCode.NormalDisconnection).ConfigureAwait(false); + await connection.StopAsync(options).ConfigureAwait(false); } } @@ -101,7 +107,7 @@ public async Task DeleteSessionAsync(string clientId) { if (connection != null) { - await connection.StopAsync(MqttDisconnectReasonCode.NormalDisconnection).ConfigureAwait(false); + await connection.StopAsync(new MqttServerClientDisconnectOptions { ReasonCode = MqttDisconnectReasonCode.NormalDisconnection }).ConfigureAwait(false); } } catch (Exception exception) @@ -204,7 +210,7 @@ public async Task DispatchApplicationMessage( { continue; } - + if (_eventContainer.InterceptingClientEnqueueEvent.HasHandlers) { var eventArgs = new InterceptingClientApplicationMessageEnqueueEventArgs(senderId, session.Id, applicationMessage); @@ -366,7 +372,8 @@ public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter if (_eventContainer.ClientConnectedEvent.HasHandlers) { - var eventArgs = new ClientConnectedEventArgs(connectPacket, + var eventArgs = new ClientConnectedEventArgs( + connectPacket, channelAdapter.PacketFormatterAdapter.ProtocolVersion, channelAdapter.Endpoint, client.Session.Items); @@ -595,13 +602,13 @@ async Task CreateClientConnection( session.IsPersistent = sessionShouldPersist; session.DisconnectedTimestamp = null; session.Recover(); - + connAckPacket.IsSessionPresent = true; } } _sessionsStorage.UpdateSession(connectPacket.ClientId, session); - + // Create a new client (always required). lock (_clients) { @@ -631,7 +638,8 @@ async Task CreateClientConnection( if (oldClient != null) { - await oldClient.StopAsync(MqttDisconnectReasonCode.SessionTakenOver).ConfigureAwait(false); + // TODO: Consider event here for session takeover to allow manipulation of user properties etc. + await oldClient.StopAsync(new MqttServerClientDisconnectOptions { ReasonCode = MqttDisconnectReasonCode.SessionTakenOver }).ConfigureAwait(false); if (_eventContainer.ClientDisconnectedEvent.HasHandlers) { @@ -734,4 +742,4 @@ async Task ValidateConnection(MqttConnectPacket c return eventArgs; } } -} +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Internal/MqttClientStatistics.cs b/Source/MQTTnet/Server/Internal/MqttClientStatistics.cs index 8fd964267..0d6867bdc 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientStatistics.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientStatistics.cs @@ -92,7 +92,7 @@ public void HandleSentPacket(MqttPacket packet) } } - private struct Statistics + struct Statistics { public long _receivedPacketsCount; public long _sentPacketsCount; diff --git a/Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs b/Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs index 3b193446e..ebad68d1b 100644 --- a/Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs @@ -8,11 +8,20 @@ using MQTTnet.Diagnostics; using MQTTnet.Internal; using MQTTnet.Protocol; +using MQTTnet.Server.Disconnecting; namespace MQTTnet.Server { public sealed class MqttServerKeepAliveMonitor { + static readonly MqttServerClientDisconnectOptions DefaultDisconnectOptions = new MqttServerClientDisconnectOptions + { + ReasonCode = MqttDisconnectReasonCode.KeepAliveTimeout, + UserProperties = null, + ReasonString = null, + ServerReference = null + }; + readonly MqttNetSourceLogger _logger; readonly MqttServerOptions _options; readonly MqttClientSessionsManager _sessionsManager; @@ -43,7 +52,7 @@ void DoWork(CancellationToken cancellationToken) { try { - _logger.Info("Starting keep alive monitor."); + _logger.Info("Starting keep alive monitor"); while (!cancellationToken.IsCancellationRequested) { @@ -56,11 +65,11 @@ void DoWork(CancellationToken cancellationToken) } catch (Exception exception) { - _logger.Error(exception, "Unhandled exception while checking keep alive timeouts."); + _logger.Error(exception, "Unhandled exception while checking keep alive timeouts"); } finally { - _logger.Verbose("Stopped checking keep alive timeout."); + _logger.Verbose("Stopped checking keep alive timeout"); } } @@ -117,18 +126,18 @@ void TryProcessClient(MqttClient connection, DateTime now) return; } - _logger.Warning("Client '{0}': Did not receive any packet or keep alive signal.", connection.Id); + _logger.Warning("Client '{0}': Did not receive any packet or keep alive signal", connection.Id); // Execute the disconnection in background so that the keep alive monitor can continue // with checking other connections. // We do not need to wait for the task so no await is needed. // Also the internal state of the connection must be swapped to "Finalizing" because the // next iteration of the keep alive timer happens. - var _ = connection.StopAsync(MqttDisconnectReasonCode.KeepAliveTimeout); + _ = connection.StopAsync(DefaultDisconnectOptions); } catch (Exception exception) { - _logger.Error(exception, "Client {0}: Unhandled exception while checking keep alive timeouts.", connection.Id); + _logger.Error(exception, "Client {0}: Unhandled exception while checking keep alive timeouts", connection.Id); } } diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index a28f8256c..016b167d8 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -14,6 +14,7 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server.Disconnecting; namespace MQTTnet.Server { @@ -29,6 +30,7 @@ public class MqttServer : Disposable readonly IMqttNetLogger _rootLogger; CancellationTokenSource _cancellationTokenSource; + bool _isStopping; public MqttServer(MqttServerOptions options, IEnumerable adapters, IMqttNetLogger logger) { @@ -169,6 +171,13 @@ public event Func ValidatingConnectionAsync remove => _eventContainer.ValidatingConnectionEvent.RemoveHandler(value); } + /// + /// Gets or sets whether the server will accept new connections. + /// If not, the server will close the connection without any notification (DISCONNECT packet). + /// This feature can be used when the server is shutting down. + /// + public bool AcceptNewConnections { get; set; } = true; + public bool IsStarted => _cancellationTokenSource != null; /// @@ -184,16 +193,21 @@ public Task DeleteRetainedMessagesAsync() return _retainedMessagesManager?.ClearMessages() ?? CompletedTask.Instance; } - public Task DisconnectClientAsync(string id, MqttDisconnectReasonCode reasonCode) + public Task DisconnectClientAsync(string id, MqttServerClientDisconnectOptions options) { if (id == null) { throw new ArgumentNullException(nameof(id)); } + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } + ThrowIfNotStarted(); - return _clientSessionsManager.GetClient(id).StopAsync(reasonCode); + return _clientSessionsManager.GetClient(id).StopAsync(options); } public Task> GetClientsAsync() @@ -203,13 +217,6 @@ public Task> GetClientsAsync() return _clientSessionsManager.GetClientsStatus(); } - public Task> GetRetainedMessagesAsync() - { - ThrowIfNotStarted(); - - return _retainedMessagesManager.GetMessages(); - } - public Task GetRetainedMessageAsync(string topic) { if (topic == null) @@ -222,6 +229,13 @@ public Task GetRetainedMessageAsync(string topic) return _retainedMessagesManager.GetMessage(topic); } + public Task> GetRetainedMessagesAsync() + { + ThrowIfNotStarted(); + + return _retainedMessagesManager.GetMessages(); + } + public Task> GetSessionsAsync() { ThrowIfNotStarted(); @@ -247,7 +261,7 @@ public Task InjectApplicationMessage(InjectedMqttApplicationMessage injectedAppl if (string.IsNullOrEmpty(injectedApplicationMessage.ApplicationMessage.Topic)) { - throw new NotSupportedException("Injected application messages must contain a topic. Topic alias is not supported."); + throw new NotSupportedException("Injected application messages must contain a topic (topic alias is not supported)"); } var sessionItems = injectedApplicationMessage.CustomSessionItems ?? ServerSessionItems; @@ -263,6 +277,8 @@ public async Task StartAsync() { ThrowIfStarted(); + _isStopping = false; + _cancellationTokenSource = new CancellationTokenSource(); var cancellationToken = _cancellationTokenSource.Token; @@ -278,11 +294,16 @@ public async Task StartAsync() await _eventContainer.StartedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false); - _logger.Info("Started."); + _logger.Info("Started"); } - public async Task StopAsync() + public async Task StopAsync(MqttServerStopOptions options) { + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } + try { if (_cancellationTokenSource == null) @@ -290,9 +311,11 @@ public async Task StopAsync() return; } + _isStopping = true; + _cancellationTokenSource.Cancel(false); - await _clientSessionsManager.CloseAllConnections().ConfigureAwait(false); + await _clientSessionsManager.CloseAllConnections(options.DefaultClientDisconnectOptions).ConfigureAwait(false); foreach (var adapter in _adapters) { @@ -308,7 +331,7 @@ public async Task StopAsync() await _eventContainer.StoppedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false); - _logger.Info("Stopped."); + _logger.Info("Stopped"); } public Task SubscribeAsync(string clientId, ICollection topicFilters) @@ -369,7 +392,7 @@ protected override void Dispose(bool disposing) { if (disposing) { - StopAsync().GetAwaiter().GetResult(); + StopAsync(new MqttServerStopOptions()).GetAwaiter().GetResult(); foreach (var adapter in _adapters) { @@ -382,6 +405,11 @@ protected override void Dispose(bool disposing) Task OnHandleClient(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { + if (_isStopping || !AcceptNewConnections) + { + return CompletedTask.Instance; + } + return _clientSessionsManager.HandleClientConnectionAsync(channelAdapter, cancellationToken); } @@ -405,4 +433,4 @@ void ThrowIfStarted() } } } -} +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttServerExtensions.cs b/Source/MQTTnet/Server/MqttServerExtensions.cs index 72cdac246..33284f4ac 100644 --- a/Source/MQTTnet/Server/MqttServerExtensions.cs +++ b/Source/MQTTnet/Server/MqttServerExtensions.cs @@ -8,11 +8,22 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; +using MQTTnet.Server.Disconnecting; namespace MQTTnet.Server { public static class MqttServerExtensions { + public static Task DisconnectClientAsync(this MqttServer server, string id, MqttDisconnectReasonCode reasonCode = MqttDisconnectReasonCode.NormalDisconnection) + { + if (server == null) + { + throw new ArgumentNullException(nameof(server)); + } + + return server.DisconnectClientAsync(id, new MqttServerClientDisconnectOptions { ReasonCode = reasonCode }); + } + public static Task InjectApplicationMessage( this MqttServer server, string topic, @@ -47,6 +58,16 @@ public static Task InjectApplicationMessage( })); } + public static Task StopAsync(this MqttServer server) + { + if (server == null) + { + throw new ArgumentNullException(nameof(server)); + } + + return server.StopAsync(new MqttServerStopOptions()); + } + public static Task SubscribeAsync(this MqttServer server, string clientId, params MqttTopicFilter[] topicFilters) { if (server == null) diff --git a/Source/MQTTnet/Server/Status/MqttClientStatus.cs b/Source/MQTTnet/Server/Status/MqttClientStatus.cs index 6707c9bc2..b26bad7fd 100644 --- a/Source/MQTTnet/Server/Status/MqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/MqttClientStatus.cs @@ -5,7 +5,7 @@ using System; using System.Threading.Tasks; using MQTTnet.Formatter; -using MQTTnet.Protocol; +using MQTTnet.Server.Disconnecting; namespace MQTTnet.Server { @@ -18,41 +18,46 @@ public MqttClientStatus(MqttClient client) _client = client ?? throw new ArgumentNullException(nameof(client)); } - /// - /// Gets or sets the client identifier. - /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. - /// - public string Id => _client.Id; + public long BytesReceived => _client.ChannelAdapter.BytesReceived; + + public long BytesSent => _client.ChannelAdapter.BytesSent; + + public DateTime ConnectedTimestamp => _client.Statistics.ConnectedTimestamp; public string Endpoint => _client.Endpoint; - public MqttProtocolVersion ProtocolVersion => _client.ChannelAdapter.PacketFormatterAdapter.ProtocolVersion; + /// + /// Gets or sets the client identifier. + /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. + /// + public string Id => _client.Id; - public DateTime ConnectedTimestamp => _client.Statistics.ConnectedTimestamp; + public DateTime LastNonKeepAlivePacketReceivedTimestamp => _client.Statistics.LastNonKeepAlivePacketReceivedTimestamp; public DateTime LastPacketReceivedTimestamp => _client.Statistics.LastPacketReceivedTimestamp; public DateTime LastPacketSentTimestamp => _client.Statistics.LastPacketSentTimestamp; - public DateTime LastNonKeepAlivePacketReceivedTimestamp => _client.Statistics.LastNonKeepAlivePacketReceivedTimestamp; + public MqttProtocolVersion ProtocolVersion => _client.ChannelAdapter.PacketFormatterAdapter.ProtocolVersion; public long ReceivedApplicationMessagesCount => _client.Statistics.ReceivedApplicationMessagesCount; - public long SentApplicationMessagesCount => _client.Statistics.SentApplicationMessagesCount; - public long ReceivedPacketsCount => _client.Statistics.ReceivedPacketsCount; + public long SentApplicationMessagesCount => _client.Statistics.SentApplicationMessagesCount; + public long SentPacketsCount => _client.Statistics.SentPacketsCount; public MqttSessionStatus Session { get; set; } - public long BytesSent => _client.ChannelAdapter.BytesSent; - - public long BytesReceived => _client.ChannelAdapter.BytesReceived; - - public Task DisconnectAsync() + public Task DisconnectAsync(MqttServerClientDisconnectOptions options) { - return _client.StopAsync(MqttDisconnectReasonCode.NormalDisconnection); + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } + + return _client.StopAsync(options); } public void ResetStatistics() @@ -60,4 +65,4 @@ public void ResetStatistics() _client.ResetStatistics(); } } -} +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Status/MqttClientStatusExtensions.cs b/Source/MQTTnet/Server/Status/MqttClientStatusExtensions.cs new file mode 100644 index 000000000..d235b2dd3 --- /dev/null +++ b/Source/MQTTnet/Server/Status/MqttClientStatusExtensions.cs @@ -0,0 +1,32 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Threading.Tasks; +using MQTTnet.Protocol; +using MQTTnet.Server.Disconnecting; + +namespace MQTTnet.Server +{ + public static class MqttClientStatusExtensions + { + static readonly MqttServerClientDisconnectOptions DefaultDisconnectOptions = new MqttServerClientDisconnectOptions + { + ReasonCode = MqttDisconnectReasonCode.NormalDisconnection, + ReasonString = null, + UserProperties = null, + ServerReference = null + }; + + public static Task DisconnectAsync(this MqttClientStatus clientStatus) + { + if (clientStatus == null) + { + throw new ArgumentNullException(nameof(clientStatus)); + } + + return clientStatus.DisconnectAsync(DefaultDisconnectOptions); + } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Stopping/MqttServerStopOptions.cs b/Source/MQTTnet/Server/Stopping/MqttServerStopOptions.cs new file mode 100644 index 000000000..7f0b6910d --- /dev/null +++ b/Source/MQTTnet/Server/Stopping/MqttServerStopOptions.cs @@ -0,0 +1,24 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using MQTTnet.Protocol; +using MQTTnet.Server.Disconnecting; + +namespace MQTTnet.Server +{ + public sealed class MqttServerStopOptions + { + /// + /// These disconnect options are sent to every connected client via a DISCONNECT packet. + /// MQTT 5.0.0+ feature. + /// + public MqttServerClientDisconnectOptions DefaultClientDisconnectOptions { get; set; } = new MqttServerClientDisconnectOptions + { + ReasonCode = MqttDisconnectReasonCode.ServerShuttingDown, + UserProperties = null, + ReasonString = null, + ServerReference = null + }; + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Stopping/MqttServerStopOptionsBuilder.cs b/Source/MQTTnet/Server/Stopping/MqttServerStopOptionsBuilder.cs new file mode 100644 index 000000000..65b3774cb --- /dev/null +++ b/Source/MQTTnet/Server/Stopping/MqttServerStopOptionsBuilder.cs @@ -0,0 +1,39 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using MQTTnet.Server.Disconnecting; + +namespace MQTTnet.Server +{ + public sealed class MqttServerStopOptionsBuilder + { + readonly MqttServerStopOptions _options = new MqttServerStopOptions(); + + public MqttServerStopOptionsBuilder WithDefaultClientDisconnectOptions(MqttServerClientDisconnectOptions value) + { + _options.DefaultClientDisconnectOptions = value; + return this; + } + + public MqttServerStopOptionsBuilder WithDefaultClientDisconnectOptions(Action builder) + { + if (builder == null) + { + throw new ArgumentNullException(nameof(builder)); + } + + var optionsBuilder = new MqttServerClientDisconnectOptionsBuilder(); + builder.Invoke(optionsBuilder); + + _options.DefaultClientDisconnectOptions = optionsBuilder.Build(); + return this; + } + + public MqttServerStopOptions Build() + { + return _options; + } + } +} \ No newline at end of file