Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convenience API for iterating frames from a participant #396

Merged
merged 36 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
3a93b61
Convencience API for iterating frames
keepingitneil Aug 20, 2024
200c121
video stream
keepingitneil Aug 20, 2024
0adf0f7
remove unused
keepingitneil Aug 20, 2024
d2ff1ca
fix import
keepingitneil Aug 20, 2024
a6aa6ad
Audio stream impl
keepingitneil Aug 20, 2024
72882a7
ffi
keepingitneil Aug 20, 2024
71e7f10
fix warnings
keepingitneil Aug 20, 2024
f32ae4a
Merge branch 'main' into neil/ts
keepingitneil Aug 20, 2024
aebea89
cargo fmt
keepingitneil Aug 21, 2024
6a5d7f6
Address comments
keepingitneil Aug 21, 2024
dc14126
Merge branch 'main' into neil/ts
keepingitneil Sep 4, 2024
76e8ec0
cooking
keepingitneil Sep 5, 2024
f229c1f
testing
keepingitneil Sep 5, 2024
03ac21b
as
keepingitneil Sep 5, 2024
7bed59c
.
keepingitneil Sep 5, 2024
58387c8
.
keepingitneil Sep 5, 2024
06bdf24
.
keepingitneil Sep 5, 2024
4de91a6
.
keepingitneil Sep 5, 2024
19c21bb
.
keepingitneil Sep 5, 2024
c7b56c5
.
keepingitneil Sep 5, 2024
6093ae5
.
keepingitneil Sep 5, 2024
638e95e
.
keepingitneil Sep 5, 2024
e9bd73b
.
keepingitneil Sep 5, 2024
dac9408
.
keepingitneil Sep 5, 2024
3b15340
.
keepingitneil Sep 5, 2024
094edb7
.
keepingitneil Sep 5, 2024
b7fe22b
.
keepingitneil Sep 5, 2024
3567262
.
keepingitneil Sep 5, 2024
363e2f5
.
keepingitneil Sep 5, 2024
ccbbd60
.
keepingitneil Sep 5, 2024
18ac7cb
.
keepingitneil Sep 5, 2024
0feebf6
.
keepingitneil Sep 5, 2024
a60e6a4
.
keepingitneil Sep 5, 2024
87cc2d5
.
keepingitneil Sep 5, 2024
d2486e9
.
keepingitneil Sep 5, 2024
97413c9
.
keepingitneil Sep 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions livekit-ffi/protocol/audio_frame.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package livekit.proto;
option csharp_namespace = "LiveKit.Proto";

import "handle.proto";
import "track.proto";

// Create a new AudioStream
// AudioStream is used to receive audio frames from a track
Expand All @@ -27,6 +28,14 @@ message NewAudioStreamRequest {
}
message NewAudioStreamResponse { OwnedAudioStream stream = 1; }

message AudioStreamFromParticipantRequest {
uint64 participant_handle = 1;
AudioStreamType type = 2;
optional TrackSource track_source = 3;
}

message AudioStreamFromParticipantResponse { OwnedAudioStream stream = 1; }

// Create a new AudioSource
message NewAudioSourceRequest {
AudioSourceType type = 1;
Expand Down
17 changes: 11 additions & 6 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ message FfiRequest {
NewVideoSourceRequest new_video_source = 19;
CaptureVideoFrameRequest capture_video_frame = 20;
VideoConvertRequest video_convert = 21;
VideoStreamFromParticipantRequest video_stream_from_participant = 22;

// Audio
NewAudioStreamRequest new_audio_stream = 23;
Expand All @@ -88,6 +89,7 @@ message FfiRequest {
NewAudioResamplerRequest new_audio_resampler = 26;
RemixAndResampleRequest remix_and_resample = 27;
E2eeRequest e2ee = 28;
AudioStreamFromParticipantRequest audio_stream_from_participant = 29;
}
}

Expand Down Expand Up @@ -120,14 +122,17 @@ message FfiResponse {
NewVideoSourceResponse new_video_source = 19;
CaptureVideoFrameResponse capture_video_frame = 20;
VideoConvertResponse video_convert = 21;
VideoStreamFromParticipantResponse video_stream_from_participant = 22;

// Audio
NewAudioStreamResponse new_audio_stream = 22;
NewAudioSourceResponse new_audio_source = 23;
CaptureAudioFrameResponse capture_audio_frame = 24;
NewAudioResamplerResponse new_audio_resampler = 25;
RemixAndResampleResponse remix_and_resample = 26;
E2eeResponse e2ee = 27;
NewAudioStreamResponse new_audio_stream = 23;
NewAudioSourceResponse new_audio_source = 24;
CaptureAudioFrameResponse capture_audio_frame = 25;
NewAudioResamplerResponse new_audio_resampler = 26;
RemixAndResampleResponse remix_and_resample = 27;
AudioStreamFromParticipantResponse audio_stream_from_participant = 28;

E2eeResponse e2ee = 29;
}
}

