-
-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Working on message bus subscriptions options #278
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -553,6 +553,7 @@ public virtual async Task WillWaitForItemAsync() { | |
return; | ||
|
||
try { | ||
Log.MinimumLevel = LogLevel.Trace; | ||
await queue.DeleteQueueAsync(); | ||
await AssertEmptyQueueAsync(queue); | ||
|
||
|
@@ -564,14 +565,16 @@ public virtual async Task WillWaitForItemAsync() { | |
Assert.InRange(sw.Elapsed, TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(5000)); | ||
|
||
_ = Task.Run(async () => { | ||
await SystemClock.SleepAsync(500); | ||
await Task.Delay(500); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I know that, but in this case we never want the sleep to be virtual. |
||
_logger.LogInformation("Enqueuing async message"); | ||
await queue.EnqueueAsync(new SimpleWorkItem { | ||
Data = "Hello" | ||
}); | ||
}); | ||
|
||
sw.Restart(); | ||
workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(1)); | ||
_logger.LogInformation("Dequeuing message with timeout"); | ||
workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(10)); | ||
sw.Stop(); | ||
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Time {Elapsed:g}", sw.Elapsed); | ||
Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(400)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,77 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace Foundatio.Messaging { | ||
public interface IMessageSubscriber { | ||
Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken = default) where T : class; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering if we should create extension method overloads for this that just set a token on options. Will users know there is an implicit conversion happening here for cancellation token? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe, I am worried about us having overload conflicts since there are already a bunch of overloads for this method. I pretty much did this just for compatibility. |
||
Task<IMessageSubscription> SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options = null) where T : class; | ||
} | ||
|
||
public static class MessageBusExtensions { | ||
public static Task SubscribeAsync<T>(this IMessageSubscriber subscriber, Func<T, Task> handler, CancellationToken cancellationToken = default) where T : class { | ||
return subscriber.SubscribeAsync<T>((msg, token) => handler(msg), cancellationToken); | ||
public static Task<IMessageSubscription> SubscribeAsync<T>(this IMessageSubscriber subscriber, Func<T, Task> handler, MessageSubscriptionOptions options = null) where T : class { | ||
return subscriber.SubscribeAsync<T>((msg, token) => handler(msg), options); | ||
} | ||
|
||
public static Task SubscribeAsync<T>(this IMessageSubscriber subscriber, Action<T> handler, CancellationToken cancellationToken = default) where T : class { | ||
public static Task<IMessageSubscription> SubscribeAsync<T>(this IMessageSubscriber subscriber, Action<T> handler, MessageSubscriptionOptions options = null) where T : class { | ||
return subscriber.SubscribeAsync<T>((msg, token) => { | ||
handler(msg); | ||
return Task.CompletedTask; | ||
}, cancellationToken); | ||
}, options); | ||
} | ||
|
||
public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, CancellationToken, Task> handler, CancellationToken cancellationToken = default) { | ||
return subscriber.SubscribeAsync<IMessage>((msg, token) => handler(msg, token), cancellationToken); | ||
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, CancellationToken, Task> handler, MessageSubscriptionOptions options = null) { | ||
return subscriber.SubscribeAsync<IMessage>((msg, token) => handler(msg, token), options); | ||
} | ||
|
||
public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, Task> handler, CancellationToken cancellationToken = default) { | ||
return subscriber.SubscribeAsync((msg, token) => handler(msg), cancellationToken); | ||
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, Task> handler, MessageSubscriptionOptions options = null) { | ||
return subscriber.SubscribeAsync((msg, token) => handler(msg), options); | ||
} | ||
|
||
public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action<IMessage> handler, CancellationToken cancellationToken = default) { | ||
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Action<IMessage> handler, MessageSubscriptionOptions options = null) { | ||
return subscriber.SubscribeAsync((msg, token) => { | ||
handler(msg); | ||
return Task.CompletedTask; | ||
}, cancellationToken); | ||
}, options); | ||
} | ||
} | ||
|
||
public interface IMessageSubscription : IAsyncDisposable { | ||
string SubscriptionId { get; } | ||
IDictionary<string, string> Properties { get; } | ||
} | ||
|
||
public class MessageSubscription : IMessageSubscription { | ||
private readonly Func<ValueTask> _disposeSubscriptionFunc; | ||
|
||
public MessageSubscription(string subscriptionId, Func<ValueTask> disposeSubscriptionFunc) { | ||
SubscriptionId = subscriptionId; | ||
_disposeSubscriptionFunc = disposeSubscriptionFunc; | ||
} | ||
|
||
public string SubscriptionId { get; } | ||
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>(); | ||
|
||
public ValueTask DisposeAsync() { | ||
return _disposeSubscriptionFunc?.Invoke() ?? new ValueTask(); | ||
} | ||
} | ||
|
||
public class MessageSubscriptionOptions { | ||
/// <summary> | ||
/// The topic name | ||
/// </summary> | ||
public string Topic { get; set; } | ||
|
||
/// <summary> | ||
/// Resolves a message to a .NET type. | ||
/// </summary> | ||
public Func<IConsumeMessageContext, Type> MessageTypeResolver { get; set; } | ||
|
||
public CancellationToken CancellationToken { get; set; } | ||
|
||
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>(); | ||
|
||
public static implicit operator MessageSubscriptionOptions(CancellationToken cancellationToken) => new() { CancellationToken = cancellationToken }; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,21 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Diagnostics; | ||
using Foundatio.Serializer; | ||
using System.Threading.Tasks; | ||
|
||
namespace Foundatio.Messaging { | ||
public interface IMessage { | ||
public interface IMessage : IAsyncDisposable { | ||
string UniqueId { get; } | ||
string CorrelationId { get; } | ||
string Type { get; } | ||
Type ClrType { get; } | ||
byte[] Data { get; } | ||
object GetBody(); | ||
IDictionary<string, string> Properties { get; } | ||
|
||
Task RenewLockAsync(); | ||
Task AbandonAsync(); | ||
Task CompleteAsync(); | ||
} | ||
|
||
public interface IMessage<T> : IMessage where T: class { | ||
|
@@ -33,7 +37,24 @@ public Message(byte[] data, Func<IMessage, object> getBody) { | |
public Type ClrType { get; set; } | ||
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>(); | ||
public byte[] Data { get; set; } | ||
|
||
public object GetBody() => _getBody(this); | ||
|
||
public Task AbandonAsync() { | ||
throw new NotImplementedException(); | ||
} | ||
|
||
public Task CompleteAsync() { | ||
throw new NotImplementedException(); | ||
} | ||
|
||
public Task RenewLockAsync() { | ||
throw new NotImplementedException(); | ||
} | ||
|
||
public ValueTask DisposeAsync() { | ||
throw new NotImplementedException(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the default interface implementation throw for dispose? I'd think this would do nothing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As you cant see, these haven't been implemented yet. This PR is not done. |
||
} | ||
} | ||
|
||
public class Message<T> : IMessage<T> where T : class { | ||
|
@@ -58,5 +79,13 @@ public Message(IMessage message) { | |
public IDictionary<string, string> Properties => _message.Properties; | ||
|
||
public object GetBody() => _message.GetBody(); | ||
|
||
public Task AbandonAsync() => _message.AbandonAsync(); | ||
|
||
public Task CompleteAsync() => _message.CompleteAsync(); | ||
|
||
public Task RenewLockAsync() => _message.RenewLockAsync(); | ||
|
||
public ValueTask DisposeAsync() => _message.DisposeAsync(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,11 +59,11 @@ protected string GetMappedMessageType(Type messageType) { | |
} | ||
|
||
private readonly ConcurrentDictionary<string, Type> _knownMessageTypesCache = new(); | ||
protected virtual Type GetMappedMessageType(string messageType) { | ||
if (String.IsNullOrEmpty(messageType)) | ||
protected virtual Type GetMappedMessageType(IConsumeMessageContext context) { | ||
if (context == null || String.IsNullOrEmpty(context.MessageType)) | ||
return null; | ||
|
||
return _knownMessageTypesCache.GetOrAdd(messageType, type => { | ||
return _knownMessageTypesCache.GetOrAdd(context.MessageType, type => { | ||
if (_options.MessageTypeMappings != null && _options.MessageTypeMappings.ContainsKey(type)) | ||
return _options.MessageTypeMappings[type]; | ||
|
||
|
@@ -86,13 +86,16 @@ protected virtual Type GetMappedMessageType(string messageType) { | |
} | ||
}); | ||
} | ||
protected Type GetMappedMessageType(string messageType) { | ||
return GetMappedMessageType(new ConsumeMessageContext { MessageType = messageType }); | ||
} | ||
|
||
protected virtual Task RemoveTopicSubscriptionAsync() => Task.CompletedTask; | ||
protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask; | ||
|
||
protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken) where T : class { | ||
protected virtual Task<IMessageSubscription> SubscribeImplAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options) where T : class { | ||
var subscriber = new Subscriber { | ||
CancellationToken = cancellationToken, | ||
CancellationToken = options.CancellationToken, | ||
Type = typeof(T), | ||
Action = (message, token) => { | ||
if (message is not T) { | ||
|
@@ -101,18 +104,10 @@ protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> ha | |
return Task.CompletedTask; | ||
} | ||
|
||
return handler((T)message, cancellationToken); | ||
return handler((T)message, options.CancellationToken); | ||
} | ||
}; | ||
|
||
if (cancellationToken != CancellationToken.None) { | ||
cancellationToken.Register(() => { | ||
_subscribers.TryRemove(subscriber.Id, out _); | ||
if (_subscribers.Count == 0) | ||
RemoveTopicSubscriptionAsync().GetAwaiter().GetResult(); | ||
}); | ||
} | ||
|
||
if (subscriber.Type.Name == "IMessage`1" && subscriber.Type.GenericTypeArguments.Length == 1) { | ||
var modelType = subscriber.Type.GenericTypeArguments.Single(); | ||
subscriber.GenericType = typeof(Message<>).MakeGenericType(modelType); | ||
|
@@ -121,15 +116,28 @@ protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> ha | |
if (!_subscribers.TryAdd(subscriber.Id, subscriber) && _logger.IsEnabled(LogLevel.Error)) | ||
_logger.LogError("Unable to add subscriber {SubscriberId}", subscriber.Id); | ||
|
||
return Task.CompletedTask; | ||
return Task.FromResult<IMessageSubscription>(new MessageSubscription(Guid.NewGuid().ToString("N"), () => { | ||
_subscribers.TryRemove(subscriber.Id, out _); | ||
if (_subscribers.Count == 0) | ||
RemoveTopicSubscriptionAsync().GetAwaiter().GetResult(); | ||
|
||
return new ValueTask(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kinda would be nice if this was async. |
||
})); | ||
} | ||
|
||
public async Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken = default) where T : class { | ||
public async Task<IMessageSubscription> SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options = null) where T : class { | ||
if (_logger.IsEnabled(LogLevel.Trace)) | ||
_logger.LogTrace("Adding subscriber for {MessageType}.", typeof(T).FullName); | ||
|
||
await SubscribeImplAsync(handler, cancellationToken).AnyContext(); | ||
await EnsureTopicSubscriptionAsync(cancellationToken).AnyContext(); | ||
options ??= new MessageSubscriptionOptions(); | ||
|
||
var sub = await SubscribeImplAsync(handler, options).AnyContext(); | ||
await EnsureTopicSubscriptionAsync(options.CancellationToken).AnyContext(); | ||
|
||
if (options.CancellationToken != CancellationToken.None) | ||
options.CancellationToken.Register(() => sub.DisposeAsync()); | ||
|
||
return sub; | ||
} | ||
|
||
protected List<Subscriber> GetMessageSubscribers(IMessage message) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
using System; | ||
using System; | ||
using System.Collections.Generic; | ||
|
||
namespace Foundatio.Messaging { | ||
|
@@ -9,11 +9,31 @@ public class SharedMessageBusOptions : SharedOptions { | |
public string Topic { get; set; } = "messages"; | ||
|
||
/// <summary> | ||
/// Controls which types messages are mapped to. | ||
/// Resolves message types | ||
/// </summary> | ||
public IMessageTypeResolver MessageTypeResolver { get; set; } | ||
|
||
/// <summary> | ||
/// Statically configured message type mappings. <see cref="MessageTypeResolver"/> will be run first and then this dictionary will be checked. | ||
/// </summary> | ||
public Dictionary<string, Type> MessageTypeMappings { get; set; } = new Dictionary<string, Type>(); | ||
} | ||
|
||
public interface IMessageTypeResolver { | ||
IConsumeMessageContext ToMessageType(Type messageType); | ||
Type ToClrType(IConsumeMessageContext context); | ||
} | ||
|
||
public interface IConsumeMessageContext { | ||
string MessageType { get; set; } | ||
IDictionary<string, string> Properties { get; } | ||
Comment on lines
+32
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this work for kafka? I'm dealing with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well it can't be kafka specific. It has access to properties which should include the headers from the kafka message. |
||
} | ||
|
||
public class ConsumeMessageContext : IConsumeMessageContext { | ||
public string MessageType { get; set; } | ||
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>(); | ||
} | ||
|
||
public class SharedMessageBusOptionsBuilder<TOptions, TBuilder> : SharedOptionsBuilder<TOptions, TBuilder> | ||
where TOptions : SharedMessageBusOptions, new() | ||
where TBuilder : SharedMessageBusOptionsBuilder<TOptions, TBuilder> { | ||
|
@@ -24,6 +44,13 @@ public TBuilder Topic(string topic) { | |
return (TBuilder)this; | ||
} | ||
|
||
public TBuilder MessageTypeResolver(IMessageTypeResolver resolver) { | ||
if (resolver == null) | ||
throw new ArgumentNullException(nameof(resolver)); | ||
Target.MessageTypeResolver = resolver; | ||
return (TBuilder)this; | ||
} | ||
|
||
public TBuilder MapMessageType<T>(string name) { | ||
if (Target.MessageTypeMappings == null) | ||
Target.MessageTypeMappings = new Dictionary<string, Type>(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably should revert this and leave this up to the caller.