Skip to content

Commit

Permalink
PR Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
niemyjski committed Jul 25, 2024
1 parent 8814fbe commit 126f102
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 40 deletions.
30 changes: 22 additions & 8 deletions src/Foundatio/Jobs/IJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ public static async Task<JobResult> TryRunAsync(this IJob job, CancellationToken
}
}

public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
public static async Task<int> RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
{
int iterations = 0;
string jobName = job.GetType().Name;
var logger = job.GetLogger();
bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information);

using var _ = logger.BeginScope(new Dictionary<string, object> { { "job", jobName } });
if (logger.IsEnabled(LogLevel.Information))
if (isInformationLogLevelEnabled)
logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName);

while (!cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -72,18 +73,31 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval =
if (!await continuationCallback().AnyContext())
break;
}
catch (Exception ex)
catch (Exception ex) when (logger.IsEnabled(LogLevel.Error))
{
if (logger.IsEnabled(LogLevel.Error))
logger.LogError(ex, "Error in continuation callback: {Message}", ex.Message);
logger.LogError(ex, "Error in continuation callback: {Message}", ex.Message);
}
}

logger.LogInformation("Finished continuous job type {JobName}: {IterationLimit} {Iterations}", jobName, Environment.MachineName, iterationLimit, iterations);
if (cancellationToken.IsCancellationRequested && logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("Job cancellation requested");

if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("Stopping continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName);
if (isInformationLogLevelEnabled)
{
if (iterationLimit > 0)
{
logger.LogInformation(
"Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times (Limit={IterationLimit})",
jobName, Environment.MachineName, iterationLimit, iterations);
}
else
{
logger.LogInformation(
"Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times",
jobName, Environment.MachineName, iterations);
}
}

return iterations;
}
}
45 changes: 18 additions & 27 deletions src/Foundatio/Jobs/IQueueJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,36 @@ public interface IQueueJob<T> : IJob where T : class
public static class QueueJobExtensions
{
/// <summary>
/// Will run wait for the wait timeout to expire waiting if there are no queued items. It will then run until the queue is empty.
/// NOTE: The wait timeout will not be reset until after the first job is processed.
/// Will run until the wait timeout expires. If there is still data the job will be cancelled. and then will cancel the job.
/// </summary>
public static async Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan waitTimeout,
/// <returns>The amount of queue items processed.</returns>
public static async Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan waitTimeout,
CancellationToken cancellationToken = default) where T : class
{
if (waitTimeout <= TimeSpan.Zero)
throw new ArgumentException("Acquire timeout must be greater than zero", nameof(waitTimeout));

using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
linkedCancellationTokenSource.CancelAfter(waitTimeout);
bool hasAcquireTimeout = true;

var logger = job.GetLogger();

// NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire.
await job.RunContinuousAsync(cancellationToken: linkedCancellationTokenSource.Token, continuationCallback: async () =>
{
// Stop the Cancel After
if (hasAcquireTimeout)
{
linkedCancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
hasAcquireTimeout = false;
}

// Allow abandoned items to be added in a background task.
Thread.Yield();

var stats = await job.Queue.GetQueueStatsAsync().AnyContext();
if (logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned);

return stats.Queued + stats.Working > 0;
});
return await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token).AnyContext();
}

public static Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
/// <summary>
/// Will wait up to thirty seconds if queue is empty, otherwise will run until the queue is empty or cancelled.
/// </summary>
/// <returns>The amount of queue items processed. This count will not be accurate if the job is cancelled.</returns>
public static async Task<int> RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
{
var logger = job.GetLogger();
return job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () =>

// NOTE: processed count is not accurate if the continuation callback is skipped due to cancellation.
int processed = 0;
await job.RunContinuousAsync(cancellationToken: cancellationToken, continuationCallback: async () =>
{
processed++;

// Allow abandoned items to be added in a background task.
Thread.Yield();

Expand All @@ -69,6 +58,8 @@ public static Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToke
logger.LogTrace("RunUntilEmpty continuation: Queued={Queued}, Working={Working}, Abandoned={Abandoned}", stats.Queued, stats.Working, stats.Abandoned);

return stats.Queued + stats.Working > 0;
});
}).AnyContext();

return processed;
}
}
39 changes: 34 additions & 5 deletions tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(1, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand Down Expand Up @@ -217,7 +217,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(1, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand All @@ -242,7 +242,7 @@ await queue.EnqueueAsync(new MyWorkItem
SomeData = "Test"
}, true);

await job.RunUntilEmptyAsync();
Assert.Equal(2, await job.RunUntilEmptyAsync());
var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Assert.Equal(2, stats.Dequeued);
Expand All @@ -260,7 +260,7 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems()
handlerRegistry.Register<MyWorkItem>(new MyWorkItemHandler(Log));

var sw = Stopwatch.StartNew();
await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100));
Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100)));
sw.Stop();

Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500));
Expand All @@ -271,6 +271,35 @@ public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems()
Assert.Equal(0, stats.Completed);
}

[Fact]
public async Task CanRunWorkItemJobUntilEmptyHandlesCancellation()
{
using var queue = new InMemoryQueue<WorkItemData>(o => o.LoggerFactory(Log));
using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log));
var handlerRegistry = new WorkItemHandlers();
var job = new WorkItemJob(queue, messageBus, handlerRegistry, Log);

handlerRegistry.Register<MyWorkItem>(new MyWorkItemHandler(Log));

await queue.EnqueueAsync(new MyWorkItem
{
SomeData = "Test"
}, true);

await queue.EnqueueAsync(new MyWorkItem
{
SomeData = "Test"
}, true);

// NOTE: This count is wrong due to the continuation callback not firing due to the cancellation.
Assert.Equal(0, await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(50)));

var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Assert.Equal(1, stats.Dequeued);
Assert.Equal(1, stats.Completed);
}

[Fact]
public async Task CanRunBadWorkItem()
{
Expand Down Expand Up @@ -299,7 +328,7 @@ await messageBus.SubscribeAsync<WorkItemStatus>(status =>
countdown.Signal();
});

await job.RunUntilEmptyAsync();
Assert.Equal(1, await job.RunUntilEmptyAsync());
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(0, countdown.CurrentCount);
}
Expand Down

0 comments on commit 126f102

Please sign in to comment.