Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working on message bus subscriptions options #278

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/Foundatio.TestHarness/Queue/QueueTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ public virtual async Task WillWaitForItemAsync() {
return;

try {
Log.MinimumLevel = LogLevel.Trace;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably should revert this and leave this up to the caller.

await queue.DeleteQueueAsync();
await AssertEmptyQueueAsync(queue);

Expand All @@ -564,14 +565,16 @@ public virtual async Task WillWaitForItemAsync() {
Assert.InRange(sw.Elapsed, TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(5000));

_ = Task.Run(async () => {
await SystemClock.SleepAsync(500);
await Task.Delay(500);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SystemClock.SleepAsync(500) should be doing this behind the scenes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I know that, but in this case we never want the sleep to be virtual.

_logger.LogInformation("Enqueuing async message");
await queue.EnqueueAsync(new SimpleWorkItem {
Data = "Hello"
});
});

sw.Restart();
workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(1));
_logger.LogInformation("Dequeuing message with timeout");
workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(10));
sw.Stop();
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Time {Elapsed:g}", sw.Elapsed);
Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(400));
Expand Down
62 changes: 51 additions & 11 deletions src/Foundatio/Messaging/IMessageSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,77 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Foundatio.Messaging {
public interface IMessageSubscriber {
Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken = default) where T : class;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we should create extension method overloads for this that just set a token on options. Will users know there is an implicit conversion happening here for cancellation token?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I am worried about us having overload conflicts since there are already a bunch of overloads for this method. I pretty much did this just for compatibility.

Task<IMessageSubscription> SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options = null) where T : class;
}

public static class MessageBusExtensions {
public static Task SubscribeAsync<T>(this IMessageSubscriber subscriber, Func<T, Task> handler, CancellationToken cancellationToken = default) where T : class {
return subscriber.SubscribeAsync<T>((msg, token) => handler(msg), cancellationToken);
public static Task<IMessageSubscription> SubscribeAsync<T>(this IMessageSubscriber subscriber, Func<T, Task> handler, MessageSubscriptionOptions options = null) where T : class {
return subscriber.SubscribeAsync<T>((msg, token) => handler(msg), options);
}

public static Task SubscribeAsync<T>(this IMessageSubscriber subscriber, Action<T> handler, CancellationToken cancellationToken = default) where T : class {
public static Task<IMessageSubscription> SubscribeAsync<T>(this IMessageSubscriber subscriber, Action<T> handler, MessageSubscriptionOptions options = null) where T : class {
return subscriber.SubscribeAsync<T>((msg, token) => {
handler(msg);
return Task.CompletedTask;
}, cancellationToken);
}, options);
}

public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, CancellationToken, Task> handler, CancellationToken cancellationToken = default) {
return subscriber.SubscribeAsync<IMessage>((msg, token) => handler(msg, token), cancellationToken);
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, CancellationToken, Task> handler, MessageSubscriptionOptions options = null) {
return subscriber.SubscribeAsync<IMessage>((msg, token) => handler(msg, token), options);
}

public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, Task> handler, CancellationToken cancellationToken = default) {
return subscriber.SubscribeAsync((msg, token) => handler(msg), cancellationToken);
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, Task> handler, MessageSubscriptionOptions options = null) {
return subscriber.SubscribeAsync((msg, token) => handler(msg), options);
}

public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action<IMessage> handler, CancellationToken cancellationToken = default) {
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Action<IMessage> handler, MessageSubscriptionOptions options = null) {
return subscriber.SubscribeAsync((msg, token) => {
handler(msg);
return Task.CompletedTask;
}, cancellationToken);
}, options);
}
}

public interface IMessageSubscription : IAsyncDisposable {
string SubscriptionId { get; }
IDictionary<string, string> Properties { get; }
}

public class MessageSubscription : IMessageSubscription {
private readonly Func<ValueTask> _disposeSubscriptionFunc;

public MessageSubscription(string subscriptionId, Func<ValueTask> disposeSubscriptionFunc) {
SubscriptionId = subscriptionId;
_disposeSubscriptionFunc = disposeSubscriptionFunc;
}

public string SubscriptionId { get; }
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();

public ValueTask DisposeAsync() {
return _disposeSubscriptionFunc?.Invoke() ?? new ValueTask();
}
}

