Skip to content

Commit

Permalink
[Host] Introduce IMessageScopeAccessor to wrap the MessageScope provi…
Browse files Browse the repository at this point in the history
…der #222

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Aug 11, 2024
1 parent f4818a4 commit a0dc3a4
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 29 deletions.
9 changes: 9 additions & 0 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- [ASP.Net Core](#aspnet-core)
- [Modularization of configuration](#modularization-of-configuration)
- [Auto registration of consumers and interceptors](#auto-registration-of-consumers-and-interceptors)
- [Message Scope Accessor](#message-scope-accessor)
- [Serialization](#serialization)
- [Multiple message types on one topic (or queue)](#multiple-message-types-on-one-topic-or-queue)
- [Message Type Resolver](#message-type-resolver)
Expand Down Expand Up @@ -674,6 +675,14 @@ services.AddSlimMessageBus(mbb =>
});
```

### Message Scope Accessor

During normal consumer/handler and interceptor life cycles, we can inject any scoped dependencies (services) using the constructor. All is nicely handled by MSDI.

However, for advanced framework integration, if there is a need to get ahold of the `IServiceProvider` tied to the scope of the currently consumed message the [`IMessageScopeAccessor`](../src/SlimMessageBus.Host/Consumer/IMessageScope.cs) can be used.
It works in a similar way how the [`IHttpContextAccessor`](https://learn.microsoft.com/en-us/dotnet/api/microsoft.aspnetcore.http.ihttpcontextaccessor?view=aspnetcore-8.0) works in ASP.NET Core to lookup the current ongoing HTTP request and the per request scoped services.
This is useful when the other framework is not managed by MSDI and we still want to hook into the current message scope.

## Serialization

SMB uses serialization plugins to serialize (and deserialize) the messages into the desired format.
Expand Down
9 changes: 9 additions & 0 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- [ASP.Net Core](#aspnet-core)
- [Modularization of configuration](#modularization-of-configuration)
- [Auto registration of consumers and interceptors](#auto-registration-of-consumers-and-interceptors)
- [Message Scope Accessor](#message-scope-accessor)
- [Serialization](#serialization)
- [Multiple message types on one topic (or queue)](#multiple-message-types-on-one-topic-or-queue)
- [Message Type Resolver](#message-type-resolver)
Expand Down Expand Up @@ -674,6 +675,14 @@ services.AddSlimMessageBus(mbb =>
});
```

### Message Scope Accessor

During normal consumer/handler and interceptor life cycles, we can inject any scoped dependencies (services) using the constructor. All is nicely handled by MSDI.

However, for advanced framework integration, if there is a need to get ahold of the `IServiceProvider` tied to the scope of the currently consumed message the [`IMessageScopeAccessor`](../src/SlimMessageBus.Host/Consumer/IMessageScope.cs) can be used.
It works in a similar way how the [`IHttpContextAccessor`](https://learn.microsoft.com/en-us/dotnet/api/microsoft.aspnetcore.http.ihttpcontextaccessor?view=aspnetcore-8.0) works in ASP.NET Core to lookup the current ongoing HTTP request and the per request scoped services.
This is useful when the other framework is not managed by MSDI and we still want to hook into the current message scope.

## Serialization

SMB uses serialization plugins to serialize (and deserialize) the messages into the desired format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Description>Core configuration interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
<RootNamespace>SlimMessageBus.Host</RootNamespace>
<Version>2.4.0</Version>
<Version>2.5.0-rc1</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
13 changes: 13 additions & 0 deletions src/SlimMessageBus.Host/Consumer/IMessageScopeAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace SlimMessageBus.Host.Consumer;

/// <summary>
/// Allows to get ahold of the <see cref="IServiceProvider"/> for the current message scope.
/// </summary>
public interface IMessageScopeAccessor
{
/// <summary>
/// If the running code is within a message scope of a consumer, this property will return the <see cref="IServiceProvider"/> for the current message scope.
/// Otherwise it will return null.
/// </summary>
IServiceProvider Current { get; }
}
6 changes: 6 additions & 0 deletions src/SlimMessageBus.Host/Consumer/MessageScopeAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SlimMessageBus.Host.Consumer;

internal sealed class MessageScopeAccessor : IMessageScopeAccessor
{
public IServiceProvider Current => MessageScope.Current;
}
4 changes: 4 additions & 0 deletions src/SlimMessageBus.Host/Consumer/MessageScopeWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
namespace SlimMessageBus.Host.Consumer;

/// <summary>
/// Used by consumers to wrap the message processing in a message scope (MSDI).
/// The <see cref="MessageScope.Current"/> is being adjusted as part of this wrapper.
/// </summary>
public sealed class MessageScopeWrapper : IMessageScope
{
private readonly ILogger _logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using System.Reflection;

using SlimMessageBus.Host.Consumer;
using SlimMessageBus.Host.Hybrid;

public static class ServiceCollectionExtensions
Expand Down Expand Up @@ -95,6 +96,8 @@ public static IServiceCollection AddSlimMessageBus(this IServiceCollection servi
services.TryAddSingleton<IMessageTypeResolver, AssemblyQualifiedNameMessageTypeResolver>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusSettingsPostProcessor, ConsumerMethodPostProcessor>());

services.TryAddSingleton<IMessageScopeAccessor, MessageScopeAccessor>();

services.AddHostedService<MessageBusHostedService>();

return services;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
using SlimMessageBus.Host;
using SlimMessageBus.Host.Memory;

/// <summary>
/// This test verifies that the MessageBus.Current accessor works correctly and looks up in the current message scope.
/// </summary>
/// <param name="testOutputHelper"></param>
[Trait("Category", "Integration")]
public class MessageBusCurrentTests : BaseIntegrationTest<MessageBusCurrentTests>
public class MessageBusCurrentTests(ITestOutputHelper testOutputHelper) : BaseIntegrationTest<MessageBusCurrentTests>(testOutputHelper)
{
public MessageBusCurrentTests(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}

protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration)
{
services.AddSlimMessageBus(mbb =>
Expand Down Expand Up @@ -42,37 +42,36 @@ public async Task Given_MemoryConsumer_When_MessageBusCurrentCalledInsideConsume
var valueHolder = scope.ServiceProvider.GetRequiredService<ValueHolder>();
valueHolder.Value.Should().Be(value);
}
}


public record SetValueCommand(Guid Value);
public record SetValueCommand(Guid Value);

public class SetValueCommandHandler : IRequestHandler<SetValueCommand>
{
public async Task OnHandle(SetValueCommand request)
public class SetValueCommandHandler : IRequestHandler<SetValueCommand>
{
// Some other logic here ...
public async Task OnHandle(SetValueCommand request)
{
// Some other logic here ...

// and then notify about the value change using the MessageBus.Current accessor which should look up in the current message scope
await MessageBus.Current.Publish(new ValueChangedEvent(request.Value));
// and then notify about the value change using the MessageBus.Current accessor which should look up in the current message scope
await MessageBus.Current.Publish(new ValueChangedEvent(request.Value));
}
}
}

public record ValueChangedEvent(Guid Value);
public record ValueChangedEvent(Guid Value);

public class ValueChangedEventHandler(ValueHolder valueHolder) : IRequestHandler<ValueChangedEvent>
{
public Task OnHandle(ValueChangedEvent request)
public class ValueChangedEventHandler(ValueHolder valueHolder) : IRequestHandler<ValueChangedEvent>
{
valueHolder.Value = request.Value;
return Task.CompletedTask;
public Task OnHandle(ValueChangedEvent request)
{
valueHolder.Value = request.Value;
return Task.CompletedTask;
}
}
}

/// <summary>
/// Holds the value (per scope lifetime).
/// </summary>
public class ValueHolder
{
public Guid Value { get; set; }
/// <summary>
/// Holds the value (per scope lifetime).
/// </summary>
public class ValueHolder
{
public Guid Value { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
namespace SlimMessageBus.Host.Integration.Test.MessageScopeAccessor;

using SlimMessageBus.Host;
using SlimMessageBus.Host.Consumer;
using SlimMessageBus.Host.Memory;

/// <summary>
/// This test verifies that the <see cref="IMessageScopeAccessor"/> correctly looks up the <see cref="IServiceProvider"/> for the current message scope.
/// </summary>
/// <param name="testOutputHelper"></param>
[Trait("Category", "Integration")]
public class MessageScopeAccessorTests(ITestOutputHelper testOutputHelper)
: BaseIntegrationTest<MessageScopeAccessorTests>(testOutputHelper)
{
protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration)
{
services.AddSlimMessageBus(mbb =>
{
mbb.AddChildBus("Memory", builder =>
{
builder
.WithProviderMemory()
.AutoDeclareFrom(Assembly.GetExecutingAssembly(), t => t.Namespace.Contains("MessageScopeAccessor"))
.PerMessageScopeEnabled();
});
mbb.AddServicesFromAssemblyContaining<TestMessageConsumer>();
});
services.AddScoped<TestValueHolder>();
}

[Fact]
public async Task Given_MemoryConsumer_When_MessageScopeAccessorCalledInsideConsumer_Then_LooksUpInTheMessageScope()
{
// Arrange
using var scope = ServiceProvider.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();

var value = Guid.NewGuid();

// Act
await bus.Publish(new TestMessage(value));

// Assert
var holder = scope.ServiceProvider.GetRequiredService<TestValueHolder>();
holder.ServiceProvider.Should().BeSameAs(holder.MessageScopeAccessorServiceProvider);
}

public record TestMessage(Guid Value);

public class TestMessageConsumer(TestValueHolder holder, IServiceProvider serviceProvider, IMessageScopeAccessor messageScopeAccessor) : IRequestHandler<TestMessage>
{
public Task OnHandle(TestMessage request)
{
holder.ServiceProvider = serviceProvider;
holder.MessageScopeAccessorServiceProvider = messageScopeAccessor.Current;
return Task.CompletedTask;
}
}

public class TestValueHolder
{
public IServiceProvider ServiceProvider { get; set; }
public IServiceProvider MessageScopeAccessorServiceProvider { get; set; }
}

}

0 comments on commit a0dc3a4

Please sign in to comment.