From 2ab31f44d801d9f21199f1ec60f710b6a06a5627 Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Mon, 7 Mar 2022 17:12:20 -0600 Subject: [PATCH] Update deps and use MessageOptions --- .../Foundatio.SampleJob.csproj | 4 +- .../Foundatio.SampleJobClient.csproj | 2 +- samples/Foundatio.SampleJobClient/Program.cs | 2 +- src/Foundatio.Redis/Cache/RedisCacheClient.cs | 2 +- src/Foundatio.Redis/Foundatio.Redis.csproj | 4 +- .../Messaging/RedisMessageBus.cs | 16 ++++--- src/Foundatio.Redis/Queues/RedisQueue.cs | 13 +++--- .../Storage/RedisFileStorage.cs | 33 +++++++-------- .../Utility/EmbeddedResourceLoader.cs | 9 ++-- tests/Directory.Build.props | 4 +- .../Metrics/RedisMetricsTests.cs | 42 +++++++++---------- .../Queues/RedisQueueTests.cs | 20 ++++----- 12 files changed, 70 insertions(+), 81 deletions(-) diff --git a/samples/Foundatio.SampleJob/Foundatio.SampleJob.csproj b/samples/Foundatio.SampleJob/Foundatio.SampleJob.csproj index cbef060..c7a64dd 100644 --- a/samples/Foundatio.SampleJob/Foundatio.SampleJob.csproj +++ b/samples/Foundatio.SampleJob/Foundatio.SampleJob.csproj @@ -10,7 +10,7 @@ - - + + \ No newline at end of file diff --git a/samples/Foundatio.SampleJobClient/Foundatio.SampleJobClient.csproj b/samples/Foundatio.SampleJobClient/Foundatio.SampleJobClient.csproj index 18f56f9..604d66f 100644 --- a/samples/Foundatio.SampleJobClient/Foundatio.SampleJobClient.csproj +++ b/samples/Foundatio.SampleJobClient/Foundatio.SampleJobClient.csproj @@ -8,7 +8,7 @@ - + diff --git a/samples/Foundatio.SampleJobClient/Program.cs b/samples/Foundatio.SampleJobClient/Program.cs index f98d369..45b7df2 100644 --- a/samples/Foundatio.SampleJobClient/Program.cs +++ b/samples/Foundatio.SampleJobClient/Program.cs @@ -18,7 +18,7 @@ public class Program private static TestLoggerFactory _loggerFactory; private static ILogger _logger; private static bool _isRunning = true; - private static CancellationTokenSource _continuousEnqueueTokenSource = new CancellationTokenSource(); + private static CancellationTokenSource _continuousEnqueueTokenSource = new(); public static void Main(string[] args) { diff --git a/src/Foundatio.Redis/Cache/RedisCacheClient.cs b/src/Foundatio.Redis/Cache/RedisCacheClient.cs index 5b5b84c..58ec45e 100644 --- a/src/Foundatio.Redis/Cache/RedisCacheClient.cs +++ b/src/Foundatio.Redis/Cache/RedisCacheClient.cs @@ -16,7 +16,7 @@ public sealed class RedisCacheClient : ICacheClient, IHaveSerializer { private readonly RedisCacheClientOptions _options; private readonly ILogger _logger; - private readonly AsyncLock _lock = new AsyncLock(); + private readonly AsyncLock _lock = new(); private bool _scriptsLoaded; private LoadedLuaScript _removeByPrefix; diff --git a/src/Foundatio.Redis/Foundatio.Redis.csproj b/src/Foundatio.Redis/Foundatio.Redis.csproj index ac433d4..949ffa3 100644 --- a/src/Foundatio.Redis/Foundatio.Redis.csproj +++ b/src/Foundatio.Redis/Foundatio.Redis.csproj @@ -1,9 +1,9 @@  - + - + diff --git a/src/Foundatio.Redis/Messaging/RedisMessageBus.cs b/src/Foundatio.Redis/Messaging/RedisMessageBus.cs index bab9501..7f98d74 100644 --- a/src/Foundatio.Redis/Messaging/RedisMessageBus.cs +++ b/src/Foundatio.Redis/Messaging/RedisMessageBus.cs @@ -1,7 +1,5 @@ using System; -using System.Threading; -using System.Threading.Channels; -using System.Threading.Tasks; +using System.Threading;using System.Threading.Tasks; using Foundatio.Extensions; using Foundatio.Serializer; using Foundatio.Utility; @@ -11,7 +9,7 @@ namespace Foundatio.Messaging { public class RedisMessageBus : MessageBusBase { - private readonly AsyncLock _lock = new AsyncLock(); + private readonly AsyncLock _lock = new(); private bool _isSubscribed; private ChannelMessageQueue _channelMessageQueue = null; @@ -60,16 +58,16 @@ private async Task OnMessage(ChannelMessage channelMessage) { await SendMessageToSubscribersAsync(message).AnyContext(); } - protected override async Task PublishImplAsync(string messageType, object message, TimeSpan? delay, CancellationToken cancellationToken) { + protected override async Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) { var mappedType = GetMappedMessageType(messageType); - if (delay.HasValue && delay.Value > TimeSpan.Zero) { - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, delay.Value.TotalMilliseconds); - await AddDelayedMessageAsync(mappedType, message, delay.Value).AnyContext(); + if (options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) { + if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, options.DeliveryDelay.Value.TotalMilliseconds); + await AddDelayedMessageAsync(mappedType, message, options.DeliveryDelay.Value).AnyContext(); return; } if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Message Publish: {MessageType}", messageType); - var bodyData = SerializeMessageBody(messageType, message); + byte[] bodyData = SerializeMessageBody(messageType, message); byte[] data = _serializer.SerializeToBytes(new RedisMessageEnvelope() { Type = messageType, Data = bodyData diff --git a/src/Foundatio.Redis/Queues/RedisQueue.cs b/src/Foundatio.Redis/Queues/RedisQueue.cs index 969d467..e235379 100644 --- a/src/Foundatio.Redis/Queues/RedisQueue.cs +++ b/src/Foundatio.Redis/Queues/RedisQueue.cs @@ -17,8 +17,8 @@ namespace Foundatio.Queues { public class RedisQueue : QueueBase> where T : class { - private readonly AsyncLock _lock = new AsyncLock(); - private readonly AsyncAutoResetEvent _autoResetEvent = new AsyncAutoResetEvent(); + private readonly AsyncLock _lock = new(); + private readonly AsyncAutoResetEvent _autoResetEvent = new(); private readonly ISubscriber _subscriber; private readonly RedisCacheClient _cache; private long _enqueuedCount; @@ -211,7 +211,7 @@ await Run.WithRetriesAsync(() => Task.WhenAll( return id; } - private readonly List _workers = new List(); + private readonly List _workers = new(); protected override void StartWorkingImpl(Func, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken) { if (handler == null) @@ -280,10 +280,9 @@ protected override async Task> DequeueImplAsync(CancellationToken var sw = Stopwatch.StartNew(); try { - using (var timeoutCancellationTokenSource = new CancellationTokenSource(10000)) - using (var dequeueCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(linkedCancellationToken, timeoutCancellationTokenSource.Token)) { - await _autoResetEvent.WaitAsync(dequeueCancellationTokenSource.Token).AnyContext(); - } + using var timeoutCancellationTokenSource = new CancellationTokenSource(10000); + using var dequeueCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(linkedCancellationToken, timeoutCancellationTokenSource.Token); + await _autoResetEvent.WaitAsync(dequeueCancellationTokenSource.Token).AnyContext(); } catch (OperationCanceledException) { } sw.Stop(); diff --git a/src/Foundatio.Redis/Storage/RedisFileStorage.cs b/src/Foundatio.Redis/Storage/RedisFileStorage.cs index 790bebe..22fd88c 100644 --- a/src/Foundatio.Redis/Storage/RedisFileStorage.cs +++ b/src/Foundatio.Redis/Storage/RedisFileStorage.cs @@ -60,23 +60,22 @@ public async Task SaveFileAsync(string path, Stream stream, CancellationTo path = NormalizePath(path); try { var database = Database; - using (var memory = new MemoryStream()) { - await stream.CopyToAsync(memory, 0x14000, cancellationToken).AnyContext(); - var saveFileTask = database.HashSetAsync(_options.ContainerName, path, memory.ToArray()); - long fileSize = memory.Length; - memory.Seek(0, SeekOrigin.Begin); - memory.SetLength(0); - Serializer.Serialize(new FileSpec { - Path = path, - Created = DateTime.UtcNow, - Modified = DateTime.UtcNow, - Size = fileSize - }, memory); - var saveSpecTask = database.HashSetAsync(_fileSpecContainer, path, memory.ToArray()); - await Run.WithRetriesAsync(() => Task.WhenAll(saveFileTask, saveSpecTask), - cancellationToken: cancellationToken, logger: _logger).AnyContext(); - return true; - } + using var memory = new MemoryStream(); + await stream.CopyToAsync(memory, 0x14000, cancellationToken).AnyContext(); + var saveFileTask = database.HashSetAsync(_options.ContainerName, path, memory.ToArray()); + long fileSize = memory.Length; + memory.Seek(0, SeekOrigin.Begin); + memory.SetLength(0); + Serializer.Serialize(new FileSpec { + Path = path, + Created = DateTime.UtcNow, + Modified = DateTime.UtcNow, + Size = fileSize + }, memory); + var saveSpecTask = database.HashSetAsync(_fileSpecContainer, path, memory.ToArray()); + await Run.WithRetriesAsync(() => Task.WhenAll(saveFileTask, saveSpecTask), + cancellationToken: cancellationToken, logger: _logger).AnyContext(); + return true; } catch (Exception ex) { _logger.LogError(ex, "Error trying to save file: {Path}", path); diff --git a/src/Foundatio.Redis/Utility/EmbeddedResourceLoader.cs b/src/Foundatio.Redis/Utility/EmbeddedResourceLoader.cs index 9804534..d1af7bc 100644 --- a/src/Foundatio.Redis/Utility/EmbeddedResourceLoader.cs +++ b/src/Foundatio.Redis/Utility/EmbeddedResourceLoader.cs @@ -6,10 +6,9 @@ internal static class EmbeddedResourceLoader { internal static string GetEmbeddedResource(string name) { var assembly = typeof(EmbeddedResourceLoader).GetTypeInfo().Assembly; - using (var stream = assembly.GetManifestResourceStream(name)) - using (var streamReader = new StreamReader(stream)) { - return streamReader.ReadToEnd(); - } - } + using var stream = assembly.GetManifestResourceStream(name); + using var streamReader = new StreamReader(stream); + return streamReader.ReadToEnd(); + } } } \ No newline at end of file diff --git a/tests/Directory.Build.props b/tests/Directory.Build.props index 6747ea8..34e1159 100644 --- a/tests/Directory.Build.props +++ b/tests/Directory.Build.props @@ -6,10 +6,10 @@ $(NoWarn);CS1591;NU1701 - + - + diff --git a/tests/Foundatio.Redis.Tests/Metrics/RedisMetricsTests.cs b/tests/Foundatio.Redis.Tests/Metrics/RedisMetricsTests.cs index 326be37..aea8d30 100644 --- a/tests/Foundatio.Redis.Tests/Metrics/RedisMetricsTests.cs +++ b/tests/Foundatio.Redis.Tests/Metrics/RedisMetricsTests.cs @@ -51,37 +51,35 @@ public override Task CanSendBufferedMetricsAsync() { [Fact] public async Task SendGaugesAsync() { - using (var metrics = GetMetricsClient()) { - if (!(metrics is IMetricsClientStats stats)) - return; + using var metrics = GetMetricsClient(); + if (!(metrics is IMetricsClientStats stats)) + return; - int max = 1000; - for (int index = 0; index <= max; index++) { - metrics.Gauge("mygauge", index); - metrics.Timer("mygauge", index); - } - - Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last); + int max = 1000; + for (int index = 0; index <= max; index++) { + metrics.Gauge("mygauge", index); + metrics.Timer("mygauge", index); } + + Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last); } [Fact] public async Task SendGaugesBufferedAsync() { - using (var metrics = GetMetricsClient(true)) { - if (!(metrics is IMetricsClientStats stats)) - return; + using var metrics = GetMetricsClient(true); + if (!(metrics is IMetricsClientStats stats)) + return; - int max = 1000; - for (int index = 0; index <= max; index++) { - metrics.Gauge("mygauge", index); - metrics.Timer("mygauge", index); - } + int max = 1000; + for (int index = 0; index <= max; index++) { + metrics.Gauge("mygauge", index); + metrics.Timer("mygauge", index); + } - if (metrics is IBufferedMetricsClient bufferedMetrics) - await bufferedMetrics.FlushAsync(); + if (metrics is IBufferedMetricsClient bufferedMetrics) + await bufferedMetrics.FlushAsync(); - Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last); - } + Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last); } [Fact] diff --git a/tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs b/tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs index 833e0eb..7cf552c 100644 --- a/tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs +++ b/tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs @@ -156,23 +156,19 @@ public override Task CanUseQueueOptionsAsync() { [RetryFact] public override async Task CanDequeueWithLockingAsync() { var muxer = SharedConnection.GetMuxer(); - using (var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log })) { - using (var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log })) { - var distributedLock = new CacheLockProvider(cache, messageBus, Log); - await CanDequeueWithLockingImpAsync(distributedLock); - } - } + using var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log }); + using var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log }); + var distributedLock = new CacheLockProvider(cache, messageBus, Log); + await CanDequeueWithLockingImpAsync(distributedLock); } [Fact] public override async Task CanHaveMultipleQueueInstancesWithLockingAsync() { var muxer = SharedConnection.GetMuxer(); - using (var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log })) { - using (var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log })) { - var distributedLock = new CacheLockProvider(cache, messageBus, Log); - await CanHaveMultipleQueueInstancesWithLockingImplAsync(distributedLock); - } - } + using var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log }); + using var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log }); + var distributedLock = new CacheLockProvider(cache, messageBus, Log); + await CanHaveMultipleQueueInstancesWithLockingImplAsync(distributedLock); } [Fact]