From df35cbe489bc7454654213b05c9cdb150e46f7bb Mon Sep 17 00:00:00 2001 From: Oleksandr Didyk <106967057+oleksandr-didyk@users.noreply.github.com> Date: Tue, 15 Oct 2024 16:05:38 +0200 Subject: [PATCH] Add multiple work item consumers (#4049) Co-authored-by: dkurepa --- .../appsettings.Development.json | 2 +- .../appsettings.json | 3 ++- .../WorkItemConfiguration.cs | 15 ++++++++++++++- .../WorkItemConsumer.cs | 5 ++++- .../WorkItemProcessorState.cs | 2 +- .../WorkItemScopeManager.cs | 9 ++++++++- .../WorkItemsProcessorScopeManagerTests.cs | 4 ++-- 7 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/ProductConstructionService/ProductConstructionService.Api/appsettings.Development.json b/src/ProductConstructionService/ProductConstructionService.Api/appsettings.Development.json index 956a0bf33d..8beb2f6a65 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/appsettings.Development.json +++ b/src/ProductConstructionService/ProductConstructionService.Api/appsettings.Development.json @@ -10,7 +10,7 @@ "WorkItemQueueName": "pcs-workitems", "QueuePollTimeout": "00:00:05", "MaxWorkItemRetries": 3, - "QueueMessageInvisibilityTime": "00:05:00" + "QueueMessageInvisibilityTime": "00:15:00" }, "Maestro": { "Uri": "http://localhost:8088/", diff --git a/src/ProductConstructionService/ProductConstructionService.Api/appsettings.json b/src/ProductConstructionService/ProductConstructionService.Api/appsettings.json index c0803ea3ac..2d14aa38fd 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/appsettings.json +++ b/src/ProductConstructionService/ProductConstructionService.Api/appsettings.json @@ -13,10 +13,11 @@ }, "AllowedHosts": "*", "WorkItemQueueName": "pcs-workitems", + "WorkItemConsumerCount": 5, "WorkItemConsumerOptions": { "QueuePollTimeout": "00:01:00", "MaxWorkItemRetries": 3, - "QueueMessageInvisibilityTime": "00:01:00" + "QueueMessageInvisibilityTime": "00:15:00" }, "EntraAuthentication": { "Instance": "https://login.microsoftonline.com/", diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConfiguration.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConfiguration.cs index 8cb6c0dc5c..e71f2b0285 100644 --- a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConfiguration.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConfiguration.cs @@ -14,6 +14,7 @@ namespace ProductConstructionService.WorkItems; public static class WorkItemConfiguration { public const string WorkItemQueueNameConfigurationKey = "WorkItemQueueName"; + public const string WorkItemConsumerCountConfigurationKey = "WorkItemConsumerCount"; public const string ReplicaNameKey = "CONTAINER_APP_REPLICA_NAME"; public const int PollingRateSeconds = 10; @@ -40,7 +41,19 @@ public static void AddWorkItemQueues(this IHostApplicationBuilder builder, Defau builder.Configuration.GetRequiredValue(WorkItemQueueNameConfigurationKey); builder.Services.Configure( builder.Configuration.GetSection(WorkItemConsumerOptions.ConfigurationKey)); - builder.Services.AddHostedService(); + + var consumerCount = int.Parse( + builder.Configuration.GetRequiredValue(WorkItemConsumerCountConfigurationKey)); + + for (int i = 0; i < consumerCount; i++) + { + var consumerId = $"WorkItemConsumer_{i}"; + + // https://github.com/dotnet/runtime/issues/38751 + builder.Services.AddSingleton( + p => ActivatorUtilities.CreateInstance(p, consumerId)); + } + builder.Services.AddTransient(); } diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs index 904c85c239..655afc25f2 100644 --- a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs @@ -12,6 +12,7 @@ namespace ProductConstructionService.WorkItems; internal class WorkItemConsumer( + string consumerId, ILogger logger, IOptions options, WorkItemScopeManager scopeManager, @@ -19,6 +20,7 @@ internal class WorkItemConsumer( IMetricRecorder metricRecorder) : BackgroundService { + private readonly string _consumerId = consumerId; private readonly ILogger _logger = logger; private readonly IOptions _options = options; private readonly WorkItemScopeManager _scopeManager = scopeManager; @@ -31,7 +33,8 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) await Task.Yield(); QueueClient queueClient = queueServiceClient.GetQueueClient(_options.Value.WorkItemQueueName); - _logger.LogInformation("Starting to process PCS queue {queueName}", _options.Value.WorkItemQueueName); + _logger.LogInformation("Consumer {consumerId} starting to process PCS queue {queueName}", _consumerId, _options.Value.WorkItemQueueName); + while (!cancellationToken.IsCancellationRequested) { await using (WorkItemScope workItemScope = await _scopeManager.BeginWorkItemScopeWhenReadyAsync()) diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessorState.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessorState.cs index 4840f5c6ec..42188ef738 100644 --- a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessorState.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessorState.cs @@ -64,7 +64,7 @@ public async Task ReturnWhenWorkingAsync(int pollingRateSeconds) do { status = await _cache.GetAsync(); - } while (_autoResetEvent.WaitIfTrue(() => status == Stopped, pollingRateSeconds)); + } while (_autoResetEvent.WaitIfTrue(() => status != Working, pollingRateSeconds)); } public async Task SetStoppedIfStoppingAsync() diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScopeManager.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScopeManager.cs index cdffdcb9d5..15632d52ce 100644 --- a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScopeManager.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScopeManager.cs @@ -13,6 +13,8 @@ public class WorkItemScopeManager private readonly WorkItemProcessorState _state; private readonly int _pollingRateSeconds; + private int _activeWorkItems = 0; + public WorkItemScopeManager( IServiceProvider serviceProvider, WorkItemProcessorState state, @@ -32,6 +34,8 @@ public async Task BeginWorkItemScopeWhenReadyAsync() await _state.ReturnWhenWorkingAsync(_pollingRateSeconds); var scope = _serviceProvider.CreateScope(); + Interlocked.Increment(ref _activeWorkItems); + return new WorkItemScope( scope.ServiceProvider.GetRequiredService>(), WorkItemFinishedAsync, @@ -41,7 +45,10 @@ public async Task BeginWorkItemScopeWhenReadyAsync() private async Task WorkItemFinishedAsync() { - await _state.SetStoppedIfStoppingAsync(); + if (Interlocked.Decrement(ref _activeWorkItems) == 0) + { + await _state.SetStoppedIfStoppingAsync(); + } } public async Task InitializationFinished() diff --git a/test/ProductConstructionService.WorkItem.Tests/WorkItemsProcessorScopeManagerTests.cs b/test/ProductConstructionService.WorkItem.Tests/WorkItemsProcessorScopeManagerTests.cs index 858aaf8e60..a9d97c53cd 100644 --- a/test/ProductConstructionService.WorkItem.Tests/WorkItemsProcessorScopeManagerTests.cs +++ b/test/ProductConstructionService.WorkItem.Tests/WorkItemsProcessorScopeManagerTests.cs @@ -55,9 +55,9 @@ public async Task WorkItemsProcessorStatusNormalFlow() TaskCompletionSource workItemCompletion1 = new(); TaskCompletionSource workItemCompletion2 = new(); - Thread t = new(() => + Thread t = new(async () => { - using (_scopeManager.BeginWorkItemScopeWhenReadyAsync()) { } + await using (await _scopeManager.BeginWorkItemScopeWhenReadyAsync()) { } workItemCompletion1.SetResult(); }); t.Start();