Skip to content

Commit

Permalink
Remove retry fact on tests to expose flakey tests and add more cache …
Browse files Browse the repository at this point in the history
…lock tests and changes
  • Loading branch information
niemyjski committed Mar 18, 2024
1 parent 55da2bd commit 9087b73
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 49 deletions.
74 changes: 72 additions & 2 deletions src/Foundatio.TestHarness/Locks/LockTestBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -183,14 +183,50 @@ public virtual async Task CanAcquireLocksInParallel()
if (locker == null)
return;

Log.SetLogLevel<CacheLockProvider>(LogLevel.Trace);

const int COUNT = 100;
int current = 1;
var used = new List<int>();
int concurrency = 0;

await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) =>
{
await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
Assert.NotNull(myLock);

int currentConcurrency = Interlocked.Increment(ref concurrency);
Assert.Equal(1, currentConcurrency);

int item = current;
await Task.Delay(10, ct);
used.Add(item);
current++;

Interlocked.Decrement(ref concurrency);
});

var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1);
Assert.Empty(duplicates);
Assert.Equal(COUNT, used.Count);
}

public virtual async Task CanAcquireScopedLocksInParallel()
{
var lockProvider = GetLockProvider();
if (lockProvider == null)
return;

var locker = new ScopedLockProvider(lockProvider, "scoped");

Log.SetLogLevel<CacheLockProvider>(LogLevel.Debug);

const int COUNT = 100;
int current = 1;
var used = new List<int>();
int concurrency = 0;

await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) =>
await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) =>
{
await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
Assert.NotNull(myLock);
Expand All @@ -211,6 +247,40 @@ await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) =>
Assert.Equal(COUNT, used.Count);
}

public virtual async Task CanAcquireMultipleLocksInParallel()
{
var locker = GetLockProvider();
if (locker == null)
return;

Log.SetLogLevel<CacheLockProvider>(LogLevel.Debug);

const int COUNT = 100;
int current = 1;
var used = new List<int>();
int concurrency = 0;

await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) =>
{
await using var myLock = await locker.AcquireAsync(["test"], TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
Assert.NotNull(myLock);

int currentConcurrency = Interlocked.Increment(ref concurrency);
Assert.Equal(1, currentConcurrency);

int item = current;
await Task.Delay(10, ct);
used.Add(item);
current++;

Interlocked.Decrement(ref concurrency);
});

var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1);
Assert.Empty(duplicates);
Assert.Equal(COUNT, used.Count);
}

public virtual async Task LockOneAtATimeAsync()
{
var locker = GetLockProvider();
Expand Down
13 changes: 6 additions & 7 deletions src/Foundatio/Caching/InMemoryCacheClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.AsyncEx;
using Foundatio.Utility;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand All @@ -22,7 +23,7 @@ public class InMemoryCacheClient : IMemoryCacheClient
private long _hits;
private long _misses;
private readonly ILogger _logger;
private readonly object _lock = new();
private readonly AsyncLock _lock = new();

public InMemoryCacheClient() : this(o => o) { }

Expand Down Expand Up @@ -847,16 +848,16 @@ private async Task StartMaintenanceAsync(bool compactImmediately = false)
}
}

