Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host.Sql] SQL database transport (initial) #221

Merged
merged 1 commit into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i

- [Introduction](docs/intro.md)
- Providers:
- [Apache Kafka](docs/provider_kafka.md)
- [Azure EventHubs](docs/provider_azure_eventhubs.md)
- [Azure ServiceBus](docs/provider_azure_servicebus.md)
- [Apache Kafka](docs/provider_kafka.md)
- [Hybrid](docs/provider_hybrid.md)
- [Memory](docs/provider_memory.md)
- [MQTT](docs/provider_mqtt.md)
- [Memory](docs/provider_memory.md)
- [RabbitMQ](docs/provider_rabbitmq.md)
- [Redis](docs/provider_redis.md)
- [SQL](docs/provider_sql.md)
- Plugins:
- [Serialization](docs/serialization.md)
- [Transactional Outbox](docs/plugin_outbox.md)
Expand All @@ -74,6 +75,7 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
| `.Host.MQTT` | Transport provider for MQTT | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.MQTT.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.MQTT) |
| `.Host.RabbitMQ` | Transport provider for RabbitMQ | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.RabbitMQ.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.RabbitMQ) |
| `.Host.Redis` | Transport provider for Redis | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Redis.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Redis) |
| `.Host.Sql` | Transport provider implementation for SQL database message passing | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Sql.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Sql) |
| **Serialization** | | |
| `.Host.Serialization.Json` | Serialization plugin for JSON (Newtonsoft.Json library) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.Json.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.Json) |
| `.Host.Serialization.SystemTextJson` | Serialization plugin for JSON (System.Text.Json library) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Serialization.SystemTextJson.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Serialization.SystemTextJson) |
Expand Down
1 change: 1 addition & 0 deletions build/tasks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ $projects = @(
"SlimMessageBus.Host.Redis",
"SlimMessageBus.Host.Mqtt",
"SlimMessageBus.Host.RabbitMQ",
"SlimMessageBus.Host.Sql",

"SlimMessageBus.Host.FluentValidation",

Expand Down
11 changes: 6 additions & 5 deletions docs/NuGet.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ SlimMessageBus additionally provides request-response implementation over messag
Transports:

- Apache Kafka
- Azure Service Bus
- Azure Event Hub
- Redis
- Azure Service Bus
- Hybrid (composition of the bus out of many transports)
- In-Memory transport (domain events, mediator)
- MQTT / Azure IoT Hub
- RabbitMQ
- In-Memory transport (domain events, mediator)
- Hybrid (composition of the bus out of many transports)
- Redis
- SQL (MS SQL, PostgreSql)

Plugins:

- Message validation via Fluent Validation
- Transactional Outbox pattern
- Transactional Outbox pattern (SQL, DbContext)
- Serialization using JSON, Avro, ProtoBuf
- AsyncAPI specification generation

Expand Down
8 changes: 6 additions & 2 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
- [Introduction](intro.md)
- Providers
- [Apache Kafka](provider_kafka.md)
- [Azure Service Bus](provider_azure_servicebus.md)
- [Azure Event Hubs](provider_azure_eventhubs.md)
- [Redis](provider_redis.md)
- [Azure Service Bus](provider_azure_servicebus.md)
- [Hybrid](provider_hybrid.md)
- [MQTT](provider_mqtt.md)
- [Memory](provider_memory.md)
- [RabbitMq](provider_rabbitmq.md)
- [Redis](provider_redis.md)
- [SQL](provider_sql.md)
- [Serialization Plugins](serialization.md)
66 changes: 66 additions & 0 deletions docs/provider_sql.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# SQL transport provider for SlimMessageBus <!-- omit in toc -->

Please read the [Introduction](intro.md) before reading this provider documentation.

- [About](#about)
- [SQL Compatibility](#sql-compatibility)
- [Configuration](#configuration)
- [How it works](#how-it-works)

## About

The SQL transport provider allows to leverage a single shared SQL database instance as a messaging broker for all the collaborating producers and consumers.

This transport might be optimal for simpler applications that do not have a dedicated messaging infrastructure available, do not have high throughput needs, or want to target a simplistic deployment model.

When the application grows over time, and given that SMB is an abstraction, the migration from SQL towards a dedicated messaging system should be super easy.

## SQL Compatibility

This transport has been tested on SQL Azure (T-SQL), and should work on most other databases.
If you see an issue, please raise an github issue.

## Configuration

ToDo: Finish

The configuration is arranged via the `.WithProviderMqtt(cfg => {})` method on the message bus builder.

```cs
services.AddSlimMessageBus(mbb =>
{
mbb.WithProviderMqtt(cfg =>
{
cfg.ClientBuilder
.WithTcpServer(configuration["Mqtt:Server"], int.Parse(configuration["Mqtt:Port"]))
.WithTls()
.WithCredentials(configuration["Mqtt:Username"], configuration["Mqtt:Password"])
// Use MQTTv5 to use message headers (if the broker supports it)
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500);
});

mbb.AddServicesFromAssemblyContaining<PingConsumer>();
mbb.AddJsonSerializer();
});
```

The `ClientBuilder` property (of type `MqttClientOptionsBuilder`) is used to configure the underlying [MQTTnet library client](https://github.com/dotnet/MQTTnet/wiki/Client).
Please consult the MQTTnet library docs for more configuration options.

## How it works

The same SQL database instance is required for all the producers and consumers to collaborate.
Therefore ensure all of the service instances point to the same database cluster.

- Single table is used to store all the exchanged messages (by default table is called `Messages`).
- Producers send messages to the messages table.
- There are two types of entities (queues, and topics for pub/sub).
- In the case of a topic:
- Each subscription gets a copy of the message.
- Subscription has a lifetime, and can expire after certain idle time. Along with it, all the messages placed on the subscription.
- Consumers (queue consumers, or subscribers in pub/sub) long poll the table to pick up their respective message.
- Queue consumers compete for the message, and ensure only one consumer instance is processing the message.
- Topic subscribers complete for the message within the same subscription.
- In the future we might consider:
- Table per each entity, so that we can minimize table locking.
- Sessions to ensure order of processing within the same message session ID - similar to how Azure Service Bus feature or Apache Kafka topic-partition works.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected override async Task OnStart()
}
}

protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders = null, CancellationToken cancellationToken = default)
protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
{
AssertActive();

Expand Down
54 changes: 23 additions & 31 deletions src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,28 @@ public ServiceBusMessageBus(MessageBusSettings settings, ServiceBusMessageBusSet
OnBuildProvider();
}

protected override async ValueTask DisposeAsyncCore()
{
await base.DisposeAsyncCore().ConfigureAwait(false);

var producers = _producerByPath.ClearAndSnapshot();
if (producers.Count > 0)
{
var producerCloseTasks = producers.Select(x =>
{
_logger.LogDebug("Closing sender client for path {Path}", x.EntityPath);
return x.CloseAsync();
});
await Task.WhenAll(producerCloseTasks).ConfigureAwait(false);
}

if (_client != null)
{
await _client.DisposeAsync().ConfigureAwait(false);
_client = null;
}
}

protected override IMessageBusSettingsValidationService ValidationService => new ServiceBusMessageBusSettingsValidationService(Settings, ProviderSettings);

public override async Task ProvisionTopology()
Expand All @@ -26,7 +48,6 @@ public override async Task ProvisionTopology()
await provisioningService.ProvisionTopology(); // provisining happens asynchronously
}


#region Overrides of MessageBusBase

protected override void Build()
Expand Down Expand Up @@ -92,29 +113,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
}
}

protected override async ValueTask DisposeAsyncCore()
{
await base.DisposeAsyncCore().ConfigureAwait(false);

var producers = _producerByPath.ClearAndSnapshot();
if (producers.Count > 0)
{
var producerCloseTasks = producers.Select(x =>
{
_logger.LogDebug("Closing sender client for path {Path}", x.EntityPath);
return x.CloseAsync();
});
await Task.WhenAll(producerCloseTasks).ConfigureAwait(false);
}

if (_client != null)
{
await _client.DisposeAsync().ConfigureAwait(false);
_client = null;
}
}

protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders = null, CancellationToken cancellationToken = default)
protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
{
var messageType = message?.GetType();

Expand Down Expand Up @@ -172,12 +171,5 @@ private void InvokeMessageModifier(object message, Type messageType, ServiceBusM
}
}

