Skip to content

Commit

Permalink
fix(client-common): handle video/audio stream seq more intelligently
Browse files Browse the repository at this point in the history
  • Loading branch information
colinmarc committed Oct 21, 2024
1 parent 632bcb1 commit 4bab390
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions mm-client-common/src/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ impl Attachment {
attached_msg: attached,

video_packet_ring: PacketRing::new(),
video_stream_seq: 0,
prev_video_stream_seq: 0,
video_stream_seq: None,
prev_video_stream_seq: None,
video_stream_seq_offset,

audio_packet_ring: PacketRing::new(),
audio_stream_seq: 0,
prev_audio_stream_seq: 0,
audio_stream_seq: None,
prev_audio_stream_seq: None,
audio_stream_seq_offset: 0,

notify_detached: Some(detached_tx),
Expand Down Expand Up @@ -342,13 +342,13 @@ pub(crate) struct AttachmentState {
reattach_required: bool,

video_packet_ring: PacketRing,
video_stream_seq: u64,
prev_video_stream_seq: u64,
video_stream_seq: Option<u64>,
prev_video_stream_seq: Option<u64>,
video_stream_seq_offset: u64,

audio_packet_ring: PacketRing,
audio_stream_seq: u64,
prev_audio_stream_seq: u64,
audio_stream_seq: Option<u64>,
prev_audio_stream_seq: Option<u64>,
audio_stream_seq_offset: u64,

// A future representing the end of the attachment.
Expand All @@ -367,15 +367,15 @@ impl AttachmentState {
protocol::MessageType::VideoChunk(chunk) => {
// We always send packets for two streams - the current one and
// (if there is one) the previous one.
if chunk.stream_seq > self.video_stream_seq {
if self.video_stream_seq.is_none_or(|s| s < chunk.stream_seq) {
// A new stream started.
self.prev_video_stream_seq = self.video_stream_seq;
self.video_stream_seq = chunk.stream_seq;
self.video_stream_seq = Some(chunk.stream_seq);

let res = self.attached_msg.streaming_resolution.unwrap_or_default();

self.delegate.video_stream_start(
self.video_stream_seq + self.video_stream_seq_offset,
chunk.stream_seq + self.video_stream_seq_offset,
VideoStreamParams {
width: res.width,
height: res.height,
Expand All @@ -385,26 +385,26 @@ impl AttachmentState {
);

// Discard any older packets.
self.video_packet_ring
.discard(self.prev_video_stream_seq.saturating_sub(1));
if let Some(prev) = self.prev_video_stream_seq {
self.video_packet_ring.discard(prev.saturating_sub(1));
}
}

if let Err(err) = self.video_packet_ring.recv_chunk(chunk) {
error!("error in packet ring: {:#}", err);
}

for mut packet in self
.video_packet_ring
.drain_completed(self.prev_video_stream_seq)
{
packet.stream_seq += self.video_stream_seq_offset;
self.delegate.video_packet(Arc::new(packet));
if let Some(prev) = self.prev_video_stream_seq {
for mut packet in self.video_packet_ring.drain_completed(prev) {
packet.stream_seq += self.video_stream_seq_offset;
self.delegate.video_packet(Arc::new(packet));
}
}

if self.video_stream_seq != self.prev_video_stream_seq {
for mut packet in self
.video_packet_ring
.drain_completed(self.video_stream_seq)
.drain_completed(self.video_stream_seq.unwrap())
{
packet.stream_seq += self.video_stream_seq_offset;
self.delegate.video_packet(Arc::new(packet));
Expand All @@ -414,10 +414,10 @@ impl AttachmentState {
protocol::MessageType::AudioChunk(chunk) => {
// We always send packets for two streams - the current one and
// (if there is one) the previous one.
if chunk.stream_seq > self.audio_stream_seq {
if self.audio_stream_seq.is_none_or(|s| s < chunk.stream_seq) {
// A new stream started.
self.prev_audio_stream_seq = self.audio_stream_seq;
self.audio_stream_seq = chunk.stream_seq;
self.audio_stream_seq = Some(chunk.stream_seq);

let channels = self
.attached_msg
Expand All @@ -427,7 +427,7 @@ impl AttachmentState {
.unwrap_or_default();

self.delegate.audio_stream_start(
self.audio_stream_seq + self.audio_stream_seq_offset,
chunk.stream_seq + self.audio_stream_seq_offset,
AudioStreamParams {
codec: self.attached_msg.audio_codec(),
sample_rate: self.attached_msg.sample_rate_hz,
Expand All @@ -436,26 +436,26 @@ impl AttachmentState {
);

// Discard any older packets.
self.audio_packet_ring
.discard(self.prev_audio_stream_seq.saturating_sub(1));
if let Some(prev) = self.prev_audio_stream_seq {
self.audio_packet_ring.discard(prev.saturating_sub(1));
}
}

if let Err(err) = self.audio_packet_ring.recv_chunk(chunk) {
error!("error in packet ring: {:#}", err);
}

for mut packet in self
.audio_packet_ring
.drain_completed(self.prev_audio_stream_seq)
{
packet.stream_seq += self.audio_stream_seq_offset;
self.delegate.audio_packet(Arc::new(packet));
if let Some(prev) = self.prev_audio_stream_seq {
for mut packet in self.audio_packet_ring.drain_completed(prev) {
packet.stream_seq += self.audio_stream_seq_offset;
self.delegate.audio_packet(Arc::new(packet));
}
}

if self.audio_stream_seq != self.prev_audio_stream_seq {
for mut packet in self
.audio_packet_ring
.drain_completed(self.audio_stream_seq)
.drain_completed(self.audio_stream_seq.unwrap())
{
packet.stream_seq += self.audio_stream_seq_offset;
self.delegate.audio_packet(Arc::new(packet));
Expand Down

0 comments on commit 4bab390

Please sign in to comment.