From c66f715eb56322fc0a0158181fafc14e02a2618b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Monnom?= Date: Tue, 24 Sep 2024 12:36:44 -0700 Subject: [PATCH] fix: avoid wrong reconnection logs (#441) --- livekit-api/src/signal_client/mod.rs | 10 +++++----- .../src/signal_client/signal_stream.rs | 6 ++++-- livekit/src/rtc_engine/rtc_session.rs | 19 +++++++++++-------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 73ac1f916..711ba7ed2 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -152,7 +152,7 @@ impl SignalClient { /// Close the connection to the server pub async fn close(&self) { - self.inner.close().await; + self.inner.close(true).await; let handle = self.handle.lock().take(); if let Some(signal_task) = handle { @@ -254,7 +254,7 @@ impl SignalInner { proto::ReconnectResponse, mpsc::UnboundedReceiver>, )> { - self.close().await; + self.close(false).await; // Lock while we are reconnecting let mut stream = self.stream.write().await; @@ -278,9 +278,9 @@ impl SignalInner { } /// Close the connection - pub async fn close(&self) { + pub async fn close(&self, notify_close: bool) { if let Some(stream) = self.stream.write().await.take() { - stream.close().await; + stream.close(notify_close).await; } } @@ -392,7 +392,7 @@ async fn signal_task( } } - inner.close().await; // Make sure to always close the ws connection when the loop is terminated + inner.close(true).await; // Make sure to always close the ws connection when the loop is terminated } /// Check if the signal is queuable diff --git a/livekit-api/src/signal_client/signal_stream.rs b/livekit-api/src/signal_client/signal_stream.rs index 835ca7e8f..c5ab27951 100644 --- a/livekit-api/src/signal_client/signal_stream.rs +++ b/livekit-api/src/signal_client/signal_stream.rs @@ -110,8 +110,10 @@ impl SignalStream { /// Close the websocket /// It sends a CloseFrame to the server before closing - pub async fn close(self) { - let _ = self.internal_tx.send(InternalMessage::Close).await; + pub async fn close(self, notify_close: bool) { + if notify_close { + let _ = self.internal_tx.send(InternalMessage::Close).await; + } let _ = self.write_handle.await; let _ = self.read_handle.await; } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index e9f5619a6..cadbd8ff7 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -416,13 +416,15 @@ impl SessionInner { task.await; } SignalEvent::Close(reason) => { - // SignalClient has been closed - self.on_session_disconnected( - format!("signal client closed: {:?}", reason).as_str(), - DisconnectReason::UnknownReason, - proto::leave_request::Action::Resume, - false, - ); + if !self.closed.load(Ordering::Acquire) { + // SignalClient has been closed unexpectedly + self.on_session_disconnected( + format!("signal client closed: {:?}", reason).as_str(), + DisconnectReason::UnknownReason, + proto::leave_request::Action::Resume, + false, + ); + } } } }, @@ -776,6 +778,8 @@ impl SessionInner { } async fn close(&self) { + self.closed.store(true, Ordering::Release); + self.signal_client .send(proto::signal_request::Message::Leave(proto::LeaveRequest { action: proto::leave_request::Action::Disconnect.into(), @@ -784,7 +788,6 @@ impl SessionInner { })) .await; - self.closed.store(true, Ordering::Release); self.signal_client.close().await; self.publisher_pc.close(); self.subscriber_pc.close();