public override Task ProduceRequest(object request, IDictionary<string, object> requestHeaders, string path, ProducerSettings producerSettings)
{
if (requestHeaders is null) throw new ArgumentNullException(nameof(requestHeaders));

return base.ProduceRequest(request, requestHeaders, path, producerSettings);
}

#endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
/// <summary>
/// The bus factory method.
/// </summary>
public Func<MessageBusSettings, IMessageBus> BusFactory { get; private set; }
public Func<MessageBusSettings, IMessageBusProvider> BusFactory { get; private set; }

public IList<Action<IServiceCollection>> PostConfigurationActions { get; } = new List<Action<IServiceCollection>>();

Expand Down Expand Up @@ -230,7 +230,7 @@
return this;
}

public MessageBusBuilder WithProvider(Func<MessageBusSettings, IMessageBus> provider)
public MessageBusBuilder WithProvider(Func<MessageBusSettings, IMessageBusProvider> provider)
{
BusFactory = provider ?? throw new ArgumentNullException(nameof(provider));
return this;
Expand Down Expand Up @@ -325,12 +325,12 @@
child.MergeFrom(Settings);
}

builderAction?.Invoke(child);

Check warning on line 328 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Remove this unnecessary check for null. (https://rules.sonarsource.com/csharp/RSPEC-2589)

Check warning on line 328 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Remove this unnecessary check for null. (https://rules.sonarsource.com/csharp/RSPEC-2589)

Check warning on line 328 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Remove this unnecessary check for null. (https://rules.sonarsource.com/csharp/RSPEC-2589)

return this;
}

public IMessageBus Build()
public IMessageBusProvider Build()
{
if (BusFactory is null)
{
Expand All @@ -339,4 +339,4 @@
}
return BusFactory(Settings);
}
}
}
7 changes: 7 additions & 0 deletions src/SlimMessageBus.Host.Configuration/IMessageBusProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace SlimMessageBus.Host;

