From d2c5ea59a1837b22a81f057fb981c00fd8fd32d0 Mon Sep 17 00:00:00 2001 From: Xavier Drudis Date: Thu, 2 Jan 2025 08:13:44 -0800 Subject: [PATCH 1/7] Revert "IVF writer fix invalid timestamp into headers" This reverts commit 4f40756d9c2500efc24244a518f134dab580cbb0. --- pkg/media/ivfwriter/ivfwriter.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/media/ivfwriter/ivfwriter.go b/pkg/media/ivfwriter/ivfwriter.go index 6af42ce5414..49911ebba9a 100644 --- a/pkg/media/ivfwriter/ivfwriter.go +++ b/pkg/media/ivfwriter/ivfwriter.go @@ -109,10 +109,10 @@ func (i *IVFWriter) writeHeader() error { return err } -func (i *IVFWriter) writeFrame(frame []byte, timestamp uint64) error { +func (i *IVFWriter) writeFrame(frame []byte) error { frameHeader := make([]byte, 12) binary.LittleEndian.PutUint32(frameHeader[0:], uint32(len(frame))) // Frame length - binary.LittleEndian.PutUint64(frameHeader[4:], timestamp) // PTS + binary.LittleEndian.PutUint64(frameHeader[4:], i.count) // PTS i.count++ if _, err := i.ioWriter.Write(frameHeader); err != nil { @@ -153,7 +153,7 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error { return nil } - if err := i.writeFrame(i.currentFrame, uint64(packet.Header.Timestamp)); err != nil { + if err := i.writeFrame(i.currentFrame); err != nil { return err } i.currentFrame = nil @@ -169,7 +169,7 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error { } for j := range obus { - if err := i.writeFrame(obus[j], uint64(packet.Header.Timestamp)); err != nil { + if err := i.writeFrame(obus[j]); err != nil { return err } } From 5f74a5a1bddbff59cf069e3eefde4ee23ea128eb Mon Sep 17 00:00:00 2001 From: Xavier Drudis Date: Thu, 2 Jan 2025 08:13:47 -0800 Subject: [PATCH 2/7] Revert "Adapt an existing test to provide coverage" This reverts commit dbe26d34d8a53e9c3c50c40c513c77217f7465e2. --- pkg/media/ivfwriter/ivfwriter_test.go | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/pkg/media/ivfwriter/ivfwriter_test.go b/pkg/media/ivfwriter/ivfwriter_test.go index 1d7e5b4eaf8..8939543636c 100644 --- a/pkg/media/ivfwriter/ivfwriter_test.go +++ b/pkg/media/ivfwriter/ivfwriter_test.go @@ -5,7 +5,6 @@ package ivfwriter import ( "bytes" - "encoding/binary" "io" "testing" @@ -258,25 +257,19 @@ func TestIVFWriter_AV1(t *testing.T) { t.Run("Unfragmented", func(t *testing.T) { buffer := &bytes.Buffer{} - expectedTimestamp := uint32(3653407706) + writer, err := NewWith(buffer, WithCodec(mimeTypeAV1)) + assert.NoError(t, err) - // the timestamp is an uint32, 4 bytes from offset 36 - expectedPayloadWithTimestamp := []byte{ + assert.NoError(t, writer.WriteRTP(&rtp.Packet{Payload: []byte{0x00, 0x01, 0xFF}})) + assert.NoError(t, writer.Close()) + assert.Equal(t, buffer.Bytes(), []byte{ 0x44, 0x4b, 0x49, 0x46, 0x0, 0x0, 0x20, 0x0, 0x41, 0x56, 0x30, 0x31, 0x80, 0x2, 0xe0, 0x1, 0x1e, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x84, 0x3, 0x0, 0x0, 0x0, 0x0, - 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0xda, 0x93, 0xc2, - 0xd9, 0x0, 0x0, 0x0, 0x0, 0xff, - } - - writer, err := NewWith(buffer, WithCodec(mimeTypeAV1)) - assert.NoError(t, err) - - assert.NoError(t, writer.WriteRTP(&rtp.Packet{Header: rtp.Header{Timestamp: expectedTimestamp}, Payload: []byte{0x00, 0x01, 0xFF}})) - assert.NoError(t, writer.Close()) - assert.Equal(t, expectedPayloadWithTimestamp, buffer.Bytes()) - assert.Equal(t, expectedTimestamp, binary.LittleEndian.Uint32(expectedPayloadWithTimestamp[36:40])) + 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, + }) }) t.Run("Fragmented", func(t *testing.T) { From ff744a26c1b383078678caafedb2c11aa012875e Mon Sep 17 00:00:00 2001 From: Xavier Drudis Date: Thu, 2 Jan 2025 10:33:42 -0800 Subject: [PATCH 3/7] Fix IVF timestamps --- pkg/media/ivfreader/ivfreader.go | 9 ++++++- pkg/media/ivfwriter/ivfwriter.go | 41 ++++++++++++++++++++++---------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/pkg/media/ivfreader/ivfreader.go b/pkg/media/ivfreader/ivfreader.go index 29bd7b34b70..1006fef902c 100644 --- a/pkg/media/ivfreader/ivfreader.go +++ b/pkg/media/ivfreader/ivfreader.go @@ -52,6 +52,7 @@ type IVFFrameHeader struct { type IVFReader struct { stream io.Reader bytesReadSuccesfully int64 + fileHeader *IVFFileHeader } // NewWith returns a new IVF reader and IVF file header @@ -69,6 +70,7 @@ func NewWith(in io.Reader) (*IVFReader, *IVFFileHeader, error) { if err != nil { return nil, nil, err } + reader.fileHeader = header return reader, header, nil } @@ -80,6 +82,10 @@ func (i *IVFReader) ResetReader(reset func(bytesRead int64) io.Reader) { i.stream = reset(i.bytesReadSuccesfully) } +func (i *IVFReader) ptsToTimestamp(pts uint64) uint64 { + return pts * uint64(i.fileHeader.TimebaseDenominator) / uint64(i.fileHeader.TimebaseNumerator) +} + // ParseNextFrame reads from stream and returns IVF frame payload, header, // and an error if there is incomplete frame data. // Returns all nil values when no more frames are available. @@ -95,9 +101,10 @@ func (i *IVFReader) ParseNextFrame() ([]byte, *IVFFrameHeader, error) { return nil, nil, err } + pts := binary.LittleEndian.Uint64(buffer[4:12]) header = &IVFFrameHeader{ FrameSize: binary.LittleEndian.Uint32(buffer[:4]), - Timestamp: binary.LittleEndian.Uint64(buffer[4:12]), + Timestamp: i.ptsToTimestamp(pts), } payload := make([]byte, header.FrameSize) diff --git a/pkg/media/ivfwriter/ivfwriter.go b/pkg/media/ivfwriter/ivfwriter.go index 49911ebba9a..4f482b99dc8 100644 --- a/pkg/media/ivfwriter/ivfwriter.go +++ b/pkg/media/ivfwriter/ivfwriter.go @@ -37,6 +37,11 @@ type IVFWriter struct { isVP8, isAV1 bool + timebaseDenominator uint32 + timebaseNumerator uint32 + firstFrameTimestamp uint32 + clockRate uint64 + // VP8 currentFrame []byte @@ -65,8 +70,11 @@ func NewWith(out io.Writer, opts ...Option) (*IVFWriter, error) { } writer := &IVFWriter{ - ioWriter: out, - seenKeyFrame: false, + ioWriter: out, + seenKeyFrame: false, + timebaseDenominator: 30, + timebaseNumerator: 1, + clockRate: 90000, } for _, o := range opts { @@ -98,21 +106,25 @@ func (i *IVFWriter) writeHeader() error { copy(header[8:], "AV01") } - binary.LittleEndian.PutUint16(header[12:], 640) // Width in pixels - binary.LittleEndian.PutUint16(header[14:], 480) // Height in pixels - binary.LittleEndian.PutUint32(header[16:], 30) // Framerate denominator - binary.LittleEndian.PutUint32(header[20:], 1) // Framerate numerator - binary.LittleEndian.PutUint32(header[24:], 900) // Frame count, will be updated on first Close() call - binary.LittleEndian.PutUint32(header[28:], 0) // Unused + binary.LittleEndian.PutUint16(header[12:], 640) // Width in pixels + binary.LittleEndian.PutUint16(header[14:], 480) // Height in pixels + binary.LittleEndian.PutUint32(header[16:], i.timebaseDenominator) // Framerate denominator + binary.LittleEndian.PutUint32(header[20:], i.timebaseNumerator) // Framerate numerator + binary.LittleEndian.PutUint32(header[24:], 900) // Frame count, will be updated on first Close() call + binary.LittleEndian.PutUint32(header[28:], 0) // Unused _, err := i.ioWriter.Write(header) return err } -func (i *IVFWriter) writeFrame(frame []byte) error { +func (i *IVFWriter) timestampToPts(timestamp uint64) uint64 { + return timestamp * uint64(i.timebaseNumerator) / uint64(i.timebaseDenominator) +} + +func (i *IVFWriter) writeFrame(frame []byte, timestamp uint64) error { frameHeader := make([]byte, 12) binary.LittleEndian.PutUint32(frameHeader[0:], uint32(len(frame))) // Frame length - binary.LittleEndian.PutUint64(frameHeader[4:], i.count) // PTS + binary.LittleEndian.PutUint64(frameHeader[4:], i.timestampToPts(timestamp)) i.count++ if _, err := i.ioWriter.Write(frameHeader); err != nil { @@ -130,6 +142,11 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error { return nil } + if i.count == 0 { + i.firstFrameTimestamp = packet.Header.Timestamp + } + relativeTstampMs := 1000 * uint64(packet.Header.Timestamp) / i.clockRate + if i.isVP8 { vp8Packet := codecs.VP8Packet{} if _, err := vp8Packet.Unmarshal(packet.Payload); err != nil { @@ -153,7 +170,7 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error { return nil } - if err := i.writeFrame(i.currentFrame); err != nil { + if err := i.writeFrame(i.currentFrame, relativeTstampMs); err != nil { return err } i.currentFrame = nil @@ -169,7 +186,7 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error { } for j := range obus { - if err := i.writeFrame(obus[j]); err != nil { + if err := i.writeFrame(obus[j], relativeTstampMs); err != nil { return err } } From 264e0d8822a55ccaa0e02c180d06f8a51b32b989 Mon Sep 17 00:00:00 2001 From: Xavier Drudis Date: Thu, 2 Jan 2025 10:37:47 -0800 Subject: [PATCH 4/7] Nits --- pkg/media/ivfreader/ivfreader.go | 8 +++++--- pkg/media/ivfwriter/ivfwriter.go | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/media/ivfreader/ivfreader.go b/pkg/media/ivfreader/ivfreader.go index 1006fef902c..04a5df1bfe0 100644 --- a/pkg/media/ivfreader/ivfreader.go +++ b/pkg/media/ivfreader/ivfreader.go @@ -52,7 +52,8 @@ type IVFFrameHeader struct { type IVFReader struct { stream io.Reader bytesReadSuccesfully int64 - fileHeader *IVFFileHeader + timebaseDenominator uint32 + timebaseNumerator uint32 } // NewWith returns a new IVF reader and IVF file header @@ -70,7 +71,8 @@ func NewWith(in io.Reader) (*IVFReader, *IVFFileHeader, error) { if err != nil { return nil, nil, err } - reader.fileHeader = header + reader.timebaseDenominator = header.TimebaseDenominator + reader.timebaseNumerator = header.TimebaseNumerator return reader, header, nil } @@ -83,7 +85,7 @@ func (i *IVFReader) ResetReader(reset func(bytesRead int64) io.Reader) { } func (i *IVFReader) ptsToTimestamp(pts uint64) uint64 { - return pts * uint64(i.fileHeader.TimebaseDenominator) / uint64(i.fileHeader.TimebaseNumerator) + return pts * uint64(i.timebaseDenominator) / uint64(i.timebaseNumerator) } // ParseNextFrame reads from stream and returns IVF frame payload, header, diff --git a/pkg/media/ivfwriter/ivfwriter.go b/pkg/media/ivfwriter/ivfwriter.go index 4f482b99dc8..bf1a357d333 100644 --- a/pkg/media/ivfwriter/ivfwriter.go +++ b/pkg/media/ivfwriter/ivfwriter.go @@ -123,8 +123,8 @@ func (i *IVFWriter) timestampToPts(timestamp uint64) uint64 { func (i *IVFWriter) writeFrame(frame []byte, timestamp uint64) error { frameHeader := make([]byte, 12) - binary.LittleEndian.PutUint32(frameHeader[0:], uint32(len(frame))) // Frame length - binary.LittleEndian.PutUint64(frameHeader[4:], i.timestampToPts(timestamp)) + binary.LittleEndian.PutUint32(frameHeader[0:], uint32(len(frame))) // Frame length + binary.LittleEndian.PutUint64(frameHeader[4:], i.timestampToPts(timestamp)) // PTS i.count++ if _, err := i.ioWriter.Write(frameHeader); err != nil { From b7b6089e847cbc3671860d00dd1975659551c424 Mon Sep 17 00:00:00 2001 From: Xavier Drudis Date: Thu, 2 Jan 2025 10:51:58 -0800 Subject: [PATCH 5/7] Restore relative timestamp --- pkg/media/ivfwriter/ivfwriter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/media/ivfwriter/ivfwriter.go b/pkg/media/ivfwriter/ivfwriter.go index bf1a357d333..ac13feaffbe 100644 --- a/pkg/media/ivfwriter/ivfwriter.go +++ b/pkg/media/ivfwriter/ivfwriter.go @@ -145,7 +145,7 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error { if i.count == 0 { i.firstFrameTimestamp = packet.Header.Timestamp } - relativeTstampMs := 1000 * uint64(packet.Header.Timestamp) / i.clockRate + relativeTstampMs := 1000 * uint64(packet.Header.Timestamp-i.firstFrameTimestamp) / i.clockRate if i.isVP8 { vp8Packet := codecs.VP8Packet{} From cb28fe30a2112767ce0245f1b34ebced62c9fdb6 Mon Sep 17 00:00:00 2001 From: Xavier Drudis Date: Thu, 2 Jan 2025 11:13:04 -0800 Subject: [PATCH 6/7] Defense against timebase with denominator 0 --- pkg/media/ivfwriter/ivfwriter.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/media/ivfwriter/ivfwriter.go b/pkg/media/ivfwriter/ivfwriter.go index ac13feaffbe..156d5b54f00 100644 --- a/pkg/media/ivfwriter/ivfwriter.go +++ b/pkg/media/ivfwriter/ivfwriter.go @@ -29,6 +29,8 @@ const ( ivfFileHeaderSignature = "DKIF" ) +var errInvalidMediaTimebase = errors.New("invalid media timebase") + // IVFWriter is used to take RTP packets and write them to an IVF on disk type IVFWriter struct { ioWriter io.Writer @@ -90,6 +92,10 @@ func NewWith(out io.Writer, opts ...Option) (*IVFWriter, error) { if err := writer.writeHeader(); err != nil { return nil, err } + + if writer.timebaseDenominator == 0 { + return nil, errInvalidMediaTimebase + } return writer, nil } From 803fd085aaeafecb01b9147f18ee0ff446ba7b5d Mon Sep 17 00:00:00 2001 From: Xavier Drudis Date: Thu, 2 Jan 2025 11:51:49 -0800 Subject: [PATCH 7/7] Defense against timebase with denominator 0 --- pkg/media/ivfreader/ivfreader.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/media/ivfreader/ivfreader.go b/pkg/media/ivfreader/ivfreader.go index 04a5df1bfe0..4f473eceee9 100644 --- a/pkg/media/ivfreader/ivfreader.go +++ b/pkg/media/ivfreader/ivfreader.go @@ -24,6 +24,7 @@ var ( errIncompleteFileHeader = errors.New("incomplete file header") errSignatureMismatch = errors.New("IVF signature mismatch") errUnknownIVFVersion = errors.New("IVF version unknown, parser may not parse correctly") + errInvalidMediaTimebase = errors.New("invalid media timebase") ) // IVFFileHeader 32-byte header for IVF files @@ -71,6 +72,9 @@ func NewWith(in io.Reader) (*IVFReader, *IVFFileHeader, error) { if err != nil { return nil, nil, err } + if header.TimebaseDenominator == 0 { + return nil, nil, errInvalidMediaTimebase + } reader.timebaseDenominator = header.TimebaseDenominator reader.timebaseNumerator = header.TimebaseNumerator