Skip to content

Commit

Permalink
Use RecyclableMemoryStream instead of MemoryStream
Browse files Browse the repository at this point in the history
  • Loading branch information
Odonno committed Aug 12, 2023
1 parent 638cf7f commit 781367b
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 60 deletions.
35 changes: 26 additions & 9 deletions src/Websocket.Client/ResponseMessage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.WebSockets;
using System.IO;
using System.Net.WebSockets;

namespace Websocket.Client
{
Expand All @@ -7,22 +8,30 @@ namespace Websocket.Client
/// </summary>
public class ResponseMessage
{
private ResponseMessage(byte[] binary, string text, WebSocketMessageType messageType)
private readonly byte[] _binary;

private ResponseMessage(MemoryStream memoryStream, byte[] binary, string text, WebSocketMessageType messageType)
{
Binary = binary;
Stream = memoryStream;
_binary = binary;
Text = text;
MessageType = messageType;
}

/// <summary>
/// Received text message (only if type = WebSocketMessageType.Text)
/// Received text message (only if type = <see cref="WebSocketMessageType.Text"/>)
/// </summary>
public string Text { get; }

/// <summary>
/// Received text message (only if type = WebSocketMessageType.Binary)
/// Received text message (only if type = <see cref="WebSocketMessageType.Binary"/>)
/// </summary>
public byte[] Binary { get; }
public byte[] Binary => Stream is null ? _binary : Stream.ToArray();

/// <summary>
/// Received stream message (only if type = <see cref="WebSocketMessageType.Binary"/> and <see cref="WebsocketClient.IsStreamDisposedAutomatically"/> = false)
/// </summary>
public MemoryStream Stream { get; }

/// <summary>
/// Current message type (Text or Binary)
Expand All @@ -47,15 +56,23 @@ public override string ToString()
/// </summary>
public static ResponseMessage TextMessage(string data)
{
return new ResponseMessage(null, data, WebSocketMessageType.Text);
return new ResponseMessage(null, null, data, WebSocketMessageType.Text);
}

/// <summary>
/// Create binary response message
/// </summary>
public static ResponseMessage BinaryMessage(byte[] data)
{
return new ResponseMessage(data, null, WebSocketMessageType.Binary);
return new ResponseMessage(null, data, null, WebSocketMessageType.Binary);
}

/// <summary>
/// Create stream response message
/// </summary>
public static ResponseMessage BinaryStreamMessage(MemoryStream memoryStream)
{
return new ResponseMessage(memoryStream, null, null, WebSocketMessageType.Binary);
}
}
}
}
1 change: 1 addition & 0 deletions src/Websocket.Client/Websocket.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.3.2" />
<PackageReference Include="System.Reactive" Version="5.0.0" />
<PackageReference Include="System.Threading.Channels" Version="5.0.0" />
</ItemGroup>
Expand Down
46 changes: 46 additions & 0 deletions src/Websocket.Client/WebsocketClient.Sending.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Websocket.Client.Logging;
Expand Down Expand Up @@ -56,6 +58,45 @@ public void Send(ArraySegment<byte> message)
_messagesBinaryToSendQueue.Writer.TryWrite(message);
}

/// <summary>
/// Send text message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Text message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(string message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(message, cancellationToken);
}

/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(byte[] message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesBinaryToSendQueue.Writer.WriteAsync(new ArraySegment<byte>(message), cancellationToken);
}

/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendAsync(ArraySegment<byte> message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesBinaryToSendQueue.Writer.WriteAsync(message, cancellationToken);
}

/// <summary>
/// Send text message to the websocket channel.
/// It doesn't use a sending queue,
Expand Down Expand Up @@ -204,8 +245,13 @@ private async Task SendInternal(string message)
}

Logger.Trace(L($"Sending: {message}"));
#if NETSTANDARD2_0
var buffer = GetEncoding().GetBytes(message);
var messageSegment = new ArraySegment<byte>(buffer);
#else
ReadOnlyMemory<byte> messageSegment = MemoryMarshal.AsMemory<byte>(GetEncoding().GetBytes(message));
#endif

await _client
.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation.Token)
.ConfigureAwait(false);
Expand Down
85 changes: 34 additions & 51 deletions src/Websocket.Client/WebsocketClient.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Microsoft.IO;
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Reactive.Linq;
using System.Reactive.Subjects;
Expand All @@ -24,6 +24,7 @@ public partial class WebsocketClient : IWebsocketClient

