From 4530e85120dafe9b430f9e87a40a554f09ea393c Mon Sep 17 00:00:00 2001 From: cgcel Date: Tue, 23 Jan 2024 17:38:41 +0800 Subject: [PATCH] Fix subscribers inject error. --- .../FirstConsumeService.cs | 2 +- .../SecondConsumeService.cs | 4 +- Src/NanoRabbit/Consumer/RabbitSubscriber.cs | 52 +++++++++++----- .../RabbitSubscriberExtensions.cs | 62 ++++++------------- 4 files changed, 59 insertions(+), 61 deletions(-) diff --git a/Example/Example.MultiConsumers/FirstConsumeService.cs b/Example/Example.MultiConsumers/FirstConsumeService.cs index b3fb767..2bba2e2 100644 --- a/Example/Example.MultiConsumers/FirstConsumeService.cs +++ b/Example/Example.MultiConsumers/FirstConsumeService.cs @@ -7,7 +7,7 @@ public class FirstConsumeService : RabbitSubscriber { private readonly ILogger? _logger; - public FirstConsumeService(IRabbitConsumer consumer, ILogger? logger, string consumerName) : base(consumer, consumerName, logger) + public FirstConsumeService(IRabbitConsumer consumer, string consumerName, ILogger? logger = null, int consumerCount = 1) : base(consumer, consumerName, logger, consumerCount) { _logger = logger; } diff --git a/Example/Example.MultiConsumers/SecondConsumeService.cs b/Example/Example.MultiConsumers/SecondConsumeService.cs index e0d35db..f5c5ec9 100644 --- a/Example/Example.MultiConsumers/SecondConsumeService.cs +++ b/Example/Example.MultiConsumers/SecondConsumeService.cs @@ -6,8 +6,8 @@ namespace Example.MultiConsumers; public class SecondConsumeService : RabbitAsyncSubscriber { private readonly ILogger _logger; - - public SecondConsumeService(IRabbitConsumer consumer, string consumerName, ILogger? logger, ILogger logger2) : base(consumer, consumerName, logger) + + public SecondConsumeService(ILogger logger2, IRabbitConsumer consumer, string consumerName, ILogger? logger, int consumerCount = 1) : base(consumer, consumerName, logger, consumerCount) { _logger = logger2; } diff --git a/Src/NanoRabbit/Consumer/RabbitSubscriber.cs b/Src/NanoRabbit/Consumer/RabbitSubscriber.cs index 43d6bde..8270b45 100644 --- a/Src/NanoRabbit/Consumer/RabbitSubscriber.cs +++ b/Src/NanoRabbit/Consumer/RabbitSubscriber.cs @@ -6,7 +6,6 @@ namespace NanoRabbit.Consumer; - /// /// RabbitMQ Synchronous Subscriber abstract class. /// @@ -15,29 +14,41 @@ public abstract class RabbitSubscriber : IHostedService private readonly ILogger? _logger; private readonly IRabbitConsumer _consumer; private readonly string _consumerName; + private readonly AutoResetEvent _exitSignal; - private readonly Thread _consumerThread; + private readonly List _consumerThreads; - protected RabbitSubscriber(IRabbitConsumer consumer, string consumerName, ILogger? logger = null) + protected RabbitSubscriber(IRabbitConsumer consumer, string consumerName, ILogger? logger = null, + int consumerCount = 1) { _consumer = consumer; _logger = logger; _consumerName = consumerName; _exitSignal = new AutoResetEvent(false); - _consumerThread = new Thread(() => Register(_exitSignal)); + _consumerThreads = Enumerable.Range(0, consumerCount) + .Select(_ => new Thread(() => Register(_exitSignal))) + .ToList(); } public Task StartAsync(CancellationToken cancellationToken) { - _consumerThread.Start(); + foreach (var thread in _consumerThreads) + { + thread.Start(); + } + return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _exitSignal.Set(); - _consumerThread.Join(); + foreach (var thread in _consumerThreads) + { + thread.Join(); + } + return Task.CompletedTask; } @@ -47,7 +58,7 @@ public Task StopAsync(CancellationToken cancellationToken) /// /// protected abstract bool HandleMessage(string message); - + /// /// Register a consumer /// @@ -97,7 +108,7 @@ private void Register(AutoResetEvent exitSignal) queue: consumerOptions.QueueName, autoAck: false, consumer: consumer); - + exitSignal.WaitOne(); // wait for signal } } @@ -110,27 +121,40 @@ public abstract class RabbitAsyncSubscriber : IHostedService private readonly ILogger? _logger; private readonly IRabbitConsumer _consumer; private readonly string _consumerName; + private readonly int _consumerCount; + private List? _consumerTasks; private CancellationTokenSource? _cancellationTokenSource; protected RabbitAsyncSubscriber(IRabbitConsumer consumer, string consumerName, - ILogger? logger) + ILogger? logger, int consumerCount = 1) { _consumer = consumer; _logger = logger; + _consumerCount = consumerCount; _consumerName = consumerName; } public Task StartAsync(CancellationToken cancellationToken) { _cancellationTokenSource = new CancellationTokenSource(); - Task.Run(() => RegisterAsync(_cancellationTokenSource.Token)); + + _consumerTasks = Enumerable.Range(0, _consumerCount) + .Select(_ => Task.Run(() => RegisterAsync(_cancellationTokenSource.Token))) + .ToList(); + return Task.CompletedTask; } public async Task StopAsync(CancellationToken cancellationToken) { - _cancellationTokenSource?.Cancel(); - await Task.WhenAny(Task.Delay(Timeout.Infinite, cancellationToken)); + // _cancellationTokenSource?.Cancel(); + // await Task.WhenAny(Task.Delay(Timeout.Infinite, cancellationToken)); + if (_consumerTasks != null) + { + _cancellationTokenSource?.Cancel(); + + await Task.WhenAll(_consumerTasks); + } } /// @@ -159,7 +183,7 @@ CancellationToken cancellationToken VirtualHost = consumerOptions.VirtualHost, AutomaticRecoveryEnabled = consumerOptions.AutomaticRecoveryEnabled }; - + factory.DispatchConsumersAsync = true; using var connection = factory.CreateConnection(); @@ -192,7 +216,7 @@ CancellationToken cancellationToken queue: consumerOptions.QueueName, autoAck: false, consumer: consumer); - + while (!cancellationToken.IsCancellationRequested) { await Task.Delay(1000, cancellationToken); diff --git a/Src/NanoRabbit/DependencyInjection/RabbitSubscriberExtensions.cs b/Src/NanoRabbit/DependencyInjection/RabbitSubscriberExtensions.cs index 7c4f32e..b1ffd1a 100644 --- a/Src/NanoRabbit/DependencyInjection/RabbitSubscriberExtensions.cs +++ b/Src/NanoRabbit/DependencyInjection/RabbitSubscriberExtensions.cs @@ -12,30 +12,17 @@ public static IServiceCollection AddRabbitSubscriber(this IServiceC { services.AddHostedService(provider => { - var hostedServices = new List(); - - for (int i = 0; i < consumerCount; i++) + if (enableLogging) { - if (enableLogging) - { - var logger = provider.GetRequiredService>(); - var consumer = provider.GetRequiredService(); - var subscriberService = - ActivatorUtilities.CreateInstance(provider, consumer, logger, consumerName); - // return subscriberService; - hostedServices.Add(subscriberService); - } - else - { - var consumer = provider.GetRequiredService(); - var subscriberService = - ActivatorUtilities.CreateInstance(provider, consumer, consumerName); - // return subscriberService; - hostedServices.Add(subscriberService); - } + var logger = provider.GetRequiredService>(); + var consumer = provider.GetRequiredService(); + return ActivatorUtilities.CreateInstance(provider, consumer, logger, consumerName, consumerCount); + } + else + { + var consumer = provider.GetRequiredService(); + return ActivatorUtilities.CreateInstance(provider, consumer, consumerName, consumerCount); } - - return new CompositeHostedService(hostedServices); }); return services; @@ -46,30 +33,17 @@ public static IServiceCollection AddRabbitAsyncSubscriber(this { services.AddHostedService(provider => { - var hostedServices = new List(); - - for (int i = 0; i < consumerCount; i++) + if (enableLogging) { - if (enableLogging) - { - var logger = provider.GetRequiredService>(); - var consumer = provider.GetRequiredService(); - var subscriberService = - ActivatorUtilities.CreateInstance(provider, consumer, logger, consumerName); - // return subscriberService; - hostedServices.Add(subscriberService); - } - else - { - var consumer = provider.GetRequiredService(); - var subscriberService = - ActivatorUtilities.CreateInstance(provider, consumer, consumerName); - // return subscriberService; - hostedServices.Add(subscriberService); - } + var logger = provider.GetRequiredService>(); + var consumer = provider.GetRequiredService(); + return ActivatorUtilities.CreateInstance(provider, consumer, logger, consumerName, consumerCount); + } + else + { + var consumer = provider.GetRequiredService(); + return ActivatorUtilities.CreateInstance(provider, consumer, consumerName, consumerCount); } - - return new CompositeHostedService(hostedServices); }); return services;