Skip to content

Commit

Permalink
Updated queue test base to be second for work timeouts as some cannot…
Browse files Browse the repository at this point in the history
… handle fractional seconds like sqs
  • Loading branch information
niemyjski committed Sep 27, 2024
1 parent 7325278 commit 562db60
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions src/Foundatio.TestHarness/Queue/QueueTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ await queue.EnqueueAsync(new SimpleWorkItem
});
if (_assertStats)
{
Assert.Equal(1, (await queue.GetQueueStatsAsync()).Enqueued);
var stats = await queue.GetQueueStatsAsync();
Assert.Equal(1, stats.Enqueued);
Assert.Equal(1, metricsCollector.GetSum<long>("foundatio.simpleworkitem.enqueued"));
}

Expand All @@ -280,7 +281,8 @@ await queue.EnqueueAsync(new SimpleWorkItem
});
if (_assertStats)
{
Assert.Equal(1, (await queue.GetQueueStatsAsync()).Enqueued);
var stats = await queue.GetQueueStatsAsync();
Assert.Equal(1, stats.Enqueued);
Assert.Equal(1, metricsCollector.GetSum<long>("foundatio.simpleworkitem.enqueued"));
}

Expand All @@ -289,7 +291,8 @@ await queue.EnqueueAsync(new SimpleWorkItem
Assert.Equal("Hello", workItem.Value.Data);
if (_assertStats)
{
Assert.Equal(1, (await queue.GetQueueStatsAsync()).Dequeued);
var stats = await queue.GetQueueStatsAsync();
Assert.Equal(1, stats.Dequeued);
Assert.Equal(1, metricsCollector.GetSum<long>("foundatio.simpleworkitem.dequeued"));
}

Expand All @@ -300,18 +303,27 @@ await queue.EnqueueAsync(new SimpleWorkItem
});
if (_assertStats)
{
Assert.Equal(2, (await queue.GetQueueStatsAsync()).Enqueued);
var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Assert.Equal(2, metricsCollector.GetSum<long>("foundatio.simpleworkitem.enqueued"));
}

