Skip to content

Commit

Permalink
subscription: add telemetry
Browse files Browse the repository at this point in the history
Resolves #141.
  • Loading branch information
daniel-abramov committed Mar 16, 2023
1 parent 9c8b062 commit 3b621dd
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/conference/participant/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (t *Tracker) AddPublishedTrack(
remoteTrack,
metadata,
participant.Logger,
participant.Telemetry,
participant.Telemetry.ChildBuilder(),
)
if err != nil {
return err
Expand Down
14 changes: 13 additions & 1 deletion pkg/conference/subscription/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"time"

"github.com/matrix-org/waterfall/pkg/conference/subscription/rewriter"
"github.com/matrix-org/waterfall/pkg/telemetry"
"github.com/matrix-org/waterfall/pkg/webrtc_ext"
"github.com/matrix-org/waterfall/pkg/worker"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
)

type RequestKeyFrameFn = func(simulcast webrtc_ext.SimulcastLayer) error
Expand All @@ -27,7 +29,9 @@ type VideoSubscription struct {
controller SubscriptionController
requestKeyFrameFn RequestKeyFrameFn
worker *worker.Worker[rtp.Packet]
logger *logrus.Entry

logger *logrus.Entry
telemetry *telemetry.Telemetry
}

func NewVideoSubscription(
Expand All @@ -36,6 +40,7 @@ func NewVideoSubscription(
controller SubscriptionController,
requestKeyFrameFn RequestKeyFrameFn,
logger *logrus.Entry,
telemetryBuilder *telemetry.ChildBuilder,
) (*VideoSubscription, error) {
// Create a new track.
rtpTrack, err := webrtc.NewTrackLocalStaticRTP(info.Codec, info.TrackID, info.StreamID)
Expand All @@ -61,6 +66,7 @@ func NewVideoSubscription(
requestKeyFrameFn,
nil,
logger,
telemetryBuilder.Create("VideoSubscription"),
}

// Create a worker state.
Expand All @@ -78,6 +84,9 @@ func NewVideoSubscription(
// Or if something is wrong with the subscription (i.e. this quality is not being sent anymore).
layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load())
logger.Infof("No RTP on subscription to %s (%s) for 10 seconds", subscription.info.TrackID, layer)

// This is susceptible to false-positives for muted videos!
subscription.telemetry.Fail(fmt.Errorf("No incoming RTP packets for 10 seconds"))
},
OnTask: workerState.handlePacket,
}
Expand All @@ -97,6 +106,7 @@ func NewVideoSubscription(
func (s *VideoSubscription) Unsubscribe() error {
s.worker.Stop()
s.logger.Infof("Unsubscribing from %s (%s)", s.info.TrackID, webrtc_ext.SimulcastLayer(s.currentLayer.Load()))
s.telemetry.End()
return s.controller.RemoveTrack(s.rtpSender)
}

Expand All @@ -107,6 +117,7 @@ func (s *VideoSubscription) WriteRTP(packet rtp.Packet) error {

func (s *VideoSubscription) SwitchLayer(simulcast webrtc_ext.SimulcastLayer) {
s.logger.Infof("Switching layer on %s to %s", s.info.TrackID, simulcast)
s.telemetry.AddEvent("switching simulcast layer", attribute.String("layer", simulcast.String()))
s.currentLayer.Store(int32(simulcast))
s.requestKeyFrameFn(simulcast)
}
Expand All @@ -127,6 +138,7 @@ func (s *VideoSubscription) readRTCP() {
if errors.Is(err, io.ErrClosedPipe) || errors.Is(err, io.EOF) {
layer := webrtc_ext.SimulcastLayer(s.currentLayer.Load())
s.logger.Debugf("failed to read RTCP on track: %s (%s): %s", s.info.TrackID, layer, err)
s.telemetry.Fail(err)
s.worker.Stop()
return
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/conference/track/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ func (p *PublishedTrack[SubscriberID]) addVideoPublisher(track *webrtc.TrackRemo
simulcast := webrtc_ext.RIDToSimulcastLayer(track.RID())
p.video.publishers[simulcast] = pub

p.telemetry.AddEvent("video publisher added", attribute.String("simulcast", simulcast.String()))
defer p.telemetry.AddEvent("video publisher started", attribute.String("simulcast", simulcast.String()))

// Listen on `done` and remove the track once it's done.
p.activePublishers.Add(1)
go func() {
defer p.activePublishers.Done()
defer p.telemetry.AddEvent("video publisher stopped", attribute.String("simulcast", simulcast.String()))
<-done

p.mutex.Lock()
Expand Down
28 changes: 12 additions & 16 deletions pkg/conference/track/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func NewPublishedTrack[SubscriberID SubscriberIdentifier](
track *webrtc.TrackRemote,
metadata TrackMetadata,
logger *logrus.Entry,
parentTelemetry *telemetry.Telemetry,
telemetryBuilder *telemetry.ChildBuilder,
) (*PublishedTrack[SubscriberID], error) {
telemetry := parentTelemetry.CreateChild(
telemetry := telemetryBuilder.Create(
"PublishedTrack",
attribute.String("track_id", track.ID()),
attribute.String("type", track.Kind().String()),
Expand Down Expand Up @@ -158,6 +158,7 @@ func (p *PublishedTrack[SubscriberID]) AddPublisher(track *webrtc.TrackRemote) e
// the negotiation when the SSRC changes and Pion fires a new track for the track that has already
// been published.
if pub := p.video.publishers[simulcast]; pub != nil {
p.telemetry.AddEvent("replacing publisher", attribute.String("simulcast", simulcast.String()))
pub.ReplaceTrack(&publisher.RemoteTrack{track})
return nil
}
Expand All @@ -172,6 +173,7 @@ func (p *PublishedTrack[SubscriberID]) AddPublisher(track *webrtc.TrackRemote) e
func (p *PublishedTrack[SubscriberID]) Stop() {
// Command all publishers to stop, unless already stopped.
if !p.isClosed() {
p.telemetry.AddEvent("stopping")
close(p.stopPublishers)
}
}
Expand Down Expand Up @@ -207,12 +209,6 @@ func (p *PublishedTrack[SubscriberID]) Subscribe(

// If we do, let's switch the layer.
if currentLayer != layer {
defer p.telemetry.AddEvent(
"switched layer",
attribute.String("id", subscriberID.String()),
attribute.String("layer", layer.String()),
)

p.video.publishers[currentLayer].RemoveSubscription(sub)
sub.SwitchLayer(layer)
p.video.publishers[layer].AddSubscription(sub)
Expand All @@ -233,7 +229,14 @@ func (p *PublishedTrack[SubscriberID]) Subscribe(
handler := func(simulcast webrtc_ext.SimulcastLayer) error {
return p.video.keyframeHandler.Send(simulcast)
}
sub, err = subscription.NewVideoSubscription(p.info, layer, controller, handler, logger)
sub, err = subscription.NewVideoSubscription(
p.info,
layer,
controller,
handler,
logger,
p.telemetry.ChildBuilder(attribute.String("id", subscriberID.String())),
)
case webrtc.RTPCodecTypeAudio:
sub, err = subscription.NewAudioSubscription(p.audio.outputTrack, controller)
}
Expand All @@ -253,11 +256,6 @@ func (p *PublishedTrack[SubscriberID]) Subscribe(
}

p.logger.Info("New subscriber:", subscriberID)
p.telemetry.AddEvent(
"new subscriber",
attribute.String("id", subscriberID.String()),
attribute.String("layer", layer.String()),
)

return nil
}
Expand All @@ -275,8 +273,6 @@ func (p *PublishedTrack[SubscriberID]) Unsubscribe(subscriberID SubscriberID) {
p.video.publishers[sub.Simulcast()].RemoveSubscription(sub)
}
}

p.telemetry.AddEvent("unsubscribed", attribute.String("id", subscriberID.String()))
}

func (p *PublishedTrack[SubscriberID]) Owner() SubscriberID {
Expand Down
16 changes: 16 additions & 0 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,19 @@ func (t *Telemetry) Fail(err error) {
func (t *Telemetry) End() {
t.span.End()
}

type ChildBuilder struct {
parent *Telemetry
attributes []attribute.KeyValue
}

func (t *Telemetry) ChildBuilder(attributes ...attribute.KeyValue) *ChildBuilder {
return &ChildBuilder{
parent: t,
attributes: attributes,
}
}

func (cb *ChildBuilder) Create(name string, attributes ...attribute.KeyValue) *Telemetry {
return cb.parent.CreateChild(name, append(cb.attributes, attributes...)...)
}

0 comments on commit 3b621dd

Please sign in to comment.