Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Approach for better implementation for RunUntilEmptyAsync #303

Merged
merged 8 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions src/Foundatio/Jobs/IJob.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Utility;
Expand Down Expand Up @@ -30,21 +31,32 @@ public static async Task<JobResult> TryRunAsync(this IJob job, CancellationToken
}
}

public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
/// <summary>
/// Runs the job continuously until the cancellation token is set or the iteration limit is reached.
/// </summary>
/// <returns>Returns the iteration count for normal jobs. For queue based jobs this will be the amount of items processed successfully.</returns>
public static async Task<int> RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
{
int iterations = 0;
string jobName = job.GetType().Name;
var logger = job.GetLogger();
bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information);

int queueItemsProcessed = 0;
bool isQueueJob = job.GetType().GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IQueueJob<>));

using var _ = logger.BeginScope(new Dictionary<string, object> { { "job", jobName } });
if (logger.IsEnabled(LogLevel.Information))
if (isInformationLogLevelEnabled)
logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName);

while (!cancellationToken.IsCancellationRequested)
{
var result = await job.TryRunAsync(cancellationToken).AnyContext();
logger.LogJobResult(result, jobName);

iterations++;
if (isQueueJob && result.IsSuccess)
queueItemsProcessed++;

if (cancellationToken.IsCancellationRequested || (iterationLimit > -1 && iterationLimit <= iterations))
break;
Expand Down Expand Up @@ -72,18 +84,31 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval =
if (!await continuationCallback().AnyContext())
break;
}
catch (Exception ex)
catch (Exception ex) when (logger.IsEnabled(LogLevel.Error))
{
if (logger.IsEnabled(LogLevel.Error))
logger.LogError(ex, "Error in continuation callback: {Message}", ex.Message);
logger.LogError(ex, "Error in continuation callback: {Message}", ex.Message);
}
}

logger.LogInformation("Finished continuous job type {JobName}: {IterationLimit} {Iterations}", jobName, Environment.MachineName, iterationLimit, iterations);
if (cancellationToken.IsCancellationRequested && logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("Job cancellation requested");

if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("Stopping continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName);
if (isInformationLogLevelEnabled)
{
if (iterationLimit > 0)
niemyjski marked this conversation as resolved.
Show resolved Hide resolved
{
logger.LogInformation(
"Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times (Limit={IterationLimit})",
jobName, Environment.MachineName, iterationLimit, iterations);
}
else
{
logger.LogInformation(
"Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times",
jobName, Environment.MachineName, iterations);
}
}

return isQueueJob ? queueItemsProcessed : iterations;
}
}
21 changes: 15 additions & 6 deletions src/Foundatio/Jobs/IQueueJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,31 @@ public interface IQueueJob<T> : IJob where T : class

public static class QueueJobExtensions
{
public static async Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan acquireTimeout,
/// <summary>
/// Will run until the queue is empty or the wait time is exceeded.
/// </summary>
/// <returns>The amount of queue items processed.</returns>
public static async Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan waitTimeout,
CancellationToken cancellationToken = default) where T : class
{
if (acquireTimeout <= TimeSpan.Zero)
throw new ArgumentException("Acquire timeout must be greater than zero", nameof(acquireTimeout));
if (waitTimeout <= TimeSpan.Zero)
throw new ArgumentException("Acquire timeout must be greater than zero", nameof(waitTimeout));

using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
linkedCancellationTokenSource.CancelAfter(acquireTimeout);
linkedCancellationTokenSource.CancelAfter(waitTimeout);

// NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire.
await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token);
return await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token).AnyContext();
}

