Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated RabbitMQ library to v7 #4337

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All" />
</ItemGroup>

Expand Down
50 changes: 28 additions & 22 deletions Source/Csla.Channels.RabbitMq/ProxyListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// <summary>Handles replies from data portal server</summary>
//-----------------------------------------------------------------------

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
Expand All @@ -25,7 +26,7 @@ internal class ProxyListener : IDisposable
/// <summary>
/// Gets or sets the channel (model) for RabbitMQ.
/// </summary>
protected IModel? Channel { get; set; }
protected IChannel? Channel { get; set; }

/// <summary>
/// Gets or sets the queue for inbound messages
Expand Down Expand Up @@ -73,7 +74,7 @@ public static ProxyListener GetListener(Uri queueUri)
#if NET8_0_OR_GREATER
[MemberNotNull(nameof(Connection), nameof(Channel), nameof(ReplyQueue))]
#endif
private void InitializeRabbitMQ()
private async Task InitializeRabbitMQ()
{
var factory = new ConnectionFactory { HostName = _queueUri.Host };
if (_queueUri.Port < 0)
Expand All @@ -83,8 +84,10 @@ private void InitializeRabbitMQ()
factory.UserName = userInfo[0];
if (userInfo.Length > 1)
factory.Password = userInfo[1];
Connection = factory.CreateConnection();
Channel = Connection.CreateModel();
#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await
Connection = await factory.CreateConnectionAsync();
Channel = await Connection.CreateChannelAsync();
#pragma warning restore CS8774
string[] query;
if (string.IsNullOrWhiteSpace(_queueUri.Query))
query = [];
Expand All @@ -93,25 +96,24 @@ private void InitializeRabbitMQ()
if (query.Length == 0 || !query[0].StartsWith("reply="))
{
IsNamedReplyQueue = false;
ReplyQueue = Channel.QueueDeclare();
#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await
ReplyQueue = await Channel.QueueDeclareAsync();
#pragma warning restore CS8774
}
else
{
IsNamedReplyQueue = true;
var split = query[0].Split('=');
ReplyQueue = Channel.QueueDeclare(
queue: split[1],
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await
ReplyQueue = await Channel.QueueDeclareAsync(queue: split[1], durable: false, exclusive: false, autoDelete: false, arguments: null);
#pragma warning restore CS8774
}
}

private volatile bool IsListening;
private readonly Lock ListeningLock = LockFactory.Create();

public void StartListening()
public async Task StartListening()
{
if (IsListening) return;
lock (ListeningLock)
Expand All @@ -120,11 +122,17 @@ public void StartListening()
IsListening = true;
}

InitializeRabbitMQ();
await InitializeRabbitMQ();
Debug.Assert(Channel != null);

var consumer = new EventingBasicConsumer(Channel);
consumer.Received += (_, ea) =>
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.ReceivedAsync += async (_, ea) =>
{
if (ea.BasicProperties.CorrelationId is null)
{
throw new InvalidOperationException($"{nameof(BasicDeliverEventArgs.BasicProperties)}.{nameof(IReadOnlyBasicProperties.CorrelationId)} == null");
}

if (Wip.WorkInProgress.TryRemove(ea.BasicProperties.CorrelationId, out WipItem? item))
{
item.Response = ea.Body.ToArray();
Expand All @@ -137,16 +145,14 @@ public void StartListening()
// listeners; if so requeue the message up to 9 times
if (IsNamedReplyQueue && ea.BasicProperties.Priority < 9)
{
ea.BasicProperties.Priority++;
Channel.BasicPublish(
exchange: "",
routingKey: ReplyQueue?.QueueName,
basicProperties: ea.BasicProperties,
body: ea.Body);
var updatedBasicProperties = new BasicProperties(ea.BasicProperties);
++updatedBasicProperties.Priority;

await Channel.BasicPublishAsync(exchange: "", routingKey: ReplyQueue!.QueueName, mandatory: true, basicProperties: updatedBasicProperties, body: ea.Body.ToArray());
}
}
};
Channel.BasicConsume(queue: ReplyQueue?.QueueName, autoAck: true, consumer: consumer);
await Channel.BasicConsumeAsync(queue: ReplyQueue!.QueueName, autoAck: true, consumer: consumer);
}

