From 9087b7376fd45c0c3f473eb58e6241e38059cd53 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Mon, 18 Mar 2024 16:13:28 -0500 Subject: [PATCH 01/28] Remove retry fact on tests to expose flakey tests and add more cache lock tests and changes --- .../Locks/LockTestBase.cs | 74 ++++++++++++++++++- src/Foundatio/Caching/InMemoryCacheClient.cs | 13 ++-- src/Foundatio/Lock/CacheLockProvider.cs | 35 ++++----- src/Foundatio/Lock/DisposableLock.cs | 25 ++++--- .../Lock/DisposableLockCollection.cs | 13 ++-- .../Jobs/InMemoryJobQueueTests.cs | 2 +- .../Locks/InMemoryLockTests.cs | 16 +++- .../Metrics/InMemoryMetricsTests.cs | 2 +- .../Utility/ScheduledTimerTests.cs | 2 +- 9 files changed, 133 insertions(+), 49 deletions(-) diff --git a/src/Foundatio.TestHarness/Locks/LockTestBase.cs b/src/Foundatio.TestHarness/Locks/LockTestBase.cs index 1b987aab..69ffca43 100644 --- a/src/Foundatio.TestHarness/Locks/LockTestBase.cs +++ b/src/Foundatio.TestHarness/Locks/LockTestBase.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -183,6 +183,42 @@ public virtual async Task CanAcquireLocksInParallel() if (locker == null) return; + Log.SetLogLevel(LogLevel.Trace); + + const int COUNT = 100; + int current = 1; + var used = new List(); + int concurrency = 0; + + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => + { + await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + Assert.NotNull(myLock); + + int currentConcurrency = Interlocked.Increment(ref concurrency); + Assert.Equal(1, currentConcurrency); + + int item = current; + await Task.Delay(10, ct); + used.Add(item); + current++; + + Interlocked.Decrement(ref concurrency); + }); + + var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1); + Assert.Empty(duplicates); + Assert.Equal(COUNT, used.Count); + } + + public virtual async Task CanAcquireScopedLocksInParallel() + { + var lockProvider = GetLockProvider(); + if (lockProvider == null) + return; + + var locker = new ScopedLockProvider(lockProvider, "scoped"); + Log.SetLogLevel(LogLevel.Debug); const int COUNT = 100; @@ -190,7 +226,7 @@ public virtual async Task CanAcquireLocksInParallel() var used = new List(); int concurrency = 0; - await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) => + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => { await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); Assert.NotNull(myLock); @@ -211,6 +247,40 @@ await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) => Assert.Equal(COUNT, used.Count); } + public virtual async Task CanAcquireMultipleLocksInParallel() + { + var locker = GetLockProvider(); + if (locker == null) + return; + + Log.SetLogLevel(LogLevel.Debug); + + const int COUNT = 100; + int current = 1; + var used = new List(); + int concurrency = 0; + + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => + { + await using var myLock = await locker.AcquireAsync(["test"], TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + Assert.NotNull(myLock); + + int currentConcurrency = Interlocked.Increment(ref concurrency); + Assert.Equal(1, currentConcurrency); + + int item = current; + await Task.Delay(10, ct); + used.Add(item); + current++; + + Interlocked.Decrement(ref concurrency); + }); + + var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1); + Assert.Empty(duplicates); + Assert.Equal(COUNT, used.Count); + } + public virtual async Task LockOneAtATimeAsync() { var locker = GetLockProvider(); diff --git a/src/Foundatio/Caching/InMemoryCacheClient.cs b/src/Foundatio/Caching/InMemoryCacheClient.cs index 5b7d5ad2..8085b040 100644 --- a/src/Foundatio/Caching/InMemoryCacheClient.cs +++ b/src/Foundatio/Caching/InMemoryCacheClient.cs @@ -6,6 +6,7 @@ using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -22,7 +23,7 @@ public class InMemoryCacheClient : IMemoryCacheClient private long _hits; private long _misses; private readonly ILogger _logger; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); public InMemoryCacheClient() : this(o => o) { } @@ -847,16 +848,16 @@ private async Task StartMaintenanceAsync(bool compactImmediately = false) } } - private Task CompactAsync() + private async Task CompactAsync() { if (!_maxItems.HasValue || _memory.Count <= _maxItems) - return Task.CompletedTask; + return; string expiredKey = null; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_memory.Count <= _maxItems) - return Task.CompletedTask; + return; (string Key, long LastAccessTicks, long InstanceNumber) oldest = (null, Int64.MaxValue, 0); foreach (var kvp in _memory) @@ -874,8 +875,6 @@ private Task CompactAsync() if (expiredKey != null) OnItemExpired(expiredKey); - - return Task.CompletedTask; } private async Task DoMaintenanceAsync() diff --git a/src/Foundatio/Lock/CacheLockProvider.cs b/src/Foundatio/Lock/CacheLockProvider.cs index f1841755..fa34767f 100644 --- a/src/Foundatio/Lock/CacheLockProvider.cs +++ b/src/Foundatio/Lock/CacheLockProvider.cs @@ -83,16 +83,15 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); bool isDebugLogLevelEnabled = _logger.IsEnabled(LogLevel.Debug); bool shouldWait = !cancellationToken.IsCancellationRequested; - if (isDebugLogLevelEnabled) - _logger.LogDebug("Attempting to acquire lock: {Resource}", resource); + string lockId = GenerateNewLockId(); + timeUntilExpires ??= TimeSpan.FromMinutes(20); - if (!timeUntilExpires.HasValue) - timeUntilExpires = TimeSpan.FromMinutes(20); + if (isDebugLogLevelEnabled) + _logger.LogDebug("Attempting to acquire lock ({LockId}): {Resource}", lockId, resource); using var activity = StartLockActivity(resource); bool gotLock = false; - string lockId = GenerateNewLockId(); var sw = Stopwatch.StartNew(); try { @@ -105,18 +104,22 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire else gotLock = await _cacheClient.AddAsync(resource, lockId, timeUntilExpires).AnyContext(); } - catch { } + catch (Exception ex) + { + if (isTraceLogLevelEnabled) + _logger.LogTrace(ex, "Error acquiring lock ({LockId}): {Resource}", lockId, resource); + } if (gotLock) break; if (isDebugLogLevelEnabled) - _logger.LogDebug("Failed to acquire lock: {Resource}", resource); + _logger.LogDebug("Failed to acquire lock ({LockId}): {Resource}", lockId, resource); if (cancellationToken.IsCancellationRequested) { if (isTraceLogLevelEnabled && shouldWait) - _logger.LogTrace("Cancellation requested"); + _logger.LogTrace("Cancellation requested while acquiring lock ({LockId}): {Resource}", lockId, resource); break; } @@ -128,16 +131,14 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire var keyExpiration = SystemClock.UtcNow.SafeAdd(await _cacheClient.GetExpirationAsync(resource).AnyContext() ?? TimeSpan.Zero); var delayAmount = keyExpiration.Subtract(SystemClock.UtcNow); - // delay a minimum of 50ms + // delay a minimum of 50ms and a maximum of 3 seconds if (delayAmount < TimeSpan.FromMilliseconds(50)) delayAmount = TimeSpan.FromMilliseconds(50); - - // delay a maximum of 3 seconds - if (delayAmount > TimeSpan.FromSeconds(3)) + else if (delayAmount > TimeSpan.FromSeconds(3)) delayAmount = TimeSpan.FromSeconds(3); if (isTraceLogLevelEnabled) - _logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock: {Resource}", delayAmount, resource); + _logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock ({LockId}): {Resource}", delayAmount, lockId, resource); // wait until we get a message saying the lock was released or 3 seconds has elapsed or cancellation has been requested using (var maxWaitCancellationTokenSource = new CancellationTokenSource(delayAmount)) @@ -176,17 +177,17 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire _lockTimeoutCounter.Add(1); if (cancellationToken.IsCancellationRequested && isTraceLogLevelEnabled) - _logger.LogTrace("Cancellation requested for lock {Resource} after {Duration:g}", resource, sw.Elapsed); + _logger.LogTrace("Cancellation requested for lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); else if (_logger.IsEnabled(LogLevel.Warning)) - _logger.LogWarning("Failed to acquire lock {Resource} after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogWarning("Failed to acquire lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); return null; } if (sw.Elapsed > TimeSpan.FromSeconds(5) && _logger.IsEnabled(LogLevel.Warning)) - _logger.LogWarning("Acquired lock {Resource} ({LockId}) after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogWarning("Acquired lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); else if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Acquired lock {Resource} ({LockId}) after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogDebug("Acquired lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); return new DisposableLock(resource, lockId, sw.Elapsed, this, _logger, releaseOnDispose); } diff --git a/src/Foundatio/Lock/DisposableLock.cs b/src/Foundatio/Lock/DisposableLock.cs index fa243bbf..9601cc0a 100644 --- a/src/Foundatio/Lock/DisposableLock.cs +++ b/src/Foundatio/Lock/DisposableLock.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; @@ -12,7 +13,7 @@ internal class DisposableLock : ILock private readonly ILogger _logger; private bool _isReleased; private int _renewalCount; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); private readonly Stopwatch _duration; private readonly bool _shouldReleaseOnDispose; @@ -41,7 +42,7 @@ public async ValueTask DisposeAsync() bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); if (isTraceLogLevelEnabled) - _logger.LogTrace("Disposing lock {Resource}", Resource); + _logger.LogTrace("Disposing lock ({LockId}) {Resource}", LockId, Resource); try { @@ -50,42 +51,42 @@ public async ValueTask DisposeAsync() catch (Exception ex) { if (_logger.IsEnabled(LogLevel.Error)) - _logger.LogError(ex, "Unable to release lock {Resource}", Resource); + _logger.LogError(ex, "Unable to release lock ({LockId}) {Resource}", LockId, Resource); } if (isTraceLogLevelEnabled) - _logger.LogTrace("Disposed lock {Resource}", Resource); + _logger.LogTrace("Disposed lock ({LockId}) {Resource}", LockId, Resource); } public async Task RenewAsync(TimeSpan? lockExtension = null) { if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Renewing lock {Resource}", Resource); + _logger.LogTrace("Renewing lock ({LockId}) {Resource}", LockId, Resource); await _lockProvider.RenewAsync(Resource, LockId, lockExtension).AnyContext(); _renewalCount++; if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Renewed lock {Resource}", Resource); + _logger.LogDebug("Renewed lock ({LockId}) {Resource}", LockId, Resource); } - public Task ReleaseAsync() + public async Task ReleaseAsync() { if (_isReleased) - return Task.CompletedTask; + return; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_isReleased) - return Task.CompletedTask; + return; _isReleased = true; _duration.Stop(); if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Releasing lock {Resource} after {Duration:g}", Resource, _duration.Elapsed); + _logger.LogDebug("Releasing lock ({LockId}) {Resource} after {Duration:g}", LockId, Resource, _duration.Elapsed); - return _lockProvider.ReleaseAsync(Resource, LockId); + await _lockProvider.ReleaseAsync(Resource, LockId); } } } diff --git a/src/Foundatio/Lock/DisposableLockCollection.cs b/src/Foundatio/Lock/DisposableLockCollection.cs index 1f61ba98..26671f1a 100644 --- a/src/Foundatio/Lock/DisposableLockCollection.cs +++ b/src/Foundatio/Lock/DisposableLockCollection.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; @@ -14,7 +15,7 @@ internal class DisposableLockCollection : ILock private readonly ILogger _logger; private bool _isReleased; private int _renewalCount; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); private readonly Stopwatch _duration; public DisposableLockCollection(IEnumerable locks, string lockId, TimeSpan timeWaitedForLock, ILogger logger) @@ -50,15 +51,15 @@ public async Task RenewAsync(TimeSpan? lockExtension = null) _logger.LogDebug("Renewing {LockCount} locks {Resource}", _locks.Count, Resource); } - public Task ReleaseAsync() + public async Task ReleaseAsync() { if (_isReleased) - return Task.CompletedTask; + return; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_isReleased) - return Task.CompletedTask; + return; _isReleased = true; _duration.Stop(); @@ -66,7 +67,7 @@ public Task ReleaseAsync() if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Releasing {LockCount} locks {Resource} after {Duration:g}", _locks.Count, Resource, _duration.Elapsed); - return Task.WhenAll(_locks.Select(l => l.ReleaseAsync())); + await Task.WhenAll(_locks.Select(l => l.ReleaseAsync())); } } diff --git a/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs b/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs index 31c0bb4a..ef1a9c89 100644 --- a/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs +++ b/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs @@ -25,7 +25,7 @@ public override Task CanRunMultipleQueueJobsAsync() return base.CanRunMultipleQueueJobsAsync(); } - [RetryFact] + [Fact] public override Task CanRunQueueJobWithLockFailAsync() { Log.SetLogLevel(LogLevel.Trace); diff --git a/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs b/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs index 3f010eaa..64ebbd4b 100644 --- a/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs +++ b/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs @@ -59,19 +59,31 @@ public override Task CanAcquireMultipleResources() return base.CanAcquireMultipleResources(); } - [RetryFact] + [Fact] public override Task CanAcquireLocksInParallel() { return base.CanAcquireLocksInParallel(); } + [Fact] + public override Task CanAcquireScopedLocksInParallel() + { + return base.CanAcquireScopedLocksInParallel(); + } + + [Fact] + public override Task CanAcquireMultipleLocksInParallel() + { + return base.CanAcquireScopedLocksInParallel(); + } + [Fact] public override Task CanAcquireMultipleScopedResources() { return base.CanAcquireMultipleScopedResources(); } - [RetryFact] + [Fact] public override Task WillThrottleCallsAsync() { Log.SetLogLevel(LogLevel.Trace); diff --git a/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs b/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs index 619ecc30..7953aceb 100644 --- a/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs +++ b/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs @@ -31,7 +31,7 @@ public override Task CanIncrementCounterAsync() return base.CanIncrementCounterAsync(); } - [RetryFact] + [Fact] public override Task CanWaitForCounterAsync() { using (TestSystemClock.Install()) diff --git a/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs b/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs index fae29893..88d1265a 100644 --- a/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs +++ b/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs @@ -33,7 +33,7 @@ public Task CanRun() return resetEvent.WaitAsync(new CancellationTokenSource(500).Token); } - [RetryFact] + [Fact] public Task CanRunAndScheduleConcurrently() { return CanRunConcurrentlyAsync(); From 89978292ccab240a8cd5bc04a4dcdef4f9f01db6 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 13:10:31 -0500 Subject: [PATCH 02/28] Update InMemoryLockTests.cs Co-authored-by: scharde-encora <121244313+scharde-encora@users.noreply.github.com> --- tests/Foundatio.Tests/Locks/InMemoryLockTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs b/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs index 64ebbd4b..ba1e5279 100644 --- a/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs +++ b/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs @@ -74,7 +74,7 @@ public override Task CanAcquireScopedLocksInParallel() [Fact] public override Task CanAcquireMultipleLocksInParallel() { - return base.CanAcquireScopedLocksInParallel(); + return base.CanAcquireMultipleLocksInParallel(); } [Fact] From 2805ed028b69dcb341d63dfd575e6528bd9ecd91 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 09:53:29 -0500 Subject: [PATCH 03/28] Ignore idea folder --- .gitignore | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index 897f2f01..d8662874 100644 --- a/.gitignore +++ b/.gitignore @@ -32,23 +32,4 @@ _NCrunch_* .DS_Store # Rider - -# User specific -**/.idea/**/workspace.xml -**/.idea/**/tasks.xml -**/.idea/shelf/* -**/.idea/dictionaries - -# Sensitive or high-churn files -**/.idea/**/dataSources/ -**/.idea/**/dataSources.ids -**/.idea/**/dataSources.xml -**/.idea/**/dataSources.local.xml -**/.idea/**/sqlDataSources.xml -**/.idea/**/dynamic.xml - -# Rider -# Rider auto-generates .iml files, and contentModel.xml -**/.idea/**/*.iml -**/.idea/**/contentModel.xml -**/.idea/**/modules.xml \ No newline at end of file +.idea From bc5bbf2d3ccf9f2f147bc668880508246bc209ad Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 15:32:12 -0500 Subject: [PATCH 04/28] Updates to cache client. --- src/Foundatio/Caching/InMemoryCacheClient.cs | 214 +++++++++++-------- 1 file changed, 119 insertions(+), 95 deletions(-) diff --git a/src/Foundatio/Caching/InMemoryCacheClient.cs b/src/Foundatio/Caching/InMemoryCacheClient.cs index 8085b040..09311779 100644 --- a/src/Foundatio/Caching/InMemoryCacheClient.cs +++ b/src/Foundatio/Caching/InMemoryCacheClient.cs @@ -16,9 +16,9 @@ namespace Foundatio.Caching; public class InMemoryCacheClient : IMemoryCacheClient { private readonly ConcurrentDictionary _memory; - private bool _shouldClone; - private bool _shouldThrowOnSerializationErrors; - private int? _maxItems; + private readonly bool _shouldClone; + private readonly bool _shouldThrowOnSerializationErrors; + private readonly int? _maxItems; private long _writes; private long _hits; private long _misses; @@ -69,7 +69,7 @@ private void OnItemExpired(string key, bool sendNotification = true) if (ItemExpired == null) return; - Task.Factory.StartNew(state => + Task.Factory.StartNew(_ => { var args = new ItemExpiredEventArgs { @@ -120,27 +120,31 @@ public async Task RemoveIfEqualAsync(string key, T expected) if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key), "Key cannot be null or empty"); - if (_logger.IsEnabled(LogLevel.Trace)) + bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); + if (isTraceLogLevelEnabled) _logger.LogTrace("RemoveIfEqualAsync Key: {Key} Expected: {Expected}", key, expected); bool wasExpectedValue = false; - bool success = _memory.TryUpdate(key, (k, e) => + bool success = _memory.TryUpdate(key, (key, existingEntry) => { - var currentValue = e.GetValue(); + var currentValue = existingEntry.GetValue(); if (currentValue.Equals(expected)) { - e.ExpiresAt = DateTime.MinValue; + if (isTraceLogLevelEnabled) + _logger.LogTrace("RemoveIfEqualAsync Key: {Key} Updating ExpiresAt to DateTime.MinValue", key); + + existingEntry.ExpiresAt = DateTime.MinValue; wasExpectedValue = true; } - return e; + return existingEntry; }); success = success && wasExpectedValue; await StartMaintenanceAsync().AnyContext(); - if (_logger.IsEnabled(LogLevel.Trace)) + if (isTraceLogLevelEnabled) _logger.LogTrace("RemoveIfEqualAsync Key: {Key} Expected: {Expected} Success: {Success}", key, expected, success); return success; @@ -155,13 +159,14 @@ public Task RemoveAllAsync(IEnumerable keys = null) return Task.FromResult(count); } + bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); int removed = 0; foreach (string key in keys) { if (String.IsNullOrEmpty(key)) continue; - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("RemoveAllAsync: Removing key: {Key}", key); + if (isTraceLogLevelEnabled) _logger.LogTrace("RemoveAllAsync: Removing key: {Key}", key); if (_memory.TryRemove(key, out _)) removed++; } @@ -190,10 +195,19 @@ public Task RemoveByPrefixAsync(string prefix) internal void RemoveExpiredKey(string key, bool sendNotification = true) { - _logger.LogDebug("Removing expired cache entry {Key}", key); + // Consideration: We could reduce the amount of calls to this by updating the key and only having maintenance run this. + // Consideration: We could also only remove expired items after a last access period determined when any property on the model is updated. + if (_memory.TryGetValue(key, out var existingEntry) && existingEntry.ExpiresAt < SystemClock.UtcNow) + { + if (_memory.TryRemove(key, out var removedEntry)) + { + if (removedEntry.ExpiresAt >= SystemClock.UtcNow) + throw new Exception("Removed item was not expired"); - if (_memory.TryRemove(key, out _)) - OnItemExpired(key, sendNotification); + _logger.LogDebug("Removing expired cache entry {Key}", key); + OnItemExpired(key, sendNotification); + } + } } public Task> GetAsync(string key) @@ -201,13 +215,13 @@ public Task> GetAsync(string key) if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key), "Key cannot be null or empty"); - if (!_memory.TryGetValue(key, out var cacheEntry)) + if (!_memory.TryGetValue(key, out var existingEntry)) { Interlocked.Increment(ref _misses); return Task.FromResult(CacheValue.NoValue); } - if (cacheEntry.ExpiresAt < SystemClock.UtcNow) + if (existingEntry.ExpiresAt < SystemClock.UtcNow) { RemoveExpiredKey(key); Interlocked.Increment(ref _misses); @@ -218,13 +232,13 @@ public Task> GetAsync(string key) try { - var value = cacheEntry.GetValue(); + var value = existingEntry.GetValue(); return Task.FromResult(new CacheValue(value, true)); } catch (Exception ex) { if (_logger.IsEnabled(LogLevel.Error)) - _logger.LogError(ex, "Unable to deserialize value {Value} to type {TypeFullName}", cacheEntry.Value, typeof(T).FullName); + _logger.LogError(ex, "Unable to deserialize value {Value} to type {TypeFullName}", existingEntry.Value, typeof(T).FullName); if (_shouldThrowOnSerializationErrors) throw; @@ -276,12 +290,12 @@ public async Task SetIfHigherAsync(string key, double value, TimeSpan? e double difference = value; var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (k, entry) => + _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (_, existingEntry) => { double? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -291,15 +305,17 @@ public async Task SetIfHigherAsync(string key, double value, TimeSpan? e if (currentValue.HasValue && currentValue.Value < value) { difference = value - currentValue.Value; - entry.Value = value; + existingEntry.Value = value; } else + { difference = 0; + } if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -322,12 +338,12 @@ public async Task SetIfHigherAsync(string key, long value, TimeSpan? expir long difference = value; var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (k, entry) => + _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (_, existingEntry) => { long? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -337,15 +353,17 @@ public async Task SetIfHigherAsync(string key, long value, TimeSpan? expir if (currentValue.HasValue && currentValue.Value < value) { difference = value - currentValue.Value; - entry.Value = value; + existingEntry.Value = value; } else + { difference = 0; + } if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -368,12 +386,12 @@ public async Task SetIfLowerAsync(string key, double value, TimeSpan? ex double difference = value; var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (k, entry) => + _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (_, existingEntry) => { double? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -383,15 +401,17 @@ public async Task SetIfLowerAsync(string key, double value, TimeSpan? ex if (currentValue.HasValue && currentValue.Value > value) { difference = currentValue.Value - value; - entry.Value = value; + existingEntry.Value = value; } else + { difference = 0; + } if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -412,12 +432,12 @@ public async Task SetIfLowerAsync(string key, long value, TimeSpan? expire long difference = value; var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (k, entry) => + _memory.AddOrUpdate(key, new CacheEntry(value, expiresAt, _shouldClone), (_, existingEntry) => { long? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -427,15 +447,17 @@ public async Task SetIfLowerAsync(string key, long value, TimeSpan? expire if (currentValue.HasValue && currentValue.Value > value) { difference = currentValue.Value - value; - entry.Value = value; + existingEntry.Value = value; } else + { difference = 0; + } if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -464,18 +486,18 @@ public async Task ListAddAsync(string key, IEnumerable values, TimeS { var items = new HashSet(new[] { stringValue }); var entry = new CacheEntry(items, expiresAt, _shouldClone); - _memory.AddOrUpdate(key, entry, (k, cacheEntry) => + _memory.AddOrUpdate(key, entry, (_, existingEntry) => { - if (!(cacheEntry.Value is ICollection collection)) + if (!(existingEntry.Value is ICollection collection)) throw new InvalidOperationException($"Unable to add value for key: {key}. Cache value does not contain a set"); collection.Add(stringValue); - cacheEntry.Value = collection; + existingEntry.Value = collection; if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return cacheEntry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -486,18 +508,18 @@ public async Task ListAddAsync(string key, IEnumerable values, TimeS { var items = new HashSet(values); var entry = new CacheEntry(items, expiresAt, _shouldClone); - _memory.AddOrUpdate(key, entry, (k, cacheEntry) => + _memory.AddOrUpdate(key, entry, (_, existingEntry) => { - if (!(cacheEntry.Value is ICollection collection)) + if (!(existingEntry.Value is ICollection collection)) throw new InvalidOperationException($"Unable to add value for key: {key}. Cache value does not contain a set"); collection.AddRange(items); - cacheEntry.Value = collection; + existingEntry.Value = collection; if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return cacheEntry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -526,21 +548,21 @@ public Task ListRemoveAsync(string key, IEnumerable values, TimeSpan if (values is string stringValue) { var items = new HashSet(new[] { stringValue }); - _memory.TryUpdate(key, (k, cacheEntry) => + _memory.TryUpdate(key, (_, existingEntry) => { - if (cacheEntry.Value is ICollection collection && collection.Count > 0) + if (existingEntry.Value is ICollection collection && collection.Count > 0) { - foreach (var value in items) + foreach (string value in items) collection.Remove(value); - cacheEntry.Value = collection; + existingEntry.Value = collection; } if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Removed value from set with cache key: {Key}", key); - return cacheEntry; + return existingEntry; }); return Task.FromResult(items.Count); @@ -548,21 +570,21 @@ public Task ListRemoveAsync(string key, IEnumerable values, TimeSpan else { var items = new HashSet(values); - _memory.TryUpdate(key, (k, cacheEntry) => + _memory.TryUpdate(key, (_, existingEntry) => { - if (cacheEntry.Value is ICollection collection && collection.Count > 0) + if (existingEntry.Value is ICollection collection && collection.Count > 0) { foreach (var value in items) collection.Remove(value); - cacheEntry.Value = collection; + existingEntry.Value = collection; } if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Removed value from set with cache key: {Key}", key); - return cacheEntry; + return existingEntry; }); return Task.FromResult(items.Count); @@ -596,39 +618,38 @@ private async Task SetInternalAsync(string key, CacheEntry entry, bool add Interlocked.Increment(ref _writes); + bool wasUpdated = true; + bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); if (addOnly) { - if (!_memory.TryAdd(key, entry)) + _memory.AddOrUpdate(key, entry, (_, existingEntry) => { + wasUpdated = false; // check to see if existing entry is expired - bool updated = false; - _memory.TryUpdate(key, (key, existingEntry) => + if (existingEntry.ExpiresAt < SystemClock.UtcNow) { - if (existingEntry.ExpiresAt < SystemClock.UtcNow) - { - updated = true; - return entry; - } + if (isTraceLogLevelEnabled) + _logger.LogTrace("Replacing expired cache key: {Key}", key); - return existingEntry; - }); + wasUpdated = true; + return entry; + } - if (!updated) - return false; - } + return existingEntry; + }); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Added cache key: {Key}", key); + if (wasUpdated && isTraceLogLevelEnabled) + _logger.LogTrace("Added cache key: {Key}", key); } else { - _memory.AddOrUpdate(key, entry, (k, cacheEntry) => entry); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Set cache key: {Key}", key); + _memory.AddOrUpdate(key, entry, (_, _) => entry); + if (isTraceLogLevelEnabled) _logger.LogTrace("Set cache key: {Key}", key); } await StartMaintenanceAsync(true).AnyContext(); - - return true; + return wasUpdated; } public async Task SetAllAsync(IDictionary values, TimeSpan? expiresIn = null) @@ -667,19 +688,19 @@ public async Task ReplaceIfEqualAsync(string key, T value, T expected, var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; bool wasExpectedValue = false; - bool success = _memory.TryUpdate(key, (k, cacheEntry) => + bool success = _memory.TryUpdate(key, (_, existingEntry) => { - var currentValue = cacheEntry.GetValue(); + var currentValue = existingEntry.GetValue(); if (currentValue.Equals(expected)) { - cacheEntry.Value = value; + existingEntry.Value = value; wasExpectedValue = true; if (expiresIn.HasValue) - cacheEntry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; } - return cacheEntry; + return existingEntry; }); success = success && wasExpectedValue; @@ -705,12 +726,12 @@ public async Task IncrementAsync(string key, double amount, TimeSpan? ex Interlocked.Increment(ref _writes); var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - var result = _memory.AddOrUpdate(key, new CacheEntry(amount, expiresAt, _shouldClone), (k, entry) => + var result = _memory.AddOrUpdate(key, new CacheEntry(amount, expiresAt, _shouldClone), (_, existingEntry) => { double? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -718,14 +739,14 @@ public async Task IncrementAsync(string key, double amount, TimeSpan? ex } if (currentValue.HasValue) - entry.Value = currentValue.Value + amount; + existingEntry.Value = currentValue.Value + amount; else - entry.Value = amount; + existingEntry.Value = amount; if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -747,12 +768,12 @@ public async Task IncrementAsync(string key, long amount, TimeSpan? expire Interlocked.Increment(ref _writes); var expiresAt = expiresIn.HasValue ? SystemClock.UtcNow.SafeAdd(expiresIn.Value) : DateTime.MaxValue; - var result = _memory.AddOrUpdate(key, new CacheEntry(amount, expiresAt, _shouldClone), (k, entry) => + var result = _memory.AddOrUpdate(key, new CacheEntry(amount, expiresAt, _shouldClone), (_, existingEntry) => { long? currentValue = null; try { - currentValue = entry.GetValue(); + currentValue = existingEntry.GetValue(); } catch (Exception ex) { @@ -760,14 +781,14 @@ public async Task IncrementAsync(string key, long amount, TimeSpan? expire } if (currentValue.HasValue) - entry.Value = currentValue.Value + amount; + existingEntry.Value = currentValue.Value + amount; else - entry.Value = amount; + existingEntry.Value = amount; if (expiresIn.HasValue) - entry.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; - return entry; + return existingEntry; }); await StartMaintenanceAsync().AnyContext(); @@ -889,7 +910,10 @@ private async Task DoMaintenanceAsync() { var expiresAt = kvp.Value.ExpiresAt; if (expiresAt <= utcNow) + { + _logger.LogDebug("DoMaintenance: Removing expired key {Key}", kvp.Key); RemoveExpiredKey(kvp.Key); + } } } catch (Exception ex) @@ -906,7 +930,7 @@ public void Dispose() ItemExpired?.Dispose(); } - private class CacheEntry + private record CacheEntry { private object _cacheValue; private static long _instanceCount; From e45c500fa1d9eab6197d2cb8999715fc3374f246 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 16:04:06 -0500 Subject: [PATCH 05/28] Changed how frequent maintenance removes expired items. --- src/Foundatio/Caching/InMemoryCacheClient.cs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/Foundatio/Caching/InMemoryCacheClient.cs b/src/Foundatio/Caching/InMemoryCacheClient.cs index 09311779..45e5ea20 100644 --- a/src/Foundatio/Caching/InMemoryCacheClient.cs +++ b/src/Foundatio/Caching/InMemoryCacheClient.cs @@ -883,9 +883,14 @@ private async Task CompactAsync() (string Key, long LastAccessTicks, long InstanceNumber) oldest = (null, Int64.MaxValue, 0); foreach (var kvp in _memory) { - if (kvp.Value.LastAccessTicks < oldest.LastAccessTicks - || (kvp.Value.LastAccessTicks == oldest.LastAccessTicks && kvp.Value.InstanceNumber < oldest.InstanceNumber)) + bool isExpired = kvp.Value.ExpiresAt < SystemClock.UtcNow; + if (isExpired || + kvp.Value.LastAccessTicks < oldest.LastAccessTicks || + (kvp.Value.LastAccessTicks == oldest.LastAccessTicks && kvp.Value.InstanceNumber < oldest.InstanceNumber)) oldest = (kvp.Key, kvp.Value.LastAccessTicks, kvp.Value.InstanceNumber); + + if (isExpired) + break; } _logger.LogDebug("Removing cache entry {Key} due to cache exceeding max item count limit.", oldest); @@ -904,12 +909,15 @@ private async Task DoMaintenanceAsync() var utcNow = SystemClock.UtcNow.AddMilliseconds(50); + // Remove expired items and items that are infrequently accessed as they may be updated by add. + long lastAccessMaximumTicks = utcNow.AddMilliseconds(-300).Ticks; + try { foreach (var kvp in _memory.ToArray()) { - var expiresAt = kvp.Value.ExpiresAt; - if (expiresAt <= utcNow) + bool lastAccessTimeIsInfrequent = kvp.Value.LastAccessTicks < lastAccessMaximumTicks; + if (lastAccessTimeIsInfrequent && kvp.Value.ExpiresAt <= utcNow) { _logger.LogDebug("DoMaintenance: Removing expired key {Key}", kvp.Key); RemoveExpiredKey(kvp.Key); From ed54b82bb2c35bcc973b3232717fd6365f9e3dc3 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 16:32:31 -0500 Subject: [PATCH 06/28] General cleanup --- src/Foundatio.TestHarness/Locks/LockTestBase.cs | 2 -- src/Foundatio.TestHarness/Queue/QueueTestBase.cs | 3 --- tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs | 2 -- .../Caching/InMemoryHybridCacheClientTests.cs | 1 - tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs | 1 - tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs | 5 +---- 6 files changed, 1 insertion(+), 13 deletions(-) diff --git a/src/Foundatio.TestHarness/Locks/LockTestBase.cs b/src/Foundatio.TestHarness/Locks/LockTestBase.cs index 69ffca43..2b89fe9f 100644 --- a/src/Foundatio.TestHarness/Locks/LockTestBase.cs +++ b/src/Foundatio.TestHarness/Locks/LockTestBase.cs @@ -55,7 +55,6 @@ public virtual async Task CanAcquireAndReleaseLockAsync() int counter = 0; - bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); await Run.InParallelAsync(25, async i => { bool success = await locker.TryUsingAsync("test", () => @@ -341,7 +340,6 @@ private Task DoLockedWorkAsync(ILockProvider locker) public virtual async Task WillThrottleCallsAsync() { - Log.DefaultMinimumLevel = LogLevel.Trace; Log.SetLogLevel(LogLevel.Information); Log.SetLogLevel(LogLevel.Trace); diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index f0899eee..7ffe9a85 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -637,7 +637,6 @@ public virtual async Task WillNotWaitForItemAsync() public virtual async Task WillWaitForItemAsync() { - Log.DefaultMinimumLevel = LogLevel.Trace; var queue = GetQueue(); if (queue == null) return; @@ -800,7 +799,6 @@ await queue.StartWorkingAsync(w => public virtual async Task WorkItemsWillTimeoutAsync() { - Log.DefaultMinimumLevel = LogLevel.Trace; Log.SetLogLevel("Foundatio.Queues.RedisQueue", LogLevel.Trace); var queue = GetQueue(retryDelay: TimeSpan.Zero, workItemTimeout: TimeSpan.FromMilliseconds(50)); if (queue == null) @@ -1323,7 +1321,6 @@ protected async Task CanDequeueWithLockingImpAsync(CacheLockProvider distributed await queue.DeleteQueueAsync(); await AssertEmptyQueueAsync(queue); - Log.DefaultMinimumLevel = LogLevel.Trace; using var metrics = new InMemoryMetricsClient(new InMemoryMetricsClientOptions { Buffered = false, LoggerFactory = Log }); #pragma warning disable CS0618 // Type or member is obsolete diff --git a/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs b/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs index dabaff30..13a18d28 100644 --- a/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs +++ b/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs @@ -155,8 +155,6 @@ public override Task CanManageListsAsync() [Fact] public async Task CanSetMaxItems() { - Log.DefaultMinimumLevel = LogLevel.Trace; - // run in tight loop so that the code is warmed up and we can catch timing issues for (int x = 0; x < 5; x++) { diff --git a/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs b/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs index 3003f60c..6e00af01 100644 --- a/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs +++ b/tests/Foundatio.Tests/Caching/InMemoryHybridCacheClientTests.cs @@ -81,7 +81,6 @@ public override Task WillExpireRemoteItems() [Fact(Skip = "Skip because cache invalidation loops on this with 2 in memory cache client instances")] public override Task WillWorkWithSets() { - Log.DefaultMinimumLevel = LogLevel.Trace; return base.WillWorkWithSets(); } diff --git a/tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs b/tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs index 92c1c4f6..6bd06c12 100644 --- a/tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs +++ b/tests/Foundatio.Tests/Metrics/DiagnosticsMetricsTests.cs @@ -16,7 +16,6 @@ public class DiagnosticsMetricsTests : TestWithLoggingBase, IDisposable public DiagnosticsMetricsTests(ITestOutputHelper output) : base(output) { - Log.DefaultMinimumLevel = LogLevel.Trace; _client = new DiagnosticsMetricsClient(o => o.MeterName("Test")); } diff --git a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs index 29893357..e3d339f2 100644 --- a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs +++ b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs @@ -290,7 +290,6 @@ public override Task VerifyDelayedRetryAttemptsAsync() [Fact] public override Task CanHandleAutoAbandonInWorker() { - Log.DefaultMinimumLevel = LogLevel.Trace; return base.CanHandleAutoAbandonInWorker(); } @@ -393,7 +392,7 @@ public virtual async Task CompleteOnAutoAbandonedHandledProperly_Issue239() // start handling items await queue.StartWorkingAsync(async (item) => { - // we want to wait for maintainance to be performed and auto abandon our item, we don't have any way for waiting in IQueue so we'll settle for a delay + // we want to wait for maintenance to be performed and auto abandon our item, we don't have any way for waiting in IQueue so we'll settle for a delay if (item.Value.Data == "Delay") { await Task.Delay(TimeSpan.FromSeconds(1)); @@ -426,8 +425,6 @@ await queue.StartWorkingAsync(async (item) => // one option to fix this issue is surrounding the AbandonAsync call in StartWorkingImpl exception handler in inner try/catch block timedout = (await Task.WhenAny(taskCompletionSource.Task, Task.Delay(TimeSpan.FromSeconds(2)))) != taskCompletionSource.Task; Assert.False(timedout); - - return; } #endregion From aa03c0c7740a66dd9697471866bd843a2688bcfd Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 16:32:50 -0500 Subject: [PATCH 07/28] Marked code as obsolete as there is a better way to run things in parallel. --- src/Foundatio/Utility/Run.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Foundatio/Utility/Run.cs b/src/Foundatio/Utility/Run.cs index 9283bbfd..f3b7ce1f 100644 --- a/src/Foundatio/Utility/Run.cs +++ b/src/Foundatio/Utility/Run.cs @@ -25,6 +25,7 @@ public static Task DelayedAsync(TimeSpan delay, Func action, CancellationT }, cancellationToken); } + [Obsolete("Use Parallel.ForEachAsync")] public static Task InParallelAsync(int iterations, Func work) { return Task.WhenAll(Enumerable.Range(1, iterations).Select(i => Task.Run(() => work(i)))); From 969caf351febe29c05569dca44b1e08a019c6024 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 16:33:06 -0500 Subject: [PATCH 08/28] Fixed failing unit test --- src/Foundatio/Caching/InMemoryCacheClient.cs | 32 +++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/Foundatio/Caching/InMemoryCacheClient.cs b/src/Foundatio/Caching/InMemoryCacheClient.cs index 45e5ea20..4e6b84b7 100644 --- a/src/Foundatio/Caching/InMemoryCacheClient.cs +++ b/src/Foundatio/Caching/InMemoryCacheClient.cs @@ -195,8 +195,7 @@ public Task RemoveByPrefixAsync(string prefix) internal void RemoveExpiredKey(string key, bool sendNotification = true) { - // Consideration: We could reduce the amount of calls to this by updating the key and only having maintenance run this. - // Consideration: We could also only remove expired items after a last access period determined when any property on the model is updated. + // Consideration: We could reduce the amount of calls to this by updating ExpiresAt and only having maintenance remove keys. if (_memory.TryGetValue(key, out var existingEntry) && existingEntry.ExpiresAt < SystemClock.UtcNow) { if (_memory.TryRemove(key, out var removedEntry)) @@ -223,7 +222,6 @@ public Task> GetAsync(string key) if (existingEntry.ExpiresAt < SystemClock.UtcNow) { - RemoveExpiredKey(key); Interlocked.Increment(ref _misses); return Task.FromResult(CacheValue.NoValue); } @@ -801,16 +799,19 @@ public Task ExistsAsync(string key) if (String.IsNullOrEmpty(key)) return Task.FromException(new ArgumentNullException(nameof(key), "Key cannot be null or empty.")); - if (!_memory.TryGetValue(key, out var cacheEntry)) + if (!_memory.TryGetValue(key, out var existingEntry)) { Interlocked.Increment(ref _misses); return Task.FromResult(false); } - Interlocked.Increment(ref _hits); - if (cacheEntry.ExpiresAt < SystemClock.UtcNow) + if (existingEntry.ExpiresAt < SystemClock.UtcNow) + { + Interlocked.Increment(ref _misses); return Task.FromResult(false); + } + Interlocked.Increment(ref _hits); return Task.FromResult(true); } @@ -819,19 +820,20 @@ public Task ExistsAsync(string key) if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key), "Key cannot be null or empty"); - if (!_memory.TryGetValue(key, out var value) || value.ExpiresAt == DateTime.MaxValue) + if (!_memory.TryGetValue(key, out var existingEntry) || existingEntry.ExpiresAt == DateTime.MaxValue) { Interlocked.Increment(ref _misses); return Task.FromResult(null); } - Interlocked.Increment(ref _hits); - if (value.ExpiresAt >= SystemClock.UtcNow) - return Task.FromResult(value.ExpiresAt.Subtract(SystemClock.UtcNow)); - - RemoveExpiredKey(key); + if (existingEntry.ExpiresAt < SystemClock.UtcNow || existingEntry.ExpiresAt == DateTime.MaxValue) + { + Interlocked.Increment(ref _misses); + return Task.FromResult(null); + } - return Task.FromResult(null); + Interlocked.Increment(ref _hits); + return Task.FromResult(existingEntry.ExpiresAt.Subtract(SystemClock.UtcNow)); } public async Task SetExpirationAsync(string key, TimeSpan expiresIn) @@ -847,9 +849,9 @@ public async Task SetExpirationAsync(string key, TimeSpan expiresIn) } Interlocked.Increment(ref _writes); - if (_memory.TryGetValue(key, out var value)) + if (_memory.TryGetValue(key, out var existingEntry)) { - value.ExpiresAt = expiresAt; + existingEntry.ExpiresAt = expiresAt; await StartMaintenanceAsync().AnyContext(); } } From 16acc7d368e904ecb915d11a80239ca0b66f987c Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 16:49:47 -0500 Subject: [PATCH 09/28] Fixed issue from self review --- src/Foundatio/Caching/InMemoryCacheClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Foundatio/Caching/InMemoryCacheClient.cs b/src/Foundatio/Caching/InMemoryCacheClient.cs index 4e6b84b7..2c16ec22 100644 --- a/src/Foundatio/Caching/InMemoryCacheClient.cs +++ b/src/Foundatio/Caching/InMemoryCacheClient.cs @@ -820,7 +820,7 @@ public Task ExistsAsync(string key) if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key), "Key cannot be null or empty"); - if (!_memory.TryGetValue(key, out var existingEntry) || existingEntry.ExpiresAt == DateTime.MaxValue) + if (!_memory.TryGetValue(key, out var existingEntry)) { Interlocked.Increment(ref _misses); return Task.FromResult(null); From be0d1fd33c3c80863c9153acc3408d687518a078 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 20:56:23 -0500 Subject: [PATCH 10/28] Pass logger factory in --- .../Jobs/ScheduledJobService.cs | 2 +- .../Caching/HybridCacheClientTests.cs | 12 ++++++++---- src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs | 2 +- src/Foundatio.TestHarness/Queue/QueueTestBase.cs | 2 +- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs index e4ec631b..e724b7ab 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs @@ -24,7 +24,7 @@ public class ScheduledJobService : BackgroundService, IJobStatus public ScheduledJobService(IServiceProvider serviceProvider, ILoggerFactory loggerFactory) { _serviceProvider = serviceProvider; - var cacheClient = serviceProvider.GetService() ?? new InMemoryCacheClient(); + var cacheClient = serviceProvider.GetService() ?? new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)); _jobs = new List(serviceProvider.GetServices().Select(j => new ScheduledJobRunner(j.JobFactory, j.Schedule, cacheClient, loggerFactory))); var lifetime = serviceProvider.GetService(); diff --git a/src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs b/src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs index 4382a207..9e85bf70 100644 --- a/src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs +++ b/src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs @@ -13,14 +13,18 @@ namespace Foundatio.Tests.Caching; public class HybridCacheClientTests : CacheClientTestsBase, IDisposable { - protected readonly ICacheClient _distributedCache = new InMemoryCacheClient(new InMemoryCacheClientOptions()); - protected readonly IMessageBus _messageBus = new InMemoryMessageBus(new InMemoryMessageBusOptions()); + protected readonly ICacheClient _distributedCache; + protected readonly IMessageBus _messageBus; - public HybridCacheClientTests(ITestOutputHelper output) : base(output) { } + public HybridCacheClientTests(ITestOutputHelper output) : base(output) + { + _distributedCache = new InMemoryCacheClient(o => o.LoggerFactory(Log)); + _messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log)); + } protected override ICacheClient GetCacheClient(bool shouldThrowOnSerializationError = true) { - return new HybridCacheClient(_distributedCache, _messageBus, new InMemoryCacheClientOptions { CloneValues = true, ShouldThrowOnSerializationError = shouldThrowOnSerializationError }, Log); + return new HybridCacheClient(_distributedCache, _messageBus, new InMemoryCacheClientOptions { CloneValues = true, ShouldThrowOnSerializationError = shouldThrowOnSerializationError, LoggerFactory = Log }, Log); } [Fact] diff --git a/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs b/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs index e7974982..b87ed45c 100644 --- a/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs +++ b/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs @@ -109,7 +109,7 @@ public virtual async Task CanRunQueueJobWithLockFailAsync() }); }); - var lockProvider = new ThrottlingLockProvider(new InMemoryCacheClient(new InMemoryCacheClientOptions()), allowedLockCount, TimeSpan.FromDays(1), Log); + var lockProvider = new ThrottlingLockProvider(new InMemoryCacheClient(o => o.LoggerFactory(Log)), allowedLockCount, TimeSpan.FromDays(1), Log); var job = new SampleQueueJobWithLocking(queue, null, lockProvider, Log); await SystemClock.SleepAsync(10); _logger.LogInformation("Starting RunUntilEmptyAsync"); diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index 7ffe9a85..dd36c6bf 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -265,7 +265,7 @@ public virtual async Task CanDiscardDuplicateQueueEntriesAsync() { await queue.DeleteQueueAsync(); await AssertEmptyQueueAsync(queue); - queue.AttachBehavior(new DuplicateDetectionQueueBehavior(new InMemoryCacheClient(), Log)); + queue.AttachBehavior(new DuplicateDetectionQueueBehavior(new InMemoryCacheClient(o => o.LoggerFactory(Log)), Log)); await queue.EnqueueAsync(new SimpleWorkItem { From eb3d94017967a68b67d92cc0264a6419926e7051 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Tue, 19 Mar 2024 21:07:30 -0500 Subject: [PATCH 11/28] Ensure queue start working methods are shut down properly. --- .../Queue/QueueTestBase.cs | 34 ++++++++++++++----- .../Queue/InMemoryQueueTests.cs | 6 ++-- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index dd36c6bf..66ad6dba 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -358,6 +358,8 @@ public virtual Task VerifyDelayedRetryAttemptsAsync() private async Task VerifyRetryAttemptsImplAsync(IQueue queue, int retryCount, TimeSpan waitTime) { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + try { await queue.DeleteQueueAsync(); @@ -367,6 +369,7 @@ private async Task VerifyRetryAttemptsImplAsync(IQueue queue, in var countdown = new AsyncCountdownEvent(retryCount + 1); int attempts = 0; + await queue.StartWorkingAsync(async w => { Interlocked.Increment(ref attempts); @@ -380,7 +383,7 @@ await queue.StartWorkingAsync(async w => countdown.Signal(); _logger.LogInformation("Finished Attempt {Attempt} to work on queue item, Metadata Attempts: {MetadataAttempts}", attempts, queueEntryMetadata.Attempts); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { @@ -410,6 +413,7 @@ await queue.EnqueueAsync(new SimpleWorkItem } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -716,6 +720,7 @@ public virtual async Task CanUseQueueWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -727,7 +732,7 @@ await queue.StartWorkingAsync(async w => Assert.Equal("Hello", w.Value.Data); await w.CompleteAsync(); resetEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { @@ -745,6 +750,7 @@ await queue.EnqueueAsync(new SimpleWorkItem } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -755,6 +761,7 @@ public virtual async Task CanHandleErrorInWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -770,7 +777,7 @@ await queue.StartWorkingAsync(w => _logger.LogDebug("WorkAction"); Assert.Equal("Hello", w.Value.Data); throw new Exception(); - }); + }, cancellationToken: cancellationTokenSource.Token); var resetEvent = new AsyncManualResetEvent(false); using (queue.Abandoned.AddSyncHandler((o, args) => resetEvent.Set())) @@ -793,6 +800,7 @@ await queue.StartWorkingAsync(w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -899,6 +907,7 @@ public virtual async Task CanAutoCompleteWorkerAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -909,7 +918,7 @@ await queue.StartWorkingAsync(w => { Assert.Equal("Hello", w.Value.Data); return Task.CompletedTask; - }, true); + }, true, cancellationTokenSource.Token); using (queue.Completed.AddSyncHandler((s, e) => { resetEvent.Set(); })) { @@ -929,6 +938,7 @@ await queue.StartWorkingAsync(w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -939,6 +949,7 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -956,7 +967,7 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync() { var q = GetQueue(retries: 0, retryDelay: TimeSpan.Zero); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Queue Id: {Id}, I: {Instance}", q.QueueId, i); - await q.StartWorkingAsync(w => DoWorkAsync(w, countdown, info)); + await q.StartWorkingAsync(w => DoWorkAsync(w, countdown, info), cancellationToken: cancellationTokenSource.Token); workers.Add(q); } @@ -1011,6 +1022,7 @@ await Run.InParallelAsync(workItemCount, async i => } finally { + await cancellationTokenSource.CancelAsync(); foreach (var q in workers) await CleanupQueueAsync(q); } @@ -1316,6 +1328,7 @@ protected async Task CanDequeueWithLockingImpAsync(CacheLockProvider distributed if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -1340,7 +1353,7 @@ await queue.StartWorkingAsync(async w => await w.CompleteAsync(); resetEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem { Data = "Hello" }); await resetEvent.WaitAsync(TimeSpan.FromSeconds(5)); @@ -1355,6 +1368,7 @@ await queue.StartWorkingAsync(async w => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } @@ -1374,6 +1388,7 @@ protected async Task CanHaveMultipleQueueInstancesWithLockingImplAsync(CacheLock if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -1409,7 +1424,7 @@ await q.StartWorkingAsync(async w => countdown.Signal(); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Signaled countdown: {Id}", instanceCount, w.Id); - }); + }, cancellationToken: cancellationTokenSource.Token); workers.Add(q); } @@ -1453,6 +1468,7 @@ await Run.InParallelAsync(workItemCount, async i => } finally { + await cancellationTokenSource.CancelAsync(); foreach (var q in workers) await CleanupQueueAsync(q); } @@ -1571,6 +1587,7 @@ public virtual async Task CanHandleAutoAbandonInWorker() if (queue == null) return; + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); try { await queue.DeleteQueueAsync(); @@ -1607,7 +1624,7 @@ await queue.StartWorkingAsync(async (item) => } successEvent.Set(); - }); + }, cancellationToken: cancellationTokenSource.Token); await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" }); await queue.EnqueueAsync(new SimpleWorkItem() { Data = "No Delay" }); @@ -1617,6 +1634,7 @@ await queue.StartWorkingAsync(async (item) => } finally { + await cancellationTokenSource.CancelAsync(); await CleanupQueueAsync(queue); } } diff --git a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs index e3d339f2..7f15b561 100644 --- a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs +++ b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs @@ -1,6 +1,7 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using Foundatio.Queues; using Foundatio.Utility; @@ -385,6 +386,7 @@ public virtual async Task CompleteOnAutoAbandonedHandledProperly_Issue239() { // create queue with short work item timeout so it will be auto abandoned var queue = new InMemoryQueue_Issue239(Log); + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); // completion source to wait for CompleteAsync call before the assert var taskCompletionSource = new TaskCompletionSource(); @@ -409,7 +411,7 @@ await queue.StartWorkingAsync(async (item) => // infrastructure handles user exception incorrectly taskCompletionSource.SetResult(true); } - }); + }, cancellationToken: cancellationTokenSource.Token); // enqueue item which will be processed after it's auto abandoned await queue.EnqueueAsync(new SimpleWorkItem() { Data = "Delay" }); From 88a693f82d9311cc50fa1f0fd1efe72c6e89ef86 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 11:55:46 -0500 Subject: [PATCH 12/28] Fixed deprecations and other warnings --- .../Caching/CacheClientTestsBase.cs | 3 +- .../Jobs/JobQueueTestsBase.cs | 12 ++--- .../Locks/LockTestBase.cs | 2 +- .../Messaging/MessageBusTestBase.cs | 33 ++++++------- .../Queue/QueueTestBase.cs | 10 ++-- .../Storage/FileStorageTestsBase.cs | 8 ++-- src/Foundatio/Jobs/JobWithLockBase.cs | 2 +- src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs | 2 +- src/Foundatio/Queues/InMemoryQueue.cs | 48 +++++++++---------- tests/Foundatio.Tests/Jobs/JobTests.cs | 2 +- 10 files changed, 62 insertions(+), 60 deletions(-) diff --git a/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs b/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs index 928eea88..75ee4ad1 100644 --- a/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs +++ b/src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs @@ -210,7 +210,8 @@ public virtual async Task CanAddConcurrentlyAsync() string cacheKey = Guid.NewGuid().ToString("N").Substring(10); long adds = 0; - await Run.InParallelAsync(5, async i => + + await Parallel.ForEachAsync(Enumerable.Range(1, 5), async (i, _) => { if (await cache.AddAsync(cacheKey, i, TimeSpan.FromMinutes(1))) Interlocked.Increment(ref adds); diff --git a/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs b/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs index b87ed45c..979b37b3 100644 --- a/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs +++ b/src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs @@ -74,7 +74,7 @@ public virtual async Task CanRunQueueJobAsync() using var queue = GetSampleWorkItemQueue(retries: 0, retryDelay: TimeSpan.Zero); await queue.DeleteQueueAsync(); - var enqueueTask = Run.InParallelAsync(workItemCount, index => queue.EnqueueAsync(new SampleQueueWorkItem + var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (index, _) => await queue.EnqueueAsync(new SampleQueueWorkItem { Created = SystemClock.UtcNow, Path = "somepath" + index @@ -99,10 +99,10 @@ public virtual async Task CanRunQueueJobWithLockFailAsync() using var queue = GetSampleWorkItemQueue(retries: 3, retryDelay: TimeSpan.Zero); await queue.DeleteQueueAsync(); - var enqueueTask = Run.InParallelAsync(workItemCount, index => + var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (index, _) => { _logger.LogInformation($"Enqueue #{index}"); - return queue.EnqueueAsync(new SampleQueueWorkItem + await queue.EnqueueAsync(new SampleQueueWorkItem { Created = SystemClock.UtcNow, Path = "somepath" + index @@ -147,10 +147,10 @@ public virtual async Task CanRunMultipleQueueJobsAsync() } _logger.LogInformation("Done setting up queues"); - var enqueueTask = Run.InParallelAsync(workItemCount, index => + var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (_, _) => { var queue = queues[RandomData.GetInt(0, jobCount - 1)]; - return queue.EnqueueAsync(new SampleQueueWorkItem + await queue.EnqueueAsync(new SampleQueueWorkItem { Created = SystemClock.UtcNow, Path = RandomData.GetString() @@ -159,7 +159,7 @@ public virtual async Task CanRunMultipleQueueJobsAsync() _logger.LogInformation("Done enqueueing"); var cancellationTokenSource = new CancellationTokenSource(); - await Run.InParallelAsync(jobCount, async index => + await Parallel.ForEachAsync(Enumerable.Range(1, jobCount), async (index, _) => { var queue = queues[index - 1]; var job = new SampleQueueWithRandomErrorsAndAbandonsJob(queue, metrics, Log); diff --git a/src/Foundatio.TestHarness/Locks/LockTestBase.cs b/src/Foundatio.TestHarness/Locks/LockTestBase.cs index 2b89fe9f..903449d6 100644 --- a/src/Foundatio.TestHarness/Locks/LockTestBase.cs +++ b/src/Foundatio.TestHarness/Locks/LockTestBase.cs @@ -55,7 +55,7 @@ public virtual async Task CanAcquireAndReleaseLockAsync() int counter = 0; - await Run.InParallelAsync(25, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, 25), async (_, _) => { bool success = await locker.TryUsingAsync("test", () => { diff --git a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs index fc68da43..e60abac8 100644 --- a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs +++ b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Exceptionless; @@ -248,7 +249,7 @@ await messageBus.SubscribeAsync(msg => }); var sw = Stopwatch.StartNew(); - await Run.InParallelAsync(numConcurrentMessages, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, numConcurrentMessages), async (i, _) => { await messageBus.PublishAsync(new SimpleMessageA { @@ -282,16 +283,16 @@ public virtual async Task CanSubscribeConcurrentlyAsync() try { var countdown = new AsyncCountdownEvent(iterations * 10); - await Run.InParallelAsync(10, i => + await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (_, ct) => { - return messageBus.SubscribeAsync(msg => + await messageBus.SubscribeAsync(msg => { Assert.Equal("Hello", msg.Data); countdown.Signal(); - }); + }, cancellationToken: ct); }); - await Run.InParallelAsync(iterations, i => messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" })); + await Parallel.ForEachAsync(Enumerable.Range(1, iterations), async (_, _) => await messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" })); await countdown.WaitAsync(TimeSpan.FromSeconds(2)); Assert.Equal(0, countdown.CurrentCount); } @@ -312,27 +313,27 @@ public virtual async Task CanReceiveMessagesConcurrentlyAsync() try { var countdown = new AsyncCountdownEvent(iterations * 10); - await Run.InParallelAsync(10, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (_, ct) => { var bus = GetMessageBus(); await bus.SubscribeAsync(msg => { Assert.Equal("Hello", msg.Data); countdown.Signal(); - }); + }, cancellationToken: ct); messageBuses.Add(bus); }); - var subscribe = Run.InParallelAsync(iterations, - i => - { - SystemClock.Sleep(RandomData.GetInt(0, 10)); - return messageBuses.Random().SubscribeAsync(msg => Task.CompletedTask); - }); - var publish = Run.InParallelAsync(iterations + 3, i => + var subscribe = Parallel.ForEachAsync(Enumerable.Range(1, iterations), async (i, ct) => { - return i switch + await SystemClock.SleepAsync(RandomData.GetInt(0, 10), ct); + await messageBuses.Random().SubscribeAsync(msg => Task.CompletedTask, cancellationToken: ct); + }); + + var publish = Parallel.ForEachAsync(Enumerable.Range(1, iterations + 3), async (i, _) => + { + await (i switch { 1 => messageBus.PublishAsync(new DerivedSimpleMessageA { Data = "Hello" }), 2 => messageBus.PublishAsync(new Derived2SimpleMessageA { Data = "Hello" }), @@ -348,7 +349,7 @@ await bus.SubscribeAsync(msg => iterations + 2 => messageBus.PublishAsync(new SimpleMessageC { Data = "Hello" }), iterations + 3 => messageBus.PublishAsync(new SimpleMessageB { Data = "Hello" }), _ => messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" }), - }; + }); }); await Task.WhenAll(subscribe, publish); diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index 66ad6dba..7d6ab395 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -971,7 +971,7 @@ public virtual async Task CanHaveMultipleQueueInstancesAsync() workers.Add(q); } - await Run.InParallelAsync(workItemCount, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), cancellationTokenSource.Token, async (i, _) => { string id = await queue.EnqueueAsync(new SimpleWorkItem { @@ -981,8 +981,8 @@ await Run.InParallelAsync(workItemCount, async i => if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Enqueued Index: {Instance} Id: {Id}", i, id); }); - await countdown.WaitAsync(); - await SystemClock.SleepAsync(50); + await countdown.WaitAsync(cancellationTokenSource.Token); + await SystemClock.SleepAsync(50, cancellationTokenSource.Token); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("Work Info Stats: Completed: {Completed} Abandoned: {Abandoned} Error: {Errors}", info.CompletedCount, info.AbandonCount, info.ErrorCount); @@ -1428,7 +1428,7 @@ await q.StartWorkingAsync(async w => workers.Add(q); } - await Run.InParallelAsync(workItemCount, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), cancellationTokenSource.Token, async (i, _) => { string id = await queue.EnqueueAsync(new SimpleWorkItem { @@ -1439,7 +1439,7 @@ await Run.InParallelAsync(workItemCount, async i => }); await countdown.WaitAsync(TimeSpan.FromSeconds(5)); - await SystemClock.SleepAsync(50); + await SystemClock.SleepAsync(50, cancellationTokenSource.Token); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Completed: {Completed} Abandoned: {Abandoned} Error: {Errors}", info.CompletedCount, info.AbandonCount, info.ErrorCount); if (_logger.IsEnabled(LogLevel.Information)) diff --git a/src/Foundatio.TestHarness/Storage/FileStorageTestsBase.cs b/src/Foundatio.TestHarness/Storage/FileStorageTestsBase.cs index ccc66893..a39e4a51 100644 --- a/src/Foundatio.TestHarness/Storage/FileStorageTestsBase.cs +++ b/src/Foundatio.TestHarness/Storage/FileStorageTestsBase.cs @@ -559,7 +559,7 @@ public virtual async Task CanConcurrentlyManageFilesAsync() var info = await storage.GetFileInfoAsync("nope"); Assert.Null(info); - await Run.InParallelAsync(10, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (i, ct) => { var ev = new PostInfo { @@ -573,13 +573,13 @@ await Run.InParallelAsync(10, async i => UserAgent = "test" }; - await storage.SaveObjectAsync(Path.Combine(queueFolder, i + ".json"), ev); - queueItems.Add(i); + await storage.SaveObjectAsync(Path.Combine(queueFolder, i + ".json"), ev, cancellationToken: ct); + queueItems.Add(i, ct); }); Assert.Equal(10, (await storage.GetFileListAsync()).Count); - await Run.InParallelAsync(10, async i => + await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (_, _) => { string path = Path.Combine(queueFolder, queueItems.Random() + ".json"); var eventPost = await storage.GetEventPostAndSetActiveAsync(Path.Combine(queueFolder, RandomData.GetInt(0, 25) + ".json"), _logger); diff --git a/src/Foundatio/Jobs/JobWithLockBase.cs b/src/Foundatio/Jobs/JobWithLockBase.cs index d6f0a8b9..fa7a21fd 100644 --- a/src/Foundatio/Jobs/JobWithLockBase.cs +++ b/src/Foundatio/Jobs/JobWithLockBase.cs @@ -20,7 +20,7 @@ public JobWithLockBase(ILoggerFactory loggerFactory = null) public string JobId { get; } = Guid.NewGuid().ToString("N").Substring(0, 10); ILogger IHaveLogger.Logger => _logger; - public async virtual Task RunAsync(CancellationToken cancellationToken = default) + public virtual async Task RunAsync(CancellationToken cancellationToken = default) { var lockValue = await GetLockAsync(cancellationToken).AnyContext(); if (lockValue == null) diff --git a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs index 62c508b8..7c826b1a 100644 --- a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs +++ b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs @@ -32,7 +32,7 @@ public WorkItemJob(IQueue queue, IMessagePublisher publisher, Work IQueue IQueueJob.Queue => _queue; ILogger IHaveLogger.Logger => _logger; - public async virtual Task RunAsync(CancellationToken cancellationToken = default) + public virtual async Task RunAsync(CancellationToken cancellationToken = default) { IQueueEntry queueEntry; diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index 8a37aa4b..1266ec32 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -237,26 +237,26 @@ protected override async Task> DequeueImplAsync(CancellationToken return entry; } - public override async Task RenewLockAsync(IQueueEntry entry) + public override async Task RenewLockAsync(IQueueEntry queueEntry) { - _logger.LogDebug("Queue {Name} renew lock item: {Id}", _options.Name, entry.Id); + _logger.LogDebug("Queue {Name} renew lock item: {Id}", _options.Name, queueEntry.Id); - if (!_dequeued.TryGetValue(entry.Id, out var targetEntry)) + if (!_dequeued.TryGetValue(queueEntry.Id, out var targetEntry)) return; targetEntry.RenewedTimeUtc = SystemClock.UtcNow; - await OnLockRenewedAsync(entry).AnyContext(); - _logger.LogTrace("Renew lock done: {Id}", entry.Id); + await OnLockRenewedAsync(queueEntry).AnyContext(); + _logger.LogTrace("Renew lock done: {Id}", queueEntry.Id); } - public override async Task CompleteAsync(IQueueEntry entry) + public override async Task CompleteAsync(IQueueEntry queueEntry) { - _logger.LogDebug("Queue {Name} complete item: {Id}", _options.Name, entry.Id); - if (entry.IsAbandoned || entry.IsCompleted) + _logger.LogDebug("Queue {Name} complete item: {Id}", _options.Name, queueEntry.Id); + if (queueEntry.IsAbandoned || queueEntry.IsCompleted) throw new InvalidOperationException("Queue entry has already been completed or abandoned"); - if (!_dequeued.TryRemove(entry.Id, out var info) || info == null) + if (!_dequeued.TryRemove(queueEntry.Id, out var info) || info == null) throw new Exception("Unable to remove item from the dequeued list"); if (_options.CompletedEntryRetentionLimit > 0) @@ -266,42 +266,42 @@ public override async Task CompleteAsync(IQueueEntry entry) _completedQueue.TryDequeue(out _); } - entry.MarkCompleted(); + queueEntry.MarkCompleted(); Interlocked.Increment(ref _completedCount); - await OnCompletedAsync(entry).AnyContext(); - _logger.LogTrace("Complete done: {Id}", entry.Id); + await OnCompletedAsync(queueEntry).AnyContext(); + _logger.LogTrace("Complete done: {Id}", queueEntry.Id); } - public override async Task AbandonAsync(IQueueEntry entry) + public override async Task AbandonAsync(IQueueEntry queueEntry) { - _logger.LogDebug("Queue {Name}:{QueueId} abandon item: {Id}", _options.Name, QueueId, entry.Id); + _logger.LogDebug("Queue {Name}:{QueueId} abandon item: {Id}", _options.Name, QueueId, queueEntry.Id); - if (entry.IsAbandoned || entry.IsCompleted) + if (queueEntry.IsAbandoned || queueEntry.IsCompleted) throw new InvalidOperationException("Queue entry has already been completed or abandoned"); - if (!_dequeued.TryRemove(entry.Id, out var targetEntry) || targetEntry == null) + if (!_dequeued.TryRemove(queueEntry.Id, out var targetEntry) || targetEntry == null) { foreach (var kvp in _queue) { - if (kvp.Id == entry.Id) + if (kvp.Id == queueEntry.Id) throw new Exception("Unable to remove item from the dequeued list (item is in queue)"); } foreach (var kvp in _deadletterQueue) { - if (kvp.Id == entry.Id) + if (kvp.Id == queueEntry.Id) throw new Exception("Unable to remove item from the dequeued list (item is in dead letter)"); } throw new Exception("Unable to remove item from the dequeued list"); } - entry.MarkAbandoned(); + queueEntry.MarkAbandoned(); Interlocked.Increment(ref _abandonedCount); - _logger.LogTrace("Abandon complete: {Id}", entry.Id); + _logger.LogTrace("Abandon complete: {Id}", queueEntry.Id); try { - await OnAbandonedAsync(entry).AnyContext(); + await OnAbandonedAsync(queueEntry).AnyContext(); } finally { @@ -309,18 +309,18 @@ public override async Task AbandonAsync(IQueueEntry entry) { if (_options.RetryDelay > TimeSpan.Zero) { - _logger.LogTrace("Adding item to wait list for future retry: {Id}", entry.Id); + _logger.LogTrace("Adding item to wait list for future retry: {Id}", queueEntry.Id); var unawaited = Run.DelayedAsync(GetRetryDelay(targetEntry.Attempts), () => RetryAsync(targetEntry), _queueDisposedCancellationTokenSource.Token); } else { - _logger.LogTrace("Adding item back to queue for retry: {Id}", entry.Id); + _logger.LogTrace("Adding item back to queue for retry: {Id}", queueEntry.Id); _ = Task.Run(() => RetryAsync(targetEntry)); } } else { - _logger.LogTrace("Exceeded retry limit moving to deadletter: {Id}", entry.Id); + _logger.LogTrace("Exceeded retry limit moving to deadletter: {Id}", queueEntry.Id); _deadletterQueue.Enqueue(targetEntry); } } diff --git a/tests/Foundatio.Tests/Jobs/JobTests.cs b/tests/Foundatio.Tests/Jobs/JobTests.cs index a82cb52f..e92cb4dc 100644 --- a/tests/Foundatio.Tests/Jobs/JobTests.cs +++ b/tests/Foundatio.Tests/Jobs/JobTests.cs @@ -128,7 +128,7 @@ public async Task CanRunJobsWithLocks() await job.RunContinuousAsync(iterationLimit: 2); Assert.Equal(3, job.RunCount); - await Run.InParallelAsync(2, i => job.RunAsync()); + await Parallel.ForEachAsync(Enumerable.Range(1, 2), async (_, ct) => await job.RunAsync(ct)); Assert.Equal(4, job.RunCount); } From d5303a2cc8567247a620d3972d86083183c13531 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 11:56:11 -0500 Subject: [PATCH 13/28] Fixed flakey test --- src/Foundatio.TestHarness/Queue/QueueTestBase.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index 7d6ab395..d1bdf5eb 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -1608,8 +1608,9 @@ await queue.StartWorkingAsync(async (item) => break; stats = await queue.GetQueueStatsAsync(); - } while (sw.Elapsed < TimeSpan.FromSeconds(10)); + } while (sw.Elapsed < TimeSpan.FromSeconds(5)); + _logger.LogDebug("Asserting abandon count is 1, {Actual}", stats.Abandoned); Assert.Equal(1, stats.Abandoned); } @@ -1617,8 +1618,9 @@ await queue.StartWorkingAsync(async (item) => { await item.CompleteAsync(); } - catch + catch (Exception ex) { + _logger.LogDebug(ex, "Error completing item: {Message}", ex.Message); errorEvent.Set(); throw; } From 50423c4f1d2848028571e6872248d3611afafe4e Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 11:59:20 -0500 Subject: [PATCH 14/28] Updated minver --- .github/workflows/build-workflow.yml | 2 +- build/common.props | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-workflow.yml b/.github/workflows/build-workflow.yml index 2c5919dd..ed096051 100644 --- a/.github/workflows/build-workflow.yml +++ b/.github/workflows/build-workflow.yml @@ -42,7 +42,7 @@ jobs: run: "echo ref: ${{github.ref}} event: ${{github.event_name}}" - name: Build Version run: | - dotnet tool install --global minver-cli --version 4.3.0 + dotnet tool install --global minver-cli --version 5.0.0 version=$(minver --tag-prefix v) echo "MINVERVERSIONOVERRIDE=$version" >> $GITHUB_ENV echo "### Version: $version" >> $GITHUB_STEP_SUMMARY diff --git a/build/common.props b/build/common.props index 471c9ec5..0fed1d90 100644 --- a/build/common.props +++ b/build/common.props @@ -38,7 +38,7 @@ - + From fb90e45feb0739d9f00fc0978575041dacf6cbb6 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 13:42:07 -0500 Subject: [PATCH 15/28] Cleanup test options --- src/Foundatio.TestHarness/Jobs/ThrottledJob.cs | 4 ++-- src/Foundatio.TestHarness/Jobs/WithLockingJob.cs | 2 +- src/Foundatio.TestHarness/Queue/QueueTestBase.cs | 8 ++++---- src/Foundatio/Metrics/InMemoryMetricsClient.cs | 2 +- tests/Foundatio.Tests/Jobs/JobTests.cs | 9 ++++----- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/Foundatio.TestHarness/Jobs/ThrottledJob.cs b/src/Foundatio.TestHarness/Jobs/ThrottledJob.cs index 3a600c8a..ade785ef 100644 --- a/src/Foundatio.TestHarness/Jobs/ThrottledJob.cs +++ b/src/Foundatio.TestHarness/Jobs/ThrottledJob.cs @@ -18,7 +18,7 @@ public ThrottledJob(ICacheClient client, ILoggerFactory loggerFactory = null) : private readonly ILockProvider _locker; public int RunCount { get; set; } - protected override Task GetLockAsync(CancellationToken cancellationToken = default(CancellationToken)) + protected override Task GetLockAsync(CancellationToken cancellationToken = default) { return _locker.AcquireAsync(nameof(ThrottledJob), acquireTimeout: TimeSpan.Zero); } @@ -26,7 +26,7 @@ public ThrottledJob(ICacheClient client, ILoggerFactory loggerFactory = null) : protected override Task RunInternalAsync(JobContext context) { RunCount++; - + _logger.LogDebug("Incremented Run Count: {RunCount}", RunCount); return Task.FromResult(JobResult.Success); } } diff --git a/src/Foundatio.TestHarness/Jobs/WithLockingJob.cs b/src/Foundatio.TestHarness/Jobs/WithLockingJob.cs index d92bfaad..4b21bb44 100644 --- a/src/Foundatio.TestHarness/Jobs/WithLockingJob.cs +++ b/src/Foundatio.TestHarness/Jobs/WithLockingJob.cs @@ -17,7 +17,7 @@ public class WithLockingJob : JobWithLockBase public WithLockingJob(ILoggerFactory loggerFactory) : base(loggerFactory) { - _locker = new CacheLockProvider(new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = loggerFactory }), new InMemoryMessageBus(new InMemoryMessageBusOptions { LoggerFactory = loggerFactory }), loggerFactory); + _locker = new CacheLockProvider(new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)), new InMemoryMessageBus(o => o.LoggerFactory(loggerFactory)), loggerFactory); } public int RunCount { get; set; } diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index d1bdf5eb..38010ccd 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -1315,8 +1315,8 @@ public virtual async Task CanCompleteQueueEntryOnceAsync() public virtual async Task CanDequeueWithLockingAsync() { - using var cache = new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = Log }); - using var messageBus = new InMemoryMessageBus(new InMemoryMessageBusOptions { LoggerFactory = Log }); + using var cache = new InMemoryCacheClient(o => o.LoggerFactory(Log)); + using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log)); var distributedLock = new CacheLockProvider(cache, messageBus, Log); await CanDequeueWithLockingImpAsync(distributedLock); @@ -1375,8 +1375,8 @@ await queue.StartWorkingAsync(async w => public virtual async Task CanHaveMultipleQueueInstancesWithLockingAsync() { - using var cache = new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = Log }); - using var messageBus = new InMemoryMessageBus(new InMemoryMessageBusOptions { LoggerFactory = Log }); + using var cache = new InMemoryCacheClient(o => o.LoggerFactory(Log)); + using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log)); var distributedLock = new CacheLockProvider(cache, messageBus, Log); await CanHaveMultipleQueueInstancesWithLockingImplAsync(distributedLock); diff --git a/src/Foundatio/Metrics/InMemoryMetricsClient.cs b/src/Foundatio/Metrics/InMemoryMetricsClient.cs index 8a89c481..b29c1d24 100644 --- a/src/Foundatio/Metrics/InMemoryMetricsClient.cs +++ b/src/Foundatio/Metrics/InMemoryMetricsClient.cs @@ -8,7 +8,7 @@ public class InMemoryMetricsClient : CacheBucketMetricsClientBase public InMemoryMetricsClient() : this(o => o) { } public InMemoryMetricsClient(InMemoryMetricsClientOptions options) - : base(new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = options?.LoggerFactory }), options) { } + : base(new InMemoryCacheClient(o => o.LoggerFactory(options?.LoggerFactory)), options) { } public InMemoryMetricsClient(Builder config) : this(config(new InMemoryMetricsClientOptionsBuilder()).Build()) { } diff --git a/tests/Foundatio.Tests/Jobs/JobTests.cs b/tests/Foundatio.Tests/Jobs/JobTests.cs index e92cb4dc..ec69ffdf 100644 --- a/tests/Foundatio.Tests/Jobs/JobTests.cs +++ b/tests/Foundatio.Tests/Jobs/JobTests.cs @@ -135,15 +135,14 @@ public async Task CanRunJobsWithLocks() [Fact] public async Task CanRunThrottledJobs() { - using var client = new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = Log }); + using var client = new InMemoryCacheClient(o => o.LoggerFactory(Log)); var jobs = new List(new[] { new ThrottledJob(client, Log), new ThrottledJob(client, Log), new ThrottledJob(client, Log) }); var sw = Stopwatch.StartNew(); - using (var timeoutCancellationTokenSource = new CancellationTokenSource(1000)) - { - await Task.WhenAll(jobs.Select(job => job.RunContinuousAsync(TimeSpan.FromMilliseconds(1), cancellationToken: timeoutCancellationTokenSource.Token))); - } + using var timeoutCancellationTokenSource = new CancellationTokenSource(1000); + await Task.WhenAll(jobs.Select(job => job.RunContinuousAsync(TimeSpan.FromMilliseconds(1), cancellationToken: timeoutCancellationTokenSource.Token))); sw.Stop(); + Assert.InRange(jobs.Sum(j => j.RunCount), 4, 14); _logger.LogInformation(jobs.Sum(j => j.RunCount).ToString()); Assert.InRange(sw.ElapsedMilliseconds, 20, 1500); From a8348a2cd8405ad55b334f25beed0c634055f5d1 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 13:43:38 -0500 Subject: [PATCH 16/28] Move scheduling maintenance before dequeued, otherwise a handler could consume all the time and maintenance to auto abandon never happens. --- src/Foundatio.TestHarness/Queue/QueueTestBase.cs | 7 ++++++- src/Foundatio/Queues/InMemoryQueue.cs | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index 38010ccd..9687f0ba 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -1605,12 +1605,17 @@ await queue.StartWorkingAsync(async (item) => do { if (stats.Abandoned > 0) + { + _logger.LogTrace("Breaking, queue item was abandoned"); break; + } stats = await queue.GetQueueStatsAsync(); + _logger.LogTrace("Getting updated stats, Abandoned={Abandoned}", stats.Abandoned); + + await Task.Delay(50, cancellationTokenSource.Token); } while (sw.Elapsed < TimeSpan.FromSeconds(5)); - _logger.LogDebug("Asserting abandon count is 1, {Actual}", stats.Abandoned); Assert.Equal(1, stats.Abandoned); } diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index 1266ec32..907766f5 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -231,8 +231,8 @@ protected override async Task> DequeueImplAsync(CancellationToken _logger.LogTrace("Dequeue: Got Item"); await entry.RenewLockAsync(); - await OnDequeuedAsync(entry).AnyContext(); ScheduleNextMaintenance(SystemClock.UtcNow.Add(_options.WorkItemTimeout)); + await OnDequeuedAsync(entry).AnyContext(); return entry; } From 8d8efd96fac4a5859ff7917e11949658c836675d Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 14:15:00 -0500 Subject: [PATCH 17/28] Try and schedule maintenance before renewing lock --- src/Foundatio.TestHarness/Queue/QueueTestBase.cs | 6 +++--- src/Foundatio/Queues/InMemoryQueue.cs | 2 +- src/Foundatio/Utility/ScheduledTimer.cs | 1 + tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs | 1 - 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index 9687f0ba..c05fc411 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -1077,13 +1077,12 @@ public virtual async Task CanRunWorkItemWithMetricsAsync() { int completedCount = 0; - using var metrics = new InMemoryMetricsClient(new InMemoryMetricsClientOptions { Buffered = false, LoggerFactory = Log }); + using var metrics = new InMemoryMetricsClient(o => o.Buffered(false).LoggerFactory(Log)); #pragma warning disable CS0618 // Type or member is obsolete var behavior = new MetricsQueueBehavior(metrics, "metric", TimeSpan.FromMilliseconds(100), loggerFactory: Log); #pragma warning restore CS0618 // Type or member is obsolete - var options = new InMemoryQueueOptions { Behaviors = new[] { behavior }, LoggerFactory = Log }; - using var queue = new InMemoryQueue(options); + using var queue = new InMemoryQueue(o => o.Behaviors(behavior).LoggerFactory(Log)); Task Handler(object sender, CompletedEventArgs e) { @@ -1597,6 +1596,7 @@ public virtual async Task CanHandleAutoAbandonInWorker() await queue.StartWorkingAsync(async (item) => { + _logger.LogDebug("Processing item: {Id} Value={Value}", item.Id, item.Value.Data); if (item.Value.Data == "Delay") { // wait for queue item to get auto abandoned diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index 907766f5..4cf080e1 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -230,8 +230,8 @@ protected override async Task> DequeueImplAsync(CancellationToken Interlocked.Increment(ref _dequeuedCount); _logger.LogTrace("Dequeue: Got Item"); - await entry.RenewLockAsync(); ScheduleNextMaintenance(SystemClock.UtcNow.Add(_options.WorkItemTimeout)); + await entry.RenewLockAsync(); await OnDequeuedAsync(entry).AnyContext(); return entry; diff --git a/src/Foundatio/Utility/ScheduledTimer.cs b/src/Foundatio/Utility/ScheduledTimer.cs index 045e2daa..9ed16820 100644 --- a/src/Foundatio/Utility/ScheduledTimer.cs +++ b/src/Foundatio/Utility/ScheduledTimer.cs @@ -38,6 +38,7 @@ public void ScheduleNext(DateTime? utcDate = null) bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); if (isTraceLogLevelEnabled) _logger.LogTrace("ScheduleNext called: value={NextRun:O}", utcDate.Value); + if (utcDate == DateTime.MaxValue) { if (isTraceLogLevelEnabled) _logger.LogTrace("Ignoring MaxValue"); diff --git a/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs b/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs index 88d1265a..e8fb1a80 100644 --- a/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs +++ b/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs @@ -47,7 +47,6 @@ public Task CanRunWithMinimumInterval() private async Task CanRunConcurrentlyAsync(TimeSpan? minimumIntervalTime = null) { - Log.DefaultMinimumLevel = LogLevel.Trace; const int iterations = 2; var countdown = new AsyncCountdownEvent(iterations); From bc21f1ae309196c86dfe1f7ee4d5f9c4389ef738 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 14:19:05 -0500 Subject: [PATCH 18/28] Change it back. --- src/Foundatio/Queues/InMemoryQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index 4cf080e1..907766f5 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -230,8 +230,8 @@ protected override async Task> DequeueImplAsync(CancellationToken Interlocked.Increment(ref _dequeuedCount); _logger.LogTrace("Dequeue: Got Item"); - ScheduleNextMaintenance(SystemClock.UtcNow.Add(_options.WorkItemTimeout)); await entry.RenewLockAsync(); + ScheduleNextMaintenance(SystemClock.UtcNow.Add(_options.WorkItemTimeout)); await OnDequeuedAsync(entry).AnyContext(); return entry; From 2350f41d5a86f0beb9e386724ce139fbcc6d2450 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 14:32:56 -0500 Subject: [PATCH 19/28] Changed where scheduling maintenance gets called --- src/Foundatio/Queues/InMemoryQueue.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index 907766f5..aee97c3d 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -221,6 +221,8 @@ protected override async Task> DequeueImplAsync(CancellationToken if (!_queue.TryDequeue(out var entry) || entry == null) return null; + ScheduleNextMaintenance(SystemClock.UtcNow.Add(_options.WorkItemTimeout)); + entry.Attempts++; entry.DequeuedTimeUtc = SystemClock.UtcNow; @@ -231,7 +233,6 @@ protected override async Task> DequeueImplAsync(CancellationToken _logger.LogTrace("Dequeue: Got Item"); await entry.RenewLockAsync(); - ScheduleNextMaintenance(SystemClock.UtcNow.Add(_options.WorkItemTimeout)); await OnDequeuedAsync(entry).AnyContext(); return entry; From 12051a3eff7132a6588833a9fcb9d66dbe02fe79 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 14:42:22 -0500 Subject: [PATCH 20/28] Fixed a flakey test where expires might be called right at the same time as the key was set to expire --- tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs b/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs index 13a18d28..b88b3976 100644 --- a/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs +++ b/tests/Foundatio.Tests/Caching/InMemoryCacheClientTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Threading.Tasks; using Foundatio.Caching; @@ -194,7 +194,9 @@ public async Task SetAllShouldExpire() var expiry = TimeSpan.FromMilliseconds(50); await client.SetAllAsync(new Dictionary { { "test", "value" } }, expiry); - await Task.Delay(expiry); + + // Add 1ms to the expiry to ensure the cache has expired as the delay window is not guaranteed to be exact. + await Task.Delay(expiry.Add(TimeSpan.FromMilliseconds(1))); Assert.False(await client.ExistsAsync("test")); } From 3b19ba34ebb83e0c3181d1158466d84be5b6864b Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 14:44:09 -0500 Subject: [PATCH 21/28] Pass cancellation token through test --- src/Foundatio.TestHarness/Queue/QueueTestBase.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index c05fc411..cfbdee9c 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -1409,11 +1409,11 @@ await q.StartWorkingAsync(async w => { if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Acquiring distributed lock in work item: {Id}", instanceCount, w.Id); - var l = await distributedLock.AcquireAsync("test"); + var l = await distributedLock.AcquireAsync("test", cancellationToken: cancellationTokenSource.Token); Assert.NotNull(l); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Acquired distributed lock: {Id}", instanceCount, w.Id); - await SystemClock.SleepAsync(TimeSpan.FromMilliseconds(50)); + await SystemClock.SleepAsync(TimeSpan.FromMilliseconds(50), cancellationTokenSource.Token); await l.ReleaseAsync(); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("[{Instance}] Released distributed lock: {Id}", instanceCount, w.Id); From 3b44a780b7f19a92b84e44b146fc89ea025c2042 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 14:50:12 -0500 Subject: [PATCH 22/28] Removed retry fact from can run jobs with locks --- tests/Foundatio.Tests/Jobs/JobTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Foundatio.Tests/Jobs/JobTests.cs b/tests/Foundatio.Tests/Jobs/JobTests.cs index ec69ffdf..48777b69 100644 --- a/tests/Foundatio.Tests/Jobs/JobTests.cs +++ b/tests/Foundatio.Tests/Jobs/JobTests.cs @@ -117,7 +117,7 @@ public async Task CanCancelContinuousJobs() } } - [RetryFact] + [Fact] public async Task CanRunJobsWithLocks() { var job = new WithLockingJob(Log); From 19af80399e06404b5a13197983da1c90276e7c64 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 14:52:26 -0500 Subject: [PATCH 23/28] Added logging to get more info from test run on ci. --- tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs index 7f15b561..661be25e 100644 --- a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs +++ b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading; @@ -291,6 +291,8 @@ public override Task VerifyDelayedRetryAttemptsAsync() [Fact] public override Task CanHandleAutoAbandonInWorker() { + Log.DefaultMinimumLevel = LogLevel.Trace; + Log.SetLogLevel(LogLevel.Trace); return base.CanHandleAutoAbandonInWorker(); } From 01c723b79ff7fcd285d23040bfb2de42850632e2 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 15:13:34 -0500 Subject: [PATCH 24/28] Add a small buffer to DoMaintenanceAsync to account for system clock drift, see if this fixes flakey ci test. --- src/Foundatio/Queues/InMemoryQueue.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index aee97c3d..3b986173 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -392,7 +392,9 @@ public override Task DeleteQueueAsync() _logger.LogError(ex, "DoMaintenance Error: {Message}", ex.Message); } - return minAbandonAt; + // Add a tiny buffer just in case the schedule next timer fires early. + // The system clock typically has a resolution of 10-15 milliseconds, so timers cannot be more accurate than this resolution. + return minAbandonAt.SafeAdd(TimeSpan.FromMilliseconds(10)); } public override void Dispose() From 0ea6061c154f266d49ae4bf850fb2d5b212d875d Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 15:19:39 -0500 Subject: [PATCH 25/28] Try 15 second buffer for timer drift --- src/Foundatio/Queues/InMemoryQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index 3b986173..a73dc490 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -394,7 +394,7 @@ public override Task DeleteQueueAsync() // Add a tiny buffer just in case the schedule next timer fires early. // The system clock typically has a resolution of 10-15 milliseconds, so timers cannot be more accurate than this resolution. - return minAbandonAt.SafeAdd(TimeSpan.FromMilliseconds(10)); + return minAbandonAt.SafeAdd(TimeSpan.FromMilliseconds(15)); } public override void Dispose() From 57ac047c09b69d8bee6023f11dc745ed446149f2 Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 15:36:54 -0500 Subject: [PATCH 26/28] Changed message to be date time which is more readable... --- src/Foundatio/Utility/ScheduledTimer.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Foundatio/Utility/ScheduledTimer.cs b/src/Foundatio/Utility/ScheduledTimer.cs index 9ed16820..6b480492 100644 --- a/src/Foundatio/Utility/ScheduledTimer.cs +++ b/src/Foundatio/Utility/ScheduledTimer.cs @@ -49,7 +49,7 @@ public void ScheduleNext(DateTime? utcDate = null) if (_next > utcNow && utcDate > _next) { if (isTraceLogLevelEnabled) - _logger.LogTrace("Ignoring because already scheduled for earlier time: {PreviousTicks} Next: {NextTicks}", utcDate.Value.Ticks, _next.Ticks); + _logger.LogTrace("Ignoring because already scheduled for earlier time: {PreviousNextRun:O} Next: {NextRun:O}", utcDate.Value, _next); return; } @@ -66,7 +66,7 @@ public void ScheduleNext(DateTime? utcDate = null) if (_next > utcNow && utcDate > _next) { if (isTraceLogLevelEnabled) - _logger.LogTrace("Ignoring because already scheduled for earlier time: {PreviousTicks} Next: {NextTicks}", utcDate.Value.Ticks, _next.Ticks); + _logger.LogTrace("Ignoring because already scheduled for earlier time: {PreviousNextRun:O} Next: {NextRun:O}", utcDate.Value, _next); return; } From 499dbb447b35888a939aebc35cd74e54a25c0edd Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 16:14:49 -0500 Subject: [PATCH 27/28] Try and fix a edge case where a timer could be fired earlier than the next run time and it scheduling was cancelled. --- src/Foundatio/Utility/ScheduledTimer.cs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/Foundatio/Utility/ScheduledTimer.cs b/src/Foundatio/Utility/ScheduledTimer.cs index 6b480492..74431e94 100644 --- a/src/Foundatio/Utility/ScheduledTimer.cs +++ b/src/Foundatio/Utility/ScheduledTimer.cs @@ -102,6 +102,9 @@ private async Task RunCallbackAsync() return; } + // If the callback runs before the next time, then store it here before we reset it and use it for scheduling. + DateTime? nextTimeOverride = null; + if (isTraceLogLevelEnabled) _logger.LogTrace("Starting RunCallbackAsync"); using (await _lock.LockAsync().AnyContext()) { @@ -115,6 +118,13 @@ private async Task RunCallbackAsync() } _last = SystemClock.UtcNow; + if (SystemClock.UtcNow < _next) + { + _logger.LogWarning("ScheduleNext RunCallbackAsync was called before next run time {NextRun:O}, setting next to current time and rescheduling", _next); + nextTimeOverride = _next; + _next = SystemClock.UtcNow; + _shouldRunAgainImmediately = true; + } } try @@ -147,7 +157,10 @@ private async Task RunCallbackAsync() if (isTraceLogLevelEnabled) _logger.LogTrace("Finished sleeping"); } - var nextRun = SystemClock.UtcNow.AddMilliseconds(10); + var nextRun = SystemClock.UtcNow.AddMilliseconds(10); + if (nextRun < nextTimeOverride) + nextRun = nextTimeOverride.Value; + if (_shouldRunAgainImmediately || next.HasValue && next.Value <= nextRun) ScheduleNext(nextRun); else if (next.HasValue) From 40e03abe0b26984a282bfc17d5fc0c28d320e70f Mon Sep 17 00:00:00 2001 From: Blake Niemyjski Date: Wed, 20 Mar 2024 16:18:38 -0500 Subject: [PATCH 28/28] Removed extra trace logging --- tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs index 661be25e..8d0a7fbf 100644 --- a/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs +++ b/tests/Foundatio.Tests/Queue/InMemoryQueueTests.cs @@ -291,8 +291,6 @@ public override Task VerifyDelayedRetryAttemptsAsync() [Fact] public override Task CanHandleAutoAbandonInWorker() { - Log.DefaultMinimumLevel = LogLevel.Trace; - Log.SetLogLevel(LogLevel.Trace); return base.CanHandleAutoAbandonInWorker(); }