public class MessageSubscriptionOptions {
/// <summary>
/// The topic name
/// </summary>
public string Topic { get; set; }

/// <summary>
/// Resolves a message to a .NET type.
/// </summary>
public Func<IConsumeMessageContext, Type> MessageTypeResolver { get; set; }

public CancellationToken CancellationToken { get; set; }

public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();

public static implicit operator MessageSubscriptionOptions(CancellationToken cancellationToken) => new() { CancellationToken = cancellationToken };
}
}
33 changes: 31 additions & 2 deletions src/Foundatio/Messaging/Message.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Foundatio.Serializer;
using System.Threading.Tasks;

namespace Foundatio.Messaging {
public interface IMessage {
public interface IMessage : IAsyncDisposable {
string UniqueId { get; }
string CorrelationId { get; }
string Type { get; }
Type ClrType { get; }
byte[] Data { get; }
object GetBody();
IDictionary<string, string> Properties { get; }

Task RenewLockAsync();
Task AbandonAsync();
Task CompleteAsync();
}

public interface IMessage<T> : IMessage where T: class {
Expand All @@ -33,7 +37,24 @@ public Message(byte[] data, Func<IMessage, object> getBody) {
public Type ClrType { get; set; }
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();
public byte[] Data { get; set; }

public object GetBody() => _getBody(this);

public Task AbandonAsync() {
throw new NotImplementedException();
}

public Task CompleteAsync() {
throw new NotImplementedException();
}

public Task RenewLockAsync() {
throw new NotImplementedException();
}

public ValueTask DisposeAsync() {
throw new NotImplementedException();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the default interface implementation throw for dispose? I'd think this would do nothing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you cant see, these haven't been implemented yet. This PR is not done.

}
}

public class Message<T> : IMessage<T> where T : class {
Expand All @@ -58,5 +79,13 @@ public Message(IMessage message) {
public IDictionary<string, string> Properties => _message.Properties;

public object GetBody() => _message.GetBody();

public Task AbandonAsync() => _message.AbandonAsync();

public Task CompleteAsync() => _message.CompleteAsync();

public Task RenewLockAsync() => _message.RenewLockAsync();

public ValueTask DisposeAsync() => _message.DisposeAsync();
}
}
44 changes: 26 additions & 18 deletions src/Foundatio/Messaging/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ protected string GetMappedMessageType(Type messageType) {
}

private readonly ConcurrentDictionary<string, Type> _knownMessageTypesCache = new();
protected virtual Type GetMappedMessageType(string messageType) {
if (String.IsNullOrEmpty(messageType))
protected virtual Type GetMappedMessageType(IConsumeMessageContext context) {
if (context == null || String.IsNullOrEmpty(context.MessageType))
return null;

return _knownMessageTypesCache.GetOrAdd(messageType, type => {
return _knownMessageTypesCache.GetOrAdd(context.MessageType, type => {
if (_options.MessageTypeMappings != null && _options.MessageTypeMappings.ContainsKey(type))
return _options.MessageTypeMappings[type];

Expand All @@ -86,13 +86,16 @@ protected virtual Type GetMappedMessageType(string messageType) {
}
});
}
protected Type GetMappedMessageType(string messageType) {
return GetMappedMessageType(new ConsumeMessageContext { MessageType = messageType });
}

protected virtual Task RemoveTopicSubscriptionAsync() => Task.CompletedTask;
protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask;

protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken) where T : class {
protected virtual Task<IMessageSubscription> SubscribeImplAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options) where T : class {
var subscriber = new Subscriber {
CancellationToken = cancellationToken,
CancellationToken = options.CancellationToken,
Type = typeof(T),
Action = (message, token) => {
if (message is not T) {
Expand All @@ -101,18 +104,10 @@ protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> ha
return Task.CompletedTask;
}

return handler((T)message, cancellationToken);
return handler((T)message, options.CancellationToken);
}
};

if (cancellationToken != CancellationToken.None) {
cancellationToken.Register(() => {
_subscribers.TryRemove(subscriber.Id, out _);
if (_subscribers.Count == 0)
RemoveTopicSubscriptionAsync().GetAwaiter().GetResult();
});
}

if (subscriber.Type.Name == "IMessage`1" && subscriber.Type.GenericTypeArguments.Length == 1) {
var modelType = subscriber.Type.GenericTypeArguments.Single();
subscriber.GenericType = typeof(Message<>).MakeGenericType(modelType);
Expand All @@ -121,15 +116,28 @@ protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> ha
if (!_subscribers.TryAdd(subscriber.Id, subscriber) && _logger.IsEnabled(LogLevel.Error))
_logger.LogError("Unable to add subscriber {SubscriberId}", subscriber.Id);

return Task.CompletedTask;
return Task.FromResult<IMessageSubscription>(new MessageSubscription(Guid.NewGuid().ToString("N"), () => {
_subscribers.TryRemove(subscriber.Id, out _);
if (_subscribers.Count == 0)
RemoveTopicSubscriptionAsync().GetAwaiter().GetResult();

return new ValueTask();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kinda would be nice if this was async.

}));
}

public async Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken = default) where T : class {
public async Task<IMessageSubscription> SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options = null) where T : class {
if (_logger.IsEnabled(LogLevel.Trace))
_logger.LogTrace("Adding subscriber for {MessageType}.", typeof(T).FullName);

await SubscribeImplAsync(handler, cancellationToken).AnyContext();
await EnsureTopicSubscriptionAsync(cancellationToken).AnyContext();
options ??= new MessageSubscriptionOptions();

var sub = await SubscribeImplAsync(handler, options).AnyContext();
await EnsureTopicSubscriptionAsync(options.CancellationToken).AnyContext();

if (options.CancellationToken != CancellationToken.None)
options.CancellationToken.Register(() => sub.DisposeAsync());

return sub;
}

protected List<Subscriber> GetMessageSubscribers(IMessage message) {
Expand Down
4 changes: 4 additions & 0 deletions src/Foundatio/Messaging/NullMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,9 @@ public Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, Cancella
}

public void Dispose() {}

public Task<IMessageSubscription> SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options = null) where T : class {
return Task.FromResult<IMessageSubscription>(new MessageSubscription(Guid.NewGuid().ToString(), () => new ValueTask()));
}
}
}
31 changes: 29 additions & 2 deletions src/Foundatio/Messaging/SharedMessageBusOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;

namespace Foundatio.Messaging {
Expand All @@ -9,11 +9,31 @@ public class SharedMessageBusOptions : SharedOptions {
public string Topic { get; set; } = "messages";

/// <summary>
/// Controls which types messages are mapped to.
/// Resolves message types
/// </summary>
public IMessageTypeResolver MessageTypeResolver { get; set; }

/// <summary>
/// Statically configured message type mappings. <see cref="MessageTypeResolver"/> will be run first and then this dictionary will be checked.
/// </summary>
public Dictionary<string, Type> MessageTypeMappings { get; set; } = new Dictionary<string, Type>();
}

public interface IMessageTypeResolver {
IConsumeMessageContext ToMessageType(Type messageType);
Type ToClrType(IConsumeMessageContext context);
}

public interface IConsumeMessageContext {
string MessageType { get; set; }
IDictionary<string, string> Properties { get; }
Comment on lines +32 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this work for kafka? I'm dealing with public Func<ConsumeResult<string, byte[]>, string> ResolveMessageType { get; set; } so I can look at specific headers and not have to deserialize every single header to process the message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it can't be kafka specific. It has access to properties which should include the headers from the kafka message.

}

public class ConsumeMessageContext : IConsumeMessageContext {
public string MessageType { get; set; }
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();
}

public class SharedMessageBusOptionsBuilder<TOptions, TBuilder> : SharedOptionsBuilder<TOptions, TBuilder>
where TOptions : SharedMessageBusOptions, new()
where TBuilder : SharedMessageBusOptionsBuilder<TOptions, TBuilder> {
Expand All @@ -24,6 +44,13 @@ public TBuilder Topic(string topic) {
return (TBuilder)this;
}

public TBuilder MessageTypeResolver(IMessageTypeResolver resolver) {
if (resolver == null)
throw new ArgumentNullException(nameof(resolver));
Target.MessageTypeResolver = resolver;
return (TBuilder)this;
}

public TBuilder MapMessageType<T>(string name) {
if (Target.MessageTypeMappings == null)
Target.MessageTypeMappings = new Dictionary<string, Type>();
Expand Down
3 changes: 1 addition & 2 deletions src/Foundatio/Queues/IQueueEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using Foundatio.Utility;

namespace Foundatio.Queues {
public interface IQueueEntry {
public interface IQueueEntry : IAsyncDisposable {
string Id { get; }
string CorrelationId { get; }
IDictionary<string, string> Properties { get; }
Expand All @@ -18,7 +18,6 @@ public interface IQueueEntry {
Task RenewLockAsync();
Task AbandonAsync();
Task CompleteAsync();
ValueTask DisposeAsync();
}

public interface IQueueEntry<T> : IQueueEntry where T : class {
Expand Down