Skip to content

Commit

Permalink
Update deps and use MessageOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ejsmith committed Mar 7, 2022
1 parent 2ca55e3 commit 2ab31f4
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 81 deletions.
4 changes: 2 additions & 2 deletions samples/Foundatio.SampleJob/Foundatio.SampleJob.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Exceptionless.RandomData" Version="1.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Foundatio.Xunit" Version="10.3.1" />
<PackageReference Include="Foundatio.Xunit" Version="10.3.2-alpha.0.9" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion samples/Foundatio.SampleJobClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class Program
private static TestLoggerFactory _loggerFactory;
private static ILogger _logger;
private static bool _isRunning = true;
private static CancellationTokenSource _continuousEnqueueTokenSource = new CancellationTokenSource();
private static CancellationTokenSource _continuousEnqueueTokenSource = new();

public static void Main(string[] args)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Foundatio.Redis/Cache/RedisCacheClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public sealed class RedisCacheClient : ICacheClient, IHaveSerializer {
private readonly RedisCacheClientOptions _options;
private readonly ILogger _logger;

private readonly AsyncLock _lock = new AsyncLock();
private readonly AsyncLock _lock = new();
private bool _scriptsLoaded;

private LoadedLuaScript _removeByPrefix;
Expand Down
4 changes: 2 additions & 2 deletions src/Foundatio.Redis/Foundatio.Redis.csproj
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<ItemGroup>
<PackageReference Include="Foundatio" Version="10.3.1" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
<PackageReference Include="Foundatio" Version="10.3.2-alpha.0.9" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
<ProjectReference Include="..\..\..\Foundatio\src\Foundatio\Foundatio.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" />
<PackageReference Include="StackExchange.Redis" Version="2.2.88" />
<PackageReference Include="StackExchange.Redis" Version="2.5.43" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Scripts\*.lua" />
Expand Down
16 changes: 7 additions & 9 deletions src/Foundatio.Redis/Messaging/RedisMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading;using System.Threading.Tasks;
using Foundatio.Extensions;
using Foundatio.Serializer;
using Foundatio.Utility;
Expand All @@ -11,7 +9,7 @@

namespace Foundatio.Messaging {
public class RedisMessageBus : MessageBusBase<RedisMessageBusOptions> {
private readonly AsyncLock _lock = new AsyncLock();
private readonly AsyncLock _lock = new();
private bool _isSubscribed;
private ChannelMessageQueue _channelMessageQueue = null;

Expand Down Expand Up @@ -60,16 +58,16 @@ private async Task OnMessage(ChannelMessage channelMessage) {
await SendMessageToSubscribersAsync(message).AnyContext();
}

protected override async Task PublishImplAsync(string messageType, object message, TimeSpan? delay, CancellationToken cancellationToken) {
protected override async Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) {
var mappedType = GetMappedMessageType(messageType);
if (delay.HasValue && delay.Value > TimeSpan.Zero) {
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, delay.Value.TotalMilliseconds);
await AddDelayedMessageAsync(mappedType, message, delay.Value).AnyContext();
if (options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) {
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, options.DeliveryDelay.Value.TotalMilliseconds);
await AddDelayedMessageAsync(mappedType, message, options.DeliveryDelay.Value).AnyContext();
return;
}

if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Message Publish: {MessageType}", messageType);
var bodyData = SerializeMessageBody(messageType, message);
byte[] bodyData = SerializeMessageBody(messageType, message);
byte[] data = _serializer.SerializeToBytes(new RedisMessageEnvelope() {
Type = messageType,
Data = bodyData
Expand Down
13 changes: 6 additions & 7 deletions src/Foundatio.Redis/Queues/RedisQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

namespace Foundatio.Queues {
public class RedisQueue<T> : QueueBase<T, RedisQueueOptions<T>> where T : class {
private readonly AsyncLock _lock = new AsyncLock();
private readonly AsyncAutoResetEvent _autoResetEvent = new AsyncAutoResetEvent();
private readonly AsyncLock _lock = new();
private readonly AsyncAutoResetEvent _autoResetEvent = new();
private readonly ISubscriber _subscriber;
private readonly RedisCacheClient _cache;
private long _enqueuedCount;
Expand Down Expand Up @@ -211,7 +211,7 @@ await Run.WithRetriesAsync(() => Task.WhenAll(
return id;
}

private readonly List<Task> _workers = new List<Task>();
private readonly List<Task> _workers = new();

protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken) {
if (handler == null)
Expand Down Expand Up @@ -280,10 +280,9 @@ protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken
var sw = Stopwatch.StartNew();

try {
using (var timeoutCancellationTokenSource = new CancellationTokenSource(10000))
using (var dequeueCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(linkedCancellationToken, timeoutCancellationTokenSource.Token)) {
await _autoResetEvent.WaitAsync(dequeueCancellationTokenSource.Token).AnyContext();
}
using var timeoutCancellationTokenSource = new CancellationTokenSource(10000);
using var dequeueCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(linkedCancellationToken, timeoutCancellationTokenSource.Token);
await _autoResetEvent.WaitAsync(dequeueCancellationTokenSource.Token).AnyContext();
} catch (OperationCanceledException) { }

sw.Stop();
Expand Down
33 changes: 16 additions & 17 deletions src/Foundatio.Redis/Storage/RedisFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,22 @@ public async Task<bool> SaveFileAsync(string path, Stream stream, CancellationTo
path = NormalizePath(path);
try {
var database = Database;
using (var memory = new MemoryStream()) {
await stream.CopyToAsync(memory, 0x14000, cancellationToken).AnyContext();
var saveFileTask = database.HashSetAsync(_options.ContainerName, path, memory.ToArray());
long fileSize = memory.Length;
memory.Seek(0, SeekOrigin.Begin);
memory.SetLength(0);
Serializer.Serialize(new FileSpec {
Path = path,
Created = DateTime.UtcNow,
Modified = DateTime.UtcNow,
Size = fileSize
}, memory);
var saveSpecTask = database.HashSetAsync(_fileSpecContainer, path, memory.ToArray());
await Run.WithRetriesAsync(() => Task.WhenAll(saveFileTask, saveSpecTask),
cancellationToken: cancellationToken, logger: _logger).AnyContext();
return true;
}
using var memory = new MemoryStream();
await stream.CopyToAsync(memory, 0x14000, cancellationToken).AnyContext();
var saveFileTask = database.HashSetAsync(_options.ContainerName, path, memory.ToArray());
long fileSize = memory.Length;
memory.Seek(0, SeekOrigin.Begin);
memory.SetLength(0);
Serializer.Serialize(new FileSpec {
Path = path,
Created = DateTime.UtcNow,
Modified = DateTime.UtcNow,
Size = fileSize
}, memory);
var saveSpecTask = database.HashSetAsync(_fileSpecContainer, path, memory.ToArray());
await Run.WithRetriesAsync(() => Task.WhenAll(saveFileTask, saveSpecTask),
cancellationToken: cancellationToken, logger: _logger).AnyContext();
return true;
}
catch (Exception ex) {
_logger.LogError(ex, "Error trying to save file: {Path}", path);
Expand Down
9 changes: 4 additions & 5 deletions src/Foundatio.Redis/Utility/EmbeddedResourceLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ internal static class EmbeddedResourceLoader {
internal static string GetEmbeddedResource(string name) {
var assembly = typeof(EmbeddedResourceLoader).GetTypeInfo().Assembly;

using (var stream = assembly.GetManifestResourceStream(name))
using (var streamReader = new StreamReader(stream)) {
return streamReader.ReadToEnd();
}
}
using var stream = assembly.GetManifestResourceStream(name);
using var streamReader = new StreamReader(stream);
return streamReader.ReadToEnd();
}
}
}
4 changes: 2 additions & 2 deletions tests/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
<NoWarn>$(NoWarn);CS1591;NU1701</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
<PackageReference Include="Foundatio.TestHarness" Version="10.3.1" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
<PackageReference Include="Foundatio.TestHarness" Version="10.3.2-alpha.0.9" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />

<ProjectReference Include="..\..\..\Foundatio\src\Foundatio.TestHarness\Foundatio.TestHarness.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" />
</ItemGroup>
Expand Down
42 changes: 20 additions & 22 deletions tests/Foundatio.Redis.Tests/Metrics/RedisMetricsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,37 +51,35 @@ public override Task CanSendBufferedMetricsAsync() {

[Fact]
public async Task SendGaugesAsync() {
using (var metrics = GetMetricsClient()) {
if (!(metrics is IMetricsClientStats stats))
return;
using var metrics = GetMetricsClient();
if (!(metrics is IMetricsClientStats stats))
return;

int max = 1000;
for (int index = 0; index <= max; index++) {
metrics.Gauge("mygauge", index);
metrics.Timer("mygauge", index);
}

Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last);
int max = 1000;
for (int index = 0; index <= max; index++) {
metrics.Gauge("mygauge", index);
metrics.Timer("mygauge", index);
}

Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last);
}

[Fact]
public async Task SendGaugesBufferedAsync() {
using (var metrics = GetMetricsClient(true)) {
if (!(metrics is IMetricsClientStats stats))
return;
using var metrics = GetMetricsClient(true);
if (!(metrics is IMetricsClientStats stats))
return;

int max = 1000;
for (int index = 0; index <= max; index++) {
metrics.Gauge("mygauge", index);
metrics.Timer("mygauge", index);
}
int max = 1000;
for (int index = 0; index <= max; index++) {
metrics.Gauge("mygauge", index);
metrics.Timer("mygauge", index);
}

if (metrics is IBufferedMetricsClient bufferedMetrics)
await bufferedMetrics.FlushAsync();
if (metrics is IBufferedMetricsClient bufferedMetrics)
await bufferedMetrics.FlushAsync();

Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last);
}
Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last);
}

[Fact]
Expand Down
20 changes: 8 additions & 12 deletions tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,23 +156,19 @@ public override Task CanUseQueueOptionsAsync() {
[RetryFact]
public override async Task CanDequeueWithLockingAsync() {
var muxer = SharedConnection.GetMuxer();
using (var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log })) {
using (var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log })) {
var distributedLock = new CacheLockProvider(cache, messageBus, Log);
await CanDequeueWithLockingImpAsync(distributedLock);
}
}
using var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log });
using var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log });
var distributedLock = new CacheLockProvider(cache, messageBus, Log);
await CanDequeueWithLockingImpAsync(distributedLock);
}

[Fact]
public override async Task CanHaveMultipleQueueInstancesWithLockingAsync() {
var muxer = SharedConnection.GetMuxer();
using (var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log })) {
using (var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log })) {
var distributedLock = new CacheLockProvider(cache, messageBus, Log);
await CanHaveMultipleQueueInstancesWithLockingImplAsync(distributedLock);
}
}
using var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log });
using var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log });
var distributedLock = new CacheLockProvider(cache, messageBus, Log);
await CanHaveMultipleQueueInstancesWithLockingImplAsync(distributedLock);
}

[Fact]
Expand Down

0 comments on commit 2ab31f4

Please sign in to comment.