public interface IMessageBusProvider
{
string Name { get; }
MessageBusSettings Settings { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<Description>Core configuration interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
<RootNamespace>SlimMessageBus.Host</RootNamespace>
<Version>2.0.6-rc1</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected override async ValueTask DisposeAsyncCore()
}
}

protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders = null, CancellationToken cancellationToken = default)
protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
{
AssertActive();

Expand Down
15 changes: 8 additions & 7 deletions src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,21 @@ private IMessageProcessorQueue CreateMessageProcessorQueue(IMessageProcessor<obj
: new MessageProcessorQueue(messageProcessor, LoggerFactory.CreateLogger<MessageProcessorQueue>(), CancellationToken);
}

protected override Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders = null, CancellationToken cancellationToken = default)
protected override Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
=> Task.CompletedTask; // Not used

public override Task ProduceResponse(string requestId, object request, IReadOnlyDictionary<string, object> requestHeaders, object response, Exception responseException, IMessageTypeConsumerInvokerSettings consumerInvoker)
=> Task.CompletedTask; // Not used to responses

protected override Task PublishInternal(object message, string path, IDictionary<string, object> messageHeaders, CancellationToken cancellationToken, ProducerSettings producerSettings, IServiceProvider currentServiceProvider)
=> ProduceInternal<object>(message, path, messageHeaders, currentServiceProvider, isPublish: true, cancellationToken);
protected override Task PublishInternal(object message, string path, IDictionary<string, object> messageHeaders, CancellationToken cancellationToken, ProducerSettings producerSettings, IMessageBusTarget targetBus)
=> ProduceInternal<object>(message, path, messageHeaders, targetBus, isPublish: true, cancellationToken);

protected override Task<TResponseMessage> SendInternal<TResponseMessage>(object request, string path, Type requestType, Type responseType, ProducerSettings producerSettings, DateTimeOffset created, DateTimeOffset expires, string requestId, IDictionary<string, object> requestHeaders, IServiceProvider currentServiceProvider, CancellationToken cancellationToken)
=> ProduceInternal<TResponseMessage>(request, path, requestHeaders, currentServiceProvider, isPublish: false, cancellationToken);
protected override Task<TResponseMessage> SendInternal<TResponseMessage>(object request, string path, Type requestType, Type responseType, ProducerSettings producerSettings, DateTimeOffset created, DateTimeOffset expires, string requestId, IDictionary<string, object> requestHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken)
=> ProduceInternal<TResponseMessage>(request, path, requestHeaders, targetBus, isPublish: false, cancellationToken);

#endregion

private async Task<TResponseMessage> ProduceInternal<TResponseMessage>(object message, string path, IDictionary<string, object> requestHeaders, IServiceProvider currentServiceProvider, bool isPublish, CancellationToken cancellationToken)
private async Task<TResponseMessage> ProduceInternal<TResponseMessage>(object message, string path, IDictionary<string, object> requestHeaders, IMessageBusTarget targetBus, bool isPublish, CancellationToken cancellationToken)
{
var messageType = message.GetType();
var producerSettings = GetProducerSettings(messageType);
Expand Down Expand Up @@ -166,8 +166,9 @@ private async Task<TResponseMessage> ProduceInternal<TResponseMessage>(object me
return default;
}

var serviceProvider = targetBus?.ServiceProvider ?? Settings.ServiceProvider;
// Execute the message processor in synchronous manner
var r = await messageProcessor.ProcessMessage(transportMessage, messageHeadersReadOnly, currentServiceProvider: currentServiceProvider, cancellationToken: cancellationToken);
var r = await messageProcessor.ProcessMessage(transportMessage, messageHeadersReadOnly, currentServiceProvider: serviceProvider, cancellationToken: cancellationToken);
if (r.Exception != null)
{
// We want to pass the same exception to the sender as it happened in the handler/consumer
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
return Task.CompletedTask;
}

protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders = null, CancellationToken cancellationToken = default)
protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
{
var m = new MqttApplicationMessage
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public DbContextOutboxRepository(ILogger<SqlOutboxRepository> logger, SqlOutboxS
public override async ValueTask BeginTransaction()
{
ValidateNoTransactionStarted();
await DbContext.Database.BeginTransactionAsync(Settings.TransactionIsolationLevel);
await DbContext.Database.BeginTransactionAsync(Settings.SqlSettings.TransactionIsolationLevel);
}

public override ValueTask CommitTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
services.Replace(ServiceDescriptor.Scoped<IOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));

services.TryAddSingleton<SqlOutboxTemplate>();
services.TryAddSingleton<IOutboxMigrationService, SqlOutboxMigrationService>();
});
return mbb;
}
Expand Down
Loading
Loading