diff --git a/.github/workflows/build-workflow.yml b/.github/workflows/build-workflow.yml index 2c5919dd..ed096051 100644 --- a/.github/workflows/build-workflow.yml +++ b/.github/workflows/build-workflow.yml @@ -42,7 +42,7 @@ jobs: run: "echo ref: ${{github.ref}} event: ${{github.event_name}}" - name: Build Version run: | - dotnet tool install --global minver-cli --version 4.3.0 + dotnet tool install --global minver-cli --version 5.0.0 version=$(minver --tag-prefix v) echo "MINVERVERSIONOVERRIDE=$version" >> $GITHUB_ENV echo "### Version: $version" >> $GITHUB_STEP_SUMMARY diff --git a/.gitignore b/.gitignore index 897f2f01..d8662874 100644 --- a/.gitignore +++ b/.gitignore @@ -32,23 +32,4 @@ _NCrunch_* .DS_Store # Rider - -# User specific -**/.idea/**/workspace.xml -**/.idea/**/tasks.xml -**/.idea/shelf/* -**/.idea/dictionaries - -# Sensitive or high-churn files -**/.idea/**/dataSources/ -**/.idea/**/dataSources.ids -**/.idea/**/dataSources.xml -**/.idea/**/dataSources.local.xml -**/.idea/**/sqlDataSources.xml -**/.idea/**/dynamic.xml - -# Rider -# Rider auto-generates .iml files, and contentModel.xml -**/.idea/**/*.iml -**/.idea/**/contentModel.xml -**/.idea/**/modules.xml \ No newline at end of file +.idea diff --git a/build/common.props b/build/common.props index 471c9ec5..0fed1d90 100644 --- a/build/common.props +++ b/build/common.props @@ -38,7 +38,7 @@ - + diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs index e4ec631b..e724b7ab 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs @@ -24,7 +24,7 @@ public class ScheduledJobService : BackgroundService, IJobStatus public ScheduledJobService(IServiceProvider serviceProvider, ILoggerFactory loggerFactory) { _serviceProvider = serviceProvider; - var cacheClient = serviceProvider.GetService() ?? new InMemoryCacheClient(); + var cacheClient = serviceProvider.GetService() ?? new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)); _jobs = new List(serviceProvider.GetServices().Select(j => new ScheduledJobRunner(j.JobFactory, j.Schedule, cacheClient, loggerFactory))); var lifetime = serviceProvider.GetService(); diff --git a/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs b/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs index 928eea88..75ee4ad1 100644 --- a/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs +++ b/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs @@ -210,7 +210,8 @@ public virtual async Task CanAddConcurrentlyAsync() string cacheKey = Guid.NewGuid().ToString("N").Substring(10); long adds = 0; - await Run.InParallelAsync(5, async i => + + await Parallel.ForEachAsync(Enumerable.Range(1, 5), async (i, _) => { if (await cache.AddAsync(cacheKey, i, TimeSpan.FromMinutes(1))) Interlocked.Increment(ref adds); diff --git a/src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs b/src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs index 4382a207..9e85bf70 100644 --- a/src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs +++ b/src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs @@ -13,14 +13,18 @@ namespace Foundatio.Tests.Caching; public class HybridCacheClientTests : CacheClientTestsBase, IDisposable { - protected readonly ICacheClient _distributedCache = new InMemoryCacheClient(new InMemoryCacheClientOptions()); - protected readonly IMessageBus _messageBus = new InMemoryMessageBus(new InMemoryMessageBusOptions()); + protected readonly ICacheClient _distributedCache; + protected readonly IMessageBus _messageBus; - public HybridCacheClientTests(ITestOutputHelper output) : base(output) { } + public HybridCacheClientTests(ITestOutputHelper output) : base(output) + { + _distributedCache = new InMemoryCacheClient(o => o.LoggerFactory(Log)); + _messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log)); + } protected override ICacheClient GetCacheClient(bool shouldThrowOnSerializationError = true) { - return new HybridCacheClient(_distributedCache, _messageBus, new InMemoryCacheClientOptions { CloneValues = true, ShouldThrowOnSerializationError = shouldThrowOnSerializationError }, Log); + return new HybridCacheClient(_distributedCache, _messageBus, new InMemoryCacheClientOptions { CloneValues = true, ShouldThrowOnSerializationError = shouldThrowOnSerializationError, LoggerFactory = Log }, Log); } [Fact] diff --git a/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs b/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs index e7974982..979b37b3 100644 --- a/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs +++ b/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs @@ -74,7 +74,7 @@ public virtual async Task CanRunQueueJobAsync() using var queue = GetSampleWorkItemQueue(retries: 0, retryDelay: TimeSpan.Zero); await queue.DeleteQueueAsync(); - var enqueueTask = Run.InParallelAsync(workItemCount, index => queue.EnqueueAsync(new SampleQueueWorkItem + var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (index, _) => await queue.EnqueueAsync(new SampleQueueWorkItem { Created = SystemClock.UtcNow, Path = "somepath" + index @@ -99,17 +99,17 @@ public virtual async Task CanRunQueueJobWithLockFailAsync() using var queue = GetSampleWorkItemQueue(retries: 3, retryDelay: TimeSpan.Zero); await queue.DeleteQueueAsync(); - var enqueueTask = Run.InParallelAsync(workItemCount, index => + var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (index, _) => { _logger.LogInformation($"Enqueue #{index}"); - return queue.EnqueueAsync(new SampleQueueWorkItem + await queue.EnqueueAsync(new SampleQueueWorkItem { Created = SystemClock.UtcNow, Path = "somepath" + index }); }); - var lockProvider = new ThrottlingLockProvider(new InMemoryCacheClient(new InMemoryCacheClientOptions()), allowedLockCount, TimeSpan.FromDays(1), Log); + var lockProvider = new ThrottlingLockProvider(new InMemoryCacheClient(o => o.LoggerFactory(Log)), allowedLockCount, TimeSpan.FromDays(1), Log); var job = new SampleQueueJobWithLocking(queue, null, lockProvider, Log); await SystemClock.SleepAsync(10); _logger.LogInformation("Starting RunUntilEmptyAsync"); @@ -147,10 +147,10 @@ public virtual async Task CanRunMultipleQueueJobsAsync() } _logger.LogInformation("Done setting up queues"); - var enqueueTask = Run.InParallelAsync(workItemCount, index => + var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (_, _) => { var queue = queues[RandomData.GetInt(0, jobCount - 1)]; - return queue.EnqueueAsync(new SampleQueueWorkItem + await queue.EnqueueAsync(new SampleQueueWorkItem { Created = SystemClock.UtcNow, Path = RandomData.GetString() @@ -159,7 +159,7 @@ public virtual async Task CanRunMultipleQueueJobsAsync() _logger.LogInformation("Done enqueueing"); var cancellationTokenSource = new CancellationTokenSource(); - await Run.InParallelAsync(jobCount, async index => + await Parallel.ForEachAsync(Enumerable.Range(1, jobCount), async (index, _) => { var queue = queues[index - 1]; var job = new SampleQueueWithRandomErrorsAndAbandonsJob(queue, metrics, Log); diff --git a/src/Foundatio.TestHarness/Jobs/ThrottledJob.cs b/src/Foundatio.TestHarness/Jobs/ThrottledJob.cs index 3a600c8a..ade785ef 100644 --- a/src/Foundatio.TestHarness/Jobs/ThrottledJob.cs +++ b/src/Foundatio.TestHarness/Jobs/ThrottledJob.cs @@ -18,7 +18,7 @@ public ThrottledJob(ICacheClient client, ILoggerFactory loggerFactory = null) : private readonly ILockProvider _locker; public int RunCount { get; set; } - protected override Task GetLockAsync(CancellationToken cancellationToken = default(CancellationToken)) + protected override Task GetLockAsync(CancellationToken cancellationToken = default) { return _locker.AcquireAsync(nameof(ThrottledJob), acquireTimeout: TimeSpan.Zero); } @@ -26,7 +26,7 @@ public ThrottledJob(ICacheClient client, ILoggerFactory loggerFactory = null) : protected override Task RunInternalAsync(JobContext context) { RunCount++; - + _logger.LogDebug("Incremented Run Count: {RunCount}", RunCount); return Task.FromResult(JobResult.Success); } } diff --git a/src/Foundatio.TestHarness/Jobs/WithLockingJob.cs b/src/Foundatio.TestHarness/Jobs/WithLockingJob.cs index d92bfaad..4b21bb44 100644 --- a/src/Foundatio.TestHarness/Jobs/WithLockingJob.cs +++ b/src/Foundatio.TestHarness/Jobs/WithLockingJob.cs @@ -17,7 +17,7 @@ public class WithLockingJob : JobWithLockBase public WithLockingJob(ILoggerFactory loggerFactory) : base(loggerFactory) { - _locker = new CacheLockProvider(new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = loggerFactory }), new InMemoryMessageBus(new InMemoryMessageBusOptions { LoggerFactory = loggerFactory }), loggerFactory); + _locker = new CacheLockProvider(new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)), new InMemoryMessageBus(o => o.LoggerFactory(loggerFactory)), loggerFactory); } public int RunCount { get; set; } diff --git a/src/Foundatio.TestHarness/Locks/LockTestBase.cs b/src/Foundatio.TestHarness/Locks/LockTestBase.cs index 1b987aab..903449d6 100644 --- a/src/Foundatio.TestHarness/Locks/LockTestBase.cs +++ b/src/Foundatio.TestHarness/Locks/LockTestBase.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -55,8 +55,7 @@ public virtual async Task CanAcquireAndReleaseLockAsync() int counter = 0; - bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); - await Run.InParallelAsync(25, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, 25), async (_, _) => { bool success = await locker.TryUsingAsync("test", () => { @@ -183,6 +182,42 @@ public virtual async Task CanAcquireLocksInParallel() if (locker == null) return; + Log.SetLogLevel(LogLevel.Trace); + + const int COUNT = 100; + int current = 1; + var used = new List(); + int concurrency = 0; + + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => + { + await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + Assert.NotNull(myLock); + + int currentConcurrency = Interlocked.Increment(ref concurrency); + Assert.Equal(1, currentConcurrency); + + int item = current; + await Task.Delay(10, ct); + used.Add(item); + current++; + + Interlocked.Decrement(ref concurrency); + }); + + var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1); + Assert.Empty(duplicates); + Assert.Equal(COUNT, used.Count); + } + + public virtual async Task CanAcquireScopedLocksInParallel() + { + var lockProvider = GetLockProvider(); + if (lockProvider == null) + return; + + var locker = new ScopedLockProvider(lockProvider, "scoped"); + Log.SetLogLevel(LogLevel.Debug); const int COUNT = 100; @@ -190,7 +225,7 @@ public virtual async Task CanAcquireLocksInParallel() var used = new List(); int concurrency = 0; - await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) => + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => { await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); Assert.NotNull(myLock); @@ -211,6 +246,40 @@ await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) => Assert.Equal(COUNT, used.Count); } + public virtual async Task CanAcquireMultipleLocksInParallel() + { + var locker = GetLockProvider(); + if (locker == null) + return; + + Log.SetLogLevel(LogLevel.Debug); + + const int COUNT = 100; + int current = 1; + var used = new List(); + int concurrency = 0; + + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => + { + await using var myLock = await locker.AcquireAsync(["test"], TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + Assert.NotNull(myLock); + + int currentConcurrency = Interlocked.Increment(ref concurrency); + Assert.Equal(1, currentConcurrency); + + int item = current; + await Task.Delay(10, ct); + used.Add(item); + current++; + + Interlocked.Decrement(ref concurrency); + }); + + var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1); + Assert.Empty(duplicates); + Assert.Equal(COUNT, used.Count); + } + public virtual async Task LockOneAtATimeAsync() { var locker = GetLockProvider(); @@ -271,7 +340,6 @@ private Task DoLockedWorkAsync(ILockProvider locker) public virtual async Task WillThrottleCallsAsync() { - Log.DefaultMinimumLevel = LogLevel.Trace; Log.SetLogLevel(LogLevel.Information); Log.SetLogLevel(LogLevel.Trace); diff --git a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs index fc68da43..e60abac8 100644 --- a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs +++ b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Exceptionless; @@ -248,7 +249,7 @@ await messageBus.SubscribeAsync(msg => }); var sw = Stopwatch.StartNew(); - await Run.InParallelAsync(numConcurrentMessages, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, numConcurrentMessages), async (i, _) => { await messageBus.PublishAsync(new SimpleMessageA { @@ -282,16 +283,16 @@ public virtual async Task CanSubscribeConcurrentlyAsync() try { var countdown = new AsyncCountdownEvent(iterations * 10); - await Run.InParallelAsync(10, i => + await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (_, ct) => { - return messageBus.SubscribeAsync(msg => + await messageBus.SubscribeAsync(msg => { Assert.Equal("Hello", msg.Data); countdown.Signal(); - }); + }, cancellationToken: ct); }); - await Run.InParallelAsync(iterations, i => messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" })); + await Parallel.ForEachAsync(Enumerable.Range(1, iterations), async (_, _) => await messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" })); await countdown.WaitAsync(TimeSpan.FromSeconds(2)); Assert.Equal(0, countdown.CurrentCount); } @@ -312,27 +313,27 @@ public virtual async Task CanReceiveMessagesConcurrentlyAsync() try { var countdown = new AsyncCountdownEvent(iterations * 10); - await Run.InParallelAsync(10, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (_, ct) => { var bus = GetMessageBus(); await bus.SubscribeAsync(msg => { Assert.Equal("Hello", msg.Data); countdown.Signal(); - }); + }, cancellationToken: ct); messageBuses.Add(bus); }); - var subscribe = Run.InParallelAsync(iterations, - i => - { - SystemClock.Sleep(RandomData.GetInt(0, 10)); - return messageBuses.Random().SubscribeAsync(msg => Task.CompletedTask); - }); - var publish = Run.InParallelAsync(iterations + 3, i => + var subscribe = Parallel.ForEachAsync(Enumerable.Range(1, iterations), async (i, ct) => { - return i switch + await SystemClock.SleepAsync(RandomData.GetInt(0, 10), ct); + await messageBuses.Random().SubscribeAsync(msg => Task.CompletedTask, cancellationToken: ct); + }); + + var publish = Parallel.ForEachAsync(Enumerable.Range(1, iterations + 3), async (i, _) => + { + await (i switch { 1 => messageBus.PublishAsync(new DerivedSimpleMessageA { Data = "Hello" }), 2 => messageBus.PublishAsync(new Derived2SimpleMessageA { Data = "Hello" }), @@ -348,7 +349,7 @@ await bus.SubscribeAsync(msg => iterations + 2 => messageBus.PublishAsync(new SimpleMessageC { Data = "Hello" }), iterations + 3 => messageBus.PublishAsync(new SimpleMessageB { Data = "Hello" }), _ => messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" }), - }; + }); }); await Task.WhenAll(subscribe, publish); diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index f0899eee..cfbdee9c 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -265,7 +265,7 @@ public virtual async Task CanDiscardDuplicateQueueEntriesAsync() { await queue.DeleteQueueAsync(); await AssertEmptyQueueAsync(queue); - queue.AttachBehavior(new DuplicateDetectionQueueBehavior(new InMemoryCacheClient(), Log)); + queue.AttachBehavior(new DuplicateDetectionQueueBehavior(new InMemoryCacheClient(o => o.LoggerFactory(Log)), Log)); await queue.EnqueueAsync(new SimpleWorkItem { @@ -358,6 +358,8 @@ public virtual Task VerifyDelayedRetryAttemptsAsync() private async Task VerifyRetryAttemptsImplAsync(IQueue queue, int retryCount, TimeSpan waitTime) { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + try { await queue.DeleteQueueAsync(); @@ -367,6 +369,7 @@ private async Task VerifyRetryAttemptsImplAsync(IQueue queue, in var countdown = new AsyncCountdownEvent(retryCount + 1); int attempts = 0; + await queue.StartWorkingAsync(async w => { Interlocked.Increment(ref attempts); @@ -380,7 +383,7 @@ await queue.StartWorkingAsync(async w => countdown.Signal(); _logger.LogInformation("Finished Attempt {Attempt} to work on queue item, Metadata Attempts: {MetadataAttempts}", attempts, queueEntryMetadata.Attempts); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { @@ -410,6 +413,7 @@ await queue.EnqueueAsync(new SimpleWorkItem } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -637,7 +641,6 @@ public virtual async Task WillNotWaitForItemAsync() public virtual async Task WillWaitForItemAsync() { - Log.DefaultMinimumLevel = LogLevel.Trace; var queue = GetQueue(); if (queue == null) return; @@ -717,6 +720,7 @@ public virtual async Task CanUseQueueWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -728,7 +732,7 @@ await queue.StartWorkingAsync(async w => Assert.Equal("Hello", w.Value.Data); await w.CompleteAsync(); resetEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { @@ -746,6 +750,7 @@ await queue.EnqueueAsync(new SimpleWorkItem } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -756,6 +761,7 @@ public virtual async Task CanHandleErrorInWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -771,7 +777,7 @@ await queue.StartWorkingAsync(w => _logger.LogDebug("WorkAction"); Assert.Equal("Hello", w.Value.Data); throw new Exception(); - }); + }, cancellationToken: cancellationTokenSource.Token); var resetEvent = new AsyncManualResetEvent(false); using (queue.Abandoned.AddSyncHandler((o, args) => resetEvent.Set())) @@ -794,13 +800,13 @@ await queue.StartWorkingAsync(w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } public virtual async Task WorkItemsWillTimeoutAsync() { - Log.DefaultMinimumLevel = LogLevel.Trace; Log.SetLogLevel("Foundatio.Queues.RedisQueue", LogLevel.Trace); var queue = GetQueue(retryDelay: TimeSpan.Zero, workItemTimeout: TimeSpan.FromMilliseconds(50)); if (queue == null) @@ -901,6 +907,7 @@ public virtual async Task CanAutoCompleteWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -911,7 +918,7 @@ await queue.StartWorkingAsync(w => { Assert.Equal("Hello", w.Value.Data); return Task.CompletedTask; - }, true); + }, true, cancellationTokenSource.Token); using (queue.Completed.AddSyncHandler((s, e) => { resetEvent.Set(); })) { @@ -931,6 +938,7 @@ await queue.StartWorkingAsync(w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -941,6 +949,7 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -958,11 +967,11 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync() { var q = GetQueue(retries: 0, retryDelay: TimeSpan.Zero); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Queue Id: {Id}, I: {Instance}", q.QueueId, i); - await q.StartWorkingAsync(w => DoWorkAsync(w, countdown, info)); + await q.StartWorkingAsync(w => DoWorkAsync(w, countdown, info), cancellationToken: cancellationTokenSource.Token); workers.Add(q); } - await Run.InParallelAsync(workItemCount, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), cancellationTokenSource.Token, async (i, _) => { string id = await queue.EnqueueAsync(new SimpleWorkItem { @@ -972,8 +981,8 @@ await Run.InParallelAsync(workItemCount, async i => if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Enqueued Index: {Instance} Id: {Id}", i, id); }); - await countdown.WaitAsync(); - await SystemClock.SleepAsync(50); + await countdown.WaitAsync(cancellationTokenSource.Token); + await SystemClock.SleepAsync(50, cancellationTokenSource.Token); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("Work Info Stats: Completed: {Completed} Abandoned: {Abandoned} Error: {Errors}", info.CompletedCount, info.AbandonCount, info.ErrorCount); @@ -1013,6 +1022,7 @@ await Run.InParallelAsync(workItemCount, async i => } finally { + await cancellationTokenSource.CancelAsync(); foreach (var q in workers) await CleanupQueueAsync(q); } @@ -1067,13 +1077,12 @@ public virtual async Task CanRunWorkItemWithMetricsAsync() { int completedCount = 0; - using var metrics = new InMemoryMetricsClient(new InMemoryMetricsClientOptions { Buffered = false, LoggerFactory = Log }); + using var metrics = new InMemoryMetricsClient(o => o.Buffered(false).LoggerFactory(Log)); #pragma warning disable CS0618 // Type or member is obsolete var behavior = new MetricsQueueBehavior(metrics, "metric", TimeSpan.FromMilliseconds(100), loggerFactory: Log); #pragma warning restore CS0618 // Type or member is obsolete - var options = new InMemoryQueueOptions { Behaviors = new[] { behavior }, LoggerFactory = Log }; - using var queue = new InMemoryQueue(options); + using var queue = new InMemoryQueue(o => o.Behaviors(behavior).LoggerFactory(Log)); Task Handler(object sender, CompletedEventArgs e) { @@ -1305,8 +1314,8 @@ public virtual async Task CanCompleteQueueEntryOnceAsync() public virtual async Task CanDequeueWithLockingAsync() { - using var cache = new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = Log }); - using var messageBus = new InMemoryMessageBus(new InMemoryMessageBusOptions { LoggerFactory = Log }); + using var cache = new InMemoryCacheClient(o => o.LoggerFactory(Log)); + using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log)); var distributedLock = new CacheLockProvider(cache, messageBus, Log); await CanDequeueWithLockingImpAsync(distributedLock); @@ -1318,12 +1327,12 @@ protected async Task CanDequeueWithLockingImpAsync(CacheLockProvider distributed if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); await AssertEmptyQueueAsync(queue); - Log.DefaultMinimumLevel = LogLevel.Trace; using var metrics = new InMemoryMetricsClient(new InMemoryMetricsClientOptions { Buffered = false, LoggerFactory = Log }); #pragma warning disable CS0618 // Type or member is obsolete @@ -1343,7 +1352,7 @@ await queue.StartWorkingAsync(async w => await w.CompleteAsync(); resetEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { Data = "Hello" }); await resetEvent.WaitAsync(TimeSpan.FromSeconds(5)); @@ -1358,14 +1367,15 @@ await queue.StartWorkingAsync(async w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } public virtual async Task CanHaveMultipleQueueInstancesWithLockingAsync() { - using var cache = new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = Log }); - using var messageBus = new InMemoryMessageBus(new InMemoryMessageBusOptions { LoggerFactory = Log }); + using var cache = new InMemoryCacheClient(o => o.LoggerFactory(Log)); + using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log)); var distributedLock = new CacheLockProvider(cache, messageBus, Log); await CanHaveMultipleQueueInstancesWithLockingImplAsync(distributedLock); @@ -1377,6 +1387,7 @@ protected async Task CanHaveMultipleQueueInstancesWithLockingImplAsync(CacheLock if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -1398,11 +1409,11 @@ await q.StartWorkingAsync(async w => { if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Acquiring distributed lock in work item: {Id}", instanceCount, w.Id); - var l = await distributedLock.AcquireAsync("test"); + var l = await distributedLock.AcquireAsync("test", cancellationToken: cancellationTokenSource.Token); Assert.NotNull(l); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Acquired distributed lock: {Id}", instanceCount, w.Id); - await SystemClock.SleepAsync(TimeSpan.FromMilliseconds(50)); + await SystemClock.SleepAsync(TimeSpan.FromMilliseconds(50), cancellationTokenSource.Token); await l.ReleaseAsync(); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Released distributed lock: {Id}", instanceCount, w.Id); @@ -1412,11 +1423,11 @@ await q.StartWorkingAsync(async w => countdown.Signal(); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Signaled countdown: {Id}", instanceCount, w.Id); - }); + }, cancellationToken: cancellationTokenSource.Token); workers.Add(q); } - await Run.InParallelAsync(workItemCount, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), cancellationTokenSource.Token, async (i, _) => { string id = await queue.EnqueueAsync(new SimpleWorkItem { @@ -1427,7 +1438,7 @@ await Run.InParallelAsync(workItemCount, async i => }); await countdown.WaitAsync(TimeSpan.FromSeconds(5)); - await SystemClock.SleepAsync(50); + await SystemClock.SleepAsync(50, cancellationTokenSource.Token); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Completed: {Completed} Abandoned: {Abandoned} Error: {Errors}", info.CompletedCount, info.AbandonCount, info.ErrorCount); if (_logger.IsEnabled(LogLevel.Information)) @@ -1456,6 +1467,7 @@ await Run.InParallelAsync(workItemCount, async i => } finally { + await cancellationTokenSource.CancelAsync(); foreach (var q in workers) await CleanupQueueAsync(q); } @@ -1574,6 +1586,7 @@ public virtual async Task CanHandleAutoAbandonInWorker() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -1583,6 +1596,7 @@ public virtual async Task CanHandleAutoAbandonInWorker() await queue.StartWorkingAsync(async (item) => { + _logger.LogDebug("Processing item: {Id} Value={Value}", item.Id, item.Value.Data); if (item.Value.Data == "Delay") { // wait for queue item to get auto abandoned @@ -1591,10 +1605,16 @@ await queue.StartWorkingAsync(async (item) => do { if (stats.Abandoned > 0) + { + _logger.LogTrace("Breaking, queue item was abandoned"); break; + } stats = await queue.GetQueueStatsAsync(); - } while (sw.Elapsed < TimeSpan.FromSeconds(10)); + _logger.LogTrace("Getting updated stats, Abandoned={Abandoned}", stats.Abandoned); + + await Task.Delay(50, cancellationTokenSource.Token); + } while (sw.Elapsed < TimeSpan.FromSeconds(5)); Assert.Equal(1, stats.Abandoned); } @@ -1603,14 +1623,15 @@ await queue.StartWorkingAsync(async (item) => { await item.CompleteAsync(); } - catch + catch (Exception ex) { + _logger.LogDebug(ex, "Error completing item: {Message}", ex.Message); errorEvent.Set(); throw; } successEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" }); await queue.EnqueueAsync(new SimpleWorkItem() { Data = "No Delay" }); @@ -1620,6 +1641,7 @@ await queue.StartWorkingAsync(async (item) => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } diff --git a/src/Foundatio.TestHarness/Storage/FileStorageTestsBase.cs b/src/Foundatio.TestHarness/Storage/FileStorageTestsBase.cs index ccc66893..a39e4a51 100644 --- a/src/Foundatio.TestHarness/Storage/FileStorageTestsBase.cs +++ b/src/Foundatio.TestHarness/Storage/FileStorageTestsBase.cs @@ -559,7 +559,7 @@ public virtual async Task CanConcurrentlyManageFilesAsync() var info = await storage.GetFileInfoAsync("nope"); Assert.Null(info); - await Run.InParallelAsync(10, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (i, ct) => { var ev = new PostInfo { @@ -573,13 +573,13 @@ await Run.InParallelAsync(10, async i => UserAgent = "test" }; - await storage.SaveObjectAsync(Path.Combine(queueFolder, i + ".json"), ev); - queueItems.Add(i); + await storage.SaveObjectAsync(Path.Combine(queueFolder, i + ".json"), ev, cancellationToken: ct); + queueItems.Add(i, ct); }); Assert.Equal(10, (await storage.GetFileListAsync()).Count); - await Run.InParallelAsync(10, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (_, _) => { string path = Path.Combine(queueFolder, queueItems.Random() + ".json"); var eventPost = await storage.GetEventPostAndSetActiveAsync(Path.Combine(queueFolder, RandomData.GetInt(0, 25) + ".json"), _logger); diff --git a/src/Foundatio/Caching/InMemoryCacheClient.cs b/src/Foundatio/Caching/InMemoryCacheClient.cs index 5b7d5ad2..2c16ec22 100644 --- a/src/Foundatio/Caching/InMemoryCacheClient.cs +++ b/src/Foundatio/Caching/InMemoryCacheClient.cs @@ -6,6 +6,7 @@ using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -15,14 +16,14 @@ namespace Foundatio.Caching; public class InMemoryCacheClient : IMemoryCacheClient { private readonly ConcurrentDictionary _memory; - private bool _shouldClone; - private bool _shouldThrowOnSerializationErrors; - private int? _maxItems; + private readonly bool _shouldClone; + private readonly bool _shouldThrowOnSerializationErrors; + private readonly int? _maxItems; private long _writes; private long _hits; private long _misses; private readonly ILogger _logger; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); public InMemoryCacheClient() : this(o => o) { } @@ -68,7 +69,7 @@ private void OnItemExpired(string key, bool sendNotification = true) if (ItemExpired == null) return; - Task.Factory.StartNew(state => + Task.Factory.StartNew(_ => { var args = new ItemExpiredEventArgs { @@ -119,27 +120,31 @@ public async Task RemoveIfEqualAsync(string key, T expected) if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key), "Key cannot be null or empty"); - if (_logger.IsEnabled(LogLevel.Trace)) + bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); + if (isTraceLogLevelEnabled) _logger.LogTrace("RemoveIfEqualAsync Key: {Key} Expected: {Expected}", key, expected); bool wasExpectedValue = false; - bool success = _memory.TryUpdate(key, (k, e) => + bool success = _memory.TryUpdate(key, (key, existingEntry) => { - var currentValue = e.GetValue(); + var currentValue = existingEntry.GetValue(); if (currentValue.Equals(expected)) { - e.ExpiresAt = DateTime.MinValue; + if (isTraceLogLevelEnabled) + _logger.LogTrace("RemoveIfEqualAsync Key: {Key} Updating ExpiresAt to DateTime.MinValue", key); + + existingEntry.ExpiresAt = DateTime.MinValue; wasExpectedValue = true; } - return e; + return existingEntry; }); success = success && wasExpectedValue; await StartMaintenanceAsync().AnyContext(); - if (_logger.IsEnabled(LogLevel.Trace)) + if (isTraceLogLevelEnabled) _logger.LogTrace("RemoveIfEqualAsync Key: {Key} Expected: {Expected} Success: {Success}", key, expected, success); return success; @@ -154,13 +159,14 @@ public Task RemoveAllAsync(IEnumerable keys = null) return Task.FromResult(count); } + bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); int removed = 0; foreach (string key in keys) { if (String.IsNullOrEmpty(key)) continue; - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("RemoveAllAsync: Removing key: {Key}", key); + if (isTraceLogLevelEnabled) _logger.LogTrace("RemoveAllAsync: Removing key: {Key}", key); if (_memory.TryRemove(key, out _)) removed++; } @@ -189,10 +195,18 @@ public Task RemoveByPrefixAsync(string prefix) internal void RemoveExpiredKey(string key, bool sendNotification = true) { - _logger.LogDebug("Removing expired cache entry {Key}", key); + // Consideration: We could reduce the amount of calls to this by updating ExpiresAt and only having maintenance remove keys. + if (_memory.TryGetValue(key, out var existingEntry) && existingEntry.ExpiresAt < SystemClock.UtcNow) + { + if (_memory.TryRemove(key, out var removedEntry)) + { + if (removedEntry.ExpiresAt >= SystemClock.UtcNow) + throw new Exception("Removed item was not expired"); - if (_memory.TryRemove(key, out _)) - OnItemExpired(key, sendNotification); + _logger.LogDebug("Removing expired cache entry {Key}", key); + OnItemExpired(key, sendNotification); + } + } } public Task> GetAsync(string key) @@ -200,15 +214,14 @@ public Task> GetAsync(string key) if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key), "Key cannot be null or empty"); - if (!_memory.TryGetValue(key, out var cacheEntry)) + if (!_memory.TryGetValue(key, out var existingEntry)) { Interlocked.Increment(ref _misses); return Task.FromResult(CacheValue.NoValue); } - if (cacheEntry.ExpiresAt < SystemClock.UtcNow) + if (existingEntry.ExpiresAt < SystemClock.UtcNow) { - RemoveExpiredKey(key); Interlocked.Increment(ref _misses); return Task.FromResult(CacheValue.NoValue); } @@ -217,13 +230,13 @@ public Task> GetAsync(string key) try { - var value = cacheEntry.GetValue(); + var value = existingEntry.GetValue(); return Task.FromResult(new CacheValue(value, true)); } catch (Exception ex) { if (_logger.IsEnabled(LogLevel.Error)) - _logger.LogError(ex, "Unable to deserialize value {Value} to type {TypeFullName}", cacheEntry.Value, typeof(T).FullName); + _logger.LogError(ex, "Unable to deserialize value {Value} to type {TypeFullName}", existingEntry.Value, typeof(T).FullName); if (_shouldThrowOnSerializationErrors) throw; @@ -275,12 +288,12 @@ public async Task SetIfHigherAsync(string key, double value, TimeSpan? e double difference = value; var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (k, entry) => + _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (_, existingEntry) => { double? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -290,15 +303,17 @@ public async Task SetIfHigherAsync(string key, double value, TimeSpan? e if (currentValue.HasValue && currentValue.Value < value) { difference = value - currentValue.Value; - entry.Value = value; + existingEntry.Value = value; } else + { difference = 0; + } if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -321,12 +336,12 @@ public async Task SetIfHigherAsync(string key, long value, TimeSpan? expir long difference = value; var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (k, entry) => + _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (_, existingEntry) => { long? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -336,15 +351,17 @@ public async Task SetIfHigherAsync(string key, long value, TimeSpan? expir if (currentValue.HasValue && currentValue.Value < value) { difference = value - currentValue.Value; - entry.Value = value; + existingEntry.Value = value; } else + { difference = 0; + } if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -367,12 +384,12 @@ public async Task SetIfLowerAsync(string key, double value, TimeSpan? ex double difference = value; var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (k, entry) => + _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (_, existingEntry) => { double? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -382,15 +399,17 @@ public async Task SetIfLowerAsync(string key, double value, TimeSpan? ex if (currentValue.HasValue && currentValue.Value > value) { difference = currentValue.Value - value; - entry.Value = value; + existingEntry.Value = value; } else + { difference = 0; + } if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -411,12 +430,12 @@ public async Task SetIfLowerAsync(string key, long value, TimeSpan? expire long difference = value; var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (k, entry) => + _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (_, existingEntry) => { long? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -426,15 +445,17 @@ public async Task SetIfLowerAsync(string key, long value, TimeSpan? expire if (currentValue.HasValue && currentValue.Value > value) { difference = currentValue.Value - value; - entry.Value = value; + existingEntry.Value = value; } else + { difference = 0; + } if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -463,18 +484,18 @@ public async Task ListAddAsync(string key, IEnumerable values, TimeS { var items = new HashSet(new[] { stringValue }); var entry = new CacheEntry(items, expiresAt, _shouldClone); - _memory.AddOrUpdate(key, entry, (k, cacheEntry) => + _memory.AddOrUpdate(key, entry, (_, existingEntry) => { - if (!(cacheEntry.Value is ICollection collection)) + if (!(existingEntry.Value is ICollection collection)) throw new InvalidOperationException($"Unable to add value for key: {key}. Cache value does not contain a set"); collection.Add(stringValue); - cacheEntry.Value = collection; + existingEntry.Value = collection; if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return cacheEntry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -485,18 +506,18 @@ public async Task ListAddAsync(string key, IEnumerable values, TimeS { var items = new HashSet(values); var entry = new CacheEntry(items, expiresAt, _shouldClone); - _memory.AddOrUpdate(key, entry, (k, cacheEntry) => + _memory.AddOrUpdate(key, entry, (_, existingEntry) => { - if (!(cacheEntry.Value is ICollection collection)) + if (!(existingEntry.Value is ICollection collection)) throw new InvalidOperationException($"Unable to add value for key: {key}. Cache value does not contain a set"); collection.AddRange(items); - cacheEntry.Value = collection; + existingEntry.Value = collection; if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return cacheEntry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -525,21 +546,21 @@ public Task ListRemoveAsync(string key, IEnumerable values, TimeSpan if (values is string stringValue) { var items = new HashSet(new[] { stringValue }); - _memory.TryUpdate(key, (k, cacheEntry) => + _memory.TryUpdate(key, (_, existingEntry) => { - if (cacheEntry.Value is ICollection collection && collection.Count > 0) + if (existingEntry.Value is ICollection collection && collection.Count > 0) { - foreach (var value in items) + foreach (string value in items) collection.Remove(value); - cacheEntry.Value = collection; + existingEntry.Value = collection; } if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Removed value from set with cache key: {Key}", key); - return cacheEntry; + return existingEntry; }); return Task.FromResult(items.Count); @@ -547,21 +568,21 @@ public Task ListRemoveAsync(string key, IEnumerable values, TimeSpan else { var items = new HashSet(values); - _memory.TryUpdate(key, (k, cacheEntry) => + _memory.TryUpdate(key, (_, existingEntry) => { - if (cacheEntry.Value is ICollection collection && collection.Count > 0) + if (existingEntry.Value is ICollection collection && collection.Count > 0) { foreach (var value in items) collection.Remove(value); - cacheEntry.Value = collection; + existingEntry.Value = collection; } if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Removed value from set with cache key: {Key}", key); - return cacheEntry; + return existingEntry; }); return Task.FromResult(items.Count); @@ -595,39 +616,38 @@ private async Task SetInternalAsync(string key, CacheEntry entry, bool add Interlocked.Increment(ref _writes); + bool wasUpdated = true; + bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); if (addOnly) { - if (!_memory.TryAdd(key, entry)) + _memory.AddOrUpdate(key, entry, (_, existingEntry) => { + wasUpdated = false; // check to see if existing entry is expired - bool updated = false; - _memory.TryUpdate(key, (key, existingEntry) => + if (existingEntry.ExpiresAt < SystemClock.UtcNow) { - if (existingEntry.ExpiresAt < SystemClock.UtcNow) - { - updated = true; - return entry; - } + if (isTraceLogLevelEnabled) + _logger.LogTrace("Replacing expired cache key: {Key}", key); - return existingEntry; - }); + wasUpdated = true; + return entry; + } - if (!updated) - return false; - } + return existingEntry; + }); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Added cache key: {Key}", key); + if (wasUpdated && isTraceLogLevelEnabled) + _logger.LogTrace("Added cache key: {Key}", key); } else { - _memory.AddOrUpdate(key, entry, (k, cacheEntry) => entry); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Set cache key: {Key}", key); + _memory.AddOrUpdate(key, entry, (_, _) => entry); + if (isTraceLogLevelEnabled) _logger.LogTrace("Set cache key: {Key}", key); } await StartMaintenanceAsync(true).AnyContext(); - - return true; + return wasUpdated; } public async Task SetAllAsync(IDictionary values, TimeSpan? expiresIn = null) @@ -666,19 +686,19 @@ public async Task ReplaceIfEqualAsync(string key, T value, T expected, var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; bool wasExpectedValue = false; - bool success = _memory.TryUpdate(key, (k, cacheEntry) => + bool success = _memory.TryUpdate(key, (_, existingEntry) => { - var currentValue = cacheEntry.GetValue(); + var currentValue = existingEntry.GetValue(); if (currentValue.Equals(expected)) { - cacheEntry.Value = value; + existingEntry.Value = value; wasExpectedValue = true; if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; } - return cacheEntry; + return existingEntry; }); success = success && wasExpectedValue; @@ -704,12 +724,12 @@ public async Task IncrementAsync(string key, double amount, TimeSpan? ex Interlocked.Increment(ref _writes); var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - var result = _memory.AddOrUpdate(key, new CacheEntry(amount, expiresAt, _shouldClone), (k, entry) => + var result = _memory.AddOrUpdate(key, new CacheEntry(amount, expiresAt, _shouldClone), (_, existingEntry) => { double? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -717,14 +737,14 @@ public async Task IncrementAsync(string key, double amount, TimeSpan? ex } if (currentValue.HasValue) - entry.Value = currentValue.Value + amount; + existingEntry.Value = currentValue.Value + amount; else - entry.Value = amount; + existingEntry.Value = amount; if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -746,12 +766,12 @@ public async Task IncrementAsync(string key, long amount, TimeSpan? expire Interlocked.Increment(ref _writes); var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - var result = _memory.AddOrUpdate(key, new CacheEntry(amount, expiresAt, _shouldClone), (k, entry) => + var result = _memory.AddOrUpdate(key, new CacheEntry(amount, expiresAt, _shouldClone), (_, existingEntry) => { long? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -759,14 +779,14 @@ public async Task IncrementAsync(string key, long amount, TimeSpan? expire } if (currentValue.HasValue) - entry.Value = currentValue.Value + amount; + existingEntry.Value = currentValue.Value + amount; else - entry.Value = amount; + existingEntry.Value = amount; if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -779,16 +799,19 @@ public Task ExistsAsync(string key) if (String.IsNullOrEmpty(key)) return Task.FromException(new ArgumentNullException(nameof(key), "Key cannot be null or empty.")); - if (!_memory.TryGetValue(key, out var cacheEntry)) + if (!_memory.TryGetValue(key, out var existingEntry)) { Interlocked.Increment(ref _misses); return Task.FromResult(false); } - Interlocked.Increment(ref _hits); - if (cacheEntry.ExpiresAt < SystemClock.UtcNow) + if (existingEntry.ExpiresAt < SystemClock.UtcNow) + { + Interlocked.Increment(ref _misses); return Task.FromResult(false); + } + Interlocked.Increment(ref _hits); return Task.FromResult(true); } @@ -797,19 +820,20 @@ public Task ExistsAsync(string key) if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key), "Key cannot be null or empty"); - if (!_memory.TryGetValue(key, out var value) || value.ExpiresAt == DateTime.MaxValue) + if (!_memory.TryGetValue(key, out var existingEntry)) { Interlocked.Increment(ref _misses); return Task.FromResult(null); } - Interlocked.Increment(ref _hits); - if (value.ExpiresAt >= SystemClock.UtcNow) - return Task.FromResult(value.ExpiresAt.Subtract(SystemClock.UtcNow)); - - RemoveExpiredKey(key); + if (existingEntry.ExpiresAt < SystemClock.UtcNow || existingEntry.ExpiresAt == DateTime.MaxValue) + { + Interlocked.Increment(ref _misses); + return Task.FromResult(null); + } - return Task.FromResult(null); + Interlocked.Increment(ref _hits); + return Task.FromResult(existingEntry.ExpiresAt.Subtract(SystemClock.UtcNow)); } public async Task SetExpirationAsync(string key, TimeSpan expiresIn) @@ -825,9 +849,9 @@ public async Task SetExpirationAsync(string key, TimeSpan expiresIn) } Interlocked.Increment(ref _writes); - if (_memory.TryGetValue(key, out var value)) + if (_memory.TryGetValue(key, out var existingEntry)) { - value.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; await StartMaintenanceAsync().AnyContext(); } } @@ -847,23 +871,28 @@ private async Task StartMaintenanceAsync(bool compactImmediately = false) } } - private Task CompactAsync() + private async Task CompactAsync() { if (!_maxItems.HasValue || _memory.Count <= _maxItems) - return Task.CompletedTask; + return; string expiredKey = null; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_memory.Count <= _maxItems) - return Task.CompletedTask; + return; (string Key, long LastAccessTicks, long InstanceNumber) oldest = (null, Int64.MaxValue, 0); foreach (var kvp in _memory) { - if (kvp.Value.LastAccessTicks < oldest.LastAccessTicks - || (kvp.Value.LastAccessTicks == oldest.LastAccessTicks && kvp.Value.InstanceNumber < oldest.InstanceNumber)) + bool isExpired = kvp.Value.ExpiresAt < SystemClock.UtcNow; + if (isExpired || + kvp.Value.LastAccessTicks < oldest.LastAccessTicks || + (kvp.Value.LastAccessTicks == oldest.LastAccessTicks && kvp.Value.InstanceNumber < oldest.InstanceNumber)) oldest = (kvp.Key, kvp.Value.LastAccessTicks, kvp.Value.InstanceNumber); + + if (isExpired) + break; } _logger.LogDebug("Removing cache entry {Key} due to cache exceeding max item count limit.", oldest); @@ -874,8 +903,6 @@ private Task CompactAsync() if (expiredKey != null) OnItemExpired(expiredKey); - - return Task.CompletedTask; } private async Task DoMaintenanceAsync() @@ -884,13 +911,19 @@ private async Task DoMaintenanceAsync() var utcNow = SystemClock.UtcNow.AddMilliseconds(50); + // Remove expired items and items that are infrequently accessed as they may be updated by add. + long lastAccessMaximumTicks = utcNow.AddMilliseconds(-300).Ticks; + try { foreach (var kvp in _memory.ToArray()) { - var expiresAt = kvp.Value.ExpiresAt; - if (expiresAt <= utcNow) + bool lastAccessTimeIsInfrequent = kvp.Value.LastAccessTicks < lastAccessMaximumTicks; + if (lastAccessTimeIsInfrequent && kvp.Value.ExpiresAt <= utcNow) + { + _logger.LogDebug("DoMaintenance: Removing expired key {Key}", kvp.Key); RemoveExpiredKey(kvp.Key); + } } } catch (Exception ex) @@ -907,7 +940,7 @@ public void Dispose() ItemExpired?.Dispose(); } - private class CacheEntry + private record CacheEntry { private object _cacheValue; private static long _instanceCount; diff --git a/src/Foundatio/Jobs/JobWithLockBase.cs b/src/Foundatio/Jobs/JobWithLockBase.cs index d6f0a8b9..fa7a21fd 100644 --- a/src/Foundatio/Jobs/JobWithLockBase.cs +++ b/src/Foundatio/Jobs/JobWithLockBase.cs @@ -20,7 +20,7 @@ public JobWithLockBase(ILoggerFactory loggerFactory = null) public string JobId { get; } = Guid.NewGuid().ToString("N").Substring(0, 10); ILogger IHaveLogger.Logger => _logger; - public async virtual Task RunAsync(CancellationToken cancellationToken = default) + public virtual async Task RunAsync(CancellationToken cancellationToken = default) { var lockValue = await GetLockAsync(cancellationToken).AnyContext(); if (lockValue == null) diff --git a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs index 62c508b8..7c826b1a 100644 --- a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs +++ b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs @@ -32,7 +32,7 @@ public WorkItemJob(IQueue queue, IMessagePublisher publisher, Work IQueue IQueueJob.Queue => _queue; ILogger IHaveLogger.Logger => _logger; - public async virtual Task RunAsync(CancellationToken cancellationToken = default) + public virtual async Task RunAsync(CancellationToken cancellationToken = default) { IQueueEntry queueEntry; diff --git a/src/Foundatio/Lock/CacheLockProvider.cs b/src/Foundatio/Lock/CacheLockProvider.cs index f1841755..fa34767f 100644 --- a/src/Foundatio/Lock/CacheLockProvider.cs +++ b/src/Foundatio/Lock/CacheLockProvider.cs @@ -83,16 +83,15 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); bool isDebugLogLevelEnabled = _logger.IsEnabled(LogLevel.Debug); bool shouldWait = !cancellationToken.IsCancellationRequested; - if (isDebugLogLevelEnabled) - _logger.LogDebug("Attempting to acquire lock: {Resource}", resource); + string lockId = GenerateNewLockId(); + timeUntilExpires ??= TimeSpan.FromMinutes(20); - if (!timeUntilExpires.HasValue) - timeUntilExpires = TimeSpan.FromMinutes(20); + if (isDebugLogLevelEnabled) + _logger.LogDebug("Attempting to acquire lock ({LockId}): {Resource}", lockId, resource); using var activity = StartLockActivity(resource); bool gotLock = false; - string lockId = GenerateNewLockId(); var sw = Stopwatch.StartNew(); try { @@ -105,18 +104,22 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire else gotLock = await _cacheClient.AddAsync(resource, lockId, timeUntilExpires).AnyContext(); } - catch { } + catch (Exception ex) + { + if (isTraceLogLevelEnabled) + _logger.LogTrace(ex, "Error acquiring lock ({LockId}): {Resource}", lockId, resource); + } if (gotLock) break; if (isDebugLogLevelEnabled) - _logger.LogDebug("Failed to acquire lock: {Resource}", resource); + _logger.LogDebug("Failed to acquire lock ({LockId}): {Resource}", lockId, resource); if (cancellationToken.IsCancellationRequested) { if (isTraceLogLevelEnabled && shouldWait) - _logger.LogTrace("Cancellation requested"); + _logger.LogTrace("Cancellation requested while acquiring lock ({LockId}): {Resource}", lockId, resource); break; } @@ -128,16 +131,14 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire var keyExpiration = SystemClock.UtcNow.SafeAdd(await _cacheClient.GetExpirationAsync(resource).AnyContext() ?? TimeSpan.Zero); var delayAmount = keyExpiration.Subtract(SystemClock.UtcNow); - // delay a minimum of 50ms + // delay a minimum of 50ms and a maximum of 3 seconds if (delayAmount < TimeSpan.FromMilliseconds(50)) delayAmount = TimeSpan.FromMilliseconds(50); - - // delay a maximum of 3 seconds - if (delayAmount > TimeSpan.FromSeconds(3)) + else if (delayAmount > TimeSpan.FromSeconds(3)) delayAmount = TimeSpan.FromSeconds(3); if (isTraceLogLevelEnabled) - _logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock: {Resource}", delayAmount, resource); + _logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock ({LockId}): {Resource}", delayAmount, lockId, resource); // wait until we get a message saying the lock was released or 3 seconds has elapsed or cancellation has been requested using (var maxWaitCancellationTokenSource = new CancellationTokenSource(delayAmount)) @@ -176,17 +177,17 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire _lockTimeoutCounter.Add(1); if (cancellationToken.IsCancellationRequested && isTraceLogLevelEnabled) - _logger.LogTrace("Cancellation requested for lock {Resource} after {Duration:g}", resource, sw.Elapsed); + _logger.LogTrace("Cancellation requested for lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); else if (_logger.IsEnabled(LogLevel.Warning)) - _logger.LogWarning("Failed to acquire lock {Resource} after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogWarning("Failed to acquire lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); return null; } if (sw.Elapsed > TimeSpan.FromSeconds(5) && _logger.IsEnabled(LogLevel.Warning)) - _logger.LogWarning("Acquired lock {Resource} ({LockId}) after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogWarning("Acquired lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); else if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Acquired lock {Resource} ({LockId}) after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogDebug("Acquired lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); return new DisposableLock(resource, lockId, sw.Elapsed, this, _logger, releaseOnDispose); } diff --git a/src/Foundatio/Lock/DisposableLock.cs b/src/Foundatio/Lock/DisposableLock.cs index fa243bbf..9601cc0a 100644 --- a/src/Foundatio/Lock/DisposableLock.cs +++ b/src/Foundatio/Lock/DisposableLock.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; @@ -12,7 +13,7 @@ internal class DisposableLock : ILock private readonly ILogger _logger; private bool _isReleased; private int _renewalCount; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); private readonly Stopwatch _duration; private readonly bool _shouldReleaseOnDispose; @@ -41,7 +42,7 @@ public async ValueTask DisposeAsync() bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); if (isTraceLogLevelEnabled) - _logger.LogTrace("Disposing lock {Resource}", Resource); + _logger.LogTrace("Disposing lock ({LockId}) {Resource}", LockId, Resource); try { @@ -50,42 +51,42 @@ public async ValueTask DisposeAsync() catch (Exception ex) { if (_logger.IsEnabled(LogLevel.Error)) - _logger.LogError(ex, "Unable to release lock {Resource}", Resource); + _logger.LogError(ex, "Unable to release lock ({LockId}) {Resource}", LockId, Resource); } if (isTraceLogLevelEnabled) - _logger.LogTrace("Disposed lock {Resource}", Resource); + _logger.LogTrace("Disposed lock ({LockId}) {Resource}", LockId, Resource); } public async Task RenewAsync(TimeSpan? lockExtension = null) { if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Renewing lock {Resource}", Resource); + _logger.LogTrace("Renewing lock ({LockId}) {Resource}", LockId, Resource); await _lockProvider.RenewAsync(Resource, LockId, lockExtension).AnyContext(); _renewalCount++; if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Renewed lock {Resource}", Resource); + _logger.LogDebug("Renewed lock ({LockId}) {Resource}", LockId, Resource); } - public Task ReleaseAsync() + public async Task ReleaseAsync() { if (_isReleased) - return Task.CompletedTask; + return; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_isReleased) - return Task.CompletedTask; + return; _isReleased = true; _duration.Stop(); if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Releasing lock {Resource} after {Duration:g}", Resource, _duration.Elapsed); + _logger.LogDebug("Releasing lock ({LockId}) {Resource} after {Duration:g}", LockId, Resource, _duration.Elapsed); - return _lockProvider.ReleaseAsync(Resource, LockId); + await _lockProvider.ReleaseAsync(Resource, LockId); } } } diff --git a/src/Foundatio/Lock/DisposableLockCollection.cs b/src/Foundatio/Lock/DisposableLockCollection.cs index 1f61ba98..26671f1a 100644 --- a/src/Foundatio/Lock/DisposableLockCollection.cs +++ b/src/Foundatio/Lock/DisposableLockCollection.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; @@ -14,7 +15,7 @@ internal class DisposableLockCollection : ILock private readonly ILogger _logger; private bool _isReleased; private int _renewalCount; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); private readonly Stopwatch _duration; public DisposableLockCollection(IEnumerable locks, string lockId, TimeSpan timeWaitedForLock, ILogger logger) @@ -50,15 +51,15 @@ public async Task RenewAsync(TimeSpan? lockExtension = null) _logger.LogDebug("Renewing {LockCount} locks {Resource}", _locks.Count, Resource); } - public Task ReleaseAsync() + public async Task ReleaseAsync() { if (_isReleased) - return Task.CompletedTask; + return; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_isReleased) - return Task.CompletedTask; + return; _isReleased = true; _duration.Stop(); @@ -66,7 +67,7 @@ public Task ReleaseAsync() if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Releasing {LockCount} locks {Resource} after {Duration:g}", _locks.Count, Resource, _duration.Elapsed); - return Task.WhenAll(_locks.Select(l => l.ReleaseAsync())); + await Task.WhenAll(_locks.Select(l => l.ReleaseAsync())); } } diff --git a/src/Foundatio/Metrics/InMemoryMetricsClient.cs b/src/Foundatio/Metrics/InMemoryMetricsClient.cs index 8a89c481..b29c1d24 100644 --- a/src/Foundatio/Metrics/InMemoryMetricsClient.cs +++ b/src/Foundatio/Metrics/InMemoryMetricsClient.cs @@ -8,7 +8,7 @@ public class InMemoryMetricsClient : CacheBucketMetricsClientBase public InMemoryMetricsClient() : this(o => o) { } public InMemoryMetricsClient(InMemoryMetricsClientOptions options) - : base(new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = options?.LoggerFactory }), options) { } + : base(new InMemoryCacheClient(o => o.LoggerFactory(options?.LoggerFactory)), options) { } public InMemoryMetricsClient(Builder config) : this(config(new InMemoryMetricsClientOptionsBuilder()).Build()) { } diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index 8a37aa4b..a73dc490 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -221,6 +221,8 @@ protected override async Task> DequeueImplAsync(CancellationToken if (!_queue.TryDequeue(out var entry) || entry == null) return null; + ScheduleNextMaintenance(SystemClock.UtcNow.Add(_options.WorkItemTimeout)); + entry.Attempts++; entry.DequeuedTimeUtc = SystemClock.UtcNow; @@ -232,31 +234,30 @@ protected override async Task> DequeueImplAsync(CancellationToken await entry.RenewLockAsync(); await OnDequeuedAsync(entry).AnyContext(); - ScheduleNextMaintenance(SystemClock.UtcNow.Add(_options.WorkItemTimeout)); return entry; } - public override async Task RenewLockAsync(IQueueEntry entry) + public override async Task RenewLockAsync(IQueueEntry queueEntry) { - _logger.LogDebug("Queue {Name} renew lock item: {Id}", _options.Name, entry.Id); + _logger.LogDebug("Queue {Name} renew lock item: {Id}", _options.Name, queueEntry.Id); - if (!_dequeued.TryGetValue(entry.Id, out var targetEntry)) + if (!_dequeued.TryGetValue(queueEntry.Id, out var targetEntry)) return; targetEntry.RenewedTimeUtc = SystemClock.UtcNow; - await OnLockRenewedAsync(entry).AnyContext(); - _logger.LogTrace("Renew lock done: {Id}", entry.Id); + await OnLockRenewedAsync(queueEntry).AnyContext(); + _logger.LogTrace("Renew lock done: {Id}", queueEntry.Id); } - public override async Task CompleteAsync(IQueueEntry entry) + public override async Task CompleteAsync(IQueueEntry queueEntry) { - _logger.LogDebug("Queue {Name} complete item: {Id}", _options.Name, entry.Id); - if (entry.IsAbandoned || entry.IsCompleted) + _logger.LogDebug("Queue {Name} complete item: {Id}", _options.Name, queueEntry.Id); + if (queueEntry.IsAbandoned || queueEntry.IsCompleted) throw new InvalidOperationException("Queue entry has already been completed or abandoned"); - if (!_dequeued.TryRemove(entry.Id, out var info) || info == null) + if (!_dequeued.TryRemove(queueEntry.Id, out var info) || info == null) throw new Exception("Unable to remove item from the dequeued list"); if (_options.CompletedEntryRetentionLimit > 0) @@ -266,42 +267,42 @@ public override async Task CompleteAsync(IQueueEntry entry) _completedQueue.TryDequeue(out _); } - entry.MarkCompleted(); + queueEntry.MarkCompleted(); Interlocked.Increment(ref _completedCount); - await OnCompletedAsync(entry).AnyContext(); - _logger.LogTrace("Complete done: {Id}", entry.Id); + await OnCompletedAsync(queueEntry).AnyContext(); + _logger.LogTrace("Complete done: {Id}", queueEntry.Id); } - public override async Task AbandonAsync(IQueueEntry entry) + public override async Task AbandonAsync(IQueueEntry queueEntry) { - _logger.LogDebug("Queue {Name}:{QueueId} abandon item: {Id}", _options.Name, QueueId, entry.Id); + _logger.LogDebug("Queue {Name}:{QueueId} abandon item: {Id}", _options.Name, QueueId, queueEntry.Id); - if (entry.IsAbandoned || entry.IsCompleted) + if (queueEntry.IsAbandoned || queueEntry.IsCompleted) throw new InvalidOperationException("Queue entry has already been completed or abandoned"); - if (!_dequeued.TryRemove(entry.Id, out var targetEntry) || targetEntry == null) + if (!_dequeued.TryRemove(queueEntry.Id, out var targetEntry) || targetEntry == null) { foreach (var kvp in _queue) { - if (kvp.Id == entry.Id) + if (kvp.Id == queueEntry.Id) throw new Exception("Unable to remove item from the dequeued list (item is in queue)"); } foreach (var kvp in _deadletterQueue) { - if (kvp.Id == entry.Id) + if (kvp.Id == queueEntry.Id) throw new Exception("Unable to remove item from the dequeued list (item is in dead letter)"); } throw new Exception("Unable to remove item from the dequeued list"); } - entry.MarkAbandoned(); + queueEntry.MarkAbandoned(); Interlocked.Increment(ref _abandonedCount); - _logger.LogTrace("Abandon complete: {Id}", entry.Id); + _logger.LogTrace("Abandon complete: {Id}", queueEntry.Id); try { - await OnAbandonedAsync(entry).AnyContext(); + await OnAbandonedAsync(queueEntry).AnyContext(); } finally { @@ -309,18 +310,18 @@ public override async Task AbandonAsync(IQueueEntry entry) { if (_options.RetryDelay > TimeSpan.Zero) { - _logger.LogTrace("Adding item to wait list for future retry: {Id}", entry.Id); + _logger.LogTrace("Adding item to wait list for future retry: {Id}", queueEntry.Id); var unawaited = Run.DelayedAsync(GetRetryDelay(targetEntry.Attempts), () => RetryAsync(targetEntry), _queueDisposedCancellationTokenSource.Token); } else { - _logger.LogTrace("Adding item back to queue for retry: {Id}", entry.Id); + _logger.LogTrace("Adding item back to queue for retry: {Id}", queueEntry.Id); _ = Task.Run(() => RetryAsync(targetEntry)); } } else { - _logger.LogTrace("Exceeded retry limit moving to deadletter: {Id}", entry.Id); + _logger.LogTrace("Exceeded retry limit moving to deadletter: {Id}", queueEntry.Id); _deadletterQueue.Enqueue(targetEntry); } } @@ -391,7 +392,9 @@ public override Task DeleteQueueAsync() _logger.LogError(ex, "DoMaintenance Error: {Message}", ex.Message); } - return minAbandonAt; + // Add a tiny buffer just in case the schedule next timer fires early. + // The system clock typically has a resolution of 10-15 milliseconds, so timers cannot be more accurate than this resolution. + return minAbandonAt.SafeAdd(TimeSpan.FromMilliseconds(15)); } public override void Dispose() diff --git a/src/Foundatio/Utility/Run.cs b/src/Foundatio/Utility/Run.cs index 9283bbfd..f3b7ce1f 100644 --- a/src/Foundatio/Utility/Run.cs +++ b/src/Foundatio/Utility/Run.cs @@ -25,6 +25,7 @@ public static Task DelayedAsync(TimeSpan delay, Func action, CancellationT }, cancellationToken); } + [Obsolete("Use Parallel.ForEachAsync")] public static Task InParallelAsync(int iterations, Func work) { return Task.WhenAll(Enumerable.Range(1, iterations).Select(i => Task.Run(() => work(i)))); diff --git a/src/Foundatio/Utility/ScheduledTimer.cs b/src/Foundatio/Utility/ScheduledTimer.cs index 045e2daa..74431e94 100644 --- a/src/Foundatio/Utility/ScheduledTimer.cs +++ b/src/Foundatio/Utility/ScheduledTimer.cs @@ -38,6 +38,7 @@ public void ScheduleNext(DateTime? utcDate = null) bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); if (isTraceLogLevelEnabled) _logger.LogTrace("ScheduleNext called: value={NextRun:O}", utcDate.Value); + if (utcDate == DateTime.MaxValue) { if (isTraceLogLevelEnabled) _logger.LogTrace("Ignoring MaxValue"); @@ -48,7 +49,7 @@ public void ScheduleNext(DateTime? utcDate = null) if (_next > utcNow && utcDate > _next) { if (isTraceLogLevelEnabled) - _logger.LogTrace("Ignoring because already scheduled for earlier time: {PreviousTicks} Next: {NextTicks}", utcDate.Value.Ticks, _next.Ticks); + _logger.LogTrace("Ignoring because already scheduled for earlier time: {PreviousNextRun:O} Next: {NextRun:O}", utcDate.Value, _next); return; } @@ -65,7 +66,7 @@ public void ScheduleNext(DateTime? utcDate = null) if (_next > utcNow && utcDate > _next) { if (isTraceLogLevelEnabled) - _logger.LogTrace("Ignoring because already scheduled for earlier time: {PreviousTicks} Next: {NextTicks}", utcDate.Value.Ticks, _next.Ticks); + _logger.LogTrace("Ignoring because already scheduled for earlier time: {PreviousNextRun:O} Next: {NextRun:O}", utcDate.Value, _next); return; } @@ -101,6 +102,9 @@ private async Task RunCallbackAsync() return; } + // If the callback runs before the next time, then store it here before we reset it and use it for scheduling. + DateTime? nextTimeOverride = null; + if (isTraceLogLevelEnabled) _logger.LogTrace("Starting RunCallbackAsync"); using (await _lock.LockAsync().AnyContext()) { @@ -114,6 +118,13 @@ private async Task RunCallbackAsync() } _last = SystemClock.UtcNow; + if (SystemClock.UtcNow < _next) + { + _logger.LogWarning("ScheduleNext RunCallbackAsync was called before next run time {NextRun:O}, setting next to current time and rescheduling", _next); + nextTimeOverride = _next; + _next = SystemClock.UtcNow; + _shouldRunAgainImmediately = true; + } } try @@ -146,7 +157,10 @@ private async Task RunCallbackAsync() if (isTraceLogLevelEnabled) _logger.LogTrace("Finished sleeping"); } - var nextRun = SystemClock.UtcNow.AddMilliseconds(10); + var nextRun = SystemClock.UtcNow.AddMilliseconds(10); + if (nextRun < nextTimeOverride) + nextRun = nextTimeOverride.Value; + if (_shouldRunAgainImmediately || next.HasValue && next.Value <= nextRun) ScheduleNext(nextRun); else if (next.HasValue) diff --git a/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs b/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs index dabaff30..b88b3976 100644 --- a/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs +++ b/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Threading.Tasks; using Foundatio.Caching; @@ -155,8 +155,6 @@ public override Task CanManageListsAsync() [Fact] public async Task CanSetMaxItems() { - Log.DefaultMinimumLevel = LogLevel.Trace; - // run in tight loop so that the code is warmed up and we can catch timing issues for (int x = 0; x < 5; x++) { @@ -196,7 +194,9 @@ public async Task SetAllShouldExpire() var expiry = TimeSpan.FromMilliseconds(50); await client.SetAllAsync(new Dictionary { { "test", "value" } }, expiry); - await Task.Delay(expiry); + + // Add 1ms to the expiry to ensure the cache has expired as the delay window is not guaranteed to be exact. + await Task.Delay(expiry.Add(TimeSpan.FromMilliseconds(1))); Assert.False(await client.ExistsAsync("test")); } diff --git a/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs b/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs index 3003f60c..6e00af01 100644 --- a/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs +++ b/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs @@ -81,7 +81,6 @@ public override Task WillExpireRemoteItems() [Fact(Skip = "Skip because cache invalidation loops on this with 2 in memory cache client instances")] public override Task WillWorkWithSets() { - Log.DefaultMinimumLevel = LogLevel.Trace; return base.WillWorkWithSets(); } diff --git a/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs b/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs index 31c0bb4a..ef1a9c89 100644 --- a/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs +++ b/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs @@ -25,7 +25,7 @@ public override Task CanRunMultipleQueueJobsAsync() return base.CanRunMultipleQueueJobsAsync(); } - [RetryFact] + [Fact] public override Task CanRunQueueJobWithLockFailAsync() { Log.SetLogLevel(LogLevel.Trace); diff --git a/tests/Foundatio.Tests/Jobs/JobTests.cs b/tests/Foundatio.Tests/Jobs/JobTests.cs index a82cb52f..48777b69 100644 --- a/tests/Foundatio.Tests/Jobs/JobTests.cs +++ b/tests/Foundatio.Tests/Jobs/JobTests.cs @@ -117,7 +117,7 @@ public async Task CanCancelContinuousJobs() } } - [RetryFact] + [Fact] public async Task CanRunJobsWithLocks() { var job = new WithLockingJob(Log); @@ -128,22 +128,21 @@ public async Task CanRunJobsWithLocks() await job.RunContinuousAsync(iterationLimit: 2); Assert.Equal(3, job.RunCount); - await Run.InParallelAsync(2, i => job.RunAsync()); + await Parallel.ForEachAsync(Enumerable.Range(1, 2), async (_, ct) => await job.RunAsync(ct)); Assert.Equal(4, job.RunCount); } [Fact] public async Task CanRunThrottledJobs() { - using var client = new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = Log }); + using var client = new InMemoryCacheClient(o => o.LoggerFactory(Log)); var jobs = new List(new[] { new ThrottledJob(client, Log), new ThrottledJob(client, Log), new ThrottledJob(client, Log) }); var sw = Stopwatch.StartNew(); - using (var timeoutCancellationTokenSource = new CancellationTokenSource(1000)) - { - await Task.WhenAll(jobs.Select(job => job.RunContinuousAsync(TimeSpan.FromMilliseconds(1), cancellationToken: timeoutCancellationTokenSource.Token))); - } + using var timeoutCancellationTokenSource = new CancellationTokenSource(1000); + await Task.WhenAll(jobs.Select(job => job.RunContinuousAsync(TimeSpan.FromMilliseconds(1), cancellationToken: timeoutCancellationTokenSource.Token))); sw.Stop(); + Assert.InRange(jobs.Sum(j => j.RunCount), 4, 14); _logger.LogInformation(jobs.Sum(j => j.RunCount).ToString()); Assert.InRange(sw.ElapsedMilliseconds, 20, 1500); diff --git a/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs b/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs index 3f010eaa..ba1e5279 100644 --- a/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs +++ b/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs @@ -59,19 +59,31 @@ public override Task CanAcquireMultipleResources() return base.CanAcquireMultipleResources(); } - [RetryFact] + [Fact] public override Task CanAcquireLocksInParallel() { return base.CanAcquireLocksInParallel(); } + [Fact] + public override Task CanAcquireScopedLocksInParallel() + { + return base.CanAcquireScopedLocksInParallel(); + } + + [Fact] + public override Task CanAcquireMultipleLocksInParallel() + { + return base.CanAcquireMultipleLocksInParallel(); + } + [Fact] public override Task CanAcquireMultipleScopedResources() { return base.CanAcquireMultipleScopedResources(); } - [RetryFact] + [Fact] public override Task WillThrottleCallsAsync() { Log.SetLogLevel(LogLevel.Trace); diff --git a/tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs b/tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs index 92c1c4f6..6bd06c12 100644 --- a/tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs +++ b/tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs @@ -16,7 +16,6 @@ public class DiagnosticsMetricsTests : TestWithLoggingBase, IDisposable public DiagnosticsMetricsTests(ITestOutputHelper output) : base(output) { - Log.DefaultMinimumLevel = LogLevel.Trace; _client = new DiagnosticsMetricsClient(o => o.MeterName("Test")); } diff --git a/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs b/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs index 619ecc30..7953aceb 100644 --- a/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs +++ b/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs @@ -31,7 +31,7 @@ public override Task CanIncrementCounterAsync() return base.CanIncrementCounterAsync(); } - [RetryFact] + [Fact] public override Task CanWaitForCounterAsync() { using (TestSystemClock.Install()) diff --git a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs index 29893357..8d0a7fbf 100644 --- a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs +++ b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using Foundatio.Queues; using Foundatio.Utility; @@ -290,7 +291,6 @@ public override Task VerifyDelayedRetryAttemptsAsync() [Fact] public override Task CanHandleAutoAbandonInWorker() { - Log.DefaultMinimumLevel = LogLevel.Trace; return base.CanHandleAutoAbandonInWorker(); } @@ -386,6 +386,7 @@ public virtual async Task CompleteOnAutoAbandonedHandledProperly_Issue239() { // create queue with short work item timeout so it will be auto abandoned var queue = new InMemoryQueue_Issue239(Log); + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); // completion source to wait for CompleteAsync call before the assert var taskCompletionSource = new TaskCompletionSource(); @@ -393,7 +394,7 @@ public virtual async Task CompleteOnAutoAbandonedHandledProperly_Issue239() // start handling items await queue.StartWorkingAsync(async (item) => { - // we want to wait for maintainance to be performed and auto abandon our item, we don't have any way for waiting in IQueue so we'll settle for a delay + // we want to wait for maintenance to be performed and auto abandon our item, we don't have any way for waiting in IQueue so we'll settle for a delay if (item.Value.Data == "Delay") { await Task.Delay(TimeSpan.FromSeconds(1)); @@ -410,7 +411,7 @@ await queue.StartWorkingAsync(async (item) => // infrastructure handles user exception incorrectly taskCompletionSource.SetResult(true); } - }); + }, cancellationToken: cancellationTokenSource.Token); // enqueue item which will be processed after it's auto abandoned await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" }); @@ -426,8 +427,6 @@ await queue.StartWorkingAsync(async (item) => // one option to fix this issue is surrounding the AbandonAsync call in StartWorkingImpl exception handler in inner try/catch block timedout = (await Task.WhenAny(taskCompletionSource.Task, Task.Delay(TimeSpan.FromSeconds(2)))) != taskCompletionSource.Task; Assert.False(timedout); - - return; } #endregion diff --git a/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs b/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs index fae29893..e8fb1a80 100644 --- a/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs +++ b/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs @@ -33,7 +33,7 @@ public Task CanRun() return resetEvent.WaitAsync(new CancellationTokenSource(500).Token); } - [RetryFact] + [Fact] public Task CanRunAndScheduleConcurrently() { return CanRunConcurrentlyAsync(); @@ -47,7 +47,6 @@ public Task CanRunWithMinimumInterval() private async Task CanRunConcurrentlyAsync(TimeSpan? minimumIntervalTime = null) { - Log.DefaultMinimumLevel = LogLevel.Trace; const int iterations = 2; var countdown = new AsyncCountdownEvent(iterations);