Skip to content

Commit

Permalink
Merge pull request #262 from dvonthenen/live-client-send-funcs
Browse files Browse the repository at this point in the history
Functions for Handling Text vs Binary Messages on WS
  • Loading branch information
davidvonthenen authored Apr 4, 2024
2 parents 7c56b18 + d5cd1f5 commit 138a52d
Showing 1 changed file with 68 additions and 15 deletions.
83 changes: 68 additions & 15 deletions Deepgram/Clients/Live/v1/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
// SPDX-License-Identifier: MIT

using System;
using System.Threading;
using Deepgram.Models.Authenticate.v1;
using Deepgram.Models.Live.v1;
Expand All @@ -18,7 +19,9 @@ public class Client : Attribute, IDisposable

private ClientWebSocket? _clientWebSocket;
private CancellationTokenSource? _cancellationTokenSource;
private readonly Mutex _mutex = new Mutex();

private readonly SemaphoreSlim _mutexSubscribe = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _mutexSend = new SemaphoreSlim(1, 1);
#endregion

/// <param name="apiKey">Required DeepgramApiKey</param>
Expand Down Expand Up @@ -171,7 +174,7 @@ void StartKeepAliveBackgroundThread() => _ = Task.Factory.StartNew(
/// <returns>True if successful</returns>
public bool Subscribe(EventHandler<OpenResponse> eventHandler)
{
lock (_mutex)
lock (_mutexSubscribe)
{
_openReceived += (sender, e) => eventHandler(sender, e);
}
Expand All @@ -186,7 +189,7 @@ public bool Subscribe(EventHandler<OpenResponse> eventHandler)
/// <returns>True if successful</returns>
public bool Subscribe(EventHandler<MetadataResponse> eventHandler)
{
lock (_mutex)
lock (_mutexSubscribe)
{
_metadataReceived += (sender, e) => eventHandler(sender, e);
}
Expand All @@ -199,7 +202,7 @@ public bool Subscribe(EventHandler<MetadataResponse> eventHandler)
/// <returns>True if successful</returns>
public bool Subscribe(EventHandler<ResultResponse> eventHandler)
{
lock (_mutex)
lock (_mutexSubscribe)
{
_resultsReceived += (sender, e) => eventHandler(sender, e);
}
Expand All @@ -212,7 +215,7 @@ public bool Subscribe(EventHandler<ResultResponse> eventHandler)
/// <returns>True if successful</returns>
public bool Subscribe(EventHandler<UtteranceEndResponse> eventHandler)
{
lock (_mutex)
lock (_mutexSubscribe)
{
_utteranceEndReceived += (sender, e) => eventHandler(sender, e);
}
Expand All @@ -235,7 +238,7 @@ public bool Subscribe(EventHandler<SpeechStartedResponse> eventHandler)
/// <returns>True if successful</returns>
public bool Subscribe(EventHandler<CloseResponse> eventHandler)
{
lock (_mutex)
lock (_mutexSubscribe)
{
_closeReceived += (sender, e) => eventHandler(sender, e);
}
Expand All @@ -248,7 +251,7 @@ public bool Subscribe(EventHandler<CloseResponse> eventHandler)
/// <returns>True if successful</returns>
public bool Subscribe(EventHandler<UnhandledResponse> eventHandler)
{
lock (_mutex)
lock (_mutexSubscribe)
{
_unhandledReceived += (sender, e) => eventHandler(sender, e);
}
Expand All @@ -261,21 +264,61 @@ public bool Subscribe(EventHandler<UnhandledResponse> eventHandler)
/// <returns>True if successful</returns>
public bool Subscribe(EventHandler<ErrorResponse> eventHandler)
{
lock (_mutex)
lock (_mutexSubscribe)
{
_errorReceived += (sender, e) => eventHandler(sender, e);
}
return true;
}
#endregion

#region Send Functions
/// <summary>
/// Sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data">The data to be sent over the WebSocket.</param>
public void Send(byte[] data) =>
public void Send(byte[] data) => SendBinary(data);

/// <summary>
/// This method sends a binary message over the WebSocket connection.
/// </summary>
/// <param name="data"></param>
public void SendBinary(byte[] data) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Binary));

/// <summary>
/// This method sends a text message over the WebSocket connection.
/// </summary>
public void SendMessage(byte[] data) =>
EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text));

/// <summary>
/// This method sends a binary message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendBinaryImmediately(byte[] data)
{
lock (_mutexSend)
{
Log.Verbose("SendBinaryImmediately", "Sending binary message immediately.."); // TODO: dump this message
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Binary, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}

/// <summary>
/// This method sends a text message over the WebSocket connection immediately without queueing.
/// </summary>
public void SendMessageImmediately(byte[] data)
{
lock (_mutexSend)
{
Log.Verbose("SendBinaryImmediately", "Sending binary message immediately.."); // TODO: dump this message
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}
#endregion

internal void EnqueueSendMessage(WebSocketMessage message)
{
try
Expand Down Expand Up @@ -309,7 +352,11 @@ internal async Task ProcessSendQueue()
while (_sendChannel.Reader.TryRead(out var message))
{
Log.Verbose("ProcessSendQueue", $"Sending message..."); // TODO: dump this message
await _clientWebSocket.SendAsync(message.Message, message.MessageType, true, _cancellationTokenSource.Token).ConfigureAwait(false);
lock (_mutexSend)
{
_clientWebSocket.SendAsync(message.Message, message.MessageType, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}
}

Expand Down Expand Up @@ -350,9 +397,12 @@ internal async void ProcessKeepAlive()
}

Log.Debug("ProcessKeepAlive", "Sending KeepAlive");
byte[] array = Encoding.ASCII.GetBytes("{\"type\": \"KeepAlive\"}");
await _clientWebSocket.SendAsync(new ArraySegment<byte>(array), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
byte[] data = Encoding.ASCII.GetBytes("{\"type\": \"KeepAlive\"}");
lock (_mutexSend)
{
_clientWebSocket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}

Log.Verbose("ProcessKeepAlive", "Exiting");
Expand Down Expand Up @@ -658,8 +708,11 @@ public async Task Stop(CancellationTokenSource? cancellationToken = null)
Log.Debug("Stop", "Sending Close message...");

// send a close to Deepgram
await _clientWebSocket.SendAsync(new ArraySegment<byte>([]), WebSocketMessageType.Binary, true, cancelToken)
.ConfigureAwait(false);
lock (_mutexSend)
{
_clientWebSocket.SendAsync(new ArraySegment<byte>([]), WebSocketMessageType.Binary, true, cancelToken)
.ConfigureAwait(false);
}

// send a CloseResponse event
if (_closeReceived != null)
Expand Down

0 comments on commit 138a52d

Please sign in to comment.