From 0fe6ae76affc50a740ec628c4302d01648a21173 Mon Sep 17 00:00:00 2001
From: Neil Dwyer
Date: Thu, 5 Sep 2024 15:39:55 -0700
Subject: [PATCH] Convenience API for iterating frames from a participant
 (#396)

Convenience API for iterating frames
---
 Cargo.lock                             |   8 +-
 livekit-ffi/protocol/audio_frame.proto |  11 ++
 livekit-ffi/protocol/ffi.proto         |  29 ++---
 livekit-ffi/protocol/video_frame.proto |  12 ++
 livekit-ffi/src/livekit.proto.rs       |  78 ++++++++++---
 livekit-ffi/src/server/audio_stream.rs | 154 +++++++++++++++++++++++--
 livekit-ffi/src/server/mod.rs          |   1 +
 livekit-ffi/src/server/requests.rs     |  31 ++++-
 livekit-ffi/src/server/room.rs         |   6 -
 livekit-ffi/src/server/utils.rs        |  62 ++++++++++
 livekit-ffi/src/server/video_stream.rs | 141 +++++++++++++++++++++-
 11 files changed, 482 insertions(+), 51 deletions(-)
 create mode 100644 livekit-ffi/src/server/utils.rs

diff --git a/Cargo.lock b/Cargo.lock
index cd647d60e..907ca7ae0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1490,7 +1490,7 @@ dependencies = [
 
 [[package]]
 name = "libwebrtc"
-version = "0.3.5"
+version = "0.3.6"
 dependencies = [
  "cxx",
  "env_logger",
@@ -1546,7 +1546,7 @@ checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456"
 
 [[package]]
 name = "livekit"
-version = "0.5.1"
+version = "0.5.2"
 dependencies = [
  "futures-util",
  "lazy_static",
@@ -3234,7 +3234,7 @@ checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
 
 [[package]]
 name = "webrtc-sys"
-version = "0.3.3"
+version = "0.3.4"
 dependencies = [
  "cc",
  "cxx",
@@ -3247,7 +3247,7 @@ dependencies = [
 
 [[package]]
 name = "webrtc-sys-build"
-version = "0.3.3"
+version = "0.3.4"
 dependencies = [
  "fs2",
  "regex",
diff --git a/livekit-ffi/protocol/audio_frame.proto b/livekit-ffi/protocol/audio_frame.proto
index 35f6b8372..abe304ff5 100644
--- a/livekit-ffi/protocol/audio_frame.proto
+++ b/livekit-ffi/protocol/audio_frame.proto
@@ -18,6 +18,7 @@ package livekit.proto;
 option csharp_namespace = "LiveKit.Proto";
 
 import "handle.proto";
+import "track.proto";
 
 // Create a new AudioStream
 // AudioStream is used to receive audio frames from a track
@@ -29,6 +30,16 @@ message NewAudioStreamRequest {
 }
 message NewAudioStreamResponse { OwnedAudioStream stream = 1; }
 
+message AudioStreamFromParticipantRequest {
+  uint64 participant_handle = 1;
+  AudioStreamType type = 2;
+  optional TrackSource track_source = 3;
+  uint32 sample_rate = 5;
+  uint32 num_channels = 6;
+}
+
+message AudioStreamFromParticipantResponse { OwnedAudioStream stream = 1; }
+
 // Create a new AudioSource
 message NewAudioSourceRequest {
   AudioSourceType type = 1;
diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto
index 5d4f9bc0a..ba87d8417 100644
--- a/livekit-ffi/protocol/ffi.proto
+++ b/livekit-ffi/protocol/ffi.proto
@@ -82,15 +82,16 @@ message FfiRequest {
     NewVideoSourceRequest new_video_source = 21;
     CaptureVideoFrameRequest capture_video_frame = 22;
     VideoConvertRequest video_convert = 23;
+    VideoStreamFromParticipantRequest video_stream_from_participant = 24;
 
     // Audio
-    NewAudioStreamRequest new_audio_stream = 24;
-    NewAudioSourceRequest new_audio_source = 25;
-    CaptureAudioFrameRequest capture_audio_frame = 26;
-    NewAudioResamplerRequest new_audio_resampler = 27;
-    RemixAndResampleRequest remix_and_resample = 28;
-
-    E2eeRequest e2ee = 29;
+    NewAudioStreamRequest new_audio_stream = 25;
+    NewAudioSourceRequest new_audio_source = 26;
+    CaptureAudioFrameRequest capture_audio_frame = 27;
+    NewAudioResamplerRequest new_audio_resampler = 28;
+    RemixAndResampleRequest
remix_and_resample = 29; + E2eeRequest e2ee = 30; + AudioStreamFromParticipantRequest audio_stream_from_participant = 31; } } @@ -125,14 +126,16 @@ message FfiResponse { NewVideoSourceResponse new_video_source = 21; CaptureVideoFrameResponse capture_video_frame = 22; VideoConvertResponse video_convert = 23; + VideoStreamFromParticipantResponse video_stream_from_participant = 24; // Audio - NewAudioStreamResponse new_audio_stream = 24; - NewAudioSourceResponse new_audio_source = 25; - CaptureAudioFrameResponse capture_audio_frame = 26; - NewAudioResamplerResponse new_audio_resampler = 27; - RemixAndResampleResponse remix_and_resample = 28; - E2eeResponse e2ee = 29; + NewAudioStreamResponse new_audio_stream = 25; + NewAudioSourceResponse new_audio_source = 26; + CaptureAudioFrameResponse capture_audio_frame = 27; + NewAudioResamplerResponse new_audio_resampler = 28; + RemixAndResampleResponse remix_and_resample = 29; + AudioStreamFromParticipantResponse audio_stream_from_participant = 30; + E2eeResponse e2ee = 31; } } diff --git a/livekit-ffi/protocol/video_frame.proto b/livekit-ffi/protocol/video_frame.proto index ffb2e0bde..090518b5f 100644 --- a/livekit-ffi/protocol/video_frame.proto +++ b/livekit-ffi/protocol/video_frame.proto @@ -18,6 +18,7 @@ package livekit.proto; option csharp_namespace = "LiveKit.Proto"; import "handle.proto"; +import "track.proto"; // Create a new VideoStream // VideoStream is used to receive video frames from a track @@ -30,6 +31,17 @@ message NewVideoStreamRequest { } message NewVideoStreamResponse { OwnedVideoStream stream = 1; } +// Request a video stream from a participant +message VideoStreamFromParticipantRequest { + uint64 participant_handle = 1; + VideoStreamType type = 2; + TrackSource track_source = 3; + optional VideoBufferType format = 4; + bool normalize_stride = 5; +} + +message VideoStreamFromParticipantResponse { OwnedVideoStream stream = 1;} + // Create a new VideoSource // VideoSource is used to send video frame to a track message NewVideoSourceRequest { diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 141966bba..38351d72a 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -1,4 +1,5 @@ // @generated +// This file is @generated by prost-build. 
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FrameCryptor {
@@ -1572,6 +1573,27 @@ pub struct NewVideoStreamResponse {
     #[prost(message, optional, tag="1")]
     pub stream: ::core::option::Option<OwnedVideoStream>,
 }
+/// Request a video stream from a participant
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct VideoStreamFromParticipantRequest {
+    #[prost(uint64, tag="1")]
+    pub participant_handle: u64,
+    #[prost(enumeration="VideoStreamType", tag="2")]
+    pub r#type: i32,
+    #[prost(enumeration="TrackSource", tag="3")]
+    pub track_source: i32,
+    #[prost(enumeration="VideoBufferType", optional, tag="4")]
+    pub format: ::core::option::Option<i32>,
+    #[prost(bool, tag="5")]
+    pub normalize_stride: bool,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct VideoStreamFromParticipantResponse {
+    #[prost(message, optional, tag="1")]
+    pub stream: ::core::option::Option<OwnedVideoStream>,
+}
 /// Create a new VideoSource
 /// VideoSource is used to send video frame to a track
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -2891,6 +2913,26 @@ pub struct NewAudioStreamResponse {
     #[prost(message, optional, tag="1")]
     pub stream: ::core::option::Option<OwnedAudioStream>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AudioStreamFromParticipantRequest {
+    #[prost(uint64, tag="1")]
+    pub participant_handle: u64,
+    #[prost(enumeration="AudioStreamType", tag="2")]
+    pub r#type: i32,
+    #[prost(enumeration="TrackSource", optional, tag="3")]
+    pub track_source: ::core::option::Option<i32>,
+    #[prost(uint32, tag="5")]
+    pub sample_rate: u32,
+    #[prost(uint32, tag="6")]
+    pub num_channels: u32,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AudioStreamFromParticipantResponse {
+    #[prost(message, optional, tag="1")]
+    pub stream: ::core::option::Option<OwnedAudioStream>,
+}
 /// Create a new AudioSource
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -3162,7 +3204,7 @@ impl AudioSourceType {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FfiRequest {
-    #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")]
+    #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31")]
     pub message: ::core::option::Option<ffi_request::Message>,
 }
 /// Nested message and enum types in `FfiRequest`.
@@ -3217,26 +3259,30 @@ pub mod ffi_request {
         CaptureVideoFrame(super::CaptureVideoFrameRequest),
         #[prost(message, tag="23")]
         VideoConvert(super::VideoConvertRequest),
-        /// Audio
         #[prost(message, tag="24")]
-        NewAudioStream(super::NewAudioStreamRequest),
+        VideoStreamFromParticipant(super::VideoStreamFromParticipantRequest),
+        /// Audio
         #[prost(message, tag="25")]
-        NewAudioSource(super::NewAudioSourceRequest),
+        NewAudioStream(super::NewAudioStreamRequest),
         #[prost(message, tag="26")]
-        CaptureAudioFrame(super::CaptureAudioFrameRequest),
+        NewAudioSource(super::NewAudioSourceRequest),
         #[prost(message, tag="27")]
-        NewAudioResampler(super::NewAudioResamplerRequest),
+        CaptureAudioFrame(super::CaptureAudioFrameRequest),
         #[prost(message, tag="28")]
-        RemixAndResample(super::RemixAndResampleRequest),
+        NewAudioResampler(super::NewAudioResamplerRequest),
         #[prost(message, tag="29")]
+        RemixAndResample(super::RemixAndResampleRequest),
+        #[prost(message, tag="30")]
         E2ee(super::E2eeRequest),
+        #[prost(message, tag="31")]
+        AudioStreamFromParticipant(super::AudioStreamFromParticipantRequest),
     }
 }
 /// This is the output of livekit_ffi_request function.
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FfiResponse {
-    #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")]
+    #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31")]
     pub message: ::core::option::Option<ffi_response::Message>,
 }
 /// Nested message and enum types in `FfiResponse`.
@@ -3291,18 +3337,22 @@ pub mod ffi_response {
         CaptureVideoFrame(super::CaptureVideoFrameResponse),
         #[prost(message, tag="23")]
         VideoConvert(super::VideoConvertResponse),
-        /// Audio
         #[prost(message, tag="24")]
-        NewAudioStream(super::NewAudioStreamResponse),
+        VideoStreamFromParticipant(super::VideoStreamFromParticipantResponse),
+        /// Audio
         #[prost(message, tag="25")]
-        NewAudioSource(super::NewAudioSourceResponse),
+        NewAudioStream(super::NewAudioStreamResponse),
         #[prost(message, tag="26")]
-        CaptureAudioFrame(super::CaptureAudioFrameResponse),
+        NewAudioSource(super::NewAudioSourceResponse),
         #[prost(message, tag="27")]
-        NewAudioResampler(super::NewAudioResamplerResponse),
+        CaptureAudioFrame(super::CaptureAudioFrameResponse),
         #[prost(message, tag="28")]
-        RemixAndResample(super::RemixAndResampleResponse),
+        NewAudioResampler(super::NewAudioResamplerResponse),
         #[prost(message, tag="29")]
+        RemixAndResample(super::RemixAndResampleResponse),
+        #[prost(message, tag="30")]
+        AudioStreamFromParticipant(super::AudioStreamFromParticipantResponse),
+        #[prost(message, tag="31")]
         E2ee(super::E2eeResponse),
     }
 }
diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs
index d0fd42972..bef90859a 100644
--- a/livekit-ffi/src/server/audio_stream.rs
+++ b/livekit-ffi/src/server/audio_stream.rs
@@ -13,10 +13,12 @@
 // limitations under the License.
 use futures_util::StreamExt;
+use livekit::track::Track;
 use livekit::webrtc::{audio_stream::native::NativeAudioStream, prelude::*};
-use tokio::sync::oneshot;
+use tokio::sync::{broadcast, mpsc, oneshot};
 
 use super::{room::FfiTrack, FfiHandle};
+use crate::server::utils;
 use crate::{proto, server, FfiError, FfiHandleId, FfiResult};
 
 pub struct FfiAudioStream {
@@ -38,7 +40,7 @@ impl FfiAudioStream {
     ///
     /// It is possible that the client receives an AudioFrame after the task is closed. The client
     /// must ignore it.
-    pub fn setup(
+    pub fn from_track(
         server: &'static server::FfiServer,
         new_stream: proto::NewAudioStreamRequest,
     ) -> FfiResult<proto::OwnedAudioStream> {
@@ -71,6 +73,7 @@ impl FfiAudioStream {
                     native_stream,
                     self_dropped_rx,
                     server.watch_handle_dropped(new_stream.track_handle),
+                    true,
                 ));
                 server.watch_panic(handle);
                 Ok::<FfiAudioStream, FfiError>(audio_stream)
@@ -88,12 +91,141 @@ impl FfiAudioStream {
         })
     }
 
+    pub fn from_participant(
+        server: &'static server::FfiServer,
+        request: proto::AudioStreamFromParticipantRequest,
+    ) -> FfiResult<proto::OwnedAudioStream> {
+        let (self_dropped_tx, self_dropped_rx) = oneshot::channel();
+        let handle_id = server.next_id();
+        let stream_type = request.r#type();
+        let audio_stream = match stream_type {
+            #[cfg(not(target_arch = "wasm32"))]
+            proto::AudioStreamType::AudioStreamNative => {
+                let audio_stream = Self { handle_id, stream_type, self_dropped_tx };
+
+                let handle = server.async_runtime.spawn(Self::participant_audio_stream_task(
+                    server,
+                    request,
+                    handle_id,
+                    self_dropped_rx,
+                ));
+                server.watch_panic(handle);
+                Ok::<FfiAudioStream, FfiError>(audio_stream)
+            }
+            _ => return Err(FfiError::InvalidRequest("unsupported audio stream type".into())),
+        }?;
+
+        // Store the new audio stream and return the info
+        let info = proto::AudioStreamInfo::from(&audio_stream);
+        server.store_handle(handle_id, audio_stream);
+
+        Ok(proto::OwnedAudioStream {
+            handle: Some(proto::FfiOwnedHandle { id: handle_id }),
+            info: Some(info),
+        })
+    }
+
+    async fn participant_audio_stream_task(
+        server: &'static server::FfiServer,
+        request: proto::AudioStreamFromParticipantRequest,
+        stream_handle: FfiHandleId,
+        mut self_dropped_rx: oneshot::Receiver<()>,
+    ) {
+        let ffi_participant =
+            utils::ffi_participant_from_handle(server, request.participant_handle);
+        let ffi_participant = match ffi_participant {
+            Ok(ffi_participant) => ffi_participant,
+            Err(err) => {
+                log::error!("failed to get participant: {}", err);
+                return;
+            }
+        };
+
+        let track_source = request.track_source();
+        let (track_tx, mut track_rx) = mpsc::channel::<Track>(1);
+        let (track_finished_tx, _) = broadcast::channel::<Track>(1);
+        server.async_runtime.spawn(utils::track_changed_trigger(
+            ffi_participant,
+            track_source.into(),
+            track_tx,
+            track_finished_tx.clone(),
+        ));
+        // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done
+
+        loop {
+            let track = track_rx.recv().await;
+            if let Some(track) = track {
+                let rtc_track = track.rtc_track();
+                let MediaStreamTrack::Audio(rtc_track) = rtc_track else {
+                    continue;
+                };
+                let (c_tx, c_rx) = oneshot::channel::<()>();
+                let (handle_dropped_tx, handle_dropped_rx) = oneshot::channel::<()>();
+                let (done_tx, mut done_rx) = oneshot::channel::<()>();
+                let sample_rate =
+                    if request.sample_rate == 0 { 48000 } else { request.sample_rate as i32 };
+
+                let num_channels =
+                    if request.num_channels == 0 { 1 } else { request.num_channels as i32 };
+
+                let mut track_finished_rx = track_finished_tx.subscribe();
+                server.async_runtime.spawn(async move {
+                    tokio::select!
{
+                        t = track_finished_rx.recv() => {
+                            let Ok(t) = t else {
+                                return
+                            };
+                            if t.sid() == track.sid() {
+                                handle_dropped_tx.send(()).ok();
+                                return
+                            }
+                        }
+                    }
+                });
+
+                server.async_runtime.spawn(async move {
+                    Self::native_audio_stream_task(
+                        server,
+                        stream_handle,
+                        NativeAudioStream::new(rtc_track, sample_rate, num_channels),
+                        c_rx,
+                        handle_dropped_rx,
+                        false,
+                    )
+                    .await;
+                    let _ = done_tx.send(());
+                });
+                tokio::select! {
+                    _ = &mut self_dropped_rx => {
+                        let _ = c_tx.send(());
+                        return
+                    }
+                    _ = &mut done_rx => {
+                        continue
+                    }
+                }
+            } else {
+                // when tracks are done (i.e. the participant leaves the room), we are done
+                break;
+            }
+        }
+        if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent(
+            proto::AudioStreamEvent {
+                stream_handle: stream_handle,
+                message: Some(proto::audio_stream_event::Message::Eos(proto::AudioStreamEos {})),
+            },
+        )) {
+            log::warn!("failed to send audio eos: {}", err);
+        }
+    }
+
     async fn native_audio_stream_task(
         server: &'static server::FfiServer,
         stream_handle_id: FfiHandleId,
         mut native_stream: NativeAudioStream,
         mut self_dropped_rx: oneshot::Receiver<()>,
         mut handle_dropped_rx: oneshot::Receiver<()>,
+        send_eos: bool,
     ) {
         loop {
             tokio::select! {
@@ -131,13 +263,17 @@ impl FfiAudioStream {
             }
         }
-        if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent(
-            proto::AudioStreamEvent {
-                stream_handle: stream_handle_id,
-                message: Some(proto::audio_stream_event::Message::Eos(proto::AudioStreamEos {})),
-            },
-        )) {
-            log::warn!("failed to send audio eos: {}", err);
+        if send_eos {
+            if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent(
+                proto::AudioStreamEvent {
+                    stream_handle: stream_handle_id,
+                    message: Some(proto::audio_stream_event::Message::Eos(
+                        proto::AudioStreamEos {},
+                    )),
+                },
+            )) {
+                log::warn!("failed to send audio eos: {}", err);
+            }
         }
     }
 }
diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs
index 1a26fd6ae..74d00fcd3 100644
--- a/livekit-ffi/src/server/mod.rs
+++ b/livekit-ffi/src/server/mod.rs
@@ -40,6 +40,7 @@ pub mod colorcvt;
 pub mod logger;
 pub mod requests;
 pub mod room;
+mod utils;
 pub mod video_source;
 pub mod video_stream;
diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs
index a13ade6a6..705e86a1d 100644
--- a/livekit-ffi/src/server/requests.rs
+++ b/livekit-ffi/src/server/requests.rs
@@ -329,10 +329,18 @@ fn on_new_video_stream(
     server: &'static FfiServer,
     new_stream: proto::NewVideoStreamRequest,
 ) -> FfiResult<proto::NewVideoStreamResponse> {
-    let stream_info = video_stream::FfiVideoStream::setup(server, new_stream)?;
+    let stream_info = video_stream::FfiVideoStream::from_track(server, new_stream)?;
     Ok(proto::NewVideoStreamResponse { stream: Some(stream_info) })
 }
 
+fn on_video_stream_from_participant(
+    server: &'static FfiServer,
+    request: proto::VideoStreamFromParticipantRequest,
+) -> FfiResult<proto::VideoStreamFromParticipantResponse> {
+    let stream_info = video_stream::FfiVideoStream::from_participant(server, request)?;
+    Ok(proto::VideoStreamFromParticipantResponse { stream: Some(stream_info) })
+}
+
 /// Create a new video source, used to publish data to a track
 fn on_new_video_source(
     server: &'static FfiServer,
@@ -387,10 +395,19 @@ fn on_new_audio_stream(
     server: &'static FfiServer,
     new_stream: proto::NewAudioStreamRequest,
 ) -> FfiResult<proto::NewAudioStreamResponse> {
-    let stream_info = audio_stream::FfiAudioStream::setup(server, new_stream)?;
+    let stream_info = audio_stream::FfiAudioStream::from_track(server, new_stream)?;
     Ok(proto::NewAudioStreamResponse { stream:
Some(stream_info) })
 }
 
+// Create a new audio stream from a participant and track source
+fn on_audio_stream_from_participant_stream(
+    server: &'static FfiServer,
+    request: proto::AudioStreamFromParticipantRequest,
+) -> FfiResult<proto::AudioStreamFromParticipantResponse> {
+    let stream_info = audio_stream::FfiAudioStream::from_participant(server, request)?;
+    Ok(proto::AudioStreamFromParticipantResponse { stream: Some(stream_info) })
+}
+
 /// Create a new audio source (used to publish audio frames to a track)
 fn on_new_audio_source(
     server: &'static FfiServer,
@@ -699,6 +716,11 @@ pub fn handle_request(
         proto::ffi_request::Message::NewVideoStream(new_stream) => {
             proto::ffi_response::Message::NewVideoStream(on_new_video_stream(server, new_stream)?)
         }
+        proto::ffi_request::Message::VideoStreamFromParticipant(new_stream) => {
+            proto::ffi_response::Message::VideoStreamFromParticipant(
+                on_video_stream_from_participant(server, new_stream)?,
+            )
+        }
         proto::ffi_request::Message::NewVideoSource(new_source) => {
             proto::ffi_response::Message::NewVideoSource(on_new_video_source(server, new_source)?)
         }
@@ -714,6 +736,11 @@ pub fn handle_request(
         proto::ffi_request::Message::NewAudioSource(new_source) => {
             proto::ffi_response::Message::NewAudioSource(on_new_audio_source(server, new_source)?)
         }
+        proto::ffi_request::Message::AudioStreamFromParticipant(new_stream) => {
+            proto::ffi_response::Message::AudioStreamFromParticipant(
+                on_audio_stream_from_participant_stream(server, new_stream)?,
+            )
+        }
         proto::ffi_request::Message::CaptureAudioFrame(push) => {
             proto::ffi_response::Message::CaptureAudioFrame(on_capture_audio_frame(server, push)?)
         }
diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs
index 20a4c9b0c..cf6fe02f5 100644
--- a/livekit-ffi/src/server/room.rs
+++ b/livekit-ffi/src/server/room.rs
@@ -836,12 +836,6 @@ async fn forward_event(
             }));
         }
         RoomEvent::TrackUnsubscribed { track, publication: _, participant } => {
-            let track_sid = track.sid();
-            if let Some(handle) = inner.track_handle_lookup.lock().remove(&track_sid) {
-                server.drop_handle(handle);
-            } else {
-                log::warn!("track {} was not found in the lookup table", track_sid);
-            }
             let _ = send_event(proto::room_event::Message::TrackUnsubscribed(
                 proto::TrackUnsubscribed {
                     participant_identity: participant.identity().to_string(),
diff --git a/livekit-ffi/src/server/utils.rs b/livekit-ffi/src/server/utils.rs
new file mode 100644
index 000000000..59f0e037d
--- /dev/null
+++ b/livekit-ffi/src/server/utils.rs
@@ -0,0 +1,62 @@
+use livekit::prelude::{RoomEvent, Track, TrackSource};
+use tokio::sync::{broadcast, mpsc};
+
+use super::room::FfiParticipant;
+use crate::{server, FfiError, FfiHandleId};
+
+pub async fn track_changed_trigger(
+    participant: FfiParticipant,
+    track_source: TrackSource,
+    track_tx: mpsc::Sender<Track>,
+    track_finished_tx: broadcast::Sender<Track>,
+) {
+    for track_pub in participant.participant.track_publications().values() {
+        if track_pub.source() == track_source.into() {
+            if let Some(track) = track_pub.track() {
+                let _ = track_tx.send(track).await;
+            }
+        }
+    }
+    let room = &participant.room.room;
+    let mut room_event_rx = room.subscribe();
+    while let Some(event) = room_event_rx.recv().await {
+        match event {
+            RoomEvent::TrackSubscribed { track, publication, participant: p } => {
+                if participant.participant.identity() != p.identity() {
+                    continue;
+                }
+                if publication.source() == track_source.into() {
+                    let _ = track_tx.send(track.into()).await;
+                }
+            }
+            RoomEvent::TrackUnsubscribed { track, publication, participant: p } => {
+                if
p.identity() != participant.participant.identity() {
+                    continue;
+                }
+                if publication.source() == track_source.into() {
+                    let _ = track_finished_tx.send(track.into());
+                }
+            }
+            RoomEvent::ParticipantDisconnected(p) => {
+                if p.identity() == participant.participant.identity() {
+                    return;
+                }
+            }
+            RoomEvent::Disconnected { reason: _ } => {
+                break;
+            }
+            _ => {}
+        }
+    }
+}
+
+pub fn ffi_participant_from_handle(
+    server: &'static server::FfiServer,
+    handle_id: FfiHandleId,
+) -> Result<FfiParticipant, FfiError> {
+    let ffi_participant_handle = server.retrieve_handle::<FfiParticipant>(handle_id);
+    if ffi_participant_handle.is_err() {
+        return Err(FfiError::InvalidRequest("participant not found".into()));
+    }
+    return Ok(ffi_participant_handle.unwrap().clone());
+}
diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs
index 4512e4778..5b2d930b2 100644
--- a/livekit-ffi/src/server/video_stream.rs
+++ b/livekit-ffi/src/server/video_stream.rs
@@ -13,10 +13,14 @@
 // limitations under the License.
 
 use futures_util::StreamExt;
-use livekit::webrtc::{prelude::*, video_stream::native::NativeVideoStream};
-use tokio::sync::oneshot;
+use livekit::{
+    prelude::Track,
+    webrtc::{prelude::*, video_stream::native::NativeVideoStream},
+};
+use tokio::sync::{broadcast, mpsc, oneshot};
 
 use super::{colorcvt, room::FfiTrack, FfiHandle};
+use crate::server::utils;
 use crate::{proto, server, FfiError, FfiHandleId, FfiResult};
 
 pub struct FfiVideoStream {
@@ -38,7 +42,7 @@ impl FfiVideoStream {
     ///
     /// It is possible that the client receives a VideoFrame after the task is closed. The client
     /// must ignore it.
-    pub fn setup(
+    pub fn from_track(
         server: &'static server::FfiServer,
         new_stream: proto::NewVideoStreamRequest,
     ) -> FfiResult<proto::OwnedVideoStream> {
@@ -64,6 +68,7 @@ impl FfiVideoStream {
                 NativeVideoStream::new(rtc_track),
                 self_dropped_rx,
                 server.watch_handle_dropped(new_stream.track_handle),
+                true,
             ));
             server.watch_panic(handle);
             Ok::<FfiVideoStream, FfiError>(video_stream)
@@ -81,6 +86,39 @@ impl FfiVideoStream {
         })
     }
 
+    pub fn from_participant(
+        server: &'static server::FfiServer,
+        request: proto::VideoStreamFromParticipantRequest,
+    ) -> FfiResult<proto::OwnedVideoStream> {
+        let (self_dropped_tx, self_dropped_rx) = oneshot::channel();
+        let stream_type = request.r#type();
+        let handle_id = server.next_id();
+        let dst_type = request.format.and_then(|_| Some(request.format()));
+        let stream = match stream_type {
+            #[cfg(not(target_arch = "wasm32"))]
+            proto::VideoStreamType::VideoStreamNative => {
+                let video_stream = Self { handle_id, self_dropped_tx, stream_type };
+                let handle = server.async_runtime.spawn(Self::participant_video_stream_task(
+                    server,
+                    request,
+                    handle_id,
+                    dst_type,
+                    self_dropped_rx,
+                ));
+                server.watch_panic(handle);
+                Ok::<FfiVideoStream, FfiError>(video_stream)
+            }
+            _ => return Err(FfiError::InvalidRequest("unsupported video stream type".into())),
+        }?;
+        let info = proto::VideoStreamInfo::from(&stream);
+        server.store_handle(stream.handle_id, stream);
+
+        Ok(proto::OwnedVideoStream {
+            handle: Some(proto::FfiOwnedHandle { id: handle_id }),
+            info: Some(info),
+        })
+    }
+
     async fn native_video_stream_task(
         server: &'static server::FfiServer,
         stream_handle: FfiHandleId,
@@ -89,6 +127,7 @@ impl FfiVideoStream {
         mut native_stream: NativeVideoStream,
         mut self_dropped_rx: oneshot::Receiver<()>,
         mut handle_dropped_rx: oneshot::Receiver<()>,
+        send_eos: bool,
     ) {
         loop {
             tokio::select!
{
@@ -136,6 +175,102 @@ impl FfiVideoStream {
             }
         }
 
+        if send_eos {
+            if let Err(err) = server.send_event(proto::ffi_event::Message::VideoStreamEvent(
+                proto::VideoStreamEvent {
+                    stream_handle,
+                    message: Some(proto::video_stream_event::Message::Eos(
+                        proto::VideoStreamEos {},
+                    )),
+                },
+            )) {
+                log::warn!("failed to send video EOS: {}", err);
+            }
+        }
+    }
+
+    async fn participant_video_stream_task(
+        server: &'static server::FfiServer,
+        request: proto::VideoStreamFromParticipantRequest,
+        stream_handle: FfiHandleId,
+        dst_type: Option<proto::VideoBufferType>,
+        mut close_rx: oneshot::Receiver<()>,
+    ) {
+        let ffi_participant =
+            utils::ffi_participant_from_handle(server, request.participant_handle);
+        let ffi_participant = match ffi_participant {
+            Ok(ffi_participant) => ffi_participant,
+            Err(err) => {
+                log::error!("failed to get participant: {}", err);
+                return;
+            }
+        };
+
+        let track_source = request.track_source();
+        let (track_tx, mut track_rx) = mpsc::channel::<Track>(1);
+        let (track_finished_tx, track_finished_rx) = broadcast::channel::<Track>(1);
+        server.async_runtime.spawn(utils::track_changed_trigger(
+            ffi_participant,
+            track_source.into(),
+            track_tx,
+            track_finished_tx.clone(),
+        ));
+        // track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done
+
+        loop {
+            let track = track_rx.recv().await;
+            if let Some(track) = track {
+                let rtc_track = track.rtc_track();
+                let MediaStreamTrack::Video(rtc_track) = rtc_track else {
+                    continue;
+                };
+                let (c_tx, c_rx) = oneshot::channel::<()>();
+                let (handle_dropped_tx, handle_dropped_rx) = oneshot::channel::<()>();
+                let (done_tx, mut done_rx) = oneshot::channel::<()>();
+
+                let mut track_finished_rx = track_finished_tx.subscribe();
+                server.async_runtime.spawn(async move {
+                    tokio::select! {
+                        t = track_finished_rx.recv() => {
+                            let Ok(t) = t else {
+                                return
+                            };
+                            if t.sid() == track.sid() {
+                                handle_dropped_tx.send(()).ok();
+                                return
+                            }
+                        }
+                    }
+                });
+
+                server.async_runtime.spawn(async move {
+                    Self::native_video_stream_task(
+                        server,
+                        stream_handle,
+                        dst_type,
+                        request.normalize_stride,
+                        NativeVideoStream::new(rtc_track),
+                        c_rx,
+                        handle_dropped_rx,
+                        false,
+                    )
+                    .await;
+                    let _ = done_tx.send(());
+                });
+                tokio::select! {
+                    _ = &mut close_rx => {
+                        let _ = c_tx.send(());
+                        return
+                    }
+                    _ = &mut done_rx => {
+                        continue
+                    }
+                }
+            } else {
+                // when tracks are done (i.e. the participant leaves the room), we are done
+                break;
+            }
+        }
         if let Err(err) = server.send_event(proto::ffi_event::Message::VideoStreamEvent(
             proto::VideoStreamEvent {
                 stream_handle,
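
Usage note (illustrative, not part of the patch): a client of the FFI might drive the new audio API with a request like the sketch below. The `livekit_ffi::proto` import path and the `participant_handle` value are assumptions; the message, field, and enum variant names follow the generated `livekit.proto.rs` above, and the zero-value fallbacks mirror `participant_audio_stream_task` (sample_rate 0 -> 48000 Hz, num_channels 0 -> 1).

    use livekit_ffi::proto;

    // Build an FfiRequest asking for audio frames from a participant's
    // microphone track. `participant_handle` must be a handle previously
    // returned for an FfiParticipant (hypothetical value here).
    fn build_audio_stream_request(participant_handle: u64) -> proto::FfiRequest {
        proto::FfiRequest {
            message: Some(proto::ffi_request::Message::AudioStreamFromParticipant(
                proto::AudioStreamFromParticipantRequest {
                    participant_handle,
                    r#type: proto::AudioStreamType::AudioStreamNative as i32,
                    track_source: Some(proto::TrackSource::SourceMicrophone as i32),
                    sample_rate: 48000, // 0 would fall back to 48000 in the server task
                    num_channels: 1,    // 0 would fall back to 1
                },
            )),
        }
    }

The video side is symmetric: `VideoStreamFromParticipantRequest` carries a required `track_source` plus an optional `format`, and the server keeps the stream alive across re-subscriptions until the participant disconnects, at which point a single EOS event is emitted.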