Skip to content

Commit

Permalink
Accurately report how many queue items were processed.
Browse files Browse the repository at this point in the history
  • Loading branch information
niemyjski committed Jul 25, 2024
1 parent 11abd78 commit 0fce72e
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 26 deletions.
13 changes: 12 additions & 1 deletion src/Foundatio/Jobs/IJob.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Utility;
Expand Down Expand Up @@ -30,13 +31,20 @@ public static async Task<JobResult> TryRunAsync(this IJob job, CancellationToken
}
}

/// <summary>
/// Runs the job continuously until the cancellation token is set or the iteration limit is reached.
/// </summary>
/// <returns>Returns the iteration count for normal jobs. For queue based jobs this will be the amount of items processed successfully.</returns>
public static async Task<int> RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
{
int iterations = 0;
string jobName = job.GetType().Name;
var logger = job.GetLogger();
bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information);

int queueItemsProcessed = 0;
bool isQueueJob = job.GetType().GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IQueueJob<>));

using var _ = logger.BeginScope(new Dictionary<string, object> { { "job", jobName } });
if (isInformationLogLevelEnabled)
logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName);
Expand All @@ -45,7 +53,10 @@ public static async Task<int> RunContinuousAsync(this IJob job, TimeSpan? interv
{
var result = await job.TryRunAsync(cancellationToken).AnyContext();
logger.LogJobResult(result, jobName);

iterations++;
if (isQueueJob && result.IsSuccess)
queueItemsProcessed++;

if (cancellationToken.IsCancellationRequested || (iterationLimit > -1 && iterationLimit <= iterations))
break;
Expand Down Expand Up @@ -98,6 +109,6 @@ public static async Task<int> RunContinuousAsync(this IJob job, TimeSpan? interv
}
}

return iterations;
return isQueueJob ? queueItemsProcessed : iterations;
}
}
18 changes: 6 additions & 12 deletions src/Foundatio/Jobs/IQueueJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public interface IQueueJob<T> : IJob where T : class
public static class QueueJobExtensions
{
/// <summary>
/// Will run until the wait timeout expires. If there is still data the job will be cancelled. and then will cancel the job.
/// Will run until the wait timeout expires, if expired the job will be cancelled.
/// </summary>
/// <returns>The amount of queue items processed. This count will not be accurate if the job is cancelled.</returns>
/// <returns>The amount of queue items processed.</returns>
public static async Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan waitTimeout,
CancellationToken cancellationToken = default) where T : class
{
Expand All @@ -39,17 +39,13 @@ public static async Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeS
/// <summary>
/// Will wait up to thirty seconds if queue is empty, otherwise will run until the queue is empty or cancelled.
/// </summary>
/// <returns>The amount of queue items processed. This count will not be accurate if the job is cancelled.</returns>
public static async Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
/// <returns>The amount of queue items processed.</returns>
public static Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
{
var logger = job.GetLogger();

// NOTE: processed count is not accurate if the continuation callback is skipped due to cancellation.
int processed = 0;
await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () =>
return job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () =>
{
processed++;

// Allow abandoned items to be added in a background task.
Thread.Yield();

Expand All @@ -58,8 +54,6 @@ await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationC
logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned);

return stats.Queued + stats.Working > 0;
}).AnyContext();

return processed;
});
}
}
17 changes: 11 additions & 6 deletions src/Foundatio/Jobs/QueueJobBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke
{
queueEntry = await _queue.Value.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (OperationCanceledException)
{
return JobResult.Cancelled;
}
catch (Exception ex)
{
return JobResult.FromException(ex, $"Error trying to dequeue message: {ex.Message}");
Expand All @@ -51,8 +55,11 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke

public async Task<JobResult> ProcessAsync(IQueueEntry<T> queueEntry, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested && queueEntry == null)
return JobResult.Cancelled;

if (queueEntry == null)
return JobResult.Success;
return JobResult.CancelledWithMessage("No queue entry to process.");

using var activity = StartProcessQueueEntryActivity(queueEntry);
using var _ = _logger.BeginScope(s => s
Expand All @@ -76,8 +83,7 @@ public async Task<JobResult> ProcessAsync(IQueueEntry<T> queueEntry, Cancellatio
if (lockValue == null)
{
await queueEntry.AbandonAsync().AnyContext();
_logger.LogTrace("Unable to acquire queue entry lock");
return JobResult.Success;
return JobResult.CancelledWithMessage($"Unable to acquire queue entry lock. Abandoning {_queueEntryName} queue entry: {queueEntry.Id}");
}

bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
Expand Down Expand Up @@ -130,9 +136,8 @@ public async Task<JobResult> ProcessAsync(IQueueEntry<T> queueEntry, Cancellatio
protected virtual Activity StartProcessQueueEntryActivity(IQueueEntry<T> entry)
{
var activity = FoundatioDiagnostics.ActivitySource.StartActivity("ProcessQueueEntry", ActivityKind.Server, entry.CorrelationId);

if (activity == null)
return activity;
if (activity is null)
return null;

if (entry.Properties != null && entry.Properties.TryGetValue("TraceState", out var traceState))
activity.TraceStateString = traceState.ToString();
Expand Down
13 changes: 10 additions & 3 deletions src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke
{
queueEntry = await _queue.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (OperationCanceledException)
{
return JobResult.Cancelled;
}
catch (Exception ex)
{
return JobResult.FromException(ex, $"Error trying to dequeue work item: {ex.Message}");
Expand All @@ -53,8 +57,11 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke

public async Task<JobResult> ProcessAsync(IQueueEntry<WorkItemData> queueEntry, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested && queueEntry == null)
return JobResult.Cancelled;

if (queueEntry == null)
return JobResult.Success;
return JobResult.CancelledWithMessage("No queue entry to process.");

if (cancellationToken.IsCancellationRequested)
{
Expand Down Expand Up @@ -101,10 +108,10 @@ public async Task<JobResult> ProcessAsync(IQueueEntry<WorkItemData> queueEntry,
if (lockValue == null)
{
if (handler.Log.IsEnabled(LogLevel.Information))
handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock.", queueEntry.Value.Type, queueEntry.Id);
handler.Log.LogInformation("Abandoning {TypeName} work item: {Id}: Unable to acquire work item lock", queueEntry.Value.Type, queueEntry.Id);

await queueEntry.AbandonAsync().AnyContext();
return JobResult.Success;
return JobResult.CancelledWithMessage("Unable to acquire work item lock.");
}

var progressCallback = new Func<int, string, Task>(async (progress, message) =>
Expand Down
7 changes: 3 additions & 4 deletions tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems()
Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100)));
sw.Stop();

Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500));
Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(250));

var stats = await queue.GetQueueStatsAsync();
Assert.Equal(0, stats.Enqueued);
Expand Down Expand Up @@ -291,8 +291,7 @@ await queue.EnqueueAsync(new MyWorkItem
SomeData = "Test"
}, true);

// NOTE: This count is wrong due to the continuation callback not firing due to the cancellation.
Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50)));
Assert.Equal(1, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50)));

var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Expand Down Expand Up @@ -328,7 +327,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

Assert.Equal(1, await job.RunUntilEmptyAsync());
Assert.Equal(0, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand Down

0 comments on commit 0fce72e

Please sign in to comment.