From 3a93b6174c40506d917db05c8fc04659699a2c38 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Mon, 19 Aug 2024 17:49:56 -0700 Subject: [PATCH 01/34] Convencience API for iterating frames --- libwebrtc/src/video_stream.rs | 4 ++ livekit-ffi/protocol/audio_frame.proto | 9 +++ livekit-ffi/protocol/ffi.proto | 2 + livekit-ffi/protocol/video_frame.proto | 12 ++++ livekit-ffi/src/livekit.proto.rs | 44 ++++++++++++- livekit-ffi/src/server/mod.rs | 1 + livekit-ffi/src/server/utils.rs | 51 +++++++++++++++ livekit-ffi/src/server/video_stream.rs | 91 ++++++++++++++++++++++++-- livekit-protocol/protocol | 2 +- 9 files changed, 210 insertions(+), 6 deletions(-) create mode 100644 livekit-ffi/src/server/utils.rs diff --git a/libwebrtc/src/video_stream.rs b/libwebrtc/src/video_stream.rs index 445dc6351..901b2f5d3 100644 --- a/libwebrtc/src/video_stream.rs +++ b/libwebrtc/src/video_stream.rs @@ -51,6 +51,10 @@ pub mod native { pub fn close(&mut self) { self.handle.close(); } + + pub fn set_native_video_stream(&mut self, native_video_stream: stream_imp::NativeVideoStream) { + self.handle = native_video_stream; + } } impl Stream for NativeVideoStream { diff --git a/livekit-ffi/protocol/audio_frame.proto b/livekit-ffi/protocol/audio_frame.proto index dd7be86a2..5379c4885 100644 --- a/livekit-ffi/protocol/audio_frame.proto +++ b/livekit-ffi/protocol/audio_frame.proto @@ -18,6 +18,7 @@ package livekit.proto; option csharp_namespace = "LiveKit.Proto"; import "handle.proto"; +import "track.proto"; // Create a new AudioStream // AudioStream is used to receive audio frames from a track @@ -27,6 +28,14 @@ message NewAudioStreamRequest { } message NewAudioStreamResponse { OwnedAudioStream stream = 1; } +message AudioStreamFromParticipantRequest { + uint64 participant_handle = 1; + AudioStreamType type = 2; + optional TrackSource track_source = 3; +} + +message AudioStreamFromParticipantResponse { OwnedAudioStream stream = 1; } + // Create a new AudioSource message NewAudioSourceRequest { AudioSourceType type = 1; diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 6dae560b5..1385337a1 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -80,6 +80,7 @@ message FfiRequest { NewVideoSourceRequest new_video_source = 19; CaptureVideoFrameRequest capture_video_frame = 20; VideoConvertRequest video_convert = 21; + VideoStreamFromParticipantRequest video_stream_from_participant = 22; // Audio NewAudioStreamRequest new_audio_stream = 23; @@ -88,6 +89,7 @@ message FfiRequest { NewAudioResamplerRequest new_audio_resampler = 26; RemixAndResampleRequest remix_and_resample = 27; E2eeRequest e2ee = 28; + AudioStreamFromParticipantRequest audio_stream_from_participant = 29; } } diff --git a/livekit-ffi/protocol/video_frame.proto b/livekit-ffi/protocol/video_frame.proto index ffb2e0bde..090518b5f 100644 --- a/livekit-ffi/protocol/video_frame.proto +++ b/livekit-ffi/protocol/video_frame.proto @@ -18,6 +18,7 @@ package livekit.proto; option csharp_namespace = "LiveKit.Proto"; import "handle.proto"; +import "track.proto"; // Create a new VideoStream // VideoStream is used to receive video frames from a track @@ -30,6 +31,17 @@ message NewVideoStreamRequest { } message NewVideoStreamResponse { OwnedVideoStream stream = 1; } +// Request a video stream from a participant +message VideoStreamFromParticipantRequest { + uint64 participant_handle = 1; + VideoStreamType type = 2; + TrackSource track_source = 3; + optional VideoBufferType format = 4; + bool normalize_stride = 5; +} + +message VideoStreamFromParticipantResponse { OwnedVideoStream stream = 1;} + // Create a new VideoSource // VideoSource is used to send video frame to a track message NewVideoSourceRequest { diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 6a3c8a3d1..e31dbb920 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -1,4 +1,5 @@ // @generated +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FrameCryptor { @@ -1542,6 +1543,27 @@ pub struct NewVideoStreamResponse { #[prost(message, optional, tag="1")] pub stream: ::core::option::Option, } +/// Request a video stream from a participant +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VideoStreamFromParticipantRequest { + #[prost(uint64, tag="1")] + pub participant_handle: u64, + #[prost(enumeration="VideoStreamType", tag="2")] + pub r#type: i32, + #[prost(enumeration="TrackSource", tag="3")] + pub track_source: i32, + #[prost(enumeration="VideoBufferType", optional, tag="4")] + pub format: ::core::option::Option, + #[prost(bool, tag="5")] + pub normalize_stride: bool, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VideoStreamFromParticipantResponse { + #[prost(message, optional, tag="1")] + pub stream: ::core::option::Option, +} /// Create a new VideoSource /// VideoSource is used to send video frame to a track #[allow(clippy::derive_partial_eq_without_eq)] @@ -2847,6 +2869,22 @@ pub struct NewAudioStreamResponse { #[prost(message, optional, tag="1")] pub stream: ::core::option::Option, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AudioStreamFromParticipantRequest { + #[prost(uint64, tag="1")] + pub participant_handle: u64, + #[prost(enumeration="AudioStreamType", tag="2")] + pub r#type: i32, + #[prost(enumeration="TrackSource", optional, tag="3")] + pub track_source: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AudioStreamFromParticipantResponse { + #[prost(message, optional, tag="1")] + pub stream: ::core::option::Option, +} /// Create a new AudioSource #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -3118,7 +3156,7 @@ impl AudioSourceType { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiRequest { - #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 24, 25, 26, 27, 28")] + #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiRequest`. @@ -3169,6 +3207,8 @@ pub mod ffi_request { CaptureVideoFrame(super::CaptureVideoFrameRequest), #[prost(message, tag="21")] VideoConvert(super::VideoConvertRequest), + #[prost(message, tag="22")] + VideoStreamFromParticipant(super::VideoStreamFromParticipantRequest), /// Audio #[prost(message, tag="23")] NewAudioStream(super::NewAudioStreamRequest), @@ -3182,6 +3222,8 @@ pub mod ffi_request { RemixAndResample(super::RemixAndResampleRequest), #[prost(message, tag="28")] E2ee(super::E2eeRequest), + #[prost(message, tag="29")] + AudioStreamFromParticipant(super::AudioStreamFromParticipantRequest), } } /// This is the output of livekit_ffi_request function. diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index 2d3cf6086..ac7c94b91 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -36,6 +36,7 @@ pub mod colorcvt; pub mod logger; pub mod requests; pub mod room; +mod utils; pub mod video_source; pub mod video_stream; diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs new file mode 100644 index 000000000..e536445ff --- /dev/null +++ b/livekit-ffi/src/server/utils.rs @@ -0,0 +1,51 @@ +use livekit::prelude::{RoomEvent, Track, TrackSource}; +use tokio::sync::mpsc; + +use super::room::FfiParticipant; + +pub async fn track_changed_trigger( + participant: FfiParticipant, + track_source: TrackSource, + track_tx: mpsc::Sender, +) { + for track_pub in participant.participant.track_publications().values() { + if track_pub.source() == track_source.into() { + let track = track_pub.track(); + match track { + Some(track) => { + track_tx.send(track).await; + } + _ => {} + } + } + } + let room = &participant.room.room; + let mut room_event_rx = room.subscribe(); + while let Some(event) = room_event_rx.recv().await { + match event { + RoomEvent::TrackPublished { publication, participant: p } => { + if participant.participant.identity() != p.identity() { + continue; + } + if publication.source() == track_source.into() { + let track = publication.track(); + match track { + Some(track) => { + track_tx.send(track.into()).await; + } + _ => {} + } + } + } + RoomEvent::ParticipantDisconnected(participant) => { + if participant.identity() == participant.identity() { + break; + } + } + RoomEvent::Disconnected { reason: _ } => { + break; + } + _ => {} + } + } +} diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 83fc79b54..71a49b7e7 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -13,10 +13,18 @@ // limitations under the License. use futures_util::StreamExt; -use livekit::webrtc::{prelude::*, video_stream::native::NativeVideoStream}; -use tokio::sync::oneshot; - -use super::{colorcvt, room::FfiTrack, FfiHandle}; +use livekit::{ + prelude::Track, + webrtc::{prelude::*, video_stream::native::NativeVideoStream}, +}; +use tokio::sync::{mpsc, oneshot}; + +use super::{ + colorcvt, + room::{FfiParticipant, FfiTrack}, + FfiHandle, +}; +use crate::server::utils; use crate::{proto, server, FfiError, FfiHandleId, FfiResult}; pub struct FfiVideoStream { @@ -141,4 +149,79 @@ impl FfiVideoStream { log::warn!("failed to send video EOS: {}", err); } } + + async fn participant_video_stream_task( + server: &'static server::FfiServer, + request: proto::VideoStreamFromParticipantRequest, + stream_handle: FfiHandleId, + participant_handle: FfiHandleId, + mut close_rx: oneshot::Receiver<()>, + dst_type: Option, + normalize_stride: bool, + ) { + let p = server.retrieve_handle::(participant_handle); + let p = match p { + Ok(p) => p, + Err(err) => { + log::error!("failed to retrieve participant: {}", err); + return; + } + }; + let track_source = request.track_source(); + let (track_tx, mut track_rx) = mpsc::channel::(1); + server.async_runtime.spawn(utils::track_changed_trigger( + p.clone(), + track_source.into(), + track_tx, + )); + // track_tx is no longer held, so the track_rx will be closed + + loop { + let track = track_rx.recv().await; + if let Some(track) = track { + let rtc_track = track.rtc_track(); + let MediaStreamTrack::Video(rtc_track) = rtc_track else { + continue; + }; + + Self::native_video_stream_task( + server, + stream_handle, + dst_type, + normalize_stride, + NativeVideoStream::new(rtc_track), + close_rx, + ) + .await; + } else { + break; + } + } + } + + pub fn from_participant( + server: &'static server::FfiServer, + request: proto::VideoStreamFromParticipantRequest, + ) -> FfiResult { + let ffi_participant = + server.retrieve_handle::(request.participant_handle)?.clone(); + let (close_tx, close_rx) = oneshot::channel(); + let stream_type = request.r#type(); + let handle_id = server.next_id(); + let stream = match stream_type { + #[cfg(not(target_arch = "wasm32"))] + proto::VideoStreamType::VideoStreamNative => { + let video_stream = Self { handle_id, close_tx, stream_type }; + Ok::(video_stream) + } + _ => return Err(FfiError::InvalidRequest("unsupported video stream type".into())), + }?; + let info = proto::VideoStreamInfo::from(&stream); + server.store_handle(stream.handle_id, stream); + + Ok(proto::OwnedVideoStream { + handle: Some(proto::FfiOwnedHandle { id: handle_id }), + info: Some(info), + }) + } } diff --git a/livekit-protocol/protocol b/livekit-protocol/protocol index 5a524703e..9e23fdbd0 160000 --- a/livekit-protocol/protocol +++ b/livekit-protocol/protocol @@ -1 +1 @@ -Subproject commit 5a524703ead68a9946efc6e2a4d62203ec688fbe +Subproject commit 9e23fdbd08859769a29ea4319a973e35ac01d0bc From 200c121dc809fdc96d8866c735e6ef65e6e08b72 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Tue, 20 Aug 2024 14:06:38 -0700 Subject: [PATCH 02/34] video stream --- livekit-ffi/src/server/utils.rs | 12 +++ livekit-ffi/src/server/video_stream.rs | 115 ++++++++++++++----------- 2 files changed, 76 insertions(+), 51 deletions(-) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index e536445ff..1b07b0f4b 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -2,6 +2,7 @@ use livekit::prelude::{RoomEvent, Track, TrackSource}; use tokio::sync::mpsc; use super::room::FfiParticipant; +use crate::{server, FfiError, FfiHandleId}; pub async fn track_changed_trigger( participant: FfiParticipant, @@ -49,3 +50,14 @@ pub async fn track_changed_trigger( } } } + +pub fn ffi_participant_from_handle( + server: &'static server::FfiServer, + handle_id: FfiHandleId, +) -> Result { + let ffi_participant_handle = server.retrieve_handle::(handle_id); + if ffi_participant_handle.is_err() { + return Err(FfiError::InvalidRequest("participant not found".into())); + } + return Ok(ffi_participant_handle.unwrap().clone()); +} diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 71a49b7e7..2a42106e1 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -19,11 +19,7 @@ use livekit::{ }; use tokio::sync::{mpsc, oneshot}; -use super::{ - colorcvt, - room::{FfiParticipant, FfiTrack}, - FfiHandle, -}; +use super::{colorcvt, FfiHandle}; use crate::server::utils; use crate::{proto, server, FfiError, FfiHandleId, FfiResult}; @@ -46,7 +42,7 @@ impl FfiVideoStream { /// /// It is possible that the client receives a VideoFrame after the task is closed. The client /// musts ignore it. - pub fn setup( + pub fn from_track( server: &'static server::FfiServer, new_stream: proto::NewVideoStreamRequest, ) -> FfiResult { @@ -88,6 +84,35 @@ impl FfiVideoStream { }) } + pub fn from_participant( + server: &'static server::FfiServer, + request: proto::VideoStreamFromParticipantRequest, + ) -> FfiResult { + let (close_tx, close_rx) = oneshot::channel(); + let stream_type = request.r#type(); + let handle_id = server.next_id(); + let dst_type = request.format.and_then(|_| Some(request.format())); + let stream = match stream_type { + #[cfg(not(target_arch = "wasm32"))] + proto::VideoStreamType::VideoStreamNative => { + let video_stream = Self { handle_id, close_tx, stream_type }; + let handle = server.async_runtime.spawn(Self::participant_video_stream_task( + server, request, handle_id, dst_type, close_rx, + )); + server.watch_panic(handle); + Ok::(video_stream) + } + _ => return Err(FfiError::InvalidRequest("unsupported video stream type".into())), + }?; + let info = proto::VideoStreamInfo::from(&stream); + server.store_handle(stream.handle_id, stream); + + Ok(proto::OwnedVideoStream { + handle: Some(proto::FfiOwnedHandle { id: handle_id }), + info: Some(info), + }) + } + async fn native_video_stream_task( server: &'static server::FfiServer, stream_handle: FfiHandleId, @@ -154,27 +179,27 @@ impl FfiVideoStream { server: &'static server::FfiServer, request: proto::VideoStreamFromParticipantRequest, stream_handle: FfiHandleId, - participant_handle: FfiHandleId, - mut close_rx: oneshot::Receiver<()>, dst_type: Option, - normalize_stride: bool, + mut close_rx: oneshot::Receiver<()>, ) { - let p = server.retrieve_handle::(participant_handle); - let p = match p { - Ok(p) => p, + let ffi_participant = + utils::ffi_participant_from_handle(server, request.participant_handle); + let ffi_participant = match ffi_participant { + Ok(ffi_participant) => ffi_participant, Err(err) => { - log::error!("failed to retrieve participant: {}", err); + log::error!("failed to get participant: {}", err); return; } }; + let track_source = request.track_source(); let (track_tx, mut track_rx) = mpsc::channel::(1); server.async_runtime.spawn(utils::track_changed_trigger( - p.clone(), + ffi_participant, track_source.into(), track_tx, )); - // track_tx is no longer held, so the track_rx will be closed + // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done loop { let track = track_rx.recv().await; @@ -183,45 +208,33 @@ impl FfiVideoStream { let MediaStreamTrack::Video(rtc_track) = rtc_track else { continue; }; - - Self::native_video_stream_task( - server, - stream_handle, - dst_type, - normalize_stride, - NativeVideoStream::new(rtc_track), - close_rx, - ) - .await; + let (c_tx, c_rx) = oneshot::channel::<()>(); + let (done_tx, mut done_rx) = oneshot::channel::<()>(); + server.async_runtime.spawn(async move { + Self::native_video_stream_task( + server, + stream_handle, + dst_type, + request.normalize_stride, + NativeVideoStream::new(rtc_track), + c_rx, + ) + .await; + done_tx.send(()); + }); + tokio::select! { + _ = &mut close_rx => { + c_tx.send(()); + return + } + _ = &mut done_rx => { + break + } + } } else { + // When tracks are done (i.e. the participant leaves the room), we are done break; } } } - - pub fn from_participant( - server: &'static server::FfiServer, - request: proto::VideoStreamFromParticipantRequest, - ) -> FfiResult { - let ffi_participant = - server.retrieve_handle::(request.participant_handle)?.clone(); - let (close_tx, close_rx) = oneshot::channel(); - let stream_type = request.r#type(); - let handle_id = server.next_id(); - let stream = match stream_type { - #[cfg(not(target_arch = "wasm32"))] - proto::VideoStreamType::VideoStreamNative => { - let video_stream = Self { handle_id, close_tx, stream_type }; - Ok::(video_stream) - } - _ => return Err(FfiError::InvalidRequest("unsupported video stream type".into())), - }?; - let info = proto::VideoStreamInfo::from(&stream); - server.store_handle(stream.handle_id, stream); - - Ok(proto::OwnedVideoStream { - handle: Some(proto::FfiOwnedHandle { id: handle_id }), - info: Some(info), - }) - } } From 0adf0f7e324a9e6552303f3dbfd84dfe00376402 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Tue, 20 Aug 2024 14:08:12 -0700 Subject: [PATCH 03/34] remove unused --- libwebrtc/src/video_stream.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libwebrtc/src/video_stream.rs b/libwebrtc/src/video_stream.rs index 901b2f5d3..445dc6351 100644 --- a/libwebrtc/src/video_stream.rs +++ b/libwebrtc/src/video_stream.rs @@ -51,10 +51,6 @@ pub mod native { pub fn close(&mut self) { self.handle.close(); } - - pub fn set_native_video_stream(&mut self, native_video_stream: stream_imp::NativeVideoStream) { - self.handle = native_video_stream; - } } impl Stream for NativeVideoStream { From d2ff1cae75356d64a35e0581f6b45b2f1308b39a Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Tue, 20 Aug 2024 14:09:10 -0700 Subject: [PATCH 04/34] fix import --- livekit-ffi/src/server/video_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 2a42106e1..416657880 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -19,7 +19,7 @@ use livekit::{ }; use tokio::sync::{mpsc, oneshot}; -use super::{colorcvt, FfiHandle}; +use super::{colorcvt, room::FfiTrack, FfiHandle}; use crate::server::utils; use crate::{proto, server, FfiError, FfiHandleId, FfiResult}; From a6aa6ad3b357cd8a325ecfa5e6892a4d6c78fa2b Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Tue, 20 Aug 2024 14:25:00 -0700 Subject: [PATCH 05/34] Audio stream impl --- livekit-ffi/src/server/audio_stream.rs | 100 ++++++++++++++++++++++++- livekit-ffi/src/server/requests.rs | 4 +- livekit-ffi/src/server/utils.rs | 4 +- 3 files changed, 103 insertions(+), 5 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index b5edcad0d..6f2a1196f 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -13,11 +13,13 @@ // limitations under the License. use futures_util::StreamExt; +use livekit::track::Track; use livekit::webrtc::{audio_stream::native::NativeAudioStream, prelude::*}; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use super::{room::FfiTrack, FfiHandle}; use crate::{proto, server, FfiError, FfiHandleId, FfiResult}; +use crate::server::utils; pub struct FfiAudioStream { pub handle_id: FfiHandleId, @@ -38,7 +40,7 @@ impl FfiAudioStream { /// /// It is possible that the client receives an AudioFrame after the task is closed. The client /// musts ignore it. - pub fn setup( + pub fn from_track( server: &'static server::FfiServer, new_stream: proto::NewAudioStreamRequest, ) -> FfiResult { @@ -80,6 +82,100 @@ impl FfiAudioStream { }) } + pub fn from_participant( + server: &'static server::FfiServer, + request: proto::AudioStreamFromParticipantRequest, + ) -> FfiResult { + let (close_tx, close_rx) = oneshot::channel(); + let handle_id = server.next_id(); + let stream_type = request.r#type(); + let audio_stream = match stream_type { + #[cfg(not(target_arch = "wasm32"))] + proto::AudioStreamType::AudioStreamNative => { + let audio_stream = Self { handle_id, stream_type, close_tx }; + + let handle = server.async_runtime.spawn(Self::participant_audio_stream_task( + server, + request, + handle_id, + close_rx, + )); + server.watch_panic(handle); + Ok::(audio_stream) + } + _ => return Err(FfiError::InvalidRequest("unsupported audio stream type".into())), + }?; + + // Store the new audio stream and return the info + let info = proto::AudioStreamInfo::from(&audio_stream); + server.store_handle(handle_id, audio_stream); + + Ok(proto::OwnedAudioStream { + handle: Some(proto::FfiOwnedHandle { id: handle_id }), + info: Some(info), + }) + } + + async fn participant_audio_stream_task( + server: &'static server::FfiServer, + request: proto::AudioStreamFromParticipantRequest, + stream_handle: FfiHandleId, + mut close_rx: oneshot::Receiver<()>, + ) { + let ffi_participant = + utils::ffi_participant_from_handle(server, request.participant_handle); + let ffi_participant = match ffi_participant { + Ok(ffi_participant) => ffi_participant, + Err(err) => { + log::error!("failed to get participant: {}", err); + return; + } + }; + + let track_source = request.track_source(); + let (track_tx, mut track_rx) = mpsc::channel::(1); + server.async_runtime.spawn(utils::track_changed_trigger( + ffi_participant, + track_source.into(), + track_tx, + )); + // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done + + loop { + let track = track_rx.recv().await; + if let Some(track) = track { + let rtc_track = track.rtc_track(); + let MediaStreamTrack::Audio(rtc_track) = rtc_track else { + continue; + }; + let (c_tx, c_rx) = oneshot::channel::<()>(); + let (done_tx, mut done_rx) = oneshot::channel::<()>(); + server.async_runtime.spawn(async move { + Self::native_audio_stream_task( + server, + stream_handle, + NativeAudioStream::new(rtc_track), + c_rx, + ) + .await; + done_tx.send(()); + }); + tokio::select! { + _ = &mut close_rx => { + c_tx.send(()); + return + } + _ = &mut done_rx => { + break + } + } + } else { + // When tracks are done (i.e. the participant leaves the room), we are done + break; + } + } + } + async fn native_audio_stream_task( server: &'static server::FfiServer, stream_handle_id: FfiHandleId, diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index c2e93853d..0533069db 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -269,7 +269,7 @@ fn on_new_video_stream( server: &'static FfiServer, new_stream: proto::NewVideoStreamRequest, ) -> FfiResult { - let stream_info = video_stream::FfiVideoStream::setup(server, new_stream)?; + let stream_info = video_stream::FfiVideoStream::from_track(server, new_stream)?; Ok(proto::NewVideoStreamResponse { stream: Some(stream_info) }) } @@ -327,7 +327,7 @@ fn on_new_audio_stream( server: &'static FfiServer, new_stream: proto::NewAudioStreamRequest, ) -> FfiResult { - let stream_info = audio_stream::FfiAudioStream::setup(server, new_stream)?; + let stream_info = audio_stream::FfiAudioStream::from_track(server, new_stream)?; Ok(proto::NewAudioStreamResponse { stream: Some(stream_info) }) } diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 1b07b0f4b..4a453f907 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -1,7 +1,9 @@ use livekit::prelude::{RoomEvent, Track, TrackSource}; use tokio::sync::mpsc; -use super::room::FfiParticipant; +use super::{ + room::{FfiParticipant}, +}; use crate::{server, FfiError, FfiHandleId}; pub async fn track_changed_trigger( From 72882a7ba5ec1207aded9ea7445bb94413d523a3 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Tue, 20 Aug 2024 14:41:45 -0700 Subject: [PATCH 06/34] ffi --- livekit-ffi/protocol/ffi.proto | 15 +++++++++------ livekit-ffi/src/livekit.proto.rs | 18 +++++++++++------- livekit-ffi/src/server/requests.rs | 27 +++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 13 deletions(-) diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 1385337a1..2d20bd1f1 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -122,14 +122,17 @@ message FfiResponse { NewVideoSourceResponse new_video_source = 19; CaptureVideoFrameResponse capture_video_frame = 20; VideoConvertResponse video_convert = 21; + VideoStreamFromParticipantResponse video_stream_from_participant = 22; // Audio - NewAudioStreamResponse new_audio_stream = 22; - NewAudioSourceResponse new_audio_source = 23; - CaptureAudioFrameResponse capture_audio_frame = 24; - NewAudioResamplerResponse new_audio_resampler = 25; - RemixAndResampleResponse remix_and_resample = 26; - E2eeResponse e2ee = 27; + NewAudioStreamResponse new_audio_stream = 23; + NewAudioSourceResponse new_audio_source = 24; + CaptureAudioFrameResponse capture_audio_frame = 25; + NewAudioResamplerResponse new_audio_resampler = 26; + RemixAndResampleResponse remix_and_resample = 27; + AudioStreamFromParticipantResponse audio_stream_from_participant = 28; + + E2eeResponse e2ee = 29; } } diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index e31dbb920..eeffdfc39 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -3230,7 +3230,7 @@ pub mod ffi_request { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiResponse { - #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27")] + #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiResponse`. @@ -3281,18 +3281,22 @@ pub mod ffi_response { CaptureVideoFrame(super::CaptureVideoFrameResponse), #[prost(message, tag="21")] VideoConvert(super::VideoConvertResponse), - /// Audio #[prost(message, tag="22")] - NewAudioStream(super::NewAudioStreamResponse), + VideoStreamFromParticipant(super::VideoStreamFromParticipantResponse), + /// Audio #[prost(message, tag="23")] - NewAudioSource(super::NewAudioSourceResponse), + NewAudioStream(super::NewAudioStreamResponse), #[prost(message, tag="24")] - CaptureAudioFrame(super::CaptureAudioFrameResponse), + NewAudioSource(super::NewAudioSourceResponse), #[prost(message, tag="25")] - NewAudioResampler(super::NewAudioResamplerResponse), + CaptureAudioFrame(super::CaptureAudioFrameResponse), #[prost(message, tag="26")] - RemixAndResample(super::RemixAndResampleResponse), + NewAudioResampler(super::NewAudioResamplerResponse), #[prost(message, tag="27")] + RemixAndResample(super::RemixAndResampleResponse), + #[prost(message, tag="28")] + AudioStreamFromParticipant(super::AudioStreamFromParticipantResponse), + #[prost(message, tag="29")] E2ee(super::E2eeResponse), } } diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 0533069db..005fe67d2 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -273,6 +273,14 @@ fn on_new_video_stream( Ok(proto::NewVideoStreamResponse { stream: Some(stream_info) }) } +fn on_video_stream_from_participant( + server: &'static FfiServer, + request: proto::VideoStreamFromParticipantRequest, +) -> FfiResult { + let stream_info = video_stream::FfiVideoStream::from_participant(server, request)?; + Ok(proto::VideoStreamFromParticipantResponse { stream: Some(stream_info) }) +} + /// Create a new video source, used to publish data to a track fn on_new_video_source( server: &'static FfiServer, @@ -331,6 +339,15 @@ fn on_new_audio_stream( Ok(proto::NewAudioStreamResponse { stream: Some(stream_info) }) } +// Create a new audio stream from a participant and track source +fn on_audio_stream_from_participant_stream( + server: &'static FfiServer, + request: proto::AudioStreamFromParticipantRequest, +) -> FfiResult { + let stream_info = audio_stream::FfiAudioStream::from_participant(server, request)?; + Ok(proto::AudioStreamFromParticipantResponse { stream: Some(stream_info) }) +} + /// Create a new audio source (used to publish audio frames to a track) fn on_new_audio_source( server: &'static FfiServer, @@ -633,6 +650,11 @@ pub fn handle_request( proto::ffi_request::Message::NewVideoStream(new_stream) => { proto::ffi_response::Message::NewVideoStream(on_new_video_stream(server, new_stream)?) } + proto::ffi_request::Message::VideoStreamFromParticipant(new_stream) => { + proto::ffi_response::Message::VideoStreamFromParticipant( + on_video_stream_from_participant(server, new_stream)?, + ) + } proto::ffi_request::Message::NewVideoSource(new_source) => { proto::ffi_response::Message::NewVideoSource(on_new_video_source(server, new_source)?) } @@ -648,6 +670,11 @@ pub fn handle_request( proto::ffi_request::Message::NewAudioSource(new_source) => { proto::ffi_response::Message::NewAudioSource(on_new_audio_source(server, new_source)?) } + proto::ffi_request::Message::AudioStreamFromParticipant(new_stream) => { + proto::ffi_response::Message::AudioStreamFromParticipant( + on_audio_stream_from_participant_stream(server, new_stream)?, + ) + } proto::ffi_request::Message::CaptureAudioFrame(push) => { proto::ffi_response::Message::CaptureAudioFrame(on_capture_audio_frame(server, push)?) } From 71e7f10334f7ec3bcdb8563fb22a2b33ea1a4c0b Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Tue, 20 Aug 2024 14:44:29 -0700 Subject: [PATCH 07/34] fix warnings --- livekit-ffi/src/server/audio_stream.rs | 4 ++-- livekit-ffi/src/server/utils.rs | 4 ++-- livekit-ffi/src/server/video_stream.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 6f2a1196f..b333723b6 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -158,11 +158,11 @@ impl FfiAudioStream { c_rx, ) .await; - done_tx.send(()); + let _ = done_tx.send(()); }); tokio::select! { _ = &mut close_rx => { - c_tx.send(()); + let _ = c_tx.send(()); return } _ = &mut done_rx => { diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 4a453f907..17a9ea5a3 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -16,7 +16,7 @@ pub async fn track_changed_trigger( let track = track_pub.track(); match track { Some(track) => { - track_tx.send(track).await; + let _ = track_tx.send(track).await; } _ => {} } @@ -34,7 +34,7 @@ pub async fn track_changed_trigger( let track = publication.track(); match track { Some(track) => { - track_tx.send(track.into()).await; + let _ = track_tx.send(track.into()).await; } _ => {} } diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 416657880..90a6c2dc3 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -220,11 +220,11 @@ impl FfiVideoStream { c_rx, ) .await; - done_tx.send(()); + let _ = done_tx.send(()); }); tokio::select! { _ = &mut close_rx => { - c_tx.send(()); + let _ = c_tx.send(()); return } _ = &mut done_rx => { From aebea8981152776cb4114053e6798f6291356c3f Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Tue, 20 Aug 2024 18:57:44 -0700 Subject: [PATCH 08/34] cargo fmt --- livekit-ffi/src/server/audio_stream.rs | 7 ++----- livekit-ffi/src/server/utils.rs | 4 +--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index b333723b6..180b6d490 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -18,8 +18,8 @@ use livekit::webrtc::{audio_stream::native::NativeAudioStream, prelude::*}; use tokio::sync::{mpsc, oneshot}; use super::{room::FfiTrack, FfiHandle}; -use crate::{proto, server, FfiError, FfiHandleId, FfiResult}; use crate::server::utils; +use crate::{proto, server, FfiError, FfiHandleId, FfiResult}; pub struct FfiAudioStream { pub handle_id: FfiHandleId, @@ -95,10 +95,7 @@ impl FfiAudioStream { let audio_stream = Self { handle_id, stream_type, close_tx }; let handle = server.async_runtime.spawn(Self::participant_audio_stream_task( - server, - request, - handle_id, - close_rx, + server, request, handle_id, close_rx, )); server.watch_panic(handle); Ok::(audio_stream) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 17a9ea5a3..eb94e480e 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -1,9 +1,7 @@ use livekit::prelude::{RoomEvent, Track, TrackSource}; use tokio::sync::mpsc; -use super::{ - room::{FfiParticipant}, -}; +use super::room::FfiParticipant; use crate::{server, FfiError, FfiHandleId}; pub async fn track_changed_trigger( From 6a5d7f66badee25a6b5efb9d167f0ac146887039 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Wed, 21 Aug 2024 10:51:49 -0700 Subject: [PATCH 09/34] Address comments --- livekit-ffi/src/server/audio_stream.rs | 4 ++-- livekit-ffi/src/server/utils.rs | 16 ++++------------ livekit-ffi/src/server/video_stream.rs | 4 ++-- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 180b6d490..8cb348bcd 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -136,7 +136,7 @@ impl FfiAudioStream { track_source.into(), track_tx, )); - // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done + // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done loop { let track = track_rx.recv().await; @@ -167,7 +167,7 @@ impl FfiAudioStream { } } } else { - // When tracks are done (i.e. the participant leaves the room), we are done + // when tracks are done (i.e. the participant leaves the room), we are done break; } } diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index eb94e480e..2758111b4 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -11,12 +11,8 @@ pub async fn track_changed_trigger( ) { for track_pub in participant.participant.track_publications().values() { if track_pub.source() == track_source.into() { - let track = track_pub.track(); - match track { - Some(track) => { - let _ = track_tx.send(track).await; - } - _ => {} + if let Some(track) = track_pub.track() { + let _ = track_tx.send(track).await; } } } @@ -29,12 +25,8 @@ pub async fn track_changed_trigger( continue; } if publication.source() == track_source.into() { - let track = publication.track(); - match track { - Some(track) => { - let _ = track_tx.send(track.into()).await; - } - _ => {} + if let Some(track) = publication.track() { + let _ = track_tx.send(track.into()).await; } } } diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 90a6c2dc3..677ac6b33 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -199,7 +199,7 @@ impl FfiVideoStream { track_source.into(), track_tx, )); - // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done + // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done loop { let track = track_rx.recv().await; @@ -232,7 +232,7 @@ impl FfiVideoStream { } } } else { - // When tracks are done (i.e. the participant leaves the room), we are done + // when tracks are done (i.e. the participant leaves the room), we are done break; } } From 76e8ec0bf9f5df52c52bc214a656e8b36302f22c Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Wed, 4 Sep 2024 17:19:29 -0700 Subject: [PATCH 10/34] cooking --- livekit-ffi/protocol/audio_frame.proto | 2 ++ livekit-ffi/src/livekit.proto.rs | 4 ++++ livekit-ffi/src/server/audio_stream.rs | 17 +++++++++++++---- livekit-ffi/src/server/video_stream.rs | 11 ++++++++--- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/livekit-ffi/protocol/audio_frame.proto b/livekit-ffi/protocol/audio_frame.proto index 9ca9e9762..abe304ff5 100644 --- a/livekit-ffi/protocol/audio_frame.proto +++ b/livekit-ffi/protocol/audio_frame.proto @@ -34,6 +34,8 @@ message AudioStreamFromParticipantRequest { uint64 participant_handle = 1; AudioStreamType type = 2; optional TrackSource track_source = 3; + uint32 sample_rate = 5; + uint32 num_channels = 6; } message AudioStreamFromParticipantResponse { OwnedAudioStream stream = 1; } diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 73032d43e..38351d72a 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -2922,6 +2922,10 @@ pub struct AudioStreamFromParticipantRequest { pub r#type: i32, #[prost(enumeration="TrackSource", optional, tag="3")] pub track_source: ::core::option::Option, + #[prost(uint32, tag="5")] + pub sample_rate: u32, + #[prost(uint32, tag="6")] + pub num_channels: u32, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index e5aa8adcd..635baf0f5 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -94,16 +94,19 @@ impl FfiAudioStream { server: &'static server::FfiServer, request: proto::AudioStreamFromParticipantRequest, ) -> FfiResult { - let (close_tx, close_rx) = oneshot::channel(); + let (self_dropped_tx, self_dropped_rx) = oneshot::channel(); let handle_id = server.next_id(); let stream_type = request.r#type(); let audio_stream = match stream_type { #[cfg(not(target_arch = "wasm32"))] proto::AudioStreamType::AudioStreamNative => { - let audio_stream = Self { handle_id, stream_type, close_tx }; + let audio_stream = Self { handle_id, stream_type, self_dropped_tx }; let handle = server.async_runtime.spawn(Self::participant_audio_stream_task( - server, request, handle_id, close_rx, + server, + request, + handle_id, + self_dropped_rx, )); server.watch_panic(handle); Ok::(audio_stream) @@ -155,12 +158,18 @@ impl FfiAudioStream { }; let (c_tx, c_rx) = oneshot::channel::<()>(); let (done_tx, mut done_rx) = oneshot::channel::<()>(); + let sample_rate = + if request.sample_rate == 0 { 48000 } else { request.sample_rate as i32 }; + + let num_channels = + if request.num_channels == 0 { 1 } else { request.num_channels as i32 }; server.async_runtime.spawn(async move { Self::native_audio_stream_task( server, stream_handle, - NativeAudioStream::new(rtc_track), + NativeAudioStream::new(rtc_track, sample_rate, num_channels), c_rx, + server.watch_handle_dropped(request.participant_handle), ) .await; let _ = done_tx.send(()); diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index c103c9075..7e7623072 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -89,16 +89,20 @@ impl FfiVideoStream { server: &'static server::FfiServer, request: proto::VideoStreamFromParticipantRequest, ) -> FfiResult { - let (close_tx, close_rx) = oneshot::channel(); + let (self_dropped_tx, self_dropped_rx) = oneshot::channel(); let stream_type = request.r#type(); let handle_id = server.next_id(); let dst_type = request.format.and_then(|_| Some(request.format())); let stream = match stream_type { #[cfg(not(target_arch = "wasm32"))] proto::VideoStreamType::VideoStreamNative => { - let video_stream = Self { handle_id, close_tx, stream_type }; + let video_stream = Self { handle_id, self_dropped_tx, stream_type }; let handle = server.async_runtime.spawn(Self::participant_video_stream_task( - server, request, handle_id, dst_type, close_rx, + server, + request, + handle_id, + dst_type, + self_dropped_rx, )); server.watch_panic(handle); Ok::(video_stream) @@ -222,6 +226,7 @@ impl FfiVideoStream { request.normalize_stride, NativeVideoStream::new(rtc_track), c_rx, + server.watch_handle_dropped(request.participant_handle), ) .await; let _ = done_tx.send(()); From f229c1f0b0822581a82f4116d3a2f9a91f2b8ef7 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 10:33:38 -0700 Subject: [PATCH 11/34] testing --- livekit-ffi/src/server/audio_stream.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 635baf0f5..0151657df 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -128,7 +128,7 @@ impl FfiAudioStream { server: &'static server::FfiServer, request: proto::AudioStreamFromParticipantRequest, stream_handle: FfiHandleId, - mut close_rx: oneshot::Receiver<()>, + mut self_dropped_rx: oneshot::Receiver<()>, ) { let ffi_participant = utils::ffi_participant_from_handle(server, request.participant_handle); @@ -175,7 +175,7 @@ impl FfiAudioStream { let _ = done_tx.send(()); }); tokio::select! { - _ = &mut close_rx => { + _ = &mut self_dropped_rx => { let _ = c_tx.send(()); return } @@ -209,6 +209,7 @@ impl FfiAudioStream { let Some(frame) = frame else { break; }; + log::info!("NEIL received audio frame: {:?}", frame); let handle_id = server.next_id(); let buffer_info = proto::AudioFrameBufferInfo::from(&frame); From 03ac21b02e9db5b28e7f0c709f297e92d92d7a26 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 10:49:08 -0700 Subject: [PATCH 12/34] as --- livekit-ffi/src/server/audio_stream.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 0151657df..51bf46002 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -150,7 +150,9 @@ impl FfiAudioStream { // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done loop { + log::info!("NEIL track loop"); let track = track_rx.recv().await; + log::info!("NEIL got track"); if let Some(track) = track { let rtc_track = track.rtc_track(); let MediaStreamTrack::Audio(rtc_track) = rtc_track else { @@ -197,12 +199,15 @@ impl FfiAudioStream { mut self_dropped_rx: oneshot::Receiver<()>, mut handle_dropped_rx: oneshot::Receiver<()>, ) { + log::info!("NEIL native_audio_stream_task"); loop { tokio::select! { _ = &mut self_dropped_rx => { + log::info!("NEIL self dropped"); break; } _ = &mut handle_dropped_rx => { + log::info!("NEIL handle dropped"); break; } frame = native_stream.next() => { From 7bed59c12a4ba307968d8e6b80ad814f752b932f Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 11:03:07 -0700 Subject: [PATCH 13/34] . --- livekit-ffi/src/server/audio_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 51bf46002..ec049ee9a 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -152,7 +152,7 @@ impl FfiAudioStream { loop { log::info!("NEIL track loop"); let track = track_rx.recv().await; - log::info!("NEIL got track"); + log::info!("NEIL got track {:?}", track); if let Some(track) = track { let rtc_track = track.rtc_track(); let MediaStreamTrack::Audio(rtc_track) = rtc_track else { From 58387c8bc0279262f47534014299cd6659a29a75 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 11:16:53 -0700 Subject: [PATCH 14/34] . --- livekit-ffi/src/server/utils.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 2758111b4..93aacf756 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -21,6 +21,7 @@ pub async fn track_changed_trigger( while let Some(event) = room_event_rx.recv().await { match event { RoomEvent::TrackPublished { publication, participant: p } => { + log::info!("NEIL track published: {:?}", publication); if participant.participant.identity() != p.identity() { continue; } @@ -31,6 +32,7 @@ pub async fn track_changed_trigger( } } RoomEvent::ParticipantDisconnected(participant) => { + log::info!("NEIL part dis: {:?}", publication); if participant.identity() == participant.identity() { break; } From 06bdf24d7ec9d7374af54ff9b9b920f0b79a449c Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 11:17:33 -0700 Subject: [PATCH 15/34] . --- livekit-ffi/src/server/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 93aacf756..07582f671 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -32,7 +32,7 @@ pub async fn track_changed_trigger( } } RoomEvent::ParticipantDisconnected(participant) => { - log::info!("NEIL part dis: {:?}", publication); + log::info!("NEIL part dis"); if participant.identity() == participant.identity() { break; } From 4de91a6631f23c8ad340599c82d6f7682ec58232 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 11:35:32 -0700 Subject: [PATCH 16/34] . --- livekit-ffi/src/server/audio_stream.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index ec049ee9a..7c56081a2 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -214,7 +214,6 @@ impl FfiAudioStream { let Some(frame) = frame else { break; }; - log::info!("NEIL received audio frame: {:?}", frame); let handle_id = server.next_id(); let buffer_info = proto::AudioFrameBufferInfo::from(&frame); From 19c21bbaf1e5e6bed21dc53ba88105c674fe7eb7 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 12:01:45 -0700 Subject: [PATCH 17/34] . --- livekit-ffi/src/server/audio_stream.rs | 30 ++++++++++++++++++++------ 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 7c56081a2..8aa1a7024 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -73,6 +73,7 @@ impl FfiAudioStream { native_stream, self_dropped_rx, server.watch_handle_dropped(new_stream.track_handle), + true, )); server.watch_panic(handle); Ok::(audio_stream) @@ -172,6 +173,7 @@ impl FfiAudioStream { NativeAudioStream::new(rtc_track, sample_rate, num_channels), c_rx, server.watch_handle_dropped(request.participant_handle), + false, ) .await; let _ = done_tx.send(()); @@ -190,6 +192,15 @@ impl FfiAudioStream { break; } } + log::info!("NEIL sending eos"); + if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent( + proto::AudioStreamEvent { + stream_handle: stream_handle, + message: Some(proto::audio_stream_event::Message::Eos(proto::AudioStreamEos {})), + }, + )) { + log::warn!("failed to send audio eos: {}", err); + } } async fn native_audio_stream_task( @@ -198,6 +209,7 @@ impl FfiAudioStream { mut native_stream: NativeAudioStream, mut self_dropped_rx: oneshot::Receiver<()>, mut handle_dropped_rx: oneshot::Receiver<()>, + send_eos: bool, ) { log::info!("NEIL native_audio_stream_task"); loop { @@ -238,13 +250,17 @@ impl FfiAudioStream { } } } - if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent( - proto::AudioStreamEvent { - stream_handle: stream_handle_id, - message: Some(proto::audio_stream_event::Message::Eos(proto::AudioStreamEos {})), - }, - )) { - log::warn!("failed to send audio eos: {}", err); + if send_eos { + if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent( + proto::AudioStreamEvent { + stream_handle: stream_handle_id, + message: Some(proto::audio_stream_event::Message::Eos( + proto::AudioStreamEos {}, + )), + }, + )) { + log::warn!("failed to send audio eos: {}", err); + } } } } From c7b56c50d309ff6e49b7326c93ae89103a4e7814 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 12:20:31 -0700 Subject: [PATCH 18/34] . --- livekit-ffi/src/server/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index 74d00fcd3..6901cb631 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -172,6 +172,7 @@ impl FfiServer { where T: FfiHandle, { + log::trace!("storing handle {} of type {}", id, std::any::type_name::()); self.ffi_handles.insert(id, Box::new(handle)); } From 6093ae50ec37ca82b1bed351a54c04a5e497a1e4 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 12:39:52 -0700 Subject: [PATCH 19/34] . --- livekit-ffi/src/server/mod.rs | 1 - livekit-ffi/src/server/room.rs | 5 ----- 2 files changed, 6 deletions(-) diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index 6901cb631..74d00fcd3 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -172,7 +172,6 @@ impl FfiServer { where T: FfiHandle, { - log::trace!("storing handle {} of type {}", id, std::any::type_name::()); self.ffi_handles.insert(id, Box::new(handle)); } diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 20a4c9b0c..a8ebefbfd 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -837,11 +837,6 @@ async fn forward_event( } RoomEvent::TrackUnsubscribed { track, publication: _, participant } => { let track_sid = track.sid(); - if let Some(handle) = inner.track_handle_lookup.lock().remove(&track_sid) { - server.drop_handle(handle); - } else { - log::warn!("track {} was not found in the lookup table", track_sid); - } let _ = send_event(proto::room_event::Message::TrackUnsubscribed( proto::TrackUnsubscribed { participant_identity: participant.identity().to_string(), From 638e95e39213bd90b0b47497e95729254457a10c Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 12:40:02 -0700 Subject: [PATCH 20/34] . --- livekit-ffi/src/server/room.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index a8ebefbfd..cf6fe02f5 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -836,7 +836,6 @@ async fn forward_event( })); } RoomEvent::TrackUnsubscribed { track, publication: _, participant } => { - let track_sid = track.sid(); let _ = send_event(proto::room_event::Message::TrackUnsubscribed( proto::TrackUnsubscribed { participant_identity: participant.identity().to_string(), From e9bd73bce5c0b8e5fc7b79545aae1cd14957af0e Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 12:50:32 -0700 Subject: [PATCH 21/34] . --- livekit-ffi/src/server/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 07582f671..384ff7f43 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -21,7 +21,7 @@ pub async fn track_changed_trigger( while let Some(event) = room_event_rx.recv().await { match event { RoomEvent::TrackPublished { publication, participant: p } => { - log::info!("NEIL track published: {:?}", publication); + log::info!("NEIL track published: {:?} {:?} {:?}", publication, track_source, publication.source()); if participant.participant.identity() != p.identity() { continue; } From dac9408613f96e6786e276fe4823c1e7a276aed5 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 12:52:01 -0700 Subject: [PATCH 22/34] . --- livekit-ffi/src/server/utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 384ff7f43..72bbac3a6 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -23,6 +23,7 @@ pub async fn track_changed_trigger( RoomEvent::TrackPublished { publication, participant: p } => { log::info!("NEIL track published: {:?} {:?} {:?}", publication, track_source, publication.source()); if participant.participant.identity() != p.identity() { + log::info!("NEIL part id not eq"); continue; } if publication.source() == track_source.into() { From 3b1534075fb15fc42922a55d3d4d875eff22c4db Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 12:53:36 -0700 Subject: [PATCH 23/34] . --- livekit-ffi/src/server/utils.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 72bbac3a6..f89f7f585 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -27,7 +27,9 @@ pub async fn track_changed_trigger( continue; } if publication.source() == track_source.into() { + log::info!("NEIL pub source eq"); if let Some(track) = publication.track() { + log::info!("NEIL pub track"); let _ = track_tx.send(track.into()).await; } } From 094edb75ef98cd6ca29ea74ad59a602a1b91849a Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 12:56:31 -0700 Subject: [PATCH 24/34] . --- livekit-ffi/src/server/utils.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index f89f7f585..81c790151 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -20,18 +20,12 @@ pub async fn track_changed_trigger( let mut room_event_rx = room.subscribe(); while let Some(event) = room_event_rx.recv().await { match event { - RoomEvent::TrackPublished { publication, participant: p } => { - log::info!("NEIL track published: {:?} {:?} {:?}", publication, track_source, publication.source()); + RoomEvent::TrackSubscribed { track, publication, participant: p } => { if participant.participant.identity() != p.identity() { - log::info!("NEIL part id not eq"); continue; } if publication.source() == track_source.into() { - log::info!("NEIL pub source eq"); - if let Some(track) = publication.track() { - log::info!("NEIL pub track"); - let _ = track_tx.send(track.into()).await; - } + let _ = track_tx.send(track.into()).await; } } RoomEvent::ParticipantDisconnected(participant) => { From b7fe22bf0b684cc3808fa0009da8c5cb3e003e1d Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 13:18:14 -0700 Subject: [PATCH 25/34] . --- livekit-ffi/src/server/utils.rs | 6 +++--- livekit-ffi/src/server/video_stream.rs | 29 +++++++++++++++++++------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 81c790151..f850b0b20 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -28,10 +28,10 @@ pub async fn track_changed_trigger( let _ = track_tx.send(track.into()).await; } } - RoomEvent::ParticipantDisconnected(participant) => { + RoomEvent::ParticipantDisconnected(p) => { log::info!("NEIL part dis"); - if participant.identity() == participant.identity() { - break; + if p.identity() == participant.participant.identity() { + return; } } RoomEvent::Disconnected { reason: _ } => { diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 7e7623072..af0e13fcc 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -68,6 +68,7 @@ impl FfiVideoStream { NativeVideoStream::new(rtc_track), self_dropped_rx, server.watch_handle_dropped(new_stream.track_handle), + true, )); server.watch_panic(handle); Ok::(video_stream) @@ -126,6 +127,7 @@ impl FfiVideoStream { mut native_stream: NativeVideoStream, mut self_dropped_rx: oneshot::Receiver<()>, mut handle_dropped_rx: oneshot::Receiver<()>, + send_eos: bool, ) { loop { tokio::select! { @@ -173,13 +175,17 @@ impl FfiVideoStream { } } - if let Err(err) = server.send_event(proto::ffi_event::Message::VideoStreamEvent( - proto::VideoStreamEvent { - stream_handle, - message: Some(proto::video_stream_event::Message::Eos(proto::VideoStreamEos {})), - }, - )) { - log::warn!("failed to send video EOS: {}", err); + if send_eos { + if let Err(err) = server.send_event(proto::ffi_event::Message::VideoStreamEvent( + proto::VideoStreamEvent { + stream_handle, + message: Some(proto::video_stream_event::Message::Eos( + proto::VideoStreamEos {}, + )), + }, + )) { + log::warn!("failed to send video EOS: {}", err); + } } } @@ -227,6 +233,7 @@ impl FfiVideoStream { NativeVideoStream::new(rtc_track), c_rx, server.watch_handle_dropped(request.participant_handle), + false, ) .await; let _ = done_tx.send(()); @@ -245,5 +252,13 @@ impl FfiVideoStream { break; } } + if let Err(err) = server.send_event(proto::ffi_event::Message::VideoStreamEvent( + proto::VideoStreamEvent { + stream_handle, + message: Some(proto::video_stream_event::Message::Eos(proto::VideoStreamEos {})), + }, + )) { + log::warn!("failed to send video EOS: {}", err); + } } } From 3567262136d44f7e6e7f2f08a8ab0fc5c4bdb787 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 13:24:03 -0700 Subject: [PATCH 26/34] . --- livekit-ffi/src/server/utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index f850b0b20..40cd3ad44 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -31,6 +31,7 @@ pub async fn track_changed_trigger( RoomEvent::ParticipantDisconnected(p) => { log::info!("NEIL part dis"); if p.identity() == participant.participant.identity() { + log::info!("NEIL part dis 2"); return; } } From 363e2f5a75b0c3ff11ae5c9050c71605071b8756 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 13:25:43 -0700 Subject: [PATCH 27/34] . --- livekit-ffi/src/server/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index 40cd3ad44..a0118cf94 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -29,7 +29,7 @@ pub async fn track_changed_trigger( } } RoomEvent::ParticipantDisconnected(p) => { - log::info!("NEIL part dis"); + log::info!("NEIL part dis {:?} {:?}", p.identity(), participant.participant.identity()); if p.identity() == participant.participant.identity() { log::info!("NEIL part dis 2"); return; From ccbbd60fd9a700150d34b9b49f615e3867a07064 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 13:55:29 -0700 Subject: [PATCH 28/34] . --- livekit-ffi/src/server/audio_stream.rs | 20 +++++++++++++++++++- livekit-ffi/src/server/utils.rs | 11 +++++++++-- livekit-ffi/src/server/video_stream.rs | 2 ++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 8aa1a7024..99ad8b852 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -143,10 +143,12 @@ impl FfiAudioStream { let track_source = request.track_source(); let (track_tx, mut track_rx) = mpsc::channel::(1); + let (track_finished_tx, mut track_finished_rx) = mpsc::channel::(1); server.async_runtime.spawn(utils::track_changed_trigger( ffi_participant, track_source.into(), track_tx, + track_finished_tx, )); // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done @@ -160,19 +162,35 @@ impl FfiAudioStream { continue; }; let (c_tx, c_rx) = oneshot::channel::<()>(); + let (handle_dropped_tx, handle_dropped_rx) = oneshot::channel::<()>(); let (done_tx, mut done_rx) = oneshot::channel::<()>(); let sample_rate = if request.sample_rate == 0 { 48000 } else { request.sample_rate as i32 }; let num_channels = if request.num_channels == 0 { 1 } else { request.num_channels as i32 }; + + server.async_runtime.spawn(async move { + tokio::select! { + t = track_finished_rx.recv() => { + let Some(t) = t else { + return + }; + if t.sid() == track.sid() { + handle_dropped_tx.send(()).ok(); + return + } + } + } + }); + server.async_runtime.spawn(async move { Self::native_audio_stream_task( server, stream_handle, NativeAudioStream::new(rtc_track, sample_rate, num_channels), c_rx, - server.watch_handle_dropped(request.participant_handle), + handle_dropped_rx, false, ) .await; diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index a0118cf94..c71d9bce5 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -8,6 +8,7 @@ pub async fn track_changed_trigger( participant: FfiParticipant, track_source: TrackSource, track_tx: mpsc::Sender, + track_finished_tx: mpsc::Sender, ) { for track_pub in participant.participant.track_publications().values() { if track_pub.source() == track_source.into() { @@ -28,10 +29,16 @@ pub async fn track_changed_trigger( let _ = track_tx.send(track.into()).await; } } + RoomEvent::TrackUnsubscribed { track, publication, participant: p } => { + if p.identity() != participant.participant.identity() { + continue; + } + if publication.source() == track_source.into() { + let _ = track_finished_tx.send(track.into()).await; + } + } RoomEvent::ParticipantDisconnected(p) => { - log::info!("NEIL part dis {:?} {:?}", p.identity(), participant.participant.identity()); if p.identity() == participant.participant.identity() { - log::info!("NEIL part dis 2"); return; } } diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index af0e13fcc..450062407 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -208,10 +208,12 @@ impl FfiVideoStream { let track_source = request.track_source(); let (track_tx, mut track_rx) = mpsc::channel::(1); + let (track_finished_tx, _track_finished_rx) = mpsc::channel::(1); server.async_runtime.spawn(utils::track_changed_trigger( ffi_participant, track_source.into(), track_tx, + track_finished_tx, )); // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done From 18ac7cb4a9c038e4009a11e76fd00ce2e86424ce Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 13:58:42 -0700 Subject: [PATCH 29/34] . --- livekit-ffi/src/server/audio_stream.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 99ad8b852..0c061a072 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -199,9 +199,11 @@ impl FfiAudioStream { tokio::select! { _ = &mut self_dropped_rx => { let _ = c_tx.send(()); + log::info!("NEIL self_drop_rx"); return } _ = &mut done_rx => { + log::info!("NEIL done_rx"); break } } From 0feebf69216cabbd67e6926ca44fc42ea335f57a Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 14:03:02 -0700 Subject: [PATCH 30/34] . --- livekit-ffi/src/server/audio_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 0c061a072..19a9c948f 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -204,7 +204,7 @@ impl FfiAudioStream { } _ = &mut done_rx => { log::info!("NEIL done_rx"); - break + continue } } } else { From a60e6a403894d5818b52582d23296d68aaf055e2 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 15:19:06 -0700 Subject: [PATCH 31/34] . --- livekit-ffi/src/server/audio_stream.rs | 9 +++++---- livekit-ffi/src/server/utils.rs | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 19a9c948f..669192968 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -15,7 +15,7 @@ use futures_util::StreamExt; use livekit::track::Track; use livekit::webrtc::{audio_stream::native::NativeAudioStream, prelude::*}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot}; use super::{room::FfiTrack, FfiHandle}; use crate::server::utils; @@ -143,12 +143,12 @@ impl FfiAudioStream { let track_source = request.track_source(); let (track_tx, mut track_rx) = mpsc::channel::(1); - let (track_finished_tx, mut track_finished_rx) = mpsc::channel::(1); + let (track_finished_tx, _) = broadcast::channel::(1); server.async_runtime.spawn(utils::track_changed_trigger( ffi_participant, track_source.into(), track_tx, - track_finished_tx, + track_finished_tx.clone(), )); // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done @@ -170,10 +170,11 @@ impl FfiAudioStream { let num_channels = if request.num_channels == 0 { 1 } else { request.num_channels as i32 }; + let mut track_finished_rx = track_finished_tx.subscribe(); server.async_runtime.spawn(async move { tokio::select! { t = track_finished_rx.recv() => { - let Some(t) = t else { + let Ok(t) = t else { return }; if t.sid() == track.sid() { diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs index c71d9bce5..59f0e037d 100644 --- a/livekit-ffi/src/server/utils.rs +++ b/livekit-ffi/src/server/utils.rs @@ -1,5 +1,5 @@ use livekit::prelude::{RoomEvent, Track, TrackSource}; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use super::room::FfiParticipant; use crate::{server, FfiError, FfiHandleId}; @@ -8,7 +8,7 @@ pub async fn track_changed_trigger( participant: FfiParticipant, track_source: TrackSource, track_tx: mpsc::Sender, - track_finished_tx: mpsc::Sender, + track_finished_tx: broadcast::Sender, ) { for track_pub in participant.participant.track_publications().values() { if track_pub.source() == track_source.into() { @@ -34,7 +34,7 @@ pub async fn track_changed_trigger( continue; } if publication.source() == track_source.into() { - let _ = track_finished_tx.send(track.into()).await; + let _ = track_finished_tx.send(track.into()); } } RoomEvent::ParticipantDisconnected(p) => { From 87cc2d5edb3a7d18108f0ed40762418686ed3e17 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 15:20:26 -0700 Subject: [PATCH 32/34] . --- livekit-ffi/src/server/video_stream.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 450062407..54bef8ffc 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -17,7 +17,7 @@ use livekit::{ prelude::Track, webrtc::{prelude::*, video_stream::native::NativeVideoStream}, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot}; use super::{colorcvt, room::FfiTrack, FfiHandle}; use crate::server::utils; @@ -208,12 +208,12 @@ impl FfiVideoStream { let track_source = request.track_source(); let (track_tx, mut track_rx) = mpsc::channel::(1); - let (track_finished_tx, _track_finished_rx) = mpsc::channel::(1); + let (track_finished_tx, track_finished_rx) = broadcast::channel::(1); server.async_runtime.spawn(utils::track_changed_trigger( ffi_participant, track_source.into(), track_tx, - track_finished_tx, + track_finished_tx.clone(), )); // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done @@ -226,6 +226,22 @@ impl FfiVideoStream { }; let (c_tx, c_rx) = oneshot::channel::<()>(); let (done_tx, mut done_rx) = oneshot::channel::<()>(); + + let mut track_finished_rx = track_finished_tx.subscribe(); + server.async_runtime.spawn(async move { + tokio::select! { + t = track_finished_rx.recv() => { + let Ok(t) = t else { + return + }; + if t.sid() == track.sid() { + handle_dropped_tx.send(()).ok(); + return + } + } + } + }); + server.async_runtime.spawn(async move { Self::native_video_stream_task( server, From d2486e96b20c3b09c0b680abd597a2c260856a56 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 15:21:54 -0700 Subject: [PATCH 33/34] . --- livekit-ffi/src/server/video_stream.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 54bef8ffc..5b2d930b2 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -225,6 +225,7 @@ impl FfiVideoStream { continue; }; let (c_tx, c_rx) = oneshot::channel::<()>(); + let (handle_dropped_tx, handle_dropped_rx) = oneshot::channel::<()>(); let (done_tx, mut done_rx) = oneshot::channel::<()>(); let mut track_finished_rx = track_finished_tx.subscribe(); @@ -250,7 +251,7 @@ impl FfiVideoStream { request.normalize_stride, NativeVideoStream::new(rtc_track), c_rx, - server.watch_handle_dropped(request.participant_handle), + handle_dropped_rx, false, ) .await; @@ -262,7 +263,7 @@ impl FfiVideoStream { return } _ = &mut done_rx => { - break + continue } } } else { From 97413c91ad3523cfec1ef15ceb634d48be416ce6 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 5 Sep 2024 15:23:47 -0700 Subject: [PATCH 34/34] . --- livekit-ffi/src/server/audio_stream.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 669192968..bef90859a 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -153,9 +153,7 @@ impl FfiAudioStream { // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done loop { - log::info!("NEIL track loop"); let track = track_rx.recv().await; - log::info!("NEIL got track {:?}", track); if let Some(track) = track { let rtc_track = track.rtc_track(); let MediaStreamTrack::Audio(rtc_track) = rtc_track else { @@ -200,11 +198,9 @@ impl FfiAudioStream { tokio::select! { _ = &mut self_dropped_rx => { let _ = c_tx.send(()); - log::info!("NEIL self_drop_rx"); return } _ = &mut done_rx => { - log::info!("NEIL done_rx"); continue } } @@ -213,7 +209,6 @@ impl FfiAudioStream { break; } } - log::info!("NEIL sending eos"); if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent( proto::AudioStreamEvent { stream_handle: stream_handle, @@ -232,15 +227,12 @@ impl FfiAudioStream { mut handle_dropped_rx: oneshot::Receiver<()>, send_eos: bool, ) { - log::info!("NEIL native_audio_stream_task"); loop { tokio::select! { _ = &mut self_dropped_rx => { - log::info!("NEIL self dropped"); break; } _ = &mut handle_dropped_rx => { - log::info!("NEIL handle dropped"); break; } frame = native_stream.next() => {