Skip to content

Commit

Permalink
Fix subscribers inject error.
Browse files Browse the repository at this point in the history
  • Loading branch information
ifgris committed Jan 23, 2024
1 parent 52c367b commit 4530e85
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 61 deletions.
2 changes: 1 addition & 1 deletion Example/Example.MultiConsumers/FirstConsumeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class FirstConsumeService : RabbitSubscriber
{
private readonly ILogger<RabbitSubscriber>? _logger;

public FirstConsumeService(IRabbitConsumer consumer, ILogger<RabbitSubscriber>? logger, string consumerName) : base(consumer, consumerName, logger)
public FirstConsumeService(IRabbitConsumer consumer, string consumerName, ILogger<RabbitSubscriber>? logger = null, int consumerCount = 1) : base(consumer, consumerName, logger, consumerCount)
{
_logger = logger;
}
Expand Down
4 changes: 2 additions & 2 deletions Example/Example.MultiConsumers/SecondConsumeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace Example.MultiConsumers;
public class SecondConsumeService : RabbitAsyncSubscriber
{
private readonly ILogger<SecondConsumeService> _logger;
public SecondConsumeService(IRabbitConsumer consumer, string consumerName, ILogger<RabbitAsyncSubscriber>? logger, ILogger<SecondConsumeService> logger2) : base(consumer, consumerName, logger)

public SecondConsumeService(ILogger<SecondConsumeService> logger2, IRabbitConsumer consumer, string consumerName, ILogger<RabbitAsyncSubscriber>? logger, int consumerCount = 1) : base(consumer, consumerName, logger, consumerCount)
{
_logger = logger2;
}
Expand Down
52 changes: 38 additions & 14 deletions Src/NanoRabbit/Consumer/RabbitSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

namespace NanoRabbit.Consumer;


/// <summary>
/// RabbitMQ Synchronous Subscriber abstract class.
/// </summary>
Expand All @@ -15,29 +14,41 @@ public abstract class RabbitSubscriber : IHostedService
private readonly ILogger<RabbitSubscriber>? _logger;
private readonly IRabbitConsumer _consumer;
private readonly string _consumerName;

private readonly AutoResetEvent _exitSignal;
private readonly Thread _consumerThread;
private readonly List<Thread> _consumerThreads;


protected RabbitSubscriber(IRabbitConsumer consumer, string consumerName, ILogger<RabbitSubscriber>? logger = null)
protected RabbitSubscriber(IRabbitConsumer consumer, string consumerName, ILogger<RabbitSubscriber>? logger = null,
int consumerCount = 1)
{
_consumer = consumer;
_logger = logger;
_consumerName = consumerName;
_exitSignal = new AutoResetEvent(false);
_consumerThread = new Thread(() => Register(_exitSignal));
_consumerThreads = Enumerable.Range(0, consumerCount)
.Select(_ => new Thread(() => Register(_exitSignal)))
.ToList();
}

public Task StartAsync(CancellationToken cancellationToken)
{
_consumerThread.Start();
foreach (var thread in _consumerThreads)
{
thread.Start();
}

return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
_exitSignal.Set();
_consumerThread.Join();
foreach (var thread in _consumerThreads)
{
thread.Join();
}

return Task.CompletedTask;
}

Expand All @@ -47,7 +58,7 @@ public Task StopAsync(CancellationToken cancellationToken)
/// <param name="message"></param>
/// <returns></returns>
protected abstract bool HandleMessage(string message);

/// <summary>
/// Register a consumer
/// </summary>
Expand Down Expand Up @@ -97,7 +108,7 @@ private void Register(AutoResetEvent exitSignal)
queue: consumerOptions.QueueName,
autoAck: false,
consumer: consumer);

exitSignal.WaitOne(); // wait for signal
}
}
Expand All @@ -110,27 +121,40 @@ public abstract class RabbitAsyncSubscriber : IHostedService
private readonly ILogger<RabbitAsyncSubscriber>? _logger;
private readonly IRabbitConsumer _consumer;
private readonly string _consumerName;
private readonly int _consumerCount;
private List<Task>? _consumerTasks;
private CancellationTokenSource? _cancellationTokenSource;

protected RabbitAsyncSubscriber(IRabbitConsumer consumer, string consumerName,
ILogger<RabbitAsyncSubscriber>? logger)
ILogger<RabbitAsyncSubscriber>? logger, int consumerCount = 1)
{
_consumer = consumer;
_logger = logger;
_consumerCount = consumerCount;
_consumerName = consumerName;
}