public static Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
/// <summary>
/// Will wait up to thirty seconds if queue is empty, otherwise will run until the queue is empty or cancelled.
/// </summary>
/// <returns>The amount of queue items processed.</returns>
public static Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
{
var logger = job.GetLogger();

return job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () =>
{
// Allow abandoned items to be added in a background task.
Expand Down
5 changes: 2 additions & 3 deletions src/Foundatio/Jobs/JobWithLockBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ public JobWithLockBase(ILoggerFactory loggerFactory = null)
public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToken = default)
{
var lockValue = await GetLockAsync(cancellationToken).AnyContext();
if (lockValue == null)
if (lockValue is null)
{
_logger.LogTrace("Unable to acquire job lock");
return JobResult.Success;
return JobResult.CancelledWithMessage("Unable to acquire job lock");
}

try
Expand Down
19 changes: 12 additions & 7 deletions src/Foundatio/Jobs/QueueJobBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke
{
queueEntry = await _queue.Value.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (OperationCanceledException)
{
return JobResult.Cancelled;
}
catch (Exception ex)
{
return JobResult.FromException(ex, $"Error trying to dequeue message: {ex.Message}");
Expand All @@ -51,8 +55,11 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke

public async Task<JobResult> ProcessAsync(IQueueEntry<T> queueEntry, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested && queueEntry == null)
return JobResult.Cancelled;

if (queueEntry == null)
return JobResult.Success;
return JobResult.CancelledWithMessage("No queue entry to process.");

using var activity = StartProcessQueueEntryActivity(queueEntry);
using var _ = _logger.BeginScope(s => s
Expand All @@ -73,11 +80,10 @@ public async Task<JobResult> ProcessAsync(IQueueEntry<T> queueEntry, Cancellatio
}

var lockValue = await GetQueueEntryLockAsync(queueEntry, cancellationToken).AnyContext();
if (lockValue == null)
if (lockValue is null)
{
await queueEntry.AbandonAsync().AnyContext();
_logger.LogTrace("Unable to acquire queue entry lock");
return JobResult.Success;
return JobResult.CancelledWithMessage($"Unable to acquire queue entry lock. Abandoning {_queueEntryName} queue entry: {queueEntry.Id}");
}

bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
Expand Down Expand Up @@ -130,9 +136,8 @@ public async Task<JobResult> ProcessAsync(IQueueEntry<T> queueEntry, Cancellatio
protected virtual Activity StartProcessQueueEntryActivity(IQueueEntry<T> entry)
{
var activity = FoundatioDiagnostics.ActivitySource.StartActivity("ProcessQueueEntry", ActivityKind.Server, entry.CorrelationId);

if (activity == null)
return activity;
if (activity is null)
return null;

if (entry.Properties != null && entry.Properties.TryGetValue("TraceState", out var traceState))
activity.TraceStateString = traceState.ToString();
Expand Down
20 changes: 13 additions & 7 deletions src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke
{
queueEntry = await _queue.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (OperationCanceledException)
{
return JobResult.Cancelled;
}
catch (Exception ex)
{
return JobResult.FromException(ex, $"Error trying to dequeue work item: {ex.Message}");
Expand All @@ -53,8 +57,11 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke

public async Task<JobResult> ProcessAsync(IQueueEntry<WorkItemData> queueEntry, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested && queueEntry == null)
return JobResult.Cancelled;

if (queueEntry == null)
return JobResult.Success;
return JobResult.CancelledWithMessage("No queue entry to process.");

if (cancellationToken.IsCancellationRequested)
{
Expand Down Expand Up @@ -98,13 +105,13 @@ public async Task<JobResult> ProcessAsync(IQueueEntry<WorkItemData> queueEntry,
await ReportProgressAsync(handler, queueEntry).AnyContext();

var lockValue = await handler.GetWorkItemLockAsync(workItemData, cancellationToken).AnyContext();
if (lockValue == null)
if (lockValue is null)
{
if (handler.Log.IsEnabled(LogLevel.Information))
handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock.", queueEntry.Value.Type, queueEntry.Id);
handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock", queueEntry.Value.Type, queueEntry.Id);

await queueEntry.AbandonAsync().AnyContext();
return JobResult.Success;
return JobResult.CancelledWithMessage($"Unable to acquire work item lock. Abandoning {queueEntry.Value.Type} queue entry: {queueEntry.Id}");
}

var progressCallback = new Func<int, string, Task>(async (progress, message) =>
Expand Down Expand Up @@ -179,9 +186,8 @@ await Task.WhenAll(
protected virtual Activity StartProcessWorkItemActivity(IQueueEntry<WorkItemData> entry, Type workItemDataType)
{
var activity = FoundatioDiagnostics.ActivitySource.StartActivity("ProcessQueueEntry", ActivityKind.Server, entry.CorrelationId);

if (activity == null)
return activity;
if (activity is null)
return null;

if (entry.Properties != null && entry.Properties.TryGetValue("TraceState", out var traceState))
activity.TraceStateString = traceState.ToString();
Expand Down
5 changes: 2 additions & 3 deletions src/Foundatio/Lock/CacheLockProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ private Task OnLockReleasedAsync(CacheLockReleased msg, CancellationToken cancel
protected virtual Activity StartLockActivity(string resource)
{
var activity = FoundatioDiagnostics.ActivitySource.StartActivity("AcquireLock");

if (activity == null)
return activity;
if (activity is null)
return null;

activity.AddTag("resource", resource);
activity.DisplayName = $"Lock: {resource}";
Expand Down
5 changes: 2 additions & 3 deletions src/Foundatio/Messaging/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,8 @@ protected async Task SendMessageToSubscribersAsync(IMessage message)
protected virtual Activity StartHandleMessageActivity(IMessage message)
{
var activity = FoundatioDiagnostics.ActivitySource.StartActivity("HandleMessage", ActivityKind.Server, message.CorrelationId);

if (activity == null)
return activity;
if (activity is null)
return null;

if (message.Properties != null && message.Properties.TryGetValue("TraceState", out var traceState))
activity.TraceStateString = traceState.ToString();
Expand Down
40 changes: 34 additions & 6 deletions tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(1, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand Down Expand Up @@ -217,7 +217,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(1, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand All @@ -242,7 +242,7 @@ await queue.EnqueueAsync(new MyWorkItem
SomeData = "Test"
}, true);

await job.RunUntilEmptyAsync();
Assert.Equal(2, await job.RunUntilEmptyAsync());
var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Assert.Equal(2, stats.Dequeued);
Expand All @@ -260,17 +260,45 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems()
handlerRegistry.Register<MyWorkItem>(new MyWorkItemHandler(Log));

var sw = Stopwatch.StartNew();
await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100));
Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100)));
sw.Stop();

Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500));
Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(250));

var stats = await queue.GetQueueStatsAsync();
Assert.Equal(0, stats.Enqueued);
Assert.Equal(0, stats.Dequeued);
Assert.Equal(0, stats.Completed);
}

[Fact]
public async Task CanRunWorkItemJobUntilEmptyHandlesCancellation()
{
using var queue = new InMemoryQueue<WorkItemData>(o => o.LoggerFactory(Log));
using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log));
var handlerRegistry = new WorkItemHandlers();
var job = new WorkItemJob(queue, messageBus, handlerRegistry, Log);

handlerRegistry.Register<MyWorkItem>(new MyWorkItemHandler(Log));

await queue.EnqueueAsync(new MyWorkItem
{
SomeData = "Test"
}, true);

await queue.EnqueueAsync(new MyWorkItem
{
SomeData = "Test"
}, true);

Assert.Equal(1, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50)));

var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Assert.Equal(1, stats.Dequeued);
Assert.Equal(1, stats.Completed);
}

[Fact]
public async Task CanRunBadWorkItem()
{
Expand Down Expand Up @@ -299,7 +327,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(0, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand Down