Skip to content

Commit

Permalink
Merge pull request #334 from dvonthenen/expose-send-finalize
Browse files Browse the repository at this point in the history
Implement Length on Send Functions, Expose KeepAlive, Finalize, etc
  • Loading branch information
davidvonthenen authored Sep 25, 2024
2 parents 5b3d635 + f2a12a8 commit 52fb7c1
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 31 deletions.
6 changes: 3 additions & 3 deletions Deepgram.Microphone/Microphone.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Deepgram.Microphone;
/// </summary>
public class Microphone
{
private Action<byte[]> _push_callback;
private Action<byte[], int> _push_callback;

private int _rate;
private uint _chunk;
Expand All @@ -27,7 +27,7 @@ public class Microphone
/// Constructor for Microphone
/// </summary>
public Microphone(
Action<byte[]> push_callback,
Action<byte[], int> push_callback,
int rate = Defaults.RATE,
uint chunkSize = Defaults.CHUNK_SIZE,
int channels = Defaults.CHANNELS,
Expand Down Expand Up @@ -120,7 +120,7 @@ private StreamCallbackResult _callback(nint input, nint output, uint frameCount,
}

// Push the data to the callback
_push_callback(buf);
_push_callback(buf, buf.Length);

return StreamCallbackResult.Continue;
}
Expand Down
15 changes: 15 additions & 0 deletions Deepgram/Clients/Interfaces/v1/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2024 Deepgram .NET SDK contributors. All Rights Reserved.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
// SPDX-License-Identifier: MIT

namespace Deepgram.Clients.Interfaces.v1;

/// <summary>
/// Headers of interest in the return values from the Deepgram Speak API.
/// </summary>
public static class Constants
{
// WS buffer size
public const int UseArrayLengthForSend = -1;
}

25 changes: 20 additions & 5 deletions Deepgram/Clients/Interfaces/v1/IListenWebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,47 @@ public Task Connect(LiveSchema options, CancellationTokenSource? cancelToken = n
#endregion

#region Send Functions
/// <summary>
/// Sends a KeepAlive message to Deepgram
/// </summary>
public void SendKeepAlive();

/// <summary>
/// Sends a Finalize message to Deepgram
/// </summary>
public void SendFinalize();

/// <summary>
/// Sends a Close message to Deepgram
/// </summary>
public void SendClose(bool nullByte = false);

/// <summary>
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data);
public void Send(byte[] data, int length = Constants.UseArrayLengthForSend);

/// <summary>
/// This method sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data"></param>
public void SendBinary(byte[] data);
public void SendBinary(byte[] data, int length = Constants.UseArrayLengthForSend);

/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data);
public void SendMessage(byte[] data, int length = Constants.UseArrayLengthForSend);

/// <summary>
/// This method sends a binary message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendBinaryImmediately(byte[] data);
public void SendBinaryImmediately(byte[] data, int length = Constants.UseArrayLengthForSend);

/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data);
public void SendMessageImmediately(byte[] data, int length = Constants.UseArrayLengthForSend);
#endregion

#region Helpers
Expand Down
6 changes: 3 additions & 3 deletions Deepgram/Clients/Interfaces/v1/ISpeakWebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Task Connect(SpeakSchema options, CancellationTokenSource? cancelToken =
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data);
public void Send(byte[] data, int length = Constants.UseArrayLengthForSend);

///// <summary>
///// This method sends a binary message over the WebSocket connection.
Expand All @@ -120,7 +120,7 @@ public Task Connect(SpeakSchema options, CancellationTokenSource? cancelToken =
/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data);
public void SendMessage(byte[] data, int length = Constants.UseArrayLengthForSend);

///// <summary>
///// This method sends a binary message over the WebSocket connection immediately without queueing.
Expand All @@ -130,7 +130,7 @@ public Task Connect(SpeakSchema options, CancellationTokenSource? cancelToken =
/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data);
public void SendMessageImmediately(byte[] data, int length = Constants.UseArrayLengthForSend);
#endregion

#region Helpers
Expand Down
47 changes: 38 additions & 9 deletions Deepgram/Clients/Listen/v1/WebSocket/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,47 +314,76 @@ public void SendFinalize()
SendMessageImmediately(data);
}

/// <summary>
/// Sends a Close message to Deepgram
/// </summary>
public void SendClose(bool nullByte = false)
{
Log.Debug("SendFinalize", "Sending Close Message Immediately...");
if (nullByte && _clientWebSocket != null)
{
// send a close to Deepgram
lock (_mutexSend)
{
_clientWebSocket.SendAsync(new ArraySegment<byte>([0]), WebSocketMessageType.Binary, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
return;
}

byte[] data = Encoding.ASCII.GetBytes("{\"type\": \"CloseStream\"}");
SendMessageImmediately(data);
}

/// <summary>
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data) => SendBinary(data);
public void Send(byte[] data, int length = Constants.UseArrayLengthForSend) => SendBinary(data, length);

/// <summary>
/// This method sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data"></param>
public void SendBinary(byte[] data) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Binary));
public void SendBinary(byte[] data, int length = Constants.UseArrayLengthForSend) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Binary, length));

/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text));
public void SendMessage(byte[] data, int length = Constants.UseArrayLengthForSend) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text, length));