await workItem.CompleteAsync();
Assert.False(workItem.IsAbandoned);
Assert.True(workItem.IsCompleted);
var stats = await queue.GetQueueStatsAsync();
if (_assertStats)
{
var stats = await queue.GetQueueStatsAsync();
Assert.Equal(2, stats.Enqueued);
Assert.Equal(1, stats.Dequeued);
Assert.Equal(1, stats.Completed);
Assert.Equal(0, stats.Abandoned);

Assert.Equal(1, stats.Queued);
Assert.Equal(0, stats.Working);
Assert.Equal(0, stats.Deadletter);
Assert.Equal(0, stats.Errors);
Assert.Equal(0, stats.Timeouts);

metricsCollector.RecordObservableInstruments();
Assert.Equal(2, metricsCollector.GetSum<long>("foundatio.simpleworkitem.enqueued"));
Expand All @@ -323,7 +335,6 @@ await queue.EnqueueAsync(new SimpleWorkItem
Assert.Equal(0, metricsCollector.GetSum<long>("foundatio.simpleworkitem.working"));
Assert.Equal(0, metricsCollector.GetSum<long>("foundatio.simpleworkitem.deadletter"));
}

}
finally
{
Expand Down Expand Up @@ -783,8 +794,7 @@ await queue.StartWorkingAsync(w =>

public virtual async Task WorkItemsWillTimeoutAsync()
{
Log.SetLogLevel("Foundatio.Queues.RedisQueue", LogLevel.Trace);
var queue = GetQueue(retryDelay: TimeSpan.Zero, workItemTimeout: TimeSpan.FromMilliseconds(50));
var queue = GetQueue(retryDelay: TimeSpan.Zero, workItemTimeout: TimeSpan.FromSeconds(1));
if (queue == null)
return;

Expand All @@ -810,7 +820,7 @@ await queue.EnqueueAsync(new SimpleWorkItem
var stats = await queue.GetQueueStatsAsync();
if (stats.Abandoned > 0)
break;
await Task.Delay(250);
await Task.Delay(1250);
} while (sw.Elapsed < TimeSpan.FromSeconds(10));
}

Expand Down Expand Up @@ -1011,7 +1021,7 @@ await Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), cancellationToke

public virtual async Task CanDelayRetryAsync()
{
var queue = GetQueue(workItemTimeout: TimeSpan.FromMilliseconds(500), retryDelay: TimeSpan.FromSeconds(1));
var queue = GetQueue(workItemTimeout: TimeSpan.FromSeconds(1), retryDelay: TimeSpan.FromSeconds(1));
if (queue == null)
return;

Expand All @@ -1037,7 +1047,7 @@ await queue.EnqueueAsync(new SimpleWorkItem
var elapsed = DateTime.UtcNow.Subtract(startTime);
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Time {Elapsed}", elapsed);
Assert.NotNull(workItem);
Assert.True(elapsed > TimeSpan.FromSeconds(.95));
Assert.InRange(elapsed, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10));
await workItem.CompleteAsync();

if (_assertStats)
Expand Down Expand Up @@ -1483,7 +1493,7 @@ protected async Task AssertEmptyQueueAsync(IQueue<SimpleWorkItem> queue)

public virtual async Task MaintainJobNotAbandon_NotWorkTimeOutEntry()
{
var queue = GetQueue(retries: 0, workItemTimeout: TimeSpan.FromMilliseconds(100), retryDelay: TimeSpan.Zero);
var queue = GetQueue(retries: 0, workItemTimeout: TimeSpan.FromSeconds(1), retryDelay: TimeSpan.Zero);
if (queue == null)
return;
try
Expand All @@ -1503,16 +1513,16 @@ public virtual async Task MaintainJobNotAbandon_NotWorkTimeOutEntry()

var dequeuedQueueItem = Assert.IsAssignableFrom<QueueEntry<SimpleWorkItem>>(await queue.DequeueAsync());
Assert.NotNull(dequeuedQueueItem.Value);
// The first dequeued item works for 60 milliseconds less than work timeout(100 milliseconds).
await Task.Delay(60);
// The first dequeued item works for 900 milliseconds less than work timeout(1s).
await Task.Delay(900);
await dequeuedQueueItem.CompleteAsync();
Assert.True(dequeuedQueueItem.IsCompleted);
Assert.False(dequeuedQueueItem.IsAbandoned);

dequeuedQueueItem = Assert.IsAssignableFrom<QueueEntry<SimpleWorkItem>>(await queue.DequeueAsync());
Assert.NotNull(dequeuedQueueItem.Value);
// The second dequeued item works for 60 milliseconds less than work timeout(100 milliseconds).
await Task.Delay(60);
// The second dequeued item works for 900 milliseconds less than work timeout(1s).
await Task.Delay(900);
await dequeuedQueueItem.CompleteAsync();
Assert.True(dequeuedQueueItem.IsCompleted);
Assert.False(dequeuedQueueItem.IsAbandoned);
Expand All @@ -1533,8 +1543,9 @@ public virtual async Task MaintainJobNotAbandon_NotWorkTimeOutEntry()

public virtual async Task CanHandleAutoAbandonInWorker()
{
Log.DefaultMinimumLevel = LogLevel.Trace;
// create queue with short work item timeout so it will be auto abandoned
var queue = GetQueue(workItemTimeout: TimeSpan.FromMilliseconds(100));
var queue = GetQueue(workItemTimeout: TimeSpan.FromSeconds(1));
if (queue == null)
return;

Expand All @@ -1546,7 +1557,7 @@ public virtual async Task CanHandleAutoAbandonInWorker()
var successEvent = new AsyncAutoResetEvent();
var errorEvent = new AsyncAutoResetEvent();

await queue.StartWorkingAsync(async (item) =>
await queue.StartWorkingAsync(async item =>
{
_logger.LogDebug("Processing item: {Id} Value={Value}", item.Id, item.Value.Data);
if (item.Value.Data == "Delay")
Expand All @@ -1563,9 +1574,10 @@ await queue.StartWorkingAsync(async (item) =>
}

stats = await queue.GetQueueStatsAsync();
_logger.LogTrace("Getting updated stats, Abandoned={Abandoned}", stats.Abandoned);
_logger.LogTrace("Getting updated stats... Queued={Queued}, Working={Working}, Abandoned={Abandoned} Deadletter={Deadletter}, Enqueued={Enqueued}, Dequeued={Dequeued}, Completed={Completed}, Errors={Errors}, Timeouts={Timeouts}",
stats.Queued, stats.Working, stats.Abandoned, stats.Deadletter, stats.Enqueued, stats.Dequeued, stats.Completed, stats.Errors, stats.Timeouts);

await Task.Delay(50, cancellationTokenSource.Token);
await Task.Delay(250, cancellationTokenSource.Token);
} while (sw.Elapsed < TimeSpan.FromSeconds(5));

Assert.Equal(1, stats.Abandoned);
Expand All @@ -1585,8 +1597,8 @@ await queue.StartWorkingAsync(async (item) =>
successEvent.Set();
}, cancellationToken: cancellationTokenSource.Token);

await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" });
await queue.EnqueueAsync(new SimpleWorkItem() { Data = "No Delay" });
await queue.EnqueueAsync(new SimpleWorkItem { Data = "Delay" });
await queue.EnqueueAsync(new SimpleWorkItem { Data = "No Delay" });

await errorEvent.WaitAsync(TimeSpan.FromSeconds(10));
await successEvent.WaitAsync(TimeSpan.FromSeconds(10));
Expand Down

0 comments on commit 562db60

Please sign in to comment.