Skip to content

Commit

Permalink
Measure time messages wait in the work item queue (#3934)
Browse files Browse the repository at this point in the history
  • Loading branch information
premun authored Sep 9, 2024
1 parent 3d25e8c commit f0c6ffa
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
using ProductConstructionService.DependencyFlow.WorkItems;
using ProductConstructionService.WorkItems;
using ProductConstructionService.DependencyFlow;
using ProductConstructionService.ServiceDefaults;

namespace ProductConstructionService.Api;

Expand Down Expand Up @@ -169,7 +170,9 @@ internal static async Task ConfigurePcs(
// TODO (https://github.com/dotnet/arcade-services/issues/3880) - Remove subscriptionIdGenerator
builder.Services.AddSingleton<SubscriptionIdGenerator>(sp => new(RunningService.PCS));

await builder.AddRedisCache(authRedis);
builder.AddBuildAssetRegistry();
builder.AddMetricRecorder();
builder.AddWorkItemQueues(azureCredential, waitForInitialization: initializeService);
builder.AddDependencyFlowProcessors();
builder.AddVmrRegistrations();
Expand All @@ -193,8 +196,6 @@ internal static async Task ConfigurePcs(
builder.Services.AddMergePolicies();
builder.Services.Configure<SlaOptions>(builder.Configuration.GetSection(ConfigurationKeys.DependencyFlowSLAs));

await builder.AddRedisCache(authRedis);

if (initializeService)
{
builder.AddVmrInitialization();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using ProductConstructionService.Api;
using ProductConstructionService.Api.Configuration;
using ProductConstructionService.Common;
using ProductConstructionService.ServiceDefaults;
using ProductConstructionService.WorkItems;

var builder = WebApplication.CreateBuilder(args);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.Metrics;
using Azure.Storage.Queues.Models;

namespace ProductConstructionService.Common;

/// <summary>
/// Records service metrics related to work item queue messages.
/// </summary>
public interface IMetricRecorder
{
/// <summary>
/// Reports that a message has been received from the queue.
/// </summary>
/// <param name="message">The queue message that was received.</param>
/// <param name="delay">Delay associated with the message; callers pass <c>default</c> when the message carries none.</param>
void QueueMessageReceived(QueueMessage message, TimeSpan delay);
}

/// <summary>
/// <see cref="IMetricRecorder"/> implementation that publishes queue wait times through a
/// <see cref="Counter{T}"/> created on the <see cref="PcsMetricsNamespace"/> meter.
/// </summary>
public class MetricRecorder : IMetricRecorder
{
    /// <summary>
    /// Name of the meter on which the instruments of this recorder are created.
    /// </summary>
    public const string PcsMetricsNamespace = "ProductConstructionService.Metrics";
    private const string WaitTimeMetricName = "pcs.queue.wait_time";

    private readonly Counter<int> _queueWaitTimeCounter;

    /// <summary>
    /// Creates the meter and the wait time counter.
    /// </summary>
    /// <param name="meterFactory">Factory used to create the <see cref="PcsMetricsNamespace"/> meter.</param>
    public MetricRecorder(IMeterFactory meterFactory)
    {
        var meter = meterFactory.Create(PcsMetricsNamespace);
        _queueWaitTimeCounter = meter.CreateCounter<int>(WaitTimeMetricName);
    }

    /// <summary>
    /// Adds the time the message spent waiting in the queue (in whole seconds) to the wait time counter.
    /// The wait time is the time elapsed since the message was inserted, minus <paramref name="delay"/>.
    /// </summary>
    /// <param name="message">The received queue message; its <c>InsertedOn</c> timestamp is the start of the measurement.</param>
    /// <param name="delay">Delay to subtract from the elapsed time so that it does not count as waiting.</param>
    public void QueueMessageReceived(QueueMessage message, TimeSpan delay)
    {
        // InsertedOn is nullable. Without a timestamp there is nothing to measure, and a
        // missing metric is preferable to an exception thrown from the telemetry path.
        if (message.InsertedOn is not { } insertedOn)
        {
            return;
        }

        TimeSpan timeInQueue = DateTimeOffset.UtcNow - insertedOn - delay;

        // The difference can come out negative (e.g. clock differences between the queue service
        // and this process). A counter must only ever be incremented, so clamp to zero, and bound
        // the upper end so that the conversion to int is always well defined.
        var seconds = (int)Math.Clamp(timeInQueue.TotalSeconds, 0, int.MaxValue);
        _queueWaitTimeCounter.Add(seconds);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ public static async Task AddRedisCache(
builder.Services.AddSingleton(redisConfig);
builder.Services.AddScoped<IRedisCacheFactory, RedisCacheFactory>();
}

/// <summary>
/// Registers <see cref="MetricRecorder"/> as the singleton implementation of <see cref="IMetricRecorder"/>.
/// </summary>
/// <param name="builder">Application builder whose service collection receives the registration.</param>
public static void AddMetricRecorder(this IHostApplicationBuilder builder)
    => builder.Services.AddSingleton<IMetricRecorder, MetricRecorder>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;
using ProductConstructionService.Common;

namespace Microsoft.Extensions.Hosting;
namespace ProductConstructionService.ServiceDefaults;

public static class Extensions
{
Expand Down Expand Up @@ -47,7 +49,8 @@ public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicati
.WithMetrics(metrics =>
{
metrics.AddRuntimeInstrumentation()
.AddBuiltInMeters();
.AddBuiltInMeters()
.AddMeter(MetricRecorder.PcsMetricsNamespace);
})
.WithTracing(tracing =>
{
Expand Down Expand Up @@ -75,6 +78,7 @@ private static IHostApplicationBuilder AddOpenTelemetryExporters(this IHostAppli
{
builder.Services.Configure<OpenTelemetryLoggerOptions>(logging => logging.AddOtlpExporter());
builder.Services.ConfigureOpenTelemetryMeterProvider(metrics => metrics.AddOtlpExporter());
builder.Services.ConfigureOpenTelemetryMeterProvider(metrics => metrics.AddMeter(MetricRecorder.PcsMetricsNamespace));
builder.Services.ConfigureOpenTelemetryTracerProvider(tracing => tracing.AddOtlpExporter());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\ProductConstructionService.Common\ProductConstructionService.Common.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Text.Json.Serialization;

namespace ProductConstructionService.WorkItems;

/// <summary>
/// Base class of the work items that are serialized and sent through the work item queue.
/// </summary>
public abstract class WorkItem
{
    /// <summary>
    /// Identifier of the work item. Defaults to a newly generated GUID.
    /// </summary>
    public Guid Id { get; init; } = Guid.NewGuid();

    /// <summary>
    /// Type of the message for easier deserialization.
    /// </summary>
    public string Type => GetType().Name;

    /// <summary>
    /// Period of time before the WorkItem becomes visible in the queue.
    /// </summary>
    /// <remarks>
    /// Omitted from the JSON when null. <see cref="JsonIgnoreCondition"/> is not a flags enum,
    /// so its members must not be combined with <c>|</c>; a single condition is specified instead.
    /// </remarks>
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public TimeSpan? Delay { get; internal set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ProductConstructionService.Common;

namespace ProductConstructionService.WorkItems;

internal class WorkItemConsumer(
ILogger<WorkItemConsumer> logger,
IOptions<WorkItemConsumerOptions> options,
WorkItemScopeManager scopeManager,
QueueServiceClient queueServiceClient)
ILogger<WorkItemConsumer> logger,
IOptions<WorkItemConsumerOptions> options,
WorkItemScopeManager scopeManager,
QueueServiceClient queueServiceClient,
IMetricRecorder metricRecorder)
: BackgroundService
{
private readonly ILogger<WorkItemConsumer> _logger = logger;
private readonly IOptions<WorkItemConsumerOptions> _options = options;
private readonly WorkItemScopeManager _scopeManager = scopeManager;
private readonly IMetricRecorder _metricRecorder = metricRecorder;

protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -66,14 +69,18 @@ private async Task ReadAndProcessWorkItemAsync(QueueClient queueClient, WorkItem
return;
}

string workItemId;
string workItemType;
TimeSpan? delay;
JsonNode node;
try
{
node = JsonNode.Parse(message.Body)!;
workItemId = node["id"]!.ToString();
workItemType = node["type"]!.ToString();

var d = node["delay"]?.GetValue<int>();
delay = d.HasValue
? TimeSpan.FromSeconds(d.Value)
: null;
}
catch (Exception ex)
{
Expand All @@ -82,9 +89,11 @@ private async Task ReadAndProcessWorkItemAsync(QueueClient queueClient, WorkItem
return;
}

_metricRecorder.QueueMessageReceived(message, delay ?? default);

try
{
_logger.LogInformation("Starting attempt {attemptNumber} for work item {workItemId}, type {workItemType}", message.DequeueCount, workItemId, workItemType);
_logger.LogInformation("Starting attempt {attemptNumber} for {workItemType}", message.DequeueCount, workItemType);
await workItemScope.RunWorkItemAsync(node, cancellationToken);
await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken);
}
Expand All @@ -95,13 +104,13 @@ private async Task ReadAndProcessWorkItemAsync(QueueClient queueClient, WorkItem
}
catch (Exception ex)
{
_logger.LogError(ex, "Processing work item {workItemId} attempt {attempt}/{maxAttempts} failed",
workItemId, message.DequeueCount, _options.Value.MaxWorkItemRetries);
_logger.LogError(ex, "Processing work item {workItemType} attempt {attempt}/{maxAttempts} failed",
workItemType, message.DequeueCount, _options.Value.MaxWorkItemRetries);
// Let the workItem retry a few times. If it fails a few times, delete it from the queue, it's a bad work item
if (message.DequeueCount == _options.Value.MaxWorkItemRetries || ex is NonRetriableException)
{
_logger.LogError("Work item {workItemId} has failed {maxAttempts} times. Discarding the message {message} from the queue",
workItemId, _options.Value.MaxWorkItemRetries, message.Body.ToString());
_logger.LogError("Work item {type} has failed {maxAttempts} times. Discarding the message {message} from the queue",
workItemType, _options.Value.MaxWorkItemRetries, message.Body.ToString());
await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public class WorkItemProducer<T>(QueueServiceClient queueServiceClient, string q
/// <summary>
/// Serializes the work item to JSON and sends it to the queue this producer was created for.
/// </summary>
/// <param name="payload">Work item to send. Its <c>Delay</c> is set when a non-default delay is requested.</param>
/// <param name="delay">Delay handed to the queue together with the message; <c>default</c> means no delay.</param>
/// <returns>The receipt returned by the queue for the sent message.</returns>
public async Task<SendReceipt> ProduceWorkItemAsync(T payload, TimeSpan delay = default)
{
    var queue = _queueServiceClient.GetQueueClient(_queueName);

    // Record the delay on the payload itself so that it travels inside the serialized message.
    bool hasDelay = delay != TimeSpan.Zero;
    if (hasDelay)
    {
        payload.Delay = delay;
    }

    var serializedPayload = JsonSerializer.Serialize(payload, WorkItemConfiguration.JsonSerializerOptions);
    return await queue.SendMessageAsync(serializedPayload, delay);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,8 @@ public void UpdaterTests_SetUp()
// Runs after each test and verifies that the cache and the reminders ended up in the expected state.
[TearDown]
public void UpdaterTests_TearDown()
{
// Equivalency option that leaves WorkItem.Id out of the comparison.
var ExcludeWorkItemId = static (FluentAssertions.Equivalency.EquivalencyAssertionOptions<Dictionary<string, object>> opt)
=> opt.Excluding(member => member.DeclaringType == typeof(WorkItem) && member.Name.Equals(nameof(WorkItem.Id)));
// Comparison that ignores WorkItem.Id.
Cache.Data.Should().BeEquivalentTo(ExpectedCacheState, ExcludeWorkItemId);
Reminders.Reminders.Should().BeEquivalentTo(ExpectedReminders, ExcludeWorkItemId);
// Comparison without any exclusions.
// NOTE(review): the two assertion pairs overlap - presumably only one pair is meant to remain; confirm which.
Cache.Data.Should().BeEquivalentTo(ExpectedCacheState);
Reminders.Reminders.Should().BeEquivalentTo(ExpectedReminders);
}

protected void SetState<T>(Subscription subscription, T state) where T : class
Expand Down

0 comments on commit f0c6ffa

Please sign in to comment.