From 0fce72ea80283039eba306427c0ed56c039b1a1a Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Thu, 25 Jul 2024 15:46:12 -0500 Subject: [PATCH] Accurately report how many queue items were processed. --- src/Foundatio/Jobs/IJob.cs | 13 ++++++++++++- src/Foundatio/Jobs/IQueueJob.cs | 18 ++++++------------ src/Foundatio/Jobs/QueueJobBase.cs | 17 +++++++++++------ src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs | 13 ++++++++++--- tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs | 7 +++---- 5 files changed, 42 insertions(+), 26 deletions(-) diff --git a/src/Foundatio/Jobs/IJob.cs b/src/Foundatio/Jobs/IJob.cs index cff7287b..77b1b487 100644 --- a/src/Foundatio/Jobs/IJob.cs +++ b/src/Foundatio/Jobs/IJob.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Foundatio.Utility; @@ -30,6 +31,10 @@ public static async Task TryRunAsync(this IJob job, CancellationToken } } + /// + /// Runs the job continuously until the cancellation token is set or the iteration limit is reached. + /// + /// Returns the iteration count for normal jobs. For queue based jobs this will be the amount of items processed successfully. public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func> continuationCallback = null) { int iterations = 0; @@ -37,6 +42,9 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interv var logger = job.GetLogger(); bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information); + int queueItemsProcessed = 0; + bool isQueueJob = job.GetType().GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IQueueJob<>)); + using var _ = logger.BeginScope(new Dictionary { { "job", jobName } }); if (isInformationLogLevelEnabled) logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName); @@ -45,7 +53,10 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interv { var result = await job.TryRunAsync(cancellationToken).AnyContext(); logger.LogJobResult(result, jobName); + iterations++; + if (isQueueJob && result.IsSuccess) + queueItemsProcessed++; if (cancellationToken.IsCancellationRequested || (iterationLimit > -1 && iterationLimit <= iterations)) break; @@ -98,6 +109,6 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interv } } - return iterations; + return isQueueJob ? queueItemsProcessed : iterations; } } diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index bde7685e..beb261a9 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -20,9 +20,9 @@ public interface IQueueJob : IJob where T : class public static class QueueJobExtensions { /// - /// Will run until the wait timeout expires. If there is still data the job will be cancelled. and then will cancel the job. + /// Will run until the wait timeout expires, if expired the job will be cancelled. /// - /// The amount of queue items processed. This count will not be accurate if the job is cancelled. + /// The amount of queue items processed. public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan waitTimeout, CancellationToken cancellationToken = default) where T : class { @@ -39,17 +39,13 @@ public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeS /// /// Will wait up to thirty seconds if queue is empty, otherwise will run until the queue is empty or cancelled. /// - /// The amount of queue items processed. This count will not be accurate if the job is cancelled. - public static async Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class + /// The amount of queue items processed. + public static Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class { var logger = job.GetLogger(); - // NOTE: processed count is not accurate if the continuation callback is skipped due to cancellation. - int processed = 0; - await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () => + return job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () => { - processed++; - // Allow abandoned items to be added in a background task. Thread.Yield(); @@ -58,8 +54,6 @@ await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationC logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned); return stats.Queued + stats.Working > 0; - }).AnyContext(); - - return processed; + }); } } diff --git a/src/Foundatio/Jobs/QueueJobBase.cs b/src/Foundatio/Jobs/QueueJobBase.cs index bf08be5c..c8fd1334 100644 --- a/src/Foundatio/Jobs/QueueJobBase.cs +++ b/src/Foundatio/Jobs/QueueJobBase.cs @@ -41,6 +41,10 @@ public virtual async Task RunAsync(CancellationToken cancellationToke { queueEntry = await _queue.Value.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext(); } + catch (OperationCanceledException) + { + return JobResult.Cancelled; + } catch (Exception ex) { return JobResult.FromException(ex, $"Error trying to dequeue message: {ex.Message}"); @@ -51,8 +55,11 @@ public virtual async Task RunAsync(CancellationToken cancellationToke public async Task ProcessAsync(IQueueEntry queueEntry, CancellationToken cancellationToken) { + if (cancellationToken.IsCancellationRequested && queueEntry == null) + return JobResult.Cancelled; + if (queueEntry == null) - return JobResult.Success; + return JobResult.CancelledWithMessage("No queue entry to process."); using var activity = StartProcessQueueEntryActivity(queueEntry); using var _ = _logger.BeginScope(s => s @@ -76,8 +83,7 @@ public async Task ProcessAsync(IQueueEntry queueEntry, Cancellatio if (lockValue == null) { await queueEntry.AbandonAsync().AnyContext(); - _logger.LogTrace("Unable to acquire queue entry lock"); - return JobResult.Success; + return JobResult.CancelledWithMessage($"Unable to acquire queue entry lock. Abandoning {_queueEntryName} queue entry: {queueEntry.Id}"); } bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); @@ -130,9 +136,8 @@ public async Task ProcessAsync(IQueueEntry queueEntry, Cancellatio protected virtual Activity StartProcessQueueEntryActivity(IQueueEntry entry) { var activity = FoundatioDiagnostics.ActivitySource.StartActivity("ProcessQueueEntry", ActivityKind.Server, entry.CorrelationId); - - if (activity == null) - return activity; + if (activity is null) + return null; if (entry.Properties != null && entry.Properties.TryGetValue("TraceState", out var traceState)) activity.TraceStateString = traceState.ToString(); diff --git a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs index b5bf439d..4584a410 100644 --- a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs +++ b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs @@ -43,6 +43,10 @@ public virtual async Task RunAsync(CancellationToken cancellationToke { queueEntry = await _queue.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext(); } + catch (OperationCanceledException) + { + return JobResult.Cancelled; + } catch (Exception ex) { return JobResult.FromException(ex, $"Error trying to dequeue work item: {ex.Message}"); @@ -53,8 +57,11 @@ public virtual async Task RunAsync(CancellationToken cancellationToke public async Task ProcessAsync(IQueueEntry queueEntry, CancellationToken cancellationToken) { + if (cancellationToken.IsCancellationRequested && queueEntry == null) + return JobResult.Cancelled; + if (queueEntry == null) - return JobResult.Success; + return JobResult.CancelledWithMessage("No queue entry to process."); if (cancellationToken.IsCancellationRequested) { @@ -101,10 +108,10 @@ public async Task ProcessAsync(IQueueEntry queueEntry, if (lockValue == null) { if (handler.Log.IsEnabled(LogLevel.Information)) - handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock.", queueEntry.Value.Type, queueEntry.Id); + handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock", queueEntry.Value.Type, queueEntry.Id); await queueEntry.AbandonAsync().AnyContext(); - return JobResult.Success; + return JobResult.CancelledWithMessage("Unable to acquire work item lock."); } var progressCallback = new Func(async (progress, message) => diff --git a/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs b/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs index abc8e4d6..307386c1 100644 --- a/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs +++ b/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs @@ -263,7 +263,7 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems() Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100))); sw.Stop(); - Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500)); + Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(250)); var stats = await queue.GetQueueStatsAsync(); Assert.Equal(0, stats.Enqueued); @@ -291,8 +291,7 @@ await queue.EnqueueAsync(new MyWorkItem SomeData = "Test" }, true); - // NOTE: This count is wrong due to the continuation callback not firing due to the cancellation. - Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50))); + Assert.Equal(1, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50))); var stats = await queue.GetQueueStatsAsync(); Assert.Equal(2, stats.Enqueued); @@ -328,7 +327,7 @@ await messageBus.SubscribeAsync(status => countdown.Signal(); }); - Assert.Equal(1, await job.RunUntilEmptyAsync()); + Assert.Equal(0, await job.RunUntilEmptyAsync()); await countdown.WaitAsync(TimeSpan.FromSeconds(2)); Assert.Equal(0, countdown.CurrentCount); }