Expand Down
12 changes: 12 additions & 0 deletions livekit-ffi/protocol/video_frame.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package livekit.proto;
option csharp_namespace = "LiveKit.Proto";

import "handle.proto";
import "track.proto";

// Create a new VideoStream
// VideoStream is used to receive video frames from a track
Expand All @@ -30,6 +31,17 @@ message NewVideoStreamRequest {
}
message NewVideoStreamResponse { OwnedVideoStream stream = 1; }

// Request a video stream from a participant
message VideoStreamFromParticipantRequest {
uint64 participant_handle = 1;
VideoStreamType type = 2;
TrackSource track_source = 3;
optional VideoBufferType format = 4;
bool normalize_stride = 5;
}

message VideoStreamFromParticipantResponse { OwnedVideoStream stream = 1;}

// Create a new VideoSource
// VideoSource is used to send video frame to a track
message NewVideoSourceRequest {
Expand Down
62 changes: 54 additions & 8 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FrameCryptor {
Expand Down Expand Up @@ -1542,6 +1543,27 @@ pub struct NewVideoStreamResponse {
#[prost(message, optional, tag="1")]
pub stream: ::core::option::Option<OwnedVideoStream>,
}
/// Request a video stream from a participant
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct VideoStreamFromParticipantRequest {
#[prost(uint64, tag="1")]
pub participant_handle: u64,
#[prost(enumeration="VideoStreamType", tag="2")]
pub r#type: i32,
#[prost(enumeration="TrackSource", tag="3")]
pub track_source: i32,
#[prost(enumeration="VideoBufferType", optional, tag="4")]
pub format: ::core::option::Option<i32>,
#[prost(bool, tag="5")]
pub normalize_stride: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct VideoStreamFromParticipantResponse {
#[prost(message, optional, tag="1")]
pub stream: ::core::option::Option<OwnedVideoStream>,
}
/// Create a new VideoSource
/// VideoSource is used to send video frame to a track
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -2857,6 +2879,22 @@ pub struct NewAudioStreamResponse {
#[prost(message, optional, tag="1")]
pub stream: ::core::option::Option<OwnedAudioStream>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AudioStreamFromParticipantRequest {
#[prost(uint64, tag="1")]
pub participant_handle: u64,
#[prost(enumeration="AudioStreamType", tag="2")]
pub r#type: i32,
#[prost(enumeration="TrackSource", optional, tag="3")]
pub track_source: ::core::option::Option<i32>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AudioStreamFromParticipantResponse {
#[prost(message, optional, tag="1")]
pub stream: ::core::option::Option<OwnedAudioStream>,
}
/// Create a new AudioSource
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -3128,7 +3166,7 @@ impl AudioSourceType {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 24, 25, 26, 27, 28")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -3179,6 +3217,8 @@ pub mod ffi_request {
CaptureVideoFrame(super::CaptureVideoFrameRequest),
#[prost(message, tag="21")]
VideoConvert(super::VideoConvertRequest),
#[prost(message, tag="22")]
VideoStreamFromParticipant(super::VideoStreamFromParticipantRequest),
/// Audio
#[prost(message, tag="23")]
NewAudioStream(super::NewAudioStreamRequest),
Expand All @@ -3192,13 +3232,15 @@ pub mod ffi_request {
RemixAndResample(super::RemixAndResampleRequest),
#[prost(message, tag="28")]
E2ee(super::E2eeRequest),
#[prost(message, tag="29")]
AudioStreamFromParticipant(super::AudioStreamFromParticipantRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -3249,18 +3291,22 @@ pub mod ffi_response {
CaptureVideoFrame(super::CaptureVideoFrameResponse),
#[prost(message, tag="21")]
VideoConvert(super::VideoConvertResponse),
/// Audio
#[prost(message, tag="22")]
NewAudioStream(super::NewAudioStreamResponse),
VideoStreamFromParticipant(super::VideoStreamFromParticipantResponse),
/// Audio
#[prost(message, tag="23")]
NewAudioSource(super::NewAudioSourceResponse),
NewAudioStream(super::NewAudioStreamResponse),
#[prost(message, tag="24")]
CaptureAudioFrame(super::CaptureAudioFrameResponse),
NewAudioSource(super::NewAudioSourceResponse),
#[prost(message, tag="25")]
NewAudioResampler(super::NewAudioResamplerResponse),
CaptureAudioFrame(super::CaptureAudioFrameResponse),
#[prost(message, tag="26")]
RemixAndResample(super::RemixAndResampleResponse),
NewAudioResampler(super::NewAudioResamplerResponse),
#[prost(message, tag="27")]
RemixAndResample(super::RemixAndResampleResponse),
#[prost(message, tag="28")]
AudioStreamFromParticipant(super::AudioStreamFromParticipantResponse),
#[prost(message, tag="29")]
E2ee(super::E2eeResponse),
}
}
Expand Down
97 changes: 95 additions & 2 deletions livekit-ffi/src/server/audio_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use futures_util::StreamExt;
use livekit::track::Track;
use livekit::webrtc::{audio_stream::native::NativeAudioStream, prelude::*};
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};

use super::{room::FfiTrack, FfiHandle};
use crate::server::utils;
use crate::{proto, server, FfiError, FfiHandleId, FfiResult};

pub struct FfiAudioStream {
Expand All @@ -38,7 +40,7 @@ impl FfiAudioStream {
///
/// It is possible that the client receives an AudioFrame after the task is closed. The client
/// musts ignore it.
pub fn setup(
pub fn from_track(
server: &'static server::FfiServer,
new_stream: proto::NewAudioStreamRequest,
) -> FfiResult<proto::OwnedAudioStream> {
Expand Down Expand Up @@ -80,6 +82,97 @@ impl FfiAudioStream {
})
}

pub fn from_participant(
server: &'static server::FfiServer,
request: proto::AudioStreamFromParticipantRequest,
) -> FfiResult<proto::OwnedAudioStream> {
let (close_tx, close_rx) = oneshot::channel();
let handle_id = server.next_id();
let stream_type = request.r#type();
let audio_stream = match stream_type {
#[cfg(not(target_arch = "wasm32"))]
proto::AudioStreamType::AudioStreamNative => {
let audio_stream = Self { handle_id, stream_type, close_tx };

let handle = server.async_runtime.spawn(Self::participant_audio_stream_task(
server, request, handle_id, close_rx,
));
server.watch_panic(handle);
Ok::<FfiAudioStream, FfiError>(audio_stream)
}
_ => return Err(FfiError::InvalidRequest("unsupported audio stream type".into())),
}?;

// Store the new audio stream and return the info
let info = proto::AudioStreamInfo::from(&audio_stream);
server.store_handle(handle_id, audio_stream);

Ok(proto::OwnedAudioStream {
handle: Some(proto::FfiOwnedHandle { id: handle_id }),
info: Some(info),
})
}

async fn participant_audio_stream_task(
server: &'static server::FfiServer,
request: proto::AudioStreamFromParticipantRequest,
stream_handle: FfiHandleId,
mut close_rx: oneshot::Receiver<()>,
) {
let ffi_participant =
utils::ffi_participant_from_handle(server, request.participant_handle);
let ffi_participant = match ffi_participant {
Ok(ffi_participant) => ffi_participant,
Err(err) => {
log::error!("failed to get participant: {}", err);
return;
}
};

let track_source = request.track_source();
let (track_tx, mut track_rx) = mpsc::channel::<Track>(1);
server.async_runtime.spawn(utils::track_changed_trigger(
ffi_participant,
track_source.into(),
track_tx,
));
// track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done
nbsp marked this conversation as resolved.
Show resolved Hide resolved

loop {
let track = track_rx.recv().await;
if let Some(track) = track {
let rtc_track = track.rtc_track();
let MediaStreamTrack::Audio(rtc_track) = rtc_track else {
continue;
};
let (c_tx, c_rx) = oneshot::channel::<()>();
let (done_tx, mut done_rx) = oneshot::channel::<()>();
server.async_runtime.spawn(async move {
Self::native_audio_stream_task(
server,
stream_handle,
NativeAudioStream::new(rtc_track),
c_rx,
)
.await;
let _ = done_tx.send(());
});
tokio::select! {
_ = &mut close_rx => {
let _ = c_tx.send(());
return
}
nbsp marked this conversation as resolved.
Show resolved Hide resolved
_ = &mut done_rx => {
break
}
}
} else {
// When tracks are done (i.e. the participant leaves the room), we are done
break;
}
}
}

async fn native_audio_stream_task(
server: &'static server::FfiServer,
stream_handle_id: FfiHandleId,
Expand Down
1 change: 1 addition & 0 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub mod colorcvt;
pub mod logger;
pub mod requests;
pub mod room;
mod utils;
pub mod video_source;
pub mod video_stream;

Expand Down
Loading
Loading