diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index b8903106..ac68c54b 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -269,7 +269,8 @@ await queue.EnqueueAsync(new SimpleWorkItem }); if (_assertStats) { - Assert.Equal(1, (await queue.GetQueueStatsAsync()).Enqueued); + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(1, stats.Enqueued); Assert.Equal(1, metricsCollector.GetSum("foundatio.simpleworkitem.enqueued")); } @@ -280,7 +281,8 @@ await queue.EnqueueAsync(new SimpleWorkItem }); if (_assertStats) { - Assert.Equal(1, (await queue.GetQueueStatsAsync()).Enqueued); + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(1, stats.Enqueued); Assert.Equal(1, metricsCollector.GetSum("foundatio.simpleworkitem.enqueued")); } @@ -289,7 +291,8 @@ await queue.EnqueueAsync(new SimpleWorkItem Assert.Equal("Hello", workItem.Value.Data); if (_assertStats) { - Assert.Equal(1, (await queue.GetQueueStatsAsync()).Dequeued); + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(1, stats.Dequeued); Assert.Equal(1, metricsCollector.GetSum("foundatio.simpleworkitem.dequeued")); } @@ -300,18 +303,27 @@ await queue.EnqueueAsync(new SimpleWorkItem }); if (_assertStats) { - Assert.Equal(2, (await queue.GetQueueStatsAsync()).Enqueued); + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(2, stats.Enqueued); Assert.Equal(2, metricsCollector.GetSum("foundatio.simpleworkitem.enqueued")); } await workItem.CompleteAsync(); Assert.False(workItem.IsAbandoned); Assert.True(workItem.IsCompleted); - var stats = await queue.GetQueueStatsAsync(); if (_assertStats) { + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(2, stats.Enqueued); + Assert.Equal(1, stats.Dequeued); Assert.Equal(1, stats.Completed); + Assert.Equal(0, stats.Abandoned); + Assert.Equal(1, stats.Queued); + Assert.Equal(0, stats.Working); + Assert.Equal(0, stats.Deadletter); + Assert.Equal(0, stats.Errors); + Assert.Equal(0, stats.Timeouts); metricsCollector.RecordObservableInstruments(); Assert.Equal(2, metricsCollector.GetSum("foundatio.simpleworkitem.enqueued")); @@ -323,7 +335,6 @@ await queue.EnqueueAsync(new SimpleWorkItem Assert.Equal(0, metricsCollector.GetSum("foundatio.simpleworkitem.working")); Assert.Equal(0, metricsCollector.GetSum("foundatio.simpleworkitem.deadletter")); } - } finally { @@ -783,8 +794,7 @@ await queue.StartWorkingAsync(w => public virtual async Task WorkItemsWillTimeoutAsync() { - Log.SetLogLevel("Foundatio.Queues.RedisQueue", LogLevel.Trace); - var queue = GetQueue(retryDelay: TimeSpan.Zero, workItemTimeout: TimeSpan.FromMilliseconds(50)); + var queue = GetQueue(retryDelay: TimeSpan.Zero, workItemTimeout: TimeSpan.FromSeconds(1)); if (queue == null) return; @@ -810,7 +820,7 @@ await queue.EnqueueAsync(new SimpleWorkItem var stats = await queue.GetQueueStatsAsync(); if (stats.Abandoned > 0) break; - await Task.Delay(250); + await Task.Delay(1250); } while (sw.Elapsed < TimeSpan.FromSeconds(10)); } @@ -1011,7 +1021,7 @@ await Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), cancellationToke public virtual async Task CanDelayRetryAsync() { - var queue = GetQueue(workItemTimeout: TimeSpan.FromMilliseconds(500), retryDelay: TimeSpan.FromSeconds(1)); + var queue = GetQueue(workItemTimeout: TimeSpan.FromSeconds(1), retryDelay: TimeSpan.FromSeconds(1)); if (queue == null) return; @@ -1037,7 +1047,7 @@ await queue.EnqueueAsync(new SimpleWorkItem var elapsed = DateTime.UtcNow.Subtract(startTime); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Time {Elapsed}", elapsed); Assert.NotNull(workItem); - Assert.True(elapsed > TimeSpan.FromSeconds(.95)); + Assert.InRange(elapsed, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10)); await workItem.CompleteAsync(); if (_assertStats) @@ -1483,7 +1493,7 @@ protected async Task AssertEmptyQueueAsync(IQueue queue) public virtual async Task MaintainJobNotAbandon_NotWorkTimeOutEntry() { - var queue = GetQueue(retries: 0, workItemTimeout: TimeSpan.FromMilliseconds(100), retryDelay: TimeSpan.Zero); + var queue = GetQueue(retries: 0, workItemTimeout: TimeSpan.FromSeconds(1), retryDelay: TimeSpan.Zero); if (queue == null) return; try @@ -1503,16 +1513,16 @@ public virtual async Task MaintainJobNotAbandon_NotWorkTimeOutEntry() var dequeuedQueueItem = Assert.IsAssignableFrom>(await queue.DequeueAsync()); Assert.NotNull(dequeuedQueueItem.Value); - // The first dequeued item works for 60 milliseconds less than work timeout(100 milliseconds). - await Task.Delay(60); + // The first dequeued item works for 900 milliseconds less than work timeout(1s). + await Task.Delay(900); await dequeuedQueueItem.CompleteAsync(); Assert.True(dequeuedQueueItem.IsCompleted); Assert.False(dequeuedQueueItem.IsAbandoned); dequeuedQueueItem = Assert.IsAssignableFrom>(await queue.DequeueAsync()); Assert.NotNull(dequeuedQueueItem.Value); - // The second dequeued item works for 60 milliseconds less than work timeout(100 milliseconds). - await Task.Delay(60); + // The second dequeued item works for 900 milliseconds less than work timeout(1s). + await Task.Delay(900); await dequeuedQueueItem.CompleteAsync(); Assert.True(dequeuedQueueItem.IsCompleted); Assert.False(dequeuedQueueItem.IsAbandoned); @@ -1533,8 +1543,9 @@ public virtual async Task MaintainJobNotAbandon_NotWorkTimeOutEntry() public virtual async Task CanHandleAutoAbandonInWorker() { + Log.DefaultMinimumLevel = LogLevel.Trace; // create queue with short work item timeout so it will be auto abandoned - var queue = GetQueue(workItemTimeout: TimeSpan.FromMilliseconds(100)); + var queue = GetQueue(workItemTimeout: TimeSpan.FromSeconds(1)); if (queue == null) return; @@ -1546,7 +1557,7 @@ public virtual async Task CanHandleAutoAbandonInWorker() var successEvent = new AsyncAutoResetEvent(); var errorEvent = new AsyncAutoResetEvent(); - await queue.StartWorkingAsync(async (item) => + await queue.StartWorkingAsync(async item => { _logger.LogDebug("Processing item: {Id} Value={Value}", item.Id, item.Value.Data); if (item.Value.Data == "Delay") @@ -1563,9 +1574,10 @@ await queue.StartWorkingAsync(async (item) => } stats = await queue.GetQueueStatsAsync(); - _logger.LogTrace("Getting updated stats, Abandoned={Abandoned}", stats.Abandoned); + _logger.LogTrace("Getting updated stats... Queued={Queued}, Working={Working}, Abandoned={Abandoned} Deadletter={Deadletter}, Enqueued={Enqueued}, Dequeued={Dequeued}, Completed={Completed}, Errors={Errors}, Timeouts={Timeouts}", + stats.Queued, stats.Working, stats.Abandoned, stats.Deadletter, stats.Enqueued, stats.Dequeued, stats.Completed, stats.Errors, stats.Timeouts); - await Task.Delay(50, cancellationTokenSource.Token); + await Task.Delay(250, cancellationTokenSource.Token); } while (sw.Elapsed < TimeSpan.FromSeconds(5)); Assert.Equal(1, stats.Abandoned); @@ -1585,8 +1597,8 @@ await queue.StartWorkingAsync(async (item) => successEvent.Set(); }, cancellationToken: cancellationTokenSource.Token); - await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" }); - await queue.EnqueueAsync(new SimpleWorkItem() { Data = "No Delay" }); + await queue.EnqueueAsync(new SimpleWorkItem { Data = "Delay" }); + await queue.EnqueueAsync(new SimpleWorkItem { Data = "No Delay" }); await errorEvent.WaitAsync(TimeSpan.FromSeconds(10)); await successEvent.WaitAsync(TimeSpan.FromSeconds(10));