private readonly WebsocketAsyncLock _locker = new WebsocketAsyncLock();
private readonly Func<Uri, CancellationToken, Task<WebSocket>> _connectionFactory;
private static readonly RecyclableMemoryStreamManager _memoryStreamManager = new RecyclableMemoryStreamManager();

private Uri _url;
private Timer _lastChanceTimer;
Expand Down Expand Up @@ -159,6 +160,15 @@ public bool IsReconnectionEnabled
/// </summary>
public bool IsTextMessageConversionEnabled { get; set; } = true;

/// <summary>
/// Enable or disable automatic <see cref="MemoryStream.Dispose(bool)"/> of the <see cref="MemoryStream"/>
/// after sending data (only available for binary response).
/// Setting value to false allows you to access the stream directly.
/// <warning>However, keep in mind that you need to handle the dispose yourself.</warning>
/// Default: true
/// </summary>
public bool IsStreamDisposedAutomatically { get; set; } = true;

/// <inheritdoc />
public Encoding MessageEncoding { get; set; }

Expand Down Expand Up @@ -416,71 +426,43 @@ private async Task Listen(WebSocket client, CancellationToken token)
{
// define buffer here and reuse, to avoid more allocation
const int chunkSize = 1024 * 4;
#if NETSTANDARD2_0
var buffer = new ArraySegment<byte>(new byte[chunkSize]);
#else
var buffer = new Memory<byte>(new byte[chunkSize]);
#endif

do
{
#if NETSTANDARD2_0
WebSocketReceiveResult result;
byte[] resultArrayWithTrailing = null;
var resultArraySize = 0;
var isResultArrayCloned = false;
MemoryStream ms = null;
#else
ValueWebSocketReceiveResult result;
#endif
var ms = _memoryStreamManager.GetStream() as RecyclableMemoryStream;

while (true)
{
result = await client.ReceiveAsync(buffer, token);
var currentChunk = buffer.Array;
var currentChunkSize = result.Count;

var isFirstChunk = resultArrayWithTrailing == null;
if (isFirstChunk)
{
// first chunk, use buffer as reference, do not allocate anything
resultArraySize += currentChunkSize;
resultArrayWithTrailing = currentChunk;
isResultArrayCloned = false;
}
else if (currentChunk == null)
{
// weird chunk, do nothing
}
else
{
// received more chunks, lets merge them via memory stream
if (ms == null)
{
// create memory stream and insert first chunk
ms = new MemoryStream();
ms.Write(resultArrayWithTrailing, 0, resultArraySize);
}

// insert current chunk
ms.Write(currentChunk, buffer.Offset, currentChunkSize);
}
#if NETSTANDARD2_0
ms.Write(buffer.AsSpan(0, result.Count));
#else
ms.Write(buffer[..result.Count].Span);
#endif

if (result.EndOfMessage)
{
break;
}

if (isResultArrayCloned)
continue;

// we got more chunks incoming, need to clone first chunk
resultArrayWithTrailing = resultArrayWithTrailing?.ToArray();
isResultArrayCloned = true;
}

ms?.Seek(0, SeekOrigin.Begin);
ms.Seek(0, SeekOrigin.Begin);

ResponseMessage message;
bool shouldDisposeStream = true;

if (result.MessageType == WebSocketMessageType.Text && IsTextMessageConversionEnabled)
{
var data = ms != null ?
GetEncoding().GetString(ms.ToArray()) :
resultArrayWithTrailing != null ?
GetEncoding().GetString(resultArrayWithTrailing, 0, resultArraySize) :
null;
var data = GetEncoding().GetString(ms.ToArray());

message = ResponseMessage.TextMessage(data);
}
Expand Down Expand Up @@ -520,18 +502,19 @@ await StopInternal(client, WebSocketCloseStatus.NormalClosure, "Closing",
}
else
{
if (ms != null)
if (IsStreamDisposedAutomatically)
{
message = ResponseMessage.BinaryMessage(ms.ToArray());
}
else
{
Array.Resize(ref resultArrayWithTrailing, resultArraySize);
message = ResponseMessage.BinaryMessage(resultArrayWithTrailing);
message = ResponseMessage.BinaryStreamMessage(ms);
shouldDisposeStream = false;
}
}

ms?.Dispose();
if (shouldDisposeStream)
ms.Dispose();

Logger.Trace(L($"Received: {message}"));
_lastReceivedMsg = DateTime.UtcNow;
Expand Down

0 comments on commit 781367b

Please sign in to comment.