diff --git a/src/Foundatio.TestHarness/Locks/LockTestBase.cs b/src/Foundatio.TestHarness/Locks/LockTestBase.cs index 1b987aab..69ffca43 100644 --- a/src/Foundatio.TestHarness/Locks/LockTestBase.cs +++ b/src/Foundatio.TestHarness/Locks/LockTestBase.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -183,6 +183,42 @@ public virtual async Task CanAcquireLocksInParallel() if (locker == null) return; + Log.SetLogLevel(LogLevel.Trace); + + const int COUNT = 100; + int current = 1; + var used = new List(); + int concurrency = 0; + + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => + { + await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + Assert.NotNull(myLock); + + int currentConcurrency = Interlocked.Increment(ref concurrency); + Assert.Equal(1, currentConcurrency); + + int item = current; + await Task.Delay(10, ct); + used.Add(item); + current++; + + Interlocked.Decrement(ref concurrency); + }); + + var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1); + Assert.Empty(duplicates); + Assert.Equal(COUNT, used.Count); + } + + public virtual async Task CanAcquireScopedLocksInParallel() + { + var lockProvider = GetLockProvider(); + if (lockProvider == null) + return; + + var locker = new ScopedLockProvider(lockProvider, "scoped"); + Log.SetLogLevel(LogLevel.Debug); const int COUNT = 100; @@ -190,7 +226,7 @@ public virtual async Task CanAcquireLocksInParallel() var used = new List(); int concurrency = 0; - await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) => + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => { await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); Assert.NotNull(myLock); @@ -211,6 +247,40 @@ await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) => Assert.Equal(COUNT, used.Count); } + public virtual async Task CanAcquireMultipleLocksInParallel() + { + var locker = GetLockProvider(); + if (locker == null) + return; + + Log.SetLogLevel(LogLevel.Debug); + + const int COUNT = 100; + int current = 1; + var used = new List(); + int concurrency = 0; + + await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) => + { + await using var myLock = await locker.AcquireAsync(["test"], TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)); + Assert.NotNull(myLock); + + int currentConcurrency = Interlocked.Increment(ref concurrency); + Assert.Equal(1, currentConcurrency); + + int item = current; + await Task.Delay(10, ct); + used.Add(item); + current++; + + Interlocked.Decrement(ref concurrency); + }); + + var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1); + Assert.Empty(duplicates); + Assert.Equal(COUNT, used.Count); + } + public virtual async Task LockOneAtATimeAsync() { var locker = GetLockProvider(); diff --git a/src/Foundatio/Caching/InMemoryCacheClient.cs b/src/Foundatio/Caching/InMemoryCacheClient.cs index 5b7d5ad2..8085b040 100644 --- a/src/Foundatio/Caching/InMemoryCacheClient.cs +++ b/src/Foundatio/Caching/InMemoryCacheClient.cs @@ -6,6 +6,7 @@ using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -22,7 +23,7 @@ public class InMemoryCacheClient : IMemoryCacheClient private long _hits; private long _misses; private readonly ILogger _logger; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); public InMemoryCacheClient() : this(o => o) { } @@ -847,16 +848,16 @@ private async Task StartMaintenanceAsync(bool compactImmediately = false) } } - private Task CompactAsync() + private async Task CompactAsync() { if (!_maxItems.HasValue || _memory.Count <= _maxItems) - return Task.CompletedTask; + return; string expiredKey = null; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_memory.Count <= _maxItems) - return Task.CompletedTask; + return; (string Key, long LastAccessTicks, long InstanceNumber) oldest = (null, Int64.MaxValue, 0); foreach (var kvp in _memory) @@ -874,8 +875,6 @@ private Task CompactAsync() if (expiredKey != null) OnItemExpired(expiredKey); - - return Task.CompletedTask; } private async Task DoMaintenanceAsync() diff --git a/src/Foundatio/Lock/CacheLockProvider.cs b/src/Foundatio/Lock/CacheLockProvider.cs index f1841755..fa34767f 100644 --- a/src/Foundatio/Lock/CacheLockProvider.cs +++ b/src/Foundatio/Lock/CacheLockProvider.cs @@ -83,16 +83,15 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); bool isDebugLogLevelEnabled = _logger.IsEnabled(LogLevel.Debug); bool shouldWait = !cancellationToken.IsCancellationRequested; - if (isDebugLogLevelEnabled) - _logger.LogDebug("Attempting to acquire lock: {Resource}", resource); + string lockId = GenerateNewLockId(); + timeUntilExpires ??= TimeSpan.FromMinutes(20); - if (!timeUntilExpires.HasValue) - timeUntilExpires = TimeSpan.FromMinutes(20); + if (isDebugLogLevelEnabled) + _logger.LogDebug("Attempting to acquire lock ({LockId}): {Resource}", lockId, resource); using var activity = StartLockActivity(resource); bool gotLock = false; - string lockId = GenerateNewLockId(); var sw = Stopwatch.StartNew(); try { @@ -105,18 +104,22 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire else gotLock = await _cacheClient.AddAsync(resource, lockId, timeUntilExpires).AnyContext(); } - catch { } + catch (Exception ex) + { + if (isTraceLogLevelEnabled) + _logger.LogTrace(ex, "Error acquiring lock ({LockId}): {Resource}", lockId, resource); + } if (gotLock) break; if (isDebugLogLevelEnabled) - _logger.LogDebug("Failed to acquire lock: {Resource}", resource); + _logger.LogDebug("Failed to acquire lock ({LockId}): {Resource}", lockId, resource); if (cancellationToken.IsCancellationRequested) { if (isTraceLogLevelEnabled && shouldWait) - _logger.LogTrace("Cancellation requested"); + _logger.LogTrace("Cancellation requested while acquiring lock ({LockId}): {Resource}", lockId, resource); break; } @@ -128,16 +131,14 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire var keyExpiration = SystemClock.UtcNow.SafeAdd(await _cacheClient.GetExpirationAsync(resource).AnyContext() ?? TimeSpan.Zero); var delayAmount = keyExpiration.Subtract(SystemClock.UtcNow); - // delay a minimum of 50ms + // delay a minimum of 50ms and a maximum of 3 seconds if (delayAmount < TimeSpan.FromMilliseconds(50)) delayAmount = TimeSpan.FromMilliseconds(50); - - // delay a maximum of 3 seconds - if (delayAmount > TimeSpan.FromSeconds(3)) + else if (delayAmount > TimeSpan.FromSeconds(3)) delayAmount = TimeSpan.FromSeconds(3); if (isTraceLogLevelEnabled) - _logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock: {Resource}", delayAmount, resource); + _logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock ({LockId}): {Resource}", delayAmount, lockId, resource); // wait until we get a message saying the lock was released or 3 seconds has elapsed or cancellation has been requested using (var maxWaitCancellationTokenSource = new CancellationTokenSource(delayAmount)) @@ -176,17 +177,17 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire _lockTimeoutCounter.Add(1); if (cancellationToken.IsCancellationRequested && isTraceLogLevelEnabled) - _logger.LogTrace("Cancellation requested for lock {Resource} after {Duration:g}", resource, sw.Elapsed); + _logger.LogTrace("Cancellation requested for lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); else if (_logger.IsEnabled(LogLevel.Warning)) - _logger.LogWarning("Failed to acquire lock {Resource} after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogWarning("Failed to acquire lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); return null; } if (sw.Elapsed > TimeSpan.FromSeconds(5) && _logger.IsEnabled(LogLevel.Warning)) - _logger.LogWarning("Acquired lock {Resource} ({LockId}) after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogWarning("Acquired lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); else if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Acquired lock {Resource} ({LockId}) after {Duration:g}", resource, lockId, sw.Elapsed); + _logger.LogDebug("Acquired lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed); return new DisposableLock(resource, lockId, sw.Elapsed, this, _logger, releaseOnDispose); } diff --git a/src/Foundatio/Lock/DisposableLock.cs b/src/Foundatio/Lock/DisposableLock.cs index fa243bbf..9601cc0a 100644 --- a/src/Foundatio/Lock/DisposableLock.cs +++ b/src/Foundatio/Lock/DisposableLock.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; @@ -12,7 +13,7 @@ internal class DisposableLock : ILock private readonly ILogger _logger; private bool _isReleased; private int _renewalCount; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); private readonly Stopwatch _duration; private readonly bool _shouldReleaseOnDispose; @@ -41,7 +42,7 @@ public async ValueTask DisposeAsync() bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace); if (isTraceLogLevelEnabled) - _logger.LogTrace("Disposing lock {Resource}", Resource); + _logger.LogTrace("Disposing lock ({LockId}) {Resource}", LockId, Resource); try { @@ -50,42 +51,42 @@ public async ValueTask DisposeAsync() catch (Exception ex) { if (_logger.IsEnabled(LogLevel.Error)) - _logger.LogError(ex, "Unable to release lock {Resource}", Resource); + _logger.LogError(ex, "Unable to release lock ({LockId}) {Resource}", LockId, Resource); } if (isTraceLogLevelEnabled) - _logger.LogTrace("Disposed lock {Resource}", Resource); + _logger.LogTrace("Disposed lock ({LockId}) {Resource}", LockId, Resource); } public async Task RenewAsync(TimeSpan? lockExtension = null) { if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("Renewing lock {Resource}", Resource); + _logger.LogTrace("Renewing lock ({LockId}) {Resource}", LockId, Resource); await _lockProvider.RenewAsync(Resource, LockId, lockExtension).AnyContext(); _renewalCount++; if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Renewed lock {Resource}", Resource); + _logger.LogDebug("Renewed lock ({LockId}) {Resource}", LockId, Resource); } - public Task ReleaseAsync() + public async Task ReleaseAsync() { if (_isReleased) - return Task.CompletedTask; + return; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_isReleased) - return Task.CompletedTask; + return; _isReleased = true; _duration.Stop(); if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Releasing lock {Resource} after {Duration:g}", Resource, _duration.Elapsed); + _logger.LogDebug("Releasing lock ({LockId}) {Resource} after {Duration:g}", LockId, Resource, _duration.Elapsed); - return _lockProvider.ReleaseAsync(Resource, LockId); + await _lockProvider.ReleaseAsync(Resource, LockId); } } } diff --git a/src/Foundatio/Lock/DisposableLockCollection.cs b/src/Foundatio/Lock/DisposableLockCollection.cs index 1f61ba98..26671f1a 100644 --- a/src/Foundatio/Lock/DisposableLockCollection.cs +++ b/src/Foundatio/Lock/DisposableLockCollection.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using Foundatio.AsyncEx; using Foundatio.Utility; using Microsoft.Extensions.Logging; @@ -14,7 +15,7 @@ internal class DisposableLockCollection : ILock private readonly ILogger _logger; private bool _isReleased; private int _renewalCount; - private readonly object _lock = new(); + private readonly AsyncLock _lock = new(); private readonly Stopwatch _duration; public DisposableLockCollection(IEnumerable locks, string lockId, TimeSpan timeWaitedForLock, ILogger logger) @@ -50,15 +51,15 @@ public async Task RenewAsync(TimeSpan? lockExtension = null) _logger.LogDebug("Renewing {LockCount} locks {Resource}", _locks.Count, Resource); } - public Task ReleaseAsync() + public async Task ReleaseAsync() { if (_isReleased) - return Task.CompletedTask; + return; - lock (_lock) + using (await _lock.LockAsync().AnyContext()) { if (_isReleased) - return Task.CompletedTask; + return; _isReleased = true; _duration.Stop(); @@ -66,7 +67,7 @@ public Task ReleaseAsync() if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Releasing {LockCount} locks {Resource} after {Duration:g}", _locks.Count, Resource, _duration.Elapsed); - return Task.WhenAll(_locks.Select(l => l.ReleaseAsync())); + await Task.WhenAll(_locks.Select(l => l.ReleaseAsync())); } } diff --git a/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs b/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs index 31c0bb4a..ef1a9c89 100644 --- a/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs +++ b/tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs @@ -25,7 +25,7 @@ public override Task CanRunMultipleQueueJobsAsync() return base.CanRunMultipleQueueJobsAsync(); } - [RetryFact] + [Fact] public override Task CanRunQueueJobWithLockFailAsync() { Log.SetLogLevel(LogLevel.Trace); diff --git a/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs b/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs index 3f010eaa..64ebbd4b 100644 --- a/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs +++ b/tests/Foundatio.Tests/Locks/InMemoryLockTests.cs @@ -59,19 +59,31 @@ public override Task CanAcquireMultipleResources() return base.CanAcquireMultipleResources(); } - [RetryFact] + [Fact] public override Task CanAcquireLocksInParallel() { return base.CanAcquireLocksInParallel(); } + [Fact] + public override Task CanAcquireScopedLocksInParallel() + { + return base.CanAcquireScopedLocksInParallel(); + } + + [Fact] + public override Task CanAcquireMultipleLocksInParallel() + { + return base.CanAcquireScopedLocksInParallel(); + } + [Fact] public override Task CanAcquireMultipleScopedResources() { return base.CanAcquireMultipleScopedResources(); } - [RetryFact] + [Fact] public override Task WillThrottleCallsAsync() { Log.SetLogLevel(LogLevel.Trace); diff --git a/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs b/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs index 619ecc30..7953aceb 100644 --- a/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs +++ b/tests/Foundatio.Tests/Metrics/InMemoryMetricsTests.cs @@ -31,7 +31,7 @@ public override Task CanIncrementCounterAsync() return base.CanIncrementCounterAsync(); } - [RetryFact] + [Fact] public override Task CanWaitForCounterAsync() { using (TestSystemClock.Install()) diff --git a/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs b/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs index fae29893..88d1265a 100644 --- a/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs +++ b/tests/Foundatio.Tests/Utility/ScheduledTimerTests.cs @@ -33,7 +33,7 @@ public Task CanRun() return resetEvent.WaitAsync(new CancellationTokenSource(500).Token); } - [RetryFact] + [Fact] public Task CanRunAndScheduleConcurrently() { return CanRunConcurrentlyAsync();