From d5cd1f5490b698849c148a10f0e9e71ac2ac8887 Mon Sep 17 00:00:00 2001 From: dvonthenen Date: Thu, 4 Apr 2024 09:22:11 -0700 Subject: [PATCH] Functions for Handling Text vs Binary Messages on WS --- Deepgram/Clients/Live/v1/Client.cs | 83 ++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/Deepgram/Clients/Live/v1/Client.cs b/Deepgram/Clients/Live/v1/Client.cs index 2d14e3b7..4110d6d3 100644 --- a/Deepgram/Clients/Live/v1/Client.cs +++ b/Deepgram/Clients/Live/v1/Client.cs @@ -2,6 +2,7 @@ // Use of this source code is governed by a MIT license that can be found in the LICENSE file. // SPDX-License-Identifier: MIT +using System; using System.Threading; using Deepgram.Models.Authenticate.v1; using Deepgram.Models.Live.v1; @@ -18,7 +19,9 @@ public class Client : Attribute, IDisposable private ClientWebSocket? _clientWebSocket; private CancellationTokenSource? _cancellationTokenSource; - private readonly Mutex _mutex = new Mutex(); + + private readonly SemaphoreSlim _mutexSubscribe = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _mutexSend = new SemaphoreSlim(1, 1); #endregion /// Required DeepgramApiKey @@ -171,7 +174,7 @@ void StartKeepAliveBackgroundThread() => _ = Task.Factory.StartNew( /// True if successful public bool Subscribe(EventHandler eventHandler) { - lock (_mutex) + lock (_mutexSubscribe) { _openReceived += (sender, e) => eventHandler(sender, e); } @@ -186,7 +189,7 @@ public bool Subscribe(EventHandler eventHandler) /// True if successful public bool Subscribe(EventHandler eventHandler) { - lock (_mutex) + lock (_mutexSubscribe) { _metadataReceived += (sender, e) => eventHandler(sender, e); } @@ -199,7 +202,7 @@ public bool Subscribe(EventHandler eventHandler) /// True if successful public bool Subscribe(EventHandler eventHandler) { - lock (_mutex) + lock (_mutexSubscribe) { _resultsReceived += (sender, e) => eventHandler(sender, e); } @@ -212,7 +215,7 @@ public bool Subscribe(EventHandler eventHandler) /// True if successful public bool Subscribe(EventHandler eventHandler) { - lock (_mutex) + lock (_mutexSubscribe) { _utteranceEndReceived += (sender, e) => eventHandler(sender, e); } @@ -235,7 +238,7 @@ public bool Subscribe(EventHandler eventHandler) /// True if successful public bool Subscribe(EventHandler eventHandler) { - lock (_mutex) + lock (_mutexSubscribe) { _closeReceived += (sender, e) => eventHandler(sender, e); } @@ -248,7 +251,7 @@ public bool Subscribe(EventHandler eventHandler) /// True if successful public bool Subscribe(EventHandler eventHandler) { - lock (_mutex) + lock (_mutexSubscribe) { _unhandledReceived += (sender, e) => eventHandler(sender, e); } @@ -261,7 +264,7 @@ public bool Subscribe(EventHandler eventHandler) /// True if successful public bool Subscribe(EventHandler eventHandler) { - lock (_mutex) + lock (_mutexSubscribe) { _errorReceived += (sender, e) => eventHandler(sender, e); } @@ -269,13 +272,53 @@ public bool Subscribe(EventHandler eventHandler) } #endregion + #region Send Functions /// /// Sends a binary message over the WebSocket connection. /// /// The data to be sent over the WebSocket. - public void Send(byte[] data) => + public void Send(byte[] data) => SendBinary(data); + + /// + /// This method sends a binary message over the WebSocket connection. + /// + /// + public void SendBinary(byte[] data) => EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Binary)); + /// + /// This method sends a text message over the WebSocket connection. + /// + public void SendMessage(byte[] data) => + EnqueueSendMessage(new WebSocketMessage(data, WebSocketMessageType.Text)); + + /// + /// This method sends a binary message over the WebSocket connection immediately without queueing. + /// + public void SendBinaryImmediately(byte[] data) + { + lock (_mutexSend) + { + Log.Verbose("SendBinaryImmediately", "Sending binary message immediately.."); // TODO: dump this message + _clientWebSocket.SendAsync(new ArraySegment(data), WebSocketMessageType.Binary, true, _cancellationTokenSource.Token) + .ConfigureAwait(false); + } + } + + /// + /// This method sends a text message over the WebSocket connection immediately without queueing. + /// + public void SendMessageImmediately(byte[] data) + { + lock (_mutexSend) + { + Log.Verbose("SendBinaryImmediately", "Sending binary message immediately.."); // TODO: dump this message + _clientWebSocket.SendAsync(new ArraySegment(data), WebSocketMessageType.Text, true, _cancellationTokenSource.Token) + .ConfigureAwait(false); + } + } + #endregion + internal void EnqueueSendMessage(WebSocketMessage message) { try @@ -309,7 +352,11 @@ internal async Task ProcessSendQueue() while (_sendChannel.Reader.TryRead(out var message)) { Log.Verbose("ProcessSendQueue", $"Sending message..."); // TODO: dump this message - await _clientWebSocket.SendAsync(message.Message, message.MessageType, true, _cancellationTokenSource.Token).ConfigureAwait(false); + lock (_mutexSend) + { + _clientWebSocket.SendAsync(message.Message, message.MessageType, true, _cancellationTokenSource.Token) + .ConfigureAwait(false); + } } } @@ -350,9 +397,12 @@ internal async void ProcessKeepAlive() } Log.Debug("ProcessKeepAlive", "Sending KeepAlive"); - byte[] array = Encoding.ASCII.GetBytes("{\"type\": \"KeepAlive\"}"); - await _clientWebSocket.SendAsync(new ArraySegment(array), WebSocketMessageType.Text, true, _cancellationTokenSource.Token) - .ConfigureAwait(false); + byte[] data = Encoding.ASCII.GetBytes("{\"type\": \"KeepAlive\"}"); + lock (_mutexSend) + { + _clientWebSocket.SendAsync(new ArraySegment(data), WebSocketMessageType.Text, true, _cancellationTokenSource.Token) + .ConfigureAwait(false); + } } Log.Verbose("ProcessKeepAlive", "Exiting"); @@ -658,8 +708,11 @@ public async Task Stop(CancellationTokenSource? cancellationToken = null) Log.Debug("Stop", "Sending Close message..."); // send a close to Deepgram - await _clientWebSocket.SendAsync(new ArraySegment([]), WebSocketMessageType.Binary, true, cancelToken) - .ConfigureAwait(false); + lock (_mutexSend) + { + _clientWebSocket.SendAsync(new ArraySegment([]), WebSocketMessageType.Binary, true, cancelToken) + .ConfigureAwait(false); + } // send a CloseResponse event if (_closeReceived != null)