Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Approach for better implementation for RunUntilEmptyAsync #303

Merged
merged 8 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions src/Foundatio/Jobs/IJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ public static async Task<JobResult> TryRunAsync(this IJob job, CancellationToken
}
}

public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
public static async Task<int> RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
{
int iterations = 0;
string jobName = job.GetType().Name;
var logger = job.GetLogger();
bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information);

using var _ = logger.BeginScope(new Dictionary<string, object> { { "job", jobName } });
if (logger.IsEnabled(LogLevel.Information))
if (isInformationLogLevelEnabled)
logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName);

while (!cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -72,18 +73,31 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval =
if (!await continuationCallback().AnyContext())
break;
}
catch (Exception ex)
catch (Exception ex) when (logger.IsEnabled(LogLevel.Error))
{
if (logger.IsEnabled(LogLevel.Error))
logger.LogError(ex, "Error in continuation callback: {Message}", ex.Message);
logger.LogError(ex, "Error in continuation callback: {Message}", ex.Message);
}
}

logger.LogInformation("Finished continuous job type {JobName}: {IterationLimit} {Iterations}", jobName, Environment.MachineName, iterationLimit, iterations);
if (cancellationToken.IsCancellationRequested && logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("Job cancellation requested");

if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("Stopping continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName);
if (isInformationLogLevelEnabled)
{
if (iterationLimit > 0)
niemyjski marked this conversation as resolved.
Show resolved Hide resolved
{
logger.LogInformation(
"Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times (Limit={IterationLimit})",
jobName, Environment.MachineName, iterationLimit, iterations);
}
else
{
logger.LogInformation(
"Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times",
jobName, Environment.MachineName, iterations);
}
}

return iterations;
}
}
31 changes: 23 additions & 8 deletions src/Foundatio/Jobs/IQueueJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,37 @@ public interface IQueueJob<T> : IJob where T : class

public static class QueueJobExtensions
{
public static async Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan acquireTimeout,
/// <summary>
/// Will run until the wait timeout expires. If there is still data the job will be cancelled. and then will cancel the job.
/// </summary>
/// <returns>The amount of queue items processed. This count will not be accurate if the job is cancelled.</returns>
public static async Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan waitTimeout,
CancellationToken cancellationToken = default) where T : class
{
if (acquireTimeout <= TimeSpan.Zero)
throw new ArgumentException("Acquire timeout must be greater than zero", nameof(acquireTimeout));
if (waitTimeout <= TimeSpan.Zero)
throw new ArgumentException("Acquire timeout must be greater than zero", nameof(waitTimeout));

using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
linkedCancellationTokenSource.CancelAfter(acquireTimeout);
linkedCancellationTokenSource.CancelAfter(waitTimeout);

// NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire.
await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token);
return await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token).AnyContext();
}

public static Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
/// <summary>
/// Will wait up to thirty seconds if queue is empty, otherwise will run until the queue is empty or cancelled.
/// </summary>
/// <returns>The amount of queue items processed. This count will not be accurate if the job is cancelled.</returns>
public static async Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
{
var logger = job.GetLogger();
return job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () =>

// NOTE: processed count is not accurate if the continuation callback is skipped due to cancellation.
int processed = 0;
await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () =>
niemyjski marked this conversation as resolved.
Show resolved Hide resolved
{
processed++;

// Allow abandoned items to be added in a background task.
Thread.Yield();

Expand All @@ -45,6 +58,8 @@ public static Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToke
logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned);

return stats.Queued + stats.Working > 0;
});
}).AnyContext();

return processed;
}
}
39 changes: 34 additions & 5 deletions tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(1, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand Down Expand Up @@ -217,7 +217,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(1, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand All @@ -242,7 +242,7 @@ await queue.EnqueueAsync(new MyWorkItem
SomeData = "Test"
}, true);

await job.RunUntilEmptyAsync();
Assert.Equal(2, await job.RunUntilEmptyAsync());
var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Assert.Equal(2, stats.Dequeued);
Expand All @@ -260,7 +260,7 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems()
handlerRegistry.Register<MyWorkItem>(new MyWorkItemHandler(Log));

var sw = Stopwatch.StartNew();
await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100));
Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100)));
sw.Stop();

Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500));
Expand All @@ -271,6 +271,35 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems()
Assert.Equal(0, stats.Completed);
}

[Fact]
public async Task CanRunWorkItemJobUntilEmptyHandlesCancellation()
{
using var queue = new InMemoryQueue<WorkItemData>(o => o.LoggerFactory(Log));
using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log));
var handlerRegistry = new WorkItemHandlers();
var job = new WorkItemJob(queue, messageBus, handlerRegistry, Log);

handlerRegistry.Register<MyWorkItem>(new MyWorkItemHandler(Log));

await queue.EnqueueAsync(new MyWorkItem
{
SomeData = "Test"
}, true);

await queue.EnqueueAsync(new MyWorkItem
{
SomeData = "Test"
}, true);

// NOTE: This count is wrong due to the continuation callback not firing due to the cancellation.
Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50)));

var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Assert.Equal(1, stats.Dequeued);
Assert.Equal(1, stats.Completed);
}

[Fact]
public async Task CanRunBadWorkItem()
{
Expand Down Expand Up @@ -299,7 +328,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(1, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand Down