public Task StartAsync(CancellationToken cancellationToken)
{
_cancellationTokenSource = new CancellationTokenSource();
Task.Run(() => RegisterAsync(_cancellationTokenSource.Token));

_consumerTasks = Enumerable.Range(0, _consumerCount)
.Select(_ => Task.Run(() => RegisterAsync(_cancellationTokenSource.Token)))
.ToList();

return Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
_cancellationTokenSource?.Cancel();
await Task.WhenAny(Task.Delay(Timeout.Infinite, cancellationToken));
// _cancellationTokenSource?.Cancel();
// await Task.WhenAny(Task.Delay(Timeout.Infinite, cancellationToken));
if (_consumerTasks != null)
{
_cancellationTokenSource?.Cancel();

await Task.WhenAll(_consumerTasks);
}
}

/// <summary>
Expand Down Expand Up @@ -159,7 +183,7 @@ CancellationToken cancellationToken
VirtualHost = consumerOptions.VirtualHost,
AutomaticRecoveryEnabled = consumerOptions.AutomaticRecoveryEnabled
};

factory.DispatchConsumersAsync = true;

using var connection = factory.CreateConnection();
Expand Down Expand Up @@ -192,7 +216,7 @@ CancellationToken cancellationToken
queue: consumerOptions.QueueName,
autoAck: false,
consumer: consumer);

while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(1000, cancellationToken);
Expand Down
62 changes: 18 additions & 44 deletions Src/NanoRabbit/DependencyInjection/RabbitSubscriberExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,17 @@ public static IServiceCollection AddRabbitSubscriber<TSubscriber>(this IServiceC
{
services.AddHostedService(provider =>
{
var hostedServices = new List<IHostedService>();

for (int i = 0; i < consumerCount; i++)
if (enableLogging)
{
if (enableLogging)
{
var logger = provider.GetRequiredService<ILogger<RabbitAsyncSubscriber>>();
var consumer = provider.GetRequiredService<IRabbitConsumer>();
var subscriberService =
ActivatorUtilities.CreateInstance<TSubscriber>(provider, consumer, logger, consumerName);
// return subscriberService;
hostedServices.Add(subscriberService);
}
else
{
var consumer = provider.GetRequiredService<IRabbitConsumer>();
var subscriberService =
ActivatorUtilities.CreateInstance<TSubscriber>(provider, consumer, consumerName);
// return subscriberService;
hostedServices.Add(subscriberService);
}
var logger = provider.GetRequiredService<ILogger<RabbitSubscriber>>();
var consumer = provider.GetRequiredService<IRabbitConsumer>();
return ActivatorUtilities.CreateInstance<TSubscriber>(provider, consumer, logger, consumerName, consumerCount);
}
else
{
var consumer = provider.GetRequiredService<IRabbitConsumer>();
return ActivatorUtilities.CreateInstance<TSubscriber>(provider, consumer, consumerName, consumerCount);
}

return new CompositeHostedService(hostedServices);
});

return services;
Expand All @@ -46,30 +33,17 @@ public static IServiceCollection AddRabbitAsyncSubscriber<TAsyncSubscriber>(this
{
services.AddHostedService(provider =>
{
var hostedServices = new List<IHostedService>();

for (int i = 0; i < consumerCount; i++)
if (enableLogging)
{
if (enableLogging)
{
var logger = provider.GetRequiredService<ILogger<RabbitAsyncSubscriber>>();
var consumer = provider.GetRequiredService<IRabbitConsumer>();
var subscriberService =
ActivatorUtilities.CreateInstance<TAsyncSubscriber>(provider, consumer, logger, consumerName);
// return subscriberService;
hostedServices.Add(subscriberService);
}
else
{
var consumer = provider.GetRequiredService<IRabbitConsumer>();
var subscriberService =
ActivatorUtilities.CreateInstance<TAsyncSubscriber>(provider, consumer, consumerName);
// return subscriberService;
hostedServices.Add(subscriberService);
}
var logger = provider.GetRequiredService<ILogger<RabbitAsyncSubscriber>>();
var consumer = provider.GetRequiredService<IRabbitConsumer>();
return ActivatorUtilities.CreateInstance<TAsyncSubscriber>(provider, consumer, logger, consumerName, consumerCount);
}
else
{
var consumer = provider.GetRequiredService<IRabbitConsumer>();
return ActivatorUtilities.CreateInstance<TAsyncSubscriber>(provider, consumer, consumerName, consumerCount);
}

return new CompositeHostedService(hostedServices);
});

return services;
Expand Down

0 comments on commit 4530e85

Please sign in to comment.