Skip to content

Commit

Permalink
Add SendTextAsBinary method
Browse files Browse the repository at this point in the history
  • Loading branch information
Odonno committed Aug 12, 2023
1 parent 781367b commit df86961
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 14 deletions.
36 changes: 36 additions & 0 deletions src/Websocket.Client/RequestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;

namespace Websocket.Client
{
internal abstract class RequestMessage { }

internal class RequestTextMessage : RequestMessage
{
public string Text { get; }

public RequestTextMessage(string text)
{
Text = text;
}
}

internal class RequestBinaryMessage : RequestMessage
{
public byte[] Data { get; }

public RequestBinaryMessage(byte[] data)
{
Data = data;
}
}

internal class RequestBinarySegmentMessage : RequestMessage
{
public ArraySegment<byte> Data { get; }

public RequestBinarySegmentMessage(ArraySegment<byte> data)
{
Data = data;
}
}
}
111 changes: 97 additions & 14 deletions src/Websocket.Client/WebsocketClient.Sending.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Websocket.Client
{
public partial class WebsocketClient
{
private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
private readonly Channel<RequestMessage> _messagesTextToSendQueue = Channel.CreateUnbounded<RequestMessage>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false
Expand All @@ -31,7 +31,7 @@ public void Send(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(message);
_messagesTextToSendQueue.Writer.TryWrite(new RequestTextMessage(message));
}

/// <summary>
Expand Down Expand Up @@ -68,7 +68,7 @@ public ValueTask SendAsync(string message, CancellationToken cancellationToken =
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(message, cancellationToken);
return _messagesTextToSendQueue.Writer.WriteAsync(new RequestTextMessage(message), cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -108,7 +108,7 @@ public Task SendInstant(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));

return SendInternalSynchronized(message);
return SendInternalSynchronized(new RequestTextMessage(message));
}

/// <summary>
Expand All @@ -123,6 +123,60 @@ public Task SendInstant(byte[] message)
return SendInternalSynchronized(new ArraySegment<byte>(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
public void SendTextAsBinary(byte[] message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(new RequestBinaryMessage(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
public void SendTextAsBinary(ArraySegment<byte> message)
{
Validations.Validations.ValidateInput(message, nameof(message));

_messagesTextToSendQueue.Writer.TryWrite(new RequestBinarySegmentMessage(message));
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendTextAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinaryMessage(message), cancellationToken);
}

/// <summary>
/// Send already converted text message to the websocket channel.
/// Use this method to avoid double serialization of the text message.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Message to be sent</param>
/// <param name="cancellationToken">The cancellationToken enables graceful cancellation of asynchronous operations</param>
public ValueTask SendTextAsBinaryAsync(ArraySegment<byte> message, CancellationToken cancellationToken = default)
{
Validations.Validations.ValidateInput(message, nameof(message));

return _messagesTextToSendQueue.Writer.WriteAsync(new RequestBinarySegmentMessage(message), cancellationToken);
}

/// <summary>
/// Stream/publish fake message (via 'MessageReceived' observable).
/// Use for testing purposes to simulate a server message.
Expand Down Expand Up @@ -228,15 +282,15 @@ private void StartBackgroundThreadForSendingBinary()
_ = Task.Factory.StartNew(_ => SendBinaryFromQueue(), TaskCreationOptions.LongRunning, _cancellationTotal.Token);
}

private async Task SendInternalSynchronized(string message)
private async Task SendInternalSynchronized(RequestMessage message)
{
using (await _locker.LockAsync())
{
await SendInternal(message);
}
}

private async Task SendInternal(string message)
private async Task SendInternal(RequestMessage message)
{
if (!IsClientConnected())
{
Expand All @@ -246,14 +300,43 @@ private async Task SendInternal(string message)

Logger.Trace(L($"Sending: {message}"));
#if NETSTANDARD2_0
var buffer = GetEncoding().GetBytes(message);
var messageSegment = new ArraySegment<byte>(buffer);
ArraySegment<byte> payload;

switch (message)
{
case RequestTextMessage textMessage:
payload = new ArraySegment<byte>(GetEncoding().GetBytes(textMessage.Text));
break;
case RequestBinaryMessage binaryMessage:
payload = new ArraySegment<byte>(binaryMessage.Data);
break;
case RequestBinarySegmentMessage segmentMessage:
payload = segmentMessage.Data;
break;
default:
throw new ArgumentException($"Unknown message type: {message.GetType()}");
}
#else
ReadOnlyMemory<byte> messageSegment = MemoryMarshal.AsMemory<byte>(GetEncoding().GetBytes(message));
ReadOnlyMemory<byte> payload;

switch (message)
{
case RequestTextMessage textMessage:
payload = MemoryMarshal.AsMemory<byte>(GetEncoding().GetBytes(textMessage.Text));
break;
case RequestBinaryMessage binaryMessage:
payload = MemoryMarshal.AsMemory<byte>(binaryMessage.Data);
break;
case RequestBinarySegmentMessage segmentMessage:
payload = segmentMessage.Data.AsMemory();
break;
default:
throw new ArgumentException($"Unknown message type: {message.GetType()}");
}
#endif

await _client
.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation.Token)
.SendAsync(payload, WebSocketMessageType.Text, true, _cancellation.Token)
.ConfigureAwait(false);
}

Expand All @@ -265,18 +348,18 @@ private async Task SendInternalSynchronized(ArraySegment<byte> message)
}
}

private async Task SendInternal(ArraySegment<byte> message)
private async Task SendInternal(ArraySegment<byte> payload)
{
if (!IsClientConnected())
{
Logger.Debug(L($"Client is not connected to server, cannot send binary, length: {message.Count}"));
Logger.Debug(L($"Client is not connected to server, cannot send binary, length: {payload.Count}"));
return;
}

Logger.Trace(L($"Sending binary, length: {message.Count}"));
Logger.Trace(L($"Sending binary, length: {payload.Count}"));

await _client
.SendAsync(message, WebSocketMessageType.Binary, true, _cancellation.Token)
.SendAsync(payload, WebSocketMessageType.Binary, true, _cancellation.Token)
.ConfigureAwait(false);
}
}
Expand Down

0 comments on commit df86961

Please sign in to comment.