public void Dispose()
Expand Down
66 changes: 34 additions & 32 deletions Source/Csla.Channels.RabbitMq/RabbitMqPortal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// <summary>Exposes server-side DataPortal functionality through RabbitMQ</summary>
//-----------------------------------------------------------------------

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Security.Principal;
using Csla.Core;
Expand Down Expand Up @@ -34,15 +35,15 @@ internal RabbitMqPortal(ApplicationContext applicationContext, IDataPortalServer
private readonly ApplicationContext _applicationContext;

private IConnection? Connection;
private IModel? Channel;
private IChannel? Channel;
private string? DataPortalQueueName;

private Uri DataPortalUri { get; set; }

#if NET8_0_OR_GREATER
[MemberNotNull(nameof(DataPortalUri), nameof(DataPortalQueueName), nameof(Connection), nameof(Channel))]
#endif
private void InitializeRabbitMQ()
private async Task InitializeRabbitMQ()
{
if (Connection == null || DataPortalUri == null || Channel == null || DataPortalQueueName == null)
{
Expand All @@ -58,39 +59,42 @@ private void InitializeRabbitMQ()
factory.UserName = userInfo[0];
if (userInfo.Length > 1)
factory.Password = userInfo[1];
Connection = factory.CreateConnection();
Channel = Connection.CreateModel();
#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await
Connection = await factory.CreateConnectionAsync();
Channel = await Connection.CreateChannelAsync();
#pragma warning restore CS8774
}
}

/// <summary>
/// Start processing inbound messages.
/// </summary>
public void StartListening()
public async Task StartListening()
{
InitializeRabbitMQ();
Channel?.QueueDeclare(
queue: DataPortalQueueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

var consumer = new EventingBasicConsumer(Channel);
consumer.Received += (_, ea) =>
{
InvokePortal(ea, ea.Body.ToArray());
};
Channel.BasicConsume(queue: DataPortalQueueName, autoAck: true, consumer: consumer);
await InitializeRabbitMQ();
await Channel!.QueueDeclareAsync(queue: DataPortalQueueName!, durable: false, exclusive: false, autoDelete: false);

var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.ReceivedAsync += async (_, ea) => await InvokePortal(ea, ea.Body.ToArray());
await Channel.BasicConsumeAsync(queue: DataPortalQueueName!, autoAck: true, consumer: consumer);
}

private async void InvokePortal(BasicDeliverEventArgs ea, byte[] requestData)
private async Task InvokePortal(BasicDeliverEventArgs ea, byte[] requestData)
{
if (ea.BasicProperties.ReplyTo is null)
{
throw new InvalidOperationException($"{nameof(BasicDeliverEventArgs.BasicProperties)}.{nameof(IReadOnlyBasicProperties.ReplyTo)} == null");
}
if (ea.BasicProperties.CorrelationId is null)
{
throw new InvalidOperationException($"{nameof(BasicDeliverEventArgs.BasicProperties)}.{nameof(IReadOnlyBasicProperties.CorrelationId)} == null");
}

var result = _applicationContext.CreateInstanceDI<DataPortalResponse>();
try
{
var request = _applicationContext.GetRequiredService<ISerializationFormatter>().Deserialize(requestData);
result = await CallPortal(ea.BasicProperties.Type, request);
result = await CallPortal(ea.BasicProperties.Type ?? throw new InvalidOperationException($"{nameof(BasicDeliverEventArgs.BasicProperties)}.{nameof(IReadOnlyBasicProperties.Type)} == null"), request);
}
catch (Exception ex)
{
Expand All @@ -100,29 +104,27 @@ private async void InvokePortal(BasicDeliverEventArgs ea, byte[] requestData)
try
{
var response = _applicationContext.GetRequiredService<ISerializationFormatter>().Serialize(result);
SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
await SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
}
catch (Exception ex)
{
result = _applicationContext.CreateInstanceDI<DataPortalResponse>();
result.ErrorData = _applicationContext.CreateInstanceDI<DataPortalErrorInfo>(ex);
var response = _applicationContext.GetRequiredService<ISerializationFormatter>().Serialize(result);
SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
await SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
}
}

private void SendMessage(string target, string correlationId, byte[] request)
private async Task SendMessage(string target, string correlationId, byte[] request)
{
InitializeRabbitMQ();
await InitializeRabbitMQ();
if (Channel is null)
throw new InvalidOperationException($"{nameof(Channel)} == null");
var props = Channel.CreateBasicProperties();
props.CorrelationId = correlationId;
Channel.BasicPublish(
exchange: "",
routingKey: target,
basicProperties: props,
body: request);
var props = new BasicProperties
{
CorrelationId = correlationId
};
await Channel.BasicPublishAsync(exchange: "", routingKey: target, mandatory: true, basicProperties: props, body: request);
}

private async Task<DataPortalResponse> CallPortal(string operation, object request)
Expand Down
36 changes: 21 additions & 15 deletions Source/Csla.Channels.RabbitMq/RabbitMqProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public RabbitMqProxy(ApplicationContext applicationContext, RabbitMqProxyOptions
/// <summary>
/// Gets or sets the channel (model) for RabbitMQ.
/// </summary>
protected IModel? Channel { get; set; }
protected IChannel? Channel { get; set; }

/// <summary>
/// Gets or sets the name of the data portal
Expand All @@ -71,7 +71,7 @@ public RabbitMqProxy(ApplicationContext applicationContext, RabbitMqProxyOptions
#if NET8_0_OR_GREATER
[MemberNotNull(nameof(Connection), nameof(Channel), nameof(QueueListener))]
#endif
protected virtual void InitializeRabbitMQ()
protected virtual async Task InitializeRabbitMQ()
{
if (Connection == null || Channel == null || QueueListener == null)
{
Expand All @@ -91,12 +91,14 @@ protected virtual void InitializeRabbitMQ()
factory.UserName = userInfo[0];
if (userInfo.Length > 1)
factory.Password = userInfo[1];
Connection = factory.CreateConnection();
Channel = Connection.CreateModel();
#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await
Connection = await factory.CreateConnectionAsync();
Channel = await Connection.CreateChannelAsync();
#pragma warning restore CS8774
if (QueueListener == null)
{
QueueListener = ProxyListener.GetListener(url);
QueueListener.StartListening();
await QueueListener.StartListening();
}
}
}
Expand Down Expand Up @@ -125,7 +127,7 @@ public override async Task<DataPortalResult> Create(Type objectType, object crit

try
{
InitializeRabbitMQ();
await InitializeRabbitMQ();
return await base.Create(objectType, criteria, context, isSync);
}
finally
Expand All @@ -151,7 +153,7 @@ public override async Task<DataPortalResult> Fetch(Type objectType, object crite

try
{
InitializeRabbitMQ();
await InitializeRabbitMQ();
return await base.Fetch(objectType, criteria, context, isSync);
}
finally
Expand All @@ -176,7 +178,7 @@ public override async Task<DataPortalResult> Update(object obj, DataPortalContex

try
{
InitializeRabbitMQ();
await InitializeRabbitMQ();
return await base.Update(obj, context, isSync);
}
finally
Expand All @@ -202,7 +204,7 @@ public override async Task<DataPortalResult> Delete(Type objectType, object crit

try
{
InitializeRabbitMQ();
await InitializeRabbitMQ();
return await base.Delete(objectType, criteria, context, isSync);
}
finally
Expand All @@ -226,7 +228,7 @@ protected override async Task<byte[]> CallDataPortalServer(byte[] serialized, st
var resetEvent = new Threading.AsyncManualResetEvent();
var wip = Wip.WorkInProgress.GetOrAdd(correlationId, new WipItem(resetEvent));

SendMessage(QueueListener!.ReplyQueue!.QueueName, correlationId, operation, serialized);
await SendMessage(QueueListener!.ReplyQueue!.QueueName, correlationId, operation, serialized);

var timeout = Task.Delay(Options.Timeout);
if (await Task.WhenAny(wip.ResetEvent.WaitAsync(), timeout) == timeout)
Expand All @@ -235,16 +237,20 @@ protected override async Task<byte[]> CallDataPortalServer(byte[] serialized, st
return wip.Response!;
}

private void SendMessage(string sender, string correlationId, string operation, byte[] request)
private async Task SendMessage(string sender, string correlationId, string operation, byte[] request)
{
var props = Channel!.CreateBasicProperties();
var props = new BasicProperties
{
CorrelationId = correlationId,
Type = operation
};
if (!string.IsNullOrWhiteSpace(sender))
props.ReplyTo = sender;
props.CorrelationId = correlationId;
props.Type = operation;
Channel.BasicPublish(

await Channel!.BasicPublishAsync(
exchange: "",
routingKey: DataPortalQueueName,
mandatory: true,
basicProperties: props,
body: request);
}
Expand Down
Loading