From d4acf05460576657229ea26acb0cc79296e14b45 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Mon, 22 Jul 2024 10:54:35 -0500 Subject: [PATCH 1/8] Approach for better implementation for RunUntilEmptyAsync --- src/Foundatio/Jobs/IQueueJob.cs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index 944eaf8d..8125ddd1 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -19,6 +19,10 @@ public interface IQueueJob : IJob where T : class public static class QueueJobExtensions { + /// + /// Will run wait for the acquire timeout to expire waiting if there are no queued items. It will then run until the queue is empty. + /// NOTE: The acquire timeout will not be reset until after the first job is processed, + /// public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan acquireTimeout, CancellationToken cancellationToken = default) where T : class { @@ -27,9 +31,29 @@ public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan a using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); linkedCancellationTokenSource.CancelAfter(acquireTimeout); + bool hasAcquireTimeout = true; + + var logger = job.GetLogger(); // NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire. - await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token); + await job.RunContinuousAsync(cancellationToken: linkedCancellationTokenSource.Token, continuationCallback: async () => + { + // Stop the Cancel After + if (hasAcquireTimeout) + { + linkedCancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan); + hasAcquireTimeout = false; + } + + // Allow abandoned items to be added in a background task. + Thread.Yield(); + + var stats = await job.Queue.GetQueueStatsAsync().AnyContext(); + if (logger.IsEnabled(LogLevel.Trace)) + logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned); + + return stats.Queued + stats.Working > 0; + }); } public static Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class From 1cad305224d2d9eaa8ed648a2c49061d547a11e0 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Mon, 22 Jul 2024 11:18:38 -0500 Subject: [PATCH 2/8] PR Feedback --- src/Foundatio/Jobs/IQueueJob.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index 8125ddd1..62ea072d 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -23,14 +23,14 @@ public static class QueueJobExtensions /// Will run wait for the acquire timeout to expire waiting if there are no queued items. It will then run until the queue is empty. /// NOTE: The acquire timeout will not be reset until after the first job is processed, /// - public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan acquireTimeout, + public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan waitTimeout, CancellationToken cancellationToken = default) where T : class { - if (acquireTimeout <= TimeSpan.Zero) - throw new ArgumentException("Acquire timeout must be greater than zero", nameof(acquireTimeout)); + if (waitTimeout <= TimeSpan.Zero) + throw new ArgumentException("Acquire timeout must be greater than zero", nameof(waitTimeout)); using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - linkedCancellationTokenSource.CancelAfter(acquireTimeout); + linkedCancellationTokenSource.CancelAfter(waitTimeout); bool hasAcquireTimeout = true; var logger = job.GetLogger(); From 8814fbe49286b53b7fcdda84ebf342673a2dd3da Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Mon, 22 Jul 2024 11:19:21 -0500 Subject: [PATCH 3/8] More pr feedback --- src/Foundatio/Jobs/IQueueJob.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index 62ea072d..89d7f293 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -20,8 +20,8 @@ public interface IQueueJob : IJob where T : class public static class QueueJobExtensions { /// - /// Will run wait for the acquire timeout to expire waiting if there are no queued items. It will then run until the queue is empty. - /// NOTE: The acquire timeout will not be reset until after the first job is processed, + /// Will run wait for the wait timeout to expire waiting if there are no queued items. It will then run until the queue is empty. + /// NOTE: The wait timeout will not be reset until after the first job is processed. /// public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan waitTimeout, CancellationToken cancellationToken = default) where T : class From 126f102d20ac77787fc2e67d22b20fd146a476f4 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Thu, 25 Jul 2024 15:11:09 -0500 Subject: [PATCH 4/8] PR Feedback --- src/Foundatio/Jobs/IJob.cs | 30 +++++++++---- src/Foundatio/Jobs/IQueueJob.cs | 45 ++++++++----------- .../Foundatio.Tests/Jobs/WorkItemJobTests.cs | 39 +++++++++++++--- 3 files changed, 74 insertions(+), 40 deletions(-) diff --git a/src/Foundatio/Jobs/IJob.cs b/src/Foundatio/Jobs/IJob.cs index 5d402ed2..cff7287b 100644 --- a/src/Foundatio/Jobs/IJob.cs +++ b/src/Foundatio/Jobs/IJob.cs @@ -30,14 +30,15 @@ public static async Task TryRunAsync(this IJob job, CancellationToken } } - public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func> continuationCallback = null) + public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func> continuationCallback = null) { int iterations = 0; string jobName = job.GetType().Name; var logger = job.GetLogger(); + bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information); using var _ = logger.BeginScope(new Dictionary { { "job", jobName } }); - if (logger.IsEnabled(LogLevel.Information)) + if (isInformationLogLevelEnabled) logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName); while (!cancellationToken.IsCancellationRequested) @@ -72,18 +73,31 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = if (!await continuationCallback().AnyContext()) break; } - catch (Exception ex) + catch (Exception ex) when (logger.IsEnabled(LogLevel.Error)) { - if (logger.IsEnabled(LogLevel.Error)) - logger.LogError(ex, "Error in continuation callback: {Message}", ex.Message); + logger.LogError(ex, "Error in continuation callback: {Message}", ex.Message); } } - logger.LogInformation("Finished continuous job type {JobName}: {IterationLimit} {Iterations}", jobName, Environment.MachineName, iterationLimit, iterations); if (cancellationToken.IsCancellationRequested && logger.IsEnabled(LogLevel.Trace)) logger.LogTrace("Job cancellation requested"); - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation("Stopping continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName); + if (isInformationLogLevelEnabled) + { + if (iterationLimit > 0) + { + logger.LogInformation( + "Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times (Limit={IterationLimit})", + jobName, Environment.MachineName, iterationLimit, iterations); + } + else + { + logger.LogInformation( + "Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times", + jobName, Environment.MachineName, iterations); + } + } + + return iterations; } } diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index 89d7f293..f2116319 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -20,10 +20,10 @@ public interface IQueueJob : IJob where T : class public static class QueueJobExtensions { /// - /// Will run wait for the wait timeout to expire waiting if there are no queued items. It will then run until the queue is empty. - /// NOTE: The wait timeout will not be reset until after the first job is processed. + /// Will run until the wait timeout expires. If there is still data the job will be cancelled. and then will cancel the job. /// - public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan waitTimeout, + /// The amount of queue items processed. + public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan waitTimeout, CancellationToken cancellationToken = default) where T : class { if (waitTimeout <= TimeSpan.Zero) @@ -31,36 +31,25 @@ public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan w using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); linkedCancellationTokenSource.CancelAfter(waitTimeout); - bool hasAcquireTimeout = true; - - var logger = job.GetLogger(); // NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire. - await job.RunContinuousAsync(cancellationToken: linkedCancellationTokenSource.Token, continuationCallback: async () => - { - // Stop the Cancel After - if (hasAcquireTimeout) - { - linkedCancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan); - hasAcquireTimeout = false; - } - - // Allow abandoned items to be added in a background task. - Thread.Yield(); - - var stats = await job.Queue.GetQueueStatsAsync().AnyContext(); - if (logger.IsEnabled(LogLevel.Trace)) - logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned); - - return stats.Queued + stats.Working > 0; - }); + return await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token).AnyContext(); } - public static Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class + /// + /// Will wait up to thirty seconds if queue is empty, otherwise will run until the queue is empty or cancelled. + /// + /// The amount of queue items processed. This count will not be accurate if the job is cancelled. + public static async Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class { var logger = job.GetLogger(); - return job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () => + + // NOTE: processed count is not accurate if the continuation callback is skipped due to cancellation. + int processed = 0; + await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () => { + processed++; + // Allow abandoned items to be added in a background task. Thread.Yield(); @@ -69,6 +58,8 @@ public static Task RunUntilEmptyAsync(this IQueueJob job, CancellationToke logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned); return stats.Queued + stats.Working > 0; - }); + }).AnyContext(); + + return processed; } } diff --git a/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs b/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs index 2f448dff..abc8e4d6 100644 --- a/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs +++ b/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs @@ -179,7 +179,7 @@ await messageBus.SubscribeAsync(status => countdown.Signal(); }); - await job.RunUntilEmptyAsync(); + Assert.Equal(1, await job.RunUntilEmptyAsync()); await countdown.WaitAsync(TimeSpan.FromSeconds(2)); Assert.Equal(0, countdown.CurrentCount); } @@ -217,7 +217,7 @@ await messageBus.SubscribeAsync(status => countdown.Signal(); }); - await job.RunUntilEmptyAsync(); + Assert.Equal(1, await job.RunUntilEmptyAsync()); await countdown.WaitAsync(TimeSpan.FromSeconds(2)); Assert.Equal(0, countdown.CurrentCount); } @@ -242,7 +242,7 @@ await queue.EnqueueAsync(new MyWorkItem SomeData = "Test" }, true); - await job.RunUntilEmptyAsync(); + Assert.Equal(2, await job.RunUntilEmptyAsync()); var stats = await queue.GetQueueStatsAsync(); Assert.Equal(2, stats.Enqueued); Assert.Equal(2, stats.Dequeued); @@ -260,7 +260,7 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems() handlerRegistry.Register(new MyWorkItemHandler(Log)); var sw = Stopwatch.StartNew(); - await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100)); + Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100))); sw.Stop(); Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500)); @@ -271,6 +271,35 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems() Assert.Equal(0, stats.Completed); } + [Fact] + public async Task CanRunWorkItemJobUntilEmptyHandlesCancellation() + { + using var queue = new InMemoryQueue(o => o.LoggerFactory(Log)); + using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log)); + var handlerRegistry = new WorkItemHandlers(); + var job = new WorkItemJob(queue, messageBus, handlerRegistry, Log); + + handlerRegistry.Register(new MyWorkItemHandler(Log)); + + await queue.EnqueueAsync(new MyWorkItem + { + SomeData = "Test" + }, true); + + await queue.EnqueueAsync(new MyWorkItem + { + SomeData = "Test" + }, true); + + // NOTE: This count is wrong due to the continuation callback not firing due to the cancellation. + Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50))); + + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(2, stats.Enqueued); + Assert.Equal(1, stats.Dequeued); + Assert.Equal(1, stats.Completed); + } + [Fact] public async Task CanRunBadWorkItem() { @@ -299,7 +328,7 @@ await messageBus.SubscribeAsync(status => countdown.Signal(); }); - await job.RunUntilEmptyAsync(); + Assert.Equal(1, await job.RunUntilEmptyAsync()); await countdown.WaitAsync(TimeSpan.FromSeconds(2)); Assert.Equal(0, countdown.CurrentCount); } From 11abd78aca7ef717120cb12f3df808bd209e1401 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Thu, 25 Jul 2024 15:12:15 -0500 Subject: [PATCH 5/8] Updated docs --- src/Foundatio/Jobs/IQueueJob.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index f2116319..bde7685e 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -22,7 +22,7 @@ public static class QueueJobExtensions /// /// Will run until the wait timeout expires. If there is still data the job will be cancelled. and then will cancel the job. /// - /// The amount of queue items processed. + /// The amount of queue items processed. This count will not be accurate if the job is cancelled. public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan waitTimeout, CancellationToken cancellationToken = default) where T : class { From 0fce72ea80283039eba306427c0ed56c039b1a1a Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Thu, 25 Jul 2024 15:46:12 -0500 Subject: [PATCH 6/8] Accurately report how many queue items were processed. --- src/Foundatio/Jobs/IJob.cs | 13 ++++++++++++- src/Foundatio/Jobs/IQueueJob.cs | 18 ++++++------------ src/Foundatio/Jobs/QueueJobBase.cs | 17 +++++++++++------ src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs | 13 ++++++++++--- tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs | 7 +++---- 5 files changed, 42 insertions(+), 26 deletions(-) diff --git a/src/Foundatio/Jobs/IJob.cs b/src/Foundatio/Jobs/IJob.cs index cff7287b..77b1b487 100644 --- a/src/Foundatio/Jobs/IJob.cs +++ b/src/Foundatio/Jobs/IJob.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Foundatio.Utility; @@ -30,6 +31,10 @@ public static async Task TryRunAsync(this IJob job, CancellationToken } } + /// + /// Runs the job continuously until the cancellation token is set or the iteration limit is reached. + /// + /// Returns the iteration count for normal jobs. For queue based jobs this will be the amount of items processed successfully. public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func> continuationCallback = null) { int iterations = 0; @@ -37,6 +42,9 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interv var logger = job.GetLogger(); bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information); + int queueItemsProcessed = 0; + bool isQueueJob = job.GetType().GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IQueueJob<>)); + using var _ = logger.BeginScope(new Dictionary { { "job", jobName } }); if (isInformationLogLevelEnabled) logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName); @@ -45,7 +53,10 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interv { var result = await job.TryRunAsync(cancellationToken).AnyContext(); logger.LogJobResult(result, jobName); + iterations++; + if (isQueueJob && result.IsSuccess) + queueItemsProcessed++; if (cancellationToken.IsCancellationRequested || (iterationLimit > -1 && iterationLimit <= iterations)) break; @@ -98,6 +109,6 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interv } } - return iterations; + return isQueueJob ? queueItemsProcessed : iterations; } } diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index bde7685e..beb261a9 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -20,9 +20,9 @@ public interface IQueueJob : IJob where T : class public static class QueueJobExtensions { /// - /// Will run until the wait timeout expires. If there is still data the job will be cancelled. and then will cancel the job. + /// Will run until the wait timeout expires, if expired the job will be cancelled. /// - /// The amount of queue items processed. This count will not be accurate if the job is cancelled. + /// The amount of queue items processed. public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan waitTimeout, CancellationToken cancellationToken = default) where T : class { @@ -39,17 +39,13 @@ public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeS /// /// Will wait up to thirty seconds if queue is empty, otherwise will run until the queue is empty or cancelled. /// - /// The amount of queue items processed. This count will not be accurate if the job is cancelled. - public static async Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class + /// The amount of queue items processed. + public static Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class { var logger = job.GetLogger(); - // NOTE: processed count is not accurate if the continuation callback is skipped due to cancellation. - int processed = 0; - await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () => + return job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () => { - processed++; - // Allow abandoned items to be added in a background task. Thread.Yield(); @@ -58,8 +54,6 @@ await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationC logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned); return stats.Queued + stats.Working > 0; - }).AnyContext(); - - return processed; + }); } } diff --git a/src/Foundatio/Jobs/QueueJobBase.cs b/src/Foundatio/Jobs/QueueJobBase.cs index bf08be5c..c8fd1334 100644 --- a/src/Foundatio/Jobs/QueueJobBase.cs +++ b/src/Foundatio/Jobs/QueueJobBase.cs @@ -41,6 +41,10 @@ public virtual async Task RunAsync(CancellationToken cancellationToke { queueEntry = await _queue.Value.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext(); } + catch (OperationCanceledException) + { + return JobResult.Cancelled; + } catch (Exception ex) { return JobResult.FromException(ex, $"Error trying to dequeue message: {ex.Message}"); @@ -51,8 +55,11 @@ public virtual async Task RunAsync(CancellationToken cancellationToke public async Task ProcessAsync(IQueueEntry queueEntry, CancellationToken cancellationToken) { + if (cancellationToken.IsCancellationRequested && queueEntry == null) + return JobResult.Cancelled; + if (queueEntry == null) - return JobResult.Success; + return JobResult.CancelledWithMessage("No queue entry to process."); using var activity = StartProcessQueueEntryActivity(queueEntry); using var _ = _logger.BeginScope(s => s @@ -76,8 +83,7 @@ public async Task ProcessAsync(IQueueEntry queueEntry, Cancellatio if (lockValue == null) { await queueEntry.AbandonAsync().AnyContext(); - _logger.LogTrace("Unable to acquire queue entry lock"); - return JobResult.Success; + return JobResult.CancelledWithMessage($"Unable to acquire queue entry lock. Abandoning {_queueEntryName} queue entry: {queueEntry.Id}"); } bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); @@ -130,9 +136,8 @@ public async Task ProcessAsync(IQueueEntry queueEntry, Cancellatio protected virtual Activity StartProcessQueueEntryActivity(IQueueEntry entry) { var activity = FoundatioDiagnostics.ActivitySource.StartActivity("ProcessQueueEntry", ActivityKind.Server, entry.CorrelationId); - - if (activity == null) - return activity; + if (activity is null) + return null; if (entry.Properties != null && entry.Properties.TryGetValue("TraceState", out var traceState)) activity.TraceStateString = traceState.ToString(); diff --git a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs index b5bf439d..4584a410 100644 --- a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs +++ b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs @@ -43,6 +43,10 @@ public virtual async Task RunAsync(CancellationToken cancellationToke { queueEntry = await _queue.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext(); } + catch (OperationCanceledException) + { + return JobResult.Cancelled; + } catch (Exception ex) { return JobResult.FromException(ex, $"Error trying to dequeue work item: {ex.Message}"); @@ -53,8 +57,11 @@ public virtual async Task RunAsync(CancellationToken cancellationToke public async Task ProcessAsync(IQueueEntry queueEntry, CancellationToken cancellationToken) { + if (cancellationToken.IsCancellationRequested && queueEntry == null) + return JobResult.Cancelled; + if (queueEntry == null) - return JobResult.Success; + return JobResult.CancelledWithMessage("No queue entry to process."); if (cancellationToken.IsCancellationRequested) { @@ -101,10 +108,10 @@ public async Task ProcessAsync(IQueueEntry queueEntry, if (lockValue == null) { if (handler.Log.IsEnabled(LogLevel.Information)) - handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock.", queueEntry.Value.Type, queueEntry.Id); + handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock", queueEntry.Value.Type, queueEntry.Id); await queueEntry.AbandonAsync().AnyContext(); - return JobResult.Success; + return JobResult.CancelledWithMessage("Unable to acquire work item lock."); } var progressCallback = new Func(async (progress, message) => diff --git a/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs b/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs index abc8e4d6..307386c1 100644 --- a/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs +++ b/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs @@ -263,7 +263,7 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems() Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100))); sw.Stop(); - Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500)); + Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(250)); var stats = await queue.GetQueueStatsAsync(); Assert.Equal(0, stats.Enqueued); @@ -291,8 +291,7 @@ await queue.EnqueueAsync(new MyWorkItem SomeData = "Test" }, true); - // NOTE: This count is wrong due to the continuation callback not firing due to the cancellation. - Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50))); + Assert.Equal(1, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50))); var stats = await queue.GetQueueStatsAsync(); Assert.Equal(2, stats.Enqueued); @@ -328,7 +327,7 @@ await messageBus.SubscribeAsync(status => countdown.Signal(); }); - Assert.Equal(1, await job.RunUntilEmptyAsync()); + Assert.Equal(0, await job.RunUntilEmptyAsync()); await countdown.WaitAsync(TimeSpan.FromSeconds(2)); Assert.Equal(0, countdown.CurrentCount); } From 8482bc66ec75f0a5a2935e24d78e5c2a63216a42 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Thu, 25 Jul 2024 15:53:20 -0500 Subject: [PATCH 7/8] Ensure we return the proper job result --- src/Foundatio/Jobs/JobWithLockBase.cs | 5 ++--- src/Foundatio/Jobs/QueueJobBase.cs | 2 +- src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs | 9 ++++----- src/Foundatio/Lock/CacheLockProvider.cs | 5 ++--- src/Foundatio/Messaging/MessageBusBase.cs | 5 ++--- 5 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/Foundatio/Jobs/JobWithLockBase.cs b/src/Foundatio/Jobs/JobWithLockBase.cs index fa7a21fd..d81317e9 100644 --- a/src/Foundatio/Jobs/JobWithLockBase.cs +++ b/src/Foundatio/Jobs/JobWithLockBase.cs @@ -23,10 +23,9 @@ public JobWithLockBase(ILoggerFactory loggerFactory = null) public virtual async Task RunAsync(CancellationToken cancellationToken = default) { var lockValue = await GetLockAsync(cancellationToken).AnyContext(); - if (lockValue == null) + if (lockValue is null) { - _logger.LogTrace("Unable to acquire job lock"); - return JobResult.Success; + return JobResult.CancelledWithMessage("Unable to acquire job lock"); } try diff --git a/src/Foundatio/Jobs/QueueJobBase.cs b/src/Foundatio/Jobs/QueueJobBase.cs index c8fd1334..60d23f35 100644 --- a/src/Foundatio/Jobs/QueueJobBase.cs +++ b/src/Foundatio/Jobs/QueueJobBase.cs @@ -80,7 +80,7 @@ public async Task ProcessAsync(IQueueEntry queueEntry, Cancellatio } var lockValue = await GetQueueEntryLockAsync(queueEntry, cancellationToken).AnyContext(); - if (lockValue == null) + if (lockValue is null) { await queueEntry.AbandonAsync().AnyContext(); return JobResult.CancelledWithMessage($"Unable to acquire queue entry lock. Abandoning {_queueEntryName} queue entry: {queueEntry.Id}"); diff --git a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs index 4584a410..6ca18285 100644 --- a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs +++ b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs @@ -105,13 +105,13 @@ public async Task ProcessAsync(IQueueEntry queueEntry, await ReportProgressAsync(handler, queueEntry).AnyContext(); var lockValue = await handler.GetWorkItemLockAsync(workItemData, cancellationToken).AnyContext(); - if (lockValue == null) + if (lockValue is null) { if (handler.Log.IsEnabled(LogLevel.Information)) handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock", queueEntry.Value.Type, queueEntry.Id); await queueEntry.AbandonAsync().AnyContext(); - return JobResult.CancelledWithMessage("Unable to acquire work item lock."); + return JobResult.CancelledWithMessage($"Unable to acquire work item lock. Abandoning {queueEntry.Value.Type} queue entry: {queueEntry.Id}"); } var progressCallback = new Func(async (progress, message) => @@ -186,9 +186,8 @@ await Task.WhenAll( protected virtual Activity StartProcessWorkItemActivity(IQueueEntry entry, Type workItemDataType) { var activity = FoundatioDiagnostics.ActivitySource.StartActivity("ProcessQueueEntry", ActivityKind.Server, entry.CorrelationId); - - if (activity == null) - return activity; + if (activity is null) + return null; if (entry.Properties != null && entry.Properties.TryGetValue("TraceState", out var traceState)) activity.TraceStateString = traceState.ToString(); diff --git a/src/Foundatio/Lock/CacheLockProvider.cs b/src/Foundatio/Lock/CacheLockProvider.cs index 499c1c69..e8476c03 100644 --- a/src/Foundatio/Lock/CacheLockProvider.cs +++ b/src/Foundatio/Lock/CacheLockProvider.cs @@ -68,9 +68,8 @@ private Task OnLockReleasedAsync(CacheLockReleased msg, CancellationToken cancel protected virtual Activity StartLockActivity(string resource) { var activity = FoundatioDiagnostics.ActivitySource.StartActivity("AcquireLock"); - - if (activity == null) - return activity; + if (activity is null) + return null; activity.AddTag("resource", resource); activity.DisplayName = $"Lock: {resource}"; diff --git a/src/Foundatio/Messaging/MessageBusBase.cs b/src/Foundatio/Messaging/MessageBusBase.cs index f0e1c63d..a2d303aa 100644 --- a/src/Foundatio/Messaging/MessageBusBase.cs +++ b/src/Foundatio/Messaging/MessageBusBase.cs @@ -296,9 +296,8 @@ protected async Task SendMessageToSubscribersAsync(IMessage message) protected virtual Activity StartHandleMessageActivity(IMessage message) { var activity = FoundatioDiagnostics.ActivitySource.StartActivity("HandleMessage", ActivityKind.Server, message.CorrelationId); - - if (activity == null) - return activity; + if (activity is null) + return null; if (message.Properties != null && message.Properties.TryGetValue("TraceState", out var traceState)) activity.TraceStateString = traceState.ToString(); From e4940ae5414e20f5eca69aec2c5fc17db51d83b2 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Fri, 26 Jul 2024 05:48:29 -0500 Subject: [PATCH 8/8] Update src/Foundatio/Jobs/IQueueJob.cs Co-authored-by: Eric J. Smith --- src/Foundatio/Jobs/IQueueJob.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index beb261a9..6384c9b7 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -20,7 +20,7 @@ public interface IQueueJob : IJob where T : class public static class QueueJobExtensions { /// - /// Will run until the wait timeout expires, if expired the job will be cancelled. + /// Will run until the queue is empty or the wait time is exceeded. /// /// The amount of queue items processed. public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan waitTimeout,