diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index 944eaf8d..8125ddd1 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -19,6 +19,10 @@ public interface IQueueJob : IJob where T : class public static class QueueJobExtensions { + /// + /// Will run wait for the acquire timeout to expire waiting if there are no queued items. It will then run until the queue is empty. + /// NOTE: The acquire timeout will not be reset until after the first job is processed, + /// public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan acquireTimeout, CancellationToken cancellationToken = default) where T : class { @@ -27,9 +31,29 @@ public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan a using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); linkedCancellationTokenSource.CancelAfter(acquireTimeout); + bool hasAcquireTimeout = true; + + var logger = job.GetLogger(); // NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire. - await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token); + await job.RunContinuousAsync(cancellationToken: linkedCancellationTokenSource.Token, continuationCallback: async () => + { + // Stop the Cancel After + if (hasAcquireTimeout) + { + linkedCancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan); + hasAcquireTimeout = false; + } + + // Allow abandoned items to be added in a background task. + Thread.Yield(); + + var stats = await job.Queue.GetQueueStatsAsync().AnyContext(); + if (logger.IsEnabled(LogLevel.Trace)) + logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned); + + return stats.Queued + stats.Working > 0; + }); } public static Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class