Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Approach for better implementation for RunUntilEmptyAsync #303

Merged
merged 8 commits into from
Jul 26, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/Foundatio/Jobs/IQueueJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public interface IQueueJob<T> : IJob where T : class

public static class QueueJobExtensions
{
/// <summary>
/// Will run wait for the acquire timeout to expire waiting if there are no queued items. It will then run until the queue is empty.
/// NOTE: The acquire timeout will not be reset until after the first job is processed,
/// </summary>
public static async Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan acquireTimeout,
niemyjski marked this conversation as resolved.
Show resolved Hide resolved
CancellationToken cancellationToken = default) where T : class
{
Expand All @@ -27,9 +31,29 @@ public static async Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan a

using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
linkedCancellationTokenSource.CancelAfter(acquireTimeout);
bool hasAcquireTimeout = true;

var logger = job.GetLogger();

// NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire.
await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token);
await job.RunContinuousAsync(cancellationToken: linkedCancellationTokenSource.Token, continuationCallback: async () =>
niemyjski marked this conversation as resolved.
Show resolved Hide resolved
{
// Stop the Cancel After
if (hasAcquireTimeout)
{
linkedCancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
hasAcquireTimeout = false;
}

// Allow abandoned items to be added in a background task.
Thread.Yield();

var stats = await job.Queue.GetQueueStatsAsync().AnyContext();
if (logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned);

return stats.Queued + stats.Working > 0;
});
}

public static Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
Expand Down