From f0c6ffa92beea75290ef29fb72bf13fc0a1bdfe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emek=20Vysok=C3=BD?= Date: Mon, 9 Sep 2024 11:33:24 +0200 Subject: [PATCH] Measure time messages wait in the work item queue (#3934) --- .../PcsStartup.cs | 5 +-- .../ProductConstructionService.Api/Program.cs | 1 + .../MetricRecorder.cs | 32 +++++++++++++++++++ .../ProductConstructionServiceExtension.cs | 5 +++ .../Extensions.cs | 8 +++-- ...ConstructionService.ServiceDefaults.csproj | 4 +++ .../WorkItem.cs | 12 ++++++- .../WorkItemConsumer.cs | 31 +++++++++++------- .../WorkItemProducer.cs | 6 ++++ .../UpdaterTests.cs | 6 ++-- 10 files changed, 90 insertions(+), 20 deletions(-) create mode 100644 src/ProductConstructionService/ProductConstructionService.Common/MetricRecorder.cs diff --git a/src/ProductConstructionService/ProductConstructionService.Api/PcsStartup.cs b/src/ProductConstructionService/ProductConstructionService.Api/PcsStartup.cs index cc8a5d395a..bbb84f64b4 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/PcsStartup.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/PcsStartup.cs @@ -38,6 +38,7 @@ using ProductConstructionService.DependencyFlow.WorkItems; using ProductConstructionService.WorkItems; using ProductConstructionService.DependencyFlow; +using ProductConstructionService.ServiceDefaults; namespace ProductConstructionService.Api; @@ -169,7 +170,9 @@ internal static async Task ConfigurePcs( // TODO (https://github.com/dotnet/arcade-services/issues/3880) - Remove subscriptionIdGenerator builder.Services.AddSingleton(sp => new(RunningService.PCS)); + await builder.AddRedisCache(authRedis); builder.AddBuildAssetRegistry(); + builder.AddMetricRecorder(); builder.AddWorkItemQueues(azureCredential, waitForInitialization: initializeService); builder.AddDependencyFlowProcessors(); builder.AddVmrRegistrations(); @@ -193,8 +196,6 @@ internal static async Task ConfigurePcs( builder.Services.AddMergePolicies(); builder.Services.Configure(builder.Configuration.GetSection(ConfigurationKeys.DependencyFlowSLAs)); - await builder.AddRedisCache(authRedis); - if (initializeService) { builder.AddVmrInitialization(); diff --git a/src/ProductConstructionService/ProductConstructionService.Api/Program.cs b/src/ProductConstructionService/ProductConstructionService.Api/Program.cs index 19f9bf70c6..09e89ab34a 100644 --- a/src/ProductConstructionService/ProductConstructionService.Api/Program.cs +++ b/src/ProductConstructionService/ProductConstructionService.Api/Program.cs @@ -6,6 +6,7 @@ using ProductConstructionService.Api; using ProductConstructionService.Api.Configuration; using ProductConstructionService.Common; +using ProductConstructionService.ServiceDefaults; using ProductConstructionService.WorkItems; var builder = WebApplication.CreateBuilder(args); diff --git a/src/ProductConstructionService/ProductConstructionService.Common/MetricRecorder.cs b/src/ProductConstructionService/ProductConstructionService.Common/MetricRecorder.cs new file mode 100644 index 0000000000..179c2211ab --- /dev/null +++ b/src/ProductConstructionService/ProductConstructionService.Common/MetricRecorder.cs @@ -0,0 +1,32 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.Metrics; +using Azure.Storage.Queues.Models; + +namespace ProductConstructionService.Common; + +public interface IMetricRecorder +{ + void QueueMessageReceived(QueueMessage message, TimeSpan delay); +} + +public class MetricRecorder : IMetricRecorder +{ + public const string PcsMetricsNamespace = "ProductConstructionService.Metrics"; + private const string WaitTimeMetricName = "pcs.queue.wait_time"; + + private readonly Counter _queueWaitTimeCounter; + + public MetricRecorder(IMeterFactory meterFactory) + { + var meter = meterFactory.Create(PcsMetricsNamespace); + _queueWaitTimeCounter = meter.CreateCounter(WaitTimeMetricName); + } + + public void QueueMessageReceived(QueueMessage message, TimeSpan delay) + { + TimeSpan timeInQueue = DateTimeOffset.UtcNow - message.InsertedOn!.Value - delay; + _queueWaitTimeCounter.Add((int)timeInQueue.TotalSeconds); + } +} diff --git a/src/ProductConstructionService/ProductConstructionService.Common/ProductConstructionServiceExtension.cs b/src/ProductConstructionService/ProductConstructionService.Common/ProductConstructionServiceExtension.cs index 99545e3fcd..fc97d46c00 100644 --- a/src/ProductConstructionService/ProductConstructionService.Common/ProductConstructionServiceExtension.cs +++ b/src/ProductConstructionService/ProductConstructionService.Common/ProductConstructionServiceExtension.cs @@ -74,4 +74,9 @@ public static async Task AddRedisCache( builder.Services.AddSingleton(redisConfig); builder.Services.AddScoped(); } + + public static void AddMetricRecorder(this IHostApplicationBuilder builder) + { + builder.Services.AddSingleton(); + } } diff --git a/src/ProductConstructionService/ProductConstructionService.ServiceDefaults/Extensions.cs b/src/ProductConstructionService/ProductConstructionService.ServiceDefaults/Extensions.cs index 8730cbba42..3659f9b463 100644 --- a/src/ProductConstructionService/ProductConstructionService.ServiceDefaults/Extensions.cs +++ b/src/ProductConstructionService/ProductConstructionService.ServiceDefaults/Extensions.cs @@ -6,12 +6,14 @@ using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using OpenTelemetry.Logs; using OpenTelemetry.Metrics; using OpenTelemetry.Trace; +using ProductConstructionService.Common; -namespace Microsoft.Extensions.Hosting; +namespace ProductConstructionService.ServiceDefaults; public static class Extensions { @@ -47,7 +49,8 @@ public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicati .WithMetrics(metrics => { metrics.AddRuntimeInstrumentation() - .AddBuiltInMeters(); + .AddBuiltInMeters() + .AddMeter(MetricRecorder.PcsMetricsNamespace); }) .WithTracing(tracing => { @@ -75,6 +78,7 @@ private static IHostApplicationBuilder AddOpenTelemetryExporters(this IHostAppli { builder.Services.Configure(logging => logging.AddOtlpExporter()); builder.Services.ConfigureOpenTelemetryMeterProvider(metrics => metrics.AddOtlpExporter()); + builder.Services.ConfigureOpenTelemetryMeterProvider(metrics => metrics.AddMeter(MetricRecorder.PcsMetricsNamespace)); builder.Services.ConfigureOpenTelemetryTracerProvider(tracing => tracing.AddOtlpExporter()); } diff --git a/src/ProductConstructionService/ProductConstructionService.ServiceDefaults/ProductConstructionService.ServiceDefaults.csproj b/src/ProductConstructionService/ProductConstructionService.ServiceDefaults/ProductConstructionService.ServiceDefaults.csproj index 26b9a08102..f3f3c7dbe2 100644 --- a/src/ProductConstructionService/ProductConstructionService.ServiceDefaults/ProductConstructionService.ServiceDefaults.csproj +++ b/src/ProductConstructionService/ProductConstructionService.ServiceDefaults/ProductConstructionService.ServiceDefaults.csproj @@ -23,4 +23,8 @@ + + + + diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItem.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItem.cs index 60d1bc1846..7c5b7cd9fd 100644 --- a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItem.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItem.cs @@ -1,10 +1,20 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Text.Json.Serialization; + namespace ProductConstructionService.WorkItems; public abstract class WorkItem { - public Guid Id { get; init; } = Guid.NewGuid(); + /// + /// Type of the message for easier deserialization. + /// public string Type => GetType().Name; + + /// + /// Period of time before the WorkItem becomes visible in the queue. + /// + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault | JsonIgnoreCondition.WhenWritingNull)] + public TimeSpan? Delay { get; internal set; } } diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs index 44697d9889..76e80caf0e 100644 --- a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemConsumer.cs @@ -7,19 +7,22 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using ProductConstructionService.Common; namespace ProductConstructionService.WorkItems; internal class WorkItemConsumer( - ILogger logger, - IOptions options, - WorkItemScopeManager scopeManager, - QueueServiceClient queueServiceClient) + ILogger logger, + IOptions options, + WorkItemScopeManager scopeManager, + QueueServiceClient queueServiceClient, + IMetricRecorder metricRecorder) : BackgroundService { private readonly ILogger _logger = logger; private readonly IOptions _options = options; private readonly WorkItemScopeManager _scopeManager = scopeManager; + private readonly IMetricRecorder _metricRecorder = metricRecorder; protected override async Task ExecuteAsync(CancellationToken cancellationToken) { @@ -66,14 +69,18 @@ private async Task ReadAndProcessWorkItemAsync(QueueClient queueClient, WorkItem return; } - string workItemId; string workItemType; + TimeSpan? delay; JsonNode node; try { node = JsonNode.Parse(message.Body)!; - workItemId = node["id"]!.ToString(); workItemType = node["type"]!.ToString(); + + var d = node["delay"]?.GetValue(); + delay = d.HasValue + ? TimeSpan.FromSeconds(d.Value) + : null; } catch (Exception ex) { @@ -82,9 +89,11 @@ private async Task ReadAndProcessWorkItemAsync(QueueClient queueClient, WorkItem return; } + _metricRecorder.QueueMessageReceived(message, delay ?? default); + try { - _logger.LogInformation("Starting attempt {attemptNumber} for work item {workItemId}, type {workItemType}", message.DequeueCount, workItemId, workItemType); + _logger.LogInformation("Starting attempt {attemptNumber} for {workItemType}", message.DequeueCount, workItemType); await workItemScope.RunWorkItemAsync(node, cancellationToken); await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken); } @@ -95,13 +104,13 @@ private async Task ReadAndProcessWorkItemAsync(QueueClient queueClient, WorkItem } catch (Exception ex) { - _logger.LogError(ex, "Processing work item {workItemId} attempt {attempt}/{maxAttempts} failed", - workItemId, message.DequeueCount, _options.Value.MaxWorkItemRetries); + _logger.LogError(ex, "Processing work item {workItemType} attempt {attempt}/{maxAttempts} failed", + workItemType, message.DequeueCount, _options.Value.MaxWorkItemRetries); // Let the workItem retry a few times. If it fails a few times, delete it from the queue, it's a bad work item if (message.DequeueCount == _options.Value.MaxWorkItemRetries || ex is NonRetriableException) { - _logger.LogError("Work item {workItemId} has failed {maxAttempts} times. Discarding the message {message} from the queue", - workItemId, _options.Value.MaxWorkItemRetries, message.Body.ToString()); + _logger.LogError("Work item {type} has failed {maxAttempts} times. Discarding the message {message} from the queue", + workItemType, _options.Value.MaxWorkItemRetries, message.Body.ToString()); await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken); } } diff --git a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducer.cs b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducer.cs index 001db4bcf7..620db1d715 100644 --- a/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducer.cs +++ b/src/ProductConstructionService/ProductConstructionService.WorkItems/WorkItemProducer.cs @@ -28,6 +28,12 @@ public class WorkItemProducer(QueueServiceClient queueServiceClient, string q public async Task ProduceWorkItemAsync(T payload, TimeSpan delay = default) { var client = _queueServiceClient.GetQueueClient(_queueName); + + if (delay != default) + { + payload.Delay = delay; + } + var json = JsonSerializer.Serialize(payload, WorkItemConfiguration.JsonSerializerOptions); return await client.SendMessageAsync(json, delay); } diff --git a/test/ProductConstructionService.DependencyFlow.Tests/UpdaterTests.cs b/test/ProductConstructionService.DependencyFlow.Tests/UpdaterTests.cs index d9ccfa64eb..ec5404f5cd 100644 --- a/test/ProductConstructionService.DependencyFlow.Tests/UpdaterTests.cs +++ b/test/ProductConstructionService.DependencyFlow.Tests/UpdaterTests.cs @@ -96,10 +96,8 @@ public void UpdaterTests_SetUp() [TearDown] public void UpdaterTests_TearDown() { - var ExcludeWorkItemId = static (FluentAssertions.Equivalency.EquivalencyAssertionOptions> opt) - => opt.Excluding(member => member.DeclaringType == typeof(WorkItem) && member.Name.Equals(nameof(WorkItem.Id))); - Cache.Data.Should().BeEquivalentTo(ExpectedCacheState, ExcludeWorkItemId); - Reminders.Reminders.Should().BeEquivalentTo(ExpectedReminders, ExcludeWorkItemId); + Cache.Data.Should().BeEquivalentTo(ExpectedCacheState); + Reminders.Reminders.Should().BeEquivalentTo(ExpectedReminders); } protected void SetState(Subscription subscription, T state) where T : class