From 91372ab859f7110f57cda5c2bfc6237538cda85d Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Tue, 12 Jul 2022 22:28:38 -0500 Subject: [PATCH 1/3] Working on message bus subscriptions options --- .../Queue/QueueTestBase.cs | 7 ++- src/Foundatio/Messaging/IMessageSubscriber.cs | 62 +++++++++++++++---- src/Foundatio/Messaging/Message.cs | 33 +++++++++- src/Foundatio/Messaging/MessageBusBase.cs | 22 ++++--- src/Foundatio/Messaging/NullMessageBus.cs | 4 ++ .../Messaging/SharedMessageBusOptions.cs | 26 +++++++- src/Foundatio/Queues/IQueueEntry.cs | 3 +- 7 files changed, 129 insertions(+), 28 deletions(-) diff --git a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs index 3afebc009..0db1b2bbc 100644 --- a/src/Foundatio.TestHarness/Queue/QueueTestBase.cs +++ b/src/Foundatio.TestHarness/Queue/QueueTestBase.cs @@ -553,6 +553,7 @@ public virtual async Task WillWaitForItemAsync() { return; try { + Log.MinimumLevel = LogLevel.Trace; await queue.DeleteQueueAsync(); await AssertEmptyQueueAsync(queue); @@ -564,14 +565,16 @@ public virtual async Task WillWaitForItemAsync() { Assert.InRange(sw.Elapsed, TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(5000)); _ = Task.Run(async () => { - await SystemClock.SleepAsync(500); + await Task.Delay(500); + _logger.LogInformation("Enqueuing async message"); await queue.EnqueueAsync(new SimpleWorkItem { Data = "Hello" }); }); sw.Restart(); - workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(1)); + _logger.LogInformation("Dequeuing message with timeout"); + workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(10)); sw.Stop(); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Time {Elapsed:g}", sw.Elapsed); Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(400)); diff --git a/src/Foundatio/Messaging/IMessageSubscriber.cs b/src/Foundatio/Messaging/IMessageSubscriber.cs index e7517d958..f3a5b5eee 100644 --- a/src/Foundatio/Messaging/IMessageSubscriber.cs +++ b/src/Foundatio/Messaging/IMessageSubscriber.cs @@ -1,37 +1,77 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace Foundatio.Messaging { public interface IMessageSubscriber { - Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class; + Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null) where T : class; } public static class MessageBusExtensions { - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) where T : class { - return subscriber.SubscribeAsync((msg, token) => handler(msg), cancellationToken); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null) where T : class { + return subscriber.SubscribeAsync((msg, token) => handler(msg), options); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, CancellationToken cancellationToken = default) where T : class { + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, MessageSubscriptionOptions options = null) where T : class { return subscriber.SubscribeAsync((msg, token) => { handler(msg); return Task.CompletedTask; - }, cancellationToken); + }, options); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) { - return subscriber.SubscribeAsync((msg, token) => handler(msg, token), cancellationToken); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null) { + return subscriber.SubscribeAsync((msg, token) => handler(msg, token), options); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, CancellationToken cancellationToken = default) { - return subscriber.SubscribeAsync((msg, token) => handler(msg), cancellationToken); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null) { + return subscriber.SubscribeAsync((msg, token) => handler(msg), options); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, CancellationToken cancellationToken = default) { + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, MessageSubscriptionOptions options = null) { return subscriber.SubscribeAsync((msg, token) => { handler(msg); return Task.CompletedTask; - }, cancellationToken); + }, options); } } + + public interface IMessageSubscription : IAsyncDisposable { + string SubscriptionId { get; } + IDictionary Properties { get; } + } + + public class MessageSubscription : IMessageSubscription { + private readonly Func _disposeSubscriptionFunc; + + public MessageSubscription(string subscriptionId, Func disposeSubscriptionFunc) { + SubscriptionId = subscriptionId; + _disposeSubscriptionFunc = disposeSubscriptionFunc; + } + + public string SubscriptionId { get; } + public IDictionary Properties { get; set; } = new Dictionary(); + + public ValueTask DisposeAsync() { + return _disposeSubscriptionFunc?.Invoke() ?? new ValueTask(); + } + } + + public class MessageSubscriptionOptions { + /// + /// The topic name + /// + public string Topic { get; set; } + + /// + /// Resolves a message to a .NET type. + /// + public Func MessageTypeResolver { get; set; } + + public CancellationToken CancellationToken { get; set; } + + public IDictionary Properties { get; set; } = new Dictionary(); + + public static implicit operator MessageSubscriptionOptions(CancellationToken cancellationToken) => new() { CancellationToken = cancellationToken }; + } } diff --git a/src/Foundatio/Messaging/Message.cs b/src/Foundatio/Messaging/Message.cs index 0b41b4590..1a843db81 100644 --- a/src/Foundatio/Messaging/Message.cs +++ b/src/Foundatio/Messaging/Message.cs @@ -1,10 +1,10 @@ using System; using System.Collections.Generic; using System.Diagnostics; -using Foundatio.Serializer; +using System.Threading.Tasks; namespace Foundatio.Messaging { - public interface IMessage { + public interface IMessage : IAsyncDisposable { string UniqueId { get; } string CorrelationId { get; } string Type { get; } @@ -12,6 +12,10 @@ public interface IMessage { byte[] Data { get; } object GetBody(); IDictionary Properties { get; } + + Task RenewLockAsync(); + Task AbandonAsync(); + Task CompleteAsync(); } public interface IMessage : IMessage where T: class { @@ -33,7 +37,24 @@ public Message(byte[] data, Func getBody) { public Type ClrType { get; set; } public IDictionary Properties { get; set; } = new Dictionary(); public byte[] Data { get; set; } + public object GetBody() => _getBody(this); + + public Task AbandonAsync() { + throw new NotImplementedException(); + } + + public Task CompleteAsync() { + throw new NotImplementedException(); + } + + public Task RenewLockAsync() { + throw new NotImplementedException(); + } + + public ValueTask DisposeAsync() { + throw new NotImplementedException(); + } } public class Message : IMessage where T : class { @@ -58,5 +79,13 @@ public Message(IMessage message) { public IDictionary Properties => _message.Properties; public object GetBody() => _message.GetBody(); + + public Task AbandonAsync() => _message.AbandonAsync(); + + public Task CompleteAsync() => _message.CompleteAsync(); + + public Task RenewLockAsync() => _message.RenewLockAsync(); + + public ValueTask DisposeAsync() => _message.DisposeAsync(); } } \ No newline at end of file diff --git a/src/Foundatio/Messaging/MessageBusBase.cs b/src/Foundatio/Messaging/MessageBusBase.cs index 56a17d3a1..ff0a96b91 100644 --- a/src/Foundatio/Messaging/MessageBusBase.cs +++ b/src/Foundatio/Messaging/MessageBusBase.cs @@ -90,9 +90,9 @@ protected virtual Type GetMappedMessageType(string messageType) { protected virtual Task RemoveTopicSubscriptionAsync() => Task.CompletedTask; protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask; - protected virtual Task SubscribeImplAsync(Func handler, CancellationToken cancellationToken) where T : class { + protected virtual Task SubscribeImplAsync(Func handler, MessageSubscriptionOptions options) where T : class { var subscriber = new Subscriber { - CancellationToken = cancellationToken, + CancellationToken = options.CancellationToken, Type = typeof(T), Action = (message, token) => { if (message is not T) { @@ -101,12 +101,12 @@ protected virtual Task SubscribeImplAsync(Func ha return Task.CompletedTask; } - return handler((T)message, cancellationToken); + return handler((T)message, options.CancellationToken); } }; - if (cancellationToken != CancellationToken.None) { - cancellationToken.Register(() => { + if (options.CancellationToken != CancellationToken.None) { + options.CancellationToken.Register(() => { _subscribers.TryRemove(subscriber.Id, out _); if (_subscribers.Count == 0) RemoveTopicSubscriptionAsync().GetAwaiter().GetResult(); @@ -121,15 +121,19 @@ protected virtual Task SubscribeImplAsync(Func ha if (!_subscribers.TryAdd(subscriber.Id, subscriber) && _logger.IsEnabled(LogLevel.Error)) _logger.LogError("Unable to add subscriber {SubscriberId}", subscriber.Id); - return Task.CompletedTask; + return Task.FromResult(new MessageSubscription(Guid.NewGuid().ToString("N"), () => new ValueTask())); } - public async Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class { + public async Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null) where T : class { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Adding subscriber for {MessageType}.", typeof(T).FullName); - await SubscribeImplAsync(handler, cancellationToken).AnyContext(); - await EnsureTopicSubscriptionAsync(cancellationToken).AnyContext(); + options ??= new MessageSubscriptionOptions(); + + var sub = await SubscribeImplAsync(handler, options).AnyContext(); + await EnsureTopicSubscriptionAsync(options.CancellationToken).AnyContext(); + + return sub; } protected List GetMessageSubscribers(IMessage message) { diff --git a/src/Foundatio/Messaging/NullMessageBus.cs b/src/Foundatio/Messaging/NullMessageBus.cs index a7bce4390..017093a59 100644 --- a/src/Foundatio/Messaging/NullMessageBus.cs +++ b/src/Foundatio/Messaging/NullMessageBus.cs @@ -15,5 +15,9 @@ public Task SubscribeAsync(Func handler, Cancella } public void Dispose() {} + + public Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null) where T : class { + return Task.FromResult(new MessageSubscription(Guid.NewGuid().ToString(), () => new ValueTask())); + } } } diff --git a/src/Foundatio/Messaging/SharedMessageBusOptions.cs b/src/Foundatio/Messaging/SharedMessageBusOptions.cs index b7146a1a6..a94a8fd33 100644 --- a/src/Foundatio/Messaging/SharedMessageBusOptions.cs +++ b/src/Foundatio/Messaging/SharedMessageBusOptions.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; namespace Foundatio.Messaging { @@ -9,11 +9,26 @@ public class SharedMessageBusOptions : SharedOptions { public string Topic { get; set; } = "messages"; /// - /// Controls which types messages are mapped to. + /// Resolves a message to a .NET type. + /// + public Func MessageTypeResolver { get; set; } + + /// + /// Statically configured message type mappings. will be run first and then this dictionary will be checked. /// public Dictionary MessageTypeMappings { get; set; } = new Dictionary(); } + public interface IConsumeMessageContext { + string MessageType { get; set; } + IDictionary Properties { get; } + } + + public class ConsumeMessageContext : IConsumeMessageContext { + public string MessageType { get; set; } + public IDictionary Properties { get; set; } = new Dictionary(); + } + public class SharedMessageBusOptionsBuilder : SharedOptionsBuilder where TOptions : SharedMessageBusOptions, new() where TBuilder : SharedMessageBusOptionsBuilder { @@ -24,6 +39,13 @@ public TBuilder Topic(string topic) { return (TBuilder)this; } + public TBuilder MessageTypeResolver(Func resolver) { + if (resolver == null) + throw new ArgumentNullException(nameof(resolver)); + Target.MessageTypeResolver = resolver; + return (TBuilder)this; + } + public TBuilder MapMessageType(string name) { if (Target.MessageTypeMappings == null) Target.MessageTypeMappings = new Dictionary(); diff --git a/src/Foundatio/Queues/IQueueEntry.cs b/src/Foundatio/Queues/IQueueEntry.cs index a159cbe08..a3b57ab84 100644 --- a/src/Foundatio/Queues/IQueueEntry.cs +++ b/src/Foundatio/Queues/IQueueEntry.cs @@ -4,7 +4,7 @@ using Foundatio.Utility; namespace Foundatio.Queues { - public interface IQueueEntry { + public interface IQueueEntry : IAsyncDisposable { string Id { get; } string CorrelationId { get; } IDictionary Properties { get; } @@ -18,7 +18,6 @@ public interface IQueueEntry { Task RenewLockAsync(); Task AbandonAsync(); Task CompleteAsync(); - ValueTask DisposeAsync(); } public interface IQueueEntry : IQueueEntry where T : class { From e010dda97d73518a7a289b7b12a200af7c374a5b Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Tue, 12 Jul 2022 23:03:35 -0500 Subject: [PATCH 2/3] Add message type resolver --- src/Foundatio/Messaging/MessageBusBase.cs | 28 +++++++++++-------- .../Messaging/SharedMessageBusOptions.cs | 11 ++++++-- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/Foundatio/Messaging/MessageBusBase.cs b/src/Foundatio/Messaging/MessageBusBase.cs index ff0a96b91..dbc85d617 100644 --- a/src/Foundatio/Messaging/MessageBusBase.cs +++ b/src/Foundatio/Messaging/MessageBusBase.cs @@ -59,11 +59,11 @@ protected string GetMappedMessageType(Type messageType) { } private readonly ConcurrentDictionary _knownMessageTypesCache = new(); - protected virtual Type GetMappedMessageType(string messageType) { - if (String.IsNullOrEmpty(messageType)) + protected virtual Type GetMappedMessageType(IConsumeMessageContext context) { + if (context == null || String.IsNullOrEmpty(context.MessageType)) return null; - return _knownMessageTypesCache.GetOrAdd(messageType, type => { + return _knownMessageTypesCache.GetOrAdd(context.MessageType, type => { if (_options.MessageTypeMappings != null && _options.MessageTypeMappings.ContainsKey(type)) return _options.MessageTypeMappings[type]; @@ -86,6 +86,9 @@ protected virtual Type GetMappedMessageType(string messageType) { } }); } + protected Type GetMappedMessageType(string messageType) { + return GetMappedMessageType(new ConsumeMessageContext { MessageType = messageType }); + } protected virtual Task RemoveTopicSubscriptionAsync() => Task.CompletedTask; protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask; @@ -105,14 +108,6 @@ protected virtual Task SubscribeImplAsync(Func { - _subscribers.TryRemove(subscriber.Id, out _); - if (_subscribers.Count == 0) - RemoveTopicSubscriptionAsync().GetAwaiter().GetResult(); - }); - } - if (subscriber.Type.Name == "IMessage`1" && subscriber.Type.GenericTypeArguments.Length == 1) { var modelType = subscriber.Type.GenericTypeArguments.Single(); subscriber.GenericType = typeof(Message<>).MakeGenericType(modelType); @@ -121,7 +116,13 @@ protected virtual Task SubscribeImplAsync(Func(new MessageSubscription(Guid.NewGuid().ToString("N"), () => new ValueTask())); + return Task.FromResult(new MessageSubscription(Guid.NewGuid().ToString("N"), () => { + _subscribers.TryRemove(subscriber.Id, out _); + if (_subscribers.Count == 0) + RemoveTopicSubscriptionAsync().GetAwaiter().GetResult(); + + return new ValueTask(); + })); } public async Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null) where T : class { @@ -133,6 +134,9 @@ public async Task SubscribeAsync(Func sub.DisposeAsync()); + return sub; } diff --git a/src/Foundatio/Messaging/SharedMessageBusOptions.cs b/src/Foundatio/Messaging/SharedMessageBusOptions.cs index a94a8fd33..149698742 100644 --- a/src/Foundatio/Messaging/SharedMessageBusOptions.cs +++ b/src/Foundatio/Messaging/SharedMessageBusOptions.cs @@ -9,9 +9,9 @@ public class SharedMessageBusOptions : SharedOptions { public string Topic { get; set; } = "messages"; /// - /// Resolves a message to a .NET type. + /// Resolves message types /// - public Func MessageTypeResolver { get; set; } + public IMessageTypeResolver MessageTypeResolver { get; set; } /// /// Statically configured message type mappings. will be run first and then this dictionary will be checked. @@ -19,6 +19,11 @@ public class SharedMessageBusOptions : SharedOptions { public Dictionary MessageTypeMappings { get; set; } = new Dictionary(); } + public interface IMessageTypeResolver { + IConsumeMessageContext ToMessageType(Type messageType); + Type ToClrType(IConsumeMessageContext context); + } + public interface IConsumeMessageContext { string MessageType { get; set; } IDictionary Properties { get; } @@ -39,7 +44,7 @@ public TBuilder Topic(string topic) { return (TBuilder)this; } - public TBuilder MessageTypeResolver(Func resolver) { + public TBuilder MessageTypeResolver(IMessageTypeResolver resolver) { if (resolver == null) throw new ArgumentNullException(nameof(resolver)); Target.MessageTypeResolver = resolver; From 11c85ac9218698a10f3bf90ecf4d3fd2107ae645 Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Fri, 29 Jul 2022 21:15:31 -0500 Subject: [PATCH 3/3] Progress --- .../Messaging/MessageBusTestBase.cs | 2 +- src/Foundatio/Foundatio.csproj | 1 + src/Foundatio/Messaging/IMessageBus.cs | 2 ++ src/Foundatio/Messaging/IMessageSubscriber.cs | 31 +++++++------------ src/Foundatio/Messaging/Message.cs | 28 +++++++---------- src/Foundatio/Messaging/MessageBusBase.cs | 24 +++++++------- src/Foundatio/Messaging/NullMessageBus.cs | 8 ++--- .../Messaging/SharedMessageBusOptions.cs | 21 ++++++++----- src/Foundatio/Serializer/ISerializer.cs | 2 ++ 9 files changed, 55 insertions(+), 64 deletions(-) diff --git a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs index 0d763e285..f19f1542c 100644 --- a/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs +++ b/src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs @@ -522,7 +522,7 @@ await messageBus.SubscribeAsync(msg => { Interlocked.Increment(ref messageCount); cancellationTokenSource.Cancel(); countdown.Signal(); - }, cancellationTokenSource.Token); + }, cancellationToken: cancellationTokenSource.Token); await messageBus.SubscribeAsync(msg => countdown.Signal()); diff --git a/src/Foundatio/Foundatio.csproj b/src/Foundatio/Foundatio.csproj index 91824d2ee..2b33bb216 100644 --- a/src/Foundatio/Foundatio.csproj +++ b/src/Foundatio/Foundatio.csproj @@ -5,5 +5,6 @@ + diff --git a/src/Foundatio/Messaging/IMessageBus.cs b/src/Foundatio/Messaging/IMessageBus.cs index f1eeca8df..aa9ad9ac5 100644 --- a/src/Foundatio/Messaging/IMessageBus.cs +++ b/src/Foundatio/Messaging/IMessageBus.cs @@ -7,6 +7,8 @@ public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposabl public class MessageOptions { public string UniqueId { get; set; } public string CorrelationId { get; set; } + public string Topic { get; set; } + public string MessageType { get; set; } public TimeSpan? DeliveryDelay { get; set; } public IDictionary Properties { get; set; } = new Dictionary(); } diff --git a/src/Foundatio/Messaging/IMessageSubscriber.cs b/src/Foundatio/Messaging/IMessageSubscriber.cs index f3a5b5eee..557873d22 100644 --- a/src/Foundatio/Messaging/IMessageSubscriber.cs +++ b/src/Foundatio/Messaging/IMessageSubscriber.cs @@ -5,34 +5,34 @@ namespace Foundatio.Messaging { public interface IMessageSubscriber { - Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null) where T : class; + Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class; } public static class MessageBusExtensions { - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null) where T : class { - return subscriber.SubscribeAsync((msg, token) => handler(msg), options); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class { + return subscriber.SubscribeAsync((msg, token) => handler(msg), options, cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, MessageSubscriptionOptions options = null) where T : class { + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class { return subscriber.SubscribeAsync((msg, token) => { handler(msg); return Task.CompletedTask; - }, options); + }, options, cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null) { - return subscriber.SubscribeAsync((msg, token) => handler(msg, token), options); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) { + return subscriber.SubscribeAsync((msg, token) => handler(msg, token), options, cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null) { - return subscriber.SubscribeAsync((msg, token) => handler(msg), options); + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) { + return subscriber.SubscribeAsync((msg, token) => handler(msg), options, cancellationToken); } - public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, MessageSubscriptionOptions options = null) { + public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) { return subscriber.SubscribeAsync((msg, token) => { handler(msg); return Task.CompletedTask; - }, options); + }, options, cancellationToken); } } @@ -63,15 +63,6 @@ public class MessageSubscriptionOptions { /// public string Topic { get; set; } - /// - /// Resolves a message to a .NET type. - /// - public Func MessageTypeResolver { get; set; } - - public CancellationToken CancellationToken { get; set; } - public IDictionary Properties { get; set; } = new Dictionary(); - - public static implicit operator MessageSubscriptionOptions(CancellationToken cancellationToken) => new() { CancellationToken = cancellationToken }; } } diff --git a/src/Foundatio/Messaging/Message.cs b/src/Foundatio/Messaging/Message.cs index 1a843db81..4ca2feb43 100644 --- a/src/Foundatio/Messaging/Message.cs +++ b/src/Foundatio/Messaging/Message.cs @@ -7,6 +7,7 @@ namespace Foundatio.Messaging { public interface IMessage : IAsyncDisposable { string UniqueId { get; } string CorrelationId { get; } + string Topic { get; } string Type { get; } Type ClrType { get; } byte[] Data { get; } @@ -33,6 +34,7 @@ public Message(byte[] data, Func getBody) { public string UniqueId { get; set; } public string CorrelationId { get; set; } + public string Topic { get; set; } public string Type { get; set; } public Type ClrType { get; set; } public IDictionary Properties { get; set; } = new Dictionary(); @@ -40,20 +42,20 @@ public Message(byte[] data, Func getBody) { public object GetBody() => _getBody(this); - public Task AbandonAsync() { - throw new NotImplementedException(); + public virtual Task AbandonAsync() { + return Task.CompletedTask; } - public Task CompleteAsync() { - throw new NotImplementedException(); + public virtual Task CompleteAsync() { + return Task.CompletedTask; } - public Task RenewLockAsync() { - throw new NotImplementedException(); + public virtual Task RenewLockAsync() { + return Task.CompletedTask; } - public ValueTask DisposeAsync() { - throw new NotImplementedException(); + public virtual ValueTask DisposeAsync() { + return new ValueTask(AbandonAsync()); } } @@ -65,27 +67,19 @@ public Message(IMessage message) { } public byte[] Data => _message.Data; - public T Body => (T)GetBody(); - public string UniqueId => _message.UniqueId; - public string CorrelationId => _message.CorrelationId; - + public string Topic => _message.Topic; public string Type => _message.Type; - public Type ClrType => _message.ClrType; - public IDictionary Properties => _message.Properties; public object GetBody() => _message.GetBody(); public Task AbandonAsync() => _message.AbandonAsync(); - public Task CompleteAsync() => _message.CompleteAsync(); - public Task RenewLockAsync() => _message.RenewLockAsync(); - public ValueTask DisposeAsync() => _message.DisposeAsync(); } } \ No newline at end of file diff --git a/src/Foundatio/Messaging/MessageBusBase.cs b/src/Foundatio/Messaging/MessageBusBase.cs index dbc85d617..bce9f2c51 100644 --- a/src/Foundatio/Messaging/MessageBusBase.cs +++ b/src/Foundatio/Messaging/MessageBusBase.cs @@ -6,6 +6,7 @@ using System.Reflection; using System.Threading; using System.Threading.Tasks; +using System.Threading.Channels; using Foundatio.Serializer; using Foundatio.Utility; using Microsoft.Extensions.Logging; @@ -13,6 +14,7 @@ namespace Foundatio.Messaging { public abstract class MessageBusBase : IMessageBus, IDisposable where TOptions : SharedMessageBusOptions { + protected readonly ConcurrentDictionary> _topics = new(); private readonly CancellationTokenSource _messageBusDisposedCancellationTokenSource; protected readonly ConcurrentDictionary _subscribers = new(); protected readonly TOptions _options; @@ -25,7 +27,7 @@ public MessageBusBase(TOptions options) { var loggerFactory = options?.LoggerFactory ?? NullLoggerFactory.Instance; _logger = loggerFactory.CreateLogger(GetType()); _serializer = options.Serializer ?? DefaultSerializer.Instance; - MessageBusId = _options.Topic + Guid.NewGuid().ToString("N").Substring(10); + MessageBusId = _options.DefaultTopic + Guid.NewGuid().ToString("N").Substring(10); _messageBusDisposedCancellationTokenSource = new CancellationTokenSource(); } @@ -93,9 +95,9 @@ protected Type GetMappedMessageType(string messageType) { protected virtual Task RemoveTopicSubscriptionAsync() => Task.CompletedTask; protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask; - protected virtual Task SubscribeImplAsync(Func handler, MessageSubscriptionOptions options) where T : class { + protected virtual Task SubscribeImplAsync(Func handler, MessageSubscriptionOptions options, CancellationToken cancellationToken = default) where T : class { var subscriber = new Subscriber { - CancellationToken = options.CancellationToken, + CancellationToken = cancellationToken, Type = typeof(T), Action = (message, token) => { if (message is not T) { @@ -104,7 +106,7 @@ protected virtual Task SubscribeImplAsync(Func SubscribeImplAsync(Func(new MessageSubscription(Guid.NewGuid().ToString("N"), () => { + return Task.FromResult(new MessageSubscription(Guid.NewGuid().ToString("N"), async () => { _subscribers.TryRemove(subscriber.Id, out _); if (_subscribers.Count == 0) - RemoveTopicSubscriptionAsync().GetAwaiter().GetResult(); - - return new ValueTask(); + await RemoveTopicSubscriptionAsync().AnyContext(); })); } - public async Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null) where T : class { + public async Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Adding subscriber for {MessageType}.", typeof(T).FullName); options ??= new MessageSubscriptionOptions(); var sub = await SubscribeImplAsync(handler, options).AnyContext(); - await EnsureTopicSubscriptionAsync(options.CancellationToken).AnyContext(); + await EnsureTopicSubscriptionAsync(cancellationToken).AnyContext(); - if (options.CancellationToken != CancellationToken.None) - options.CancellationToken.Register(() => sub.DisposeAsync()); + if (cancellationToken != CancellationToken.None) + cancellationToken.Register(() => sub.DisposeAsync()); return sub; } diff --git a/src/Foundatio/Messaging/NullMessageBus.cs b/src/Foundatio/Messaging/NullMessageBus.cs index 017093a59..b79505a7c 100644 --- a/src/Foundatio/Messaging/NullMessageBus.cs +++ b/src/Foundatio/Messaging/NullMessageBus.cs @@ -10,14 +10,10 @@ public Task PublishAsync(Type messageType, object message, MessageOptions option return Task.CompletedTask; } - public Task SubscribeAsync(Func handler, CancellationToken cancellationToken = default) where T : class { - return Task.CompletedTask; + public Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class { + return Task.FromResult(new MessageSubscription(Guid.NewGuid().ToString(), () => new ValueTask())); } public void Dispose() {} - - public Task SubscribeAsync(Func handler, MessageSubscriptionOptions options = null) where T : class { - return Task.FromResult(new MessageSubscription(Guid.NewGuid().ToString(), () => new ValueTask())); - } } } diff --git a/src/Foundatio/Messaging/SharedMessageBusOptions.cs b/src/Foundatio/Messaging/SharedMessageBusOptions.cs index 149698742..4f0cb2142 100644 --- a/src/Foundatio/Messaging/SharedMessageBusOptions.cs +++ b/src/Foundatio/Messaging/SharedMessageBusOptions.cs @@ -4,32 +4,37 @@ namespace Foundatio.Messaging { public class SharedMessageBusOptions : SharedOptions { /// - /// The topic name + /// The default topic name /// - public string Topic { get; set; } = "messages"; + public string DefaultTopic { get; set; } = "messages"; /// /// Resolves message types /// - public IMessageTypeResolver MessageTypeResolver { get; set; } + public IMessageRouter Router { get; set; } /// - /// Statically configured message type mappings. will be run first and then this dictionary will be checked. + /// Statically configured message type mappings. will be run first and then this dictionary will be checked. /// public Dictionary MessageTypeMappings { get; set; } = new Dictionary(); } - public interface IMessageTypeResolver { + public interface IMessageRouter { + // get topic from bus options, message and message options + // get message type from message and options + // get .net type from topic, message type and properties (headers) IConsumeMessageContext ToMessageType(Type messageType); Type ToClrType(IConsumeMessageContext context); } public interface IConsumeMessageContext { + string Topic { get; set; } string MessageType { get; set; } IDictionary Properties { get; } } public class ConsumeMessageContext : IConsumeMessageContext { + public string Topic { get; set; } public string MessageType { get; set; } public IDictionary Properties { get; set; } = new Dictionary(); } @@ -40,14 +45,14 @@ public class SharedMessageBusOptionsBuilder : SharedOptionsB public TBuilder Topic(string topic) { if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic)); - Target.Topic = topic; + Target.DefaultTopic = topic; return (TBuilder)this; } - public TBuilder MessageTypeResolver(IMessageTypeResolver resolver) { + public TBuilder MessageTypeResolver(IMessageRouter resolver) { if (resolver == null) throw new ArgumentNullException(nameof(resolver)); - Target.MessageTypeResolver = resolver; + Target.Router = resolver; return (TBuilder)this; } diff --git a/src/Foundatio/Serializer/ISerializer.cs b/src/Foundatio/Serializer/ISerializer.cs index 25f022b23..bec99a225 100644 --- a/src/Foundatio/Serializer/ISerializer.cs +++ b/src/Foundatio/Serializer/ISerializer.cs @@ -14,6 +14,8 @@ public static class DefaultSerializer { public static ISerializer Instance { get; set; } = new SystemTextJsonSerializer(); } + // TODO: Add a wrapper that adds activities for serialization + public static class SerializerExtensions { public static T Deserialize(this ISerializer serializer, Stream data) { return (T)serializer.Deserialize(data, typeof(T));