diff --git a/pkg/media/ivfreader/ivfreader.go b/pkg/media/ivfreader/ivfreader.go
index 29bd7b34b70..4f473eceee9 100644
--- a/pkg/media/ivfreader/ivfreader.go
+++ b/pkg/media/ivfreader/ivfreader.go
@@ -24,6 +24,7 @@ var (
 	errIncompleteFileHeader  = errors.New("incomplete file header")
 	errSignatureMismatch     = errors.New("IVF signature mismatch")
 	errUnknownIVFVersion     = errors.New("IVF version unknown, parser may not parse correctly")
+	errInvalidMediaTimebase  = errors.New("invalid media timebase")
 )
 
 // IVFFileHeader 32-byte header for IVF files
@@ -52,6 +53,8 @@ type IVFFrameHeader struct {
 type IVFReader struct {
 	stream               io.Reader
 	bytesReadSuccesfully int64
+	timebaseDenominator  uint32 // copied from the file header; multiplier in ptsToTimestamp
+	timebaseNumerator    uint32 // copied from the file header; divisor in ptsToTimestamp
 }
 
 // NewWith returns a new IVF reader and IVF file header
@@ -69,6 +72,11 @@ func NewWith(in io.Reader) (*IVFReader, *IVFFileHeader, error) {
 	if err != nil {
 		return nil, nil, err
 	}
+	if header.TimebaseDenominator == 0 || header.TimebaseNumerator == 0 { // numerator is the divisor in ptsToTimestamp
+		return nil, nil, errInvalidMediaTimebase
+	}
+	reader.timebaseDenominator = header.TimebaseDenominator
+	reader.timebaseNumerator = header.TimebaseNumerator
 
 	return reader, header, nil
 }
@@ -80,6 +88,10 @@ func (i *IVFReader) ResetReader(reset func(bytesRead int64) io.Reader) {
 	i.stream = reset(i.bytesReadSuccesfully)
 }
 
+func (i *IVFReader) ptsToTimestamp(pts uint64) uint64 {
+	return pts * uint64(i.timebaseDenominator) / uint64(i.timebaseNumerator) // integer division; NewWith rejects a zero numerator
+}
+
 // ParseNextFrame reads from stream and returns IVF frame payload, header,
 // and an error if there is incomplete frame data.
 // Returns all nil values when no more frames are available.
@@ -95,9 +107,10 @@ func (i *IVFReader) ParseNextFrame() ([]byte, *IVFFrameHeader, error) {
 		return nil, nil, err
 	}
 
+	pts := binary.LittleEndian.Uint64(buffer[4:12])
 	header = &IVFFrameHeader{
 		FrameSize: binary.LittleEndian.Uint32(buffer[:4]),
-		Timestamp: binary.LittleEndian.Uint64(buffer[4:12]),
+		Timestamp: i.ptsToTimestamp(pts),
 	}
 
 	payload := make([]byte, header.FrameSize)
diff --git a/pkg/media/ivfwriter/ivfwriter.go b/pkg/media/ivfwriter/ivfwriter.go
index 6af42ce5414..156d5b54f00 100644
--- a/pkg/media/ivfwriter/ivfwriter.go
+++ b/pkg/media/ivfwriter/ivfwriter.go
@@ -29,6 +29,8 @@ const (
 	ivfFileHeaderSignature = "DKIF"
 )
 
+var errInvalidMediaTimebase = errors.New("invalid media timebase")
+
 // IVFWriter is used to take RTP packets and write them to an IVF on disk
 type IVFWriter struct {
 	ioWriter     io.Writer
@@ -37,6 +39,11 @@ type IVFWriter struct {
 
 	isVP8, isAV1 bool
 
+	timebaseDenominator uint32 // written to header bytes 16-19; divisor in timestampToPts
+	timebaseNumerator   uint32 // written to header bytes 20-23; multiplier in timestampToPts
+	firstFrameTimestamp uint32 // RTP timestamp that frame timestamps are measured from
+	clockRate           uint64 // divisor turning RTP timestamp deltas into milliseconds (default 90000)
+
 	// VP8
 	currentFrame []byte
 
@@ -65,8 +72,11 @@ func NewWith(out io.Writer, opts ...Option) (*IVFWriter, error) {
 	}
 
 	writer := &IVFWriter{
-		ioWriter:     out,
-		seenKeyFrame: false,
+		ioWriter:            out,
+		seenKeyFrame:        false,
+		timebaseDenominator: 30,
+		timebaseNumerator:   1,
+		clockRate:           90000,
 	}
 
 	for _, o := range opts {
@@ -98,21 +108,33 @@ func (i *IVFWriter) writeHeader() error {
 		copy(header[8:], "AV01")
 	}
 
-	binary.LittleEndian.PutUint16(header[12:], 640) // Width in pixels
-	binary.LittleEndian.PutUint16(header[14:], 480) // Height in pixels
-	binary.LittleEndian.PutUint32(header[16:], 30)  // Framerate denominator
-	binary.LittleEndian.PutUint32(header[20:], 1)   // Framerate numerator
-	binary.LittleEndian.PutUint32(header[24:], 900) // Frame count, will be updated on first Close() call
-	binary.LittleEndian.PutUint32(header[28:], 0)   // Unused
+	// Reject an unusable timebase before any header bytes reach the writer:
+	// timestampToPts divides by the denominator.
+	if i.timebaseDenominator == 0 {
+		return errInvalidMediaTimebase
+	}
+
+	binary.LittleEndian.PutUint16(header[12:], 640)                   // Width in pixels
+	binary.LittleEndian.PutUint16(header[14:], 480)                   // Height in pixels
+	binary.LittleEndian.PutUint32(header[16:], i.timebaseDenominator) // Framerate denominator
+	binary.LittleEndian.PutUint32(header[20:], i.timebaseNumerator)   // Framerate numerator
+	binary.LittleEndian.PutUint32(header[24:], 900)                   // Frame count, will be updated on first Close() call
+	binary.LittleEndian.PutUint32(header[28:], 0)                     // Unused
 
 	_, err := i.ioWriter.Write(header)
 	return err
 }
 
+// timestampToPts scales timestamp by timebaseNumerator/timebaseDenominator
+// using integer division; writeHeader rejects a zero denominator.
+func (i *IVFWriter) timestampToPts(timestamp uint64) uint64 {
+	return timestamp * uint64(i.timebaseNumerator) / uint64(i.timebaseDenominator)
+}
+
 func (i *IVFWriter) writeFrame(frame []byte, timestamp uint64) error {
 	frameHeader := make([]byte, 12)
-	binary.LittleEndian.PutUint32(frameHeader[0:], uint32(len(frame))) // Frame length
-	binary.LittleEndian.PutUint64(frameHeader[4:], timestamp)          // PTS
+	binary.LittleEndian.PutUint32(frameHeader[0:], uint32(len(frame)))          // Frame length
+	binary.LittleEndian.PutUint64(frameHeader[4:], i.timestampToPts(timestamp)) // PTS
 	i.count++
 
 	if _, err := i.ioWriter.Write(frameHeader); err != nil {
@@ -130,6 +152,13 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error {
 		return nil
 	}
 
+	// Until the first frame has been written (count == 0) the current packet
+	// defines the origin; timestamps are then expressed in milliseconds from it.
+	if i.count == 0 {
+		i.firstFrameTimestamp = packet.Header.Timestamp
+	}
+	relativeTstampMs := 1000 * uint64(packet.Header.Timestamp-i.firstFrameTimestamp) / i.clockRate
+
 	if i.isVP8 {
 		vp8Packet := codecs.VP8Packet{}
 		if _, err := vp8Packet.Unmarshal(packet.Payload); err != nil {
@@ -153,7 +182,7 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error {
 			return nil
 		}
 
-		if err := i.writeFrame(i.currentFrame, uint64(packet.Header.Timestamp)); err != nil {
+		if err := i.writeFrame(i.currentFrame, relativeTstampMs); err != nil {
 			return err
 		}
 		i.currentFrame = nil
@@ -169,7 +198,7 @@ func (i *IVFWriter) WriteRTP(packet *rtp.Packet) error {
 		}
 
for j := range obus { - if err := i.writeFrame(obus[j], uint64(packet.Header.Timestamp)); err != nil { + if err := i.writeFrame(obus[j], relativeTstampMs); err != nil { return err } } diff --git a/pkg/media/ivfwriter/ivfwriter_test.go b/pkg/media/ivfwriter/ivfwriter_test.go index 1d7e5b4eaf8..8939543636c 100644 --- a/pkg/media/ivfwriter/ivfwriter_test.go +++ b/pkg/media/ivfwriter/ivfwriter_test.go @@ -5,7 +5,6 @@ package ivfwriter import ( "bytes" - "encoding/binary" "io" "testing" @@ -258,25 +257,19 @@ func TestIVFWriter_AV1(t *testing.T) { t.Run("Unfragmented", func(t *testing.T) { buffer := &bytes.Buffer{} - expectedTimestamp := uint32(3653407706) + writer, err := NewWith(buffer, WithCodec(mimeTypeAV1)) + assert.NoError(t, err) - // the timestamp is an uint32, 4 bytes from offset 36 - expectedPayloadWithTimestamp := []byte{ + assert.NoError(t, writer.WriteRTP(&rtp.Packet{Payload: []byte{0x00, 0x01, 0xFF}})) + assert.NoError(t, writer.Close()) + assert.Equal(t, buffer.Bytes(), []byte{ 0x44, 0x4b, 0x49, 0x46, 0x0, 0x0, 0x20, 0x0, 0x41, 0x56, 0x30, 0x31, 0x80, 0x2, 0xe0, 0x1, 0x1e, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x84, 0x3, 0x0, 0x0, 0x0, 0x0, - 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0xda, 0x93, 0xc2, - 0xd9, 0x0, 0x0, 0x0, 0x0, 0xff, - } - - writer, err := NewWith(buffer, WithCodec(mimeTypeAV1)) - assert.NoError(t, err) - - assert.NoError(t, writer.WriteRTP(&rtp.Packet{Header: rtp.Header{Timestamp: expectedTimestamp}, Payload: []byte{0x00, 0x01, 0xFF}})) - assert.NoError(t, writer.Close()) - assert.Equal(t, expectedPayloadWithTimestamp, buffer.Bytes()) - assert.Equal(t, expectedTimestamp, binary.LittleEndian.Uint32(expectedPayloadWithTimestamp[36:40])) + 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, + }) }) t.Run("Fragmented", func(t *testing.T) {