Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update protocol to v15 #394

Merged
merged 15 commits into from
Aug 20, 2024
15 changes: 11 additions & 4 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub type SignalEvents = mpsc::UnboundedReceiver<SignalEvent>;
pub type SignalResult<T> = Result<T, SignalError>;

pub const JOIN_RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);
pub const PROTOCOL_VERSION: u32 = 9;
pub const PROTOCOL_VERSION: u32 = 15;

#[derive(Error, Debug)]
pub enum SignalError {
Expand Down Expand Up @@ -92,6 +92,7 @@ struct SignalInner {
url: String,
options: SignalOptions,
join_response: proto::JoinResponse,
request_id: AsyncMutex<u32>,
nbsp marked this conversation as resolved.
Show resolved Hide resolved
}

pub struct SignalClient {
Expand Down Expand Up @@ -145,7 +146,7 @@ impl SignalClient {
/// Send a signal to the server (e.g. publish, subscribe, etc.)
/// This will automatically queue the message if the connection fails
/// The queue is flushed on the next restart
pub async fn send(&self, signal: proto::signal_request::Message) {
pub async fn send(&self, signal: proto::signal_request::Message) -> u32 {
self.inner.send(signal).await
}

Expand Down Expand Up @@ -213,6 +214,7 @@ impl SignalInner {
options,
url: url.to_string(),
join_response: join_response.clone(),
request_id: Default::default(),
});

Ok((inner, join_response, events))
Expand Down Expand Up @@ -278,10 +280,13 @@ impl SignalInner {
}

/// Send a signal to the server
pub async fn send(&self, signal: proto::signal_request::Message) {
pub async fn send(&self, signal: proto::signal_request::Message) -> u32 {
let mut id = self.request_id.lock().await;
*id += 1;

if self.reconnecting.load(Ordering::Acquire) {
self.queue_message(signal).await;
return;
return self.request_id.lock().await.clone();
}

self.flush_queue().await; // The queue must be flusehd before sending any new signal
Expand All @@ -291,6 +296,8 @@ impl SignalInner {
self.queue_message(signal).await;
}
}

self.request_id.lock().await.clone()
}

async fn queue_message(&self, signal: proto::signal_request::Message) {
Expand Down
36 changes: 21 additions & 15 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ message TrackPublishOptions {
bool red = 5;
bool simulcast = 6;
TrackSource source = 7;
string stream = 8;
}

enum IceTransportType {
Expand Down Expand Up @@ -318,21 +319,22 @@ message RoomEvent {
ParticipantDisconnected participant_disconnected = 3;
LocalTrackPublished local_track_published = 4;
LocalTrackUnpublished local_track_unpublished = 5;
TrackPublished track_published = 6;
TrackUnpublished track_unpublished = 7;
TrackSubscribed track_subscribed = 8;
TrackUnsubscribed track_unsubscribed = 9;
TrackSubscriptionFailed track_subscription_failed = 10;
TrackMuted track_muted = 11;
TrackUnmuted track_unmuted = 12;
ActiveSpeakersChanged active_speakers_changed = 13;
RoomMetadataChanged room_metadata_changed = 14;
RoomSidChanged room_sid_changed = 15;
ParticipantMetadataChanged participant_metadata_changed = 16;
ParticipantNameChanged participant_name_changed = 17;
ParticipantAttributesChanged participant_attributes_changed = 18;
ConnectionQualityChanged connection_quality_changed = 19;
ConnectionStateChanged connection_state_changed = 20;
LocalTrackSubscribed local_track_subscribed = 6;
TrackPublished track_published = 7;
TrackUnpublished track_unpublished = 8;
TrackSubscribed track_subscribed = 9;
TrackUnsubscribed track_unsubscribed = 10;
TrackSubscriptionFailed track_subscription_failed = 11;
TrackMuted track_muted = 12;
TrackUnmuted track_unmuted = 13;
ActiveSpeakersChanged active_speakers_changed = 14;
RoomMetadataChanged room_metadata_changed = 15;
RoomSidChanged room_sid_changed = 16;
ParticipantMetadataChanged participant_metadata_changed = 17;
ParticipantNameChanged participant_name_changed = 18;
ParticipantAttributesChanged participant_attributes_changed = 19;
ConnectionQualityChanged connection_quality_changed = 20;
ConnectionStateChanged connection_state_changed = 21;
// Connected connected = 21;
Disconnected disconnected = 22;
Reconnecting reconnecting = 23;
Expand Down Expand Up @@ -371,6 +373,10 @@ message LocalTrackUnpublished {
string publication_sid = 1;
}

message LocalTrackSubscribed {
string track_sid = 2;
}

message TrackPublished {
string participant_identity = 1;
OwnedTrackPublication publication = 2;
Expand Down
1 change: 1 addition & 0 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl From<proto::TrackPublishOptions> for TrackPublishOptions {
dtx: opts.dtx,
red: opts.red,
simulcast: opts.simulcast,
stream: opts.stream,
}
}
}
Expand Down
40 changes: 25 additions & 15 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2240,6 +2240,8 @@ pub struct TrackPublishOptions {
pub simulcast: bool,
#[prost(enumeration="TrackSource", tag="7")]
pub source: i32,
#[prost(string, tag="8")]
pub stream: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -2316,7 +2318,7 @@ pub struct OwnedBuffer {
pub struct RoomEvent {
#[prost(uint64, tag="1")]
pub room_handle: u64,
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28")]
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28")]
pub message: ::core::option::Option<room_event::Message>,
}
/// Nested message and enum types in `RoomEvent`.
Expand All @@ -2333,34 +2335,36 @@ pub mod room_event {
#[prost(message, tag="5")]
LocalTrackUnpublished(super::LocalTrackUnpublished),
#[prost(message, tag="6")]
TrackPublished(super::TrackPublished),
LocalTrackSubscribed(super::LocalTrackSubscribed),
#[prost(message, tag="7")]
TrackUnpublished(super::TrackUnpublished),
TrackPublished(super::TrackPublished),
#[prost(message, tag="8")]
TrackSubscribed(super::TrackSubscribed),
TrackUnpublished(super::TrackUnpublished),
#[prost(message, tag="9")]
TrackUnsubscribed(super::TrackUnsubscribed),
TrackSubscribed(super::TrackSubscribed),
#[prost(message, tag="10")]
TrackSubscriptionFailed(super::TrackSubscriptionFailed),
TrackUnsubscribed(super::TrackUnsubscribed),
#[prost(message, tag="11")]
TrackMuted(super::TrackMuted),
TrackSubscriptionFailed(super::TrackSubscriptionFailed),
#[prost(message, tag="12")]
TrackUnmuted(super::TrackUnmuted),
TrackMuted(super::TrackMuted),
#[prost(message, tag="13")]
ActiveSpeakersChanged(super::ActiveSpeakersChanged),
TrackUnmuted(super::TrackUnmuted),
#[prost(message, tag="14")]
RoomMetadataChanged(super::RoomMetadataChanged),
ActiveSpeakersChanged(super::ActiveSpeakersChanged),
#[prost(message, tag="15")]
RoomSidChanged(super::RoomSidChanged),
RoomMetadataChanged(super::RoomMetadataChanged),
#[prost(message, tag="16")]
ParticipantMetadataChanged(super::ParticipantMetadataChanged),
RoomSidChanged(super::RoomSidChanged),
#[prost(message, tag="17")]
ParticipantNameChanged(super::ParticipantNameChanged),
ParticipantMetadataChanged(super::ParticipantMetadataChanged),
#[prost(message, tag="18")]
ParticipantAttributesChanged(super::ParticipantAttributesChanged),
ParticipantNameChanged(super::ParticipantNameChanged),
#[prost(message, tag="19")]
ConnectionQualityChanged(super::ConnectionQualityChanged),
ParticipantAttributesChanged(super::ParticipantAttributesChanged),
#[prost(message, tag="20")]
ConnectionQualityChanged(super::ConnectionQualityChanged),
#[prost(message, tag="21")]
ConnectionStateChanged(super::ConnectionStateChanged),
/// Connected connected = 21;
#[prost(message, tag="22")]
Expand Down Expand Up @@ -2426,6 +2430,12 @@ pub struct LocalTrackUnpublished {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LocalTrackSubscribed {
#[prost(string, tag="2")]
pub track_sid: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TrackPublished {
#[prost(string, tag="1")]
pub participant_identity: ::prost::alloc::string::String,
Expand Down
5 changes: 5 additions & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,11 @@ async fn forward_event(

inner.pending_unpublished_tracks.lock().insert(publication.sid());
}
RoomEvent::LocalTrackSubscribed { track } => {
let _ = send_event(proto::room_event::Message::LocalTrackSubscribed(
proto::LocalTrackSubscribed { track_sid: track.sid().to_string() },
));
}
RoomEvent::TrackPublished { publication, participant } => {
let handle_id = server.next_id();
let ffi_publication = FfiPublication {
Expand Down
87 changes: 77 additions & 10 deletions livekit-protocol/src/livekit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ pub struct ParticipantInfo {
pub kind: i32,
#[prost(map="string, string", tag="15")]
pub attributes: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
#[prost(enumeration="DisconnectReason", tag="16")]
pub disconnect_reason: i32,
}
/// Nested message and enum types in `ParticipantInfo`.
pub mod participant_info {
Expand Down Expand Up @@ -738,6 +740,67 @@ pub struct RtpStats {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RtpForwarderState {
#[prost(bool, tag="1")]
pub started: bool,
#[prost(int32, tag="2")]
pub reference_layer_spatial: i32,
#[prost(int64, tag="3")]
pub pre_start_time: i64,
#[prost(uint64, tag="4")]
pub ext_first_timestamp: u64,
#[prost(uint64, tag="5")]
pub dummy_start_timestamp_offset: u64,
#[prost(message, optional, tag="6")]
pub rtp_munger: ::core::option::Option<RtpMungerState>,
#[prost(oneof="rtp_forwarder_state::CodecMunger", tags="7")]
pub codec_munger: ::core::option::Option<rtp_forwarder_state::CodecMunger>,
}
/// Nested message and enum types in `RTPForwarderState`.
pub mod rtp_forwarder_state {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum CodecMunger {
#[prost(message, tag="7")]
Vp8Munger(super::Vp8MungerState),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RtpMungerState {
#[prost(uint64, tag="1")]
pub ext_last_sequence_number: u64,
#[prost(uint64, tag="2")]
pub ext_second_last_sequence_number: u64,
#[prost(uint64, tag="3")]
pub ext_last_timestamp: u64,
#[prost(uint64, tag="4")]
pub ext_second_last_timestamp: u64,
#[prost(bool, tag="5")]
pub last_marker: bool,
#[prost(bool, tag="6")]
pub second_last_marker: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Vp8MungerState {
#[prost(int32, tag="1")]
pub ext_last_picture_id: i32,
#[prost(bool, tag="2")]
pub picture_id_used: bool,
#[prost(uint32, tag="3")]
pub last_tl0_pic_idx: u32,
#[prost(bool, tag="4")]
pub tl0_pic_idx_used: bool,
#[prost(bool, tag="5")]
pub tid_used: bool,
#[prost(uint32, tag="6")]
pub last_key_idx: u32,
#[prost(bool, tag="7")]
pub key_idx_used: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TimedVersion {
#[prost(int64, tag="1")]
pub unix_micro: i64,
Expand Down Expand Up @@ -2274,9 +2337,9 @@ pub mod signal_response {
/// Subscription response, client should not expect any media from this subscription if it fails
#[prost(message, tag="21")]
SubscriptionResponse(super::SubscriptionResponse),
/// Errors relating to user inititated requests that carry a `request_id`
/// Response relating to user inititated requests that carry a `request_id`
#[prost(message, tag="22")]
ErrorResponse(super::ErrorResponse),
RequestResponse(super::RequestResponse),
/// notify to the publisher when a published track has been subscribed for the first time
#[prost(message, tag="23")]
TrackSubscribed(super::TrackSubscribed),
Expand Down Expand Up @@ -2778,20 +2841,20 @@ pub struct SubscriptionResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ErrorResponse {
pub struct RequestResponse {
#[prost(uint32, tag="1")]
pub request_id: u32,
#[prost(enumeration="error_response::Reason", tag="2")]
#[prost(enumeration="request_response::Reason", tag="2")]
pub reason: i32,
#[prost(string, tag="3")]
pub message: ::prost::alloc::string::String,
}
/// Nested message and enum types in `ErrorResponse`.
pub mod error_response {
/// Nested message and enum types in `RequestResponse`.
pub mod request_response {
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Reason {
Unknown = 0,
Ok = 0,
NotFound = 1,
NotAllowed = 2,
LimitExceeded = 3,
Expand All @@ -2803,7 +2866,7 @@ pub mod error_response {
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Reason::Unknown => "UNKNOWN",
Reason::Ok => "OK",
Reason::NotFound => "NOT_FOUND",
Reason::NotAllowed => "NOT_ALLOWED",
Reason::LimitExceeded => "LIMIT_EXCEEDED",
Expand All @@ -2812,7 +2875,7 @@ pub mod error_response {
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"UNKNOWN" => Some(Self::Unknown),
"OK" => Some(Self::Ok),
"NOT_FOUND" => Some(Self::NotFound),
"NOT_ALLOWED" => Some(Self::NotAllowed),
"LIMIT_EXCEEDED" => Some(Self::LimitExceeded),
Expand Down Expand Up @@ -2944,6 +3007,8 @@ pub struct JobState {
pub ended_at: i64,
#[prost(int64, tag="5")]
pub updated_at: i64,
#[prost(string, tag="6")]
pub participant_identity: ::prost::alloc::string::String,
}
/// from Worker to Server
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -3234,10 +3299,12 @@ pub struct RoomAgentDispatch {
pub struct DeleteAgentDispatchRequest {
#[prost(string, tag="1")]
pub dispatch_id: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub room: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListAgentDispatchRequesst {
pub struct ListAgentDispatchRequest {
/// if set, only the dispatch whose id is given will be returned
#[prost(string, tag="1")]
pub dispatch_id: ::prost::alloc::string::String,
Expand Down
Loading
Loading