diff --git a/arcade-services.sln b/arcade-services.sln index fcd2113231..182ade03b9 100644 --- a/arcade-services.sln +++ b/arcade-services.sln @@ -126,9 +126,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ProductConstructionService. EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ProductConstructionService.Common", "src\ProductConstructionService\ProductConstructionService.Common\ProductConstructionService.Common.csproj", "{9C6F75EE-618D-4268-88B6-9E4C99B062FF}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProductConstructionService.LongestBuildPathUpdater", "src\ProductConstructionService\ProductConstructionService.LongestBuildPathUpdater\ProductConstructionService.LongestBuildPathUpdater.csproj", "{BE0088E3-A8FF-4F05-9456-E8BAD2E50A19}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ProductConstructionService.LongestBuildPathUpdater", "src\ProductConstructionService\ProductConstructionService.LongestBuildPathUpdater\ProductConstructionService.LongestBuildPathUpdater.csproj", "{BE0088E3-A8FF-4F05-9456-E8BAD2E50A19}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProductConstructionService.LongestBuildPathUpdater.Tests", "test\ProductConstructionService.LongestBuildPathUpdater.Tests\ProductConstructionService.LongestBuildPathUpdater.Tests.csproj", "{D40EADB7-5D48-421B-806D-6E2F79C077F8}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ProductConstructionService.LongestBuildPathUpdater.Tests", "test\ProductConstructionService.LongestBuildPathUpdater.Tests\ProductConstructionService.LongestBuildPathUpdater.Tests.csproj", "{D40EADB7-5D48-421B-806D-6E2F79C077F8}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ProductConstructionService.WorkItems", "src\ProductConstructionService\ProductConstructionService.WorkItems\ProductConstructionService.WorkItems.csproj", "{90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -609,6 +611,18 @@ Global {D40EADB7-5D48-421B-806D-6E2F79C077F8}.Release|x64.Build.0 = Release|Any CPU {D40EADB7-5D48-421B-806D-6E2F79C077F8}.Release|x86.ActiveCfg = Release|Any CPU {D40EADB7-5D48-421B-806D-6E2F79C077F8}.Release|x86.Build.0 = Release|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Debug|x64.ActiveCfg = Debug|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Debug|x64.Build.0 = Debug|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Debug|x86.ActiveCfg = Debug|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Debug|x86.Build.0 = Debug|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Release|Any CPU.Build.0 = Release|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Release|x64.ActiveCfg = Release|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Release|x64.Build.0 = Release|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Release|x86.ActiveCfg = Release|Any CPU + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -657,6 +671,7 @@ Global {9C6F75EE-618D-4268-88B6-9E4C99B062FF} = {243A4561-BF35-405A-AF12-AC57BB27796D} {BE0088E3-A8FF-4F05-9456-E8BAD2E50A19} = {243A4561-BF35-405A-AF12-AC57BB27796D} {D40EADB7-5D48-421B-806D-6E2F79C077F8} = {1A456CF0-C09A-4DE6-89CE-1110EED31180} + {90C7747B-EBEF-4CF5-92A7-7856A3A13CAA} = {243A4561-BF35-405A-AF12-AC57BB27796D} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {32B9C883-432E-4FC8-A1BF-090EB033DD5B} diff --git a/eng/service-templates/ProductConstructionService/provision.bicep b/eng/service-templates/ProductConstructionService/provision.bicep index 0ea5a46297..ff1b37b874 100644 --- a/eng/service-templates/ProductConstructionService/provision.bicep +++ b/eng/service-templates/ProductConstructionService/provision.bicep @@ -715,7 +715,7 @@ resource storageAccountQueueService 'Microsoft.Storage/storageAccounts/queueServ } resource storageAccountQueue 'Microsoft.Storage/storageAccounts/queueServices/queues@2022-09-01' = { - name: 'pcs-jobs' + name: 'pcs-workitems' parent: storageAccountQueueService } diff --git a/src/Microsoft.DotNet.Darc/DarcLib/TelemetryRecorder.cs b/src/Microsoft.DotNet.Darc/DarcLib/TelemetryRecorder.cs index 648c2de3e9..8b2c0bc182 100644 --- a/src/Microsoft.DotNet.Darc/DarcLib/TelemetryRecorder.cs +++ b/src/Microsoft.DotNet.Darc/DarcLib/TelemetryRecorder.cs @@ -15,9 +15,9 @@ public enum TrackedGitOperation public interface ITelemetryRecorder { /// - /// Records job duration and result in the customEvents table, with the `job.{jobType}` name + /// Records work item duration and result in the customEvents table, with the `workItem.{type}` name /// - ITelemetryScope RecordJobCompletion(string jobName); + ITelemetryScope RecordWorkItemCompletion(string workItemName); /// /// Records git operation duration and result. @@ -40,7 +40,7 @@ public class NoTelemetryRecorder : ITelemetryRecorder { private static readonly NoTelemetryScope _instance = new(); - public ITelemetryScope RecordJobCompletion(string jobName) => _instance; + public ITelemetryScope RecordWorkItemCompletion(string workItemName) => _instance; public ITelemetryScope RecordGitOperation(TrackedGitOperation operation, string repoUri) => _instance; public class NoTelemetryScope : ITelemetryScope diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Api/v2020_02_20/Controllers/CodeFlowController.cs b/src/ProductConstructionService/ProductConstructionService.Api/Api/v2020_02_20/Controllers/CodeFlowController.cs index 0acaa86513..ffa3e9c9bd 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Api/v2020_02_20/Controllers/CodeFlowController.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/Api/v2020_02_20/Controllers/CodeFlowController.cs @@ -6,8 +6,8 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.DotNet.DarcLib; using Microsoft.DotNet.Maestro.Client.Models; -using ProductConstructionService.Api.Queue; -using ProductConstructionService.Api.Queue.Jobs; +using ProductConstructionService.WorkItems; +using ProductConstructionService.WorkItems.WorkItemDefinitions; namespace ProductConstructionService.Api.Api.v2020_02_20.Controllers; @@ -15,11 +15,11 @@ namespace ProductConstructionService.Api.Api.v2020_02_20.Controllers; [ApiVersion("2020-02-20")] public class CodeFlowController( IBasicBarClient barClient, - JobProducerFactory jobProducerFactory) + WorkItemProducerFactory workItemProducerFactory) : ControllerBase { private readonly IBasicBarClient _barClient = barClient; - private readonly JobProducerFactory _jobProducerFactory = jobProducerFactory; + private readonly WorkItemProducerFactory _workItemProducerFactory = workItemProducerFactory; [HttpPost(Name = "Flow")] public async Task FlowBuild([Required, FromBody] Maestro.Api.Model.v2020_02_20.CodeFlowRequest request) @@ -41,7 +41,7 @@ public async Task FlowBuild([Required, FromBody] Maestro.Api.Mode return NotFound($"Build {request.BuildId} not found"); } - await _jobProducerFactory.Create().ProduceJobAsync(new() + await _workItemProducerFactory.Create().ProduceWorkItemAsync(new() { BuildId = request.BuildId, SubscriptionId = request.SubscriptionId, diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Controllers/StatusController.cs b/src/ProductConstructionService/ProductConstructionService.Api/Controllers/StatusController.cs index 949d8f58b9..5b531a9ba6 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Controllers/StatusController.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/Controllers/StatusController.cs @@ -5,38 +5,38 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.OpenApi.Extensions; using ProductConstructionService.Api.Controllers.ActionResults; -using ProductConstructionService.Api.Queue; +using ProductConstructionService.WorkItems; namespace ProductConstructionService.Api.Controllers; [Route("status")] -public class StatusController(JobScopeManager jobProcessorScopeManager) +public class StatusController(WorkItemScopeManager workItemScopeManager) : ControllerBase { [HttpPut("stop", Name = "Stop")] - public IActionResult StopPcsJobProcessor() + public IActionResult StopPcsWorkItemProcessor() { - jobProcessorScopeManager.FinishJobAndStop(); - return GetPcsJobProcessorStatus(); + workItemScopeManager.FinishWorkItemAndStop(); + return GetPcsWorkItemProcessorStatus(); } [HttpPut("start", Name = "Start")] - public IActionResult StartPcsJobProcessor() + public IActionResult StartPcsWorkItemProcessor() { - if (jobProcessorScopeManager.State == JobsProcessorState.Initializing) + if (workItemScopeManager.State == WorkItemProcessorState.Initializing) { return new PreconditionFailedActionResult("The background worker can't be started until the VMR is cloned"); } - jobProcessorScopeManager.Start(); + workItemScopeManager.Start(); - return GetPcsJobProcessorStatus(); + return GetPcsWorkItemProcessorStatus(); } [AllowAnonymous] [HttpGet(Name = "Status")] - public IActionResult GetPcsJobProcessorStatus() + public IActionResult GetPcsWorkItemProcessorStatus() { - return Ok(jobProcessorScopeManager.State.GetDisplayName()); + return Ok(workItemScopeManager.State.GetDisplayName()); } } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/InitializationBackgroundService.cs b/src/ProductConstructionService/ProductConstructionService.Api/InitializationBackgroundService.cs index 51ed2da67d..5d523085b6 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/InitializationBackgroundService.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/InitializationBackgroundService.cs @@ -3,7 +3,7 @@ using Microsoft.DotNet.DarcLib; using Microsoft.DotNet.DarcLib.VirtualMonoRepo; -using ProductConstructionService.Api.Queue; +using ProductConstructionService.WorkItems; namespace ProductConstructionService.Api; @@ -16,7 +16,7 @@ internal class InitializationBackgroundService( IServiceScopeFactory serviceScopeFactory, ITelemetryRecorder telemetryRecorder, InitializationBackgroundServiceOptions options, - JobScopeManager jobScopeManager) + WorkItemScopeManager workItemScopeManager) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -32,7 +32,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) linkedTokenSource.Token.ThrowIfCancellationRequested(); telemetryScope.SetSuccess(); - jobScopeManager.InitializingDone(); + workItemScopeManager.InitializingDone(); } } } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/InitializationHealthCheck.cs b/src/ProductConstructionService/ProductConstructionService.Api/InitializationHealthCheck.cs index 6fdbea11a1..2f1ed2c634 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/InitializationHealthCheck.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/InitializationHealthCheck.cs @@ -2,15 +2,15 @@ // The .NET Foundation licenses this file to you under the MIT license. using Microsoft.Extensions.Diagnostics.HealthChecks; -using ProductConstructionService.Api.Queue; +using ProductConstructionService.WorkItems; namespace ProductConstructionService.Api; -internal class InitializationHealthCheck(JobScopeManager jobProcessorScopeManager) : IHealthCheck +internal class InitializationHealthCheck(WorkItemScopeManager workItemProcessorScopeManager) : IHealthCheck { public Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) { - if (jobProcessorScopeManager.State == JobsProcessorState.Initializing) + if (workItemProcessorScopeManager.State == WorkItemProcessorState.Initializing) { return Task.FromResult(HealthCheckResult.Unhealthy("Background worker is waiting for initialization to finish")); } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/PcsStartup.cs b/src/ProductConstructionService/ProductConstructionService.Api/PcsStartup.cs index 9a9f430848..308dc692ff 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/PcsStartup.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/PcsStartup.cs @@ -32,10 +32,10 @@ using ProductConstructionService.Api.Configuration; using ProductConstructionService.Api.Controllers; using ProductConstructionService.Api.Pages.DependencyFlow; -using ProductConstructionService.Api.Queue; using ProductConstructionService.Api.Telemetry; using ProductConstructionService.Api.VirtualMonoRepo; using ProductConstructionService.Common; +using ProductConstructionService.WorkItems; namespace ProductConstructionService.Api; @@ -138,7 +138,8 @@ internal static void ConfigurePcs( } builder.Services.RegisterBuildAssetRegistry(builder.Configuration); - builder.AddWorkitemQueues(azureCredential, waitForInitialization: initializeService); + builder.AddWorkItemQueues(azureCredential, waitForInitialization: initializeService); + builder.Services.AddWorkItemProcessors(); builder.AddVmrRegistrations(gitHubToken); builder.AddMaestroApiClient(managedIdentityId); builder.AddGitHubClientFactory(); diff --git a/src/ProductConstructionService/ProductConstructionService.Api/ProductConstructionService.Api.csproj b/src/ProductConstructionService/ProductConstructionService.Api/ProductConstructionService.Api.csproj index ffb21907b5..ef1a4644e6 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/ProductConstructionService.Api.csproj +++ b/src/ProductConstructionService/ProductConstructionService.Api/ProductConstructionService.Api.csproj @@ -47,6 +47,7 @@ + diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Program.cs b/src/ProductConstructionService/ProductConstructionService.Api/Program.cs index 14d6d86e4b..e66647020f 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Program.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/Program.cs @@ -1,13 +1,11 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using Azure.Storage.Queues; using Microsoft.AspNetCore.Rewrite; using Microsoft.Extensions.FileProviders; using ProductConstructionService.Api; using ProductConstructionService.Api.Configuration; -using ProductConstructionService.Api.Queue; -using ProductConstructionService.Common; +using ProductConstructionService.WorkItems; var builder = WebApplication.CreateBuilder(args); @@ -55,10 +53,7 @@ new PhysicalFileProvider(Path.Combine(Environment.CurrentDirectory, "wwwroot"))), }); - // When running locally, create the workitem queue, if it doesn't already exist - var queueServiceClient = app.Services.GetRequiredService(); - var queueClient = queueServiceClient.GetQueueClient(app.Configuration.GetRequiredValue(QueueConfiguration.JobQueueNameConfigurationKey)); - await queueClient.CreateIfNotExistsAsync(); + await app.UseLocalWorkItemQueues(); if (useSwagger) { diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/IJobProcessor.cs b/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/IJobProcessor.cs deleted file mode 100644 index 6ee3677021..0000000000 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/IJobProcessor.cs +++ /dev/null @@ -1,11 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using ProductConstructionService.Api.Queue.Jobs; - -namespace ProductConstructionService.Api.Queue.JobProcessors; - -internal interface IJobProcessor -{ - Task ProcessJobAsync(Job job, CancellationToken cancellationToken); -} diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/TextJobProcessor.cs b/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/TextJobProcessor.cs deleted file mode 100644 index 35db7ac49c..0000000000 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/TextJobProcessor.cs +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using ProductConstructionService.Api.Queue.Jobs; - -namespace ProductConstructionService.Api.Queue.JobProcessors; - -internal class TextJobProcessor(ILogger logger) : IJobProcessor -{ - private readonly ILogger _logger = logger; - - public Task ProcessJobAsync(Job job, CancellationToken cancellationToken) - { - var textJob = (TextJob)job; - _logger.LogInformation("Processed text job. Message: {message}", textJob.Text); - return Task.CompletedTask; - } -} diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobScope.cs b/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobScope.cs deleted file mode 100644 index 5e66a84212..0000000000 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobScope.cs +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using Microsoft.DotNet.DarcLib; -using ProductConstructionService.Api.Queue.JobProcessors; -using ProductConstructionService.Api.Queue.Jobs; - -namespace ProductConstructionService.Api.Queue; - -public class JobScope( - Action finalizer, - IServiceScope serviceScope, - ITelemetryRecorder telemetryRecorder) - : IDisposable -{ - private readonly IServiceScope _serviceScope = serviceScope; - private readonly ITelemetryRecorder _telemetryRecorder = telemetryRecorder; - private Job? _job = null; - - public void Dispose() - { - finalizer.Invoke(); - _serviceScope.Dispose(); - } - - public void InitializeScope(Job job) - { - _job = job; - } - - public async Task RunJobAsync(CancellationToken cancellationToken) - { - if (_job is null) - { - throw new Exception($"{nameof(JobScope)} not initialized! Call InitializeScope before calling {nameof(RunJobAsync)}"); - } - - var jobRunner = _serviceScope.ServiceProvider.GetRequiredKeyedService(_job.Type); - - using (ITelemetryScope telemetryScope = _telemetryRecorder.RecordJobCompletion(_job.Type)) - { - await jobRunner.ProcessJobAsync(_job, cancellationToken); - telemetryScope.SetSuccess(); - } - } -} diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobScopeManager.cs b/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobScopeManager.cs deleted file mode 100644 index 3cfbe83446..0000000000 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobScopeManager.cs +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -namespace ProductConstructionService.Api.Queue; - -public class JobScopeManager -{ - private readonly AutoResetEvent _autoResetEvent; - private readonly IServiceProvider _serviceProvider; - private readonly ILogger _logger; - private JobsProcessorState _state; - - public JobsProcessorState State - { - get => _state; - private set - { - if (_state != value) - { - _state = value; - _logger.LogInformation($"JobsProcessor state changing to {value}"); - } - } - } - - public JobScopeManager(bool initializingOnStartup, IServiceProvider serviceProvider, ILogger logger) - { - _autoResetEvent = new AutoResetEvent(!initializingOnStartup); - _logger = logger; - _serviceProvider = serviceProvider; - _state = initializingOnStartup ? JobsProcessorState.Initializing : JobsProcessorState.Working; - } - - public void Start() - { - State = JobsProcessorState.Working; - _autoResetEvent.Set(); - } - - /// - /// Creates a new scope for the currently executing Job, when the the JobsProcessor is in the `Working` state. - /// - public JobScope BeginJobScopeWhenReady() - { - _autoResetEvent.WaitOne(); - var scope = _serviceProvider.CreateScope(); - return ActivatorUtilities.CreateInstance(scope.ServiceProvider, scope, new Action(JobFinished)); - } - - private void JobFinished() - { - switch (State) - { - case JobsProcessorState.Stopping: - State = JobsProcessorState.Stopped; - break; - case JobsProcessorState.Working: - _autoResetEvent.Set(); - break; - } - } - - public void FinishJobAndStop() - { - if (State == JobsProcessorState.Working) - { - State = JobsProcessorState.Stopping; - } - } - - public void InitializingDone() - { - if (State == JobsProcessorState.Initializing) - { - State = JobsProcessorState.Stopped; - } - } -} diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/TextJob.cs b/src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/TextJob.cs deleted file mode 100644 index 1ce523f601..0000000000 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/TextJob.cs +++ /dev/null @@ -1,10 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -namespace ProductConstructionService.Api.Queue.Jobs; - -internal class TextJob : Job -{ - public required string Text { get; init; } - public override string Type => nameof(TextJob); -} diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/QueueConfiguration.cs b/src/ProductConstructionService/ProductConstructionService.Api/Queue/QueueConfiguration.cs deleted file mode 100644 index eaacaf7484..0000000000 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/QueueConfiguration.cs +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using Azure.Identity; -using ProductConstructionService.Api.Queue.JobProcessors; -using ProductConstructionService.Api.Queue.Jobs; -using ProductConstructionService.Common; - -namespace ProductConstructionService.Api.Queue; - -internal static class QueueConfiguration -{ - public const string JobQueueNameConfigurationKey = $"{JobConsumerOptions.ConfigurationKey}:JobQueueName"; - - public static void AddWorkitemQueues(this WebApplicationBuilder builder, DefaultAzureCredential credential, bool waitForInitialization) - { - builder.AddAzureQueueClient("queues", settings => settings.Credential = credential); - - var queueName = builder.Configuration.GetRequiredValue(JobQueueNameConfigurationKey); - - // When running the service locally, the JobsProcessor should start in the Working state - builder.Services.AddSingleton(sp => ActivatorUtilities.CreateInstance(sp, waitForInitialization)); - builder.Services.Configure( - builder.Configuration.GetSection(JobConsumerOptions.ConfigurationKey)); - builder.Services.AddTransient(sp => - ActivatorUtilities.CreateInstance(sp, queueName)); - builder.Services.AddHostedService(); - - // Register all job processors - builder.Services.RegisterJobProcessor(); - builder.Services.RegisterJobProcessor(); - } -} - -static file class JobProcessorExtensions -{ - public static void RegisterJobProcessor(this IServiceCollection services) - where TProcessor : class, IJobProcessor - { - services.AddKeyedTransient(typeof(TJob).Name); - } -} diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Telemetry/TelemetryRecorder.cs b/src/ProductConstructionService/ProductConstructionService.Api/Telemetry/TelemetryRecorder.cs index 6a774d252d..416015d2d0 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Telemetry/TelemetryRecorder.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/Telemetry/TelemetryRecorder.cs @@ -15,8 +15,8 @@ public class TelemetryRecorder( private readonly ILogger _logger = logger; private readonly TelemetryClient _telemetryClient = telemetryClient; - public ITelemetryScope RecordJobCompletion(string jobType) - => CreateScope("JobExecuted", new() {{ "JobType", jobType }}); + public ITelemetryScope RecordWorkItemCompletion(string workItemType) + => CreateScope("WorkItemExecuted", new() {{ "WorkItemType", workItemType }}); public ITelemetryScope RecordGitOperation(TrackedGitOperation operation, string repoUri) => CreateScope($"Git{operation}", new() { { "Uri", repoUri } }); diff --git a/src/ProductConstructionService/ProductConstructionService.Api/appsettings.Development.json b/src/ProductConstructionService/ProductConstructionService.Api/appsettings.Development.json index 8023a3509b..52397e3ac4 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/appsettings.Development.json +++ b/src/ProductConstructionService/ProductConstructionService.Api/appsettings.Development.json @@ -11,10 +11,10 @@ }, "KeyVaultName": "ProductConstructionDev", "BuildAssetRegistrySqlConnectionString": "Data Source=localhost\\SQLEXPRESS;Initial Catalog=BuildAssetRegistry;Integrated Security=true", - "JobConsumerOptions": { - "JobQueueName": "pcs-jobs", + "WorkItemConsumerOptions": { + "WorkItemQueueName": "pcs-workitems", "QueuePollTimeout": "00:00:05", - "MaxJobRetries": 3, + "MaxWorkItemRetries": 3, "QueueMessageInvisibilityTime": "00:05:00" }, "Maestro": { diff --git a/src/ProductConstructionService/ProductConstructionService.Api/appsettings.json b/src/ProductConstructionService/ProductConstructionService.Api/appsettings.json index ffc53b08d6..9b3a94c957 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/appsettings.json +++ b/src/ProductConstructionService/ProductConstructionService.Api/appsettings.json @@ -12,10 +12,10 @@ } }, "AllowedHosts": "*", - "JobConsumerOptions": { - "JobQueueName": "pcs-jobs", + "WorkItemConsumerOptions": { + "WorkItemQueueName": "pcs-workitems", "QueuePollTimeout": "00:01:00", - "MaxJobRetries": 3, + "MaxWorkItemRetries": 3, "QueueMessageInvisibilityTime": "00:01:00" }, "EntraAuthentication": { diff --git a/src/ProductConstructionService/ProductConstructionService.SubscriptionTriggerer/appsettings.Staging.json b/src/ProductConstructionService/ProductConstructionService.SubscriptionTriggerer/appsettings.Staging.json index fddad34b0e..27d62da21c 100644 --- a/src/ProductConstructionService/ProductConstructionService.SubscriptionTriggerer/appsettings.Staging.json +++ b/src/ProductConstructionService/ProductConstructionService.SubscriptionTriggerer/appsettings.Staging.json @@ -1,5 +1,5 @@ { - "QueueConnectionString": "https://productconstructionint.queue.core.windows.net/pcs-jobs", + "QueueConnectionString": "https://productconstructionint.queue.core.windows.net/pcs-workitems", "ManagedIdentityClientId": "9729e72a-f381-4d59-a958-8aa94a18a8d2", "BuildAssetRegistrySqlConnectionString": "Data Source=tcp:maestro-int-server.database.windows.net,1433; Initial Catalog=BuildAssetRegistry; Authentication=Active Directory Managed Identity; Persist Security Info=False; MultipleActiveResultSets=True; Connect Timeout=30; Encrypt=True; TrustServerCertificate=False; User Id=USER_ID_PLACEHOLDER", "Kusto": { diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/ProductConstructionService.WorkItems.csproj b/src/ProductConstructionService/ProductConstructionService.WorkItems/ProductConstructionService.WorkItems.csproj new file mode 100644 index 0000000000..9e3667ce00 --- /dev/null +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/ProductConstructionService.WorkItems.csproj @@ -0,0 +1,28 @@ + + + + net8.0 + enable + enable + + False + + + + + + + + + + + + + + + + + + + + diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConfiguration.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConfiguration.cs new file mode 100644 index 0000000000..108df31b86 --- /dev/null +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConfiguration.cs @@ -0,0 +1,52 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Azure.Identity; +using Azure.Storage.Queues; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using ProductConstructionService.Common; +using ProductConstructionService.WorkItems.WorkItemDefinitions; +using ProductConstructionService.WorkItems.WorkItemProcessors; + +namespace ProductConstructionService.WorkItems; + +public static class WorkItemConfiguration +{ + private const string WorkItemQueueNameConfigurationKey = $"{WorkItemConsumerOptions.ConfigurationKey}:WorkItemQueueName"; + + public static void AddWorkItemQueues(this WebApplicationBuilder builder, DefaultAzureCredential credential, bool waitForInitialization) + { + builder.AddAzureQueueClient("queues", settings => settings.Credential = credential); + + var queueName = builder.Configuration.GetRequiredValue(WorkItemQueueNameConfigurationKey); + + // When running the service locally, the WorkItemProcessor should start in the Working state + builder.Services.AddSingleton(sp => ActivatorUtilities.CreateInstance(sp, waitForInitialization)); + builder.Services.Configure( + builder.Configuration.GetSection(WorkItemConsumerOptions.ConfigurationKey)); + builder.Services.AddTransient(sp => + ActivatorUtilities.CreateInstance(sp, queueName)); + builder.Services.AddHostedService(); + } + + // When running locally, create the workitem queue, if it doesn't already exist + public static async Task UseLocalWorkItemQueues(this WebApplication app) + { + var queueServiceClient = app.Services.GetRequiredService(); + var queueClient = queueServiceClient.GetQueueClient(app.Configuration.GetRequiredValue(WorkItemQueueNameConfigurationKey)); + await queueClient.CreateIfNotExistsAsync(); + } + + public static void AddWorkItemProcessors(this IServiceCollection services) + { + services.RegisterWorkItemProcessor(); + } + + private static void RegisterWorkItemProcessor(this IServiceCollection services) + where TProcessor : class, IWorkItemProcessor + { + services.AddKeyedTransient(typeof(TWorkItem).Name); + } +} diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobConsumer.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs similarity index 51% rename from src/ProductConstructionService/ProductConstructionService.Api/Queue/JobConsumer.cs rename to src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs index adf7a49379..f94cc15e66 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobConsumer.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs @@ -3,21 +3,23 @@ using Azure.Storage.Queues; using Azure.Storage.Queues.Models; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using ProductConstructionService.Api.Queue.Jobs; +using ProductConstructionService.WorkItems.WorkItemDefinitions; -namespace ProductConstructionService.Api.Queue; +namespace ProductConstructionService.WorkItems; -internal class JobConsumer( - ILogger logger, - IOptions options, - JobScopeManager scopeManager, +internal class WorkItemConsumer( + ILogger logger, + IOptions options, + WorkItemScopeManager scopeManager, QueueServiceClient queueServiceClient) : BackgroundService { - private readonly ILogger _logger = logger; - private readonly IOptions _options = options; - private readonly JobScopeManager _scopeManager = scopeManager; + private readonly ILogger _logger = logger; + private readonly IOptions _options = options; + private readonly WorkItemScopeManager _scopeManager = scopeManager; protected override async Task ExecuteAsync(CancellationToken cancellationToken) { @@ -25,11 +27,11 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) // Otherwise, the service will be stuck here await Task.Yield(); - QueueClient queueClient = queueServiceClient.GetQueueClient(_options.Value.JobQueueName); - _logger.LogInformation("Starting to process PCS jobs {queueName}", _options.Value.JobQueueName); + QueueClient queueClient = queueServiceClient.GetQueueClient(_options.Value.WorkItemQueueName); + _logger.LogInformation("Starting to process PCS queue {queueName}", _options.Value.WorkItemQueueName); while (!cancellationToken.IsCancellationRequested) { - using (JobScope jobScope = _scopeManager.BeginJobScopeWhenReady()) + using (WorkItemScope workItemScope = _scopeManager.BeginWorkItemScopeWhenReady()) { if (cancellationToken.IsCancellationRequested) { @@ -37,7 +39,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) } try { - await ReadAndProcessJobAsync(queueClient, jobScope, cancellationToken); + await ReadAndProcessWorkItemAsync(queueClient, workItemScope, cancellationToken); } // If the cancellation token gets cancelled, we just want to exit catch (OperationCanceledException) @@ -46,32 +48,32 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) } catch (Exception ex) { - _logger.LogError(ex, "An unexpected exception occurred during Pcs job processing"); + _logger.LogError(ex, "An unexpected exception occurred during PCS work item processing"); } } } } - private async Task ReadAndProcessJobAsync(QueueClient queueClient, JobScope jobScope, CancellationToken cancellationToken) + private async Task ReadAndProcessWorkItemAsync(QueueClient queueClient, WorkItemScope workItemScope, CancellationToken cancellationToken) { QueueMessage message = await queueClient.ReceiveMessageAsync(_options.Value.QueueMessageInvisibilityTime, cancellationToken); if (message?.Body == null) { // Queue is empty, wait a bit - _logger.LogDebug("Queue {queueName} is empty. Sleeping for {sleepingTime} seconds", _options.Value.JobQueueName, (int)_options.Value.QueuePollTimeout.TotalSeconds); + _logger.LogDebug("Queue {queueName} is empty. Sleeping for {sleepingTime} seconds", _options.Value.WorkItemQueueName, (int)_options.Value.QueuePollTimeout.TotalSeconds); await Task.Delay(_options.Value.QueuePollTimeout, cancellationToken); return; } - var job = message.Body.ToObjectFromJson(); + var workItem = message.Body.ToObjectFromJson(); - jobScope.InitializeScope(job); + workItemScope.InitializeScope(workItem); try { - _logger.LogInformation("Starting attempt {attemptNumber} for job {jobId}, type {jobType}", message.DequeueCount, job.Id, job.Type); - await jobScope.RunJobAsync(cancellationToken); + _logger.LogInformation("Starting attempt {attemptNumber} for work item {workItemId}, type {workItemType}", message.DequeueCount, workItem.Id, workItem.Type); + await workItemScope.RunWorkItemAsync(cancellationToken); await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken); } @@ -82,13 +84,13 @@ private async Task ReadAndProcessJobAsync(QueueClient queueClient, JobScope jobS } catch (Exception ex) { - _logger.LogError(ex, "Processing job {jobId} attempt {attempt}/{maxAttempts} failed", - job.Id, message.DequeueCount, _options.Value.MaxJobRetries); - // Let the job retry a few times. If it fails a few times, delete it from the queue, it's a bad job - if (message.DequeueCount == _options.Value.MaxJobRetries) + _logger.LogError(ex, "Processing work item {workItemId} attempt {attempt}/{maxAttempts} failed", + workItem.Id, message.DequeueCount, _options.Value.MaxWorkItemRetries); + // Let the workItem retry a few times. If it fails a few times, delete it from the queue, it's a bad work item + if (message.DequeueCount == _options.Value.MaxWorkItemRetries) { - _logger.LogError("Job {jobId} has failed {maxAttempts} times. Discarding the message {message} from the queue", - job.Id, _options.Value.MaxJobRetries, message.Body.ToString()); + _logger.LogError("Work item {workItemId} has failed {maxAttempts} times. Discarding the message {message} from the queue", + workItem.Id, _options.Value.MaxWorkItemRetries, message.Body.ToString()); await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken); } } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobConsumerOptions.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumerOptions.cs similarity index 50% rename from src/ProductConstructionService/ProductConstructionService.Api/Queue/JobConsumerOptions.cs rename to src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumerOptions.cs index 1caa360f9f..b4f7fb6a1c 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobConsumerOptions.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumerOptions.cs @@ -1,14 +1,14 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -namespace ProductConstructionService.Api.Queue; +namespace ProductConstructionService.WorkItems; -public class JobConsumerOptions +public class WorkItemConsumerOptions { - public const string ConfigurationKey = "JobConsumerOptions"; + public const string ConfigurationKey = "WorkItemConsumerOptions"; public required TimeSpan QueuePollTimeout { get; init; } - public required string JobQueueName { get; init; } - public required int MaxJobRetries { get; init; } + public required string WorkItemQueueName { get; init; } + public required int MaxWorkItemRetries { get; init; } public required TimeSpan QueueMessageInvisibilityTime { get; init; } } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/CodeFlowJob.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemDefinitions/CodeFlowWorkItem.cs similarity index 71% rename from src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/CodeFlowJob.cs rename to src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemDefinitions/CodeFlowWorkItem.cs index 11a53687d7..a6c8ad9552 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/CodeFlowJob.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemDefinitions/CodeFlowWorkItem.cs @@ -1,12 +1,12 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -namespace ProductConstructionService.Api.Queue.Jobs; +namespace ProductConstructionService.WorkItems.WorkItemDefinitions; /// -/// Main code flow job which causes new code changes to be flown to a new branch in the target repo. +/// Main code flow work item which causes new code changes to be flown to a new branch in the target repo. /// -internal class CodeFlowJob : Job +public class CodeFlowWorkItem : WorkItem { /// /// Subscription that is being flown/triggered. @@ -28,5 +28,5 @@ internal class CodeFlowJob : Job /// public string? PrUrl { get; init; } - public override string Type => nameof(CodeFlowJob); + public override string Type => nameof(CodeFlowWorkItem); } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/Job.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemDefinitions/WorkItem.cs similarity index 54% rename from src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/Job.cs rename to src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemDefinitions/WorkItem.cs index 8df36fc774..40ee1b5646 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/Jobs/Job.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemDefinitions/WorkItem.cs @@ -3,11 +3,10 @@ using System.Text.Json.Serialization; -namespace ProductConstructionService.Api.Queue.Jobs; +namespace ProductConstructionService.WorkItems.WorkItemDefinitions; -[JsonDerivedType(typeof(TextJob), typeDiscriminator: nameof(TextJob))] -[JsonDerivedType(typeof(CodeFlowJob), typeDiscriminator: nameof(CodeFlowJob))] -public abstract class Job +[JsonDerivedType(typeof(CodeFlowWorkItem), typeDiscriminator: nameof(CodeFlowWorkItem))] +public abstract class WorkItem { public Guid Id { get; } = Guid.NewGuid(); public abstract string Type { get; } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobsProcessorState.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessorState.cs similarity index 52% rename from src/ProductConstructionService/ProductConstructionService.Api/Queue/JobsProcessorState.cs rename to src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessorState.cs index 7807aeed79..eac45ed7f9 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobsProcessorState.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessorState.cs @@ -1,27 +1,27 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -namespace ProductConstructionService.Api.Queue; +namespace ProductConstructionService.WorkItems; -public enum JobsProcessorState +public enum WorkItemProcessorState { /// - /// The JobsProcessor is waiting for service to fully initialize + /// The processor is waiting for service to fully initialize /// Initializing, /// - /// The JobsProcessor will keep taking and processing new jobs + /// The processor will keep taking and processing new work items /// Working, /// - /// The JobsProcessor isn't doing anything + /// The processor isn't doing anything /// Stopped, /// - /// The JobsProcessor will finish its current job and stop + /// The processor will finish its current work item and stop /// Stopping, } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/CodeFlowJobProcessor.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessors/CodeFlowWorkItemProcessor.cs similarity index 74% rename from src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/CodeFlowJobProcessor.cs rename to src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessors/CodeFlowWorkItemProcessor.cs index d2eaf62c86..418c2a22d7 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProcessors/CodeFlowJobProcessor.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessors/CodeFlowWorkItemProcessor.cs @@ -6,11 +6,12 @@ using Microsoft.DotNet.DarcLib.VirtualMonoRepo; using Microsoft.DotNet.Maestro.Client; using Microsoft.DotNet.Maestro.Client.Models; -using ProductConstructionService.Api.Queue.Jobs; +using Microsoft.Extensions.Logging; +using ProductConstructionService.WorkItems.WorkItemDefinitions; -namespace ProductConstructionService.Api.Queue.JobProcessors; +namespace ProductConstructionService.WorkItems.WorkItemProcessors; -internal class CodeFlowJobProcessor( +internal class CodeFlowWorkItemProcessor( IVmrInfo vmrInfo, IBasicBarClient barClient, IMaestroApi maestroApi, @@ -18,8 +19,8 @@ internal class CodeFlowJobProcessor( IPcsVmrForwardFlower vmrForwardFlower, ILocalLibGit2Client gitClient, ITelemetryRecorder telemetryRecorder, - ILogger logger) - : IJobProcessor + ILogger logger) + : IWorkItemProcessor { private readonly IVmrInfo _vmrInfo = vmrInfo; private readonly IBasicBarClient _barClient = barClient; @@ -28,22 +29,22 @@ internal class CodeFlowJobProcessor( private readonly IPcsVmrForwardFlower _vmrForwardFlower = vmrForwardFlower; private readonly ILocalLibGit2Client _gitClient = gitClient; private readonly ITelemetryRecorder _telemetryRecorder = telemetryRecorder; - private readonly ILogger _logger = logger; + private readonly ILogger _logger = logger; - public async Task ProcessJobAsync(Job job, CancellationToken cancellationToken) + public async Task ProcessWorkItemAsync(WorkItem workItem, CancellationToken cancellationToken) { - var codeflowJob = (CodeFlowJob)job; + var codeflowWorkItem = (CodeFlowWorkItem)workItem; - Subscription subscription = await _barClient.GetSubscriptionAsync(codeflowJob.SubscriptionId) - ?? throw new Exception($"Subscription {codeflowJob.SubscriptionId} not found"); + Subscription subscription = await _barClient.GetSubscriptionAsync(codeflowWorkItem.SubscriptionId) + ?? throw new Exception($"Subscription {codeflowWorkItem.SubscriptionId} not found"); if (!subscription.SourceEnabled || (subscription.SourceDirectory ?? subscription.TargetDirectory) == null) { - throw new Exception($"Subscription {codeflowJob.SubscriptionId} is not source enabled or source directory is not set"); + throw new Exception($"Subscription {codeflowWorkItem.SubscriptionId} is not source enabled or source directory is not set"); } - Build build = await _barClient.GetBuildAsync(codeflowJob.BuildId) - ?? throw new Exception($"Build {codeflowJob.BuildId} not found"); + Build build = await _barClient.GetBuildAsync(codeflowWorkItem.BuildId) + ?? throw new Exception($"Build {codeflowWorkItem.BuildId} not found"); var isForwardFlow = subscription.TargetDirectory != null; @@ -54,7 +55,7 @@ public async Task ProcessJobAsync(Job job, CancellationToken cancellationToken) subscription.Id, subscription.TargetRepository, subscription.TargetBranch, - codeflowJob.PrBranch); + codeflowWorkItem.PrBranch); bool hadUpdates; NativePath targetRepo; @@ -68,7 +69,7 @@ public async Task ProcessJobAsync(Job job, CancellationToken cancellationToken) subscription.TargetDirectory!, build, subscription.TargetBranch, - codeflowJob.PrBranch, + codeflowWorkItem.PrBranch, cancellationToken); } else @@ -77,7 +78,7 @@ public async Task ProcessJobAsync(Job job, CancellationToken cancellationToken) subscription.SourceDirectory!, build, subscription.TargetBranch, - codeflowJob.PrBranch, + codeflowWorkItem.PrBranch, cancellationToken); } } @@ -103,19 +104,19 @@ public async Task ProcessJobAsync(Job job, CancellationToken cancellationToken) // TODO https://github.com/dotnet/arcade-services/issues/3318: Handle failures (conflict, non-ff etc) using (var scope = _telemetryRecorder.RecordGitOperation(TrackedGitOperation.Push, subscription.TargetRepository)) { - await _gitClient.Push(targetRepo, codeflowJob.PrBranch, subscription.TargetRepository); + await _gitClient.Push(targetRepo, codeflowWorkItem.PrBranch, subscription.TargetRepository); scope.SetSuccess(); } // When no PR is created yet, we notify Maestro that the branch is ready - if (codeflowJob.PrUrl == null) + if (codeflowWorkItem.PrUrl == null) { _logger.LogInformation( "Notifying Maestro that subscription code changes for {subscriptionId} are ready in local branch {branch}", subscription.Id, subscription.TargetBranch); - await _maestroApi.Subscriptions.TriggerSubscriptionAsync(codeflowJob.BuildId, subscription.Id, default); + await _maestroApi.Subscriptions.TriggerSubscriptionAsync(codeflowWorkItem.BuildId, subscription.Id, default); } } } diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessors/IWorkItemProcessor.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessors/IWorkItemProcessor.cs new file mode 100644 index 0000000000..5e04125288 --- /dev/null +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProcessors/IWorkItemProcessor.cs @@ -0,0 +1,11 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using ProductConstructionService.WorkItems.WorkItemDefinitions; + +namespace ProductConstructionService.WorkItems.WorkItemProcessors; + +public interface IWorkItemProcessor +{ + Task ProcessWorkItemAsync(WorkItem workItem, CancellationToken cancellationToken); +} diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProducer.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducer.cs similarity index 62% rename from src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProducer.cs rename to src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducer.cs index e1856105ef..c2edd9714b 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProducer.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducer.cs @@ -4,18 +4,18 @@ using System.Text.Json; using Azure.Storage.Queues; using Azure.Storage.Queues.Models; -using ProductConstructionService.Api.Queue.Jobs; +using ProductConstructionService.WorkItems.WorkItemDefinitions; -namespace ProductConstructionService.Api.Queue; +namespace ProductConstructionService.WorkItems; -public class JobProducer(QueueServiceClient queueServiceClient, string queueName) where T : Job +public class WorkItemProducer(QueueServiceClient queueServiceClient, string queueName) where T : WorkItem { private readonly QueueServiceClient _queueServiceClient = queueServiceClient; private readonly string _queueName = queueName; - public async Task ProduceJobAsync(T payload) + public async Task ProduceWorkItemAsync(T payload) { var client = _queueServiceClient.GetQueueClient(_queueName); - return await client.SendMessageAsync(JsonSerializer.Serialize(payload)); + return await client.SendMessageAsync(JsonSerializer.Serialize(payload)); } } diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProducerFactory.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducerFactory.cs similarity index 57% rename from src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProducerFactory.cs rename to src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducerFactory.cs index 91260b0069..cdc862456d 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Queue/JobProducerFactory.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducerFactory.cs @@ -2,15 +2,15 @@ // The .NET Foundation licenses this file to you under the MIT license. using Azure.Storage.Queues; -using ProductConstructionService.Api.Queue.Jobs; +using ProductConstructionService.WorkItems.WorkItemDefinitions; -namespace ProductConstructionService.Api.Queue; +namespace ProductConstructionService.WorkItems; -public class JobProducerFactory(QueueServiceClient queueServiceClient, string queueName) +public class WorkItemProducerFactory(QueueServiceClient queueServiceClient, string queueName) { private readonly QueueServiceClient _queueServiceClient = queueServiceClient; private readonly string _queueName = queueName; - public JobProducer Create() where T : Job + public WorkItemProducer Create() where T : WorkItem => new(_queueServiceClient, _queueName); } diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScope.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScope.cs new file mode 100644 index 0000000000..eaedb0d313 --- /dev/null +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScope.cs @@ -0,0 +1,47 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Microsoft.DotNet.DarcLib; +using Microsoft.Extensions.DependencyInjection; +using ProductConstructionService.WorkItems.WorkItemDefinitions; +using ProductConstructionService.WorkItems.WorkItemProcessors; + +namespace ProductConstructionService.WorkItems; + +public class WorkItemScope( + Action finalizer, + IServiceScope serviceScope, + ITelemetryRecorder telemetryRecorder) + : IDisposable +{ + private readonly IServiceScope _serviceScope = serviceScope; + private readonly ITelemetryRecorder _telemetryRecorder = telemetryRecorder; + private WorkItem? _workItem = null; + + public void Dispose() + { + finalizer.Invoke(); + _serviceScope.Dispose(); + } + + public void InitializeScope(WorkItem workItem) + { + _workItem = workItem; + } + + public async Task RunWorkItemAsync(CancellationToken cancellationToken) + { + if (_workItem is null) + { + throw new Exception($"{nameof(WorkItemScope)} not initialized! Call InitializeScope before calling {nameof(RunWorkItemAsync)}"); + } + + var workItemProcessor = _serviceScope.ServiceProvider.GetRequiredKeyedService(_workItem.Type); + + using (ITelemetryScope telemetryScope = _telemetryRecorder.RecordWorkItemCompletion(_workItem.Type)) + { + await workItemProcessor.ProcessWorkItemAsync(_workItem, cancellationToken); + telemetryScope.SetSuccess(); + } + } +} diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScopeManager.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScopeManager.cs new file mode 100644 index 0000000000..77f93a694e --- /dev/null +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemScopeManager.cs @@ -0,0 +1,81 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace ProductConstructionService.WorkItems; + +public class WorkItemScopeManager +{ + private readonly AutoResetEvent _autoResetEvent; + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + private WorkItemProcessorState _state; + + public WorkItemProcessorState State + { + get => _state; + private set + { + if (_state != value) + { + _state = value; + _logger.LogInformation($"WorkItemsProcessor state changing to {value}"); + } + } + } + + public WorkItemScopeManager(bool initializingOnStartup, IServiceProvider serviceProvider, ILogger logger) + { + _autoResetEvent = new AutoResetEvent(!initializingOnStartup); + _logger = logger; + _serviceProvider = serviceProvider; + _state = initializingOnStartup ? WorkItemProcessorState.Initializing : WorkItemProcessorState.Working; + } + + public void Start() + { + State = WorkItemProcessorState.Working; + _autoResetEvent.Set(); + } + + /// + /// Creates a new scope for the currently executing WorkItem, when the the WorkItemsProcessor is in the `Working` state. + /// + public WorkItemScope BeginWorkItemScopeWhenReady() + { + _autoResetEvent.WaitOne(); + var scope = _serviceProvider.CreateScope(); + return ActivatorUtilities.CreateInstance(scope.ServiceProvider, scope, new Action(WorkItemFinished)); + } + + private void WorkItemFinished() + { + switch (State) + { + case WorkItemProcessorState.Stopping: + State = WorkItemProcessorState.Stopped; + break; + case WorkItemProcessorState.Working: + _autoResetEvent.Set(); + break; + } + } + + public void FinishWorkItemAndStop() + { + if (State == WorkItemProcessorState.Working) + { + State = WorkItemProcessorState.Stopping; + } + } + + public void InitializingDone() + { + if (State == WorkItemProcessorState.Initializing) + { + State = WorkItemProcessorState.Stopped; + } + } +} diff --git a/test/ProductConstructionService.Api.Tests/JobScopeTests.cs b/test/ProductConstructionService.Api.Tests/JobScopeTests.cs deleted file mode 100644 index 0b411c4cf0..0000000000 --- a/test/ProductConstructionService.Api.Tests/JobScopeTests.cs +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using FluentAssertions; -using Microsoft.DotNet.DarcLib; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Moq; -using ProductConstructionService.Api.Queue; -using ProductConstructionService.Api.Queue.JobProcessors; -using ProductConstructionService.Api.Queue.Jobs; - -namespace ProductConstructionService.Api.Tests; - -public class JobScopeTests -{ - [Test] - public async Task JobScopeRecordsMetricsTest() - { - IServiceCollection services = new ServiceCollection(); - - Mock telemetryScope = new(); - Mock metricRecorderMock = new(); - TextJob textJob = new() { Text = string.Empty }; - - metricRecorderMock.Setup(m => m.RecordJobCompletion(textJob.Type)).Returns(telemetryScope.Object); - - services.AddSingleton(metricRecorderMock.Object); - services.AddKeyedSingleton(nameof(TextJob), new Mock().Object); - - IServiceProvider serviceProvider = services.BuildServiceProvider(); - - JobScopeManager scopeManager = new(false, serviceProvider, Mock.Of>()); - - using (JobScope jobScope = scopeManager.BeginJobScopeWhenReady()) - { - jobScope.InitializeScope(textJob); - - await jobScope.RunJobAsync(CancellationToken.None); - } - - metricRecorderMock.Verify(m => m.RecordJobCompletion(textJob.Type), Times.Once); - telemetryScope.Verify(m => m.SetSuccess(), Times.Once); - } - - [Test] - public void JobScopeRecordsMetricsWhenThrowingTest() - { - IServiceCollection services = new ServiceCollection(); - - Mock metricRecorderScopeMock = new(); - Mock metricRecorderMock = new(); - TextJob textJob = new() { Text = string.Empty }; - - metricRecorderMock.Setup(m => m.RecordJobCompletion(textJob.Type)).Returns(metricRecorderScopeMock.Object); - - services.AddSingleton(metricRecorderMock.Object); - - Mock jobRunnerMock = new(); - jobRunnerMock.Setup(j => j.ProcessJobAsync(textJob, It.IsAny())).Throws(); - services.AddKeyedSingleton(nameof(TextJob), jobRunnerMock.Object); - - IServiceProvider serviceProvider = services.BuildServiceProvider(); - - JobScopeManager scopeManager = new(false, serviceProvider, Mock.Of>()); - - using (JobScope jobScope = scopeManager.BeginJobScopeWhenReady()) - { - jobScope.InitializeScope(textJob); - - Func func = async () => await jobScope.RunJobAsync(CancellationToken.None); - func.Should().ThrowAsync(); - } - - metricRecorderMock.Verify(m => m.RecordJobCompletion(textJob.Type), Times.Once); - metricRecorderScopeMock.Verify(m => m.SetSuccess(), Times.Never); - } -} diff --git a/test/ProductConstructionService.Api.Tests/JobsProcessorScopeManagerTests.cs b/test/ProductConstructionService.Api.Tests/JobsProcessorScopeManagerTests.cs deleted file mode 100644 index 07e24b0a02..0000000000 --- a/test/ProductConstructionService.Api.Tests/JobsProcessorScopeManagerTests.cs +++ /dev/null @@ -1,183 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using FluentAssertions; -using Microsoft.DotNet.DarcLib; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Moq; -using ProductConstructionService.Api.Queue; - -namespace ProductConstructionService.Api.Tests; - -public class JobsProcessorScopeManagerTests -{ - private readonly IServiceProvider _serviceProvider; - - public JobsProcessorScopeManagerTests() - { - ServiceCollection services = new(); - - services.AddSingleton(new Mock().Object); - - _serviceProvider = services.BuildServiceProvider(); - } - - [Test, CancelAfter(30000)] - public async Task JobsProcessorStatusNormalFlow() - { - JobScopeManager scopeManager = new(true, _serviceProvider, Mock.Of>()); - // When it starts, the processor is not initializing - scopeManager.State.Should().Be(JobsProcessorState.Initializing); - - // Initialization is done - scopeManager.InitializingDone(); - scopeManager.State.Should().Be(JobsProcessorState.Stopped); - - TaskCompletionSource jobCompletion1 = new(); - TaskCompletionSource jobCompletion2 = new(); - Thread t = new(() => - { - using (scopeManager.BeginJobScopeWhenReady()) { } - jobCompletion1.SetResult(); - }); - t.Start(); - - // Wait for the worker to start and get blocked - while (t.ThreadState != ThreadState.WaitSleepJoin) ; - - // Start the service again - scopeManager.Start(); - - scopeManager.State.Should().Be(JobsProcessorState.Working); - - // Wait for the worker to finish the job - await jobCompletion1.Task; - - // The JobProcessor is working now, it shouldn't block on anything - using (scopeManager.BeginJobScopeWhenReady()) { } - - scopeManager.State.Should().Be(JobsProcessorState.Working); - - // Simulate someone calling stop in the middle of a job - jobCompletion1 = new(); - jobCompletion2 = new(); - - var workerTask = Task.Run(async () => - { - using (scopeManager.BeginJobScopeWhenReady()) - { - jobCompletion1.SetResult(); - await jobCompletion2.Task; - } - }); - // Wait for the workerTask to start the job - await jobCompletion1.Task; - - scopeManager.FinishJobAndStop(); - - // Before the job is finished, we should be in the Stopping stage - scopeManager.State.Should().Be(JobsProcessorState.Stopping); - - // Let the job finish - jobCompletion2.SetResult(); - - await workerTask; - - // Now we should be in the stopped state - scopeManager.State.Should().Be(JobsProcessorState.Stopped); - } - - [Test, CancelAfter(30000)] - public async Task JobsProcessorMultipleStopFlow() - { - JobScopeManager scopeManager = new(true, _serviceProvider, Mock.Of>()); - - scopeManager.InitializingDone(); - // The jobs processor should start in a stopped state - scopeManager.State.Should().Be(JobsProcessorState.Stopped); - - scopeManager.FinishJobAndStop(); - - // We were already stopped, so we should continue to be so - scopeManager.State.Should().Be(JobsProcessorState.Stopped); - - TaskCompletionSource jobCompletion = new(); - - // Start a new job that should get blocked - Thread t = new(() => - { - using (scopeManager.BeginJobScopeWhenReady()) - { - jobCompletion.SetResult(); - } - }); - t.Start(); - - // Wait for the worker to start and get blocked - while (t.ThreadState != ThreadState.WaitSleepJoin) ; - - scopeManager.Start(); - // Wait for the worker to unblock and start the job - await jobCompletion.Task; - - scopeManager.State.Should().Be(JobsProcessorState.Working); - } - - [Test, CancelAfter(30000)] - public async Task JobsProcessorMultipleStartStop() - { - JobScopeManager scopeManager = new(true, _serviceProvider, Mock.Of>()); - - scopeManager.InitializingDone(); - - scopeManager.State.Should().Be(JobsProcessorState.Stopped); - - // Start the JobsProcessor multiple times in a row - scopeManager.Start(); - scopeManager.Start(); - scopeManager.Start(); - - scopeManager.State.Should().Be(JobsProcessorState.Working); - - TaskCompletionSource jobCompletion1 = new(); - TaskCompletionSource jobCompletion2 = new(); - - var workerTask = Task.Run(async () => - { - using (scopeManager.BeginJobScopeWhenReady()) - { - jobCompletion1.SetResult(); - await jobCompletion2.Task; - } - }); - - scopeManager.Start(); - - await jobCompletion1.Task; - - // Now stop in the middle of a job - scopeManager.FinishJobAndStop(); - - scopeManager.State.Should().Be(JobsProcessorState.Stopping); - - jobCompletion2.SetResult(); - - // Wait for the job to finish - await workerTask; - - scopeManager.State.Should().Be(JobsProcessorState.Stopped); - - // Verify that the new job will actually be blocked - Thread t = new(() => - { - using (scopeManager.BeginJobScopeWhenReady()) { } - }); - t.Start(); - - while (t.ThreadState != ThreadState.WaitSleepJoin) ; - - // Unblock the worker thread - scopeManager.Start(); - } -} diff --git a/test/ProductConstructionService.Api.Tests/ProductConstructionService.Api.Tests.csproj b/test/ProductConstructionService.Api.Tests/ProductConstructionService.Api.Tests.csproj index f18768545e..6e73600bc5 100644 --- a/test/ProductConstructionService.Api.Tests/ProductConstructionService.Api.Tests.csproj +++ b/test/ProductConstructionService.Api.Tests/ProductConstructionService.Api.Tests.csproj @@ -11,6 +11,7 @@ + diff --git a/test/ProductConstructionService.Api.Tests/WorkItemScopeTests.cs b/test/ProductConstructionService.Api.Tests/WorkItemScopeTests.cs new file mode 100644 index 0000000000..38645e8793 --- /dev/null +++ b/test/ProductConstructionService.Api.Tests/WorkItemScopeTests.cs @@ -0,0 +1,87 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using FluentAssertions; +using Microsoft.DotNet.DarcLib; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Moq; +using ProductConstructionService.WorkItems; +using ProductConstructionService.WorkItems.WorkItemDefinitions; +using ProductConstructionService.WorkItems.WorkItemProcessors; + +namespace ProductConstructionService.Api.Tests; + +public class WorkItemScopeTests +{ + [Test] + public async Task WorkItemScopeRecordsMetricsTest() + { + IServiceCollection services = new ServiceCollection(); + + Mock telemetryScope = new(); + Mock metricRecorderMock = new(); + TestWorkItem testWorkItem = new() { Text = string.Empty }; + + metricRecorderMock.Setup(m => m.RecordWorkItemCompletion(testWorkItem.Type)).Returns(telemetryScope.Object); + + services.AddSingleton(metricRecorderMock.Object); + services.AddKeyedSingleton(nameof(TestWorkItem), new Mock().Object); + + IServiceProvider serviceProvider = services.BuildServiceProvider(); + + WorkItemScopeManager scopeManager = new(false, serviceProvider, Mock.Of>()); + + using (WorkItemScope workItemScope = scopeManager.BeginWorkItemScopeWhenReady()) + { + workItemScope.InitializeScope(testWorkItem); + + await workItemScope.RunWorkItemAsync(CancellationToken.None); + } + + metricRecorderMock.Verify(m => m.RecordWorkItemCompletion(testWorkItem.Type), Times.Once); + telemetryScope.Verify(m => m.SetSuccess(), Times.Once); + } + + [Test] + public void WorkItemScopeRecordsMetricsWhenThrowingTest() + { + IServiceCollection services = new ServiceCollection(); + + Mock metricRecorderScopeMock = new(); + Mock metricRecorderMock = new(); + TestWorkItem textWorkItem = new() { Text = string.Empty }; + + metricRecorderMock + .Setup(m => m.RecordWorkItemCompletion(textWorkItem.Type)) + .Returns(metricRecorderScopeMock.Object); + + services.AddSingleton(metricRecorderMock.Object); + + Mock workItemProcessor = new(); + workItemProcessor.Setup(i => i.ProcessWorkItemAsync(textWorkItem, It.IsAny())).Throws(); + services.AddKeyedSingleton(nameof(TestWorkItem), workItemProcessor.Object); + + IServiceProvider serviceProvider = services.BuildServiceProvider(); + + WorkItemScopeManager scopeManager = new(false, serviceProvider, Mock.Of>()); + + using (WorkItemScope workItemScope = scopeManager.BeginWorkItemScopeWhenReady()) + { + workItemScope.InitializeScope(textWorkItem); + + Func func = async () => await workItemScope.RunWorkItemAsync(CancellationToken.None); + func.Should().ThrowAsync(); + } + + metricRecorderMock.Verify(m => m.RecordWorkItemCompletion(textWorkItem.Type), Times.Once); + metricRecorderScopeMock.Verify(m => m.SetSuccess(), Times.Never); + } + + private class TestWorkItem : WorkItem + { + public required string Text { get; set; } + + public override string Type => nameof(TestWorkItem); + } +} diff --git a/test/ProductConstructionService.Api.Tests/WorkItemsProcessorScopeManagerTests.cs b/test/ProductConstructionService.Api.Tests/WorkItemsProcessorScopeManagerTests.cs new file mode 100644 index 0000000000..69306c82a7 --- /dev/null +++ b/test/ProductConstructionService.Api.Tests/WorkItemsProcessorScopeManagerTests.cs @@ -0,0 +1,183 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using FluentAssertions; +using Microsoft.DotNet.DarcLib; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Moq; +using ProductConstructionService.WorkItems; + +namespace ProductConstructionService.Api.Tests; + +public class WorkItemsProcessorScopeManagerTests +{ + private readonly IServiceProvider _serviceProvider; + + public WorkItemsProcessorScopeManagerTests() + { + ServiceCollection services = new(); + + services.AddSingleton(new Mock().Object); + + _serviceProvider = services.BuildServiceProvider(); + } + + [Test, CancelAfter(30000)] + public async Task WorkItemsProcessorStatusNormalFlow() + { + WorkItemScopeManager scopeManager = new(true, _serviceProvider, Mock.Of>()); + // When it starts, the processor is not initializing + scopeManager.State.Should().Be(WorkItemProcessorState.Initializing); + + // Initialization is done + scopeManager.InitializingDone(); + scopeManager.State.Should().Be(WorkItemProcessorState.Stopped); + + TaskCompletionSource workItemCompletion1 = new(); + TaskCompletionSource workItemCompletion2 = new(); + Thread t = new(() => + { + using (scopeManager.BeginWorkItemScopeWhenReady()) { } + workItemCompletion1.SetResult(); + }); + t.Start(); + + // Wait for the worker to start and get blocked + while (t.ThreadState != ThreadState.WaitSleepJoin) ; + + // Start the service again + scopeManager.Start(); + + scopeManager.State.Should().Be(WorkItemProcessorState.Working); + + // Wait for the worker to finish the workItem + await workItemCompletion1.Task; + + // The WorkItemProcessor is working now, it shouldn't block on anything + using (scopeManager.BeginWorkItemScopeWhenReady()) { } + + scopeManager.State.Should().Be(WorkItemProcessorState.Working); + + // Simulate someone calling stop in the middle of a workItem + workItemCompletion1 = new(); + workItemCompletion2 = new(); + + var workerTask = Task.Run(async () => + { + using (scopeManager.BeginWorkItemScopeWhenReady()) + { + workItemCompletion1.SetResult(); + await workItemCompletion2.Task; + } + }); + // Wait for the workerTask to start the workItem + await workItemCompletion1.Task; + + scopeManager.FinishWorkItemAndStop(); + + // Before the workItem is finished, we should be in the Stopping stage + scopeManager.State.Should().Be(WorkItemProcessorState.Stopping); + + // Let the workItem finish + workItemCompletion2.SetResult(); + + await workerTask; + + // Now we should be in the stopped state + scopeManager.State.Should().Be(WorkItemProcessorState.Stopped); + } + + [Test, CancelAfter(30000)] + public async Task WorkItemsProcessorMultipleStopFlow() + { + WorkItemScopeManager scopeManager = new(true, _serviceProvider, Mock.Of>()); + + scopeManager.InitializingDone(); + // The workItems processor should start in a stopped state + scopeManager.State.Should().Be(WorkItemProcessorState.Stopped); + + scopeManager.FinishWorkItemAndStop(); + + // We were already stopped, so we should continue to be so + scopeManager.State.Should().Be(WorkItemProcessorState.Stopped); + + TaskCompletionSource workItemCompletion = new(); + + // Start a new workItem that should get blocked + Thread t = new(() => + { + using (scopeManager.BeginWorkItemScopeWhenReady()) + { + workItemCompletion.SetResult(); + } + }); + t.Start(); + + // Wait for the worker to start and get blocked + while (t.ThreadState != ThreadState.WaitSleepJoin) ; + + scopeManager.Start(); + // Wait for the worker to unblock and start the workItem + await workItemCompletion.Task; + + scopeManager.State.Should().Be(WorkItemProcessorState.Working); + } + + [Test, CancelAfter(30000)] + public async Task WorkItemsProcessorMultipleStartStop() + { + WorkItemScopeManager scopeManager = new(true, _serviceProvider, Mock.Of>()); + + scopeManager.InitializingDone(); + + scopeManager.State.Should().Be(WorkItemProcessorState.Stopped); + + // Start the WorkItemsProcessor multiple times in a row + scopeManager.Start(); + scopeManager.Start(); + scopeManager.Start(); + + scopeManager.State.Should().Be(WorkItemProcessorState.Working); + + TaskCompletionSource workItemCompletion1 = new(); + TaskCompletionSource workItemCompletion2 = new(); + + var workerTask = Task.Run(async () => + { + using (scopeManager.BeginWorkItemScopeWhenReady()) + { + workItemCompletion1.SetResult(); + await workItemCompletion2.Task; + } + }); + + scopeManager.Start(); + + await workItemCompletion1.Task; + + // Now stop in the middle of a workItem + scopeManager.FinishWorkItemAndStop(); + + scopeManager.State.Should().Be(WorkItemProcessorState.Stopping); + + workItemCompletion2.SetResult(); + + // Wait for the workItem to finish + await workerTask; + + scopeManager.State.Should().Be(WorkItemProcessorState.Stopped); + + // Verify that the new workItem will actually be blocked + Thread t = new(() => + { + using (scopeManager.BeginWorkItemScopeWhenReady()) { } + }); + t.Start(); + + while (t.ThreadState != ThreadState.WaitSleepJoin) ; + + // Unblock the worker thread + scopeManager.Start(); + } +}