private Task CompactAsync()
private async Task CompactAsync()
{
if (!_maxItems.HasValue || _memory.Count <= _maxItems)
return Task.CompletedTask;
return;

string expiredKey = null;
lock (_lock)
using (await _lock.LockAsync().AnyContext())
{
if (_memory.Count <= _maxItems)
return Task.CompletedTask;
return;

(string Key, long LastAccessTicks, long InstanceNumber) oldest = (null, Int64.MaxValue, 0);
foreach (var kvp in _memory)
Expand All @@ -874,8 +875,6 @@ private Task CompactAsync()

if (expiredKey != null)
OnItemExpired(expiredKey);

return Task.CompletedTask;
}

private async Task DoMaintenanceAsync()
Expand Down
35 changes: 18 additions & 17 deletions src/Foundatio/Lock/CacheLockProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,15 @@ public async Task<ILock> AcquireAsync(string resource, TimeSpan? timeUntilExpire
bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
bool isDebugLogLevelEnabled = _logger.IsEnabled(LogLevel.Debug);
bool shouldWait = !cancellationToken.IsCancellationRequested;
if (isDebugLogLevelEnabled)
_logger.LogDebug("Attempting to acquire lock: {Resource}", resource);
string lockId = GenerateNewLockId();
timeUntilExpires ??= TimeSpan.FromMinutes(20);

if (!timeUntilExpires.HasValue)
timeUntilExpires = TimeSpan.FromMinutes(20);
if (isDebugLogLevelEnabled)
_logger.LogDebug("Attempting to acquire lock ({LockId}): {Resource}", lockId, resource);

using var activity = StartLockActivity(resource);

bool gotLock = false;
string lockId = GenerateNewLockId();
var sw = Stopwatch.StartNew();
try
{
Expand All @@ -105,18 +104,22 @@ public async Task<ILock> AcquireAsync(string resource, TimeSpan? timeUntilExpire
else
gotLock = await _cacheClient.AddAsync(resource, lockId, timeUntilExpires).AnyContext();
}
catch { }
catch (Exception ex)
{
if (isTraceLogLevelEnabled)
_logger.LogTrace(ex, "Error acquiring lock ({LockId}): {Resource}", lockId, resource);
}

if (gotLock)
break;

if (isDebugLogLevelEnabled)
_logger.LogDebug("Failed to acquire lock: {Resource}", resource);
_logger.LogDebug("Failed to acquire lock ({LockId}): {Resource}", lockId, resource);

if (cancellationToken.IsCancellationRequested)
{
if (isTraceLogLevelEnabled && shouldWait)
_logger.LogTrace("Cancellation requested");
_logger.LogTrace("Cancellation requested while acquiring lock ({LockId}): {Resource}", lockId, resource);

break;
}
Expand All @@ -128,16 +131,14 @@ public async Task<ILock> AcquireAsync(string resource, TimeSpan? timeUntilExpire
var keyExpiration = SystemClock.UtcNow.SafeAdd(await _cacheClient.GetExpirationAsync(resource).AnyContext() ?? TimeSpan.Zero);
var delayAmount = keyExpiration.Subtract(SystemClock.UtcNow);

// delay a minimum of 50ms
// delay a minimum of 50ms and a maximum of 3 seconds
if (delayAmount < TimeSpan.FromMilliseconds(50))
delayAmount = TimeSpan.FromMilliseconds(50);

// delay a maximum of 3 seconds
if (delayAmount > TimeSpan.FromSeconds(3))
else if (delayAmount > TimeSpan.FromSeconds(3))
delayAmount = TimeSpan.FromSeconds(3);

if (isTraceLogLevelEnabled)
_logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock: {Resource}", delayAmount, resource);
_logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock ({LockId}): {Resource}", delayAmount, lockId, resource);

// wait until we get a message saying the lock was released or 3 seconds has elapsed or cancellation has been requested
using (var maxWaitCancellationTokenSource = new CancellationTokenSource(delayAmount))
Expand Down Expand Up @@ -176,17 +177,17 @@ public async Task<ILock> AcquireAsync(string resource, TimeSpan? timeUntilExpire
_lockTimeoutCounter.Add(1);

if (cancellationToken.IsCancellationRequested && isTraceLogLevelEnabled)
_logger.LogTrace("Cancellation requested for lock {Resource} after {Duration:g}", resource, sw.Elapsed);
_logger.LogTrace("Cancellation requested for lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed);
else if (_logger.IsEnabled(LogLevel.Warning))
_logger.LogWarning("Failed to acquire lock {Resource} after {Duration:g}", resource, lockId, sw.Elapsed);
_logger.LogWarning("Failed to acquire lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed);

return null;
}

if (sw.Elapsed > TimeSpan.FromSeconds(5) && _logger.IsEnabled(LogLevel.Warning))
_logger.LogWarning("Acquired lock {Resource} ({LockId}) after {Duration:g}", resource, lockId, sw.Elapsed);
_logger.LogWarning("Acquired lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed);
else if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Acquired lock {Resource} ({LockId}) after {Duration:g}", resource, lockId, sw.Elapsed);
_logger.LogDebug("Acquired lock ({LockId}) {Resource} after {Duration:g}", lockId, resource, sw.Elapsed);

return new DisposableLock(resource, lockId, sw.Elapsed, this, _logger, releaseOnDispose);
}
Expand Down
25 changes: 13 additions & 12 deletions src/Foundatio/Lock/DisposableLock.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Foundatio.AsyncEx;
using Foundatio.Utility;
using Microsoft.Extensions.Logging;

Expand All @@ -12,7 +13,7 @@ internal class DisposableLock : ILock
private readonly ILogger _logger;
private bool _isReleased;
private int _renewalCount;
private readonly object _lock = new();
private readonly AsyncLock _lock = new();
private readonly Stopwatch _duration;
private readonly bool _shouldReleaseOnDispose;

Expand Down Expand Up @@ -41,7 +42,7 @@ public async ValueTask DisposeAsync()

bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
if (isTraceLogLevelEnabled)
_logger.LogTrace("Disposing lock {Resource}", Resource);
_logger.LogTrace("Disposing lock ({LockId}) {Resource}", LockId, Resource);

try
{
Expand All @@ -50,42 +51,42 @@ public async ValueTask DisposeAsync()
catch (Exception ex)
{
if (_logger.IsEnabled(LogLevel.Error))
_logger.LogError(ex, "Unable to release lock {Resource}", Resource);
_logger.LogError(ex, "Unable to release lock ({LockId}) {Resource}", LockId, Resource);
}

if (isTraceLogLevelEnabled)
_logger.LogTrace("Disposed lock {Resource}", Resource);
_logger.LogTrace("Disposed lock ({LockId}) {Resource}", LockId, Resource);
}

public async Task RenewAsync(TimeSpan? lockExtension = null)
{
if (_logger.IsEnabled(LogLevel.Trace))
_logger.LogTrace("Renewing lock {Resource}", Resource);
_logger.LogTrace("Renewing lock ({LockId}) {Resource}", LockId, Resource);

await _lockProvider.RenewAsync(Resource, LockId, lockExtension).AnyContext();
_renewalCount++;

if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Renewed lock {Resource}", Resource);
_logger.LogDebug("Renewed lock ({LockId}) {Resource}", LockId, Resource);
}

public Task ReleaseAsync()
public async Task ReleaseAsync()
{
if (_isReleased)
return Task.CompletedTask;
return;

lock (_lock)
using (await _lock.LockAsync().AnyContext())
{
if (_isReleased)
return Task.CompletedTask;
return;

_isReleased = true;
_duration.Stop();

if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Releasing lock {Resource} after {Duration:g}", Resource, _duration.Elapsed);
_logger.LogDebug("Releasing lock ({LockId}) {Resource} after {Duration:g}", LockId, Resource, _duration.Elapsed);

return _lockProvider.ReleaseAsync(Resource, LockId);
await _lockProvider.ReleaseAsync(Resource, LockId);
}
}
}
13 changes: 7 additions & 6 deletions src/Foundatio/Lock/DisposableLockCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Foundatio.AsyncEx;
using Foundatio.Utility;
using Microsoft.Extensions.Logging;

Expand All @@ -14,7 +15,7 @@ internal class DisposableLockCollection : ILock
private readonly ILogger _logger;
private bool _isReleased;
private int _renewalCount;
private readonly object _lock = new();
private readonly AsyncLock _lock = new();
private readonly Stopwatch _duration;

public DisposableLockCollection(IEnumerable<ILock> locks, string lockId, TimeSpan timeWaitedForLock, ILogger logger)
Expand Down Expand Up @@ -50,23 +51,23 @@ public async Task RenewAsync(TimeSpan? lockExtension = null)
_logger.LogDebug("Renewing {LockCount} locks {Resource}", _locks.Count, Resource);
}

public Task ReleaseAsync()
public async Task ReleaseAsync()
{
if (_isReleased)
return Task.CompletedTask;
return;

lock (_lock)
using (await _lock.LockAsync().AnyContext())
{
if (_isReleased)
return Task.CompletedTask;
return;

_isReleased = true;
_duration.Stop();

if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Releasing {LockCount} locks {Resource} after {Duration:g}", _locks.Count, Resource, _duration.Elapsed);

return Task.WhenAll(_locks.Select(l => l.ReleaseAsync()));
await Task.WhenAll(_locks.Select(l => l.ReleaseAsync()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Foundatio.Tests/Jobs/InMemoryJobQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public override Task CanRunMultipleQueueJobsAsync()
return base.CanRunMultipleQueueJobsAsync();
}

[RetryFact]
[Fact]
public override Task CanRunQueueJobWithLockFailAsync()
{
Log.SetLogLevel<InMemoryCacheClient>(LogLevel.Trace);
Expand Down
Loading

0 comments on commit 9087b73

Please sign in to comment.