/// <summary>
/// This method sends a binary message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendBinaryImmediately(byte[] data)
public void SendBinaryImmediately(byte[] data, int length = Constants.UseArrayLengthForSend)
{
lock (_mutexSend)
{
Log.Verbose("SendBinaryImmediately", "Sending binary message immediately.."); // TODO: dump this message
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Binary, true, _cancellationTokenSource.Token)
if (length == Constants.UseArrayLengthForSend)
{
length = data.Length;
}
_clientWebSocket.SendAsync(new ArraySegment<byte>(data, 0, length), WebSocketMessageType.Binary, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}

/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data)
public void SendMessageImmediately(byte[] data, int length = Constants.UseArrayLengthForSend)
{
lock (_mutexSend)
{
Log.Verbose("SendBinaryImmediately", "Sending binary message immediately.."); // TODO: dump this message
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
if (length == Constants.UseArrayLengthForSend)
{
length = data.Length;
}
_clientWebSocket.SendAsync(new ArraySegment<byte>(data, 0, length), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}
Expand Down
2 changes: 2 additions & 0 deletions Deepgram/Clients/Listen/v1/WebSocket/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ public static class Constants
{
// WS buffer size
public const int BufferSize = 1024 * 16;
public const int UseArrayLengthForSend = -1;

// Default timeout for connect/disconnect
public const int DefaultConnectTimeout = 5000;
public const int DefaultDisconnectTimeout = 5000;

// Default flush period
public const int DefaultFlushPeriodInMs = 500;
}

27 changes: 24 additions & 3 deletions Deepgram/Clients/Listen/v1/WebSocket/WebSocketMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,30 @@

namespace Deepgram.Clients.Listen.v1.WebSocket;

internal readonly struct WebSocketMessage(byte[] message, WebSocketMessageType type)
internal readonly struct WebSocketMessage
{
public ArraySegment<byte> Message { get; } = new ArraySegment<byte>(message);
public WebSocketMessage(byte[] message, WebSocketMessageType type)
: this(message, type, Constants.UseArrayLengthForSend)
{
}

public WebSocketMessageType MessageType { get; } = type;
public WebSocketMessage(byte[] message, WebSocketMessageType type, int length)
{
if (length != Constants.UseArrayLengthForSend || length < Constants.UseArrayLengthForSend)
{
Message = new ArraySegment<byte>(message, 0, length);
}
else
{
Message = new ArraySegment<byte>(message, 0, message.Length);
}
MessageType = type;
Length = length;
}

public int Length { get; }

public ArraySegment<byte> Message { get; }

public WebSocketMessageType MessageType { get; }
}
14 changes: 9 additions & 5 deletions Deepgram/Clients/Speak/v1/WebSocket/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void Close()
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data) => SendMessage(data);
public void Send(byte[] data, int length = Constants.UseArrayLengthForSend) => SendMessage(data, length);

///// <summary>
///// This method sends a binary message over the WebSocket connection.
Expand All @@ -361,7 +361,7 @@ public void Close()
/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data)
public void SendMessage(byte[] data, int length = Constants.UseArrayLengthForSend)
{
// auto flush
if (_deepgramClientOptions.InspectSpeakMessage())
Expand All @@ -384,7 +384,7 @@ public void SendMessage(byte[] data)
}

// send message
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text));
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text, length));
}
///// <summary>
///// This method sends a binary message over the WebSocket connection immediately without queueing.
Expand All @@ -403,7 +403,7 @@ public void SendMessage(byte[] data)
/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data)
public void SendMessageImmediately(byte[] data, int length = Constants.UseArrayLengthForSend)
{
// auto flush
if (_deepgramClientOptions.InspectSpeakMessage())
Expand All @@ -428,7 +428,11 @@ public void SendMessageImmediately(byte[] data)
lock (_mutexSend)
{
Log.Verbose("SendBinaryImmediately", "Sending text message immediately.."); // TODO: dump this message
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
if (length == Constants.UseArrayLengthForSend)
{
length = data.Length;
}
_clientWebSocket.SendAsync(new ArraySegment<byte>(data, 0, length), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}
Expand Down
1 change: 1 addition & 0 deletions Deepgram/Clients/Speak/v1/WebSocket/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static class Constants
{
// WS buffer size
public const int BufferSize = 1024 * 16;
public const int UseArrayLengthForSend = -1;

// Default timeout for connect/disconnect
public const int DefaultConnectTimeout = 5000;
Expand Down
27 changes: 24 additions & 3 deletions Deepgram/Clients/Speak/v1/WebSocket/WebSocketMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,30 @@

namespace Deepgram.Clients.Speak.v1.WebSocket;

internal readonly struct WebSocketMessage(byte[] message, WebSocketMessageType type)
internal readonly struct WebSocketMessage
{
public ArraySegment<byte> Message { get; } = new ArraySegment<byte>(message);
public WebSocketMessage(byte[] message, WebSocketMessageType type)
: this(message, type, Constants.UseArrayLengthForSend)
{
}

public WebSocketMessageType MessageType { get; } = type;
public WebSocketMessage(byte[] message, WebSocketMessageType type, int length)
{
if (length != Constants.UseArrayLengthForSend || length < Constants.UseArrayLengthForSend)
{
Message = new ArraySegment<byte>(message, 0, length);
}
else
{
Message = new ArraySegment<byte>(message, 0, message.Length);
}
MessageType = type;
Length = length;
}

public int Length { get; }

public ArraySegment<byte> Message { get; }

public WebSocketMessageType MessageType { get; }
}

0 comments on commit 52fb7c1

Please sign in to comment.